Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 49 additions & 17 deletions apps/api/plane/authentication/middleware/proxy_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from uuid import uuid4

from django.conf import settings
from django.contrib.auth import logout
from django.contrib.auth.hashers import make_password
from django.db import IntegrityError

Expand Down Expand Up @@ -55,30 +56,29 @@ def __init__(self, get_response):
)

def __call__(self, request):
# Layer 2 session already valid — nothing to do.
if request.user.is_authenticated:
return self.get_response(request)

# Bypass paths use their own auth (god-mode local login, instance admin).
# TODO(mpass): Keep OPTIONS bypass at the proxy layer; add an app-level
# fallback here only if preflight routing becomes inconsistent.
if _is_bypass_path(request.path, self.bypass_paths):
return self.get_response(request)

email = (request.META.get("HTTP_X_AUTH_REQUEST_EMAIL") or "").strip()
if email and "@" not in email:
# Header holds a bare username (user_id_claim=cognito:username). Synth email.
domain = getattr(settings, "DEFAULT_EMAIL_DOMAIN", "askii.ai")
email = f"{email}@{domain}"
if not email:
username = (request.META.get("HTTP_X_AUTH_REQUEST_USER") or "").strip()
domain = getattr(settings, "DEFAULT_EMAIL_DOMAIN", "askii.ai")
if username:
email = f"{username}@{domain}"
if not email:
return self.get_response(request)
email = _normalise_email(self._read_proxy_email(request))

if request.user.is_authenticated:
# Short-circuit only when the upstream-asserted identity matches the
# current Django session, or when no header is present (request did
# not pass through ForwardAuth — header absence is not a logout signal).
current = _normalise_email(request.user.email or "")
if not email or current == email:
return self.get_response(request)

Comment thread
awais786 marked this conversation as resolved.
# Mismatch detected: proxy asserts a different identity than the
# current session. Flush the stale session immediately so that if
# subsequent re-auth fails (e.g., incoming user is inactive), the
# request proceeds as unauthenticated rather than retaining the
# previous user's identity.
logout(request)

Comment thread
awais786 marked this conversation as resolved.
email = _normalise_email(email)
if not email:
return self.get_response(request)

Expand All @@ -92,6 +92,38 @@ def __call__(self, request):
user_login(request=request, user=user, is_app=True)
return self.get_response(request)

@staticmethod
def _read_proxy_email(request):
"""Extract the upstream-asserted email from oauth2-proxy headers.

Handles three cases:
- X-Auth-Request-Email contains a real email → use as-is
- X-Auth-Request-Email contains a bare username (user_id_claim=
cognito:username) → synthesise <username>@DEFAULT_EMAIL_DOMAIN
- X-Auth-Request-Email is empty but X-Auth-Request-User has a username
→ synthesise the same way

Comment on lines +97 to +105

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot apply changes based on this feedback

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 417b884 — updated the docstring from "Handles two header shapes" to "Handles three cases" to match the three bullets actually described.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in commit 417b884 — the _read_proxy_email docstring wording now reflects three cases, matching the listed behavior.

Returns the raw (un-normalised) email string, or "" if none could be
derived. Caller is responsible for `_normalise_email` before using.

TODO(security): the bare-username synthesis paths let a Cognito
principal whose username collides with a real Plane user's email
local-part impersonate that user (e.g. `cognito:username=alice` →
synthesised to `alice@askii.ai` → resolves to an existing `alice@askii.ai`
Plane user). The defensive fix is to drop these synthesis paths and
require a real email claim from the upstream proxy.
"""
email = (request.META.get("HTTP_X_AUTH_REQUEST_EMAIL") or "").strip()
if email and "@" not in email:
domain = getattr(settings, "DEFAULT_EMAIL_DOMAIN", "askii.ai")
email = f"{email}@{domain}"
if not email:
username = (request.META.get("HTTP_X_AUTH_REQUEST_USER") or "").strip()
if username:
domain = getattr(settings, "DEFAULT_EMAIL_DOMAIN", "askii.ai")
email = f"{username}@{domain}"
return email

def _resolve_user(self, email):
username_hint = email.split("@")[0] or uuid4().hex
try:
Expand Down
199 changes: 194 additions & 5 deletions apps/api/plane/authentication/tests/test_proxy_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,19 @@

