Skip to content

Commit c242e73

Browse files
committed
Topic Writer Backpressure
1 parent f5a208d commit c242e73

11 files changed

Lines changed: 514 additions & 4 deletions

File tree

AGENTS.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ source .venv/bin/activate && tox -e py -- tests/path/to/test_file.py -v
7676

7777
- Update `docs/` for any user-facing changes; create new sections if needed.
7878
- Extend `examples/` when adding new features.
79+
- **After every change to `docs/`**, rebuild the HTML output and verify there are no new errors:
80+
```sh
81+
source .venv/bin/activate && sphinx-build -b html docs docs/_build/html -q
82+
```
7983

8084
## Auto-generated Files — Do NOT Edit
8185

docker-compose.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ version: "3.9"
22
services:
33
ydb:
44
image: ydbplatform/local-ydb:trunk
5-
restart: always
65
ports:
76
- 2136:2136
87
- 2135:2135

docs/topic.rst

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,89 @@ For high-throughput pipelines, buffer writes and gather futures:
259259
raise f.exception()
260260
261261
262+
Writer Backpressure
263+
^^^^^^^^^^^^^^^^^^^
264+
265+
By default the writer's internal buffer is unbounded — ``write()`` always returns immediately
266+
regardless of how many unacknowledged messages are in flight. Enable backpressure by setting
267+
one or both limits:
268+
269+
.. code-block:: python
270+
271+
writer = driver.topic_client.writer(
272+
"/local/my-topic",
273+
max_buffer_size_bytes=50 * 1024 * 1024, # pause when 50 MB in flight
274+
max_buffer_messages=1000, # pause when 1000 messages in flight
275+
)
276+
277+
A message is counted as occupying the buffer from the moment it is passed to ``write()``
278+
until the server acknowledges it. Backpressure is active when **at least one** limit is set;
279+
setting both means either limit can trigger a wait (OR semantics).
280+
281+
The limits are **soft**: ``write()`` blocks only if the buffer is *already* at or above the
282+
limit when the call starts. Once unblocked, the entire batch is admitted regardless of its
283+
size. This means callers that batch multiple messages in a single ``write()`` call will never
284+
deadlock even when the batch is larger than the limit.
285+
286+
**Blocking behavior (default)**
287+
288+
When the buffer is at or above the limit, ``write()`` blocks until enough messages are
289+
acknowledged by the server. There is no timeout by default — the call waits indefinitely:
290+
291+
.. code-block:: python
292+
293+
# Producer pauses here if the buffer is full, then proceeds once space is freed.
294+
writer.write("message")
295+
296+
**Timeout**
297+
298+
Set ``buffer_wait_timeout_sec`` to raise :class:`~ydb.TopicWriterBufferFullError` if space
299+
does not free up in time. Use a positive value to wait up to that many seconds, or ``0`` to
300+
fail immediately without waiting (non-blocking):
301+
302+
.. code-block:: python
303+
304+
writer = driver.topic_client.writer(
305+
"/local/my-topic",
306+
max_buffer_messages=500,
307+
buffer_wait_timeout_sec=5.0, # raise after 5 seconds; use 0 to fail immediately
308+
)
309+
310+
try:
311+
writer.write("message")
312+
except ydb.TopicWriterBufferFullError:
313+
# handle overload — log, drop, or apply back-off
314+
...
315+
316+
**Async client**
317+
318+
The async writer behaves identically — ``await writer.write()`` suspends the coroutine
319+
instead of blocking the thread:
320+
321+
.. code-block:: python
322+
323+
writer = driver.topic_client.writer(
324+
"/local/my-topic",
325+
max_buffer_size_bytes=4 * 1024 * 1024,
326+
buffer_wait_timeout_sec=10.0,
327+
)
328+
329+
try:
330+
await writer.write("message")
331+
except ydb.TopicWriterBufferFullError:
332+
...
333+
334+
To apply your own timeout without raising an error, wrap the call with
335+
``asyncio.wait_for``:
336+
337+
.. code-block:: python
338+
339+
try:
340+
await asyncio.wait_for(writer.write("message"), timeout=2.0)
341+
except asyncio.TimeoutError:
342+
... # timed out waiting for buffer space
343+
344+
262345
Reading Messages
263346
----------------
264347

