Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions fastpubsub/api/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""FastAPI application setup and configuration."""

from fastapi import FastAPI, Request, status
from fastapi.responses import ORJSONResponse
from prometheus_fastapi_instrumentator import Instrumentator
Expand Down Expand Up @@ -36,6 +38,14 @@


def create_app() -> FastAPI:
"""Create and configure the FastAPI application.

Sets up the complete application including middleware, exception handlers,
routers, and monitoring instrumentation.

Returns:
Configured FastAPI application instance.
"""
app = FastAPI(
title="fastpubsub",
description="Simple pubsub system based on FastAPI and PostgreSQL.",
Expand All @@ -49,22 +59,77 @@ def create_app() -> FastAPI:
# Add exception handlers
@app.exception_handler(AlreadyExistsError)
def already_exists_exception_handler(request: Request, exc: AlreadyExistsError):
"""Handle AlreadyExistsError exceptions.

Returns a 409 Conflict response when attempting to create resources that already exist.

Args:
request: The incoming HTTP request.
exc: The AlreadyExistsError exception.

Returns:
JSON error response with 409 status code.
"""
return _create_error_response(models.GenericError, status.HTTP_409_CONFLICT, exc)

@app.exception_handler(NotFoundError)
def not_found_exception_handler(request: Request, exc: NotFoundError):
"""Handle NotFoundError exceptions.

Returns a 404 Not Found response when requesting non-existent resources.

Args:
request: The incoming HTTP request.
exc: The NotFoundError exception.

Returns:
JSON error response with 404 status code.
"""
return _create_error_response(models.GenericError, status.HTTP_404_NOT_FOUND, exc)

@app.exception_handler(ServiceUnavailable)
def service_unavailable_exception_handler(request: Request, exc: ServiceUnavailable):
"""Handle ServiceUnavailable exceptions.

Returns a 503 Service Unavailable response when services are unavailable.

Args:
request: The incoming HTTP request.
exc: The ServiceUnavailable exception.

Returns:
JSON error response with 503 status code.
"""
return _create_error_response(models.GenericError, status.HTTP_503_SERVICE_UNAVAILABLE, exc)

@app.exception_handler(InvalidClient)
def invalid_client_exception_handler(request: Request, exc: InvalidClient):
"""Handle InvalidClient exceptions.

Returns a 401 Unauthorized response when client authentication fails.

Args:
request: The incoming HTTP request.
exc: The InvalidClient exception.

Returns:
JSON error response with 401 status code.
"""
return _create_error_response(models.GenericError, status.HTTP_401_UNAUTHORIZED, exc)

@app.exception_handler(InvalidClientToken)
def invalid_client_token_exception_handler(request: Request, exc: InvalidClientToken):
"""Handle InvalidClientToken exceptions.

Returns a 403 Forbidden response when client token is invalid or expired.

Args:
request: The incoming HTTP request.
exc: The InvalidClientToken exception.

Returns:
JSON error response with 403 status code.
"""
return _create_error_response(models.GenericError, status.HTTP_403_FORBIDDEN, exc)

# Add routers
Expand Down
13 changes: 12 additions & 1 deletion fastpubsub/api/helpers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
"""Helper functions for API responses and error handling."""

from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse


def _create_error_response(model_class, status_code: int, exc: Exception):
"""Helper to create error responses."""
"""Create a standardized error response JSON object.

Args:
model_class: Pydantic model class for the error response.
status_code: HTTP status code for the error.
exc: Exception instance containing the error message.

Returns:
JSONResponse with formatted error content and appropriate status code.
"""
response = jsonable_encoder(model_class(detail=exc.args[0]))
return JSONResponse(status_code=status_code, content=response)
22 changes: 22 additions & 0 deletions fastpubsub/api/middlewares.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""HTTP middleware for request logging and monitoring."""

import time
from uuid import uuid7

Expand All @@ -9,6 +11,26 @@


