Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Python bytecode / caches
__pycache__/
*.py[cod]
*$py.class
*.so
.Python

# Virtualenvs
.venv/
venv/
env/
ENV/

# Build / dist artifacts
build/
dist/
*.egg-info/
.eggs/
wheels/
pip-wheel-metadata/

# Test / type-check / lint caches
.pytest_cache/
.mypy_cache/
.ruff_cache/
.tox/
.nox/
.coverage
.coverage.*
htmlcov/
coverage.xml

# Local-only dev / smoke-test scripts (kept out of source control)
scripts/

# Editor / IDE
.vscode/
.idea/
*.swp
*.swo

# OS
.DS_Store
Thumbs.db

# Secrets / env
.env
.env.*
!.env.example
152 changes: 148 additions & 4 deletions cloudrift/cache/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,25 @@ async def exists(self, key: str) -> bool:
"""Return ``True`` if *key* exists."""

@abstractmethod
async def expire(self, key: str, seconds: int) -> bool:
"""Set a timeout on *key*. Returns ``True`` if the timeout was set."""
async def expire(
self,
key: str,
seconds: int,
nx: bool = False,
xx: bool = False,
) -> bool:
"""Set a timeout on *key*. Returns ``True`` if the timeout was set.

Args:
key: Target key.
seconds: TTL in seconds.
nx: Only set the TTL if the key has no existing TTL.
xx: Only set the TTL if the key already has a TTL.

``nx`` and ``xx`` are mutually exclusive. Backends that don't support
these flags natively should emulate them via ``ttl(key)`` and document
that the operation is not atomic.
"""

@abstractmethod
async def ttl(self, key: str) -> int:
Expand All @@ -54,6 +71,39 @@ async def hgetall(self, key: str) -> dict[bytes, bytes]:
async def hdel(self, key: str, *fields: str) -> int:
"""Delete fields from the hash at *key*. Returns number of fields removed."""

@abstractmethod
async def sadd(self, key: str, *members: bytes | str) -> int:
"""Add one or more *members* to the set at *key*.

Returns the number of members that were newly added (i.e. not already
present). This "was-new" signal is the foundation of unique-element
deduplication patterns (e.g. DAU/MAU tracking).
"""

@abstractmethod
async def srem(self, key: str, *members: bytes | str) -> int:
"""Remove one or more *members* from the set at *key*. Returns the number removed."""

@abstractmethod
async def scard(self, key: str) -> int:
"""Return the number of elements in the set at *key*."""

@abstractmethod
async def sismember(self, key: str, member: bytes | str) -> bool:
"""Return ``True`` if *member* is in the set at *key*."""

@abstractmethod
async def smembers(self, key: str) -> "set[bytes]":
"""Return all members of the set at *key*."""

@abstractmethod
async def sinter(self, *keys: str) -> "set[bytes]":
"""Return the members common to all sets at *keys* (set intersection).

With a single key this is equivalent to :meth:`smembers`. A missing key
is treated as an empty set, so any missing key yields an empty result.
"""

@abstractmethod
async def lpush(self, key: str, *values: bytes | str) -> int:
"""Prepend values to the list at *key*. Returns new list length."""
Expand Down Expand Up @@ -102,6 +152,27 @@ async def setex(self, key: str, value: bytes | str, ttl: int) -> None:
"""Atomic set-with-TTL. Default delegates to ``set(key, value, ttl=ttl)``."""
await self.set(key, value, ttl=ttl)

