From 62c2a55ef726b8390abdb57e4fca5c34697d16c8 Mon Sep 17 00:00:00 2001 From: Allisson Azevedo Date: Sat, 3 Jan 2026 11:26:54 -0300 Subject: [PATCH 1/6] feat: add performance timing and logging to auth, clients, and subscriptions services - Measure execution duration for token validation, message cleanup, and subscription creation - Log debug info on successful validations, warnings on failures, and errors on exceptions - Include relevant metadata like client_id, scopes, duration, and error details in logs - Enhances observability for debugging and performance monitoring --- fastpubsub/services/auth.py | 47 ++++- fastpubsub/services/clients.py | 210 +++++++++++++++++------ fastpubsub/services/helpers.py | 53 +++++- fastpubsub/services/messages.py | 248 +++++++++++++++++++++++---- fastpubsub/services/subscriptions.py | 163 ++++++++++++++---- fastpubsub/services/topics.py | 112 +++++++++--- uv.lock | 6 +- 7 files changed, 693 insertions(+), 146 deletions(-) diff --git a/fastpubsub/services/auth.py b/fastpubsub/services/auth.py index 2fe98a5..c78d1df 100644 --- a/fastpubsub/services/auth.py +++ b/fastpubsub/services/auth.py @@ -1,5 +1,6 @@ """Authentication and authorization services for fastpubsub.""" +import time from typing import Annotated from fastapi import Depends, Request @@ -8,8 +9,11 @@ from fastpubsub import services from fastpubsub.config import settings from fastpubsub.exceptions import InvalidClientToken +from fastpubsub.logger import get_logger from fastpubsub.models import DecodedClientToken +logger = get_logger(__name__) + oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/oauth/token", auto_error=False) @@ -54,9 +58,46 @@ async def get_current_token(token: str | None = Depends(oauth2_scheme)) -> Decod Raises: InvalidClientToken: If token is invalid or authentication fails. """ - if token is None: - token = "" - return await services.decode_jwt_client_token(token, auth_enabled=settings.auth_enabled) + start_time = time.perf_counter() + + try: + if token is None: + token = "" + + decoded_token = await services.decode_jwt_client_token(token, auth_enabled=settings.auth_enabled) + + duration = time.perf_counter() - start_time + logger.debug( + "token validated", + extra={ + "client_id": str(decoded_token.client_id), + "scopes": list(decoded_token.scopes), + "duration": f"{duration:.4f}s", + }, + ) + return decoded_token + except InvalidClientToken as e: + duration = time.perf_counter() - start_time + logger.warning( + "token validation failed", + extra={ + "error": str(e), + "has_token": token is not None and token != "", + "duration": f"{duration:.4f}s", + }, + ) + raise + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "token validation error", + extra={ + "error": str(e), + "has_token": token is not None and token != "", + "duration": f"{duration:.4f}s", + }, + ) + raise def require_scope(resource: str, action: str): diff --git a/fastpubsub/services/clients.py b/fastpubsub/services/clients.py index af2cf00..8866150 100644 --- a/fastpubsub/services/clients.py +++ b/fastpubsub/services/clients.py @@ -2,6 +2,7 @@ import datetime import secrets +import time import uuid from jose import jwt @@ -13,6 +14,7 @@ from fastpubsub.database import Client as DBClient from fastpubsub.database import SessionLocal from fastpubsub.exceptions import InvalidClient +from fastpubsub.logger import get_logger from fastpubsub.models import ( Client, ClientToken, @@ -24,6 +26,7 @@ from fastpubsub.services.helpers import _delete_entity, _get_entity, utc_now password_hash = PasswordHash.recommended() +logger = get_logger(__name__) def generate_secret() -> str: @@ -54,25 +57,44 @@ async def create_client(data: CreateClient) -> CreateClientResult: AlreadyExistsError: If a client with the same ID already exists. ValueError: If client data validation fails. """ - async with SessionLocal() as session: - now = utc_now() - secret = generate_secret() - secret_hash = password_hash.hash(secret) - db_client = DBClient( - id=uuid.uuid7(), - name=data.name, - scopes=data.scopes, - is_active=data.is_active, - secret_hash=secret_hash, - token_version=1, - created_at=now, - updated_at=now, - ) - session.add(db_client) - - await session.commit() + start_time = time.perf_counter() + logger.info( + "creating client", + extra={"client_name": data.name, "scopes": data.scopes, "is_active": data.is_active}, + ) - return CreateClientResult(id=db_client.id, secret=secret) + try: + async with SessionLocal() as session: + now = utc_now() + secret = generate_secret() + secret_hash = password_hash.hash(secret) + db_client = DBClient( + id=uuid.uuid7(), + name=data.name, + scopes=data.scopes, + is_active=data.is_active, + secret_hash=secret_hash, + token_version=1, + created_at=now, + updated_at=now, + ) + session.add(db_client) + + await session.commit() + + duration = time.perf_counter() - start_time + logger.info( + "client created", + extra={"client_id": str(db_client.id), "client_name": data.name, "duration": f"{duration:.4f}s"}, + ) + return CreateClientResult(id=db_client.id, secret=secret) + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "client creation failed", + extra={"client_name": data.name, "error": str(e), "duration": f"{duration:.4f}s"}, + ) + raise async def get_client(client_id: uuid.UUID) -> Client: @@ -175,31 +197,68 @@ async def issue_jwt_client_token(client_id: uuid.UUID, client_secret: str) -> Cl Raises: InvalidClient: If client credentials are invalid or client is disabled. """ - async with SessionLocal() as session: - db_client = await _get_entity(session, DBClient, client_id, "Client not found", raise_exception=False) - if not db_client: - raise InvalidClient("Client not found") from None - if not db_client.is_active: - raise InvalidClient("Client disabled") from None - if password_hash.verify(client_secret, db_client.secret_hash) is False: - raise InvalidClient("Client secret is invalid") from None - - now = utc_now() - expires_in = now + datetime.timedelta(minutes=settings.auth_access_token_expire_minutes) - payload = { - "sub": str(client_id), - "exp": expires_in, - "iat": now, - "scope": db_client.scopes, - "ver": db_client.token_version, - } - access_token = jwt.encode(payload, key=settings.auth_secret_key, algorithm=settings.auth_algorithm) - - return ClientToken( - access_token=access_token, - expires_in=int((expires_in - now).total_seconds()), - scope=db_client.scopes, - ) + start_time = time.perf_counter() + logger.info("issuing jwt token", extra={"client_id": str(client_id)}) + + try: + async with SessionLocal() as session: + db_client = await _get_entity( + session, DBClient, client_id, "Client not found", raise_exception=False + ) + if not db_client: + logger.warning("token issuance failed: client not found", extra={"client_id": str(client_id)}) + raise InvalidClient("Client not found") from None + if not db_client.is_active: + logger.warning( + "token issuance failed: client disabled", + extra={"client_id": str(client_id), "client_name": db_client.name}, + ) + raise InvalidClient("Client disabled") from None + if password_hash.verify(client_secret, db_client.secret_hash) is False: + logger.warning( + "token issuance failed: invalid secret", + extra={"client_id": str(client_id), "client_name": db_client.name}, + ) + raise InvalidClient("Client secret is invalid") from None + + now = utc_now() + expires_in = now + datetime.timedelta(minutes=settings.auth_access_token_expire_minutes) + payload = { + "sub": str(client_id), + "exp": expires_in, + "iat": now, + "scope": db_client.scopes, + "ver": db_client.token_version, + } + access_token = jwt.encode( + payload, key=settings.auth_secret_key, algorithm=settings.auth_algorithm + ) + + duration = time.perf_counter() - start_time + logger.info( + "jwt token issued", + extra={ + "client_id": str(client_id), + "client_name": db_client.name, + "scopes": db_client.scopes, + "expires_in_minutes": settings.auth_access_token_expire_minutes, + "duration": f"{duration:.4f}s", + }, + ) + return ClientToken( + access_token=access_token, + expires_in=int((expires_in - now).total_seconds()), + scope=db_client.scopes, + ) + except InvalidClient: + raise + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "token issuance failed", + extra={"client_id": str(client_id), "error": str(e), "duration": f"{duration:.4f}s"}, + ) + raise async def decode_jwt_client_token(access_token: str, auth_enabled: bool = True) -> DecodedClientToken: @@ -219,28 +278,71 @@ async def decode_jwt_client_token(access_token: str, auth_enabled: bool = True) InvalidClient: If token is invalid, expired, or client is disabled/revoked. """ if not auth_enabled: + logger.debug("authentication disabled, returning test token") return DecodedClientToken(client_id=uuid.uuid7(), scopes={"*"}) + start_time = time.perf_counter() + logger.debug("decoding jwt token") + try: payload = jwt.decode( access_token, key=settings.auth_secret_key, algorithms=[settings.auth_algorithm], ) - except JWTError: + except JWTError as e: + logger.warning("jwt token decode failed: invalid token", extra={"error": str(e)}) raise InvalidClient("Invalid jwt token") from None client_id = payload["sub"] scopes = payload["scope"] token_version = payload["ver"] - async with SessionLocal() as session: - db_client = await _get_entity(session, DBClient, client_id, "Client not found", raise_exception=False) - if not db_client: - raise InvalidClient("Client not found") from None - if not db_client.is_active: - raise InvalidClient("Client disabled") from None - if token_version != db_client.token_version: - raise InvalidClient("Token revoked") from None - - return DecodedClientToken(client_id=uuid.UUID(client_id), scopes={scope for scope in scopes.split()}) + try: + async with SessionLocal() as session: + db_client = await _get_entity( + session, DBClient, client_id, "Client not found", raise_exception=False + ) + if not db_client: + logger.warning( + "jwt token validation failed: client not found", extra={"client_id": client_id} + ) + raise InvalidClient("Client not found") from None + if not db_client.is_active: + logger.warning( + "jwt token validation failed: client disabled", + extra={"client_id": client_id, "client_name": db_client.name}, + ) + raise InvalidClient("Client disabled") from None + if token_version != db_client.token_version: + logger.warning( + "jwt token validation failed: token revoked", + extra={ + "client_id": client_id, + "client_name": db_client.name, + "token_version": token_version, + "current_version": db_client.token_version, + }, + ) + raise InvalidClient("Token revoked") from None + + duration = time.perf_counter() - start_time + logger.debug( + "jwt token validated", + extra={ + "client_id": client_id, + "client_name": db_client.name, + "scopes": scopes, + "duration": f"{duration:.4f}s", + }, + ) + return DecodedClientToken(client_id=uuid.UUID(client_id), scopes={scope for scope in scopes.split()}) + except InvalidClient: + raise + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "jwt token validation failed", + extra={"client_id": client_id, "error": str(e), "duration": f"{duration:.4f}s"}, + ) + raise diff --git a/fastpubsub/services/helpers.py b/fastpubsub/services/helpers.py index 2bb6c57..93e4ea6 100644 --- a/fastpubsub/services/helpers.py +++ b/fastpubsub/services/helpers.py @@ -1,12 +1,16 @@ """Helper functions for service layer operations.""" import datetime +import time import uuid from sqlalchemy import select, text from fastpubsub.database import SessionLocal from fastpubsub.exceptions import NotFoundError +from fastpubsub.logger import get_logger + +logger = get_logger(__name__) def utc_now(): @@ -75,9 +79,48 @@ async def _execute_sql_command(query: str, params: dict) -> bool: Returns: True if exactly one row was affected, False otherwise """ + start_time = time.perf_counter() stmt = text(query) - async with SessionLocal() as session: - result = await session.execute(stmt, params) - rowcount = result.rowcount - await session.commit() - return rowcount == 1 + + try: + async with SessionLocal() as session: + result = await session.execute(stmt, params) + rowcount = result.rowcount + await session.commit() + + duration = time.perf_counter() - start_time + + # Log slow database operations (>100ms) + if duration > 0.1: + logger.warning( + "slow database operation", + extra={ + "query": query[:100] + "..." if len(query) > 100 else query, + "params": str(params)[:200] + "..." if len(str(params)) > 200 else str(params), + "rowcount": rowcount, + "duration": f"{duration:.4f}s", + }, + ) + elif duration > 0.01: # Log operations >10ms at debug level + logger.debug( + "database operation completed", + extra={ + "query": query[:50] + "..." if len(query) > 50 else query, + "rowcount": rowcount, + "duration": f"{duration:.4f}s", + }, + ) + + return rowcount == 1 + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "database operation failed", + extra={ + "query": query[:100] + "..." if len(query) > 100 else query, + "params": str(params)[:200] + "..." if len(str(params)) > 200 else str(params), + "error": str(e), + "duration": f"{duration:.4f}s", + }, + ) + raise diff --git a/fastpubsub/services/messages.py b/fastpubsub/services/messages.py index 8b3a279..3ab5db9 100644 --- a/fastpubsub/services/messages.py +++ b/fastpubsub/services/messages.py @@ -1,5 +1,6 @@ """Message operations service for publishing, consuming, and managing pub/sub messages.""" +import time import uuid from typing import Any @@ -7,9 +8,12 @@ from sqlalchemy import select, text from fastpubsub.database import SessionLocal +from fastpubsub.logger import get_logger from fastpubsub.models import Message, SubscriptionMetrics from fastpubsub.services.helpers import _execute_sql_command +logger = get_logger(__name__) + async def publish_messages(topic_id: str, messages: list[dict[str, Any]]) -> int: """Publish messages to a topic. @@ -21,19 +25,45 @@ async def publish_messages(topic_id: str, messages: list[dict[str, Any]]) -> int Returns: Number of messages successfully published. """ - query = "SELECT publish_messages(:topic_id, CAST(:messages AS jsonb[]))" - stmt = text(query).bindparams(topic_id=topic_id, messages=messages) - jsonb_array = [Json(m) for m in messages] - - async with SessionLocal() as session: - result = await session.execute( - stmt, - {"topic_id": topic_id, "messages": jsonb_array}, + start_time = time.perf_counter() + logger.info("publishing messages", extra={"topic_id": topic_id, "message_count": len(messages)}) + + try: + query = "SELECT publish_messages(:topic_id, CAST(:messages AS jsonb[]))" + stmt = text(query).bindparams(topic_id=topic_id, messages=messages) + jsonb_array = [Json(m) for m in messages] + + async with SessionLocal() as session: + result = await session.execute( + stmt, + {"topic_id": topic_id, "messages": jsonb_array}, + ) + count = result.scalar_one() + await session.commit() + + duration = time.perf_counter() - start_time + logger.info( + "messages published", + extra={ + "topic_id": topic_id, + "published_count": count, + "requested_count": len(messages), + "duration": f"{duration:.4f}s", + }, ) - count = result.scalar_one() - await session.commit() - - return count + return count + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "message publishing failed", + extra={ + "topic_id": topic_id, + "message_count": len(messages), + "error": str(e), + "duration": f"{duration:.4f}s", + }, + ) + raise async def consume_messages(subscription_id: str, consumer_id: str, batch_size: int) -> list[Message]: @@ -47,22 +77,54 @@ async def consume_messages(subscription_id: str, consumer_id: str, batch_size: i Returns: List of available messages for consumption. """ - query = "SELECT * FROM consume_messages(:subscription_id, :consumer_id, :batch_size)" - stmt = text(query) - - async with SessionLocal() as session: - result = await session.execute( - stmt, - { + start_time = time.perf_counter() + logger.info( + "consuming messages", + extra={"subscription_id": subscription_id, "consumer_id": consumer_id, "batch_size": batch_size}, + ) + + try: + query = "SELECT * FROM consume_messages(:subscription_id, :consumer_id, :batch_size)" + stmt = text(query) + + async with SessionLocal() as session: + result = await session.execute( + stmt, + { + "subscription_id": subscription_id, + "consumer_id": consumer_id, + "batch_size": batch_size, + }, + ) + rows = result.mappings().all() + await session.commit() + + messages = [Message(**row) for row in rows] + duration = time.perf_counter() - start_time + logger.info( + "messages consumed", + extra={ + "subscription_id": subscription_id, + "consumer_id": consumer_id, + "consumed_count": len(messages), + "requested_batch_size": batch_size, + "duration": f"{duration:.4f}s", + }, + ) + return messages + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "message consumption failed", + extra={ "subscription_id": subscription_id, "consumer_id": consumer_id, "batch_size": batch_size, + "error": str(e), + "duration": f"{duration:.4f}s", }, ) - rows = result.mappings().all() - await session.commit() - - return [Message(**row) for row in rows] + raise async def ack_messages(subscription_id: str, message_ids: list[uuid.UUID]) -> bool: @@ -75,8 +137,46 @@ async def ack_messages(subscription_id: str, message_ids: list[uuid.UUID]) -> bo Returns: True if exactly one row was affected, False otherwise. """ - query = "SELECT ack_messages(:subscription_id, :message_ids)" - return await _execute_sql_command(query, {"subscription_id": subscription_id, "message_ids": message_ids}) + start_time = time.perf_counter() + logger.info( + "acknowledging messages", + extra={ + "subscription_id": subscription_id, + "message_ids": [str(mid) for mid in message_ids], + "message_count": len(message_ids), + }, + ) + + try: + query = "SELECT ack_messages(:subscription_id, :message_ids)" + result = await _execute_sql_command( + query, {"subscription_id": subscription_id, "message_ids": message_ids} + ) + + duration = time.perf_counter() - start_time + logger.info( + "messages acknowledged", + extra={ + "subscription_id": subscription_id, + "message_count": len(message_ids), + "success": result, + "duration": f"{duration:.4f}s", + }, + ) + return result + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "message acknowledgment failed", + extra={ + "subscription_id": subscription_id, + "message_ids": [str(mid) for mid in message_ids], + "message_count": len(message_ids), + "error": str(e), + "duration": f"{duration:.4f}s", + }, + ) + raise async def nack_messages(subscription_id: str, message_ids: list[uuid.UUID]) -> bool: @@ -89,8 +189,46 @@ async def nack_messages(subscription_id: str, message_ids: list[uuid.UUID]) -> b Returns: True if exactly one row was affected, False otherwise. """ - query = "SELECT nack_messages(:subscription_id, :message_ids)" - return await _execute_sql_command(query, {"subscription_id": subscription_id, "message_ids": message_ids}) + start_time = time.perf_counter() + logger.warning( + "negatively acknowledging messages", + extra={ + "subscription_id": subscription_id, + "message_ids": [str(mid) for mid in message_ids], + "message_count": len(message_ids), + }, + ) + + try: + query = "SELECT nack_messages(:subscription_id, :message_ids)" + result = await _execute_sql_command( + query, {"subscription_id": subscription_id, "message_ids": message_ids} + ) + + duration = time.perf_counter() - start_time + logger.warning( + "messages negatively acknowledged", + extra={ + "subscription_id": subscription_id, + "message_count": len(message_ids), + "success": result, + "duration": f"{duration:.4f}s", + }, + ) + return result + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "message negative acknowledgment failed", + extra={ + "subscription_id": subscription_id, + "message_ids": [str(mid) for mid in message_ids], + "message_count": len(message_ids), + "error": str(e), + "duration": f"{duration:.4f}s", + }, + ) + raise async def list_dlq_messages(subscription_id: str, offset: int = 0, limit: int = 100) -> list[Message]: @@ -144,8 +282,34 @@ async def cleanup_stuck_messages(lock_timeout_seconds: int) -> bool: Returns: True if cleanup was successful, False otherwise. """ - query = "SELECT cleanup_stuck_messages(make_interval(secs => :timeout))" - return await _execute_sql_command(query, {"timeout": lock_timeout_seconds}) + start_time = time.perf_counter() + logger.info("cleaning up stuck messages", extra={"lock_timeout_seconds": lock_timeout_seconds}) + + try: + query = "SELECT cleanup_stuck_messages(make_interval(secs => :timeout))" + result = await _execute_sql_command(query, {"timeout": lock_timeout_seconds}) + + duration = time.perf_counter() - start_time + logger.info( + "stuck messages cleanup completed", + extra={ + "lock_timeout_seconds": lock_timeout_seconds, + "success": result, + "duration": f"{duration:.4f}s", + }, + ) + return result + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "stuck messages cleanup failed", + extra={ + "lock_timeout_seconds": lock_timeout_seconds, + "error": str(e), + "duration": f"{duration:.4f}s", + }, + ) + raise async def cleanup_acked_messages(older_than_seconds: int) -> bool: @@ -157,8 +321,30 @@ async def cleanup_acked_messages(older_than_seconds: int) -> bool: Returns: True if cleanup was successful, False otherwise. """ - query = "SELECT cleanup_acked_messages(make_interval(secs => :older_than))" - return await _execute_sql_command(query, {"older_than": older_than_seconds}) + start_time = time.perf_counter() + logger.info("cleaning up acknowledged messages", extra={"older_than_seconds": older_than_seconds}) + + try: + query = "SELECT cleanup_acked_messages(make_interval(secs => :older_than))" + result = await _execute_sql_command(query, {"older_than": older_than_seconds}) + + duration = time.perf_counter() - start_time + logger.info( + "acknowledged messages cleanup completed", + extra={ + "older_than_seconds": older_than_seconds, + "success": result, + "duration": f"{duration:.4f}s", + }, + ) + return result + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "acknowledged messages cleanup failed", + extra={"older_than_seconds": older_than_seconds, "error": str(e), "duration": f"{duration:.4f}s"}, + ) + raise async def subscription_metrics(subscription_id: str) -> SubscriptionMetrics: diff --git a/fastpubsub/services/subscriptions.py b/fastpubsub/services/subscriptions.py index 9bc258c..ea820e7 100644 --- a/fastpubsub/services/subscriptions.py +++ b/fastpubsub/services/subscriptions.py @@ -1,14 +1,19 @@ """Subscription management services for creating and managing topic subscriptions.""" +import time + from sqlalchemy import select from sqlalchemy.exc import IntegrityError from fastpubsub.database import is_foreign_key_violation, is_unique_violation, SessionLocal from fastpubsub.database import Subscription as DBSubscription from fastpubsub.exceptions import AlreadyExistsError, NotFoundError +from fastpubsub.logger import get_logger from fastpubsub.models import CreateSubscription, Subscription from fastpubsub.services.helpers import _delete_entity, _get_entity, utc_now +logger = get_logger(__name__) + async def create_subscription(data: CreateSubscription) -> Subscription: """Create a new subscription to a topic. @@ -23,28 +28,66 @@ async def create_subscription(data: CreateSubscription) -> Subscription: AlreadyExistsError: If a subscription with the same ID already exists. NotFoundError: If the specified topic doesn't exist. """ - async with SessionLocal() as session: - db_subscription = DBSubscription( - id=data.id, - topic_id=data.topic_id, - filter=data.filter, - max_delivery_attempts=data.max_delivery_attempts, - backoff_min_seconds=data.backoff_min_seconds, - backoff_max_seconds=data.backoff_max_seconds, - created_at=utc_now(), + start_time = time.perf_counter() + logger.info( + "creating subscription", + extra={ + "subscription_id": data.id, + "topic_id": data.topic_id, + "max_delivery_attempts": data.max_delivery_attempts, + }, + ) + + try: + async with SessionLocal() as session: + db_subscription = DBSubscription( + id=data.id, + topic_id=data.topic_id, + filter=data.filter, + max_delivery_attempts=data.max_delivery_attempts, + backoff_min_seconds=data.backoff_min_seconds, + backoff_max_seconds=data.backoff_max_seconds, + created_at=utc_now(), + ) + session.add(db_subscription) + + try: + await session.commit() + except IntegrityError as exc: + if is_unique_violation(exc): + logger.warning( + "subscription creation failed: subscription already exists", + extra={"subscription_id": data.id, "topic_id": data.topic_id}, + ) + raise AlreadyExistsError("This subscription already exists") from None + if is_foreign_key_violation(exc): + logger.warning( + "subscription creation failed: topic not found", + extra={"subscription_id": data.id, "topic_id": data.topic_id}, + ) + raise NotFoundError("Topic not found") from None + raise + + duration = time.perf_counter() - start_time + logger.info( + "subscription created", + extra={"subscription_id": data.id, "topic_id": data.topic_id, "duration": f"{duration:.4f}s"}, ) - session.add(db_subscription) - - try: - await session.commit() - except IntegrityError as exc: - if is_unique_violation(exc): - raise AlreadyExistsError("This subscription already exists") from None - if is_foreign_key_violation(exc): - raise NotFoundError("Topic not found") from None - raise - return Subscription(**db_subscription.to_dict()) + except (AlreadyExistsError, NotFoundError): + raise + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "subscription creation failed", + extra={ + "subscription_id": data.id, + "topic_id": data.topic_id, + "error": str(e), + "duration": f"{duration:.4f}s", + }, + ) + raise async def get_subscription(subscription_id: str) -> Subscription: @@ -59,11 +102,32 @@ async def get_subscription(subscription_id: str) -> Subscription: Raises: NotFoundError: If no subscription with the given ID exists. """ - async with SessionLocal() as session: - db_subscription = await _get_entity( - session, DBSubscription, subscription_id, "Subscription not found" + start_time = time.perf_counter() + logger.debug("getting subscription", extra={"subscription_id": subscription_id}) + + try: + async with SessionLocal() as session: + db_subscription = await _get_entity( + session, DBSubscription, subscription_id, "Subscription not found" + ) + + duration = time.perf_counter() - start_time + logger.debug( + "subscription retrieved", + extra={ + "subscription_id": subscription_id, + "topic_id": db_subscription.topic_id, + "duration": f"{duration:.4f}s", + }, ) return Subscription(**db_subscription.to_dict()) + except Exception as e: + duration = time.perf_counter() - start_time + logger.warning( + "subscription retrieval failed", + extra={"subscription_id": subscription_id, "error": str(e), "duration": f"{duration:.4f}s"}, + ) + raise async def list_subscription(offset: int, limit: int) -> list[Subscription]: @@ -76,11 +140,34 @@ async def list_subscription(offset: int, limit: int) -> list[Subscription]: Returns: List of Subscription models. """ - async with SessionLocal() as session: - stmt = select(DBSubscription).order_by(DBSubscription.id.asc()).offset(offset).limit(limit) - result = await session.execute(stmt) - db_subscriptions = result.scalars().all() - return [Subscription(**db_subscription.to_dict()) for db_subscription in db_subscriptions] + start_time = time.perf_counter() + logger.debug("listing subscriptions", extra={"offset": offset, "limit": limit}) + + try: + async with SessionLocal() as session: + stmt = select(DBSubscription).order_by(DBSubscription.id.asc()).offset(offset).limit(limit) + result = await session.execute(stmt) + db_subscriptions = result.scalars().all() + + subscriptions = [Subscription(**db_subscription.to_dict()) for db_subscription in db_subscriptions] + duration = time.perf_counter() - start_time + logger.debug( + "subscriptions listed", + extra={ + "offset": offset, + "limit": limit, + "returned_count": len(subscriptions), + "duration": f"{duration:.4f}s", + }, + ) + return subscriptions + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "subscription listing failed", + extra={"offset": offset, "limit": limit, "error": str(e), "duration": f"{duration:.4f}s"}, + ) + raise async def delete_subscription(subscription_id: str) -> None: @@ -92,5 +179,21 @@ async def delete_subscription(subscription_id: str) -> None: Raises: NotFoundError: If no subscription with the given ID exists. """ - async with SessionLocal() as session: - await _delete_entity(session, DBSubscription, subscription_id, "Subscription not found") + start_time = time.perf_counter() + logger.info("deleting subscription", extra={"subscription_id": subscription_id}) + + try: + async with SessionLocal() as session: + await _delete_entity(session, DBSubscription, subscription_id, "Subscription not found") + + duration = time.perf_counter() - start_time + logger.info( + "subscription deleted", extra={"subscription_id": subscription_id, "duration": f"{duration:.4f}s"} + ) + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "subscription deletion failed", + extra={"subscription_id": subscription_id, "error": str(e), "duration": f"{duration:.4f}s"}, + ) + raise diff --git a/fastpubsub/services/topics.py b/fastpubsub/services/topics.py index 0991854..ee5377f 100644 --- a/fastpubsub/services/topics.py +++ b/fastpubsub/services/topics.py @@ -1,14 +1,19 @@ """Topic management services for creating and managing pub/sub topics.""" +import time + from sqlalchemy import select from sqlalchemy.exc import IntegrityError from fastpubsub.database import is_unique_violation, SessionLocal from fastpubsub.database import Topic as DBTopic from fastpubsub.exceptions import AlreadyExistsError +from fastpubsub.logger import get_logger from fastpubsub.models import CreateTopic, Topic from fastpubsub.services.helpers import _delete_entity, _get_entity, utc_now +logger = get_logger(__name__) + async def create_topic(data: CreateTopic) -> Topic: """Create a new topic in the pub/sub system. @@ -22,18 +27,34 @@ async def create_topic(data: CreateTopic) -> Topic: Raises: AlreadyExistsError: If a topic with the same ID already exists. """ - async with SessionLocal() as session: - db_topic = DBTopic(id=data.id, created_at=utc_now()) - session.add(db_topic) - - try: - await session.commit() - except IntegrityError as exc: - if is_unique_violation(exc): - raise AlreadyExistsError("This topic already exists") from None - raise - + start_time = time.perf_counter() + logger.info("creating topic", extra={"topic_id": data.id}) + + try: + async with SessionLocal() as session: + db_topic = DBTopic(id=data.id, created_at=utc_now()) + session.add(db_topic) + + try: + await session.commit() + except IntegrityError as exc: + if is_unique_violation(exc): + logger.warning("topic creation failed: topic already exists", extra={"topic_id": data.id}) + raise AlreadyExistsError("This topic already exists") from None + raise + + duration = time.perf_counter() - start_time + logger.info("topic created", extra={"topic_id": data.id, "duration": f"{duration:.4f}s"}) return Topic(**db_topic.to_dict()) + except AlreadyExistsError: + raise + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "topic creation failed", + extra={"topic_id": data.id, "error": str(e), "duration": f"{duration:.4f}s"}, + ) + raise async def get_topic(topic_id: str) -> Topic: @@ -48,9 +69,23 @@ async def get_topic(topic_id: str) -> Topic: Raises: NotFoundError: If no topic with the given ID exists. """ - async with SessionLocal() as session: - db_topic = await _get_entity(session, DBTopic, topic_id, "Topic not found") + start_time = time.perf_counter() + logger.debug("getting topic", extra={"topic_id": topic_id}) + + try: + async with SessionLocal() as session: + db_topic = await _get_entity(session, DBTopic, topic_id, "Topic not found") + + duration = time.perf_counter() - start_time + logger.debug("topic retrieved", extra={"topic_id": topic_id, "duration": f"{duration:.4f}s"}) return Topic(**db_topic.to_dict()) + except Exception as e: + duration = time.perf_counter() - start_time + logger.warning( + "topic retrieval failed", + extra={"topic_id": topic_id, "error": str(e), "duration": f"{duration:.4f}s"}, + ) + raise async def list_topic(offset: int, limit: int) -> list[Topic]: @@ -63,11 +98,34 @@ async def list_topic(offset: int, limit: int) -> list[Topic]: Returns: List of Topic models. """ - async with SessionLocal() as session: - stmt = select(DBTopic).order_by(DBTopic.id.asc()).offset(offset).limit(limit) - result = await session.execute(stmt) - db_topics = result.scalars().all() - return [Topic(**db_topic.to_dict()) for db_topic in db_topics] + start_time = time.perf_counter() + logger.debug("listing topics", extra={"offset": offset, "limit": limit}) + + try: + async with SessionLocal() as session: + stmt = select(DBTopic).order_by(DBTopic.id.asc()).offset(offset).limit(limit) + result = await session.execute(stmt) + db_topics = result.scalars().all() + + topics = [Topic(**db_topic.to_dict()) for db_topic in db_topics] + duration = time.perf_counter() - start_time + logger.debug( + "topics listed", + extra={ + "offset": offset, + "limit": limit, + "returned_count": len(topics), + "duration": f"{duration:.4f}s", + }, + ) + return topics + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "topic listing failed", + extra={"offset": offset, "limit": limit, "error": str(e), "duration": f"{duration:.4f}s"}, + ) + raise async def delete_topic(topic_id: str) -> None: @@ -79,5 +137,19 @@ async def delete_topic(topic_id: str) -> None: Raises: NotFoundError: If no topic with the given ID exists. """ - async with SessionLocal() as session: - await _delete_entity(session, DBTopic, topic_id, "Topic not found") + start_time = time.perf_counter() + logger.info("deleting topic", extra={"topic_id": topic_id}) + + try: + async with SessionLocal() as session: + await _delete_entity(session, DBTopic, topic_id, "Topic not found") + + duration = time.perf_counter() - start_time + logger.info("topic deleted", extra={"topic_id": topic_id, "duration": f"{duration:.4f}s"}) + except Exception as e: + duration = time.perf_counter() - start_time + logger.error( + "topic deletion failed", + extra={"topic_id": topic_id, "error": str(e), "duration": f"{duration:.4f}s"}, + ) + raise diff --git a/uv.lock b/uv.lock index 425aad9..1419c87 100644 --- a/uv.lock +++ b/uv.lock @@ -450,11 +450,11 @@ dev = [ [[package]] name = "filelock" -version = "3.20.1" +version = "3.20.2" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/a7/23/ce7a1126827cedeb958fc043d61745754464eb56c5937c35bbf2b8e26f34/filelock-3.20.1.tar.gz", hash = "sha256:b8360948b351b80f420878d8516519a2204b07aefcdcfd24912a5d33127f188c", size = 19476, upload-time = "2025-12-15T23:54:28.027Z" } +sdist = { url = "https://files.pythonhosted.org/packages/c1/e0/a75dbe4bca1e7d41307323dad5ea2efdd95408f74ab2de8bd7dba9b51a1a/filelock-3.20.2.tar.gz", hash = "sha256:a2241ff4ddde2a7cebddf78e39832509cb045d18ec1a09d7248d6bfc6bfbbe64", size = 19510, upload-time = "2026-01-02T15:33:32.582Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/e3/7f/a1a97644e39e7316d850784c642093c99df1290a460df4ede27659056834/filelock-3.20.1-py3-none-any.whl", hash = "sha256:15d9e9a67306188a44baa72f569d2bfd803076269365fdea0934385da4dc361a", size = 16666, upload-time = "2025-12-15T23:54:26.874Z" }, + { url = "https://files.pythonhosted.org/packages/9a/30/ab407e2ec752aa541704ed8f93c11e2a5d92c168b8a755d818b74a3c5c2d/filelock-3.20.2-py3-none-any.whl", hash = "sha256:fbba7237d6ea277175a32c54bb71ef814a8546d8601269e1bfc388de333974e8", size = 16697, upload-time = "2026-01-02T15:33:31.133Z" }, ] [[package]] From 357b2a52b0a86e6b97296dd56100b9c89a8db4cb Mon Sep 17 00:00:00 2001 From: Allisson Azevedo Date: Sat, 3 Jan 2026 11:30:34 -0300 Subject: [PATCH 2/6] Update fastpubsub/services/topics.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- fastpubsub/services/topics.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/fastpubsub/services/topics.py b/fastpubsub/services/topics.py index ee5377f..a0f6c7e 100644 --- a/fastpubsub/services/topics.py +++ b/fastpubsub/services/topics.py @@ -46,8 +46,6 @@ async def create_topic(data: CreateTopic) -> Topic: duration = time.perf_counter() - start_time logger.info("topic created", extra={"topic_id": data.id, "duration": f"{duration:.4f}s"}) return Topic(**db_topic.to_dict()) - except AlreadyExistsError: - raise except Exception as e: duration = time.perf_counter() - start_time logger.error( From efb7f3c53c1f386ed798a4b44bd6c920e07184af Mon Sep 17 00:00:00 2001 From: Allisson Azevedo Date: Sat, 3 Jan 2026 11:30:53 -0300 Subject: [PATCH 3/6] Update fastpubsub/services/subscriptions.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- fastpubsub/services/subscriptions.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/fastpubsub/services/subscriptions.py b/fastpubsub/services/subscriptions.py index ea820e7..6784195 100644 --- a/fastpubsub/services/subscriptions.py +++ b/fastpubsub/services/subscriptions.py @@ -74,8 +74,6 @@ async def create_subscription(data: CreateSubscription) -> Subscription: extra={"subscription_id": data.id, "topic_id": data.topic_id, "duration": f"{duration:.4f}s"}, ) return Subscription(**db_subscription.to_dict()) - except (AlreadyExistsError, NotFoundError): - raise except Exception as e: duration = time.perf_counter() - start_time logger.error( From 56364c0b3c2cde84f235060cbc60dd50f5489a68 Mon Sep 17 00:00:00 2001 From: Allisson Azevedo Date: Sat, 3 Jan 2026 11:31:29 -0300 Subject: [PATCH 4/6] Update fastpubsub/services/clients.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- fastpubsub/services/clients.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/fastpubsub/services/clients.py b/fastpubsub/services/clients.py index 8866150..a1ece64 100644 --- a/fastpubsub/services/clients.py +++ b/fastpubsub/services/clients.py @@ -250,8 +250,6 @@ async def issue_jwt_client_token(client_id: uuid.UUID, client_secret: str) -> Cl expires_in=int((expires_in - now).total_seconds()), scope=db_client.scopes, ) - except InvalidClient: - raise except Exception as e: duration = time.perf_counter() - start_time logger.error( From 3d7e29a6090c6c8ac1e145865a521f4412142df9 Mon Sep 17 00:00:00 2001 From: Allisson Azevedo Date: Sat, 3 Jan 2026 11:31:36 -0300 Subject: [PATCH 5/6] Update fastpubsub/services/clients.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- fastpubsub/services/clients.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/fastpubsub/services/clients.py b/fastpubsub/services/clients.py index a1ece64..9de27ec 100644 --- a/fastpubsub/services/clients.py +++ b/fastpubsub/services/clients.py @@ -335,8 +335,6 @@ async def decode_jwt_client_token(access_token: str, auth_enabled: bool = True) }, ) return DecodedClientToken(client_id=uuid.UUID(client_id), scopes={scope for scope in scopes.split()}) - except InvalidClient: - raise except Exception as e: duration = time.perf_counter() - start_time logger.error( From 73424342aa3c791905ef2f3cadd7df2f6f1a77aa Mon Sep 17 00:00:00 2001 From: Allisson Azevedo Date: Sat, 3 Jan 2026 11:46:03 -0300 Subject: [PATCH 6/6] feat(api): add generic exception handler Add a generic exception handler in the FastAPI app to catch unhandled exceptions and return a 500 Internal Server Error with a generic message, preventing leakage of sensitive application internals. Also, update test fixture to downgrade migrations to 'base' instead of '-1' for cleaner teardown. --- fastpubsub/api/app.py | 21 ++++++++++++++++++++- tests/conftest.py | 2 +- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/fastpubsub/api/app.py b/fastpubsub/api/app.py index 2da6e5a..091c270 100644 --- a/fastpubsub/api/app.py +++ b/fastpubsub/api/app.py @@ -1,7 +1,7 @@ """FastAPI application setup and configuration.""" from fastapi import FastAPI, Request, status -from fastapi.responses import ORJSONResponse +from fastapi.responses import JSONResponse, ORJSONResponse from prometheus_fastapi_instrumentator import Instrumentator from fastpubsub import models @@ -132,6 +132,25 @@ def invalid_client_token_exception_handler(request: Request, exc: InvalidClientT """ return _create_error_response(models.GenericError, status.HTTP_403_FORBIDDEN, exc) + @app.exception_handler(Exception) + def generic_exception_handler(request: Request, exc: Exception): + """Handle generic Exception instances. + + Catches any unhandled exceptions that don't have specific handlers. + Returns a generic 500 Internal Server Error response to avoid leaking + sensitive information about the application internals. + + Args: + request: The incoming HTTP request that caused the exception. + exc: The unhandled exception that was raised. + + Returns: + JSON error response with 500 status code and generic error message. + """ + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"detail": "internal server error"} + ) + # Add routers app.include_router(topics.router) app.include_router(subscriptions.router) diff --git a/tests/conftest.py b/tests/conftest.py index 194aeda..eb20335 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -29,7 +29,7 @@ async def async_engine(): """ await run_migrations(command_type="upgrade", revision="head") yield engine - await run_migrations(command_type="downgrade", revision="-1") + await run_migrations(command_type="downgrade", revision="base") await engine.dispose()