Cloud-agnostic abstraction for storage, messaging, document databases, cache, and email — built for Lyzr microservices.
- Async-first. Every public method is `async def`. All five categories use native-async SDK clients (`aioboto3`, `azure.*.aio`, `motor`, `redis.asyncio`, `aiosmtplib`), with no thread-pool wrapping.
- Drop-in providers. Same interface across AWS, Azure, and self-hosted backends. Swap `s3` ↔ `azure_blob` (or `sqs` ↔ `azure_bus`, `documentdb` ↔ `cosmos`, `redis` ↔ `elasticache` ↔ `azure_redis`, `ses` ↔ `azure_acs` ↔ `smtp`) by changing one string.
- Multiple auth methods per provider. Static keys, IAM roles, profiles, managed identity, service principals, SAS tokens, mTLS, IAM auth: pick what your microservice already has.
| Category | AWS | Azure | Self-hosted |
|---|---|---|---|
| Storage | S3 | Blob Storage | — |
| Messaging | SQS | Service Bus | — |
| Document DB | DocumentDB | Cosmos DB (MongoDB API) | — |
| Cache | ElastiCache | Azure Cache for Redis | Redis |
| Email | SES | Communication Services | SMTP |
Pick the extras your service needs:
pip install "cloudrift[aws]" # S3 + SQS + DocumentDB + SES + Redis client
pip install "cloudrift[azure]" # Blob + Service Bus + Cosmos + ACS Email + Redis client
pip install "cloudrift[cache]" # Just Redis (any flavour)
pip install "cloudrift[email]" # Just raw SMTP (aiosmtplib)
pip install "cloudrift[all]" # Everything
Requires Python 3.11+.
Every backend is constructed via a factory function and held for the lifetime of the service. Reuse one instance per resource — the underlying client is connection-pooled.
from cloudrift.storage import get_storage
# Construct once at startup
storage = get_storage(
    "s3",
    bucket="my-bucket",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="...",
    region="us-east-1",
)
# Use anywhere
await storage.upload("docs/hello.txt", b"hello world", content_type="text/plain")
data = await storage.download("docs/hello.txt")
url = await storage.presigned_url("docs/hello.txt", expires_in=3600)
# Release sockets at shutdown
await storage.close()
Or as an async context manager (auto-close):
async with get_storage("s3", bucket="b", region="us-east-1") as storage:
    await storage.upload("k", b"v")
Pick the provider per environment with a single env var:
import os
from cloudrift.storage import get_storage
storage = get_storage(
    os.environ["STORAGE_PROVIDER"],  # "s3" in prod, "azure_blob" in dev
    **{
        k.lower().removeprefix("storage_"): v
        for k, v in os.environ.items()
        if k.startswith("STORAGE_") and k != "STORAGE_PROVIDER"
    },
)
from cloudrift.storage import get_storage
# AWS S3
s3 = get_storage("s3", bucket="b", region="us-east-1") # IAM role
s3 = get_storage("s3", bucket="b", aws_access_key_id="...", # static keys
aws_secret_access_key="...", region="us-east-1")
s3 = get_storage("s3", bucket="b", profile_name="dev") # ~/.aws/credentials
# Azure Blob
blob = get_storage("azure_blob", connection_string="...", container="c")
blob = get_storage("azure_blob", account_url="https://acct.blob.core.windows.net",
account_key="...", container="c")
blob = get_storage("azure_blob", account_url="...", sas_token="...", container="c")
blob = get_storage("azure_blob", account_url="...", container="c") # managed identity
blob = get_storage("azure_blob", account_url="...", container="c",
                   tenant_id="...", client_id="...", client_secret="...")  # service principal
Operations (same on every backend):
await storage.upload(key, data, content_type="application/json")
data: bytes = await storage.download(key)
await storage.delete(key)
exists: bool = await storage.exists(key)
keys: list[str] = await storage.list(prefix="logs/")
url: str = await storage.presigned_url(key, expires_in=3600)
await storage.close()
from cloudrift.messaging import get_queue
# AWS SQS
sqs = get_queue("sqs", queue_url="https://sqs.us-east-1.amazonaws.com/.../q",
region="us-east-1")
# Azure Service Bus
bus = get_queue("azure_bus", connection_string="...", queue_name="my-queue")
bus = get_queue("azure_bus", fully_qualified_namespace="ns.servicebus.windows.net",
                queue_name="my-queue")  # managed identity
Operations:
msg_id = await queue.send({"action": "process", "id": 42}, delay=0)
ids = await queue.send_batch([{"n": 1}, {"n": 2}])
messages = await queue.receive(max_messages=10, wait_time=20) # long-poll
for m in messages:
    handle_job(m.body)
    await queue.delete(m.receipt_handle)  # ack (SQS only; see the note below)
await queue.purge()
await queue.close()
Azure Service Bus note: `delete(receipt_handle)` raises `NotImplementedError` because Service Bus completes messages via the receiver's lock token, not by handle. Until the abstraction is reworked, complete messages inside a custom receiver loop using `azure-servicebus` directly, or use the `purge()` helper.
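For backends where `delete()` is supported, the receive / handle / delete cycle shown above could be wrapped in a small worker helper. The sketch below is illustrative only: `FakeQueue`, `FakeMessage`, and `drain_once` are hypothetical names that are not part of cloudrift, and the fake mirrors just the `receive()` / `delete()` calls and the `.body` / `.receipt_handle` attributes used in the snippet above.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Stand-in for a received message (only .body and .receipt_handle)."""
    body: dict
    receipt_handle: str


class FakeQueue:
    """In-memory stand-in exposing receive() and delete(); not part of cloudrift."""

    def __init__(self, bodies):
        self.pending = {f"rh-{i}": b for i, b in enumerate(bodies)}

    async def receive(self, max_messages=10, wait_time=0):
        items = list(self.pending.items())[:max_messages]
        return [FakeMessage(body=b, receipt_handle=h) for h, b in items]

    async def delete(self, receipt_handle):
        self.pending.pop(receipt_handle, None)


async def drain_once(queue, handler, max_messages=10, wait_time=20):
    """Receive one batch and delete each message only after its handler succeeds.

    Returns (processed, failed). Failed messages are not deleted, so the
    backend can redeliver them (on SQS, after the visibility timeout).
    """
    processed = failed = 0
    for m in await queue.receive(max_messages=max_messages, wait_time=wait_time):
        try:
            await handler(m.body)
        except Exception:
            failed += 1  # no delete: leave the message on the queue
            continue
        await queue.delete(m.receipt_handle)
        processed += 1
    return processed, failed


async def demo():
    queue = FakeQueue([{"n": 1}, {"n": 2}, {"n": 3}])

    async def handler(body):
        if body["n"] == 2:
            raise ValueError("simulated failure")

    counts = await drain_once(queue, handler, wait_time=0)
    remaining = sorted(b["n"] for b in queue.pending.values())
    return counts, remaining
```

Deleting only after the handler returns gives at-least-once processing, so handlers should tolerate seeing the same message twice.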
get_mongodb(...) returns a configured Motor
AsyncIOMotorClient. Both providers speak the MongoDB wire protocol — AWS
DocumentDB natively, Azure Cosmos via its MongoDB-API endpoint — so the caller
uses Motor's API directly:
from cloudrift.document import get_mongodb
# AWS DocumentDB (MongoDB-compatible)
client = get_mongodb(
    "documentdb",
    uri="mongodb://user:pass@cluster.docdb.amazonaws.com:27017/?tls=true",
    tls_ca_file="/etc/ssl/rds-ca-bundle.pem",
    max_pool_size=200,
)
# Azure Cosmos DB (MongoDB API)
client = get_mongodb("cosmos", connection_string="mongodb://...")
client = get_mongodb("cosmos", account="myacct", account_key="...")
Operations (full Motor / pymongo surface, no wrappers):
db = client["lyzr"]
users = db["users"]
result = await users.insert_one({"name": "Alice", "age": 30})
doc_id = result.inserted_id
doc = await users.find_one({"name": "Alice"})
async for u in users.find({"age": {"$gte": 18}}).skip(0).limit(100):
    ...
await users.update_one({"_id": doc_id}, {"$set": {"age": 31}})
await db["events"].delete_many({"v": 1})
total = await users.count_documents({"age": {"$gte": 18}})
# bulk writes, aggregations, change streams, transactions, GridFS — all
# of Motor is available; nothing is hidden behind a wrapper.
client.close()
Cosmos auth note. Cosmos for MongoDB (RU) is keys-only at the wire protocol layer: Azure AD tokens are not accepted. Use the connection string from the portal or the account name + account key. Earlier versions of cloudrift exposed managed-identity / service-principal factories for Cosmos that called the SQL API; those have been removed in favour of a single Motor-based path.
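The URI-based factories above embed credentials directly in the connection string. Passwords and account keys often contain characters such as `/`, `+`, `=` or `@`, which MongoDB URIs require to be percent-encoded. A minimal sketch using only the standard library follows; `build_mongo_uri` is a hypothetical helper and not something cloudrift ships.

```python
from urllib.parse import quote_plus


def build_mongo_uri(username: str, password: str, host: str,
                    port: int = 27017, tls: bool = True) -> str:
    """Hypothetical helper: percent-encode credentials before embedding them."""
    user = quote_plus(username)
    pwd = quote_plus(password)
    query = "?tls=true" if tls else ""
    return f"mongodb://{user}:{pwd}@{host}:{port}/{query}"


# The raw password contains '@', '/', '+' and '=', all of which get escaped.
uri = build_mongo_uri("user", "p@ss/w+rd=", "cluster.docdb.amazonaws.com")
```

The resulting string could then be passed as the `uri=` argument shown earlier.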
from cloudrift.cache import get_cache
# Self-hosted Redis
cache = get_cache("redis", "from_url", url="redis://localhost:6379/0")
cache = get_cache("redis", "from_credentials",
host="redis.internal", port=6379, password="...", db=0)
# AWS ElastiCache
cache = get_cache("elasticache", "from_auth_token",
host="my-cluster.cache.amazonaws.com", auth_token="...")
cache = get_cache("elasticache", "from_iam_auth",
host="my-cluster.cache.amazonaws.com",
username="lyzr-app", region="us-east-1") # SigV4 + auto-refresh
# Azure Cache for Redis
cache = get_cache("azure_redis", "from_access_key",
host="my-cache.redis.cache.windows.net", access_key="...")
cache = get_cache("azure_redis", "from_managed_identity",
                  host="my-cache.redis.cache.windows.net", username="lyzr-app")
Operations (KV, hash, list, counters):
await cache.set("session:abc", b"data", ttl=3600)
value: bytes | None = await cache.get("session:abc")
await cache.delete("session:abc")
await cache.hset("user:1", "name", "Alice")
fields = await cache.hgetall("user:1")
await cache.lpush("jobs", "job-1", "job-2")
batch = await cache.lrange("jobs", 0, 99)
count = await cache.incr("hits:home")
ok = await cache.ping()
await cache.close()
from cloudrift.email import get_email
# AWS SES (SESv2)
ses = get_email("ses", region="us-east-1", default_from="noreply@example.com") # IAM / env
ses = get_email("ses", aws_access_key_id="AKIA...",
aws_secret_access_key="...", region="us-east-1",
default_from="noreply@example.com") # static keys
ses = get_email("ses", profile_name="dev", region="us-east-1",
default_from="noreply@example.com") # ~/.aws profile
# Azure Communication Services
acs = get_email("azure_acs",
connection_string="endpoint=https://...;accesskey=...",
default_from="DoNotReply@example.com") # connection string
acs = get_email("azure_acs", endpoint="https://x.communication.azure.com",
default_from="DoNotReply@example.com") # managed identity
acs = get_email("azure_acs", endpoint="https://x.communication.azure.com",
tenant_id="...", client_id="...", client_secret="...",
default_from="DoNotReply@example.com") # service principal
# Raw SMTP (SendGrid, Mailgun, Postmark, Office365, MailHog, ...)
smtp = get_email("smtp", host="smtp.sendgrid.net",
username="apikey", password="...",
default_from="noreply@example.com") # STARTTLS, port 587 (default)
smtp = get_email("smtp", mode="tls", host="smtp.example.com", port=465,
username="user", password="pw",
default_from="noreply@example.com") # implicit TLS
smtp = get_email("smtp", mode="plaintext", host="localhost", port=1025,
                 default_from="noreply@example.test")  # MailHog / Mailpit (dev)
Operations (same on every backend):
from cloudrift.email import Attachment, EmailMessage
# Single send (text, HTML, or multipart/alternative)
msg_id: str = await email.send(
    "alice@example.com",
    "Welcome",
    body_text="Plain text body",
    body_html="<p>HTML body</p>",
    cc=["bob@example.com"], bcc=["audit@example.com"],
    reply_to=["support@example.com"],
    attachments=[Attachment(filename="welcome.pdf",
                            content=pdf_bytes,
                            content_type="application/pdf")],
    headers={"X-Campaign": "welcome-v2"},
)
# Batch send (loops `send()` by default; subclasses override when the
# provider exposes a true bulk API)
ids: list[str] = await email.send_batch([
    EmailMessage(to=["alice@example.com"], subject="hi", body_text="hi"),
    EmailMessage(to=["bob@example.com"], subject="hi2", body_html="<b>hi2</b>"),
])
ok: bool = await email.health_check()
await email.close()
Default sender. Each backend accepts a `default_from` at construction time; calls that omit `from_` fall back to it. SES requires the sender (address or domain) to be verified; ACS requires the sending domain to be linked to the resource.
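The batch-send comment above says that `send_batch()` loops `send()` unless a subclass overrides it. One way such a default could look is sketched below. This is an assumption about the shape, not cloudrift's actual code: `Msg`, `BaseEmail`, `RecordingEmail`, and `BulkEmail` are hypothetical, and `send()` is simplified to take a single message object.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Msg:
    """Hypothetical minimal message; the real EmailMessage has more fields."""
    to: list[str]
    subject: str
    body_text: str = ""


class BaseEmail:
    """Sketch of a base class whose send_batch() falls back to looping send()."""

    async def send(self, msg: Msg) -> str:
        raise NotImplementedError

    async def send_batch(self, messages: list[Msg]) -> list[str]:
        # Sequential: one awaited send() per message, IDs in input order.
        return [await self.send(m) for m in messages]


class RecordingEmail(BaseEmail):
    """Provider without a bulk API: inherits the looping default."""

    def __init__(self):
        self.sent = []

    async def send(self, msg):
        self.sent.append(msg.subject)
        return f"id-{len(self.sent)}"


class BulkEmail(RecordingEmail):
    """Provider with a bulk API: overrides send_batch() with a single call."""

    async def send_batch(self, messages):
        self.sent.extend(m.subject for m in messages)  # stands in for one API call
        return [f"bulk-{i}" for i, _ in enumerate(messages, start=1)]


batch = [Msg(["alice@example.com"], "hi"), Msg(["bob@example.com"], "hi2")]
loop_ids = asyncio.run(RecordingEmail().send_batch(batch))
bulk_ids = asyncio.run(BulkEmail().send_batch(batch))
```

Callers see the same `list[str]` of message IDs either way, which is what lets providers be swapped without touching call sites.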
Every backend holds one long-lived async client that is reused across all operations. Reusing that client, rather than rebuilding it per request, is the single biggest performance knob:
- Don't call `get_storage(...)` inside a request handler.
- Do construct it once at app startup and share it (e.g. `app.state.storage`, a FastAPI dependency, or a module-level singleton).
Pool sizes are configurable per backend:
get_storage("s3", bucket="b", region="us-east-1",
max_pool_connections=100, connect_timeout=5.0, read_timeout=30.0)
get_mongodb("documentdb", uri="...",
            max_pool_size=200, min_pool_size=10)
Always release sockets on shutdown with `await backend.close()`, or wrap the whole lifetime in `async with`.
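When a service holds several backends, the construct-once / close-on-shutdown rule can be handled in one place with the standard library's `contextlib.AsyncExitStack`. The sketch below uses a hypothetical `FakeBackend` in place of real cloudrift objects so that it runs on its own; in a real service the marked lines would call `get_storage(...)`, `get_cache(...)`, and so on.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from types import SimpleNamespace


class FakeBackend:
    """Stand-in for any backend object that exposes an async close()."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    async def close(self):
        self.closed = True


@asynccontextmanager
async def backends():
    """Construct each backend once; close them all (in reverse order) on exit."""
    async with AsyncExitStack() as stack:
        storage = FakeBackend("storage")  # real code: get_storage(...)
        stack.push_async_callback(storage.close)
        cache = FakeBackend("cache")      # real code: get_cache(...)
        stack.push_async_callback(cache.close)
        yield SimpleNamespace(storage=storage, cache=cache)


async def main():
    async with backends() as b:
        open_during = (b.storage.closed, b.cache.closed)
    closed_after = (b.storage.closed, b.cache.closed)
    return open_during, closed_after


result = asyncio.run(main())
```

The same context manager shape fits a web framework's startup/shutdown hook, with the yielded namespace stored on the application object.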
All backends raise from a single hierarchy under cloudrift.core.exceptions:
from cloudrift.core.exceptions import (
ObjectNotFoundError, StoragePermissionError, StorageError,
QueueNotFoundError, MessageSendError, MessagingError,
DocumentConnectionError,
CacheKeyNotFoundError, CacheConnectionError, CacheError,
EmailError, EmailSendError,
RecipientRejectedError, SenderUnverifiedError, EmailThrottledError,
)
try:
    await storage.download("missing.txt")
except ObjectNotFoundError:
    ...
Provider-specific exceptions (e.g. `botocore.exceptions.ClientError`, `azure.core.exceptions.HttpResponseError`) are translated to the cloudrift hierarchy at the boundary. The document layer is the exception: `get_mongodb(...)` returns a Motor client, so operation errors propagate as native pymongo exceptions (e.g. `pymongo.errors.OperationFailure`, `DuplicateKeyError`). Connect-time failures still surface as `DocumentConnectionError`.
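Because throttling surfaces as a single exception type regardless of provider, a caller could retry on it with exponential backoff. The following is a minimal sketch under stated assumptions: `retry_async` and `flaky_send` are hypothetical, and `EmailThrottledError` is redefined locally only so the example runs without cloudrift installed (real code would import it from `cloudrift.core.exceptions`).

```python
import asyncio


class EmailThrottledError(Exception):
    """Local stand-in for cloudrift.core.exceptions.EmailThrottledError."""


async def retry_async(op, *, retry_on, attempts=4, base_delay=0.5):
    """Await op() up to `attempts` times, doubling the delay after each retryable failure."""
    for attempt in range(attempts):
        try:
            return await op()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** attempt)


calls = []


async def flaky_send():
    """Simulated send that is throttled twice before succeeding."""
    calls.append(1)
    if len(calls) < 3:
        raise EmailThrottledError("slow down")
    return "msg-123"


result = asyncio.run(
    retry_async(flaky_send, retry_on=EmailThrottledError, base_delay=0)
)
```

Exceptions outside `retry_on` (for example a rejected recipient) are not caught and propagate immediately, which is usually what you want for non-transient failures.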
The dev extra ships moto and fakeredis so unit tests don't need real cloud credentials:
pip install "cloudrift[dev]"
pytest
For local integration testing of the AWS backends, the suite uses `ThreadedMotoServer` (LocalStack-style in-process mock); see `tests/test_storage.py` for the pattern. Azure backends are tested against Azurite / Service Bus emulators (configure the endpoint via the relevant `*_url` kwarg). For DocumentDB and Cosmos (MongoDB API), `tests/test_document.py` covers connection construction; for live integration smoke tests, see `scripts/test_cosmos_*.py`.
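For unit tests of application code that only needs the storage operations listed earlier, a hand-rolled in-memory fake can be lighter than a mock server. The sketch below is not part of cloudrift: `InMemoryStorage` and `ObjectNotFound` are hypothetical, and details such as `delete()` silently ignoring missing keys are assumptions rather than documented behaviour of the real backends.

```python
import asyncio


class ObjectNotFound(KeyError):
    """Local stand-in; a real fake would raise cloudrift's ObjectNotFoundError."""


class InMemoryStorage:
    """Dict-backed fake mirroring the storage method names shown earlier."""

    def __init__(self):
        self._objects = {}

    async def upload(self, key, data, content_type=None):
        self._objects[key] = bytes(data)

    async def download(self, key):
        try:
            return self._objects[key]
        except KeyError:
            raise ObjectNotFound(key) from None

    async def delete(self, key):
        self._objects.pop(key, None)  # assumption: missing keys are ignored

    async def exists(self, key):
        return key in self._objects

    async def list(self, prefix=""):
        return sorted(k for k in self._objects if k.startswith(prefix))

    async def close(self):
        self._objects.clear()


async def demo():
    s = InMemoryStorage()
    await s.upload("logs/a.txt", b"a")
    await s.upload("logs/b.txt", b"b")
    await s.upload("docs/c.txt", b"c")
    await s.delete("logs/b.txt")
    try:
        await s.download("missing.txt")
        missing_raises = False
    except ObjectNotFound:
        missing_raises = True
    return await s.list(prefix="logs/"), await s.exists("docs/c.txt"), missing_raises


result = asyncio.run(demo())
```

Code under test can accept the storage object as a parameter, so the fake and a real backend are interchangeable at the call site.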