@asynccontextmanager
async def pipeline(self):
"""Batch multiple commands.

Usage:
async with cache.pipeline() as pipe:
pipe.sadd("k", "m")
pipe.expire("k", 60)
# commands execute on context exit

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring says "results available via await pipe.execute() if called explicitly inside the block" but _SequentialPipeline.execute() resets self._ops = [] — so calling it inside the async with block and then having the finally branch call it again will replay an empty list and return [] the second time. That is fine for correctness but the comment implies callers can inspect incremental results mid-block, which they cannot.

Suggestion: drop the parenthetical about calling execute() explicitly, or document that calling it inside the block consumes the queued ops and the finally branch becomes a no-op:

Suggested change
Usage:
async with cache.pipeline() as pipe:
pipe.sadd("k", "m")
pipe.expire("k", 60)
# commands execute on context exit

The default implementation queues calls and replays them sequentially
on exit — it provides no atomicity and no round-trip savings. Redis
backends override this with a true server-side pipeline. Callers that
depend on atomicity or batching performance must check the backend.
"""
pipe = _SequentialPipeline(self)
try:
yield pipe
finally:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The finally block runs pipe.execute() even when the with block raised an exception. This means a partially-constructed pipeline (queued before the error) is silently replayed and the caller's exception is suppressed for the duration of that replay. If pipe.execute() itself raises, the original exception is lost entirely.

Use try/except instead so the pipeline only executes on clean exit:

Suggested change
finally:
pipe = _SequentialPipeline(self)
try:
yield pipe
except BaseException:
raise
else:
await pipe.execute()

await pipe.execute()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still unresolved: the base pipeline() runs pipe.execute() in a finally block, so a partially-constructed pipeline is silently replayed even if the caller's with-block raised an exception. If pipe.execute() itself then raises, the original exception is replaced.

Suggested fix:

Suggested change
await pipe.execute()

Actually the full fix is to change the try/finally to try/except/else:

        pipe = _SequentialPipeline(self)
        try:
            yield pipe
        except BaseException:
            raise
        else:
            await pipe.execute()

This ensures execute() only runs when the with-block exits cleanly.

async def health_check(self) -> bool:
"""Return True if the cache server is reachable."""
try:
Expand All @@ -116,6 +187,33 @@ async def __aexit__(self, exc_type, exc, tb) -> None:
await self.close()


class _SequentialPipeline:
"""Default pipeline that records calls and replays them sequentially on execute.

Provides no atomicity and no round-trip savings — exists so the
``pipeline()`` API works on every backend. Redis backends bypass this with
a true server-side pipeline.
"""

def __init__(self, backend: "CacheBackend") -> None:
self._backend = backend
self._ops: list[tuple[str, tuple, dict]] = []

def __getattr__(self, name: str):
def queue(*args, **kwargs):
self._ops.append((name, args, kwargs))
return self
return queue

async def execute(self) -> list:
results = []
ops, self._ops = self._ops, []
for name, args, kwargs in ops:
method = getattr(self._backend, name)
results.append(await method(*args, **kwargs))
return results