Design contract being tested
-----------------------------
- Reads HTTP_X_AUTH_REQUEST_EMAIL from request.META
- If request.user.is_authenticated → pass through immediately (no DB, no login)
- Reads identity headers from request.META:
* HTTP_X_AUTH_REQUEST_EMAIL
* HTTP_X_AUTH_REQUEST_USER (fallback when email header is empty)
- If path starts with a bypass prefix → pass through immediately (no DB, no login)
Default bypass prefixes: ["/god-mode", "/api/instances"]
- If email header is absent → pass through unauthenticated
- If email is present → get_or_create User, create Profile on first creation,
- If request.user.is_authenticated:
* proxy header absent or matches request.user.email → short-circuit
* proxy header asserts a DIFFERENT email → logout() to flush the stale session,
then fall through and re-authenticate
(defends against the "stale Django session survives upstream logout"
class of bug — see TestProxyAuthMiddlewareUserSwitch)
- If both identity headers are absent (and no existing session) → pass through unauthenticated
- If identity can be derived from headers → get_or_create User, create Profile on first creation,
then call user_login(request, user, is_app=True) to establish session
- New users get: set_unusable_password(), is_password_autoset=True, is_email_verified=True
- username is always uuid4().hex (never the Cognito sub — avoids length/collision issues)
Expand All @@ -28,14 +35,15 @@
"""

import pytest
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, Mock, patch
from django.contrib.auth.models import AnonymousUser
from django.test import RequestFactory

from plane.authentication.middleware.proxy_auth import ProxyAuthMiddleware
from plane.db.models import User, Profile

PATCH_USER_LOGIN = "plane.authentication.middleware.proxy_auth.user_login"
PATCH_LOGOUT = "plane.authentication.middleware.proxy_auth.logout"


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -98,6 +106,155 @@ def test_skips_when_user_already_authenticated(self, django_user_model):
assert User.objects.count() == count_before


class TestProxyAuthMiddlewareUserSwitch:
"""Stale Django session must not survive an upstream identity change."""

@pytest.mark.django_db
def test_logs_in_new_user_when_proxy_email_differs(self, django_user_model):
"""
GIVEN the current Django session belongs to alice
AND X-Auth-Request-Email = bob's email (oauth2-proxy now says bob)
WHEN the middleware processes the request
THEN logout() is called to flush alice's stale session
AND user_login is called with bob (not short-circuited on alice)

Real-world repro: portal "log out of all apps" clears the shared
_oauth2_proxy cookie + Cognito session but NOT Plane's own Django
session cookie. The next user logs in upstream; this tab refreshes
and must re-bind to the new identity.
"""
django_user_model.objects.create_user(
email="alice@example.com", username="alice", password="x",
)
bob = django_user_model.objects.create_user(
email="bob@example.com", username="bob", password="x",
)
alice = django_user_model.objects.get(email="alice@example.com")
middleware = make_middleware()
request = make_request(
meta={"HTTP_X_AUTH_REQUEST_EMAIL": "bob@example.com"},
authenticated_user=alice,
)

# Use a manager mock to track call order
manager = Mock()

with patch(PATCH_USER_LOGIN) as mock_login, \
patch(PATCH_LOGOUT) as mock_logout:
# Attach mocks to manager to track order
manager.attach_mock(mock_logout, 'logout')
manager.attach_mock(mock_login, 'login')

middleware(request)

# Session should be flushed when mismatch is detected
mock_logout.assert_called_once_with(request)
# Then re-auth with the new user
mock_login.assert_called_once()
assert mock_login.call_args.kwargs["user"].pk == bob.pk
Comment on lines +142 to +154

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot apply changes based on this feedback

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in cc1107b — the test now uses a manager mock with attach_mock() to track call order and asserts that logout() is called before user_login(). This strengthens the regression guard by ensuring the stale session is flushed before any re-authentication work begins.


# Verify logout happens before user_login (order matters for security)
call_names = [call[0] for call in manager.mock_calls]
assert 'logout' in call_names and 'login' in call_names, \
"Both logout and login should be called"
assert call_names.index('logout') < call_names.index('login'), \
"logout() must be called before user_login() to flush stale session"

@pytest.mark.django_db
def test_no_logout_when_header_absent(self, django_user_model):
"""
GIVEN the current Django session belongs to alice
AND no X-Auth-Request-Email header is present
WHEN the middleware processes the request
THEN user_login is NOT called — header absence is not a logout signal.

