Merged
70 changes: 70 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,70 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## What this is

`lyzr-cloudrift` is a cloud-agnostic abstraction layer for Lyzr microservices, covering six categories: **storage**, **messaging**, **document DB**, **cache**, **secrets**, and **pub/sub**. Each category exposes the same interface across AWS, Azure, and (for cache) self-hosted backends, so a service swaps providers by changing a single string. Everything is **async-first** — public methods are `async def`, backed by native-async SDK clients (`aioboto3`, `azure.*.aio`, `motor`, `redis.asyncio`); there is no thread-pool wrapping. The one deliberate exception: the document category also exposes an **optional sync factory** (`get_mongodb_sync`, returning a raw `pymongo.MongoClient`) for services that don't run an event loop.

## Commands

The project uses `uv`. Optional-dependency extras control which provider SDKs get installed.

```bash
uv sync --extra dev # install with test/lint tooling (matches CI)
uv run pytest tests/ -v # run the full test suite
uv run pytest tests/test_cache.py # single file
uv run pytest tests/test_cache.py::test_set_and_get # single test
uv run ruff check . # lint (line-length 100, target py311)
uv run ruff format . # format
```

Tests run against in-process mocks (`fakeredis`, `moto`/`ThreadedMotoServer`, and recording stand-ins for the Mongo clients), so **no real cloud credentials are ever needed**. `asyncio_mode = "auto"` is set, so `async def test_*` functions need no `@pytest.mark.asyncio` decorator.

CI: a push to `develop` runs the tests and publishes to TestPyPI; a push to `main` runs the tests and publishes to PyPI (workflows live in `.github/workflows/`).

## Architecture

Five of the six categories (all but document) are self-contained packages under `cloudrift/` following an identical three-part shape:

1. **`base.py`** — an `ABC` defining the provider-neutral interface (e.g. `StorageBackend`, `CacheBackend`, `MessagingBackend`). All `@abstractmethod`s are async. Concrete, non-abstract helpers (`__aenter__`/`__aexit__`, `health_check`, default `pipeline`) live here too.
2. **Per-provider modules** — e.g. `s3.py` + `azure_blob.py`, `redis_standalone.py` + `redis_elasticache.py` + `redis_azure.py`. Each subclasses the ABC and is constructed **only** via `from_*` classmethods (`from_iam_role`, `from_access_key`, `from_connection_string`, `from_managed_identity`, etc.) — never a bare `__init__` with credentials.
3. **`__init__.py`** — a `get_*` factory function that selects the provider and routes to the right `from_*` constructor.

Provider SDKs are imported **lazily inside the factory branch**, not at module top level, so a service installing only `cloudrift[aws]` never imports Azure packages.
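
A minimal, self-contained sketch of that three-part shape, using only the standard library. `DemoStorageBackend`, `InMemoryStorage`, `get_demo_storage`, and the `read` method are hypothetical stand-ins, not cloudrift's actual classes or signatures; a plain dict plays the role of the SDK client.

```python
import asyncio
from abc import ABC, abstractmethod


class DemoStorageBackend(ABC):
    """Plays the role of base.py: the provider-neutral async interface."""

    @abstractmethod
    async def read(self, key: str) -> bytes: ...

    @abstractmethod
    async def close(self) -> None: ...

    # Concrete helpers shared by every provider live on the ABC.
    async def __aenter__(self) -> "DemoStorageBackend":
        return self

    async def __aexit__(self, *exc_info) -> None:
        await self.close()


class InMemoryStorage(DemoStorageBackend):
    """Plays the role of a per-provider module (hypothetical provider)."""

    def __init__(self, client: dict) -> None:
        # Receives an already-built client, never raw credentials.
        self._client = client

    @classmethod
    def from_access_key(cls, access_key_id: str, secret_key: str) -> "InMemoryStorage":
        # A real provider would build its SDK client from the credentials here.
        return cls({"greeting": b"hello"})

    async def read(self, key: str) -> bytes:
        return self._client[key]

    async def close(self) -> None:
        self._client = {}


def get_demo_storage(provider: str, **kwargs) -> DemoStorageBackend:
    """Plays the role of the get_* factory in __init__.py."""
    if provider == "memory":
        # A real factory would import the provider module here, inside the
        # branch, so SDKs for unused providers are never imported.
        return InMemoryStorage.from_access_key(**kwargs)
    raise ValueError(f"Unknown provider: {provider!r}")


async def demo() -> bytes:
    async with get_demo_storage("memory", access_key_id="k", secret_key="s") as backend:
        return await backend.read("greeting")
```

Calling `asyncio.run(demo())` goes through the factory, the `from_*` constructor, and the context-manager helpers inherited from the ABC.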

### Document DB is different — no wrappers

The document category deliberately has **no `base.py` ABC and no backend wrapper classes** (they were removed in the v0.2.0 refactor — don't reintroduce them). Both providers speak the MongoDB wire protocol, so `get_mongodb` returns a raw `motor` `AsyncIOMotorClient` and the caller uses Motor's native API directly. `documentdb.py` and `cosmos.py` contain only plain `connect_*` factory functions (`connect_uri`, `connect_credentials`, `connect_tls_cert`; `connect_connection_string`, `connect_account_key`) that build the URI, translate construction failures to `DocumentConnectionError`, and return the client. The sync variant mirrors this exactly: `get_mongodb_sync` returns a raw `pymongo.MongoClient` via identical `connect_*` functions in `documentdb_sync.py`/`cosmos_sync.py`. Cosmos here is the **MongoDB API** (keys-only auth — AAD tokens don't work at the wire-protocol layer), not the SQL/Core API.

### Two factory-dispatch styles — don't conflate them

- **Cache** uses an explicit auth-method argument: `get_cache(provider, auth_method, **kwargs)` where `auth_method` is the literal `from_*` method name (e.g. `get_cache("redis", "from_url", url=...)`).
- **All five other categories** infer the constructor from **which credential keys are present** in `**kwargs`: `get_storage(provider, **kwargs)` calls `from_access_key` if `aws_access_key_id` is present, `from_connection_string` if `connection_string` is present, and falls through to the managed-identity/IAM-role default. When adding an auth method here, add both the constructor (a `from_*` classmethod, or a `connect_*` function for document) and a routing branch in the factory — for document, in **both** `get_mongodb` and `get_mongodb_sync`.
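
The two dispatch styles can be sketched side by side. This is an illustration under assumptions, not the factories' actual code: `DemoCache`, `get_demo_cache`, and `infer_constructor` are hypothetical names, and `from_iam_role` is assumed to be the credential-free default on the AWS side.

```python
class DemoCache:
    """Hypothetical backend with two from_* constructors."""

    def __init__(self, description: str) -> None:
        self.description = description

    @classmethod
    def from_url(cls, url: str) -> "DemoCache":
        return cls(f"url={url}")

    @classmethod
    def from_access_key(cls, host: str, access_key: str) -> "DemoCache":
        return cls(f"host={host}")


def get_demo_cache(auth_method: str, **kwargs) -> DemoCache:
    """Explicit style: the caller names the from_* constructor."""
    constructor = getattr(DemoCache, auth_method, None)
    if not auth_method.startswith("from_") or constructor is None:
        raise ValueError(f"Unknown auth method: {auth_method!r}")
    return constructor(**kwargs)


def infer_constructor(**kwargs) -> str:
    """Inferred style: the credential keys that are present pick the constructor."""
    if "aws_access_key_id" in kwargs:
        return "from_access_key"
    if "connection_string" in kwargs:
        return "from_connection_string"
    # No credential keys at all: fall through to the ambient-identity default.
    return "from_iam_role"
```

With the inferred style, a misspelled credential key silently selects the default branch, which is one reason to keep the routing branches and the constructors in sync when adding an auth method.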

### Redis cache specifics

The three Redis backends (`redis`, `elasticache`, `azure_redis`) share **all** their operation logic through `_RedisMixin` in `cache/base.py` — the per-provider modules contain *only* the `from_*` constructors that build the `aioredis.Redis` client with the right auth (URL, IAM SigV4 auto-refresh, access key, managed identity). A new Redis command is implemented **once** in `_RedisMixin` and added as an `@abstractmethod` on `CacheBackend`; never reimplement it per provider. `CacheBackend.pipeline()` ships a no-atomicity `_SequentialPipeline` fallback that the Redis mixin overrides with a real server-side transactional pipeline.
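
To make the "no-atomicity fallback" concrete, here is one way such a sequential pipeline could look. The real `_SequentialPipeline` is not shown in this change, so the class below is an assumption about its general shape: it queues calls and replays them one at a time against the backend. `DictCache` is a hypothetical in-memory backend.

```python
import asyncio


class SequentialPipeline:
    """Queues commands and runs them in order on execute(); nothing is atomic."""

    def __init__(self, backend) -> None:
        self._backend = backend
        self._queued = []

    def set(self, key: str, value: str) -> "SequentialPipeline":
        self._queued.append(("set", (key, value)))
        return self

    def get(self, key: str) -> "SequentialPipeline":
        self._queued.append(("get", (key,)))
        return self

    async def execute(self) -> list:
        results = []
        for name, args in self._queued:
            # Each command is a separate awaited call, so other tasks can
            # interleave between them; a server-side transaction would not allow that.
            results.append(await getattr(self._backend, name)(*args))
        self._queued.clear()
        return results


class DictCache:
    """Hypothetical in-memory backend used only for this sketch."""

    def __init__(self) -> None:
        self._data = {}

    async def set(self, key: str, value: str) -> bool:
        self._data[key] = value
        return True

    async def get(self, key: str):
        return self._data.get(key)

    def pipeline(self) -> SequentialPipeline:
        return SequentialPipeline(self)
```

A Redis-backed override would instead hand back the client's own pipeline, so queued commands are sent together and applied transactionally on the server.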

### Errors

All backends raise from one hierarchy in `cloudrift/core/exceptions.py`, rooted at `CloudRiftError` with a per-category base (`StorageError`, `CacheError`, `MessagingError`, `DocumentError`, `SecretError`, `PubSubError`) and specific subclasses. Provider-native exceptions (`botocore.ClientError`, `RedisError`, `azure.core.exceptions.*`) are **caught and translated to this hierarchy at the backend boundary** — callers should only ever see cloudrift exceptions.
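
A small sketch of translation at the backend boundary. The exception classes are redefined locally so the example runs on its own; `StorageNotFoundError`, `FakeSdkError`, `FakeSdkClient`, and `DemoStorage` are hypothetical names, with `FakeSdkError` standing in for a provider-native exception.

```python
import asyncio


class CloudRiftError(Exception):
    """Local stand-in for the root of the hierarchy."""


class StorageError(CloudRiftError):
    """Local stand-in for the per-category base."""


class StorageNotFoundError(StorageError):
    """Hypothetical specific subclass."""


class FakeSdkError(Exception):
    """Stands in for a provider-native exception carrying an error code."""

    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.code = code


class FakeSdkClient:
    async def download(self, key: str) -> bytes:
        raise FakeSdkError("NoSuchKey")


class DemoStorage:
    def __init__(self, client: FakeSdkClient) -> None:
        self._client = client

    async def read(self, key: str) -> bytes:
        try:
            return await self._client.download(key)
        except FakeSdkError as e:
            # Translate here so callers never see the SDK's exception type;
            # "from e" keeps the original as __cause__ for debugging.
            if e.code == "NoSuchKey":
                raise StorageNotFoundError(f"No such key: {key!r}") from e
            raise StorageError(f"Read failed for {key!r}: {e}") from e
```

Callers can then catch `StorageError` (or `CloudRiftError`) without importing any provider SDK.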

### Lifecycle

Backends hold one long-lived, connection-pooled async client meant to be constructed **once at service startup** and reused — never per-request. Always release sockets with `await backend.close()` or `async with backend:` (the ABCs implement the async-context-manager protocol).
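
The construct-once pattern can be sketched as follows. `DemoBackend`, `handle_request`, and `serve` are hypothetical names; the point is only that one backend instance is shared by every request and closed exactly once at shutdown.

```python
import asyncio


class DemoBackend:
    """Hypothetical backend; the closed flag stands in for released sockets."""

    def __init__(self) -> None:
        self.closed = False

    async def get(self, key: str) -> str:
        if self.closed:
            raise RuntimeError("backend used after close()")
        return f"value-for-{key}"

    async def close(self) -> None:
        self.closed = True

    async def __aenter__(self) -> "DemoBackend":
        return self

    async def __aexit__(self, *exc_info) -> None:
        await self.close()


async def handle_request(backend: DemoBackend, key: str) -> str:
    # Handlers receive the shared backend; they never construct their own.
    return await backend.get(key)


async def serve():
    # Startup: build the backend once. Shutdown: the context manager closes it.
    async with DemoBackend() as backend:
        results = await asyncio.gather(
            *(handle_request(backend, key) for key in ("a", "b", "c"))
        )
    return results, backend.closed
```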

## Provider/category matrix

| Category | Factory | AWS | Azure | Self-hosted |
|---|---|---|---|---|
| Storage | `get_storage` | `s3` | `azure_blob` | — |
| Messaging | `get_queue` | `sqs` | `azure_bus` | — |
| Document DB | `get_mongodb` | `documentdb` | `cosmos` | — |
| Cache | `get_cache` | `elasticache` | `azure_redis` | `redis` |
| Secrets | `get_secrets` | `aws_secrets_manager` | `azure_keyvault` | — |
| Pub/Sub | `get_pubsub` | `sns` | `azure_eventgrid` | — |

## Known abstraction gaps

The interface is uniform but not every operation maps cleanly to every provider. When a provider can't honor a method, it raises `NotImplementedError` rather than silently differing — e.g. Azure Service Bus `delete(receipt_handle)` (Service Bus acks via the receiver's lock token, not a handle). Preserve this fail-loud convention; document the gap in the method docstring as the existing code does.
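
A sketch of the fail-loud convention, assuming a hypothetical two-provider queue interface. `DemoQueueBackend`, `HandleBasedQueue`, and `LockTokenQueue` are illustrative names, not the actual messaging classes.

```python
import asyncio
from abc import ABC, abstractmethod


class DemoQueueBackend(ABC):
    @abstractmethod
    async def delete(self, receipt_handle: str) -> None: ...


class HandleBasedQueue(DemoQueueBackend):
    """Provider whose messages are acknowledged by receipt handle."""

    def __init__(self) -> None:
        self.in_flight = {"h-1"}

    async def delete(self, receipt_handle: str) -> None:
        self.in_flight.discard(receipt_handle)


class LockTokenQueue(DemoQueueBackend):
    """Provider that acknowledges through a lock token instead."""

    async def delete(self, receipt_handle: str) -> None:
        """Not supported by this provider.

        Abstraction gap: messages are acknowledged via the receiver's lock
        token, so there is no receipt handle to delete by.
        """
        raise NotImplementedError(
            "delete(receipt_handle) is not supported; this provider acks via lock token"
        )
```

Raising makes the gap visible the first time the call path is exercised, instead of returning quietly and leaving the message unacknowledged.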
19 changes: 19 additions & 0 deletions README.md
@@ -210,6 +210,25 @@ client.close()
> factories for Cosmos that called the SQL API; those have been removed
> in favour of a single Motor-based path.

### Optional sync client

For services that don't run an event loop, `get_mongodb_sync(...)` returns a
blocking [PyMongo](https://pymongo.readthedocs.io/) `MongoClient` — the sync
driver Motor wraps — with identical provider and auth routing:

```python
from cloudrift.document import get_mongodb_sync

client = get_mongodb_sync("documentdb", uri="mongodb://...")
client = get_mongodb_sync("cosmos", account="myacct", account_key="...")

users = client["lyzr"]["users"]
users.insert_one({"name": "Alice"})
doc = users.find_one({"name": "Alice"})

client.close()
```

---

## Cache
3 changes: 2 additions & 1 deletion cloudrift/__init__.py
@@ -1,6 +1,6 @@
from cloudrift.storage import get_storage
from cloudrift.messaging import get_queue
-from cloudrift.document import get_mongodb
+from cloudrift.document import get_mongodb, get_mongodb_sync
from cloudrift.cache import get_cache
from cloudrift.secrets import get_secrets
from cloudrift.pubsub import get_pubsub
@@ -11,6 +11,7 @@
"get_storage",
"get_queue",
"get_mongodb",
"get_mongodb_sync",
"get_cache",
"get_secrets",
"get_pubsub",
47 changes: 46 additions & 1 deletion cloudrift/document/__init__.py
@@ -9,8 +9,13 @@
client = get_mongodb("documentdb", uri="mongodb://...")
await client["mydb"]["users"].insert_one({"name": "Alice"})
client.close()

An optional synchronous variant, :func:`get_mongodb_sync`, returns a
:class:`pymongo.MongoClient` with identical provider/auth routing for services
that don't run an event loop.
"""
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import MongoClient


def get_mongodb(provider: str, **kwargs) -> AsyncIOMotorClient:
@@ -52,4 +57,44 @@ def get_mongodb(provider: str, **kwargs) -> AsyncIOMotorClient:
    )


-__all__ = ["get_mongodb"]
def get_mongodb_sync(provider: str, **kwargs) -> MongoClient:
    """Factory to build a *synchronous* MongoDB client for the given provider.

    Same providers and ``connect_*`` routing as :func:`get_mongodb`, but returns
    a blocking :class:`pymongo.MongoClient`, for services that don't run an
    event loop.

    Args:
        provider: ``"documentdb"`` or ``"cosmos"``.
        **kwargs: Provider-specific config. Routed to the appropriate
            ``connect_*`` function based on which keys are present.

    Examples:
        get_mongodb_sync("documentdb", uri="mongodb://...")
        get_mongodb_sync("documentdb", host="...", port=27017,
                         username="u", password="p")
        get_mongodb_sync("cosmos", connection_string="mongodb://...")
        get_mongodb_sync("cosmos", account="myacct", account_key="...")
    """
    if provider == "documentdb":
        from cloudrift.document import documentdb_sync

        if "uri" in kwargs:
            return documentdb_sync.connect_uri(**kwargs)
        if "tls_cert_key_file" in kwargs:
            return documentdb_sync.connect_tls_cert(**kwargs)
        return documentdb_sync.connect_credentials(**kwargs)

    if provider == "cosmos":
        from cloudrift.document import cosmos_sync

        if "connection_string" in kwargs:
            return cosmos_sync.connect_connection_string(**kwargs)
        return cosmos_sync.connect_account_key(**kwargs)

    raise ValueError(
        f"Unknown document DB provider: {provider!r}. Choose 'documentdb' or 'cosmos'."
    )


__all__ = ["get_mongodb", "get_mongodb_sync"]
88 changes: 88 additions & 0 deletions cloudrift/document/cosmos_sync.py
@@ -0,0 +1,88 @@
"""Azure Cosmos DB (MongoDB API) connection factory (synchronous).

Returns a configured :class:`pymongo.MongoClient` — the blocking counterpart of
:mod:`cloudrift.document.cosmos` for services that don't run an event loop.
Identical in shape to :mod:`cloudrift.document.documentdb_sync`.

Only key-based auth is supported here: Cosmos for MongoDB (RU) does not accept
Azure AD tokens at the Mongo wire-protocol layer. Use a connection string from
the portal or build one from the account name + key.

Lifecycle is caller-managed: call ``client.close()`` at shutdown.
"""
from urllib.parse import quote_plus

from pymongo import MongoClient

from cloudrift.core.exceptions import DocumentConnectionError


def connect_connection_string(
    connection_string: str,
    *,
    max_pool_size: int = 100,
    min_pool_size: int = 0,
) -> MongoClient:
    """Connect using a Cosmos MongoDB-API connection string from the Azure portal.

    The portal exposes this under *Connection strings* on a Cosmos account
    configured for the MongoDB API.

    Args:
        connection_string: Mongo-format URI from the Cosmos portal.
        max_pool_size: Max connection pool size. Overrides any ``maxPoolSize``
            in the connection string.
        min_pool_size: Min connection pool size. Overrides any ``minPoolSize``
            in the connection string.
    """
    try:
        return MongoClient(
            connection_string,
            maxPoolSize=max_pool_size,
            minPoolSize=min_pool_size,
        )
    except Exception as e:
        raise DocumentConnectionError(f"Failed to connect to Cosmos DB: {e}") from e


def connect_account_key(
    account: str,
    account_key: str,
    *,
    port: int = 10255,
    app_name: str | None = None,
    max_pool_size: int = 100,
    min_pool_size: int = 0,
) -> MongoClient:
    """Build a Cosmos MongoDB-API URI from the account name and key.

    Args:
        account: Cosmos account name (the leftmost label of the host, i.e.
            ``<account>.mongo.cosmos.azure.com``).
        account_key: Primary or secondary account key.
        port: Mongo-API port (default ``10255``).
        app_name: Optional ``appName`` URI parameter (Cosmos uses it for
            telemetry and routing). Defaults to ``@<account>@``.
        max_pool_size: Max connection pool size.
        min_pool_size: Min connection pool size.
    """
    user = quote_plus(account)
    pwd = quote_plus(account_key)
    host = f"{account}.mongo.cosmos.azure.com"
    app = app_name if app_name is not None else f"@{account}@"
    query = (
        "ssl=true"
        "&replicaSet=globaldb"
        "&retryWrites=false"
        "&maxIdleTimeMS=120000"
        f"&appName={quote_plus(app)}"
    )
    uri = f"mongodb://{user}:{pwd}@{host}:{port}/?{query}"
    try:
        return MongoClient(
            uri,
            maxPoolSize=max_pool_size,
            minPoolSize=min_pool_size,
        )
    except Exception as e:
        raise DocumentConnectionError(f"Failed to connect to Cosmos DB: {e}") from e
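
As a note on `connect_account_key` above: the URI assembly can be tried in isolation, without PyMongo installed. The sketch below repeats the same string-building steps in a hypothetical helper, `build_cosmos_uri`, and uses a made-up key. Account keys are typically base64 and can contain `+`, `/`, and `=`, which is why the key goes through `quote_plus` before it is placed in the userinfo part of the URI.

```python
from typing import Optional
from urllib.parse import quote_plus, unquote_plus


def build_cosmos_uri(
    account: str,
    account_key: str,
    port: int = 10255,
    app_name: Optional[str] = None,
) -> str:
    """Hypothetical helper mirroring the URI assembly in connect_account_key."""
    user = quote_plus(account)
    pwd = quote_plus(account_key)  # escapes '+', '/', '=' and other reserved characters
    host = f"{account}.mongo.cosmos.azure.com"
    app = app_name if app_name is not None else f"@{account}@"
    query = (
        "ssl=true"
        "&replicaSet=globaldb"
        "&retryWrites=false"
        "&maxIdleTimeMS=120000"
        f"&appName={quote_plus(app)}"
    )
    return f"mongodb://{user}:{pwd}@{host}:{port}/?{query}"


# "ab+/c==" is a made-up key, used only to show the escaping.
uri = build_cosmos_uri("myacct", "ab+/c==")
escaped_key = uri.split("://", 1)[1].split("@", 1)[0].split(":", 1)[1]
original_key = unquote_plus(escaped_key)  # round-trips back to the raw key
```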
110 changes: 110 additions & 0 deletions cloudrift/document/documentdb_sync.py
@@ -0,0 +1,110 @@
"""AWS DocumentDB connection factory (synchronous).

Returns a configured :class:`pymongo.MongoClient` — the blocking counterpart of
:mod:`cloudrift.document.documentdb` for services that don't run an event loop.
The caller selects database and collection (``client[db][collection]``) and
uses PyMongo's native API directly.

Lifecycle is caller-managed: call ``client.close()`` at shutdown.
"""
from urllib.parse import quote_plus

from pymongo import MongoClient

from cloudrift.core.exceptions import DocumentConnectionError


def connect_uri(
    uri: str,
    *,
    tls_ca_file: str | None = None,
    max_pool_size: int = 100,
    min_pool_size: int = 0,
    **client_kwargs,
) -> MongoClient:
    """Connect using a full MongoDB-compatible URI."""
    kwargs: dict = {"maxPoolSize": max_pool_size, "minPoolSize": min_pool_size}
    if tls_ca_file:
        kwargs["tlsCAFile"] = tls_ca_file
    kwargs.update(client_kwargs)
    try:
        return MongoClient(uri, **kwargs)
    except Exception as e:
        raise DocumentConnectionError(f"Failed to connect to DocumentDB: {e}") from e


def connect_credentials(
    host: str,
    port: int,
    username: str,
    password: str,
    *,
    tls: bool = True,
    tls_ca_file: str | None = None,
    max_pool_size: int = 100,
    min_pool_size: int = 0,
) -> MongoClient:
    """Connect using explicit host, port, username, and password.

    Args:
        host: DocumentDB cluster endpoint hostname.
        port: Port number (DocumentDB default: 27017).
        username: Database username.
        password: Database password.
        tls: Enable TLS (default ``True``; required for AWS DocumentDB).
        tls_ca_file: Optional path to the CA certificate bundle (PEM).
        max_pool_size: Max connection pool size.
        min_pool_size: Min connection pool size.
    """
    uri = f"mongodb://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}/"
    kwargs: dict = {
        "tls": tls,
        "maxPoolSize": max_pool_size,
        "minPoolSize": min_pool_size,
    }
    if tls_ca_file:
        kwargs["tlsCAFile"] = tls_ca_file
    try:
        return MongoClient(uri, **kwargs)
    except Exception as e:
        raise DocumentConnectionError(f"Failed to connect to DocumentDB: {e}") from e


def connect_tls_cert(
    host: str,
    port: int,
    username: str,
    password: str,
    *,
    tls_cert_key_file: str,
    tls_ca_file: str | None = None,
    max_pool_size: int = 100,
    min_pool_size: int = 0,
) -> MongoClient:
    """Connect using mutual TLS (mTLS) with a client certificate.

    Args:
        host: DocumentDB cluster endpoint hostname.
        port: Port number (DocumentDB default: 27017).
        username: Database username.
        password: Database password.
        tls_cert_key_file: Path to a PEM file containing the client private key
            followed by the client certificate (combined, as required by
            pymongo).
        tls_ca_file: Optional path to the CA certificate bundle (PEM).
        max_pool_size: Max connection pool size.
        min_pool_size: Min connection pool size.
    """
    uri = f"mongodb://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}/"
    kwargs: dict = {
        "tls": True,
        "tlsCertificateKeyFile": tls_cert_key_file,
        "maxPoolSize": max_pool_size,
        "minPoolSize": min_pool_size,
    }
    if tls_ca_file:
        kwargs["tlsCAFile"] = tls_ca_file
    try:
        return MongoClient(uri, **kwargs)
    except Exception as e:
        raise DocumentConnectionError(f"Failed to connect to DocumentDB: {e}") from e