diff --git a/HUMANS.md b/HUMANS.md index ee9949c..b2eee82 100644 --- a/HUMANS.md +++ b/HUMANS.md @@ -160,6 +160,106 @@ woltspace init # creates bob All wolts share one container. `woltspace.json` tracks which wolt is active. Auth is shared — after the first wolt authenticates, new ones reuse the token. +## Multi-user permissions (optional) + +By default a woltspace container is single-tenant — anyone past the tunnel +sees and controls every wolt. To gate per user (collaborators, family, +small group), there's an opt-in mode that uses your existing Cloudflare +Access setup. + +### Setup + +1. **Seed yourself** into `wolts/.space/auth/users.json` so you don't + lock yourself out. From inside the container (or via a wolt session + using the `woltspace-access` skill): + ```bash + access add you@example.com '*' + ``` + The `'*'` is a wildcard meaning "every wolt." + +2. **Enable auth** in `~/.woltspace/wolts/.env`: + ```bash + WOLTSPACE_AUTH=cloudflare + WOLTSPACE_CF_TEAM_DOMAIN=yourteam.cloudflareaccess.com + WOLTSPACE_CF_AUD= + ``` + +3. **Restart** the server. Visit the lodge — Cloudflare Access asks for + email OTP, the JWT lands at the server, the middleware validates it + and looks up your email in `users.json`. + +### Adding collaborators + +Two steps: + +1. Add their email to the Cloudflare Access policy + (Zero Trust → Access → Applications → policies → emails) so they + can reach the tunnel. + +2. Add them to `users.json`: + ```bash + access add bob@example.com bloggo shared-wolt + ``` + +`users.json` looks like: + +```json +{ + "users": [ + {"email": "you@example.com", "wolts": ["*"]}, + {"email": "collaborator@example.com", "wolts": ["bloggo"]} + ] +} +``` + +Users see and control only wolts in their allow-list. Apps inherit +access from their keeper wolt. A user can also create new wolts via the +lodge UI — when they do, the new wolt is auto-appended to their own +allow-list (self-onboarding). + +### Accessing via localhost (WOLTSPACE_AUTH_TRUST_LOCAL) + +When auth is on, requests must carry a Cloudflare Access JWT — which only +exists for traffic that came through the tunnel (`yourname.woltspace.com`). +Hitting `localhost:7777` directly has **no JWT**, so by default you'll see +no wolts. That's the secure default: no Cloudflare login = no identity. + +In-container tools (the `notify`, `push-view`, `access` CLIs) still work — +they originate from genuine loopback (127.0.0.1) and are always trusted. + +If you want plain `localhost` browser access to work while auth is on, set: + +```bash +WOLTSPACE_AUTH_TRUST_LOCAL=true +``` + +This trusts callers on the private network (including your host browser, +which Docker presents as the bridge gateway address) and grants them full +wildcard access without a JWT. + +> ⚠️ **Only enable this on a network you trust.** The container publishes +> its port with `-p 7777:7777` (binds `0.0.0.0`), so the port is reachable +> from your whole LAN — and every such caller appears as the same private +> address inside the container. There is no way to distinguish your own +> browser from another device on the network. With this flag on, **anyone +> who can reach `your-machine-ip:7777` on your LAN gets unauthenticated +> full access.** Leave it off if your machine is on shared/untrusted wifi; +> use the tunnel URL instead. + +Default is OFF. Remote users always go through the tunnel + Cloudflare +Access regardless of this setting. + +### Scope of enforcement + +This is application-layer. The lodge UI shows only what the user is +allowed to see, and the REST API refuses cross-wolt requests. It does +**not** stop a session that's already running from reading another +wolt's files via the shell — all sessions still run as the same OS +user. That's tracked separately (filesystem isolation, issue #354). + +Default mode is `WOLTSPACE_AUTH=none` — single-tenant, today's behavior, +zero configuration. Skip this whole section if that's what you want. + ## Messaging (Telegram, etc.) Wolts can talk through messaging apps. Add config to `~/.woltspace/wolts/.env`: diff --git a/container/bin/access b/container/bin/access new file mode 100755 index 0000000..efbf528 --- /dev/null +++ b/container/bin/access @@ -0,0 +1,182 @@ +#!/usr/bin/env python3 +"""Manage woltspace user permissions — edits wolts/.space/auth/users.json. + +This is a convenience CLI. It is NOT a security boundary — anyone with shell +access in the container can write the underlying file directly. Real +enforcement requires filesystem-level isolation (issue #354). See HUMANS.md. + +Usage: + access list + access add EMAIL [WOLTS...] — add a user with an allow-list (use "*" for all wolts) + access grant EMAIL WOLTS... — append wolts to an existing user's allow-list + access revoke EMAIL WOLTS... — remove wolts from a user's allow-list + access remove EMAIL — delete a user entry entirely + access check EMAIL WOLT — does EMAIL have access to WOLT? +""" + +import json +import os +import sys +import time +from pathlib import Path + + +WOLTS_DIR = Path(os.environ.get("WOLTS_DIR", "/workspace/wolts")) +USERS_FILE = WOLTS_DIR / ".space" / "auth" / "users.json" + + +def load() -> list[dict]: + if not USERS_FILE.exists(): + return [] + try: + data = json.loads(USERS_FILE.read_text()) + return data.get("users", []) if isinstance(data, dict) else [] + except json.JSONDecodeError as e: + die(f"users.json is corrupt: {e}") + + +def save(users: list[dict]) -> None: + USERS_FILE.parent.mkdir(parents=True, exist_ok=True) + USERS_FILE.write_text(json.dumps({"users": users}, indent=2) + "\n") + + +def die(msg: str, code: int = 1) -> None: + print(f"error: {msg}", file=sys.stderr) + sys.exit(code) + + +def find(users: list[dict], email: str) -> dict | None: + email = email.lower() + for u in users: + if (u.get("email") or "").lower() == email: + return u + return None + + +def fmt_user(u: dict) -> str: + wolts = u.get("wolts") or [] + allow = "all" if "*" in wolts else (", ".join(wolts) if wolts else "(none)") + return f" {u['email']:<35} wolts: {allow}" + + +# --- Subcommands --- + +def cmd_list(_args: list[str]) -> None: + users = load() + if not users: + print("(no users yet — run 'access add EMAIL *' to add the first one)") + return + print(f"users ({len(users)}):") + for u in users: + print(fmt_user(u)) + + +def cmd_add(args: list[str]) -> None: + if not args: + die("usage: access add EMAIL [WOLTS...]") + email = args[0].strip().lower() + wolts = list(args[1:]) + users = load() + if find(users, email): + die(f"{email} already exists — use 'grant' to add wolts") + users.append({ + "email": email, + "wolts": wolts, + "added_at": int(time.time()), + }) + save(users) + print(f"added {email} with wolts: {wolts or '(none)'}") + + +def cmd_grant(args: list[str]) -> None: + if len(args) < 2: + die("usage: access grant EMAIL WOLTS...") + email = args[0].strip().lower() + new_wolts = args[1:] + users = load() + u = find(users, email) + if not u: + die(f"no such user: {email}. use 'add' to create.") + allow = u.get("wolts") or [] + if "*" in allow: + die(f"{email} already has wildcard access ('*') to everything") + for w in new_wolts: + if w not in allow: + allow.append(w) + u["wolts"] = allow + save(users) + print(f"granted {email}: {', '.join(new_wolts)}") + print(fmt_user(u)) + + +def cmd_revoke(args: list[str]) -> None: + if len(args) < 2: + die("usage: access revoke EMAIL WOLTS...") + email = args[0].strip().lower() + remove = set(args[1:]) + users = load() + u = find(users, email) + if not u: + die(f"no such user: {email}") + u["wolts"] = [w for w in (u.get("wolts") or []) if w not in remove] + save(users) + print(f"revoked from {email}: {', '.join(remove)}") + print(fmt_user(u)) + + +def cmd_remove(args: list[str]) -> None: + if not args: + die("usage: access remove EMAIL") + email = args[0].strip().lower() + users = load() + before = len(users) + users = [u for u in users if (u.get("email") or "").lower() != email] + if len(users) == before: + die(f"no such user: {email}") + save(users) + print(f"removed {email}") + + +def cmd_check(args: list[str]) -> None: + if len(args) < 2: + die("usage: access check EMAIL WOLT") + email = args[0].strip().lower() + wolt = args[1] + users = load() + u = find(users, email) + if not u: + print(f"{email} → NOT IN users.json (denied)") + sys.exit(2) + allow = u.get("wolts") or [] + if "*" in allow or wolt in allow: + print(f"{email} → ALLOWED for {wolt}") + sys.exit(0) + print(f"{email} → DENIED for {wolt} (allow-list: {allow})") + sys.exit(2) + + +COMMANDS = { + "list": cmd_list, + "ls": cmd_list, + "add": cmd_add, + "grant": cmd_grant, + "revoke": cmd_revoke, + "remove": cmd_remove, + "rm": cmd_remove, + "check": cmd_check, +} + + +def main() -> None: + if len(sys.argv) < 2 or sys.argv[1] in ("-h", "--help", "help"): + print(__doc__) + sys.exit(0) + cmd, *args = sys.argv[1:] + fn = COMMANDS.get(cmd) + if not fn: + die(f"unknown command: {cmd}. try: {', '.join(sorted(set(COMMANDS.keys())))}") + fn(args) + + +if __name__ == "__main__": + main() diff --git a/container/skills/woltspace-access/SKILL.md b/container/skills/woltspace-access/SKILL.md new file mode 100644 index 0000000..454944b --- /dev/null +++ b/container/skills/woltspace-access/SKILL.md @@ -0,0 +1,177 @@ +--- +name: woltspace-access +description: Turn on multi-user auth, manage which users can access which wolts, and troubleshoot access. Use when the operator asks to enable per-user permissions, add a user, grant or revoke wolt access, audit who has access, or debug "I see no wolts". Wraps the `access` CLI (edits wolts/.space/auth/users.json) and the Cloudflare Access layer. +--- + +# Access — Multi-User Permissions + +By default a woltspace container is single-tenant: anyone past the Cloudflare tunnel sees and controls every wolt. This skill is for the opt-in mode where each user only sees the wolts in their personal allow-list. + +**Two layers, both required:** +1. **Cloudflare Access** — controls *who can reach the lodge at all* (the front door). Gated at the edge by email OTP. +2. **users.json** — controls *what each authenticated user sees inside* (per-wolt allow-list). + +A user needs to be in BOTH: the CF Access policy (to get through the tunnel) and `users.json` (to see any wolts). + +## ⚠️ Honesty note + +This skill is a convenience layer, **not a security boundary**. Any wolt session can edit `wolts/.space/auth/users.json` directly via the shell, with or without this skill. The skill makes admin tasks ergonomic; it does not enforce who can perform them. Real OS-level enforcement is tracked in issue #354. Until that lands, trust the people you let into the container. + +--- + +## How it works (mental model) + +- Every request through the tunnel (`yourname.woltspace.com`) carries a Cloudflare-signed JWT with the user's email. The server validates it and looks the email up in `users.json`. +- The wildcard `"*"` in a user's `wolts` list means "every wolt." It's a convenience, not a role — there is no admin concept in this MVP. +- Apps inherit access from their **keeper wolt** (the wolt that owns them). See an app iff you can see its keeper. +- Requests that didn't traverse Cloudflare (direct `localhost`) have no JWT — see the localhost section below. +- Config is read from `wolts/.env` live, so editing it + reloading the server (no full container restart) is enough. + +--- + +## Flow 1 — First-time auth setup + +When the operator says "turn on multi-user auth" / "enable per-user permissions": + +### 1. Seed the operator into users.json FIRST + +Before flipping the toggle, add the operator so they don't lock themselves out: + +```bash +access add OPERATOR_EMAIL '*' +``` + +`'*'` = every wolt. Single-quote it so the shell doesn't glob. + +### 2. Find the Cloudflare config values + +You need the team domain and the lodge app's AUD tag. If `CLOUDFLARE_API_TOKEN` + `CLOUDFLARE_ACCOUNT_ID` are in `.env`, look them up directly: + +```bash +# team domain — the redirect target on an unauthenticated request +curl -sI https://YOURNAME.woltspace.com | grep -i location +# → .../cdn-cgi/access/login/... the host is your team domain +# (e.g. jerpint.cloudflareaccess.com) + +# AUD tag for the lodge app +. /workspace/wolts/.env && curl -s -H "Authorization: Bearer $CLOUDFLARE_API_TOKEN" \ + "https://api.cloudflare.com/client/v4/accounts/$CLOUDFLARE_ACCOUNT_ID/access/apps" \ + | python3 -c "import sys,json; [print(a['name'], a['aud'], a['domain']) for a in json.load(sys.stdin)['result']]" +# → pick the row whose domain is your lodge (e.g. jerpint.woltspace.com) +``` + +Or read both from the Cloudflare Zero Trust dashboard: Settings → Custom Pages shows the team domain; Access → Applications → your lodge app → Overview shows the AUD tag. + +### 3. Configure env vars + +Add to `wolts/.env`: + +```bash +WOLTSPACE_AUTH=cloudflare +WOLTSPACE_CF_TEAM_DOMAIN=yourteam.cloudflareaccess.com +WOLTSPACE_CF_AUD= +``` + +### 4. Reload the server + +Config is read from `.env` live, but the JWKS cache + module state want a clean reload. Touch the server entry so uvicorn reloads: + +```bash +touch /workspace/woltspace/server/app.py +``` + +(Or restart the container. A full rebuild is only needed if `PyJWT` isn't installed yet — see Troubleshooting.) + +### 5. Verify with /auth/debug + +From inside the container (loopback only): + +```bash +curl -s http://localhost:7777/auth/debug | python3 -m json.tool +``` + +Confirm: `auth_mode: cloudflare`, `pyjwt_installed: true`, `team_domain` set, `aud_set: true`, and your `users_emails` listed. `last_auth_error` should be `null`. + +### 6. Log in + +Visit the lodge URL. Cloudflare OTP-logs you in, the JWT reaches the server, you see your wolts. A CF-authenticated user who isn't in `users.json` gets a 403 "pending approval" — that's why step 1 matters. + +### 7. (Optional) localhost browser access + +With auth on, `localhost:7777` shows no wolts: Docker's `-p 7777:7777` presents your host browser as the bridge gateway (e.g. `172.17.0.1`), not `127.0.0.1`, and it carries no JWT. To allow it: + +```bash +WOLTSPACE_AUTH_TRUST_LOCAL=true +``` + +> ⚠️ The port binds `0.0.0.0`, so it's reachable from your whole LAN, and every such caller looks like the same private address inside the container. With this flag on, **anyone on your network who can reach `your-machine-ip:7777` gets unauthenticated full access.** Only enable on a trusted network. Default OFF. Genuine in-container loopback (notify, access, push-view CLIs) is always trusted regardless of this flag; remote users always go through the tunnel + CF Access. + +--- + +## Flow 2 — Day-to-day user management + +### Add a new user (two steps) + +**Step A — Cloudflare Access** (lets them reach the lodge). Add their email to the lodge app's policy. Via dashboard: Zero Trust → Access → Applications → your lodge app → Policies → add email. Via API (if your token has `Access: Apps and Policies: Edit`): + +```bash +. /workspace/wolts/.env +APP_AUD= # the app id, same as its AUD tag +# Create a small dedicated policy (works even with create-only token scope): +curl -s -H "Authorization: Bearer $CLOUDFLARE_API_TOKEN" -H "Content-Type: application/json" \ + "https://api.cloudflare.com/client/v4/accounts/$CLOUDFLARE_ACCOUNT_ID/access/apps/$APP_AUD/policies" \ + -d '{"name":"allow-NAME","decision":"allow","precedence":2,"include":[{"email":{"email":"bob@example.com"}}]}' +``` + +> Token-scope gotcha: a token with only create scope returns `10405 "method not allowed"` on PUT/DELETE. You can still CREATE new policies (multiple allow-policies stack — any match lets the user in), but can't edit/delete existing ones. To get full control, add **Account → Access: Apps and Policies → Edit** to the token at dash.cloudflare.com/profile/api-tokens. + +**Step B — users.json** (controls what they see inside): + +```bash +access add bob@example.com bloggo +``` + +Without Step A, Bob can't reach the lodge. Without Step B, he reaches it but sees nothing (403 pending-approval). + +### Other commands + +```bash +access grant bob@example.com shared-wolt corework # add wolts +access revoke bob@example.com corework # remove specific wolts +access add alice@example.com '*' # wildcard (sees everything) +access remove bob@example.com # delete the user entry +access list # show all users + allow-lists +access check bob@example.com bloggo # exit 0 = allowed, 2 = denied +``` + +There's no "set" command by design — to switch a user to wildcard, `remove` + re-`add`, or hand-edit the JSON. + +--- + +## Self-onboarding (browser-driven) + +When auth is on and a user creates a wolt through the lodge UI, the server auto-appends that new wolt to the creator's allow-list. So a user with `wolts: []` can create their first wolt and immediately use it — no manual grant needed. + +To onboard a collaborator who'll have their own wolts: `access add them@email.com` with an empty allow-list (still do the CF Access step), and let them create what they need. To share an *existing* wolt, use `grant`. + +--- + +## Troubleshooting + +**"I see no wolts" (operator or user):** run `curl -s http://localhost:7777/auth/debug | python3 -m json.tool` from inside the container and check, in order: + +- `pyjwt_installed: false` → the server venv is missing PyJWT. Rebuild the container (`woltspace rebuild`) so `uv sync` installs it, or as a stopgap `uv pip install --python /workspace/woltspace/server/.venv/bin/python "PyJWT[crypto]"`. This is the #1 cause — a missing decoder makes every request anonymous. +- `auth_mode` not `cloudflare` → env var not picked up; check `wolts/.env` and reload. +- `team_domain: null` or `aud_set: false` → missing `WOLTSPACE_CF_TEAM_DOMAIN` / `WOLTSPACE_CF_AUD`. +- `last_auth_error` non-null → it tells you exactly what failed (wrong AUD, kid not in JWKS, JWKS unreachable, etc). +- On localhost specifically, `request_email` will be `__local__` only if loopback/trust-local applies — otherwise it's `null` and you see nothing (set `WOLTSPACE_AUTH_TRUST_LOCAL=true`, step 7). + +**User stuck at Cloudflare login / can't reach lodge:** they're not in the CF Access policy (Step A), independent of users.json. + +**Rolling back entirely:** set `WOLTSPACE_AUTH=none` in `wolts/.env` and reload. Middleware becomes a no-op; everyone sees everything again. `users.json` stays on disk, dormant. + +--- + +## Confirm before destructive ops + +Always confirm with the operator before `remove`, revoking a user's last wolt, or deleting a Cloudflare policy — these can lock people out of their own work. diff --git a/server/app.py b/server/app.py index b4fb49d..bdbad87 100644 --- a/server/app.py +++ b/server/app.py @@ -45,6 +45,7 @@ load_dotenv, ) from . import tunnel as tunnel_mgr +from . import auth as auth_mod from .notify import send_notification from .sparks import get_spark_with_chain, list_sparks @@ -149,6 +150,7 @@ async def lifespan(app: FastAPI): wolt: {WOLT_NAME} tui proxy → localhost:{TUI_PORT} tunnel: {tunnel_mgr.get_tunnel_url() or 'disabled'} + auth: {auth_mod.auth_mode()} """) yield tunnel_mgr.stop_tunnel() @@ -179,6 +181,26 @@ async def cors_middleware(request: Request, call_next): response.headers["Access-Control-Allow-Headers"] = "Content-Type" return response + +@app.middleware("http") +async def auth_middleware(request: Request, call_next): + """Extract the Cloudflare Access JWT and attach the email to request.state. + + In auth=none mode this is a no-op. In auth=cloudflare mode the email is + extracted (None if missing/invalid); route guards then decide what to do + with it. + + Loopback safety net: in-container localhost callers have no JWT (they + didn't traverse the CF edge). Treat them as a trusted "__local__" user + with wildcard access so the operator can never lock themselves out via + direct localhost access (desktop app, debugging, etc). + """ + email = auth_mod.extract_email(request) + if not email and auth_mod.is_enabled() and auth_mod.is_loopback(request): + email = "__local__" + request.state.user_email = email + return await call_next(request) + def _extract_app_subdomain(host_header: str) -> str | None: """Extract app name from subdomain hostname, or None if not an app subdomain. @@ -409,6 +431,41 @@ async def _serve_platform_file(filename: str) -> Response | None: # ============================================================ # jerpint: good idea, but eventually should be tied to version of woltspace package not just plaintext +@app.get("/auth/debug") +async def auth_debug(request: Request): + """Diagnose what the auth middleware sees on this request. + + Only callable from in-container loopback (127.0.0.1). Don't expose externally. + """ + if not auth_mod.is_loopback(request): + return JSONResponse({"error": "loopback only"}, status_code=403) + return { + "auth_mode": auth_mod.auth_mode(), + "team_domain": auth_mod._team_domain() or None, + "aud_set": bool(auth_mod._aud_tag()), + "aud_tail": auth_mod._aud_tag()[-8:] if auth_mod._aud_tag() else None, + "request_email": auth_mod.user_email(request), + "client_host": request.client.host if request.client else None, + "has_jwt_header": bool( + request.headers.get(auth_mod.AUTH_HEADER) + or request.headers.get(auth_mod.AUTH_HEADER.lower()) + ), + "users_count": len(auth_mod.load_users()), + "users_emails": [u.get("email") for u in auth_mod.load_users()], + "last_auth_error": auth_mod.last_error() or None, + "pyjwt_installed": _pyjwt_available(), + } + + +def _pyjwt_available() -> bool: + try: + import jwt as _jwt # noqa: F401 + from jwt.algorithms import RSAAlgorithm # noqa: F401 + return True + except ImportError: + return False + + @app.get("/version") async def version(): return PlainTextResponse("woltspace-v1") @@ -591,7 +648,18 @@ async def session_new_create(request: Request): The server scaffolds the full wolt directory (including .claude/ isolation) before spawning the session. No fallback HOME needed. + + In cloudflare auth mode: requires an authenticated caller. The new wolt's + name is auto-appended to the caller's allow-list so they can see what + they just made. """ + # auth_mode=cloudflare: caller must have a valid JWT (the middleware + # already validated it; if email is None, no JWT was presented). + caller_email: str | None = None + if auth_mod.is_enabled(): + caller_email = auth_mod.user_email(request) + if not caller_email: + return JSONResponse({"error": "not authenticated"}, status_code=401) body = await request.json() wolt_name = (body.get("name") or "").strip().lower() wolt_type = (body.get("type") or "").strip().lower() @@ -615,6 +683,11 @@ async def session_new_create(request: Request): create_creature_wolt(wolt_name, wolt_type) print(f"[sessions/create] scaffolded wolt '{wolt_name}' ({wolt_type})") + # Step 1b: in cloudflare auth mode, grant the creator access to what + # they just made (auto-onboards a user with an empty allow-list). + if caller_email: + auth_mod.grant_wolt(caller_email, wolt_name) + # Step 2: Start a session — full isolation, site auto-start, viewport result = start_session( wolt=wolt_name, @@ -636,6 +709,8 @@ async def session_new_lodge(request: Request): wolt = body.get("wolt") if not wolt: return JSONResponse({"error": "wolt required"}, status_code=400) + if (denied := auth_mod.require_wolt(request, wolt)) is not None: + return denied try: result = start_session( wolt=wolt, @@ -660,6 +735,8 @@ async def session_new_telegram(request: Request): wolt = body.get("wolt") if not wolt: return JSONResponse({"error": "wolt required"}, status_code=400) + if (denied := auth_mod.require_wolt(request, wolt)) is not None: + return denied try: result = start_session( wolt=wolt, @@ -688,6 +765,8 @@ async def session_new_slack(request: Request): wolt = body.get("wolt") if not wolt: return JSONResponse({"error": "wolt required"}, status_code=400) + if (denied := auth_mod.require_wolt(request, wolt)) is not None: + return denied try: result = start_session( wolt=wolt, @@ -713,11 +792,14 @@ async def session_new_slack(request: Request): # --- Sessions list --- @app.get("/sessions") -async def list_sessions(): +async def list_sessions(request: Request): from sessions import SessionRegistry reg = SessionRegistry(WOLTS_DIR) sessions = reg.list() sessions.sort(key=lambda s: (0 if s.get("status") == "running" else 1, -(s.get("created_at") or 0))) + if auth_mod.is_enabled(): + email = auth_mod.user_email(request) + sessions = [s for s in sessions if auth_mod.can_access_wolt(email, s.get("wolt") or "")] return sessions @@ -762,7 +844,7 @@ async def history_detail(spark_id: str): # --- Wolts --- @app.get("/wolts") -async def list_wolts(): +async def list_wolts(request: Request): """List all wolts by scanning WOLTS_DIR for wolt/wolt.json files.""" wolts = [] if WOLTS_DIR.exists(): @@ -780,6 +862,8 @@ async def list_wolts(): }) except Exception: pass + if auth_mod.is_enabled(): + wolts = auth_mod.visible_wolts(auth_mod.user_email(request), wolts) return wolts @@ -788,12 +872,15 @@ async def list_wolts(): # App names are globally unique. Keeper (owning wolt) is in woltspace.json. @app.get("/apps") -async def list_apps_api(): +async def list_apps_api(request: Request): """List all apps that have woltspace.json.""" apps = discover_apps() running = {r["name"]: r for r in running_apps()} + email = auth_mod.user_email(request) result = [] for a in apps: + if auth_mod.is_enabled() and not auth_mod.can_access_wolt(email, a.keeper): + continue entry = a.model_dump() run_state = running.get(a.name) entry["running"] = run_state is not None @@ -806,11 +893,13 @@ async def list_apps_api(): @app.get("/apps/{name}") -async def app_detail(name: str): +async def app_detail(name: str, request: Request): """Get a single app's manifest and running state.""" app_obj = get_app(name) if not app_obj: return JSONResponse({"error": f"app {name} not found"}, status_code=404) + if (denied := auth_mod.require_app(request, name)) is not None: + return denied running = {r["name"]: r for r in running_apps()} entry = app_obj.model_dump() run_state = running.get(name) @@ -823,8 +912,10 @@ async def app_detail(name: str): @app.post("/apps/{name}/start") -async def app_start(name: str): +async def app_start(name: str, request: Request): """Start an app's dev server.""" + if (denied := auth_mod.require_app(request, name)) is not None: + return denied try: state = start_app(name) print(f"[apps] started {name} on port {state['port']}") @@ -836,8 +927,10 @@ async def app_start(name: str): @app.post("/apps/{name}/stop") -async def app_stop(name: str): +async def app_stop(name: str, request: Request): """Stop a running app.""" + if (denied := auth_mod.require_app(request, name)) is not None: + return denied was_running = stop_app(name) if was_running: print(f"[apps] stopped {name}") @@ -846,8 +939,10 @@ async def app_stop(name: str): @app.post("/apps/{name}/share") -async def app_share(name: str): +async def app_share(name: str, request: Request): """Start a cloudflared tunnel to the app port and return the public URL.""" + if (denied := auth_mod.require_app(request, name)) is not None: + return denied import asyncio try: # share_app blocks (polls cloudflared log up to 30s) — run in thread @@ -861,8 +956,10 @@ async def app_share(name: str): @app.post("/apps/{name}/unshare") -async def app_unshare(name: str): +async def app_unshare(name: str, request: Request): """Stop the cloudflared tunnel for an app.""" + if (denied := auth_mod.require_app(request, name)) is not None: + return denied was_sharing = unshare_app(name) if was_sharing: print(f"[apps] unshared {name}") @@ -884,6 +981,10 @@ async def app_unshare_all(): async def session_resume(name: str, request: Request): """Resume a stopped/orphaned session by name.""" safe = "".join(c for c in name if c.isalnum() or c in "-_") + if auth_mod.is_enabled(): + wolt = safe.rsplit("-", 3)[0] if safe.count("-") >= 3 else safe + if (denied := auth_mod.require_wolt(request, wolt)) is not None: + return denied body = await request.json() prompt = body.get("prompt", "") try: @@ -897,9 +998,13 @@ async def session_resume(name: str, request: Request): @app.post("/sessions/{name}/stop") -async def session_stop(name: str): +async def session_stop(name: str, request: Request): """Stop a running session — kill tmux, mark as stopped.""" safe = "".join(c for c in name if c.isalnum() or c in "-_") + if auth_mod.is_enabled(): + wolt = safe.rsplit("-", 3)[0] if safe.count("-") >= 3 else safe + if (denied := auth_mod.require_wolt(request, wolt)) is not None: + return denied try: result = stop_session(safe) print(f"[sessions/stop] {safe} → stopped (was_alive={result.get('was_alive')})") @@ -915,14 +1020,20 @@ async def session_stop(name: str): # Sites auto-start when a session begins outside an app context. @app.get("/sites") -async def list_sites_api(): +async def list_sites_api(request: Request): """List all running wolt sites.""" - return running_sites() + sites = running_sites() + if auth_mod.is_enabled(): + email = auth_mod.user_email(request) + sites = [s for s in sites if auth_mod.can_access_wolt(email, s.get("wolt") or "")] + return sites @app.get("/sites/{wolt_name}") -async def site_detail(wolt_name: str): +async def site_detail(wolt_name: str, request: Request): """Get a wolt's site state.""" + if (denied := auth_mod.require_wolt(request, wolt_name)) is not None: + return denied state = get_site_state(wolt_name) sdir = site_dir(wolt_name) return { @@ -935,8 +1046,10 @@ async def site_detail(wolt_name: str): @app.post("/sites/{wolt_name}/start") -async def site_start(wolt_name: str): +async def site_start(wolt_name: str, request: Request): """Start a wolt's site livereload server.""" + if (denied := auth_mod.require_wolt(request, wolt_name)) is not None: + return denied sdir = site_dir(wolt_name) wolt_dir = WOLTS_DIR / wolt_name if not wolt_dir.exists(): @@ -950,8 +1063,10 @@ async def site_start(wolt_name: str): @app.post("/sites/{wolt_name}/stop") -async def site_stop(wolt_name: str): +async def site_stop(wolt_name: str, request: Request): """Stop a wolt's site livereload server.""" + if (denied := auth_mod.require_wolt(request, wolt_name)) is not None: + return denied was_running = stop_site(wolt_name) if was_running: print(f"[sites] stopped {wolt_name}") @@ -966,6 +1081,9 @@ async def serve_wolt_site(wolt_name: str, request: Request, path: str = ""): if not re.match(r"^[a-zA-Z][a-zA-Z0-9_-]*$", wolt_name): return JSONResponse({"error": "invalid wolt name"}, status_code=400) + if (denied := auth_mod.require_wolt(request, wolt_name)) is not None: + return denied + wolt_dir = WOLTS_DIR / wolt_name if not wolt_dir.exists(): return PlainTextResponse(f"Wolt {wolt_name} not found", status_code=404) @@ -1045,6 +1163,10 @@ async def site_livereload_ws(wolt_name: str, ws: WebSocket): """Watch a wolt's site dir for changes and push reload via WebSocket.""" from watchfiles import awatch + if auth_mod.is_enabled() and not auth_mod.ws_can_access_wolt(ws, wolt_name): + await ws.close(code=1008) + return + sdir = WOLTS_DIR / wolt_name / "wolt" / "site" if not sdir.exists(): await ws.close() @@ -1073,6 +1195,8 @@ async def serve_app(app_name: str, request: Request, path: str = ""): """ if not re.match(r"^[a-zA-Z][a-zA-Z0-9_-]*$", app_name): return JSONResponse({"error": "invalid app name"}, status_code=400) + if (denied := auth_mod.require_app(request, app_name)) is not None: + return denied adir = app_dir(app_name) if not adir.exists(): return HTMLResponse(_app_not_found(app_name), status_code=404) @@ -1196,6 +1320,21 @@ async def tui_proxy(ws: WebSocket): import websockets session = ws.query_params.get("session", "main") + + # Gate terminal attach by the session's wolt. The http auth middleware + # doesn't run for websockets, so check here directly. "main" is the + # platform shell — loopback only. + if auth_mod.is_enabled(): + if session == "main": + if not auth_mod.is_loopback(ws): + await ws.close(code=1008) + return + else: + wolt = auth_mod.wolt_from_session(session) + if not auth_mod.ws_can_access_wolt(ws, wolt): + await ws.close(code=1008) + return + await ws.accept() try: diff --git a/server/auth.py b/server/auth.py new file mode 100644 index 0000000..a315c29 --- /dev/null +++ b/server/auth.py @@ -0,0 +1,435 @@ +"""Web-layer auth — Cloudflare Access JWT + per-user wolt allow-lists. + +Two modes, picked at boot via WOLTSPACE_AUTH: + - "none" (default): no-op. Every request allowed. Today's behavior. + - "cloudflare": Validate Cf-Access-Jwt-Assertion, look up email + in users.json, gate access to wolts (and apps, + via their keeper wolt). + +Data model — wolts/.space/auth/users.json: + + { + "users": [ + {"email": "alice@example.com", "wolts": ["*"]}, + {"email": "bob@example.com", "wolts": ["bloggo", "shared"]} + ] + } + +The wildcard "*" in wolts means "every wolt" — a convenience, not a role. +This MVP has no admin concept; access is purely allow-list. An app is +accessible iff its keeper wolt is. + +See: github.com/jerpint/woltspace/issues/353 +""" + +from __future__ import annotations + +import json +import os +import time +from pathlib import Path +from typing import Any + +from fastapi import Request +from fastapi.responses import JSONResponse + + +AUTH_HEADER = "Cf-Access-Jwt-Assertion" + + +# --- Mode + config --- + +def _env(key: str) -> str: + """Read env var. Falls back to the shared wolts/.env so auth settings + can be flipped without restarting the server process.""" + v = os.environ.get(key) + if v: + return v + # Fallback: parse the shared wolts root .env directly. Auth config lives + # here (next to CLOUDFLARE_* etc) and may have been edited after server + # boot. + try: + wolts_dir = Path(os.environ.get("WOLTS_DIR", "/workspace/wolts")) + env_file = wolts_dir / ".env" + if env_file.exists(): + for line in env_file.read_text().splitlines(): + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + k, _, val = line.partition("=") + if k.strip() == key: + return val.strip().strip('"').strip("'") + except Exception: + pass + return "" + + +def auth_mode() -> str: + """Return 'cloudflare' or 'none' (default).""" + return (_env("WOLTSPACE_AUTH") or "none").strip().lower() + + +def is_enabled() -> bool: + return auth_mode() == "cloudflare" + + +def _team_domain() -> str: + """e.g. 'jerpint.cloudflareaccess.com'. Set via env.""" + return _env("WOLTSPACE_CF_TEAM_DOMAIN").strip() + + +def _aud_tag() -> str: + """The Application AUD tag for the lodge Access app. Set via env.""" + return _env("WOLTSPACE_CF_AUD").strip() + + +# --- users.json --- + +def _users_path() -> Path: + # Imported lazily so tests can patch WOLTS_DIR via the env var. + wolts_dir = Path(os.environ.get("WOLTS_DIR", "/workspace/wolts")) + return wolts_dir / ".space" / "auth" / "users.json" + + +def load_users() -> list[dict[str, Any]]: + """Read users.json. Returns [] if missing or unparseable.""" + p = _users_path() + if not p.exists(): + return [] + try: + data = json.loads(p.read_text()) + return data.get("users", []) if isinstance(data, dict) else [] + except Exception: + return [] + + +def save_users(users: list[dict[str, Any]]) -> None: + p = _users_path() + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(json.dumps({"users": users}, indent=2) + "\n") + + +def find_user(email: str) -> dict[str, Any] | None: + if not email: + return None + email = email.lower() + for u in load_users(): + if (u.get("email") or "").lower() == email: + return u + return None + + +def add_user(email: str, wolts: list[str] | None = None) -> dict: + """Add a user or return the existing entry. Idempotent.""" + email = email.strip().lower() + users = load_users() + for u in users: + if (u.get("email") or "").lower() == email: + return u + entry = {"email": email, "wolts": wolts or [], "added_at": int(time.time())} + users.append(entry) + save_users(users) + return entry + + +def grant_wolt(email: str, wolt_name: str) -> None: + """Append wolt_name to email's allow-list if not already there. Idempotent. + + Creates the user entry if it doesn't exist (used by auto-onboarding on + wolt creation).""" + email = email.strip().lower() + users = load_users() + u = None + for entry in users: + if (entry.get("email") or "").lower() == email: + u = entry + break + if u is None: + u = {"email": email, "wolts": [], "added_at": int(time.time())} + users.append(u) + allow = u.get("wolts") or [] + if "*" in allow or wolt_name in allow: + return + allow.append(wolt_name) + u["wolts"] = allow + save_users(users) + + +# --- Permission resolution --- + +def can_access_wolt(email: str | None, wolt_name: str) -> bool: + """Auth disabled → True. Otherwise: user must exist and the wolt must be + in their list (or they hold the wildcard). + + The synthetic '__local__' email is granted full access — see is_loopback + in the middleware. This is the in-container localhost safety net. + """ + if not is_enabled(): + return True + if not email: + return False + if email == "__local__": + return True + u = find_user(email) + if not u: + return False + allowed = u.get("wolts") or [] + return "*" in allowed or wolt_name in allowed + + +def can_access_app(email: str | None, app_name: str) -> bool: + """An app is accessible iff its keeper wolt is.""" + if not is_enabled(): + return True + # Resolve keeper. Lazy import — avoids circular dep at module load. + try: + import sys + from pathlib import Path as _P + lib = _P(__file__).resolve().parent.parent / "container" / "lib" + if str(lib) not in sys.path: + sys.path.insert(0, str(lib)) + from apps import get_app # type: ignore + app = get_app(app_name) + except Exception: + return False + if not app: + return False + return can_access_wolt(email, app.keeper) + + +def visible_wolts(email: str | None, all_wolts: list[dict]) -> list[dict]: + """Filter a list of wolt dicts (with 'dir' or 'name' key) to those the user can see.""" + if not is_enabled(): + return all_wolts + if not email: + return [] + if email == "__local__": + return all_wolts + u = find_user(email) + if not u: + return [] + allowed = set(u.get("wolts") or []) + if "*" in allowed: + return all_wolts + return [w for w in all_wolts if (w.get("dir") or w.get("name")) in allowed] + + +# --- JWT validation --- + +_jwks_cache: dict[str, Any] = {"keys": None, "fetched_at": 0} +_JWKS_TTL = 3600 # 1h + + +def _fetch_jwks() -> list[dict] | None: + """Fetch and cache Cloudflare Access public keys.""" + td = _team_domain() + if not td: + return None + now = time.time() + if _jwks_cache["keys"] and now - _jwks_cache["fetched_at"] < _JWKS_TTL: + return _jwks_cache["keys"] + try: + import urllib.request + url = f"https://{td}/cdn-cgi/access/certs" + with urllib.request.urlopen(url, timeout=5) as resp: + data = json.loads(resp.read()) + keys = data.get("keys") or [] + _jwks_cache["keys"] = keys + _jwks_cache["fetched_at"] = now + return keys + except Exception as e: + print(f"[auth] failed to fetch JWKS: {e}") + return _jwks_cache["keys"] # fall back to stale cache if any + + +_last_error: str = "" + + +def last_error() -> str: + """Most recent JWT validation failure (for /auth/debug endpoint).""" + return _last_error + + +def _fail(msg: str) -> None: + """Record an auth failure loudly — to stderr and to last_error().""" + global _last_error + _last_error = msg + print(f"[auth] {msg}", flush=True) + + +def extract_email(request: Request) -> str | None: + """Validate the CF Access JWT on the request and return the email claim. + + Returns None if no JWT, invalid JWT, or auth disabled. + """ + if not is_enabled(): + return None + token = request.headers.get(AUTH_HEADER) or request.headers.get(AUTH_HEADER.lower()) + if not token: + return None + try: + import jwt as pyjwt # PyJWT[crypto] + from jwt.algorithms import RSAAlgorithm + except ImportError as e: + _fail(f"PyJWT not installed — cannot validate JWT ({e}). Run 'uv sync --project server' or install PyJWT[crypto].") + return None + + keys = _fetch_jwks() + if not keys: + _fail(f"JWKS empty — team_domain={_team_domain() or ''}") + return None + + aud = _aud_tag() + + try: + unverified = pyjwt.get_unverified_header(token) + except Exception as e: + _fail(f"JWT header parse failed: {e}") + return None + kid = unverified.get("kid") + key_data = next((k for k in keys if k.get("kid") == kid), None) + if not key_data: + _fail(f"JWT kid={kid!r} not in JWKS (have {[k.get('kid') for k in keys]})") + return None + + try: + public_key = RSAAlgorithm.from_jwk(json.dumps(key_data)) + claims = pyjwt.decode( + token, + public_key, + algorithms=["RS256"], + audience=aud if aud else None, + options={"verify_aud": bool(aud)}, + ) + except Exception as e: + _fail(f"JWT decode failed: {e!r} aud_set={bool(aud)} aud_tail={aud[-8:] if aud else ''}") + return None + + email = (claims.get("email") or "").strip().lower() + return email or None + + +def _trust_local_net() -> bool: + """WOLTSPACE_AUTH_TRUST_LOCAL — opt-in to trust private-network callers. + + OFF by default. See is_loopback for the security tradeoff.""" + return _env("WOLTSPACE_AUTH_TRUST_LOCAL").strip().lower() in ("1", "true", "yes", "on") + + +def is_loopback(request: Request) -> bool: + """True if the request should be trusted as a local/in-container caller. + + In auth=cloudflare mode, callers that didn't traverse the Cloudflare edge + have no JWT. We trust certain local sources so the operator (and + in-container tools like the notify/access CLIs) aren't locked out. + + Two tiers: + + - **Loopback (always trusted):** 127.0.0.1 / ::1. These are genuine + in-container calls (notify, push-view, access CLI). Safe unconditionally. + + - **Private network (opt-in via WOLTSPACE_AUTH_TRUST_LOCAL):** When the + container publishes its port with `-p 7777:7777` (binds 0.0.0.0), the + host browser's traffic arrives as the Docker bridge gateway + (e.g. 172.17.0.1), NOT 127.0.0.1 — so plain localhost access would + otherwise show nothing. Enabling the flag also trusts RFC1918 private + addresses, restoring localhost convenience. + + TRADEOFF: with 0.0.0.0 binding, ANY device on the same LAN that can + reach the published port also appears as a private address and is + indistinguishable from the operator's own browser. So enabling this + flag grants unauthenticated full access to anyone on your local + network. Only enable it on a trusted network. See HUMANS.md. + """ + client = request.client + if not client: + return False + host = client.host + if host in ("127.0.0.1", "::1", "localhost"): + return True + if _trust_local_net(): + try: + import ipaddress + ip = ipaddress.ip_address(host) + return ip.is_private or ip.is_loopback + except ValueError: + return False + return False + + +# --- HTTP helpers --- + +def forbid(detail: str = "forbidden") -> JSONResponse: + return JSONResponse({"error": detail}, status_code=403) + + +def pending_approval(email: str) -> JSONResponse: + return JSONResponse( + { + "error": "pending approval", + "email": email, + "detail": ( + "You're authenticated via Cloudflare Access but haven't been " + "granted access to any wolts yet. Ask the admin to add you to " + "users.json." + ), + }, + status_code=403, + ) + + +def require_wolt(request: Request, wolt_name: str) -> JSONResponse | None: + """Return a 403 response if request user can't access wolt_name. None if OK.""" + if not is_enabled(): + return None + email = getattr(request.state, "user_email", None) + if not email: + return forbid("not authenticated") + if not can_access_wolt(email, wolt_name): + return forbid(f"access to wolt '{wolt_name}' denied") + return None + + +def require_app(request: Request, app_name: str) -> JSONResponse | None: + if not is_enabled(): + return None + email = getattr(request.state, "user_email", None) + if not email: + return forbid("not authenticated") + if not can_access_app(email, app_name): + return forbid(f"access to app '{app_name}' denied") + return None + + +def user_email(request: Request) -> str | None: + return getattr(request.state, "user_email", None) + + +# --- WebSocket auth --- +# The @app.middleware("http") hook does NOT run for websocket connections, so +# request.state.user_email is never populated for them. These helpers re-derive +# the caller's identity directly from the upgrade request's headers/client +# (extract_email + is_loopback only touch .headers / .client, which WebSocket +# objects also expose). + +def ws_email(ws) -> str | None: + """Resolve the caller email for a websocket upgrade, applying the same + loopback safety net as the http middleware.""" + email = extract_email(ws) + if not email and is_enabled() and is_loopback(ws): + return "__local__" + return email + + +def ws_can_access_wolt(ws, wolt_name: str) -> bool: + """True if the websocket caller may access wolt_name (or auth disabled).""" + if not is_enabled(): + return True + return can_access_wolt(ws_email(ws), wolt_name) + + +def wolt_from_session(session_name: str) -> str: + """Extract the wolt name from a session slug ({wolt}-{adj}-{noun}-{hex}).""" + if session_name.count("-") >= 3: + return session_name.rsplit("-", 3)[0] + return session_name diff --git a/server/pyproject.toml b/server/pyproject.toml index d62ecac..5a96442 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -11,4 +11,5 @@ dependencies = [ "python-dotenv", "livereload", "jinja2", + "PyJWT[crypto]", ] diff --git a/test/test_auth.py b/test/test_auth.py new file mode 100644 index 0000000..f55cf6a --- /dev/null +++ b/test/test_auth.py @@ -0,0 +1,391 @@ +"""Web-layer auth (issue #353) — both modes. + +Pure-Python unit tests. No live server needed. Patches WOLTSPACE_AUTH + +WOLTS_DIR env vars so users.json is read from tmp_path. + +Usage: uv run --extra test pytest test/test_auth.py -v +""" + +import json +import os +import sys +from pathlib import Path +from unittest.mock import patch + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + + +@pytest.fixture +def auth_env(tmp_path, monkeypatch): + """Point WOLTS_DIR at a tmp dir, enable cloudflare auth.""" + monkeypatch.setenv("WOLTSPACE_AUTH", "cloudflare") + monkeypatch.setenv("WOLTS_DIR", str(tmp_path)) + # Reload module so module-level state honors new env + import server.auth as auth + import importlib + importlib.reload(auth) + return auth + + +@pytest.fixture +def auth_off(tmp_path, monkeypatch): + """Default mode — auth disabled.""" + monkeypatch.setenv("WOLTSPACE_AUTH", "none") + monkeypatch.setenv("WOLTS_DIR", str(tmp_path)) + import server.auth as auth + import importlib + importlib.reload(auth) + return auth + + +class TestAuthMode: + def test_default_is_none(self, tmp_path, monkeypatch): + monkeypatch.delenv("WOLTSPACE_AUTH", raising=False) + # Point WOLTS_DIR at an empty tmp so the .env fallback can't see + # the real /workspace/wolts/.env where WOLTSPACE_AUTH may be set. + monkeypatch.setenv("WOLTS_DIR", str(tmp_path)) + import server.auth as auth + import importlib + importlib.reload(auth) + assert auth.auth_mode() == "none" + assert auth.is_enabled() is False + + def test_cloudflare_mode(self, auth_env): + assert auth_env.auth_mode() == "cloudflare" + assert auth_env.is_enabled() is True + + +class TestUsersFile: + def test_empty_when_missing(self, auth_env): + assert auth_env.load_users() == [] + + def test_roundtrip(self, auth_env): + users = [{"email": "a@x.com", "wolts": ["foo"]}] + auth_env.save_users(users) + assert auth_env.load_users() == users + + def test_find_user_case_insensitive(self, auth_env): + auth_env.save_users([{"email": "Mixed@Case.com", "wolts": ["foo"]}]) + assert auth_env.find_user("mixed@case.com") is not None + assert auth_env.find_user("MIXED@CASE.COM") is not None + assert auth_env.find_user("other@x.com") is None + + def test_corrupt_users_json_returns_empty(self, auth_env, tmp_path): + (tmp_path / ".space" / "auth").mkdir(parents=True) + (tmp_path / ".space" / "auth" / "users.json").write_text("not json {{{") + assert auth_env.load_users() == [] + + +class TestAddUser: + def test_add_new(self, auth_env): + auth_env.add_user("alice@x.com", ["foo"]) + u = auth_env.find_user("alice@x.com") + assert u is not None + assert u["wolts"] == ["foo"] + + def test_add_existing_returns_same(self, auth_env): + auth_env.add_user("alice@x.com", ["foo"]) + auth_env.add_user("alice@x.com", ["bar"]) + # Existing entry preserved, not overwritten + users = auth_env.load_users() + assert len(users) == 1 + assert users[0]["wolts"] == ["foo"] + + +class TestGrantWolt: + def test_grant_creates_entry_if_missing(self, auth_env): + auth_env.grant_wolt("alice@x.com", "foo") + u = auth_env.find_user("alice@x.com") + assert u is not None + assert u["wolts"] == ["foo"] + + def test_grant_appends_to_existing(self, auth_env): + auth_env.save_users([{"email": "alice@x.com", "wolts": ["foo"]}]) + auth_env.grant_wolt("alice@x.com", "bar") + u = auth_env.find_user("alice@x.com") + assert u["wolts"] == ["foo", "bar"] + + def test_grant_is_idempotent(self, auth_env): + auth_env.grant_wolt("alice@x.com", "foo") + auth_env.grant_wolt("alice@x.com", "foo") + assert auth_env.find_user("alice@x.com")["wolts"] == ["foo"] + + def test_grant_noop_on_wildcard(self, auth_env): + auth_env.save_users([{"email": "alice@x.com", "wolts": ["*"]}]) + auth_env.grant_wolt("alice@x.com", "foo") + assert auth_env.find_user("alice@x.com")["wolts"] == ["*"] + + +class TestPermissions: + def test_auth_off_allows_everything(self, auth_off): + assert auth_off.can_access_wolt(None, "anything") is True + assert auth_off.can_access_wolt("any@x.com", "anything") is True + + def test_auth_on_unknown_user_denied(self, auth_env): + assert auth_env.can_access_wolt("unknown@x.com", "foo") is False + assert auth_env.can_access_wolt(None, "foo") is False + + def test_auth_on_wildcard_sees_all(self, auth_env): + auth_env.save_users([{"email": "alice@x.com", "wolts": ["*"]}]) + assert auth_env.can_access_wolt("alice@x.com", "anything") is True + + def test_auth_on_user_scoped_to_allow_list(self, auth_env): + auth_env.save_users([{"email": "u@x.com", "wolts": ["foo", "bar"]}]) + assert auth_env.can_access_wolt("u@x.com", "foo") is True + assert auth_env.can_access_wolt("u@x.com", "bar") is True + assert auth_env.can_access_wolt("u@x.com", "baz") is False + + +class TestVisibleWolts: + def test_wildcard_sees_all(self, auth_env): + auth_env.save_users([{"email": "alice@x.com", "wolts": ["*"]}]) + wolts = [{"dir": "a"}, {"dir": "b"}, {"dir": "c"}] + assert auth_env.visible_wolts("alice@x.com", wolts) == wolts + + def test_user_filtered_to_allow_list(self, auth_env): + auth_env.save_users([{"email": "u@x.com", "wolts": ["a", "c"]}]) + wolts = [{"dir": "a"}, {"dir": "b"}, {"dir": "c"}] + result = auth_env.visible_wolts("u@x.com", wolts) + assert [w["dir"] for w in result] == ["a", "c"] + + def test_unknown_sees_nothing(self, auth_env): + wolts = [{"dir": "a"}] + assert auth_env.visible_wolts("ghost@x.com", wolts) == [] + assert auth_env.visible_wolts(None, wolts) == [] + + def test_auth_off_passes_through(self, auth_off): + wolts = [{"dir": "a"}, {"dir": "b"}] + assert auth_off.visible_wolts(None, wolts) == wolts + + +class TestJWTRoundtrip: + """End-to-end: self-sign a JWT with a known key, serve it as the + 'JWKS' response, and verify the middleware decodes the email correctly. + + This catches bugs in the actual decode path that 'garbage in, None out' + tests miss — e.g. PyJWT API drift, audience claim shape, kid lookup. + """ + + def test_valid_jwt_extracts_email(self, auth_env, monkeypatch): + try: + import jwt as pyjwt + from jwt.algorithms import RSAAlgorithm + except ImportError: + import pytest + pytest.skip("PyJWT[crypto] not installed") + + from cryptography.hazmat.primitives.asymmetric import rsa + from cryptography.hazmat.primitives import serialization + import json as _json + + # Generate an ephemeral RSA keypair for this test + privkey = rsa.generate_private_key(public_exponent=65537, key_size=2048) + pubkey = privkey.public_key() + priv_pem = privkey.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + jwk = _json.loads(RSAAlgorithm.to_jwk(pubkey)) + jwk["kid"] = "test-kid-1" + + # Pre-seed the JWKS cache so the middleware doesn't hit the network + auth_env._jwks_cache["keys"] = [jwk] + auth_env._jwks_cache["fetched_at"] = 9_999_999_999 + + monkeypatch.setenv("WOLTSPACE_CF_TEAM_DOMAIN", "test.cloudflareaccess.com") + monkeypatch.setenv("WOLTSPACE_CF_AUD", "test-aud") + + token = pyjwt.encode( + {"email": "alice@x.com", "aud": "test-aud", "iss": "test"}, + priv_pem, + algorithm="RS256", + headers={"kid": "test-kid-1"}, + ) + + class FakeReq: + headers = {"Cf-Access-Jwt-Assertion": token} + + result = auth_env.extract_email(FakeReq()) + assert result == "alice@x.com" + + def test_wrong_audience_rejected(self, auth_env, monkeypatch): + try: + import jwt as pyjwt + from jwt.algorithms import RSAAlgorithm + except ImportError: + import pytest + pytest.skip("PyJWT[crypto] not installed") + + from cryptography.hazmat.primitives.asymmetric import rsa + from cryptography.hazmat.primitives import serialization + import json as _json + + privkey = rsa.generate_private_key(public_exponent=65537, key_size=2048) + priv_pem = privkey.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + jwk = _json.loads(RSAAlgorithm.to_jwk(privkey.public_key())) + jwk["kid"] = "test-kid-1" + + auth_env._jwks_cache["keys"] = [jwk] + auth_env._jwks_cache["fetched_at"] = 9_999_999_999 + monkeypatch.setenv("WOLTSPACE_CF_TEAM_DOMAIN", "test.cloudflareaccess.com") + monkeypatch.setenv("WOLTSPACE_CF_AUD", "expected-aud") + + token = pyjwt.encode( + {"email": "alice@x.com", "aud": "WRONG-aud"}, + priv_pem, + algorithm="RS256", + headers={"kid": "test-kid-1"}, + ) + + class FakeReq: + headers = {"Cf-Access-Jwt-Assertion": token} + + assert auth_env.extract_email(FakeReq()) is None + + +class TestJWTExtraction: + def test_extract_none_when_auth_disabled(self, auth_off): + from fastapi import Request + # auth disabled → always None regardless of header + class FakeReq: + headers = {"Cf-Access-Jwt-Assertion": "anything"} + assert auth_off.extract_email(FakeReq()) is None + + def test_extract_none_when_no_header(self, auth_env): + class FakeReq: + headers = {} + assert auth_env.extract_email(FakeReq()) is None + + def test_extract_none_when_no_team_domain(self, auth_env, monkeypatch): + monkeypatch.delenv("WOLTSPACE_CF_TEAM_DOMAIN", raising=False) + class FakeReq: + headers = {"Cf-Access-Jwt-Assertion": "garbage.token.here"} + assert auth_env.extract_email(FakeReq()) is None + + def test_extract_rejects_invalid_token(self, auth_env, monkeypatch): + # Even with team domain set, garbage token returns None (doesn't raise) + monkeypatch.setenv("WOLTSPACE_CF_TEAM_DOMAIN", "example.cloudflareaccess.com") + # Avoid network — pre-seed the cache + auth_env._jwks_cache["keys"] = [{"kid": "fake", "kty": "RSA", "n": "x", "e": "AQAB"}] + auth_env._jwks_cache["fetched_at"] = 9_999_999_999 + class FakeReq: + headers = {"Cf-Access-Jwt-Assertion": "not.a.real.jwt"} + assert auth_env.extract_email(FakeReq()) is None + + +class TestLocalLoopback: + def test_local_pseudo_user_can_access_anything(self, auth_env): + assert auth_env.can_access_wolt("__local__", "any-wolt") is True + + def test_local_pseudo_user_sees_all_wolts(self, auth_env): + wolts = [{"dir": "a"}, {"dir": "b"}] + assert auth_env.visible_wolts("__local__", wolts) == wolts + + def test_is_loopback_127(self, auth_env): + from types import SimpleNamespace + req = SimpleNamespace(client=SimpleNamespace(host="127.0.0.1")) + assert auth_env.is_loopback(req) is True + + def test_is_loopback_external(self, auth_env): + from types import SimpleNamespace + req = SimpleNamespace(client=SimpleNamespace(host="8.8.8.8")) + assert auth_env.is_loopback(req) is False + + def test_is_loopback_no_client(self, auth_env): + from types import SimpleNamespace + req = SimpleNamespace(client=None) + assert auth_env.is_loopback(req) is False + + def test_private_net_denied_by_default(self, auth_env, monkeypatch): + # Docker bridge gateway — host browser traffic. Denied unless flag set. + monkeypatch.delenv("WOLTSPACE_AUTH_TRUST_LOCAL", raising=False) + from types import SimpleNamespace + req = SimpleNamespace(client=SimpleNamespace(host="172.17.0.1")) + assert auth_env.is_loopback(req) is False + + def test_private_net_trusted_when_flag_on(self, auth_env, monkeypatch): + monkeypatch.setenv("WOLTSPACE_AUTH_TRUST_LOCAL", "true") + from types import SimpleNamespace + # Docker gateway + other RFC1918 ranges all trusted + for host in ("172.17.0.1", "192.168.1.50", "10.0.0.3"): + req = SimpleNamespace(client=SimpleNamespace(host=host)) + assert auth_env.is_loopback(req) is True, host + + def test_public_ip_never_trusted_even_with_flag(self, auth_env, monkeypatch): + monkeypatch.setenv("WOLTSPACE_AUTH_TRUST_LOCAL", "true") + from types import SimpleNamespace + req = SimpleNamespace(client=SimpleNamespace(host="8.8.8.8")) + assert auth_env.is_loopback(req) is False + + def test_loopback_always_trusted_regardless_of_flag(self, auth_env, monkeypatch): + monkeypatch.delenv("WOLTSPACE_AUTH_TRUST_LOCAL", raising=False) + from types import SimpleNamespace + req = SimpleNamespace(client=SimpleNamespace(host="127.0.0.1")) + assert auth_env.is_loopback(req) is True + + +class TestWebSocketAuth: + def test_wolt_from_session(self, auth_env): + assert auth_env.wolt_from_session("uxwolt-wild-thicket-14076f") == "uxwolt" + assert auth_env.wolt_from_session("neowolt-keen-willow-ea4fff") == "neowolt" + assert auth_env.wolt_from_session("main") == "main" + + def test_ws_email_loopback(self, auth_env): + from types import SimpleNamespace + ws = SimpleNamespace(headers={}, client=SimpleNamespace(host="127.0.0.1")) + assert auth_env.ws_email(ws) == "__local__" + + def test_ws_email_external_no_jwt(self, auth_env): + from types import SimpleNamespace + ws = SimpleNamespace(headers={}, client=SimpleNamespace(host="203.0.113.5")) + assert auth_env.ws_email(ws) is None + + def test_ws_can_access_wolt_loopback(self, auth_env): + from types import SimpleNamespace + ws = SimpleNamespace(headers={}, client=SimpleNamespace(host="127.0.0.1")) + assert auth_env.ws_can_access_wolt(ws, "any-wolt") is True + + def test_ws_can_access_wolt_external_denied(self, auth_env): + from types import SimpleNamespace + ws = SimpleNamespace(headers={}, client=SimpleNamespace(host="203.0.113.5")) + assert auth_env.ws_can_access_wolt(ws, "any-wolt") is False + + def test_ws_can_access_wolt_disabled(self, auth_off): + from types import SimpleNamespace + ws = SimpleNamespace(headers={}, client=SimpleNamespace(host="203.0.113.5")) + assert auth_off.ws_can_access_wolt(ws, "any-wolt") is True + + +class TestRequireHelpers: + def test_require_wolt_passes_when_disabled(self, auth_off): + from types import SimpleNamespace + req = SimpleNamespace(state=SimpleNamespace(user_email=None)) + assert auth_off.require_wolt(req, "anything") is None + + def test_require_wolt_blocks_unknown(self, auth_env): + from types import SimpleNamespace + req = SimpleNamespace(state=SimpleNamespace(user_email=None)) + resp = auth_env.require_wolt(req, "foo") + assert resp is not None + assert resp.status_code == 403 + + def test_require_wolt_allows_wildcard(self, auth_env): + from types import SimpleNamespace + auth_env.save_users([{"email": "a@x.com", "wolts": ["*"]}]) + req = SimpleNamespace(state=SimpleNamespace(user_email="a@x.com")) + assert auth_env.require_wolt(req, "any") is None + + def test_require_wolt_blocks_wrong_wolt(self, auth_env): + from types import SimpleNamespace + auth_env.save_users([{"email": "u@x.com", "wolts": ["foo"]}]) + req = SimpleNamespace(state=SimpleNamespace(user_email="u@x.com")) + resp = auth_env.require_wolt(req, "bar") + assert resp is not None + assert resp.status_code == 403