Header absence can occur on internal requests, bypass paths reached
via redirect, or test paths. Treating it as "log out" would break
every such call.
"""
alice = django_user_model.objects.create_user(
email="alice@example.com", username="alice", password="x",
)
middleware = make_middleware()
request = make_request(authenticated_user=alice)

with patch(PATCH_USER_LOGIN) as mock_login:
middleware(request)

mock_login.assert_not_called()

@pytest.mark.django_db
def test_match_is_case_and_whitespace_insensitive(self, django_user_model):
"""
GIVEN the current Django session belongs to alice@example.com
AND X-Auth-Request-Email = " ALICE@example.com "
WHEN the middleware processes the request
THEN user_login is NOT called — match comparison runs through the
same normalisation as user creation does.
"""
alice = django_user_model.objects.create_user(
email="alice@example.com", username="alice", password="x",
)
middleware = make_middleware()
request = make_request(
meta={"HTTP_X_AUTH_REQUEST_EMAIL": " ALICE@example.com "},
authenticated_user=alice,
)

with patch(PATCH_USER_LOGIN) as mock_login:
middleware(request)

mock_login.assert_not_called()

@pytest.mark.django_db
def test_flushes_session_when_incoming_user_is_inactive(self, django_user_model):
"""
GIVEN the current Django session belongs to alice (active)
AND X-Auth-Request-Email = bob's email
AND bob exists but is_active=False
WHEN the middleware processes the request
THEN logout() is called to flush alice's session
AND user_login is NOT called (bob is inactive)
AND the request proceeds without re-authentication

Comment on lines +210 to +219

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot apply changes based on this feedback

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 17e64ac — the test now uses a custom get_response stub to capture the request object and verify that the middleware passes it through after calling logout() and not calling user_login(). This confirms the request proceeds without re-authentication when the incoming user is inactive.

This prevents a stale session from surviving when re-auth fails.
Without the explicit logout(), alice's session would remain active
and the request would proceed as alice even though the proxy now
asserts bob's identity.
"""
alice = django_user_model.objects.create_user(
email="alice@example.com", username="alice", password="x"
)
django_user_model.objects.create_user(
email="bob@example.com", username="bob", password="x", is_active=False
)

# Capture what request object get_response receives
received_request = None
def capture_get_response(req):
nonlocal received_request
received_request = req
return MagicMock(status_code=200)

middleware = make_middleware(get_response=capture_get_response)
request = make_request(
meta={"HTTP_X_AUTH_REQUEST_EMAIL": "bob@example.com"},
authenticated_user=alice,
)

with patch(PATCH_USER_LOGIN) as mock_login, \
patch(PATCH_LOGOUT) as mock_logout:
middleware(request)

# Session should be flushed when mismatch is detected
mock_logout.assert_called_once_with(request)
# user_login should NOT be called because bob is inactive
mock_login.assert_not_called()
# Verify get_response was called with the request (middleware didn't block it)
assert received_request is request, \
"Middleware should pass the request to get_response after logout"


class TestProxyAuthMiddlewareNoHeader:
"""Middleware must pass through cleanly when the email header is absent."""

Expand Down Expand Up @@ -307,6 +464,38 @@ def test_instances_path_is_bypassed(self):
mock_login.assert_not_called()
assert User.objects.count() == count_before

@pytest.mark.django_db
def test_bypass_dominates_mismatched_proxy_header(self, django_user_model):
"""
GIVEN the current Django session belongs to alice
AND the request targets a bypass path (/god-mode/setup/)
AND X-Auth-Request-Email = bob's email (mismatch)
WHEN the middleware processes the request
THEN logout() is NOT called — bypass dominates the mismatch flow
AND user_login() is NOT called

The bypass check runs at the top of __call__, before the proxy header
is read or session identity is compared. This guards god-mode local
admin sessions against being kicked out by an unrelated mPass
identity reaching the same browser.
"""
alice = django_user_model.objects.create_user(
email="alice@example.com", username="alice", password="x",
)
middleware = make_middleware()
request = make_request(
path="/god-mode/setup/",
meta={"HTTP_X_AUTH_REQUEST_EMAIL": "bob@example.com"},
authenticated_user=alice,
)

with patch(PATCH_USER_LOGIN) as mock_login, \
patch(PATCH_LOGOUT) as mock_logout:
middleware(request)

mock_logout.assert_not_called()
mock_login.assert_not_called()


class TestProxyAuthMiddlewareEdgeCases:
"""Email normalisation and concurrent creation races."""
Expand Down
Loading