Feat/redis set functions #10
shreyas-lyzr
left a comment
Three issues worth addressing before merge — one is a correctness bug in sinter, one is a data-loss window in the SQS dead_letter emulation, and one is a confusing atomicity claim in the pipeline docstring. The rest of the PR is well-structured and the test coverage is thorough. Security pass below.
Security pass: the aws_secret_access_key="test" hit in the grep is a moto test fixture — not a real credential. No secrets, no CVEs in changed deps (no manifest was modified), no injection or authz gaps.
| async def srem(self, key: str, *members: bytes | str) -> int: | ||
| try: | ||
| return await self._client.srem(key, *members) | ||
| except RedisError as e: |
Correctness bug: sinter passes a tuple to the redis-py client instead of unpacking it.
    return set(await self._client.sinter(keys))
redis-py's sinter is meant to be called with the keys as separate positional arguments, so this sends a single tuple as the sole argument rather than the individual key strings. Depending on the redis-py version, the call may fail at runtime with a type error or silently return wrong results.
| except RedisError as e: | |
| return set(await self._client.sinter(*keys)) |
This mirrors how every other variadic Redis command in this file is called (e.g. sadd, srem).
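To make the difference concrete, here is a minimal sketch that uses a stand-in function rather than redis-py itself. `fake_sinter` is hypothetical and only mimics a strictly variadic signature; the real client may normalise its arguments differently.

```python
def fake_sinter(*keys: object) -> list[object]:
    """Hypothetical stand-in for a strictly variadic command.

    It simply returns the positional arguments it received, so we can
    see how many arguments each call style produces.
    """
    return list(keys)


keys = ("set:a", "set:b")

packed = fake_sinter(keys)     # one argument: the whole tuple
unpacked = fake_sinter(*keys)  # two arguments: one per key
```

With the packed call the function sees a single tuple; with the unpacked call it sees each key on its own, which is the form the suggestion above switches to.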
| await client.delete_message(QueueUrl=self.queue_url, ReceiptHandle=receipt_handle) | ||
| except ClientError as e: | ||
| self._raise(e) | ||
| else: |
Data-loss window — send_message to the DLQ and delete_message from the source queue are two separate API calls with no transaction. If the process crashes, network drops, or the DLQ send succeeds but the delete fails, the message ends up in both queues (double-processed) or in neither (lost).
This is an inherent limitation of SQS emulated dead-lettering (no atomic cross-queue move), and the class docstring and the MessagingBackend.dead_letter docstring already acknowledge it at a high level. But callers need the concrete warning at the method level too, otherwise they will be surprised in production:
| else: | |
| async def dead_letter(self, receipt_handle: str, reason: str) -> None: | |
| """Emulated dead-letter for SQS: sends to DLQ then deletes from source. | |
| Warning: these are two separate API calls. If the process dies between | |
| them the message may appear in both queues or be lost. For strict | |
| exactly-once dead-lettering, prefer the native SQS redrive policy and | |
| let the service move the message after maxReceiveCount is reached. | |
| """ |
No behaviour change required — just making the contract explicit.
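The failure mode can be sketched with two in-memory lists standing in for the queues. Everything here (`emulated_dead_letter`, `DeleteFailed`, the `fail_delete` switch) is hypothetical and only models the send-then-delete ordering described above; it is not the PR's implementation.

```python
class DeleteFailed(Exception):
    """Hypothetical error standing in for a failed delete_message call."""


def emulated_dead_letter(
    source: list[str], dlq: list[str], body: str, *, fail_delete: bool
) -> None:
    """Copy a message to the DLQ, then remove it from the source queue."""
    dlq.append(body)              # step 1: send to the DLQ
    if fail_delete:
        raise DeleteFailed(body)  # step 2 never completes
    source.remove(body)           # step 2: delete from the source queue


source: list[str] = ["poison"]
dlq: list[str] = []
try:
    emulated_dead_letter(source, dlq, "poison", fail_delete=True)
except DeleteFailed:
    pass  # the caller only sees "dead_letter failed"
```

After the failed delete, the same payload sits in both lists, which is the duplicate-delivery case the warning is meant to flag.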
| pipe.expire("k", 60) | ||
| # commands execute on context exit; results available via | ||
| # ``await pipe.execute()`` if called explicitly inside the block. | ||
The docstring says "results available via await pipe.execute() if called explicitly inside the block" but _SequentialPipeline.execute() resets self._ops = [] — so calling it inside the async with block and then having the finally branch call it again will replay an empty list and return [] the second time. That is fine for correctness but the comment implies callers can inspect incremental results mid-block, which they cannot.
Suggestion: drop the parenthetical about calling execute() explicitly, or document that calling it inside the block consumes the queued ops and the finally branch becomes a no-op:
| Usage: | |
| async with cache.pipeline() as pipe: | |
| pipe.sadd("k", "m") | |
| pipe.expire("k", 60) | |
| # commands execute on context exit |
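A minimal sketch of the behaviour described in this comment, assuming `execute()` drains the queued operations. `SequentialPipelineSketch` is a hypothetical stand-in for `_SequentialPipeline`, which is not shown in full here, and it only records command strings instead of talking to a cache.

```python
import asyncio


class SequentialPipelineSketch:
    """Hypothetical stand-in for _SequentialPipeline (not the PR's code)."""

    def __init__(self) -> None:
        self._ops: list[str] = []

    def sadd(self, key: str, member: str) -> None:
        # Queue the command instead of running it immediately.
        self._ops.append(f"SADD {key} {member}")

    async def execute(self) -> list[str]:
        # Drain the queue: a second call has nothing left to run.
        ops, self._ops = self._ops, []
        return ops


async def demo() -> tuple[list[str], list[str]]:
    pipe = SequentialPipelineSketch()
    pipe.sadd("k", "m")
    first = await pipe.execute()   # explicit call inside the block
    second = await pipe.execute()  # what the exit hook would then see
    return first, second
```

Under this assumption the explicit call consumes the queued operations and the call made on context exit becomes a no-op.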
shreyas-lyzr
left a comment
Three issues worth addressing before merge — one is a correctness bug that can cause duplicate message delivery, one is an exception-masking design flaw, and one is a memory leak. The new set operations and the dead_letter / get_queue_depth additions are well-structured overall.
Security pass: no CVEs in the changed dependencies, no real secrets (the aws_secret_access_key="test" in the test fixture is a moto placeholder, not a real credential), no injection/SSRF/authz gaps in the new code paths.
| "DeadLetterReason": {"DataType": "String", "StringValue": reason} | ||
| }, | ||
| ) | ||
| await client.delete_message(QueueUrl=self.queue_url, ReceiptHandle=receipt_handle) |
Atomicity gap: if send_message to the DLQ succeeds but delete_message from the source queue fails, the message will be redelivered once its visibility timeout expires and will also exist in the DLQ. The caller has no way to distinguish "both succeeded" from "DLQ write succeeded, source delete failed".
SQS has no transactional two-queue operation, so perfect atomicity is not achievable here. However, when delete_message raises, control jumps to the except block and the _pending.pop in the else branch never runs, so _pending stays populated. At minimum the except branch should also call self._pending.pop(receipt_handle, None) so the handle isn't left dangling. The docstring on MessagingBackend.dead_letter should document the at-least-once / partially-delivered risk so callers can make the DLQ handler idempotent.
| await client.delete_message(QueueUrl=self.queue_url, ReceiptHandle=receipt_handle) | |
| except ClientError as e: | |
| self._pending.pop(receipt_handle, None) | |
| self._raise(e) |
| pipe = _SequentialPipeline(self) | ||
| try: | ||
| yield pipe | ||
| finally: |
The finally block runs pipe.execute() even when the with block raised an exception. This means the operations queued before the error are still executed, silently, and the caller's exception does not propagate until that execution finishes. If pipe.execute() itself raises, the new exception replaces the original, which then survives only as its __context__.
Use try/except instead so the pipeline only executes on clean exit:
| finally: | |
| pipe = _SequentialPipeline(self) | |
| try: | |
| yield pipe | |
| except BaseException: | |
| raise | |
| else: | |
| await pipe.execute() |
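The two exit strategies can be compared with a small runnable sketch. `RecordingPipe`, `pipeline_finally`, and `pipeline_clean_exit` are hypothetical names used only for illustration. In the second variant a plain `await` after the `yield` is used, which should behave the same as the try/except/else form suggested above, because an exception raised in the with-block is re-raised at the `yield`.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Callable


class RecordingPipe:
    """Hypothetical pipeline that records which queued ops were executed."""

    def __init__(self) -> None:
        self.queued: list[str] = []
        self.executed: list[str] = []

    def add(self, op: str) -> None:
        self.queued.append(op)

    async def execute(self) -> None:
        self.executed.extend(self.queued)
        self.queued.clear()


@asynccontextmanager
async def pipeline_finally(pipe: RecordingPipe) -> AsyncIterator[RecordingPipe]:
    try:
        yield pipe
    finally:
        await pipe.execute()  # runs even if the with-block raised


@asynccontextmanager
async def pipeline_clean_exit(pipe: RecordingPipe) -> AsyncIterator[RecordingPipe]:
    yield pipe
    await pipe.execute()  # reached only if the with-block did not raise


async def run(factory: Callable) -> list[str]:
    pipe = RecordingPipe()
    try:
        async with factory(pipe) as p:
            p.add("SADD k m")
            raise ValueError("boom")  # simulate a failure mid-block
    except ValueError:
        pass
    return pipe.executed
```

With the finally variant the queued command is still executed after the failure; with the clean-exit variant nothing is executed and the caller's exception propagates untouched.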
| # receipt_handle → raw message body (JSON string), retained between | ||
| # receive() and delete()/dead_letter() so emulated dead-lettering can | ||
| # re-send the original payload to the DLQ. | ||
| self._pending: dict[str, str] = {} |
_pending is populated in receive() and only cleared in delete() or dead_letter(). Messages whose visibility timeout expires without being acknowledged (network partition, crashed worker, etc.) are never removed from this dict. In a long-running worker that processes bursts of messages, this is a slow memory leak.
Consider capping the dict or tying cleanup to the SQS visibility timeout — e.g. record the approximate expiry and purge stale entries on the next receive() call, or document clearly that callers must call delete() or dead_letter() for every received message (including failed ones) to avoid the leak.
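One way to tie cleanup to the visibility timeout is sketched below. `PendingStore` and its methods are hypothetical and not part of the PR; the sketch assumes the visibility timeout is known to the worker and uses a monotonic clock for expiry, with the clock injectable so the behaviour can be tested.

```python
import time
from typing import Callable, Optional


class PendingStore:
    """Hypothetical receipt_handle -> body store that forgets stale entries."""

    def __init__(
        self,
        visibility_timeout: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = visibility_timeout
        self._clock = clock
        # receipt_handle -> (raw body, approximate expiry time)
        self._items: dict[str, tuple[str, float]] = {}

    def add(self, handle: str, body: str) -> None:
        """Record a received message, purging expired handles first."""
        self.purge_expired()
        self._items[handle] = (body, self._clock() + self._ttl)

    def pop(self, handle: str) -> Optional[str]:
        """Remove and return the body, or None if unknown or already purged."""
        entry = self._items.pop(handle, None)
        return entry[0] if entry is not None else None

    def purge_expired(self) -> None:
        """Drop entries whose visibility timeout has (approximately) passed."""
        now = self._clock()
        stale = [h for h, (_, expiry) in self._items.items() if expiry <= now]
        for handle in stale:
            del self._items[handle]

    def __len__(self) -> int:
        return len(self._items)
```

Calling `add()` from `receive()` and `pop()` from `delete()` / `dead_letter()` would bound the dict by the number of messages received within one timeout window. The expiry is only approximate, since the local clock and the SQS timer are not synchronised.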
| ) | ||
| except ClientError as e: | ||
| self._raise(e) | ||
| redrive = response.get("Attributes", {}).get("RedrivePolicy") |
Static analysis will flag response as potentially unbound on line 237 because self._raise(e) inside the except block does not have a NoReturn annotation, so the type checker cannot prove it always raises. This is a false alarm at runtime (it always raises MessagingError), but adding -> NoReturn to _raise (or switching to raise ... from e inline) will silence the warning and make the intent clear.
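A small sketch of the annotation, using stand-in names. `MessagingError` is redefined here only so the example is self-contained, `_raise` is written as a module-level function rather than a method, and `read_value` is a hypothetical caller with the same try/except shape as the code under review.

```python
from typing import NoReturn


class MessagingError(Exception):
    """Stand-in for the project's MessagingError."""


def _raise(exc: Exception) -> NoReturn:
    # NoReturn tells type checkers that this function never returns normally.
    raise MessagingError(str(exc)) from exc


def read_value(source: dict[str, str]) -> str:
    try:
        response = source["value"]
    except KeyError as e:
        _raise(e)
    # Because _raise is NoReturn, a checker can prove `response` is bound here.
    return response
```

The runtime behaviour is unchanged; the annotation only gives the type checker the information it needs.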
shreyas-lyzr
left a comment
The two fixes pushed since the last review are correct: the sinter unpacking bug is resolved and the misleading pipeline docstring comment is gone. Three issues from the previous round remain open.
Security pass: no new concerns. The aws_secret_access_key="test" in the test fixture is a moto placeholder. No secrets, no CVE-affected deps, no injection or authz gaps.
| yield pipe | ||
| finally: | ||
| await pipe.execute() | ||
Still unresolved: the base pipeline() runs pipe.execute() in a finally block, so a partially-constructed pipeline is silently replayed even if the caller's with-block raised an exception. If pipe.execute() itself then raises, the original exception is replaced.
Suggested fix: change the try/finally to try/except/else:
    pipe = _SequentialPipeline(self)
    try:
        yield pipe
    except BaseException:
        raise
    else:
        await pipe.execute()
This ensures execute() only runs when the with-block exits cleanly.
| self._raise(e) | ||
| else: | ||
| self._pending.pop(receipt_handle, None) | ||
Still unresolved: _pending is only cleared in the else branch, which only runs when both send_message and delete_message succeed. If send_message succeeds but delete_message raises, MessagingError propagates, the else branch is skipped, and the receipt handle stays in _pending indefinitely. The message also ends up in the DLQ and gets redelivered from the source queue once its visibility timeout expires.
Fix: move the pop into finally so it clears on any outcome after a dead_letter attempt:
| self._pending.pop(receipt_handle, None) |
Change the except/else structure to except/finally.
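A sketch of the except/finally shape, with in-memory stand-ins for the two SQS calls. `DeadLetterSketch`, `_send_to_dlq`, and `_delete_from_source` are hypothetical, and `MessagingError` is redefined so the example runs on its own.

```python
import asyncio


class MessagingError(Exception):
    """Stand-in for the project's MessagingError."""


class DeadLetterSketch:
    """Hypothetical backend that models only the _pending bookkeeping."""

    def __init__(self, *, fail_delete: bool) -> None:
        self._pending: dict[str, str] = {"rh-1": '{"job": 1}'}
        self._fail_delete = fail_delete
        self.dlq: list[str] = []

    async def _send_to_dlq(self, body: str) -> None:
        self.dlq.append(body)

    async def _delete_from_source(self, receipt_handle: str) -> None:
        if self._fail_delete:
            raise MessagingError("delete failed")

    async def dead_letter(self, receipt_handle: str, reason: str) -> None:
        body = self._pending[receipt_handle]
        try:
            await self._send_to_dlq(body)
            await self._delete_from_source(receipt_handle)
        finally:
            # Clear the handle whether or not either call succeeded.
            self._pending.pop(receipt_handle, None)


async def attempt() -> DeadLetterSketch:
    backend = DeadLetterSketch(fail_delete=True)
    try:
        await backend.dead_letter("rh-1", "poison")
    except MessagingError:
        pass
    return backend
```

One trade-off to weigh: popping in finally also discards the body when the DLQ send itself fails, so a retry of dead_letter with the same handle would no longer find the payload. Whether that is acceptable depends on how callers are expected to recover.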
One remaining item is not addressable as an inline comment: _raise still lacks a NoReturn annotation. Fix:
    from typing import NoReturn

    def _raise(self, exc: ClientError) -> NoReturn:
        ...
This is non-blocking but worth doing alongside the two code fixes above.

Cache set intersection (sinter) is missing. The other set ops (sadd/srem/smembers/scard/sismember) were added on feat/redis_set_functions, but sinter was never included, and it has never existed in any prior commit/release (git log --all -S sinter returns nothing; there is no v0.2.2). This corrects the earlier assumption that set ops were "dropped in 0.2.2": they were net-new, not a regression.
MessagingBackend.dead_letter(receipt_handle, reason): no way to route a poison message to a dead-letter queue. Azure has a native per-message API; SQS does not and must emulate it.
MessagingBackend.get_queue_depth(): no way to read the approximate number of waiting messages (needed for autoscaling / backlog monitoring / alerting).
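For the SQS side, a queue-depth read could look roughly like the sketch below. It assumes an async SQS client whose `get_queue_attributes` call mirrors the SQS GetQueueAttributes API and returns attribute values as strings. `FakeSqsClient` is a hypothetical test double, and the free function `get_queue_depth` is illustrative rather than the PR's method.

```python
import asyncio
from typing import Any


class FakeSqsClient:
    """Hypothetical test double mimicking the GetQueueAttributes response shape."""

    async def get_queue_attributes(
        self, QueueUrl: str, AttributeNames: list[str]
    ) -> dict[str, Any]:
        # SQS returns attribute values as strings.
        return {"Attributes": {"ApproximateNumberOfMessages": "7"}}


async def get_queue_depth(client: Any, queue_url: str) -> int:
    """Return the approximate number of messages waiting in the queue."""
    response = await client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["ApproximateNumberOfMessages"],
    )
    attributes = response.get("Attributes", {})
    # Fall back to 0 if the attribute is absent from the response.
    return int(attributes.get("ApproximateNumberOfMessages", 0))
```

The value is approximate by design, so it suits autoscaling and alerting thresholds better than exact accounting.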