buffer
ReusableBuffer, a subclass of BytesIO, which provides significant speed-ups on serial write by avoiding expensive instantiation time.
ReusableBuffer
: a thread-safe subclass ofio.BytesIO
whosegetbuffer()
returns only the portion of the buffer that has been written.get_thread_local_buffer()
: a helper to obtain a per-threadReusableBuffer
instance, allowing safe reuse without repeated allocations.
Classes:
Name | Description |
---|---|
ReusableBuffer |
Thread-safe buffer utilities for in-memory byte storage. |
Functions:
Name | Description |
---|---|
get_thread_local_buffer |
Return a thread-local reusable buffer. |
ReusableBuffer
¶
Bases: BytesIO
Thread-safe buffer utilities for in-memory byte storage.
Warning
This ReusableBuffer, although thread-safe, is not re-entrant. Two users on the same thread can still
interfere with one another. This risk can sometimes be mitigated by using the lock()
method's context manager,
which ensures that the buffer is used until available for the user. This can cause deadlocks, however,
when two users on the same thread try to acquire the lock simultaneously, like for example:
Examples:
Creating and writing to a ReusableBuffer:
>>> from stainedglass_core.utils import buffer
>>> buf = buffer.ReusableBuffer()
>>> buf.write(b"hello")
5
>>> buf.seek(0)
0
>>> buf.read(2)
b'he'
>>> buf.read(None) # read the rest
b'llo'
Getting a memoryview of only the written portion:
>>> buf = buffer.ReusableBuffer()
>>> buf.write(b"abcdef")
6
>>> mv = buf.getbuffer()
>>> bytes(mv)
b'abcdef'
>>> del mv # always release memoryview before truncation or resize
Truncating the buffer:
Using seek with different whence values:
>>> buf = buffer.ReusableBuffer()
>>> buf.write(b"12345")
5
>>> buf.seek(-2, io.SEEK_END) # move 2 bytes before EOF
3
>>> buf.read()
b'45'
Getting a thread-local reusable buffer:
>>> with buffer.get_thread_local_buffer() as local_buf:
... local_buf.write(b"thread data")
... local_buf.seek(0)
... local_buf.read()
11
0
b'thread data'
On subsequent calls in the same thread, the buffer is rewound and reused:
>>> with buffer.get_thread_local_buffer() as same_buf:
... same_buf.write(b"new data")
... same_buf.seek(0)
... same_buf.read()
8
0
b'new data'
Methods:
Name | Description |
---|---|
__init__ |
Initialize the buffer. |
getbuffer |
Return a view of the written bytes only. |
lock |
Context manager to acquire the internal lock and reset the buffer for a new user. |
read |
Read bytes from the buffer. |
reset |
Reset the buffer to an empty state. |
seek |
Move the file position. |
truncate |
Truncate the buffer. |
write |
Write bytes into the buffer. |
__init__
¶
getbuffer
¶
Return a view of the written bytes only.
The returned memoryview is sliced to the current file position as given
by tell()
. This prevents exposing uninitialized/unused capacity.
Returns:
Type | Description |
---|---|
<class 'memoryview'>
|
A view of the bytes in the range |
lock
¶
Context manager to acquire the internal lock and reset the buffer for a new user.
Examples:
>>> buf = ReusableBuffer()
>>> with buf.lock():
... buf.write(b"hello")
... buf.seek(0)
... print(buf.read())
5
0
b'hello'
Yields:
Type | Description |
---|---|
collections.abc.Generator[typing_extensions.Self]
|
The reset buffer instance with the lock held. |
read
¶
seek
¶
truncate
¶
get_thread_local_buffer
¶
get_thread_local_buffer() -> collections.abc.Generator[
stainedglass_core.utils.buffer.ReusableBuffer
]
Return a thread-local reusable buffer.
If a buffer does not yet exist for the current thread, it is created. Otherwise, the existing buffer is rewound to position 0 for reuse.
Because the buffer is provided in a context manager, it is automatically released when the context is exited.
Warning
You should never attempt to use the buffer outside of the context manager.
Examples:
Getting a reusable Buffer
>>> with get_thread_local_buffer() as buffer:
... buffer.write(b"Hello, world!")
... buffer.seek(0)
... print(buffer.read())
13
0
b'Hello, world!'
Two threads safely using the same buffer
>>> import threading
>>> def thread_func(thread_num: int, out: dict) -> None:
... with get_thread_local_buffer() as buffer:
... string = f"[THREAD {thread_num}] Hello from thread {thread_num}!"
... buffer.write(string.encode())
... buffer.seek(0)
... out["result"] = buffer.read()
>>> results = {}
>>> thread1 = threading.Thread(
... target=thread_func, args=(1, results.setdefault("t1", {}))
... )
>>> thread2 = threading.Thread(
... target=thread_func, args=(2, results.setdefault("t2", {}))
... )
>>> thread1.start()
>>> thread2.start()
>>> thread1.join()
>>> thread2.join()
>>> print(results["t1"]["result"])
b'[THREAD 1] Hello from thread 1!'
>>> print(results["t2"]["result"])
b'[THREAD 2] Hello from thread 2!'
Yields:
Type | Description |
---|---|
collections.abc.Generator[stainedglass_core.utils.buffer.ReusableBuffer]
|
A buffer instance scoped to the current thread. |