Skip to content

Commit 21bf638

Browse files
committed
feat: support for impersonation token exchange
1 parent 0d011da commit 21bf638

File tree

21 files changed

+2091
-15
lines changed

21 files changed

+2091
-15
lines changed

packages/mcp/src/keycardai/mcp/server/auth/provider.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ def get_token_verifier(
429429
client_factory=self.client_factory,
430430
)
431431

432-
def grant(self, resources: str | list[str]):
432+
def grant(self, resources: str | list[str], user_identifier: Callable[..., str] | None = None):
433433
"""Decorator for automatic delegated token exchange.
434434
435435
This decorator automates the OAuth token exchange process for accessing
@@ -478,6 +478,18 @@ async def my_async_tool(access_ctx: AccessContext, ctx: Context, user_id: str):
478478
- Have a parameter annotated with `Context` type from MCP (e.g., `ctx: Context`)
479479
- Can be either async or sync (the decorator handles both cases automatically)
480480
481+
When ``user_identifier`` is provided, the decorator uses impersonation
482+
(substitute-user token exchange) instead of subject-token-based exchange.
483+
The callable receives only the tool's **keyword** arguments (not positional
484+
arguments) and should return the user identifier string::
485+
486+
@provider.grant(
487+
"https://graph.microsoft.com",
488+
user_identifier=lambda **kw: kw["user_email"],
489+
)
490+
async def get_calendar(access_ctx: AccessContext, ctx: Context, user_email: str):
491+
token = access_ctx.access("https://graph.microsoft.com").access_token
492+
481493
Error handling:
482494
- Sets error state in AccessContext if token exchange fails
483495
- Preserves original function signature and behavior
@@ -643,27 +655,46 @@ async def wrapper(*args, **kwargs) -> Any:
643655
_resource_list = (
644656
[resources] if isinstance(resources, str) else resources
645657
)
658+
659+
# Resolve user identifier for impersonation if callback provided
660+
_resolved_user_id: str | None = None
661+
if user_identifier is not None:
662+
try:
663+
_resolved_user_id = user_identifier(**kwargs)
664+
except Exception as e:
665+
_set_error({
666+
"message": "Failed to resolve user_identifier from tool arguments.",
667+
"raw_error": str(e),
668+
}, None, _access_ctx)
669+
return await _call_func(_is_async_func, func, *args, **kwargs)
670+
646671
_access_tokens = {}
647672
for resource in _resource_list:
648673
try:
649-
# Prepare token exchange request using application identity provider
650-
if self.application_credential:
674+
if _resolved_user_id is not None:
675+
# Impersonation path: use substitute-user token exchange
676+
_token_response = await _client.impersonate(
677+
user_identifier=_resolved_user_id,
678+
resource=resource,
679+
)
680+
elif self.application_credential:
681+
# Prepare token exchange request using application identity provider
651682
_token_exchange_request = await self.application_credential.prepare_token_exchange_request(
652683
client=_client,
653684
subject_token=_keycardai_auth_info["access_token"],
654685
resource=resource,
655686
auth_info=_keycardai_auth_info,
656687
)
688+
_token_response = await _client.exchange_token(_token_exchange_request)
657689
else:
658690
# Basic token exchange without client authentication
659691
_token_exchange_request = TokenExchangeRequest(
660692
subject_token=_keycardai_auth_info["access_token"],
661693
resource=resource,
662694
subject_token_type="urn:ietf:params:oauth:token-type:access_token",
663695
)
696+
_token_response = await _client.exchange_token(_token_exchange_request)
664697

665-
# Execute token exchange
666-
_token_response = await _client.exchange_token(_token_exchange_request)
667698
_access_tokens[resource] = _token_response
668699
except Exception as e:
669700
_error_dict: dict[str, str] = {
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Keycard zone URL
2+
ZONE_URL=https://your-zone-id.keycard.cloud
3+
4+
# Landing Page app: public client (no secret), used for browser-based authorization
5+
LANDING_PAGE_CLIENT_ID=your-landing-page-client-id
6+
7+
# Background Agent app: confidential client, used for offline impersonation
8+
AGENT_CLIENT_ID=your-agent-client-id
9+
AGENT_CLIENT_SECRET=your-agent-client-secret
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.env
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.12
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
# Impersonation Token Exchange Example
2+
3+
Demonstrates user impersonation via OAuth 2.0 token exchange (RFC 8693) using the Keycard OAuth SDK. Users grant access through a landing page by authenticating with their identity provider and approving resource access. A background agent then obtains resource tokens on their behalf, without the user being present.
4+
5+
## How It Works
6+
7+
### Landing page (one-time, interactive)
8+
9+
The landing page is a web application where users sign in and grant the background agent permission to act on their behalf.
10+
11+
1. Serves a local web page
12+
2. The user clicks "Continue with Keycard" and authenticates with their identity provider
13+
3. The user grants access to the requested resources
14+
4. Keycard creates a delegated grant for each resource
15+
16+
The same landing page can be used again when new resources need to be authorized.
17+
18+
### Background agent (repeatable, offline)
19+
20+
The background agent is a confidential client (e.g. `client_secret_basic` or workload identity).
21+
22+
1. The agent authenticates and requests a token for a specific user and resource via `client.impersonate()`
23+
2. Keycard validates the delegated grant and issues a scoped, short-lived resource token
24+
25+
No browser, no user interaction. Impersonation is forbidden by default. An administrator must explicitly allow specific applications to impersonate.
26+
27+
## Prerequisites
28+
29+
- Python 3.10+
30+
- A Keycard zone, set up in Console:
31+
1. **Create a provider** (e.g. GitHub as the identity provider).
32+
2. **Create a resource** (e.g. `https://api.github.com`) and link it to the provider.
33+
3. **Create a "Landing Page" application**
34+
- Public credential (client_id, no secret)
35+
- Redirect URI: `http://localhost/callback`
36+
- Add the resource as a dependency
37+
4. **Create a "Background Agent" application**
38+
- Password credential (client_id + client_secret)
39+
- Add the resource as a dependency
40+
5. **Add a policy enabling impersonation** in Console. To allow the Background Agent app to impersonate a specific user:
41+
```
42+
permit (
43+
principal is Keycard::Application,
44+
action,
45+
resource
46+
)
47+
when {
48+
principal.identifier == "background-agent" &&
49+
context.impersonate == true &&
50+
context has subject &&
51+
context.subject.identifier == "user@example.com"
52+
};
53+
```
54+
55+
## Install
56+
57+
```bash
58+
uv sync
59+
```
60+
61+
## Configuration
62+
63+
Copy `.env.example` to `.env` and fill in your values:
64+
65+
```bash
66+
cp .env.example .env
67+
```
68+
69+
All parameters can also be passed as CLI flags (run `--help` on each script for details).
70+
71+
72+
| Variable | Description |
73+
|---|---|
74+
| `ZONE_URL` | Keycard zone URL |
75+
| `LANDING_PAGE_CLIENT_ID` | Public client ID for the Landing Page app |
76+
| `AGENT_CLIENT_ID` | Confidential client ID for the Background Agent app |
77+
| `AGENT_CLIENT_SECRET` | Confidential client secret for the Background Agent app |
78+
79+
80+
## Usage
81+
82+
### Step 1: Landing page (interactive, one-time)
83+
84+
Starts the landing page where users sign in and grant the background agent access to resources. In production, this would be deployed as a hosted web application. Resources are determined by the application's dependencies configured in Keycard Console.
85+
86+
```bash
87+
uv run python landing_page.py --port 3000
88+
```
89+
90+
Example output:
91+
92+
```
93+
═══ Landing Page ═══
94+
Auth: PKCE (no secret)
95+
Listening: http://localhost:3000
96+
97+
Landing page running at http://localhost:3000
98+
Press Ctrl+C to stop.
99+
```
100+
101+
Open `http://localhost:3000` in a browser and click "Continue with Keycard".
102+
103+
### Step 2: Get the user identifier from Console
104+
105+
After the user signs in, find their identifier in Keycard Console under the Users section. The Provider can be configured to map claims (e.g. `email`, `sub`) to the user identifier on creation. The identifier can also be changed from the Console at any time.
106+
107+
### Step 3: Run background agent (offline, repeatable)
108+
109+
The background agent obtains a resource token for the user without any browser interaction.
110+
111+
```bash
112+
uv run python background_agent.py \
113+
--user-identifier user@example.com \
114+
--resource https://api.github.com
115+
```
116+
117+
Example output:
118+
119+
```
120+
═══ Background Agent ═══
121+
Auth: client_credentials
122+
On behalf of: user@example.com
123+
Access resource: https://api.github.com
124+
125+
Access Token: eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
126+
Token Type: Bearer
127+
Expires In: 3600s
128+
```
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
#!/usr/bin/env python3
2+
"""Background Agent app.
3+
4+
Authenticates with client credentials and obtains a resource-specific access
5+
token on behalf of a user, without the user being present. The user must have
6+
previously granted access through the landing page.
7+
"""
8+
9+
import argparse
10+
import os
11+
import sys
12+
from typing import NoReturn
13+
14+
from dotenv import load_dotenv
15+
16+
from keycardai.oauth import Client
17+
from keycardai.oauth.exceptions import OAuthHttpError, OAuthProtocolError
18+
from keycardai.oauth.http.auth import BasicAuth
19+
from keycardai.oauth.types.models import ClientConfig
20+
21+
22+
def run_background_agent(
23+
zone_url: str,
24+
client_id: str,
25+
client_secret: str,
26+
user_identifier: str,
27+
resource: str,
28+
) -> None:
29+
"""Impersonate a user and print the resulting resource token.
30+
31+
Args:
32+
zone_url: Keycard zone URL for metadata discovery.
33+
client_id: Confidential client ID.
34+
client_secret: Confidential client secret.
35+
user_identifier: User identifier (e.g. email, oid).
36+
resource: Target resource URI.
37+
"""
38+
try:
39+
with Client(
40+
base_url=zone_url,
41+
auth=BasicAuth(client_id, client_secret),
42+
config=ClientConfig(
43+
enable_metadata_discovery=True,
44+
auto_register_client=False,
45+
),
46+
) as client:
47+
response = client.impersonate(
48+
user_identifier=user_identifier,
49+
resource=resource,
50+
)
51+
52+
print(f"Access Token: {response.access_token}")
53+
print(f"Token Type: {response.token_type}")
54+
if response.expires_in:
55+
print(f"Expires In: {response.expires_in}s")
56+
if response.scope:
57+
print(f"Scope: {' '.join(response.scope)}")
58+
59+
except OAuthProtocolError as e:
60+
desc = f" - {e.error_description}" if e.error_description else ""
61+
raise SystemExit(f"Error: OAuth error: {e.error}{desc}") from None
62+
except OAuthHttpError as e:
63+
raise SystemExit(f"Error: HTTP {e.status_code}: {e.response_body}") from None
64+
65+
66+
# ---------------------------------------------------------------------------
67+
# CLI entry point
68+
# ---------------------------------------------------------------------------
69+
70+
def _error_exit(message: str) -> NoReturn:
71+
print(f"Error: {message}", file=sys.stderr)
72+
sys.exit(1)
73+
74+
75+
def main() -> None:
76+
load_dotenv()
77+
78+
parser = argparse.ArgumentParser(
79+
description="Background Agent: obtain a resource token on behalf of a user (offline)",
80+
)
81+
parser.add_argument(
82+
"--zone-url",
83+
default=os.getenv("ZONE_URL"),
84+
help="Keycard zone URL (env: ZONE_URL)",
85+
)
86+
parser.add_argument(
87+
"--client-id",
88+
default=os.getenv("AGENT_CLIENT_ID"),
89+
help="Confidential client ID (env: AGENT_CLIENT_ID)",
90+
)
91+
parser.add_argument(
92+
"--client-secret",
93+
default=os.getenv("AGENT_CLIENT_SECRET"),
94+
help="Confidential client secret (env: AGENT_CLIENT_SECRET)",
95+
)
96+
parser.add_argument(
97+
"--user-identifier",
98+
help="User identifier for impersonation",
99+
)
100+
parser.add_argument(
101+
"--resource",
102+
help="Resource URI to get a token for",
103+
)
104+
105+
args = parser.parse_args()
106+
107+
if not args.zone_url:
108+
_error_exit("--zone-url is required (or set ZONE_URL)")
109+
if not args.client_id:
110+
_error_exit("--client-id is required (or set AGENT_CLIENT_ID)")
111+
if not args.client_secret:
112+
_error_exit("--client-secret is required (or set AGENT_CLIENT_SECRET)")
113+
if not args.user_identifier:
114+
_error_exit("--user-identifier is required")
115+
if not args.resource:
116+
_error_exit("--resource is required")
117+
118+
print("═══ Background Agent ═══")
119+
print(" Auth: client_credentials")
120+
print(f" On behalf of: {args.user_identifier}")
121+
print(f" Access resource: {args.resource}")
122+
print()
123+
124+
run_background_agent(
125+
zone_url=args.zone_url,
126+
client_id=args.client_id,
127+
client_secret=args.client_secret,
128+
user_identifier=args.user_identifier,
129+
resource=args.resource,
130+
)
131+
132+
133+
if __name__ == "__main__":
134+
main()

0 commit comments

Comments
 (0)