class _RedisMixin:
"""Concrete Redis implementation shared by all Redis-backed cache backends.

Expand Down Expand Up @@ -148,9 +246,55 @@ async def exists(self, key: str) -> bool:
except RedisError as e:
raise CacheError(str(e)) from e

async def expire(self, key: str, seconds: int) -> bool:
async def expire(
self,
key: str,
seconds: int,
nx: bool = False,
xx: bool = False,
) -> bool:
if nx and xx:
raise ValueError("expire() flags `nx` and `xx` are mutually exclusive")
try:
return bool(await self._client.expire(key, seconds, nx=nx, xx=xx))
except RedisError as e:
raise CacheError(str(e)) from e

async def sadd(self, key: str, *members: bytes | str) -> int:
try:
return await self._client.sadd(key, *members)
except RedisError as e:
raise CacheError(str(e)) from e

async def srem(self, key: str, *members: bytes | str) -> int:
try:
return await self._client.srem(key, *members)
except RedisError as e:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness bug — sinter passes a tuple to the redis-py client instead of unpacking it.

return set(await self._client.sinter(keys))

redis-py’s sinter takes *keys (variadic), so this sends a single tuple as the sole argument rather than the individual key strings. The call will fail at runtime with a type error or silently return wrong results depending on the redis-py version.

Suggested change
except RedisError as e:
return set(await self._client.sinter(*keys))

This mirrors how every other variadic Redis command in this file is called (e.g. sadd, srem).

raise CacheError(str(e)) from e

async def scard(self, key: str) -> int:
try:
return await self._client.scard(key)
except RedisError as e:
raise CacheError(str(e)) from e

async def sismember(self, key: str, member: bytes | str) -> bool:
try:
return bool(await self._client.sismember(key, member))
except RedisError as e:
raise CacheError(str(e)) from e

async def smembers(self, key: str) -> "set[bytes]":
try:
return await self._client.smembers(key)
except RedisError as e:
raise CacheError(str(e)) from e

async def sinter(self, *keys: str) -> "set[bytes]":
if not keys:
raise ValueError("sinter() requires at least one key")
try:
return bool(await self._client.expire(key, seconds))
return set(await self._client.sinter(*keys))
except RedisError as e:
raise CacheError(str(e)) from e

Expand Down
49 changes: 49 additions & 0 deletions cloudrift/messaging/azure_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,55 @@ async def delete(self, receipt_handle: str) -> None:
pass
del self._receiver_tokens[rid]

async def dead_letter(self, receipt_handle: str, reason: str) -> None:
entry = self._pending.pop(receipt_handle, None)
if entry is None:
raise MessagingError(
f"No pending message for receipt handle: {receipt_handle!r}. "
"Call receive() first and use the returned receipt_handle."
)
receiver, message = entry
try:
await receiver.dead_letter_message(
message, reason=reason, error_description=reason
)
except ResourceNotFoundError as e:
raise QueueNotFoundError(str(e)) from e
except HttpResponseError as e:
raise MessagingError(str(e)) from e
finally:
rid = id(receiver)
if rid in self._receiver_tokens:
_, token_set = self._receiver_tokens[rid]
token_set.discard(receipt_handle)
if not token_set:
try:
await receiver.__aexit__(None, None, None)
except Exception:
pass
del self._receiver_tokens[rid]

async def get_queue_depth(self) -> int:
# Message counts live on the management plane, not the data plane. The
# async admin client lives at azure.servicebus.aio.management (note: NOT
# azure.servicebus.management.aio) and accepts the same async credential.
from azure.servicebus.aio.management import ServiceBusAdministrationClient

if self._connection_string:
admin = ServiceBusAdministrationClient.from_connection_string(
self._connection_string
)
else:
admin = ServiceBusAdministrationClient(self._namespace, credential=self._credential)
try:
async with admin:
props = await admin.get_queue_runtime_properties(self.queue_name)
return props.active_message_count
except ResourceNotFoundError as e:
raise QueueNotFoundError(f"Queue not found: {self.queue_name}") from e
except HttpResponseError as e:
raise MessagingError(str(e)) from e

async def health_check(self) -> bool:
try:
client = await self._ensure()
Expand Down
22 changes: 22 additions & 0 deletions cloudrift/messaging/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,28 @@ async def receive(self, max_messages: int = 1, wait_time: int = 0) -> list[Messa
async def delete(self, receipt_handle: str) -> None:
"""Delete/acknowledge a message by its receipt handle."""

@abstractmethod
async def dead_letter(self, receipt_handle: str, reason: str) -> None:
"""Move a received message to the dead-letter queue and acknowledge it.

Args:
receipt_handle: The receipt handle from a previously received message.
reason: A human-readable reason recorded with the dead-lettered message.

Azure Service Bus implements this natively via ``dead_letter_message``.
SQS has no native per-message dead-letter API, so backends emulate it by
sending the message body to a configured dead-letter queue and then
deleting the original from the source queue.
"""

@abstractmethod
async def get_queue_depth(self) -> int:
"""Return the approximate number of messages waiting in the queue.

This is an estimate: cloud queues report it asynchronously and it may
lag in-flight (received-but-not-yet-deleted) messages.
"""

@abstractmethod
async def purge(self) -> None:
"""Delete all messages in the queue."""
Expand Down
Loading
Loading