LYZR-OSS/cloudrift

cloudrift

Cloud-agnostic abstraction for storage, messaging, document databases, cache, and email — built for Lyzr microservices.

  • Async-first. Every public method is async def. All five categories use native-async SDK clients (aioboto3, azure.*.aio, motor, redis.asyncio, aiosmtplib) — no thread-pool wrapping.
  • Drop-in providers. Same interface across AWS, Azure, and self-hosted backends. Swap s3 ↔ azure_blob (or sqs ↔ azure_bus, documentdb ↔ cosmos, redis ↔ elasticache ↔ azure_redis, ses ↔ azure_acs ↔ smtp) by changing one string.
  • Multiple auth methods per provider. Static keys, IAM roles, profiles, managed identity, service principals, SAS tokens, mTLS, IAM auth — pick what your microservice already has.
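
| Category    | AWS         | Azure                    | Self-hosted |
|-------------|-------------|--------------------------|-------------|
| Storage     | S3          | Blob Storage             |             |
| Messaging   | SQS         | Service Bus              |             |
| Document DB | DocumentDB  | Cosmos DB (MongoDB API)  |             |
| Cache       | ElastiCache | Azure Cache for Redis    | Redis       |
| Email       | SES         | Communication Services   | SMTP        |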

Install

Pick the extras your service needs:

pip install "cloudrift[aws]"          # S3 + SQS + DocumentDB + SES + Redis client
pip install "cloudrift[azure]"        # Blob + Service Bus + Cosmos + ACS Email + Redis client
pip install "cloudrift[cache]"        # Just Redis (any flavour)
pip install "cloudrift[email]"        # Just raw SMTP (aiosmtplib)
pip install "cloudrift[all]"          # Everything

Requires Python 3.11+.


Quick start

Every backend is constructed via a factory function and held for the lifetime of the service. Reuse one instance per resource — the underlying client is connection-pooled.

from cloudrift.storage import get_storage

# Construct once at startup
storage = get_storage(
    "s3",
    bucket="my-bucket",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="...",
    region="us-east-1",
)

# Use anywhere
await storage.upload("docs/hello.txt", b"hello world", content_type="text/plain")
data = await storage.download("docs/hello.txt")
url = await storage.presigned_url("docs/hello.txt", expires_in=3600)

# Release sockets at shutdown
await storage.close()

Or as an async context manager (auto-close):

async with get_storage("s3", bucket="b", region="us-east-1") as storage:
    await storage.upload("k", b"v")

Microservice integration

Configuration via env vars

Pick the provider per environment with a single env var:

import os
from cloudrift.storage import get_storage

storage = get_storage(
    os.environ["STORAGE_PROVIDER"],   # "s3" in prod, "azure_blob" in dev
    **{
        k.lower().removeprefix("storage_"): v
        for k, v in os.environ.items()
        if k.startswith("STORAGE_") and k != "STORAGE_PROVIDER"
    },
)

Storage

from cloudrift.storage import get_storage

# AWS S3
s3 = get_storage("s3", bucket="b", region="us-east-1")                       # IAM role
s3 = get_storage("s3", bucket="b", aws_access_key_id="...",                  # static keys
                 aws_secret_access_key="...", region="us-east-1")
s3 = get_storage("s3", bucket="b", profile_name="dev")                       # ~/.aws/credentials

# Azure Blob
blob = get_storage("azure_blob", connection_string="...", container="c")
blob = get_storage("azure_blob", account_url="https://acct.blob.core.windows.net",
                   account_key="...", container="c")
blob = get_storage("azure_blob", account_url="...", sas_token="...", container="c")
blob = get_storage("azure_blob", account_url="...", container="c")           # managed identity
blob = get_storage("azure_blob", account_url="...", container="c",
                   tenant_id="...", client_id="...", client_secret="...")    # service principal

Operations — same on every backend:

await storage.upload(key, data, content_type="application/json")
data: bytes = await storage.download(key)
await storage.delete(key)
exists: bool = await storage.exists(key)
keys: list[str] = await storage.list(prefix="logs/")
url: str = await storage.presigned_url(key, expires_in=3600)
await storage.close()
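
Because upload takes bytes plus a content_type and download returns bytes, small typed helpers can be layered on top without touching the backend. The sketch below stores and loads JSON documents. put_json and get_json are hypothetical names, and the storage argument is assumed to be any object exposing the upload and download methods listed above.

```python
import json
from typing import Any


async def put_json(storage: Any, key: str, obj: Any) -> None:
    """Serialise obj as UTF-8 JSON and upload it under key."""
    data = json.dumps(obj).encode("utf-8")
    await storage.upload(key, data, content_type="application/json")


async def get_json(storage: Any, key: str) -> Any:
    """Download key and parse it as JSON (json.loads accepts bytes)."""
    return json.loads(await storage.download(key))
```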

Messaging

from cloudrift.messaging import get_queue

# AWS SQS
sqs = get_queue("sqs", queue_url="https://sqs.us-east-1.amazonaws.com/.../q",
                region="us-east-1")

# Azure Service Bus
bus = get_queue("azure_bus", connection_string="...", queue_name="my-queue")
bus = get_queue("azure_bus", fully_qualified_namespace="ns.servicebus.windows.net",
                queue_name="my-queue")  # managed identity

Operations:

msg_id = await queue.send({"action": "process", "id": 42}, delay=0)
ids = await queue.send_batch([{"n": 1}, {"n": 2}])

messages = await queue.receive(max_messages=10, wait_time=20)   # long-poll
for m in messages:
    handle_job(m.body)
    await queue.delete(m.receipt_handle)   # ack
    # or: await queue.nack(m.receipt_handle)  # return for immediate redelivery

await queue.purge()
await queue.close()

Azure Service Bus note: receipt handles are lock tokens — they are only valid on the same backend instance that received the message, and only within the message lock duration. SQS receipt handles, by contrast, are plain strings usable from any client.
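
One way to stay within that constraint on both providers is to receive, handle, and ack on the same queue instance inside a single loop iteration. The following is a minimal sketch under that assumption. drain_once is a hypothetical helper, and it uses only the receive, delete, and nack calls shown above.

```python
from collections.abc import Awaitable, Callable
from typing import Any


async def drain_once(
    queue: Any,
    handler: Callable[[Any], Awaitable[None]],
    *,
    max_messages: int = 10,
    wait_time: int = 20,
) -> tuple[int, int]:
    """Receive one batch and settle every message on the same instance.

    Returns (acked, nacked) counts for the batch.
    """
    acked = nacked = 0
    messages = await queue.receive(max_messages=max_messages, wait_time=wait_time)
    for m in messages:
        try:
            await handler(m.body)
        except Exception:
            # Handler failed: hand the message back for redelivery.
            await queue.nack(m.receipt_handle)
            nacked += 1
        else:
            # Handler succeeded: ack by deleting the message.
            await queue.delete(m.receipt_handle)
            acked += 1
    return acked, nacked
```

Handlers that can outlive the Service Bus lock duration would need a longer queue-level lock, since the sketch does nothing to renew it.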

FIFO queues / ordered delivery

For SQS FIFO queues (URL ending in .fifo) and session-enabled Service Bus queues, pass group_id (ordering key) and dedup_id (deduplication key):

# SQS FIFO — group_id is required, dedup_id optional if the queue has
# content-based deduplication enabled
fifo = get_queue("sqs", queue_url="https://sqs.../jobs.fifo", region="us-east-1")
await fifo.send({"task": "extract"}, group_id="owner-123", dedup_id="evt-abc")

# Azure Service Bus — queue must be created with sessions enabled;
# pass session_enabled=True so the backend uses session receivers
bus = get_queue("azure_bus", connection_string="...", queue_name="jobs",
                session_enabled=True)
await bus.send({"task": "extract"}, group_id="owner-123", dedup_id="evt-abc")

messages = await fifo.receive(max_messages=10, wait_time=20, visibility_timeout=300)
for m in messages:
    print(m.group_id, m.dedup_id, m.receive_count)
    await fifo.delete(m.receipt_handle)            # ack
# Azure only: receive from a specific session
messages = await bus.receive(group_id="owner-123")

Semantic differences to be aware of:

| | SQS FIFO | Azure Service Bus (sessions) |
|---|---|---|
| Ordering | Per MessageGroupId, groups interleave on receive | Per session; receive() without group_id drains one session at a time (NEXT_AVAILABLE_SESSION) |
| Deduplication | Fixed 5-minute window by dedup_id or content hash | By message_id, only if the queue enables duplicate detection (window 20s–7d) |
| Per-message delay | Not supported; raises FeatureNotSupportedError | Supported (scheduled enqueue) |
| receive(group_id=...) | Not supported; raises FeatureNotSupportedError | Supported |
| visibility_timeout on receive | Supported | Ignored (lock duration is queue-level config) |
| nack() | change_message_visibility(0); does not bump receive count until redelivery | abandon_message; increments delivery_count |
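
Since only SQS can fall back to a content hash for deduplication, passing an explicit dedup_id derived from the payload is one way to get comparable behaviour on both providers. The sketch below assumes, based on the deduplication row above, that the Azure backend maps dedup_id to the Service Bus message_id. content_dedup_id is a hypothetical helper, not part of cloudrift.

```python
import hashlib
import json
from typing import Any


def content_dedup_id(payload: Any) -> str:
    """Return a stable SHA-256 hex digest of a JSON-serialisable payload.

    Keys are sorted and whitespace is removed so that logically equal
    dicts produce the same id regardless of key order.
    """
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

A call would then look like await queue.send(body, group_id="owner-123", dedup_id=content_dedup_id(body)). The digest is 64 hex characters, which fits the 128-character limit that SQS places on deduplication ids.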

Document Database

get_mongodb(...) returns a configured Motor AsyncIOMotorClient. Both providers speak the MongoDB wire protocol — AWS DocumentDB natively, Azure Cosmos via its MongoDB-API endpoint — so the caller uses Motor's API directly:

from cloudrift.document import get_mongodb

# AWS DocumentDB (MongoDB-compatible)
client = get_mongodb(
    "documentdb",
    uri="mongodb://user:pass@cluster.docdb.amazonaws.com:27017/?tls=true",
    tls_ca_file="/etc/ssl/rds-ca-bundle.pem",
    max_pool_size=200,
)

# Azure Cosmos DB (MongoDB API)
client = get_mongodb("cosmos", connection_string="mongodb://...")
client = get_mongodb("cosmos", account="myacct", account_key="...")

Operations — full Motor / pymongo surface, no wrappers:

db = client["lyzr"]
users = db["users"]

result = await users.insert_one({"name": "Alice", "age": 30})
doc_id = result.inserted_id

doc = await users.find_one({"name": "Alice"})
async for u in users.find({"age": {"$gte": 18}}).skip(0).limit(100):
    ...

await users.update_one({"_id": doc_id}, {"$set": {"age": 31}})
await db["events"].delete_many({"v": 1})
total = await users.count_documents({"age": {"$gte": 18}})

# bulk writes, aggregations, change streams, transactions, GridFS — all
# of Motor is available; nothing is hidden behind a wrapper.

client.close()

Cosmos auth note. Cosmos for MongoDB (RU) is keys-only at the wire protocol layer — Azure AD tokens are not accepted. Use the connection string from the portal or the account name + account key. Earlier versions of cloudrift exposed managed-identity / service-principal factories for Cosmos that called the SQL API; those have been removed in favour of a single Motor-based path.

Optional sync client

For services that don't run an event loop, get_mongodb_sync(...) returns a blocking PyMongo MongoClient — the sync driver Motor wraps — with identical provider and auth routing:

from cloudrift.document import get_mongodb_sync

client = get_mongodb_sync("documentdb", uri="mongodb://...")
client = get_mongodb_sync("cosmos", account="myacct", account_key="...")

users = client["lyzr"]["users"]
users.insert_one({"name": "Alice"})
doc = users.find_one({"name": "Alice"})

client.close()

Cache

from cloudrift.cache import get_cache

# Self-hosted Redis
cache = get_cache("redis", "from_url", url="redis://localhost:6379/0")
cache = get_cache("redis", "from_credentials",
                  host="redis.internal", port=6379, password="...", db=0)

# AWS ElastiCache
cache = get_cache("elasticache", "from_auth_token",
                  host="my-cluster.cache.amazonaws.com", auth_token="...")
cache = get_cache("elasticache", "from_iam_auth",
                  host="my-cluster.cache.amazonaws.com",
                  username="lyzr-app", region="us-east-1")  # SigV4 + auto-refresh

# Azure Cache for Redis
cache = get_cache("azure_redis", "from_access_key",
                  host="my-cache.redis.cache.windows.net", access_key="...")
cache = get_cache("azure_redis", "from_managed_identity",
                  host="my-cache.redis.cache.windows.net", username="lyzr-app")

Operations — KV, hash, list, counters:

await cache.set("session:abc", b"data", ttl=3600)
value: bytes | None = await cache.get("session:abc")
await cache.delete("session:abc")

await cache.hset("user:1", "name", "Alice")
fields = await cache.hgetall("user:1")

await cache.lpush("jobs", "job-1", "job-2")
batch = await cache.lrange("jobs", 0, 99)

count = await cache.incr("hits:home")
ok = await cache.ping()
await cache.close()
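
The get and set operations above are enough for a cache-aside read path: look in the cache first, fall back to the source of truth on a miss, then store the result with a TTL. The following is a minimal sketch. get_or_load and the loader callback are hypothetical names, and cache is assumed to be any backend returned by get_cache.

```python
from collections.abc import Awaitable, Callable
from typing import Any


async def get_or_load(
    cache: Any,
    key: str,
    loader: Callable[[], Awaitable[bytes]],
    ttl: int = 3600,
) -> bytes:
    """Return the cached bytes for key, loading and caching them on a miss."""
    cached = await cache.get(key)
    if cached is not None:
        return cached
    value = await loader()  # e.g. a database or HTTP lookup
    await cache.set(key, value, ttl=ttl)
    return value
```

Concurrent misses on the same key will each call loader in this version. Deduplicating them needs a lock or a single-flight mechanism on top.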

Email

from cloudrift.email import get_email

# AWS SES (SESv2)
ses = get_email("ses", region="us-east-1", default_from="noreply@example.com")     # IAM / env
ses = get_email("ses", aws_access_key_id="AKIA...",
                aws_secret_access_key="...", region="us-east-1",
                default_from="noreply@example.com")                                 # static keys
ses = get_email("ses", profile_name="dev", region="us-east-1",
                default_from="noreply@example.com")                                 # ~/.aws profile

# Azure Communication Services
acs = get_email("azure_acs",
                connection_string="endpoint=https://...;accesskey=...",
                default_from="DoNotReply@example.com")                              # connection string
acs = get_email("azure_acs", endpoint="https://x.communication.azure.com",
                default_from="DoNotReply@example.com")                              # managed identity
acs = get_email("azure_acs", endpoint="https://x.communication.azure.com",
                tenant_id="...", client_id="...", client_secret="...",
                default_from="DoNotReply@example.com")                              # service principal

# Raw SMTP (SendGrid, Mailgun, Postmark, Office365, MailHog, ...)
smtp = get_email("smtp", host="smtp.sendgrid.net",
                 username="apikey", password="...",
                 default_from="noreply@example.com")                                # STARTTLS, port 587 (default)
smtp = get_email("smtp", mode="tls", host="smtp.example.com", port=465,
                 username="user", password="pw",
                 default_from="noreply@example.com")                                # implicit TLS
smtp = get_email("smtp", mode="plaintext", host="localhost", port=1025,
                 default_from="noreply@example.test")                               # MailHog / Mailpit (dev)

Operations — same on every backend:

from cloudrift.email import Attachment, EmailMessage

# Single send (text, HTML, or multipart/alternative)
msg_id: str = await email.send(
    "alice@example.com",
    "Welcome",
    body_text="Plain text body",
    body_html="<p>HTML body</p>",
    cc=["bob@example.com"], bcc=["audit@example.com"],
    reply_to=["support@example.com"],
    attachments=[Attachment(filename="welcome.pdf",
                            content=pdf_bytes,
                            content_type="application/pdf")],
    headers={"X-Campaign": "welcome-v2"},
)

# Batch send (loops `send()` by default; subclasses override when the
# provider exposes a true bulk API)
ids: list[str] = await email.send_batch([
    EmailMessage(to=["alice@example.com"], subject="hi",  body_text="hi"),
    EmailMessage(to=["bob@example.com"],   subject="hi2", body_html="<b>hi2</b>"),
])

ok: bool = await email.health_check()
await email.close()

Default sender. Each backend accepts a default_from at construction time; calls that omit from_ fall back to it. SES requires the sender (address or domain) to be verified; ACS requires the sending domain to be linked to the resource.


Connection pooling & lifecycle

Every backend holds one long-lived async client that is reused across all operations. Reusing that instance is the single biggest performance lever, since a fresh client has to re-establish its connections:

  • Don't call get_storage(...) inside a request handler.
  • Do construct it once at app startup and share it (e.g. app.state.storage, FastAPI dependency, or module-level singleton).

Pool sizes are configurable per backend:

get_storage("s3", bucket="b", region="us-east-1",
            max_pool_connections=100, connect_timeout=5.0, read_timeout=30.0)

get_mongodb("documentdb", uri="...",
            max_pool_size=200, min_pool_size=10)

Always release sockets on shutdown with await backend.close() — or wrap the whole lifetime in async with.
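
When a service holds several backends, contextlib.AsyncExitStack from the standard library can tie all of their close() calls to one startup and shutdown scope. The sketch below is one possible arrangement, not a cloudrift API: open_backends is a hypothetical helper, and it assumes each backend exposes an async close() as the storage, messaging, cache, and email examples do.

```python
from collections.abc import AsyncIterator, Callable, Mapping
from contextlib import AsyncExitStack, asynccontextmanager
from typing import Any


@asynccontextmanager
async def open_backends(
    factories: Mapping[str, Callable[[], Any]],
) -> AsyncIterator[dict[str, Any]]:
    """Build each backend once and close them all on exit, newest first."""
    async with AsyncExitStack() as stack:
        backends: dict[str, Any] = {}
        for name, factory in factories.items():
            backend = factory()
            # Register close() immediately so a failure in a later factory
            # still releases the backends that were already built.
            stack.push_async_callback(backend.close)
            backends[name] = backend
        yield backends


# Usage sketch (inside the service's startup/shutdown scope):
#
#   async with open_backends({
#       "storage": lambda: get_storage("s3", bucket="b", region="us-east-1"),
#   }) as backends:
#       app_state = backends  # share these instances with request handlers
```

The Motor client returned by get_mongodb has a synchronous close(), so it would be registered with stack.callback(client.close) instead.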


Errors

All backends raise from a single hierarchy under cloudrift.core.exceptions:

from cloudrift.core.exceptions import (
    ObjectNotFoundError, StoragePermissionError, StorageError,
    QueueNotFoundError, MessageSendError, MessagingError, FeatureNotSupportedError,
    DocumentConnectionError,
    CacheKeyNotFoundError, CacheConnectionError, CacheError,
    EmailError, EmailSendError,
    RecipientRejectedError, SenderUnverifiedError, EmailThrottledError,
)

try:
    await storage.download("missing.txt")
except ObjectNotFoundError:
    ...

Provider-specific exceptions (e.g. botocore.ClientError, azure.core.exceptions.HttpResponseError) are translated to the cloudrift hierarchy at the boundary. The document layer is the exception: get_mongodb(...) returns a Motor client and any operation errors propagate as native pymongo exceptions (e.g. pymongo.errors.OperationFailure, DuplicateKeyError). Connect-time failures still surface as DocumentConnectionError.
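
A shared hierarchy also makes provider-independent retry policies possible, for instance backing off when a send is throttled. The following is a minimal sketch of such a wrapper. retry_async is a hypothetical helper, and the exception types to retry on are passed in by the caller (for example EmailThrottledError), so nothing here depends on how cloudrift raises them.

```python
import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def retry_async(
    call: Callable[[], Awaitable[T]],
    retry_on: tuple[type[BaseException], ...],
    *,
    attempts: int = 4,
    base_delay: float = 0.5,
) -> T:
    """Await call(), retrying on the given exceptions with jittered backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await call()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Full jitter: sleep between 0 and base_delay * 2^(attempt - 1).
            await asyncio.sleep(random.uniform(0, base_delay * 2 ** (attempt - 1)))
    raise AssertionError("unreachable")
```

A call site might read await retry_async(lambda: email.send("alice@example.com", "Welcome", body_text="hi"), (EmailThrottledError,)). Errors outside retry_on, such as RecipientRejectedError, propagate immediately.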


Testing

The dev extra ships moto and fakeredis so unit tests don't need real cloud credentials:

pip install "cloudrift[dev]"
pytest

For local integration testing of the AWS backends, the suite uses ThreadedMotoServer (LocalStack-style in-process mock) — see tests/test_storage.py for the pattern. Azure backends are tested against Azurite / Service Bus emulators (configure endpoint via the relevant *_url kwarg). For DocumentDB and Cosmos (MongoDB API), tests/test_document.py covers connection construction; for live integration smoke tests, see scripts/test_cosmos_*.py.
