-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdependencies.py
More file actions
77 lines (56 loc) · 2.63 KB
/
dependencies.py
File metadata and controls
77 lines (56 loc) · 2.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import logging
from dataclasses import dataclass
from fastapi import HTTPException, Request
from jose import ExpiredSignatureError, JWTError, jwt
from auth.dao import UserDAO
from auth.service import UserService
from config import config
from exceptions import DatabaseError
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
@dataclass
class AuthenticatedUser:
    """Identity of a caller whose JWT has been validated.

    Instances are built by ``get_current_user`` from the decoded token
    payload and handed to route handlers.

    Attributes:
        id: Value of the token's ``user_id`` claim.
        scopes: Value of the token's ``scopes`` claim.
    """

    # Field order is part of the positional constructor signature.
    id: str
    scopes: list[str]
def get_current_user(request: Request) -> AuthenticatedUser:
    """FastAPI dependency that validates the JWT and returns the authenticated user.

    Reads the token from the ``jwt`` HttpOnly cookie (browser) or the
    ``Authorization: Bearer <token>`` header (curl / testing), decodes it with
    ``config.JWT_SECRET`` and ``config.JWT_ALGORITHM``, and then confirms that
    the user named by the ``user_id`` claim exists.

    Args:
        request: The incoming request carrying the cookie or header.

    Returns:
        An ``AuthenticatedUser`` built from the ``user_id`` and ``scopes``
        claims. A missing or ``null`` ``scopes`` claim yields an empty list.

    Raises:
        HTTPException: 401 if the token is missing, invalid, or expired, if the
            ``user_id`` claim is absent or empty, or if the referenced user
            does not exist; 503 if the user lookup hits a database error.
    """
    token = _extract_token(request)

    try:
        payload = jwt.decode(token, config.JWT_SECRET, algorithms=[config.JWT_ALGORITHM])
    except ExpiredSignatureError as exc:
        # Handled before JWTError so expiry gets its own log line.
        logger.warning("JWT validation failed: token expired")
        raise HTTPException(status_code=401, detail="Invalid or expired token") from exc
    except JWTError as exc:
        logger.warning("JWT validation failed: %s", exc)
        raise HTTPException(status_code=401, detail="Invalid or expired token") from exc

    user_id: str | None = payload.get("user_id")
    if not user_id:
        logger.warning("JWT payload missing user_id")
        raise HTTPException(status_code=401, detail="Invalid or expired token")

    # ``dict.get`` only applies its default when the key is absent; a token
    # carrying ``"scopes": null`` would otherwise leak ``None`` to callers.
    scopes: list[str] | None = payload.get("scopes")
    if scopes is None:
        scopes = []

    _verify_user_exists(user_id)
    return AuthenticatedUser(id=user_id, scopes=scopes)
def _extract_token(request: Request) -> str:
    """Return the raw JWT string from the request cookie or Authorization header.

    The ``jwt`` cookie takes precedence; the ``Authorization`` header is only
    consulted when the cookie is absent or empty.

    Args:
        request: The incoming request to inspect.

    Returns:
        The token string, with surrounding whitespace removed when it came
        from the header.

    Raises:
        HTTPException: 401 if neither source yields a non-empty bearer token.
    """
    cookie_token = request.cookies.get("jwt")
    if cookie_token:
        return cookie_token

    auth_header = request.headers.get("Authorization", "")
    scheme, _, credentials = auth_header.partition(" ")
    credentials = credentials.strip()
    # HTTP authentication scheme names are case-insensitive, so accept
    # "bearer" / "BEARER" as well as "Bearer". An empty credential is rejected
    # here rather than being passed on to the decoder.
    if scheme.lower() == "bearer" and credentials:
        return credentials

    raise HTTPException(status_code=401, detail="Invalid or expired token")
def _verify_user_exists(user_id: str) -> None:
    """Confirm the user referenced by the JWT is present in the database.

    Args:
        user_id: Identifier taken from the token's ``user_id`` claim.

    Raises:
        HTTPException: 503 if the lookup raises ``DatabaseError``; 401 if the
            lookup returns ``None``.
    """
    try:
        user = UserService(UserDAO()).get_user_by_id(user_id)
    except DatabaseError as exc:
        # ``logger.exception`` logs at ERROR level and attaches the traceback,
        # so the underlying database failure is visible in the logs.
        logger.exception("Database error while verifying user_id=%s", user_id)
        raise HTTPException(
            status_code=503, detail="Something went wrong. Please try again."
        ) from exc

    if user is None:
        logger.warning("JWT references unknown user_id=%s", user_id)
        raise HTTPException(status_code=401, detail="User not found")