async def log_requests(request: Request, call_next):
"""Middleware to log HTTP requests and responses with timing and request IDs.

This middleware:
- Generates a unique request ID for tracking
- Logs request details at the start
- Measures processing time
- Logs response details including status code and timing
- Adds request ID header to response
- Handles and logs any exceptions during request processing

Args:
request: The incoming FastAPI request.
call_next: The next middleware or endpoint to call.

Returns:
The processed HTTP response.

Raises:
Exception: Re-raises any exceptions encountered during processing.
"""
start_time = time.perf_counter()
request_id = str(uuid7())
logger.info(
Expand Down
94 changes: 94 additions & 0 deletions fastpubsub/api/routers/clients.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""API endpoints for client management operations."""

import uuid
from typing import Annotated

Expand All @@ -18,6 +20,22 @@ async def create_client(
data: models.CreateClient,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("clients", "create"))],
):
"""Create a new client with specified name and scopes.

Creates a new authorized client that can access the pub/sub API
based on their granted scopes. Returns the client ID and generated secret.

Args:
data: Client creation data including name, scopes, and active status.
token: Decoded client token with 'clients:create' scope.

Returns:
CreateClientResult containing the new client ID and secret.

Raises:
AlreadyExistsError: If a client with the same ID already exists.
InvalidClient: If the requesting client lacks 'clients:create' scope.
"""
return await services.create_client(data)


Expand All @@ -32,6 +50,22 @@ async def get_client(
id: uuid.UUID,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("clients", "read"))],
):
"""Retrieve a client by ID.

Returns the full details of an existing client including ID, name,
scopes, status, and timestamps.

Args:
id: UUID of the client to retrieve.
token: Decoded client token with 'clients:read' scope.

Returns:
Client model with full client details.

Raises:
NotFoundError: If no client with the given ID exists.
InvalidClient: If the requesting client lacks 'clients:read' scope.
"""
return await services.get_client(id)


Expand All @@ -47,6 +81,23 @@ async def update_client(
data: models.UpdateClient,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("clients", "update"))],
):
"""Update an existing client's name, scopes, or active status.

Modifies the properties of an existing client. Only the fields
provided in the update data will be modified.

Args:
id: UUID of the client to update.
data: Updated client data including name, scopes, and/or active status.
token: Decoded client token with 'clients:update' scope.

Returns:
Client model with updated details.

Raises:
NotFoundError: If no client with the given ID exists.
InvalidClient: If the requesting client lacks 'clients:update' scope.
"""
return await services.update_client(id, data)


Expand All @@ -61,6 +112,21 @@ async def list_client(
offset: int = Query(default=0, ge=0),
limit: int = Query(default=10, ge=1, le=100),
):
"""List clients with pagination support.

Returns a paginated list of all clients in the system.

Args:
token: Decoded client token with 'clients:read' scope.
offset: Number of items to skip (for pagination).
limit: Maximum number of items to return (1-100).

Returns:
ListClientAPI containing the list of clients.

Raises:
InvalidClient: If the requesting client lacks 'clients:read' scope.
"""
clients = await services.list_client(offset, limit)
return models.ListClientAPI(data=clients)

Expand All @@ -75,6 +141,18 @@ async def delete_client(
id: uuid.UUID,
token: Annotated[models.DecodedClientToken, Depends(services.require_scope("clients", "delete"))],
):
"""Delete a client by ID.

Permanently removes a client from the system. This action cannot be undone.

Args:
id: UUID of the client to delete.
token: Decoded client token with 'clients:delete' scope.

Raises:
NotFoundError: If no client with the given ID exists.
InvalidClient: If the requesting client lacks 'clients:delete' scope.
"""
await services.delete_client(id)


Expand All @@ -85,4 +163,20 @@ async def delete_client(
summary="Issue a new client token",
)
async def issue_client_token(data: models.IssueClientToken):
"""Issue a new JWT access token for a client.

Generates a new access token that the client can use for authentication
in subsequent API requests. The token includes the client's scopes
and has an expiration time.

Args:
data: Client credentials including ID and secret for authentication.

Returns:
ClientToken containing the access token, type, expiration, and scopes.

Raises:
InvalidClient: If client ID or secret is invalid.
ServiceUnavailable: If token generation service is unavailable.
"""
return await services.issue_jwt_client_token(client_id=data.client_id, client_secret=data.client_secret)
23 changes: 23 additions & 0 deletions fastpubsub/api/routers/monitoring.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""API endpoints for monitoring and health check operations."""

from fastapi import APIRouter, status

from fastpubsub import models, services
Expand All @@ -13,6 +15,15 @@
summary="Liveness probe",
)
async def liveness_probe():
"""Check if the application is alive.

Simple liveness check that always returns "alive" status.
Used by Kubernetes and other orchestration systems to determine
if the application process is running.

Returns:
HealthCheck model with status "alive".
"""
return models.HealthCheck(status="alive")


Expand All @@ -24,6 +35,18 @@ async def liveness_probe():
summary="Readiness probe",
)
async def readiness_probe():
"""Check if the application is ready to serve traffic.

Comprehensive health check that verifies database connectivity.
Returns "ready" status only if all critical dependencies are available.
Used by Kubernetes to determine if the application can handle requests.

Returns:
HealthCheck model with status "ready".

Raises:
ServiceUnavailable: If database connection fails.
"""
try:
is_db_ok = await services.database_ping()
if not is_db_ok:
Expand Down
Loading