buffer

ReusableBuffer is a subclass of io.BytesIO that provides significant speed-ups on serial writes by avoiding the cost of repeatedly instantiating new buffers. This module provides:

  • ReusableBuffer: a thread-safe subclass of io.BytesIO whose getbuffer() returns only the portion of the buffer that has been written.
  • get_thread_local_buffer(): a helper to obtain a per-thread ReusableBuffer instance, allowing safe reuse without repeated allocations.
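To illustrate the reuse pattern these utilities are built around, here is a minimal sketch that uses only the standard library's io.BytesIO. The helper name serialize_all is hypothetical and not part of this module; it rewinds and truncates one scratch buffer between payloads instead of allocating a new buffer each time.

```python
import io


def serialize_all(payloads: list[bytes]) -> list[bytes]:
    """Frame each payload with a 4-byte length prefix, reusing one buffer."""
    scratch = io.BytesIO()  # allocated once, reused for every payload
    framed = []
    for payload in payloads:
        scratch.seek(0)      # rewind to the start
        scratch.truncate(0)  # drop whatever the previous iteration wrote
        scratch.write(len(payload).to_bytes(4, "big"))
        scratch.write(payload)
        framed.append(scratch.getvalue())
    return framed


frames = serialize_all([b"ab", b"cde"])
```

Whether this saves time in practice depends on payload sizes and call frequency, so it is worth measuring on the actual workload.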

Classes:

Name Description
ReusableBuffer

Thread-safe buffer utilities for in-memory byte storage.

Functions:

Name Description
get_thread_local_buffer

Return a thread-local reusable buffer.

ReusableBuffer

Bases: BytesIO

Thread-safe buffer utilities for in-memory byte storage.

Warning

Although ReusableBuffer is thread-safe, it is not re-entrant: two users on the same thread can still interfere with one another. This risk can sometimes be mitigated by using the lock() context manager, which ensures that the buffer is not handed to a user until it is available. However, this causes a deadlock when two users on the same thread try to hold the lock at the same time, for example:

buffer = ReusableBuffer()
with buffer.lock() as buffer1, buffer.lock() as buffer2:
    ...

Because buffer1 holds the lock until the end of the with block, and buffer2 blocks until it can acquire that same lock, the with statement never finishes entering and the thread deadlocks.
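The following sketch shows why the nested acquisition above cannot succeed. It assumes the internal lock behaves like a non-reentrant threading.Lock, which this page does not confirm. A non-blocking acquire stands in for the second lock() call so that the example terminates instead of hanging.

```python
import threading

# Assumption: a plain, non-reentrant lock guards the buffer.
guard = threading.Lock()

with guard:
    # A second acquisition on the same thread cannot succeed while the
    # first is still held. With blocking=True this call would hang forever.
    second_acquired = guard.acquire(blocking=False)

# Once the outer context has exited, the lock is free again.
acquired_after_release = guard.acquire(blocking=False)
if acquired_after_release:
    guard.release()
```

Acquiring the lock once per logical user, and releasing it before the next user on the same thread asks for it, avoids the problem.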

Examples:

Creating and writing to a ReusableBuffer:

>>> from stainedglass_core.utils import buffer
>>> buf = buffer.ReusableBuffer()
>>> buf.write(b"hello")
5
>>> buf.seek(0)
0
>>> buf.read(2)
b'he'
>>> buf.read(None)  # read the rest
b'llo'

Getting a memoryview of only the written portion:

>>> buf = buffer.ReusableBuffer()
>>> buf.write(b"abcdef")
6
>>> mv = buf.getbuffer()
>>> bytes(mv)
b'abcdef'
>>> del mv  # always release memoryview before truncation or resize

Truncating the buffer:

>>> buf.truncate(3)
3
>>> buf.seek(0)
0
>>> buf.read()
b'abc'

Using seek with different whence values:

>>> import io
>>> buf = buffer.ReusableBuffer()
>>> buf.write(b"12345")
5
>>> buf.seek(-2, io.SEEK_END)  # move 2 bytes before EOF
3
>>> buf.read()
b'45'

Getting a thread-local reusable buffer:

>>> with buffer.get_thread_local_buffer() as local_buf:
...     local_buf.write(b"thread data")
...     local_buf.seek(0)
...     local_buf.read()
11
0
b'thread data'

On subsequent calls in the same thread, the buffer is rewound and reused:

>>> with buffer.get_thread_local_buffer() as same_buf:
...     same_buf.write(b"new data")
...     same_buf.seek(0)
...     same_buf.read()
8
0
b'new data'

Methods:

Name Description
__init__

Initialize the buffer.

getbuffer

Return a view of the written bytes only.

lock

Context manager to acquire the internal lock and reset the buffer for a new user.

read

Read bytes from the buffer.

reset

Reset the buffer to an empty state.

seek

Move the file position.

truncate

Truncate the buffer.

write

Write bytes into the buffer.

__init__

__init__(*args: Any, **kwargs: Any) -> None

Initialize the buffer.

Parameters:

Name Type Description Default

*args

Any

Positional arguments forwarded to io.BytesIO.

required

**kwargs

Any

Keyword arguments forwarded to io.BytesIO.

required

getbuffer

getbuffer() -> memoryview

Return a view of the written bytes only.

The returned memoryview is sliced to the current file position as given by tell(). This prevents exposing uninitialized/unused capacity.

Returns:

Type Description
memoryview

A view of the bytes in the range [0, tell()).
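As a rough illustration of the slicing described here, the sketch below overrides getbuffer() on a plain io.BytesIO subclass. The class name WrittenOnlyBuffer is hypothetical and this is not the library's implementation; it only shows how limiting the view to [0, tell()) hides stale bytes left over from an earlier use.

```python
import io


class WrittenOnlyBuffer(io.BytesIO):
    """Hypothetical stand-in, not the library's ReusableBuffer."""

    def getbuffer(self) -> memoryview:
        # Slice the full view down to the bytes before the current position.
        return super().getbuffer()[: self.tell()]


buf = WrittenOnlyBuffer()
buf.write(b"old contents")
buf.seek(0)          # rewind for reuse without discarding capacity
buf.write(b"new")

view = buf.getbuffer()
visible = bytes(view)  # only the bytes before tell() are visible
view.release()         # release the view before resizing the buffer
```

In this sketch getvalue() still returns the stale tail, which is why callers of a reused buffer would read through the sliced view instead.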

lock

lock() -> collections.abc.Generator[typing_extensions.Self]

Context manager to acquire the internal lock and reset the buffer for a new user.

Examples:

>>> buf = ReusableBuffer()
>>> with buf.lock():
...     buf.write(b"hello")
...     buf.seek(0)
...     print(buf.read())
5
0
b'hello'

Yields:

Type Description
collections.abc.Generator[typing_extensions.Self]

The reset buffer instance with the lock held.
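One way such a context manager could be put together is sketched below. The class name LockedBuffer is hypothetical, the lock is assumed to be a non-reentrant threading.Lock, and the reset is assumed to empty the buffer with seek(0) and truncate(0); the library's actual reset may work differently.

```python
import contextlib
import io
import threading
from collections.abc import Iterator


class LockedBuffer(io.BytesIO):
    """Hypothetical stand-in, not the library's ReusableBuffer."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._lock = threading.Lock()  # assumed non-reentrant

    @contextlib.contextmanager
    def lock(self) -> Iterator["LockedBuffer"]:
        with self._lock:
            # Start every user from an empty buffer at position 0.
            self.seek(0)
            self.truncate(0)
            yield self


shared = LockedBuffer()
with shared.lock() as first:
    first.write(b"hello")
    first_contents = first.getvalue()

with shared.lock() as second:
    # The previous user's bytes are gone by the time the lock is handed over.
    second_contents = second.getvalue()
```

Because the lock is released when the with block exits, the two sequential users above never contend with each other.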

read

read(size: int | None = -1) -> bytes

Read bytes from the buffer.

Parameters:

Name Type Description Default

size

int | None

Maximum number of bytes to read. If -1 or None, read until EOF.

-1

Returns:

Type Description
bytes

The data read. At EOF, returns b"".

reset

reset() -> None

Reset the buffer to an empty state.

seek

seek(pos: int, whence: int = 0) -> int

Move the file position.

Parameters:

Name Type Description Default

pos

int

Byte offset relative to whence.

required

whence

int

Reference point: io.SEEK_SET (start), io.SEEK_CUR (current), or io.SEEK_END (end).

0

Returns:

Type Description
int

The new absolute position.
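The three reference points can be compared side by side with a plain io.BytesIO. This sketch assumes ReusableBuffer keeps the base class's seek semantics, which the parameter description above suggests but does not guarantee.

```python
import io

buf = io.BytesIO(b"0123456789")

from_start = buf.seek(4, io.SEEK_SET)    # absolute offset 4
from_current = buf.seek(2, io.SEEK_CUR)  # 4 + 2
from_end = buf.seek(-3, io.SEEK_END)     # 10 - 3
tail = buf.read()                        # everything from the new position
```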

truncate

truncate(size: int | None = None) -> int

Truncate the buffer.

Parameters:

Name Type Description Default

size

int | None

New size in bytes. If None, truncate to the current position.

None

Returns:

Type Description
int

The new size of the buffer in bytes.
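A detail that is easy to miss is that truncating does not move the file position. The sketch below demonstrates this with a plain io.BytesIO, on the assumption that ReusableBuffer follows the base class here.

```python
import io

buf = io.BytesIO()
buf.write(b"abcdef")

new_size = buf.truncate(3)      # keep only the first three bytes
position_after = buf.tell()     # truncation does not move the position
contents = buf.getvalue()

buf.seek(0)
buf.write(b"xy")
size_at_position = buf.truncate()  # size=None truncates at tell()
final_contents = buf.getvalue()
```

This is why the examples on this page call seek(0) after truncating and before reading.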

write

write(b: Buffer) -> int

Write bytes into the buffer.

Parameters:

Name Type Description Default

b

Buffer

Bytes to write.

required

Returns:

Type Description
int

Number of bytes written.

get_thread_local_buffer

get_thread_local_buffer() -> collections.abc.Generator[
    stainedglass_core.utils.buffer.ReusableBuffer
]

Return a thread-local reusable buffer.

If a buffer does not yet exist for the current thread, it is created. Otherwise, the existing buffer is rewound to position 0 for reuse.

Because the buffer is provided in a context manager, it is automatically released when the context is exited.

Warning

You should never attempt to use the buffer outside of the context manager.
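The per-thread behaviour described above can be approximated with threading.local, as in the sketch below. The function name thread_local_buffer is hypothetical, a plain io.BytesIO stands in for ReusableBuffer, and no locking is shown, so this is an illustration of the idea and not the library's implementation.

```python
import contextlib
import io
import threading
from collections.abc import Iterator

_local = threading.local()  # one attribute namespace per thread


@contextlib.contextmanager
def thread_local_buffer() -> Iterator[io.BytesIO]:
    """Hypothetical stand-in for get_thread_local_buffer()."""
    buf = getattr(_local, "buf", None)
    if buf is None:
        buf = _local.buf = io.BytesIO()  # first use on this thread
    buf.seek(0)  # rewind so the caller starts writing at position 0
    yield buf


seen: dict[str, io.BytesIO] = {}


def worker(name: str) -> None:
    with thread_local_buffer() as buf:
        seen[name] = buf  # kept only to compare identities below


threads = [threading.Thread(target=worker, args=(n,)) for n in ("t1", "t2")]
for t in threads:
    t.start()
for t in threads:
    t.join()

with thread_local_buffer() as a:
    pass
with thread_local_buffer() as b:
    pass
```

Each worker thread receives its own buffer, while repeated calls on the main thread return the same object. The references stored in seen exist only to make that comparison and should not be used to read or write.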

Examples:

Getting a reusable buffer:

>>> with get_thread_local_buffer() as buffer:
...     buffer.write(b"Hello, world!")
...     buffer.seek(0)
...     print(buffer.read())
13
0
b'Hello, world!'

Two threads, each safely using its own thread-local buffer:

>>> import threading
>>> def thread_func(thread_num: int, out: dict) -> None:
...     with get_thread_local_buffer() as buffer:
...         string = f"[THREAD {thread_num}] Hello from thread {thread_num}!"
...         buffer.write(string.encode())
...         buffer.seek(0)
...         out["result"] = buffer.read()
>>> results = {}
>>> thread1 = threading.Thread(
...     target=thread_func, args=(1, results.setdefault("t1", {}))
... )
>>> thread2 = threading.Thread(
...     target=thread_func, args=(2, results.setdefault("t2", {}))
... )
>>> thread1.start()
>>> thread2.start()
>>> thread1.join()
>>> thread2.join()
>>> print(results["t1"]["result"])
b'[THREAD 1] Hello from thread 1!'
>>> print(results["t2"]["result"])
b'[THREAD 2] Hello from thread 2!'

Yields:

Type Description
collections.abc.Generator[stainedglass_core.utils.buffer.ReusableBuffer]

A buffer instance scoped to the current thread.