Skip to content

Commit 14a56e1

Browse files
committed
feat(keycardai-oauth): support for impersonation token exchange
- Add substitute-user token type and unsigned JWT builder - Add impersonate method to Client and AsyncClient - Add user_identifier callback to MCP grant decorator - Add impersonation token exchange example
1 parent a9e6f04 commit 14a56e1

File tree

14 files changed

+1289
-5
lines changed

14 files changed

+1289
-5
lines changed

packages/mcp/src/keycardai/mcp/server/auth/provider.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ def get_token_verifier(
429429
client_factory=self.client_factory,
430430
)
431431

432-
def grant(self, resources: str | list[str]):
432+
def grant(self, resources: str | list[str], user_identifier: Callable[..., str] | None = None):
433433
"""Decorator for automatic delegated token exchange.
434434
435435
This decorator automates the OAuth token exchange process for accessing
@@ -478,6 +478,18 @@ async def my_async_tool(access_ctx: AccessContext, ctx: Context, user_id: str):
478478
- Have a parameter annotated with `Context` type from MCP (e.g., `ctx: Context`)
479479
- Can be either async or sync (the decorator handles both cases automatically)
480480
481+
When ``user_identifier`` is provided, the decorator uses impersonation
482+
(substitute-user token exchange) instead of subject-token-based exchange.
483+
The callable receives only the tool's **keyword** arguments (not positional
484+
arguments) and should return the user identifier string::
485+
486+
@provider.grant(
487+
"https://graph.microsoft.com",
488+
user_identifier=lambda **kw: kw["user_email"],
489+
)
490+
async def get_calendar(access_ctx: AccessContext, ctx: Context, user_email: str):
491+
token = access_ctx.access("https://graph.microsoft.com").access_token
492+
481493
Error handling:
482494
- Sets error state in AccessContext if token exchange fails
483495
- Preserves original function signature and behavior
@@ -643,27 +655,46 @@ async def wrapper(*args, **kwargs) -> Any:
643655
_resource_list = (
644656
[resources] if isinstance(resources, str) else resources
645657
)
658+
659+
# Resolve user identifier for impersonation if callback provided
660+
_resolved_user_id: str | None = None
661+
if user_identifier is not None:
662+
try:
663+
_resolved_user_id = user_identifier(**kwargs)
664+
except Exception as e:
665+
_set_error({
666+
"message": "Failed to resolve user_identifier from tool arguments.",
667+
"raw_error": str(e),
668+
}, None, _access_ctx)
669+
return await _call_func(_is_async_func, func, *args, **kwargs)
670+
646671
_access_tokens = {}
647672
for resource in _resource_list:
648673
try:
649-
# Prepare token exchange request using application identity provider
650-
if self.application_credential:
674+
if _resolved_user_id is not None:
675+
# Impersonation path: use substitute-user token exchange
676+
_token_response = await _client.impersonate(
677+
user_identifier=_resolved_user_id,
678+
resource=resource,
679+
)
680+
elif self.application_credential:
681+
# Prepare token exchange request using application identity provider
651682
_token_exchange_request = await self.application_credential.prepare_token_exchange_request(
652683
client=_client,
653684
subject_token=_keycardai_auth_info["access_token"],
654685
resource=resource,
655686
auth_info=_keycardai_auth_info,
656687
)
688+
_token_response = await _client.exchange_token(_token_exchange_request)
657689
else:
658690
# Basic token exchange without client authentication
659691
_token_exchange_request = TokenExchangeRequest(
660692
subject_token=_keycardai_auth_info["access_token"],
661693
resource=resource,
662694
subject_token_type="urn:ietf:params:oauth:token-type:access_token",
663695
)
696+
_token_response = await _client.exchange_token(_token_exchange_request)
664697

665-
# Execute token exchange
666-
_token_response = await _client.exchange_token(_token_exchange_request)
667698
_access_tokens[resource] = _token_response
668699
except Exception as e:
669700
_error_dict: dict[str, str] = {
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Keycard zone URL (from Zone Settings in Keycard Console)
2+
ZONE_URL=https://example.keycard.cloud
3+
4+
# Landing Page app: public client, used for browser-based authorization
5+
LANDING_PAGE_CLIENT_ID=landing-page
6+
7+
# Background Agent app: confidential client, used for offline impersonation
8+
AGENT_CLIENT_ID=background-agent
9+
AGENT_CLIENT_SECRET=your-client-secret-here
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.env
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.12
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# Impersonation Token Exchange Example
2+
3+
Demonstrates user impersonation via OAuth 2.0 token exchange (RFC 8693) using the Keycard OAuth SDK. Users grant access through a landing page by authenticating with their identity provider and approving resource access. A background agent then obtains resource tokens on their behalf, without the user being present.
4+
5+
## How It Works
6+
7+
### Landing page (one-time, interactive)
8+
9+
The landing page is a web application where users sign in and grant the background agent permission to act on their behalf.
10+
11+
1. Serves a local web page
12+
2. The user clicks "Continue with Keycard" and authenticates with their identity provider
13+
3. The user grants access to the requested resources
14+
4. Keycard creates a delegated grant for each resource
15+
16+
The same landing page can be used again when new resources need to be authorized.
17+
18+
### Background agent (repeatable, offline)
19+
20+
The background agent is a confidential client (e.g. `client_secret_basic` or workload identity).
21+
22+
1. The agent authenticates and requests a token for a specific user and resource via `client.impersonate()`
23+
2. Keycard validates the delegated grant and issues a scoped, short-lived resource token
24+
25+
No browser, no user interaction. Impersonation is forbidden by default. An administrator must explicitly allow specific applications to impersonate.
26+
27+
## Prerequisites
28+
29+
- Python 3.10+
30+
- [uv](https://docs.astral.sh/uv/) installed
31+
- Access to Keycard Console and a Keycard zone
32+
33+
## Configuration
34+
35+
Copy `.env.example` to `.env`:
36+
37+
```bash
38+
cp .env.example .env
39+
```
40+
41+
Set up the following in Keycard Console:
42+
43+
1. **Set `ZONE_URL`** in `.env` to your Keycard zone URL from the Zone Settings.
44+
2. **Create a provider** (e.g. `https://github.com` as the identity provider).
45+
3. **Create a resource** (e.g. `https://api.github.com`) and link it to the provider.
46+
4. **Create a Landing Page application**
47+
- Public credential, identifier: `landing-page`
48+
- Redirect URI: `http://localhost/callback`
49+
- Add the resource as a dependency
50+
5. **Create a Background Agent application**
51+
- Password credential, identifier: `background-agent`
52+
- Set `AGENT_CLIENT_SECRET` in `.env`.
53+
6. **Add a policy enabling impersonation** in Keycard Console. To allow the Background Agent app to impersonate a specific user:
54+
```
55+
permit (
56+
principal is Keycard::Application,
57+
action,
58+
resource
59+
)
60+
when {
61+
principal.identifier == "background-agent" &&
62+
context.impersonate == true &&
63+
context has subject &&
64+
context.subject.identifier == "user@example.com"
65+
};
66+
```
67+
68+
69+
## Usage
70+
71+
### Step 1: Install dependencies
72+
73+
```bash
74+
uv sync
75+
```
76+
77+
### Step 2: Landing page (interactive, one-time)
78+
79+
Starts the landing page where users sign in and grant the background agent access to resources. In production, this would be deployed as a hosted web application. Resources are determined by the application's dependencies configured in Keycard Console.
80+
81+
```bash
82+
uv run python landing_page.py --port 3000
83+
```
84+
85+
Example output:
86+
87+
```
88+
═══ Landing Page ═══
89+
Auth: PKCE (no secret)
90+
Listening: http://localhost:3000
91+
92+
Landing page running at http://localhost:3000
93+
Press Ctrl+C to stop.
94+
```
95+
96+
Open `http://localhost:3000` in a browser and click "Continue with Keycard".
97+
98+
### Step 3: Get the user identifier from Keycard Console
99+
100+
After the user signs in, find their identifier in Keycard Console under the Users section. The Provider can be configured to map claims (e.g. `email`, `sub`) to the user identifier on creation. The identifier can also be changed from Keycard Console at any time.
101+
102+
### Step 4: Run background agent (offline, repeatable)
103+
104+
The background agent obtains a resource token for the user without any browser interaction.
105+
106+
```bash
107+
uv run python background_agent.py \
108+
--user-identifier user@example.com \
109+
--resource https://api.github.com
110+
```
111+
112+
Example output:
113+
114+
```
115+
═══ Background Agent ═══
116+
Auth: client_credentials
117+
On behalf of: user@example.com
118+
Access resource: https://api.github.com
119+
120+
Access Token: eyJhbG...
121+
Token Type: Bearer
122+
Expires In: 3600s
123+
```
124+
125+
If the policy does not allow impersonation, you will see an error:
126+
127+
```
128+
Error: OAuth error: access_denied - Access denied by policy. Policy set: <policy-set-id>. Policy set version: <policy-set-version>. Determining policies: default-user-grants.
129+
```
130+
131+
### Step 5: Verify in audit logs
132+
133+
In Keycard Console, navigate to Audit Logs to see the user authorization and the credential issued to the background agent on behalf of the user.
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
#!/usr/bin/env python3
2+
"""Background Agent app.
3+
4+
Authenticates with client credentials and obtains a resource-specific access
5+
token on behalf of a user, without the user being present. The user must have
6+
previously granted access through the landing page.
7+
"""
8+
9+
import argparse
10+
import os
11+
import sys
12+
from typing import NoReturn
13+
14+
from dotenv import load_dotenv
15+
16+
from keycardai.oauth import Client
17+
from keycardai.oauth.exceptions import OAuthHttpError, OAuthProtocolError
18+
from keycardai.oauth.http.auth import BasicAuth
19+
from keycardai.oauth.types.models import ClientConfig
20+
21+
22+
def run_background_agent(
23+
zone_url: str,
24+
client_id: str,
25+
client_secret: str,
26+
user_identifier: str,
27+
resource: str,
28+
) -> None:
29+
"""Impersonate a user and print the resulting resource token.
30+
31+
Args:
32+
zone_url: Keycard zone URL for metadata discovery.
33+
client_id: Confidential client ID.
34+
client_secret: Confidential client secret.
35+
user_identifier: User identifier (e.g. email, oid).
36+
resource: Target resource URI.
37+
"""
38+
try:
39+
with Client(
40+
base_url=zone_url,
41+
auth=BasicAuth(client_id, client_secret),
42+
config=ClientConfig(
43+
enable_metadata_discovery=True,
44+
auto_register_client=False,
45+
),
46+
) as client:
47+
response = client.impersonate(
48+
user_identifier=user_identifier,
49+
resource=resource,
50+
)
51+
52+
print(f"Access Token: {response.access_token[:6]}...")
53+
print(f"Token Type: {response.token_type}")
54+
if response.expires_in:
55+
print(f"Expires In: {response.expires_in}s")
56+
if response.scope:
57+
print(f"Scope: {' '.join(response.scope)}")
58+
59+
except OAuthProtocolError as e:
60+
desc = f" - {e.error_description}" if e.error_description else ""
61+
raise SystemExit(f"Error: OAuth error: {e.error}{desc}") from None
62+
except OAuthHttpError as e:
63+
raise SystemExit(f"Error: HTTP {e.status_code}: {e.response_body}") from None
64+
65+
66+
# ---------------------------------------------------------------------------
67+
# CLI entry point
68+
# ---------------------------------------------------------------------------
69+
70+
def _error_exit(message: str) -> NoReturn:
71+
print(f"Error: {message}", file=sys.stderr)
72+
sys.exit(1)
73+
74+
75+
def main() -> None:
76+
load_dotenv()
77+
78+
parser = argparse.ArgumentParser(
79+
description="Background Agent: obtain a resource token on behalf of a user (offline)",
80+
)
81+
parser.add_argument(
82+
"--zone-url",
83+
default=os.getenv("ZONE_URL"),
84+
help="Keycard zone URL (env: ZONE_URL)",
85+
)
86+
parser.add_argument(
87+
"--client-id",
88+
default=os.getenv("AGENT_CLIENT_ID"),
89+
help="Confidential client ID (env: AGENT_CLIENT_ID)",
90+
)
91+
parser.add_argument(
92+
"--client-secret",
93+
default=os.getenv("AGENT_CLIENT_SECRET"),
94+
help="Confidential client secret (env: AGENT_CLIENT_SECRET)",
95+
)
96+
parser.add_argument(
97+
"--user-identifier",
98+
help="User identifier for impersonation",
99+
)
100+
parser.add_argument(
101+
"--resource",
102+
help="Resource URI to get a token for",
103+
)
104+
105+
args = parser.parse_args()
106+
107+
if not args.zone_url:
108+
_error_exit("--zone-url is required (or set ZONE_URL)")
109+
if not args.client_id:
110+
_error_exit("--client-id is required (or set AGENT_CLIENT_ID)")
111+
if not args.client_secret:
112+
_error_exit("--client-secret is required (or set AGENT_CLIENT_SECRET)")
113+
if not args.user_identifier:
114+
_error_exit("--user-identifier is required")
115+
if not args.resource:
116+
_error_exit("--resource is required")
117+
118+
print("═══ Background Agent ═══")
119+
print(" Auth: client_credentials")
120+
print(f" On behalf of: {args.user_identifier}")
121+
print(f" Access resource: {args.resource}")
122+
print()
123+
124+
run_background_agent(
125+
zone_url=args.zone_url,
126+
client_id=args.client_id,
127+
client_secret=args.client_secret,
128+
user_identifier=args.user_identifier,
129+
resource=args.resource,
130+
)
131+
132+
133+
if __name__ == "__main__":
134+
main()

0 commit comments

Comments
 (0)