examples/topic/writer_example.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,23 @@ def send_message_without_block_if_internal_buffer_is_full(writer: ydb.TopicWrite
7171
return False
7272

7373

74+
def writer_with_buffer_limit(db: ydb.Driver, topic_path: str):
75+
"""Writer with backpressure: waits for buffer space, raises TopicWriterBufferFullError on timeout."""
76+
writer = db.topic_client.writer(
77+
topic_path,
78+
producer_id="producer-id",
79+
max_buffer_size_bytes=10 * 1024 * 1024, # 10 MB
80+
buffer_wait_timeout_sec=30.0,
81+
)
82+
try:
83+
writer.write(ydb.TopicWriterMessage("data"))
84+
except ydb.TopicWriterBufferFullError:
85+
# Buffer did not free up within timeout (e.g. server slow or disconnected)
86+
pass # handle: retry, drop, or back off
87+
finally:
88+
writer.close()
89+
90+
7491
def send_messages_with_manual_seqno(writer: ydb.TopicWriter):
7592
writer.write(ydb.TopicWriterMessage("mess")) # send text
7693

tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ async def aio_connection(endpoint, database):
158158

159159

160160
@pytest.fixture()
161-
async def driver(endpoint, database, event_loop):
161+
async def driver(endpoint, database):
162162
driver_config = ydb.DriverConfig(
163163
endpoint,
164164
database,
@@ -173,7 +173,7 @@ async def driver(endpoint, database, event_loop):
173173

174174

175175
@pytest.fixture()
176-
async def driver_sync(endpoint, database, event_loop):
176+
async def driver_sync(endpoint, database):
177177
driver_config = ydb.DriverConfig(
178178
endpoint,
179179
database,

tests/topics/test_topic_writer.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import pytest
77

8+
import ydb
89
import ydb.aio
910

1011

@@ -136,6 +137,53 @@ class TestException(Exception):
136137
raise TestException()
137138

138139

140+
@pytest.mark.asyncio
141+
class TestTopicWriterBackpressureAsyncIO:
142+
async def test_write_and_read_with_backpressure_settings(
143+
self, driver: ydb.aio.Driver, topic_path: str, topic_consumer: str
144+
):
145+
messages = [b"msg-1", b"msg-2", b"msg-3"]
146+
147+
async with driver.topic_client.writer(
148+
topic_path,
149+
producer_id="bp-test",
150+
max_buffer_size_bytes=1024 * 1024,
151+
max_buffer_messages=100,
152+
buffer_wait_timeout_sec=10.0,
153+
) as writer:
154+
for data in messages:
155+
await writer.write(ydb.TopicWriterMessage(data=data))
156+
157+
async with driver.topic_client.reader(topic_path, consumer=topic_consumer) as reader:
158+
for expected in messages:
159+
msg = await asyncio.wait_for(reader.receive_message(), timeout=10)
160+
assert msg.data == expected
161+
reader.commit(msg)
162+
163+
164+
class TestTopicWriterBackpressureSync:
165+
def test_write_and_read_with_backpressure_settings(
166+
self, driver_sync: ydb.Driver, topic_path: str, topic_consumer: str
167+
):
168+
messages = [b"msg-1", b"msg-2", b"msg-3"]
169+
170+
with driver_sync.topic_client.writer(
171+
topic_path,
172+
producer_id="bp-sync-test",
173+
max_buffer_size_bytes=1024 * 1024,
174+
max_buffer_messages=100,
175+
buffer_wait_timeout_sec=10.0,
176+
) as writer:
177+
for data in messages:
178+
writer.write(ydb.TopicWriterMessage(data=data))
179+
180+
with driver_sync.topic_client.reader(topic_path, consumer=topic_consumer) as reader:
181+
for expected in messages:
182+
msg = reader.receive_message(timeout=10)
183+
assert msg.data == expected
184+
reader.commit(msg)
185+
186+
139187
class TestTopicWriterSync:
140188
def test_send_message(self, driver_sync: ydb.Driver, topic_path):
141189
writer = driver_sync.topic_client.writer(topic_path, producer_id="test")

ydb/_topic_writer/topic_writer.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,26 @@ class PublicWriterSettings:
3737
encoder_executor: Optional[concurrent.futures.Executor] = None # default shared client executor pool
3838
encoders: Optional[typing.Mapping[PublicCodec, typing.Callable[[bytes], bytes]]] = None
3939
update_token_interval: Union[int, float] = 3600
40+
max_buffer_size_bytes: Optional[int] = None # None = no limit
41+
max_buffer_messages: Optional[int] = None # None = no limit
42+
# Backpressure is enabled when at least one of the limits above is set.
43+
# None = wait indefinitely for buffer space; positive value = raise TopicWriterBufferFullError on timeout.
44+
buffer_wait_timeout_sec: Optional[float] = None
4045

4146
def __post_init__(self):
4247
if self.producer_id is None:
4348
self.producer_id = uuid.uuid4().hex
49+
if self.max_buffer_size_bytes is not None and self.max_buffer_size_bytes <= 0:
50+
raise ValueError("max_buffer_size_bytes must be a positive integer, got %d" % self.max_buffer_size_bytes)
51+
if self.max_buffer_messages is not None and self.max_buffer_messages <= 0:
52+
raise ValueError("max_buffer_messages must be a positive integer, got %d" % self.max_buffer_messages)
53+
if self.buffer_wait_timeout_sec is not None and (
54+
self.buffer_wait_timeout_sec < 0
55+
or self.buffer_wait_timeout_sec != self.buffer_wait_timeout_sec # NaN check
56+
):
57+
raise ValueError(
58+
"buffer_wait_timeout_sec must be a non-negative number, got %r" % self.buffer_wait_timeout_sec
59+
)
4460

4561

4662
@dataclass
@@ -218,6 +234,12 @@ def __init__(self):
218234
super(TopicWriterStopped, self).__init__("topic writer was stopped by call close")
219235

220236

237+
class TopicWriterBufferFullError(TopicWriterError):
238+
"""Raised when write cannot proceed: buffer is full and timeout expired waiting for free space."""
239+
240+
pass
241+
242+
221243
def default_serializer_message_content(data: Any) -> bytes:
222244
if data is None:
223245
return bytes()
@@ -299,6 +321,15 @@ def get_message_size(msg: InternalMessage):
299321
return _split_messages_by_size(messages, connection._DEFAULT_MAX_GRPC_MESSAGE_SIZE, get_message_size)
300322

301323

324+
def internal_message_size_bytes(msg: InternalMessage) -> int:
325+
"""Approximate size in bytes for buffer accounting (data + metadata + overhead).
326+
327+
Uses uncompressed_size so the value stays consistent before and after encoding.
328+
"""
329+
meta_len = sum(len(k) + len(v) for k, v in msg.metadata_items.items()) if msg.metadata_items else 0
330+
return msg.uncompressed_size + meta_len + 64 # 64 bytes overhead per message (seq_no, timestamps, etc.)
331+
332+
302333
def _split_messages_by_size(
303334
messages: List[InternalMessage],
304335
split_size: int,

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
InternalMessage,
2020
TopicWriterStopped,
2121
TopicWriterError,
22+
TopicWriterBufferFullError,
23+
internal_message_size_bytes,
2224
messages_to_proto_requests,
2325
PublicWriteResult,
2426
PublicWriteResultTypes,
@@ -277,6 +279,9 @@ class WriterAsyncIOReconnector:
277279
else:
278280
_stop_reason: asyncio.Future
279281
_init_info: Optional[PublicWriterInitInfo]
282+
_buffer_bytes: int
283+
_buffer_messages: int
284+
_buffer_updated: asyncio.Event
280285

281286
def __init__(
282287
self, driver: SupportedDriverType, settings: WriterSettings, tx: Optional["BaseQueryTxContext"] = None
@@ -317,6 +322,12 @@ def __init__(
317322
self._messages = deque()
318323
self._messages_future = deque()
319324
self._new_messages = asyncio.Queue()
325+
self._backpressure_enabled = (
326+
settings.max_buffer_size_bytes is not None or settings.max_buffer_messages is not None
327+
)
328+
self._buffer_bytes = 0
329+
self._buffer_messages = 0
330+
self._buffer_updated = asyncio.Event()
320331
self._stop_reason = self._loop.create_future()
321332
connection_task = asyncio.create_task(self._connection_loop())
322333
connection_task.set_name("connection_loop")
@@ -371,7 +382,6 @@ async def wait_stop(self) -> BaseException:
371382
return stop_reason
372383

373384
async def write_with_ack_future(self, messages: List[PublicMessage]) -> List[asyncio.Future]:
374-
# todo check internal buffer limit
375385
self._check_stop()
376386

377387
if self._settings.auto_seqno:
@@ -380,6 +390,9 @@ async def write_with_ack_future(self, messages: List[PublicMessage]) -> List[asy
380390
internal_messages = self._prepare_internal_messages(messages)
381391
messages_future = [self._loop.create_future() for _ in internal_messages]
382392

393+
if self._backpressure_enabled:
394+
await self._acquire_buffer_space(internal_messages)
395+
383396
self._messages_future.extend(messages_future)
384397

385398
if self._codec is not None and self._codec == PublicCodec.RAW:
@@ -389,6 +402,46 @@ async def write_with_ack_future(self, messages: List[PublicMessage]) -> List[asy
389402

390403
return messages_future
391404

405+
async def _acquire_buffer_space(self, internal_messages: List[InternalMessage]) -> None:
406+
"""Wait until the buffer is below its limit, then admit the batch (soft-limit semantics).
407+
408+
Blocking starts only when the buffer is already at or above the limit at call time.
409+
Once unblocked, the entire batch is admitted regardless of its size, so callers that
410+
batch messages never get a permanent deadlock.
411+
"""
412+
max_buf = self._settings.max_buffer_size_bytes
413+
max_msgs = self._settings.max_buffer_messages
414+
timeout_sec = self._settings.buffer_wait_timeout_sec
415+
deadline = self._loop.time() + timeout_sec if timeout_sec is not None else None
416+
417+
while True:
418+
self._buffer_updated.clear()
419+
if (max_buf is None or self._buffer_bytes < max_buf) and (
420+
max_msgs is None or self._buffer_messages < max_msgs
421+
):
422+
break
423+
self._check_stop()
424+
if deadline is not None:
425+
assert timeout_sec is not None
426+
remaining = deadline - self._loop.time()
427+
if remaining <= 0:
428+
raise TopicWriterBufferFullError(
429+
"Topic writer buffer full: no free space within %.1f s"
430+
" (buffer_bytes=%d, max_bytes=%s, buffer_msgs=%d, max_msgs=%s)"
431+
% (timeout_sec, self._buffer_bytes, max_buf, self._buffer_messages, max_msgs)
432+
)
433+
try:
434+
await asyncio.wait_for(self._buffer_updated.wait(), timeout=min(0.5, remaining))
435+
except asyncio.TimeoutError:
436+
pass
437+
else:
438+
await self._buffer_updated.wait()
439+
440+
self._check_stop()
441+
new_bytes = sum(internal_message_size_bytes(m) for m in internal_messages)
442+
self._buffer_bytes += new_bytes
443+
self._buffer_messages += len(internal_messages)
444+
392445
def _add_messages_to_send_queue(self, internal_messages: List[InternalMessage]):
393446
self._messages.extend(internal_messages)
394447
for m in internal_messages:
@@ -648,6 +701,10 @@ def _handle_receive_ack(self, ack):
648701
"internal error - receive unexpected ack. Expected seqno: %s, received seqno: %s"
649702
% (current_message.seq_no, ack.seq_no)
650703
)
704+
if self._backpressure_enabled:
705+
self._buffer_bytes = max(0, self._buffer_bytes - internal_message_size_bytes(current_message))
706+
self._buffer_messages = max(0, self._buffer_messages - 1)
707+
self._buffer_updated.set()
651708
write_ack_msg = StreamWriteMessage.WriteResponse.WriteAck
652709
status = ack.message_write_status
653710
if isinstance(status, write_ack_msg.StatusSkipped):
@@ -716,7 +773,9 @@ def _stop(self, reason: BaseException):
716773

717774
for f in self._messages_future:
718775
f.set_exception(reason)
776+
f.exception() # mark as retrieved so asyncio does not log "Future exception was never retrieved"
719777

778+
self._buffer_updated.set() # wake any tasks blocked in _acquire_buffer_space
720779
self._state_changed.set()
721780
logger.info("Stop topic writer %s: %s" % (self._id, reason))
722781

0 commit comments

Comments
 (0)