diff --git a/apps/api/plane/authentication/middleware/proxy_auth.py b/apps/api/plane/authentication/middleware/proxy_auth.py index 3ff2bf71475..c91dc5f84f1 100644 --- a/apps/api/plane/authentication/middleware/proxy_auth.py +++ b/apps/api/plane/authentication/middleware/proxy_auth.py @@ -5,6 +5,7 @@ from uuid import uuid4 from django.conf import settings +from django.contrib.auth import logout from django.contrib.auth.hashers import make_password from django.db import IntegrityError @@ -55,30 +56,29 @@ def __init__(self, get_response): ) def __call__(self, request): - # Layer 2 session already valid — nothing to do. - if request.user.is_authenticated: - return self.get_response(request) - # Bypass paths use their own auth (god-mode local login, instance admin). # TODO(mpass): Keep OPTIONS bypass at the proxy layer; add an app-level # fallback here only if preflight routing becomes inconsistent. if _is_bypass_path(request.path, self.bypass_paths): return self.get_response(request) - email = (request.META.get("HTTP_X_AUTH_REQUEST_EMAIL") or "").strip() - if email and "@" not in email: - # Header holds a bare username (user_id_claim=cognito:username). Synth email. - domain = getattr(settings, "DEFAULT_EMAIL_DOMAIN", "askii.ai") - email = f"{email}@{domain}" - if not email: - username = (request.META.get("HTTP_X_AUTH_REQUEST_USER") or "").strip() - domain = getattr(settings, "DEFAULT_EMAIL_DOMAIN", "askii.ai") - if username: - email = f"{username}@{domain}" - if not email: - return self.get_response(request) + email = _normalise_email(self._read_proxy_email(request)) + + if request.user.is_authenticated: + # Short-circuit only when the upstream-asserted identity matches the + # current Django session, or when no header is present (request did + # not pass through ForwardAuth — header absence is not a logout signal). + current = _normalise_email(request.user.email or "") + if not email or current == email: + return self.get_response(request) + + # Mismatch detected: proxy asserts a different identity than the + # current session. Flush the stale session immediately so that if + # subsequent re-auth fails (e.g., incoming user is inactive), the + # request proceeds as unauthenticated rather than retaining the + # previous user's identity. + logout(request) - email = _normalise_email(email) if not email: return self.get_response(request) @@ -92,6 +92,38 @@ def __call__(self, request): user_login(request=request, user=user, is_app=True) return self.get_response(request) + @staticmethod + def _read_proxy_email(request): + """Extract the upstream-asserted email from oauth2-proxy headers. + + Handles three cases: + - X-Auth-Request-Email contains a real email → use as-is + - X-Auth-Request-Email contains a bare username (user_id_claim= + cognito:username) → synthesise @DEFAULT_EMAIL_DOMAIN + - X-Auth-Request-Email is empty but X-Auth-Request-User has a username + → synthesise the same way + + Returns the raw (un-normalised) email string, or "" if none could be + derived. Caller is responsible for `_normalise_email` before using. + + TODO(security): the bare-username synthesis paths let a Cognito + principal whose username collides with a real Plane user's email + local-part impersonate that user (e.g. `cognito:username=alice` → + synthesised to `alice@askii.ai` → resolves to an existing `alice@askii.ai` + Plane user). The defensive fix is to drop these synthesis paths and + require a real email claim from the upstream proxy. + """ + email = (request.META.get("HTTP_X_AUTH_REQUEST_EMAIL") or "").strip() + if email and "@" not in email: + domain = getattr(settings, "DEFAULT_EMAIL_DOMAIN", "askii.ai") + email = f"{email}@{domain}" + if not email: + username = (request.META.get("HTTP_X_AUTH_REQUEST_USER") or "").strip() + if username: + domain = getattr(settings, "DEFAULT_EMAIL_DOMAIN", "askii.ai") + email = f"{username}@{domain}" + return email + def _resolve_user(self, email): username_hint = email.split("@")[0] or uuid4().hex try: diff --git a/apps/api/plane/authentication/tests/test_proxy_auth.py b/apps/api/plane/authentication/tests/test_proxy_auth.py index 98bc6d6329c..e4ca4d61c0c 100644 --- a/apps/api/plane/authentication/tests/test_proxy_auth.py +++ b/apps/api/plane/authentication/tests/test_proxy_auth.py @@ -12,12 +12,19 @@ Design contract being tested ----------------------------- -- Reads HTTP_X_AUTH_REQUEST_EMAIL from request.META -- If request.user.is_authenticated → pass through immediately (no DB, no login) +- Reads identity headers from request.META: + * HTTP_X_AUTH_REQUEST_EMAIL + * HTTP_X_AUTH_REQUEST_USER (fallback when email header is empty) - If path starts with a bypass prefix → pass through immediately (no DB, no login) Default bypass prefixes: ["/god-mode", "/api/instances"] -- If email header is absent → pass through unauthenticated -- If email is present → get_or_create User, create Profile on first creation, +- If request.user.is_authenticated: + * proxy header absent or matches request.user.email → short-circuit + * proxy header asserts a DIFFERENT email → logout() to flush the stale session, + then fall through and re-authenticate + (defends against the "stale Django session survives upstream logout" + class of bug — see TestProxyAuthMiddlewareUserSwitch) +- If both identity headers are absent (and no existing session) → pass through unauthenticated +- If identity can be derived from headers → get_or_create User, create Profile on first creation, then call user_login(request, user, is_app=True) to establish session - New users get: set_unusable_password(), is_password_autoset=True, is_email_verified=True - username is always uuid4().hex (never the Cognito sub — avoids length/collision issues) @@ -28,7 +35,7 @@ """ import pytest -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, Mock, patch from django.contrib.auth.models import AnonymousUser from django.test import RequestFactory @@ -36,6 +43,7 @@ from plane.db.models import User, Profile PATCH_USER_LOGIN = "plane.authentication.middleware.proxy_auth.user_login" +PATCH_LOGOUT = "plane.authentication.middleware.proxy_auth.logout" # --------------------------------------------------------------------------- @@ -98,6 +106,155 @@ def test_skips_when_user_already_authenticated(self, django_user_model): assert User.objects.count() == count_before +class TestProxyAuthMiddlewareUserSwitch: + """Stale Django session must not survive an upstream identity change.""" + + @pytest.mark.django_db + def test_logs_in_new_user_when_proxy_email_differs(self, django_user_model): + """ + GIVEN the current Django session belongs to alice + AND X-Auth-Request-Email = bob's email (oauth2-proxy now says bob) + WHEN the middleware processes the request + THEN logout() is called to flush alice's stale session + AND user_login is called with bob (not short-circuited on alice) + + Real-world repro: portal "log out of all apps" clears the shared + _oauth2_proxy cookie + Cognito session but NOT Plane's own Django + session cookie. The next user logs in upstream; this tab refreshes + and must re-bind to the new identity. + """ + django_user_model.objects.create_user( + email="alice@example.com", username="alice", password="x", + ) + bob = django_user_model.objects.create_user( + email="bob@example.com", username="bob", password="x", + ) + alice = django_user_model.objects.get(email="alice@example.com") + middleware = make_middleware() + request = make_request( + meta={"HTTP_X_AUTH_REQUEST_EMAIL": "bob@example.com"}, + authenticated_user=alice, + ) + + # Use a manager mock to track call order + manager = Mock() + + with patch(PATCH_USER_LOGIN) as mock_login, \ + patch(PATCH_LOGOUT) as mock_logout: + # Attach mocks to manager to track order + manager.attach_mock(mock_logout, 'logout') + manager.attach_mock(mock_login, 'login') + + middleware(request) + + # Session should be flushed when mismatch is detected + mock_logout.assert_called_once_with(request) + # Then re-auth with the new user + mock_login.assert_called_once() + assert mock_login.call_args.kwargs["user"].pk == bob.pk + + # Verify logout happens before user_login (order matters for security) + call_names = [call[0] for call in manager.mock_calls] + assert 'logout' in call_names and 'login' in call_names, \ + "Both logout and login should be called" + assert call_names.index('logout') < call_names.index('login'), \ + "logout() must be called before user_login() to flush stale session" + + @pytest.mark.django_db + def test_no_logout_when_header_absent(self, django_user_model): + """ + GIVEN the current Django session belongs to alice + AND no X-Auth-Request-Email header is present + WHEN the middleware processes the request + THEN user_login is NOT called — header absence is not a logout signal. + + Header absence can occur on internal requests, bypass paths reached + via redirect, or test paths. Treating it as "log out" would break + every such call. + """ + alice = django_user_model.objects.create_user( + email="alice@example.com", username="alice", password="x", + ) + middleware = make_middleware() + request = make_request(authenticated_user=alice) + + with patch(PATCH_USER_LOGIN) as mock_login: + middleware(request) + + mock_login.assert_not_called() + + @pytest.mark.django_db + def test_match_is_case_and_whitespace_insensitive(self, django_user_model): + """ + GIVEN the current Django session belongs to alice@example.com + AND X-Auth-Request-Email = " ALICE@example.com " + WHEN the middleware processes the request + THEN user_login is NOT called — match comparison runs through the + same normalisation as user creation does. + """ + alice = django_user_model.objects.create_user( + email="alice@example.com", username="alice", password="x", + ) + middleware = make_middleware() + request = make_request( + meta={"HTTP_X_AUTH_REQUEST_EMAIL": " ALICE@example.com "}, + authenticated_user=alice, + ) + + with patch(PATCH_USER_LOGIN) as mock_login: + middleware(request) + + mock_login.assert_not_called() + + @pytest.mark.django_db + def test_flushes_session_when_incoming_user_is_inactive(self, django_user_model): + """ + GIVEN the current Django session belongs to alice (active) + AND X-Auth-Request-Email = bob's email + AND bob exists but is_active=False + WHEN the middleware processes the request + THEN logout() is called to flush alice's session + AND user_login is NOT called (bob is inactive) + AND the request proceeds without re-authentication + + This prevents a stale session from surviving when re-auth fails. + Without the explicit logout(), alice's session would remain active + and the request would proceed as alice even though the proxy now + asserts bob's identity. + """ + alice = django_user_model.objects.create_user( + email="alice@example.com", username="alice", password="x" + ) + django_user_model.objects.create_user( + email="bob@example.com", username="bob", password="x", is_active=False + ) + + # Capture what request object get_response receives + received_request = None + def capture_get_response(req): + nonlocal received_request + received_request = req + return MagicMock(status_code=200) + + middleware = make_middleware(get_response=capture_get_response) + request = make_request( + meta={"HTTP_X_AUTH_REQUEST_EMAIL": "bob@example.com"}, + authenticated_user=alice, + ) + + with patch(PATCH_USER_LOGIN) as mock_login, \ + patch(PATCH_LOGOUT) as mock_logout: + middleware(request) + + # Session should be flushed when mismatch is detected + mock_logout.assert_called_once_with(request) + # user_login should NOT be called because bob is inactive + mock_login.assert_not_called() + # Verify get_response was called with the request (middleware didn't block it) + assert received_request is request, \ + "Middleware should pass the request to get_response after logout" + + class TestProxyAuthMiddlewareNoHeader: """Middleware must pass through cleanly when the email header is absent.""" @@ -307,6 +464,38 @@ def test_instances_path_is_bypassed(self): mock_login.assert_not_called() assert User.objects.count() == count_before + @pytest.mark.django_db + def test_bypass_dominates_mismatched_proxy_header(self, django_user_model): + """ + GIVEN the current Django session belongs to alice + AND the request targets a bypass path (/god-mode/setup/) + AND X-Auth-Request-Email = bob's email (mismatch) + WHEN the middleware processes the request + THEN logout() is NOT called — bypass dominates the mismatch flow + AND user_login() is NOT called + + The bypass check runs at the top of __call__, before the proxy header + is read or session identity is compared. This guards god-mode local + admin sessions against being kicked out by an unrelated mPass + identity reaching the same browser. + """ + alice = django_user_model.objects.create_user( + email="alice@example.com", username="alice", password="x", + ) + middleware = make_middleware() + request = make_request( + path="/god-mode/setup/", + meta={"HTTP_X_AUTH_REQUEST_EMAIL": "bob@example.com"}, + authenticated_user=alice, + ) + + with patch(PATCH_USER_LOGIN) as mock_login, \ + patch(PATCH_LOGOUT) as mock_logout: + middleware(request) + + mock_logout.assert_not_called() + mock_login.assert_not_called() + class TestProxyAuthMiddlewareEdgeCases: """Email normalisation and concurrent creation races."""