diff --git a/README.md b/README.md index 96d186a..f73e2c0 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,9 @@ # cloudrift -Cloud-agnostic abstraction for **storage**, **messaging**, **document databases**, and **cache** — built for Lyzr microservices. +Cloud-agnostic abstraction for **storage**, **messaging**, **document databases**, **cache**, and **email** — built for Lyzr microservices. -- **Async-first.** Every public method is `async def`. All four categories use native-async SDK clients (`aioboto3`, `azure.*.aio`, `motor`, `redis.asyncio`) — no thread-pool wrapping. -- **Drop-in providers.** Same interface across AWS, Azure, and self-hosted backends. Swap `s3` ↔ `azure_blob` (or `sqs` ↔ `azure_bus`, `documentdb` ↔ `cosmos`, `redis` ↔ `elasticache` ↔ `azure_redis`) by changing one string. +- **Async-first.** Every public method is `async def`. All five categories use native-async SDK clients (`aioboto3`, `azure.*.aio`, `motor`, `redis.asyncio`, `aiosmtplib`) — no thread-pool wrapping. +- **Drop-in providers.** Same interface across AWS, Azure, and self-hosted backends. Swap `s3` ↔ `azure_blob` (or `sqs` ↔ `azure_bus`, `documentdb` ↔ `cosmos`, `redis` ↔ `elasticache` ↔ `azure_redis`, `ses` ↔ `azure_acs` ↔ `smtp`) by changing one string. - **Multiple auth methods per provider.** Static keys, IAM roles, profiles, managed identity, service principals, SAS tokens, mTLS, IAM auth — pick what your microservice already has. | Category | AWS | Azure | Self-hosted | @@ -12,6 +12,7 @@ Cloud-agnostic abstraction for **storage**, **messaging**, **document databases* | Messaging | SQS | Service Bus | — | | Document DB | DocumentDB | Cosmos DB (MongoDB API) | — | | Cache | ElastiCache | Azure Cache for Redis | Redis | +| Email | SES | Communication Services | SMTP | --- @@ -20,9 +21,10 @@ Cloud-agnostic abstraction for **storage**, **messaging**, **document databases* Pick the extras your service needs: ```bash -pip install "cloudrift[aws]" # S3 + SQS + DocumentDB + Redis client -pip install "cloudrift[azure]" # Blob + Service Bus + Cosmos + Redis client +pip install "cloudrift[aws]" # S3 + SQS + DocumentDB + SES + Redis client +pip install "cloudrift[azure]" # Blob + Service Bus + Cosmos + ACS Email + Redis client pip install "cloudrift[cache]" # Just Redis (any flavour) +pip install "cloudrift[email]" # Just raw SMTP (aiosmtplib) pip install "cloudrift[all]" # Everything ``` @@ -254,6 +256,74 @@ await cache.close() --- +## Email + +```python +from cloudrift.email import get_email + +# AWS SES (SESv2) +ses = get_email("ses", region="us-east-1", default_from="noreply@example.com") # IAM / env +ses = get_email("ses", aws_access_key_id="AKIA...", + aws_secret_access_key="...", region="us-east-1", + default_from="noreply@example.com") # static keys +ses = get_email("ses", profile_name="dev", region="us-east-1", + default_from="noreply@example.com") # ~/.aws profile + +# Azure Communication Services +acs = get_email("azure_acs", + connection_string="endpoint=https://...;accesskey=...", + default_from="DoNotReply@example.com") # connection string +acs = get_email("azure_acs", endpoint="https://x.communication.azure.com", + default_from="DoNotReply@example.com") # managed identity +acs = get_email("azure_acs", endpoint="https://x.communication.azure.com", + tenant_id="...", client_id="...", client_secret="...", + default_from="DoNotReply@example.com") # service principal + +# Raw SMTP (SendGrid, Mailgun, Postmark, Office365, MailHog, ...) +smtp = get_email("smtp", host="smtp.sendgrid.net", + username="apikey", password="...", + default_from="noreply@example.com") # STARTTLS, port 587 (default) +smtp = get_email("smtp", mode="tls", host="smtp.example.com", port=465, + username="user", password="pw", + default_from="noreply@example.com") # implicit TLS +smtp = get_email("smtp", mode="plaintext", host="localhost", port=1025, + default_from="noreply@example.test") # MailHog / Mailpit (dev) +``` + +**Operations** — same on every backend: + +```python +from cloudrift.email import Attachment, EmailMessage + +# Single send (text, HTML, or multipart/alternative) +msg_id: str = await email.send( + "alice@example.com", + "Welcome", + body_text="Plain text body", + body_html="
HTML body
", + cc=["bob@example.com"], bcc=["audit@example.com"], + reply_to=["support@example.com"], + attachments=[Attachment(filename="welcome.pdf", + content=pdf_bytes, + content_type="application/pdf")], + headers={"X-Campaign": "welcome-v2"}, +) + +# Batch send (loops `send()` by default; subclasses override when the +# provider exposes a true bulk API) +ids: list[str] = await email.send_batch([ + EmailMessage(to=["alice@example.com"], subject="hi", body_text="hi"), + EmailMessage(to=["bob@example.com"], subject="hi2", body_html="hi2"), +]) + +ok: bool = await email.health_check() +await email.close() +``` + +> **Default sender.** Each backend accepts a `default_from` at construction time; calls that omit `from_` fall back to it. SES requires the sender (address or domain) to be verified; ACS requires the sending domain to be linked to the resource. + +--- + ## Connection pooling & lifecycle Every backend holds **one long-lived async client** that is reused across all operations. This is the single biggest perf knob: @@ -285,6 +355,8 @@ from cloudrift.core.exceptions import ( QueueNotFoundError, MessageSendError, MessagingError, DocumentConnectionError, CacheKeyNotFoundError, CacheConnectionError, CacheError, + EmailError, EmailSendError, + RecipientRejectedError, SenderUnverifiedError, EmailThrottledError, ) try: diff --git a/cloudrift/__init__.py b/cloudrift/__init__.py index 5a7f266..caf992a 100644 --- a/cloudrift/__init__.py +++ b/cloudrift/__init__.py @@ -4,6 +4,7 @@ from cloudrift.cache import get_cache from cloudrift.secrets import get_secrets from cloudrift.pubsub import get_pubsub +from cloudrift.email import get_email __version__ = "0.2.0" __all__ = [ @@ -13,4 +14,5 @@ "get_cache", "get_secrets", "get_pubsub", + "get_email", ] diff --git a/cloudrift/core/exceptions.py b/cloudrift/core/exceptions.py index 484f77a..7522694 100644 --- a/cloudrift/core/exceptions.py +++ b/cloudrift/core/exceptions.py @@ -70,3 +70,24 @@ class TopicNotFoundError(PubSubError): class PublishError(PubSubError): """Raised when a message fails to publish.""" + + +# Email exceptions +class EmailError(CloudRiftError): + """Base exception for email operations.""" + + +class EmailSendError(EmailError): + """Raised when an email fails to send.""" + + +class RecipientRejectedError(EmailError): + """Raised when one or more recipients are rejected by the provider.""" + + +class SenderUnverifiedError(EmailError): + """Raised when the From address or domain is not verified with the provider.""" + + +class EmailThrottledError(EmailError): + """Raised when the provider rate-limits the send.""" diff --git a/cloudrift/email/__init__.py b/cloudrift/email/__init__.py new file mode 100644 index 0000000..faf10bc --- /dev/null +++ b/cloudrift/email/__init__.py @@ -0,0 +1,67 @@ +from cloudrift.email.base import Attachment, EmailBackend, EmailMessage + + +def get_email(provider: str, **kwargs) -> EmailBackend: + """Factory to instantiate an email backend. + + Args: + provider: ``"ses"``, ``"azure_acs"``, or ``"smtp"``. + **kwargs: Provider-specific config. The factory routes to the + appropriate ``from_*`` classmethod based on which credential keys + are present. + + Returns: + An :class:`EmailBackend` instance. + + Examples: + get_email("ses", region="us-east-1", default_from="noreply@example.com") + get_email("ses", aws_access_key_id="AKIA...", aws_secret_access_key="...", + default_from="noreply@example.com") + get_email("azure_acs", connection_string="endpoint=https://...;accesskey=...", + default_from="DoNotReply@example.com") + get_email("azure_acs", endpoint="https://...communication.azure.com", + default_from="DoNotReply@example.com") + get_email("smtp", host="smtp.sendgrid.net", username="apikey", password="...", + default_from="noreply@example.com") + get_email("smtp", mode="tls", host="smtp.example.com", port=465, + username="user", password="pw", default_from="...") + """ + if provider == "ses": + from cloudrift.email.ses import AWSSESBackend + + if "aws_access_key_id" in kwargs: + return AWSSESBackend.from_access_key(**kwargs) + if "profile_name" in kwargs: + return AWSSESBackend.from_profile(**kwargs) + return AWSSESBackend.from_iam_role(**kwargs) + + if provider == "azure_acs": + from cloudrift.email.azure_acs import AzureACSEmailBackend + + if "connection_string" in kwargs: + return AzureACSEmailBackend.from_connection_string(**kwargs) + if "client_secret" in kwargs: + return AzureACSEmailBackend.from_service_principal(**kwargs) + return AzureACSEmailBackend.from_managed_identity(**kwargs) + + if provider == "smtp": + from cloudrift.email.smtp import SMTPEmailBackend + + mode = kwargs.pop("mode", "starttls") + if mode == "tls": + return SMTPEmailBackend.from_tls(**kwargs) + if mode == "plaintext": + return SMTPEmailBackend.from_plaintext(**kwargs) + if mode == "starttls": + return SMTPEmailBackend.from_starttls(**kwargs) + raise ValueError( + f"Unknown SMTP mode: {mode!r}. Choose 'plaintext', 'starttls', or 'tls'." + ) + + raise ValueError( + f"Unknown email provider: {provider!r}. " + "Choose 'ses', 'azure_acs', or 'smtp'." + ) + + +__all__ = ["Attachment", "EmailBackend", "EmailMessage", "get_email"] diff --git a/cloudrift/email/azure_acs.py b/cloudrift/email/azure_acs.py new file mode 100644 index 0000000..0159132 --- /dev/null +++ b/cloudrift/email/azure_acs.py @@ -0,0 +1,205 @@ +import asyncio +import base64 + +from cloudrift.core.exceptions import ( + EmailError, + EmailSendError, + EmailThrottledError, + RecipientRejectedError, + SenderUnverifiedError, +) +from cloudrift.email.base import EmailBackend, _as_list + + +class AzureACSEmailBackend(EmailBackend): + """Azure Communication Services Email backend. + + Uses the sync ``azure.communication.email.EmailClient`` SDK (the async + variant is not GA at the time of writing) and wraps blocking calls with + ``asyncio.to_thread`` so the public surface stays async-only. + + Use one of the class methods to construct: + - ``from_connection_string`` — ACS resource connection string + - ``from_managed_identity`` — Managed Identity (system or user-assigned) + - ``from_service_principal`` — Azure AD service principal (client secret) + + Every constructor takes a ``default_from`` (a verified + ``MailFrom`` / ``senderAddress`` on a domain linked to the ACS resource). + """ + + def __init__( + self, + *, + endpoint: str | None, + default_from: str | None, + connection_string: str | None = None, + credential=None, + ) -> None: + self._endpoint = endpoint + self._connection_string = connection_string + self._credential = credential + self._default_from = default_from + self._client = None + self._lock = asyncio.Lock() + + # ------------------------------------------------------------------ + # Factory constructors + # ------------------------------------------------------------------ + + @classmethod + def from_connection_string( + cls, + connection_string: str, + default_from: str | None = None, + ) -> "AzureACSEmailBackend": + """Authenticate with an ACS connection string.""" + return cls( + endpoint=None, + default_from=default_from, + connection_string=connection_string, + ) + + @classmethod + def from_managed_identity( + cls, + endpoint: str, + default_from: str | None = None, + client_id: str | None = None, + ) -> "AzureACSEmailBackend": + """Authenticate via Azure Managed Identity.""" + from azure.identity import ManagedIdentityCredential + + credential = ( + ManagedIdentityCredential(client_id=client_id) + if client_id + else ManagedIdentityCredential() + ) + return cls(endpoint=endpoint, default_from=default_from, credential=credential) + + @classmethod + def from_service_principal( + cls, + endpoint: str, + tenant_id: str, + client_id: str, + client_secret: str, + default_from: str | None = None, + ) -> "AzureACSEmailBackend": + """Authenticate via Azure AD service principal (client secret).""" + from azure.identity import ClientSecretCredential + + credential = ClientSecretCredential( + tenant_id=tenant_id, client_id=client_id, client_secret=client_secret + ) + return cls(endpoint=endpoint, default_from=default_from, credential=credential) + + # ------------------------------------------------------------------ + # Internal lifecycle + # ------------------------------------------------------------------ + + async def _ensure(self): + if self._client is not None: + return self._client + async with self._lock: + if self._client is None: + from azure.communication.email import EmailClient + + if self._connection_string is not None: + self._client = EmailClient.from_connection_string(self._connection_string) + else: + self._client = EmailClient(self._endpoint, self._credential) + return self._client + + async def close(self) -> None: + client, self._client = self._client, None + if client is not None and hasattr(client, "close"): + await asyncio.to_thread(client.close) + if self._credential is not None and hasattr(self._credential, "close"): + # azure.identity sync credentials have a sync close(). + await asyncio.to_thread(self._credential.close) + + # ------------------------------------------------------------------ + # EmailBackend implementation + # ------------------------------------------------------------------ + + async def send( + self, + to, + subject, + *, + body_text=None, + body_html=None, + from_=None, + cc=None, + bcc=None, + reply_to=None, + attachments=None, + headers=None, + ) -> str: + sender = from_ or self._default_from + if not sender: + raise EmailError( + "No sender address: pass from_=... or set default_from on the backend." + ) + if body_text is None and body_html is None: + raise EmailError("send() requires body_text and/or body_html.") + + client = await self._ensure() + + content: dict = {"subject": subject} + if body_text is not None: + content["plainText"] = body_text + if body_html is not None: + content["html"] = body_html + + recipients: dict = {"to": [{"address": addr} for addr in _as_list(to)]} + if cc: + recipients["cc"] = [{"address": addr} for addr in _as_list(cc)] + if bcc: + recipients["bcc"] = [{"address": addr} for addr in _as_list(bcc)] + + message: dict = { + "senderAddress": sender, + "recipients": recipients, + "content": content, + } + if reply_to: + message["replyTo"] = [{"address": addr} for addr in _as_list(reply_to)] + if attachments: + message["attachments"] = [ + { + "name": att.filename, + "contentType": att.content_type, + "contentInBase64": base64.b64encode(att.content).decode("ascii"), + } + for att in attachments + ] + if headers: + message["headers"] = dict(headers) + + try: + poller = await asyncio.to_thread(client.begin_send, message) + result = await asyncio.to_thread(poller.result) + except Exception as e: + self._raise(e) + # ACS returns an object with an ``id`` attr (or dict-like). Be forgiving. + if isinstance(result, dict): + return str(result.get("id") or result.get("messageId") or "") + return str(getattr(result, "id", None) or getattr(result, "message_id", "") or "") + + def _raise(self, exc: Exception): + from azure.core.exceptions import HttpResponseError + + if isinstance(exc, HttpResponseError): + status = getattr(exc, "status_code", None) + message = str(exc) + if status == 429: + raise EmailThrottledError(message) from exc + if status == 403 and "DomainNotLinked" in message: + raise SenderUnverifiedError(message) from exc + if status == 400 and ( + "InvalidRecipient" in message or "InvalidAddress" in message + ): + raise RecipientRejectedError(message) from exc + raise EmailSendError(message) from exc + raise EmailSendError(str(exc)) from exc diff --git a/cloudrift/email/base.py b/cloudrift/email/base.py new file mode 100644 index 0000000..70ddde1 --- /dev/null +++ b/cloudrift/email/base.py @@ -0,0 +1,109 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass, field + + +@dataclass +class Attachment: + """An email attachment. + + ``content`` is the raw payload bytes. ``content_type`` is used directly in + the MIME / provider request — pick the right one (``application/pdf``, + ``image/png``, etc.) so the recipient's mail client renders it correctly. + """ + + filename: str + content: bytes + content_type: str = "application/octet-stream" + + +@dataclass +class EmailMessage: + """An outbound email used by :meth:`EmailBackend.send_batch`. + + ``from_`` falls back to the backend's ``default_from`` when ``None``. + At least one of ``body_text`` / ``body_html`` must be set. + """ + + to: list[str] + subject: str + body_text: str | None = None + body_html: str | None = None + from_: str | None = None + cc: list[str] = field(default_factory=list) + bcc: list[str] = field(default_factory=list) + reply_to: list[str] = field(default_factory=list) + attachments: list[Attachment] = field(default_factory=list) + headers: dict[str, str] = field(default_factory=dict) + + +class EmailBackend(ABC): + """Abstract base class for transactional email backends. + + Backends hold long-lived async clients. Use ``await backend.close()`` (or + ``async with backend:``) to release sockets cleanly. + + Implementations must accept a ``default_from`` at construction time. The + ``from_`` argument on :meth:`send` overrides it per call. + """ + + @abstractmethod + async def send( + self, + to: str | list[str], + subject: str, + *, + body_text: str | None = None, + body_html: str | None = None, + from_: str | None = None, + cc: list[str] | None = None, + bcc: list[str] | None = None, + reply_to: list[str] | None = None, + attachments: list[Attachment] | None = None, + headers: dict[str, str] | None = None, + ) -> str: + """Send a single email. Returns the provider message ID.""" + + async def send_batch(self, messages: list[EmailMessage]) -> list[str]: + """Send a batch of emails. Default implementation loops :meth:`send`. + + Subclasses override only when the provider has a true bulk API. + """ + ids: list[str] = [] + for msg in messages: + ids.append( + await self.send( + msg.to, + msg.subject, + body_text=msg.body_text, + body_html=msg.body_html, + from_=msg.from_, + cc=msg.cc or None, + bcc=msg.bcc or None, + reply_to=msg.reply_to or None, + attachments=msg.attachments or None, + headers=msg.headers or None, + ) + ) + return ids + + async def health_check(self) -> bool: + """Return True if the email backend is reachable.""" + return True + + async def close(self) -> None: + """Close the underlying client and release sockets. Default is a no-op.""" + + async def __aenter__(self) -> "EmailBackend": + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + await self.close() + + +def _as_list(value: str | list[str] | None) -> list[str]: + """Normalize a recipient / address field to a list.""" + if value is None: + return [] + if isinstance(value, str): + return [value] + return list(value) diff --git a/cloudrift/email/ses.py b/cloudrift/email/ses.py new file mode 100644 index 0000000..6373755 --- /dev/null +++ b/cloudrift/email/ses.py @@ -0,0 +1,303 @@ +import asyncio +from email.message import EmailMessage as MIMEEmailMessage + +import aioboto3 +from botocore.config import Config +from botocore.exceptions import ClientError + +from cloudrift.core.exceptions import ( + EmailError, + EmailSendError, + EmailThrottledError, + RecipientRejectedError, + SenderUnverifiedError, +) +from cloudrift.email.base import Attachment, EmailBackend, _as_list + + +class AWSSESBackend(EmailBackend): + """AWS SES email backend (native async via ``aioboto3``, SESv2 API). + + A single async SESv2 client is created lazily on first use and reused for + the lifetime of the backend. + + Use one of the class methods to construct: + - ``from_access_key`` — static credentials + - ``from_iam_role`` — instance profile / environment / ECS task role + - ``from_profile`` — named profile from ``~/.aws/credentials`` + + Every constructor takes a ``default_from`` (a verified SES sender). The + ``from_`` argument on :meth:`send` overrides it per call. + """ + + def __init__( + self, + session: aioboto3.Session, + *, + default_from: str | None = None, + endpoint_url: str | None = None, + max_pool_connections: int = 25, + connect_timeout: float = 10.0, + read_timeout: float = 30.0, + client_kwargs: dict | None = None, + ) -> None: + self._session = session + self._default_from = default_from + self._endpoint_url = endpoint_url + self._config = Config( + max_pool_connections=max_pool_connections, + connect_timeout=connect_timeout, + read_timeout=read_timeout, + ) + self._client_kwargs = client_kwargs or {} + self._client_cm = None + self._client = None + self._lock = asyncio.Lock() + + # ------------------------------------------------------------------ + # Factory constructors + # ------------------------------------------------------------------ + + @classmethod + def from_access_key( + cls, + aws_access_key_id: str, + aws_secret_access_key: str, + default_from: str | None = None, + region: str = "us-east-1", + aws_session_token: str | None = None, + endpoint_url: str | None = None, + **kwargs, + ) -> "AWSSESBackend": + """Authenticate with explicit access key / secret.""" + session = aioboto3.Session( + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + aws_session_token=aws_session_token, + region_name=region, + ) + return cls( + session, + default_from=default_from, + endpoint_url=endpoint_url, + **kwargs, + ) + + @classmethod + def from_iam_role( + cls, + default_from: str | None = None, + region: str = "us-east-1", + endpoint_url: str | None = None, + **kwargs, + ) -> "AWSSESBackend": + """Authenticate via IAM role / instance profile / environment variables.""" + session = aioboto3.Session(region_name=region) + return cls( + session, + default_from=default_from, + endpoint_url=endpoint_url, + **kwargs, + ) + + @classmethod + def from_profile( + cls, + profile_name: str, + default_from: str | None = None, + region: str = "us-east-1", + endpoint_url: str | None = None, + **kwargs, + ) -> "AWSSESBackend": + """Authenticate using a named profile from ``~/.aws/credentials``.""" + session = aioboto3.Session(profile_name=profile_name, region_name=region) + return cls( + session, + default_from=default_from, + endpoint_url=endpoint_url, + **kwargs, + ) + + # ------------------------------------------------------------------ + # Internal lifecycle + # ------------------------------------------------------------------ + + async def _ensure(self): + if self._client is not None: + return self._client + async with self._lock: + if self._client is None: + self._client_cm = self._session.client( + "sesv2", + endpoint_url=self._endpoint_url, + config=self._config, + **self._client_kwargs, + ) + try: + self._client = await self._client_cm.__aenter__() + except Exception: + self._client_cm = None + raise + return self._client + + async def close(self) -> None: + client_cm, self._client_cm = self._client_cm, None + self._client = None + if client_cm is not None: + await client_cm.__aexit__(None, None, None) + + # ------------------------------------------------------------------ + # EmailBackend implementation + # ------------------------------------------------------------------ + + async def send( + self, + to, + subject, + *, + body_text=None, + body_html=None, + from_=None, + cc=None, + bcc=None, + reply_to=None, + attachments=None, + headers=None, + ) -> str: + sender = from_ or self._default_from + if not sender: + raise EmailError( + "No sender address: pass from_=... or set default_from on the backend." + ) + if body_text is None and body_html is None: + raise EmailError("send() requires body_text and/or body_html.") + + to_list = _as_list(to) + cc_list = _as_list(cc) + bcc_list = _as_list(bcc) + client = await self._ensure() + + if attachments or headers: + raw = _build_mime( + sender=sender, + to=to_list, + cc=cc_list, + bcc=bcc_list, + reply_to=_as_list(reply_to), + subject=subject, + body_text=body_text, + body_html=body_html, + attachments=attachments or [], + headers=headers or {}, + ) + try: + response = await client.send_email( + FromEmailAddress=sender, + Destination={ + "ToAddresses": to_list, + "CcAddresses": cc_list, + "BccAddresses": bcc_list, + }, + Content={"Raw": {"Data": raw}}, + ReplyToAddresses=_as_list(reply_to), + ) + return response["MessageId"] + except ClientError as e: + self._raise(e) + + body: dict = {} + if body_text is not None: + body["Text"] = {"Data": body_text, "Charset": "UTF-8"} + if body_html is not None: + body["Html"] = {"Data": body_html, "Charset": "UTF-8"} + + try: + response = await client.send_email( + FromEmailAddress=sender, + Destination={ + "ToAddresses": to_list, + "CcAddresses": cc_list, + "BccAddresses": bcc_list, + }, + Content={ + "Simple": { + "Subject": {"Data": subject, "Charset": "UTF-8"}, + "Body": body, + } + }, + ReplyToAddresses=_as_list(reply_to), + ) + return response["MessageId"] + except ClientError as e: + self._raise(e) + + async def health_check(self) -> bool: + try: + client = await self._ensure() + # ``list_email_identities`` is the cheapest read on SESv2 that + # exercises both auth and the data path. ``get_account`` would be + # smaller but is not implemented by moto. + await client.list_email_identities() + return True + except Exception: + return False + + def _raise(self, exc: ClientError): + code = exc.response.get("Error", {}).get("Code", "") + if code in ("MessageRejected",): + raise RecipientRejectedError(str(exc)) from exc + if code in ( + "MailFromDomainNotVerified", + "MailFromDomainNotVerifiedException", + "FromEmailAddressNotVerified", + ): + raise SenderUnverifiedError(str(exc)) from exc + if code in ("Throttling", "TooManyRequestsException", "SendingPausedException"): + raise EmailThrottledError(str(exc)) from exc + raise EmailSendError(str(exc)) from exc + + +def _build_mime( + *, + sender: str, + to: list[str], + cc: list[str], + bcc: list[str], + reply_to: list[str], + subject: str, + body_text: str | None, + body_html: str | None, + attachments: list[Attachment], + headers: dict[str, str], +) -> bytes: + """Build a raw MIME message for SES SendEmail Content.Raw.""" + msg = MIMEEmailMessage() + msg["From"] = sender + if to: + msg["To"] = ", ".join(to) + if cc: + msg["Cc"] = ", ".join(cc) + if reply_to: + msg["Reply-To"] = ", ".join(reply_to) + msg["Subject"] = subject + for header, value in headers.items(): + msg[header] = value + + if body_text is not None and body_html is not None: + msg.set_content(body_text) + msg.add_alternative(body_html, subtype="html") + elif body_html is not None: + msg.set_content(body_html, subtype="html") + else: + msg.set_content(body_text or "") + + for att in attachments: + maintype, _, subtype = att.content_type.partition("/") + msg.add_attachment( + att.content, + maintype=maintype or "application", + subtype=subtype or "octet-stream", + filename=att.filename, + ) + + return msg.as_bytes() diff --git a/cloudrift/email/smtp.py b/cloudrift/email/smtp.py new file mode 100644 index 0000000..92fd862 --- /dev/null +++ b/cloudrift/email/smtp.py @@ -0,0 +1,280 @@ +"""Raw SMTP email backend (SendGrid, Mailgun, Postmark, Office365, MailHog, ...). + +A fresh ``aiosmtplib.SMTP`` connection is opened per :meth:`send`. SMTP servers +commonly drop idle connections and the simplicity is worth more than the +marginal latency win — transactional volumes don't benefit from pooling. +""" +from email.message import EmailMessage as MIMEEmailMessage +from email.utils import make_msgid + +import aiosmtplib +from aiosmtplib.errors import ( + SMTPRecipientsRefused, + SMTPResponseException, + SMTPSenderRefused, +) + +from cloudrift.core.exceptions import ( + EmailError, + EmailSendError, + EmailThrottledError, + RecipientRejectedError, + SenderUnverifiedError, +) +from cloudrift.email.base import Attachment, EmailBackend, _as_list + + +class SMTPEmailBackend(EmailBackend): + """Raw SMTP backend. + + Use one of the class methods to construct: + - ``from_plaintext`` — no TLS (port 25, dev only — MailHog / Mailpit) + - ``from_starttls`` — STARTTLS upgrade (port 587, most providers) + - ``from_tls`` — implicit TLS (port 465) + """ + + _MODE_PLAINTEXT = "plaintext" + _MODE_STARTTLS = "starttls" + _MODE_TLS = "tls" + + def __init__( + self, + *, + host: str, + port: int, + mode: str, + username: str | None = None, + password: str | None = None, + default_from: str | None = None, + ssl_context=None, + timeout: float = 30.0, + ) -> None: + self._host = host + self._port = port + self._mode = mode + self._username = username + self._password = password + self._default_from = default_from + self._ssl_context = ssl_context + self._timeout = timeout + + # ------------------------------------------------------------------ + # Factory constructors + # ------------------------------------------------------------------ + + @classmethod + def from_plaintext( + cls, + host: str, + port: int = 25, + username: str | None = None, + password: str | None = None, + default_from: str | None = None, + timeout: float = 30.0, + ) -> "SMTPEmailBackend": + """Connect without TLS. Dev / local-relay only — never use on the public internet.""" + return cls( + host=host, + port=port, + mode=cls._MODE_PLAINTEXT, + username=username, + password=password, + default_from=default_from, + timeout=timeout, + ) + + @classmethod + def from_starttls( + cls, + host: str, + username: str, + password: str, + default_from: str | None = None, + port: int = 587, + ssl_context=None, + timeout: float = 30.0, + ) -> "SMTPEmailBackend": + """Connect, then upgrade to TLS via STARTTLS (port 587). Default for most providers.""" + return cls( + host=host, + port=port, + mode=cls._MODE_STARTTLS, + username=username, + password=password, + default_from=default_from, + ssl_context=ssl_context, + timeout=timeout, + ) + + @classmethod + def from_tls( + cls, + host: str, + username: str, + password: str, + default_from: str | None = None, + port: int = 465, + ssl_context=None, + timeout: float = 30.0, + ) -> "SMTPEmailBackend": + """Connect with implicit TLS (port 465).""" + return cls( + host=host, + port=port, + mode=cls._MODE_TLS, + username=username, + password=password, + default_from=default_from, + ssl_context=ssl_context, + timeout=timeout, + ) + + # ------------------------------------------------------------------ + # EmailBackend implementation + # ------------------------------------------------------------------ + + async def send( + self, + to, + subject, + *, + body_text=None, + body_html=None, + from_=None, + cc=None, + bcc=None, + reply_to=None, + attachments=None, + headers=None, + ) -> str: + sender = from_ or self._default_from + if not sender: + raise EmailError( + "No sender address: pass from_=... or set default_from on the backend." + ) + if body_text is None and body_html is None: + raise EmailError("send() requires body_text and/or body_html.") + + to_list = _as_list(to) + cc_list = _as_list(cc) + bcc_list = _as_list(bcc) + + msg = _build_mime( + sender=sender, + to=to_list, + cc=cc_list, + reply_to=_as_list(reply_to), + subject=subject, + body_text=body_text, + body_html=body_html, + attachments=attachments or [], + headers=headers or {}, + ) + message_id = make_msgid() + msg["Message-ID"] = message_id + + kwargs = self._connect_kwargs() + try: + await aiosmtplib.send( + msg, + recipients=to_list + cc_list + bcc_list, + sender=sender, + **kwargs, + ) + except SMTPRecipientsRefused as e: + raise RecipientRejectedError(str(e)) from e + except SMTPSenderRefused as e: + raise SenderUnverifiedError(str(e)) from e + except SMTPResponseException as e: + if e.code in (421, 450, 451, 452): + raise EmailThrottledError(str(e)) from e + raise EmailSendError(str(e)) from e + except (OSError, aiosmtplib.SMTPException) as e: + raise EmailSendError(str(e)) from e + + return message_id + + async def health_check(self) -> bool: + kwargs = self._connect_kwargs() + client = aiosmtplib.SMTP( + hostname=kwargs["hostname"], + port=kwargs["port"], + use_tls=kwargs.get("use_tls", False), + start_tls=kwargs.get("start_tls", False), + tls_context=kwargs.get("tls_context"), + timeout=kwargs.get("timeout"), + ) + try: + await client.connect() + await client.noop() + await client.quit() + return True + except Exception: + return False + + def _connect_kwargs(self) -> dict: + kwargs: dict = { + "hostname": self._host, + "port": self._port, + "timeout": self._timeout, + } + if self._username and self._password: + kwargs["username"] = self._username + kwargs["password"] = self._password + if self._mode == self._MODE_TLS: + kwargs["use_tls"] = True + kwargs["start_tls"] = False + if self._ssl_context is not None: + kwargs["tls_context"] = self._ssl_context + elif self._mode == self._MODE_STARTTLS: + kwargs["use_tls"] = False + kwargs["start_tls"] = True + if self._ssl_context is not None: + kwargs["tls_context"] = self._ssl_context + else: + kwargs["use_tls"] = False + kwargs["start_tls"] = False + return kwargs + + +def _build_mime( + *, + sender: str, + to: list[str], + cc: list[str], + reply_to: list[str], + subject: str, + body_text: str | None, + body_html: str | None, + attachments: list[Attachment], + headers: dict[str, str], +) -> MIMEEmailMessage: + msg = MIMEEmailMessage() + msg["From"] = sender + if to: + msg["To"] = ", ".join(to) + if cc: + msg["Cc"] = ", ".join(cc) + if reply_to: + msg["Reply-To"] = ", ".join(reply_to) + msg["Subject"] = subject + for header, value in headers.items(): + msg[header] = value + + if body_text is not None and body_html is not None: + msg.set_content(body_text) + msg.add_alternative(body_html, subtype="html") + elif body_html is not None: + msg.set_content(body_html, subtype="html") + else: + msg.set_content(body_text or "") + + for att in attachments: + maintype, _, subtype = att.content_type.partition("/") + msg.add_attachment( + att.content, + maintype=maintype or "application", + subtype=subtype or "octet-stream", + filename=att.filename, + ) + return msg diff --git a/pyproject.toml b/pyproject.toml index 179b0dd..93a1401 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "hatchling.build" [project] name = "lyzr-cloudrift" version = "0.2.1" -description = "Cloud-agnostic abstraction for storage, messaging, document databases, cache, secrets, and pub/sub" +description = "Cloud-agnostic abstraction for storage, messaging, document databases, cache, secrets, pub/sub, and email" readme = "README.md" requires-python = ">=3.11" license = { file = "LICENSE" } @@ -13,6 +13,7 @@ keywords = [ "cloud", "aws", "azure", "s3", "blob", "sqs", "servicebus", "documentdb", "cosmosdb", "redis", "elasticache", "cache", "secrets", "keyvault", "sns", "eventgrid", "pubsub", + "email", "ses", "smtp", ] classifiers = [ "Programming Language :: Python :: 3", @@ -33,7 +34,7 @@ Documentation = "https://github.com/LYZR-OSS/cloudrift#readme" [project.optional-dependencies] aws = [ - "aioboto3>=13.0.0", # native async S3 / SQS / SNS / Secrets Manager + "aioboto3>=13.0.0", # native async S3 / SQS / SNS / Secrets Manager / SES "motor>=3.3.0", # async MongoDB driver for DocumentDB "redis[hiredis]>=5.0.0", # ElastiCache (also used by standalone) ] @@ -43,12 +44,16 @@ azure = [ "azure-identity>=1.15.0", "azure-keyvault-secrets>=4.7.0", "azure-eventgrid>=4.9.0", + "azure-communication-email>=1.0.0", "motor>=3.3.0", # Cosmos DB via MongoDB API "redis[hiredis]>=5.0.0", # Azure Cache for Redis ] cache = [ "redis[hiredis]>=5.0.0", ] +email = [ + "aiosmtplib>=3.0", # raw SMTP backend +] all = [ "aioboto3>=13.0.0", "motor>=3.3.0", @@ -57,17 +62,22 @@ all = [ "azure-identity>=1.15.0", "azure-keyvault-secrets>=4.7.0", "azure-eventgrid>=4.9.0", + "azure-communication-email>=1.0.0", "redis[hiredis]>=5.0.0", + "aiosmtplib>=3.0", ] dev = [ "pytest>=8.0", "pytest-asyncio>=0.23", - "moto[s3,sqs,sns,secretsmanager,server]>=5.0", + "moto[s3,sqs,sns,ses,secretsmanager,server]>=5.0", "aioboto3>=13.0.0", "motor>=3.3.0", "fakeredis>=2.20.0", "httpx>=0.25.0", "ruff>=0.4.0", + "aiosmtplib>=3.0", + "azure-communication-email>=1.0.0", + "azure-identity>=1.15.0", ] [tool.hatch.build.targets.wheel] diff --git a/tests/test_email.py b/tests/test_email.py new file mode 100644 index 0000000..d34e2bb --- /dev/null +++ b/tests/test_email.py @@ -0,0 +1,588 @@ +"""Unit tests for the email category. + +- SES: real `aioboto3` client hitting an in-process moto SESv2 server. +- ACS: `azure.communication.email.EmailClient` patched with a fake. +- SMTP: `aiosmtplib.send` patched to capture the MIME tree. +""" +from __future__ import annotations + +import base64 +from email import message_from_bytes +from email.message import EmailMessage as MIMEEmailMessage +from unittest.mock import MagicMock, patch + +import boto3 +import pytest +from moto.server import ThreadedMotoServer + +from cloudrift.core.exceptions import ( + EmailError, + EmailSendError, + EmailThrottledError, + RecipientRejectedError, + SenderUnverifiedError, +) +from cloudrift.email import Attachment, EmailMessage, get_email +from cloudrift.email.azure_acs import AzureACSEmailBackend +from cloudrift.email.smtp import SMTPEmailBackend + +REGION = "us-east-1" +SENDER = "sender@example.com" + + +# =========================================================================== +# SES — moto-backed +# =========================================================================== + +@pytest.fixture(scope="module") +def moto_server(): + server = ThreadedMotoServer(port=0) + server.start() + host, port = server._server.server_address + yield f"http://{host}:{port}" + server.stop() + + +@pytest.fixture +def verified_sender(moto_server): + """Verify SENDER in the moto SES backend so send_email succeeds.""" + ses = boto3.client( + "ses", + region_name=REGION, + endpoint_url=moto_server, + aws_access_key_id="test", + aws_secret_access_key="test", + ) + ses.verify_email_identity(EmailAddress=SENDER) + return SENDER + + +@pytest.fixture +async def ses_backend(moto_server, verified_sender): + backend = get_email( + "ses", + aws_access_key_id="test", + aws_secret_access_key="test", + region=REGION, + endpoint_url=moto_server, + default_from=SENDER, + ) + yield backend + await backend.close() + + +async def test_ses_send_simple_text(ses_backend): + msg_id = await ses_backend.send( + "to@example.com", + "hello", + body_text="plain body", + ) + assert isinstance(msg_id, str) and msg_id + + +async def test_ses_send_with_html_and_text(ses_backend): + msg_id = await ses_backend.send( + ["to@example.com", "to2@example.com"], + "hello", + body_text="plain", + body_html="html
", + ) + assert isinstance(msg_id, str) and msg_id + + +async def test_ses_send_with_attachment_uses_raw(ses_backend): + msg_id = await ses_backend.send( + "to@example.com", + "with attachment", + body_text="see attached", + attachments=[Attachment(filename="hello.txt", content=b"hi", content_type="text/plain")], + ) + assert isinstance(msg_id, str) and msg_id + + +async def test_ses_send_unverified_sender_maps_exception(moto_server): + backend = get_email( + "ses", + aws_access_key_id="test", + aws_secret_access_key="test", + region=REGION, + endpoint_url=moto_server, + default_from="unverified@nowhere.example", + ) + try: + # moto raises MessageRejected with "Email address is not verified" when + # the From identity isn't verified. + with pytest.raises((RecipientRejectedError, SenderUnverifiedError, EmailSendError)): + await backend.send( + "to@example.com", + "subject", + body_text="hi", + ) + finally: + await backend.close() + + +async def test_ses_send_requires_body(ses_backend): + with pytest.raises(EmailError): + await ses_backend.send("to@example.com", "subj") + + +async def test_ses_send_requires_sender(moto_server): + backend = get_email( + "ses", + aws_access_key_id="test", + aws_secret_access_key="test", + region=REGION, + endpoint_url=moto_server, + # no default_from + ) + try: + with pytest.raises(EmailError): + await backend.send("to@example.com", "subj", body_text="hi") + finally: + await backend.close() + + +async def test_ses_send_batch_loops(ses_backend): + ids = await ses_backend.send_batch( + [ + EmailMessage(to=["to1@example.com"], subject="s1", body_text="b1"), + EmailMessage(to=["to2@example.com"], subject="s2", body_html="b2"), + EmailMessage(to=["to3@example.com"], subject="s3", body_text="b3"), + ] + ) + assert len(ids) == 3 + assert all(isinstance(i, str) and i for i in ids) + + +async def test_ses_health_check(ses_backend): + assert await ses_backend.health_check() is True + + +# =========================================================================== +# ACS — mocked SDK +# =========================================================================== + +class _FakePoller: + def __init__(self, result): + self._result = result + self.result_called = False + + def result(self): + self.result_called = True + return self._result + + +class _FakeACSClient: + def __init__(self): + self.last_message = None + self.closed = False + self._next_result = {"id": "acs-msg-1"} + + def begin_send(self, message): + self.last_message = message + return _FakePoller(self._next_result) + + def close(self): + self.closed = True + + +@pytest.fixture +def fake_acs_client(): + return _FakeACSClient() + + +@pytest.fixture +async def acs_backend(fake_acs_client): + backend = AzureACSEmailBackend( + endpoint=None, + default_from="DoNotReply@example.com", + connection_string="endpoint=https://x.communication.azure.com;accesskey=fake==", + ) + # Force the client without going through ``_ensure``'s SDK import. + backend._client = fake_acs_client + yield backend + await backend.close() + + +async def test_acs_send_basic(acs_backend, fake_acs_client): + msg_id = await acs_backend.send( + "to@example.com", + "subject", + body_text="hi", + body_html="hi
", + ) + assert msg_id == "acs-msg-1" + msg = fake_acs_client.last_message + assert msg["senderAddress"] == "DoNotReply@example.com" + assert msg["recipients"]["to"] == [{"address": "to@example.com"}] + assert msg["content"]["subject"] == "subject" + assert msg["content"]["plainText"] == "hi" + assert msg["content"]["html"] == "hi
" + + +async def test_acs_send_with_cc_bcc_reply_to(acs_backend, fake_acs_client): + await acs_backend.send( + ["a@example.com", "b@example.com"], + "subject", + body_text="hi", + cc=["c@example.com"], + bcc=["d@example.com"], + reply_to=["r@example.com"], + ) + msg = fake_acs_client.last_message + assert msg["recipients"]["to"] == [ + {"address": "a@example.com"}, + {"address": "b@example.com"}, + ] + assert msg["recipients"]["cc"] == [{"address": "c@example.com"}] + assert msg["recipients"]["bcc"] == [{"address": "d@example.com"}] + assert msg["replyTo"] == [{"address": "r@example.com"}] + + +async def test_acs_send_with_attachment_base64(acs_backend, fake_acs_client): + await acs_backend.send( + "to@example.com", + "subject", + body_text="hi", + attachments=[ + Attachment(filename="doc.pdf", content=b"PDFDATA", content_type="application/pdf") + ], + ) + atts = fake_acs_client.last_message["attachments"] + assert atts[0]["name"] == "doc.pdf" + assert atts[0]["contentType"] == "application/pdf" + assert base64.b64decode(atts[0]["contentInBase64"]) == b"PDFDATA" + + +async def test_acs_send_throttled_maps_exception(acs_backend, fake_acs_client): + from azure.core.exceptions import HttpResponseError + + err = HttpResponseError(message="Too Many Requests") + err.status_code = 429 + + def _raise_429(_message): + raise err + + fake_acs_client.begin_send = _raise_429 # type: ignore[assignment] + with pytest.raises(EmailThrottledError): + await acs_backend.send("to@example.com", "subj", body_text="hi") + + +async def test_acs_send_invalid_recipient_maps_exception(acs_backend, fake_acs_client): + from azure.core.exceptions import HttpResponseError + + err = HttpResponseError(message="InvalidRecipient: bad email") + err.status_code = 400 + + def _raise_400(_message): + raise err + + fake_acs_client.begin_send = _raise_400 # type: ignore[assignment] + with pytest.raises(RecipientRejectedError): + await acs_backend.send("bogus", "subj", body_text="hi") + + +async def test_acs_send_domain_not_linked_maps_exception(acs_backend, fake_acs_client): + from azure.core.exceptions import HttpResponseError + + err = HttpResponseError(message="DomainNotLinked: sender domain not linked") + err.status_code = 403 + + def _raise_403(_message): + raise err + + fake_acs_client.begin_send = _raise_403 # type: ignore[assignment] + with pytest.raises(SenderUnverifiedError): + await acs_backend.send("to@example.com", "subj", body_text="hi") + + +async def test_acs_close_idempotent(acs_backend, fake_acs_client): + await acs_backend.close() + assert fake_acs_client.closed is True + # Second close must not raise. + await acs_backend.close() + + +# =========================================================================== +# SMTP — mocked aiosmtplib +# =========================================================================== + +@pytest.fixture +def captured_send(): + captured: dict = {} + + async def fake_send(msg, **kwargs): + captured["msg"] = msg + captured["kwargs"] = kwargs + return ({}, "250 OK") + + with patch("cloudrift.email.smtp.aiosmtplib.send", side_effect=fake_send): + yield captured + + +@pytest.fixture +def smtp_backend(): + return SMTPEmailBackend.from_starttls( + host="smtp.example.com", + username="user", + password="pw", + default_from=SENDER, + ) + + +def _extract_mime(msg) -> MIMEEmailMessage: + """The captured object is already an EmailMessage.""" + if isinstance(msg, MIMEEmailMessage): + return msg + # Defensive: aiosmtplib.send accepts str/bytes too. + return message_from_bytes(bytes(msg)) # type: ignore[arg-type] + + +async def test_smtp_send_text_only(smtp_backend, captured_send): + msg_id = await smtp_backend.send( + "to@example.com", + "hello", + body_text="plain body", + ) + assert msg_id.startswith("<") and msg_id.endswith(">") + mime = _extract_mime(captured_send["msg"]) + assert mime["From"] == SENDER + assert mime["To"] == "to@example.com" + assert mime["Subject"] == "hello" + assert mime.get_content_type() == "text/plain" + assert "plain body" in mime.get_content() + + +async def test_smtp_send_html_and_text_multipart_alternative(smtp_backend, captured_send): + await smtp_backend.send( + "to@example.com", + "hello", + body_text="plain", + body_html="html", + ) + mime = _extract_mime(captured_send["msg"]) + assert mime.get_content_type() == "multipart/alternative" + parts = list(mime.iter_parts()) + types = sorted(p.get_content_type() for p in parts) + assert types == ["text/html", "text/plain"] + + +async def test_smtp_send_html_only(smtp_backend, captured_send): + await smtp_backend.send( + "to@example.com", + "hello", + body_html="only html
", + ) + mime = _extract_mime(captured_send["msg"]) + assert mime.get_content_type() == "text/html" + + +async def test_smtp_send_with_attachment(smtp_backend, captured_send): + await smtp_backend.send( + "to@example.com", + "hello", + body_text="body", + attachments=[ + Attachment(filename="doc.pdf", content=b"PDFDATA", content_type="application/pdf") + ], + ) + mime = _extract_mime(captured_send["msg"]) + # add_attachment promotes a text-only message to multipart/mixed + assert mime.get_content_type() == "multipart/mixed" + att_parts = [ + p for p in mime.iter_parts() if p.get_filename() == "doc.pdf" + ] + assert len(att_parts) == 1 + assert att_parts[0].get_content_type() == "application/pdf" + assert att_parts[0].get_payload(decode=True) == b"PDFDATA" + + +async def test_smtp_send_cc_bcc_recipients_passed_to_aiosmtplib(smtp_backend, captured_send): + await smtp_backend.send( + "to@example.com", + "hello", + body_text="body", + cc=["cc@example.com"], + bcc=["bcc@example.com"], + ) + recipients = captured_send["kwargs"]["recipients"] + assert "to@example.com" in recipients + assert "cc@example.com" in recipients + assert "bcc@example.com" in recipients + # The MIME headers should expose To and Cc but NOT Bcc. + mime = _extract_mime(captured_send["msg"]) + assert mime["To"] == "to@example.com" + assert mime["Cc"] == "cc@example.com" + assert mime["Bcc"] is None + + +async def test_smtp_send_starttls_kwargs(smtp_backend, captured_send): + await smtp_backend.send("to@example.com", "hello", body_text="body") + k = captured_send["kwargs"] + assert k["hostname"] == "smtp.example.com" + assert k["port"] == 587 + assert k["start_tls"] is True + assert k["use_tls"] is False + assert k["username"] == "user" + assert k["password"] == "pw" + + +async def test_smtp_send_tls_mode_kwargs(captured_send): + backend = get_email( + "smtp", + mode="tls", + host="smtp.example.com", + port=465, + username="user", + password="pw", + default_from=SENDER, + ) + await backend.send("to@example.com", "hello", body_text="body") + k = captured_send["kwargs"] + assert k["port"] == 465 + assert k["use_tls"] is True + assert k["start_tls"] is False + + +async def test_smtp_send_plaintext_mode_kwargs(captured_send): + backend = get_email( + "smtp", + mode="plaintext", + host="mailhog.local", + port=1025, + default_from=SENDER, + ) + await backend.send("to@example.com", "hello", body_text="body") + k = captured_send["kwargs"] + assert k["port"] == 1025 + assert k["use_tls"] is False + assert k["start_tls"] is False + + +async def test_smtp_send_recipients_refused_maps_exception(smtp_backend): + from aiosmtplib.errors import SMTPRecipientsRefused + + async def _raise(*_a, **_k): + raise SMTPRecipientsRefused([]) + + with patch("cloudrift.email.smtp.aiosmtplib.send", side_effect=_raise): + with pytest.raises(RecipientRejectedError): + await smtp_backend.send("to@example.com", "subj", body_text="hi") + + +async def test_smtp_send_sender_refused_maps_exception(smtp_backend): + from aiosmtplib.errors import SMTPSenderRefused + + async def _raise(*_a, **_k): + raise SMTPSenderRefused(550, "sender rejected", SENDER) + + with patch("cloudrift.email.smtp.aiosmtplib.send", side_effect=_raise): + with pytest.raises(SenderUnverifiedError): + await smtp_backend.send("to@example.com", "subj", body_text="hi") + + +async def test_smtp_send_throttled_maps_exception(smtp_backend): + from aiosmtplib.errors import SMTPResponseException + + async def _raise(*_a, **_k): + raise SMTPResponseException(421, "service not available") + + with patch("cloudrift.email.smtp.aiosmtplib.send", side_effect=_raise): + with pytest.raises(EmailThrottledError): + await smtp_backend.send("to@example.com", "subj", body_text="hi") + + +async def test_smtp_send_requires_body(smtp_backend): + with pytest.raises(EmailError): + await smtp_backend.send("to@example.com", "subj") + + +async def test_smtp_send_requires_sender(): + backend = SMTPEmailBackend.from_plaintext(host="mailhog.local", port=1025) + with pytest.raises(EmailError): + await backend.send("to@example.com", "subj", body_text="hi") + + +# =========================================================================== +# Factory +# =========================================================================== + +def test_factory_unknown_provider(): + with pytest.raises(ValueError, match="Unknown email provider"): + get_email("gmail", username="me", password="x") + + +def test_factory_unknown_smtp_mode(): + with pytest.raises(ValueError, match="Unknown SMTP mode"): + get_email("smtp", mode="weird", host="smtp.example.com") + + +def test_factory_smtp_routing_starttls(): + backend = get_email( + "smtp", + host="smtp.example.com", + username="u", + password="p", + default_from=SENDER, + ) + assert isinstance(backend, SMTPEmailBackend) + assert backend._mode == "starttls" + assert backend._port == 587 + + +def test_factory_smtp_routing_tls(): + backend = get_email( + "smtp", + mode="tls", + host="smtp.example.com", + username="u", + password="p", + default_from=SENDER, + ) + assert backend._mode == "tls" + + +def test_factory_ses_routing_access_key(): + from cloudrift.email.ses import AWSSESBackend + + backend = get_email( + "ses", + aws_access_key_id="AKIA-test", + aws_secret_access_key="secret", + region=REGION, + default_from=SENDER, + ) + assert isinstance(backend, AWSSESBackend) + + +def test_factory_acs_routing_connection_string(): + backend = get_email( + "azure_acs", + connection_string="endpoint=https://x.communication.azure.com;accesskey=k", + default_from="dnr@example.com", + ) + assert isinstance(backend, AzureACSEmailBackend) + assert backend._connection_string is not None + + +def test_factory_acs_routing_service_principal(): + # Just constructs the credential; no SDK calls. + # NB: `azure` is a PEP 420 namespace package — `azure.identity` is not + # discoverable as an attribute on `azure` until something imports it + # directly. `patch("azure.identity...")` triggers that lookup, so we + # import the submodule here and patch it via `patch.object`. + import azure.identity + + fake_cred = MagicMock(name="creds") + with patch.object(azure.identity, "ClientSecretCredential", return_value=fake_cred): + backend = get_email( + "azure_acs", + endpoint="https://x.communication.azure.com", + tenant_id="t", + client_id="c", + client_secret="s", + default_from="dnr@example.com", + ) + assert isinstance(backend, AzureACSEmailBackend) + assert backend._credential is fake_cred diff --git a/tests/test_ensure_lifecycle.py b/tests/test_ensure_lifecycle.py index 523dbd5..6e7c6a6 100644 --- a/tests/test_ensure_lifecycle.py +++ b/tests/test_ensure_lifecycle.py @@ -17,6 +17,7 @@ import pytest +from cloudrift.email.ses import AWSSESBackend from cloudrift.messaging.sqs import AWSSQSBackend from cloudrift.pubsub.sns import AWSSNSBackend from cloudrift.secrets.aws_secrets_manager import AWSSecretsManagerBackend @@ -69,8 +70,9 @@ def _fake_session(cm): lambda s: AWSSNSBackend(s), lambda s: AWSSQSBackend("https://example/queue", s), lambda s: AWSSecretsManagerBackend(s), + lambda s: AWSSESBackend(s), ], - ids=["s3", "sns", "sqs", "secrets"], + ids=["s3", "sns", "sqs", "secrets", "ses"], ) async def test_ensure_clears_client_cm_on_aenter_failure(factory): cm = _BoomCM() @@ -92,8 +94,9 @@ async def test_ensure_clears_client_cm_on_aenter_failure(factory): lambda s: AWSSNSBackend(s), lambda s: AWSSQSBackend("https://example/queue", s), lambda s: AWSSecretsManagerBackend(s), + lambda s: AWSSESBackend(s), ], - ids=["s3", "sns", "sqs", "secrets"], + ids=["s3", "sns", "sqs", "secrets", "ses"], ) async def test_close_is_noop_after_failed_ensure(factory): """Calling close() after a failed _ensure() must be a clean no-op.""" @@ -118,8 +121,9 @@ async def test_close_is_noop_after_failed_ensure(factory): lambda s: AWSSNSBackend(s), lambda s: AWSSQSBackend("https://example/queue", s), lambda s: AWSSecretsManagerBackend(s), + lambda s: AWSSESBackend(s), ], - ids=["s3", "sns", "sqs", "secrets"], + ids=["s3", "sns", "sqs", "secrets", "ses"], ) async def test_close_clears_state_even_if_aexit_raises(factory): """If __aexit__ raises during close, the object must still end up clean.""" diff --git a/uv.lock b/uv.lock index 6a633a5..d30353b 100644 --- a/uv.lock +++ b/uv.lock @@ -180,6 +180,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fb/76/641ae371508676492379f16e2fa48f4e2c11741bd63c48be4b12a6b09cba/aiosignal-1.4.0-py3-none-any.whl", hash = "sha256:053243f8b92b990551949e63930a839ff0cf0b0ebbe0597b0f3fb19e1a0fe82e", size = 7490, upload-time = "2025-07-03T22:54:42.156Z" }, ] +[[package]] +name = "aiosmtplib" +version = "5.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e7/ad/240a7ce4e50713b111dff8b781a898d8d4770e5d6ad4899103f84c86005c/aiosmtplib-5.1.0.tar.gz", hash = "sha256:2504a23b2b63c9de6bc4ea719559a38996dba68f73f6af4eb97be20ee4c5e6c4", size = 66176, upload-time = "2026-01-25T01:51:11.408Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/37/82/70f2c452acd7ed18c558c8ace9a8cf4fdcc70eae9a41749b5bdc53eb6f45/aiosmtplib-5.1.0-py3-none-any.whl", hash = "sha256:368029440645b486b69db7029208a7a78c6691b90d24a5332ddba35d9109d55b", size = 27778, upload-time = "2026-01-25T01:51:10.026Z" }, +] + [[package]] name = "annotated-types" version = "0.7.0" @@ -258,29 +267,30 @@ wheels = [ ] [[package]] -name = "azure-core" -version = "1.39.0" +name = "azure-communication-email" +version = "1.1.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "requests" }, + { name = "azure-core" }, + { name = "isodate" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/34/83/bbde3faa84ddcb8eb0eca4b3ffb3221252281db4ce351300fe248c5c70b1/azure_core-1.39.0.tar.gz", hash = "sha256:8a90a562998dd44ce84597590fff6249701b98c0e8797c95fcdd695b54c35d74", size = 367531, upload-time = "2026-03-19T01:31:29.461Z" } +sdist = { url = "https://files.pythonhosted.org/packages/97/de/f71191ead5e778a7d8939744977a049f796f6fb6549847c667fad373605e/azure_communication_email-1.1.0.tar.gz", hash = "sha256:6a4af8281024327c3ab18a4996919069a99a69aad3a19c40f7852a6682493327", size = 55546, upload-time = "2025-10-17T20:30:23.349Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/7e/d6/8ebcd05b01a580f086ac9a97fb9fac65c09a4b012161cc97c21a336e880b/azure_core-1.39.0-py3-none-any.whl", hash = "sha256:4ac7b70fab5438c3f68770649a78daf97833caa83827f91df9c14e0e0ea7d34f", size = 218318, upload-time = "2026-03-19T01:31:31.25Z" }, + { url = "https://files.pythonhosted.org/packages/97/fe/9bd1028853c4b351c756ba4acc39ef5b5914a3452ebe2df0b8ddd6051114/azure_communication_email-1.1.0-py3-none-any.whl", hash = "sha256:9212153f21cf7e68734c32ebfe8702b43398bd01df2dddb0ca52cd5a8bbd5024", size = 64170, upload-time = "2025-10-17T20:30:24.829Z" }, ] [[package]] -name = "azure-cosmos" -version = "4.15.0" +name = "azure-core" +version = "1.39.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "azure-core" }, + { name = "requests" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/c8/a3/0474e622bf9676e3206d61269461ed16a05958363c254ea3b15af16219b2/azure_cosmos-4.15.0.tar.gz", hash = "sha256:be1cf49837c197d9da880ec47fe020a24d679075b89e0e1e2aca8d376b3a5a24", size = 2100744, upload-time = "2026-02-23T16:01:52.293Z" } +sdist = { url = "https://files.pythonhosted.org/packages/34/83/bbde3faa84ddcb8eb0eca4b3ffb3221252281db4ce351300fe248c5c70b1/azure_core-1.39.0.tar.gz", hash = "sha256:8a90a562998dd44ce84597590fff6249701b98c0e8797c95fcdd695b54c35d74", size = 367531, upload-time = "2026-03-19T01:31:29.461Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/31/5f/b6e3d3ae16fa121fdc17e62447800d378b7e716cd6103c3650977a6c4618/azure_cosmos-4.15.0-py3-none-any.whl", hash = "sha256:83c1da7386bcd0df9a15c52116cc35012225d8a72d4f1379938b83ea5eb19fff", size = 424870, upload-time = "2026-02-23T16:01:54.514Z" }, + { url = "https://files.pythonhosted.org/packages/7e/d6/8ebcd05b01a580f086ac9a97fb9fac65c09a4b012161cc97c21a336e880b/azure_core-1.39.0-py3-none-any.whl", hash = "sha256:4ac7b70fab5438c3f68770649a78daf97833caa83827f91df9c14e0e0ea7d34f", size = 218318, upload-time = "2026-03-19T01:31:31.25Z" }, ] [[package]] @@ -1130,13 +1140,14 @@ wheels = [ [[package]] name = "lyzr-cloudrift" -version = "0.1.0" +version = "0.2.1" source = { editable = "." } [package.optional-dependencies] all = [ { name = "aioboto3" }, - { name = "azure-cosmos" }, + { name = "aiosmtplib" }, + { name = "azure-communication-email" }, { name = "azure-eventgrid" }, { name = "azure-identity" }, { name = "azure-keyvault-secrets" }, @@ -1151,7 +1162,7 @@ aws = [ { name = "redis", extra = ["hiredis"] }, ] azure = [ - { name = "azure-cosmos" }, + { name = "azure-communication-email" }, { name = "azure-eventgrid" }, { name = "azure-identity" }, { name = "azure-keyvault-secrets" }, @@ -1165,14 +1176,19 @@ cache = [ ] dev = [ { name = "aioboto3" }, + { name = "aiosmtplib" }, + { name = "azure-communication-email" }, { name = "fakeredis" }, { name = "httpx" }, - { name = "mongomock-motor" }, { name = "moto", extra = ["s3", "server"] }, + { name = "motor" }, { name = "pytest" }, { name = "pytest-asyncio" }, { name = "ruff" }, ] +email = [ + { name = "aiosmtplib" }, +] [package.dev-dependencies] dev = [ @@ -1184,8 +1200,12 @@ requires-dist = [ { name = "aioboto3", marker = "extra == 'all'", specifier = ">=13.0.0" }, { name = "aioboto3", marker = "extra == 'aws'", specifier = ">=13.0.0" }, { name = "aioboto3", marker = "extra == 'dev'", specifier = ">=13.0.0" }, - { name = "azure-cosmos", marker = "extra == 'all'", specifier = ">=4.7.0" }, - { name = "azure-cosmos", marker = "extra == 'azure'", specifier = ">=4.7.0" }, + { name = "aiosmtplib", marker = "extra == 'all'", specifier = ">=3.0" }, + { name = "aiosmtplib", marker = "extra == 'dev'", specifier = ">=3.0" }, + { name = "aiosmtplib", marker = "extra == 'email'", specifier = ">=3.0" }, + { name = "azure-communication-email", marker = "extra == 'all'", specifier = ">=1.0.0" }, + { name = "azure-communication-email", marker = "extra == 'azure'", specifier = ">=1.0.0" }, + { name = "azure-communication-email", marker = "extra == 'dev'", specifier = ">=1.0.0" }, { name = "azure-eventgrid", marker = "extra == 'all'", specifier = ">=4.9.0" }, { name = "azure-eventgrid", marker = "extra == 'azure'", specifier = ">=4.9.0" }, { name = "azure-identity", marker = "extra == 'all'", specifier = ">=1.15.0" }, @@ -1198,11 +1218,11 @@ requires-dist = [ { name = "azure-storage-blob", marker = "extra == 'azure'", specifier = ">=12.19.0" }, { name = "fakeredis", marker = "extra == 'dev'", specifier = ">=2.20.0" }, { name = "httpx", marker = "extra == 'dev'", specifier = ">=0.25.0" }, - { name = "mongomock-motor", marker = "extra == 'dev'", specifier = ">=0.0.21" }, - { name = "moto", extras = ["s3", "sqs", "sns", "secretsmanager", "server"], marker = "extra == 'dev'", specifier = ">=5.0" }, + { name = "moto", extras = ["s3", "sqs", "sns", "ses", "secretsmanager", "server"], marker = "extra == 'dev'", specifier = ">=5.0" }, { name = "motor", marker = "extra == 'all'", specifier = ">=3.3.0" }, { name = "motor", marker = "extra == 'aws'", specifier = ">=3.3.0" }, { name = "motor", marker = "extra == 'azure'", specifier = ">=3.3.0" }, + { name = "motor", marker = "extra == 'dev'", specifier = ">=3.3.0" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.23" }, { name = "redis", extras = ["hiredis"], marker = "extra == 'all'", specifier = ">=5.0.0" }, @@ -1211,7 +1231,7 @@ requires-dist = [ { name = "redis", extras = ["hiredis"], marker = "extra == 'cache'", specifier = ">=5.0.0" }, { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.4.0" }, ] -provides-extras = ["aws", "azure", "cache", "all", "dev"] +provides-extras = ["aws", "azure", "cache", "email", "all", "dev"] [package.metadata.requires-dev] dev = [{ name = "fakeredis", specifier = ">=2.35.1" }] @@ -1290,33 +1310,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/70/bc/6f1c2f612465f5fa89b95bead1f44dcb607670fd42891d8fdcd5d039f4f4/markupsafe-3.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:32001d6a8fc98c8cb5c947787c5d08b0a50663d139f1305bac5885d98d9b40fa", size = 14146, upload-time = "2025-09-27T18:37:28.327Z" }, ] -[[package]] -name = "mongomock" -version = "4.3.0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "packaging" }, - { name = "pytz" }, - { name = "sentinels" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/4d/a4/4a560a9f2a0bec43d5f63104f55bc48666d619ca74825c8ae156b08547cf/mongomock-4.3.0.tar.gz", hash = "sha256:32667b79066fabc12d4f17f16a8fd7361b5f4435208b3ba32c226e52212a8c30", size = 135862, upload-time = "2024-11-16T11:23:25.957Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/94/4d/8bea712978e3aff017a2ab50f262c620e9239cc36f348aae45e48d6a4786/mongomock-4.3.0-py2.py3-none-any.whl", hash = "sha256:5ef86bd12fc8806c6e7af32f21266c61b6c4ba96096f85129852d1c4fec1327e", size = 64891, upload-time = "2024-11-16T11:23:24.748Z" }, -] - -[[package]] -name = "mongomock-motor" -version = "0.0.36" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "mongomock" }, - { name = "motor" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/18/9f/38e42a34ebad323addaf6296d6b5d83eaf2c423adf206b757c68315e196a/mongomock_motor-0.0.36.tar.gz", hash = "sha256:3cf62352ece5af2f02e04d2f252393f88b5fe0487997da00584020cee4b8efba", size = 5754, upload-time = "2025-05-16T22:52:27.214Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/d6/99/f5fdbbdc96bfd03e5f9c36339547a9076f5dbb5882900b7621526d41a38d/mongomock_motor-0.0.36-py3-none-any.whl", hash = "sha256:3ecb7949662b8986ff9c267fa0b1402b5b75a6afd57f03850cd6e13a067e3691", size = 7334, upload-time = "2025-05-16T22:52:25.417Z" }, -] - [[package]] name = "moto" version = "5.1.22" @@ -1981,15 +1974,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0b/d7/1959b9648791274998a9c3526f6d0ec8fd2233e4d4acce81bbae76b44b2a/python_dotenv-1.2.2-py3-none-any.whl", hash = "sha256:1d8214789a24de455a8b8bd8ae6fe3c6b69a5e3d64aa8a8e5d68e694bbcb285a", size = 22101, upload-time = "2026-03-01T16:00:25.09Z" }, ] -[[package]] -name = "pytz" -version = "2026.1.post1" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/56/db/b8721d71d945e6a8ac63c0fc900b2067181dbb50805958d4d4661cf7d277/pytz-2026.1.post1.tar.gz", hash = "sha256:3378dde6a0c3d26719182142c56e60c7f9af7e968076f31aae569d72a0358ee1", size = 321088, upload-time = "2026-03-03T07:47:50.683Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/10/99/781fe0c827be2742bcc775efefccb3b048a3a9c6ce9aec0cbf4a101677e5/pytz-2026.1.post1-py2.py3-none-any.whl", hash = "sha256:f2fd16142fda348286a75e1a524be810bb05d444e5a081f37f7affc635035f7a", size = 510489, upload-time = "2026-03-03T07:47:49.167Z" }, -] - [[package]] name = "pywin32" version = "311" @@ -2385,15 +2369,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/48/f0/ae7ca09223a81a1d890b2557186ea015f6e0502e9b8cb8e1813f1d8cfa4e/s3transfer-0.14.0-py3-none-any.whl", hash = "sha256:ea3b790c7077558ed1f02a3072fb3cb992bbbd253392f4b6e9e8976941c7d456", size = 85712, upload-time = "2025-09-09T19:23:30.041Z" }, ] -[[package]] -name = "sentinels" -version = "1.1.1" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/6f/9b/07195878aa25fe6ed209ec74bc55ae3e3d263b60a489c6e73fdca3c8fe05/sentinels-1.1.1.tar.gz", hash = "sha256:3c2f64f754187c19e0a1a029b148b74cf58dd12ec27b4e19c0e5d6e22b5a9a86", size = 4393, upload-time = "2025-08-12T07:57:50.26Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/49/65/dea992c6a97074f6d8ff9eab34741298cac2ce23e2b6c74fb7d08afdf85c/sentinels-1.1.1-py3-none-any.whl", hash = "sha256:835d3b28f3b47f5284afa4bf2db6e00f2dc5f80f9923d4b7e7aeeeccf6146a11", size = 3744, upload-time = "2025-08-12T07:57:48.858Z" }, -] - [[package]] name = "setuptools" version = "82.0.1"