diff --git a/.github/workflows/test-cicd.yml b/.github/workflows/test-cicd.yml index 419c3e549..94d9c93af 100644 --- a/.github/workflows/test-cicd.yml +++ b/.github/workflows/test-cicd.yml @@ -102,6 +102,12 @@ jobs: restore-keys: | ${{ runner.os }}-pre-commit- + - name: Install aac CLI (Agent Access Protocol) + run: | + curl -sL https://github.com/bitwarden/agent-access/releases/latest/download/aac-linux-x86_64.tar.gz | tar xz + sudo mv aac /usr/local/bin/ + aac --version + - name: Install dependencies run: uv sync --dev --all-extras diff --git a/examples/auth-aac-agent/agent.py b/examples/auth-aac-agent/agent.py new file mode 100644 index 000000000..03b2ed144 --- /dev/null +++ b/examples/auth-aac-agent/agent.py @@ -0,0 +1,50 @@ +"""Example: Using AacVault with a local notte agent. + +Prerequisites: +- `aac` CLI installed (https://github.com/bitwarden/agent-access/releases) +- User running `aac listen` on their machine (connected to their Bitwarden vault) +- Pairing token from `aac listen` output + +Usage: + # Terminal 1: User starts aac listen + aac listen + + # Terminal 2: Run this agent with the pairing token + export NOTTE_API_KEY="your-notte-api-key" # pragma: allowlist secret + python agent.py --token ABC-DEF-GHI +""" + +import argparse +import os + +from dotenv import load_dotenv +from notte_core.credentials.aac import AacVault +from notte_sdk import NotteClient + +_ = load_dotenv() + + +def main(): + parser = argparse.ArgumentParser(description="Run notte agent with aac vault") + parser.add_argument("--token", help="aac pairing token (or set AAC_TOKEN env var)") + args = parser.parse_args() + + token = args.token or os.environ.get("AAC_TOKEN") + if not token: + print("Error: provide --token or set AAC_TOKEN env var") + print("Run 'aac listen' in another terminal to get a pairing token") + exit(1) + + notte = NotteClient() + + with AacVault(token=token) as vault, notte.Session(open_viewer=True) as session: + agent = notte.Agent(vault=vault, session=session) + output = agent.run(task="Go to github.com and login with your provided credentials") + + print(output) + if not output.success: + exit(-1) + + +if __name__ == "__main__": + main() diff --git a/examples/auth-bitwarden-agent/agent.py b/examples/auth-bitwarden-agent/agent.py new file mode 100644 index 000000000..759e16904 --- /dev/null +++ b/examples/auth-bitwarden-agent/agent.py @@ -0,0 +1,37 @@ +"""Example: Using BitwardenVault with a local notte agent. + +Prerequisites: +- pip install bitwarden-sdk +- BWS_ACCESS_TOKEN environment variable set +- BWS_ORGANIZATION_ID environment variable set +- Secrets stored in Bitwarden Secrets Manager with JSON values: + {"url": "https://github.com/login", "password": "...", "username": "...", "email": "..."} + +Usage: + export BWS_ACCESS_TOKEN="0.your-token-here..." # pragma: allowlist secret + export BWS_ORGANIZATION_ID="your-org-id" + export NOTTE_API_KEY="your-notte-api-key" # pragma: allowlist secret + python agent.py +""" + +from dotenv import load_dotenv +from notte_core.credentials.bitwarden import BitwardenVault +from notte_sdk import NotteClient + +_ = load_dotenv() + + +def main(): + notte = NotteClient() + + with BitwardenVault() as vault, notte.Session(open_viewer=True) as session: + agent = notte.Agent(vault=vault, session=session) + output = agent.run(task="Go to github.com and login with your provided credentials") + + print(output) + if not output.success: + exit(-1) + + +if __name__ == "__main__": + main() diff --git a/packages/notte-core/pyproject.toml b/packages/notte-core/pyproject.toml index 234d6de38..c668993cc 100644 --- a/packages/notte-core/pyproject.toml +++ b/packages/notte-core/pyproject.toml @@ -32,6 +32,9 @@ dependencies = [ ] [project.optional-dependencies] +bitwarden = [ + "bitwarden-sdk>=2.0.0", +] tempo = [ "opentelemetry-exporter-otlp>=1.27.0", ] diff --git a/packages/notte-core/src/notte_core/credentials/aac.py b/packages/notte-core/src/notte_core/credentials/aac.py new file mode 100644 index 000000000..acb3e928d --- /dev/null +++ b/packages/notte-core/src/notte_core/credentials/aac.py @@ -0,0 +1,196 @@ +from __future__ import annotations + +import json +import os +import shutil +import subprocess +from typing import Unpack + +from typing_extensions import override + +from notte_core.common.logging import logger +from notte_core.common.resource import SyncResource +from notte_core.credentials.base import BaseVault, Credential, CredentialsDict, CreditCardDict +from notte_core.utils.url import get_root_domain + + +class AacVault(BaseVault, SyncResource): + """Vault backed by the Agent Access Protocol (aac). + + Fetches credentials on-demand through an E2E encrypted Noise tunnel + via Bitwarden's proxy. Works with any aac-compatible credential provider + (Bitwarden Password Manager, 1Password, etc.). + + Credentials are requested per-domain when the agent encounters a login + form — never bulk-loaded, never persisted. + + The user must be running `aac listen` on their machine for this vault + to function. + """ + + def __init__( + self, + token: str | None = None, + proxy_url: str = "wss://ap.lesspassword.dev", + session: str | None = None, + timeout: int = 120, + aac_path: str = "aac", + ): + """ + Args: + token: Rendezvous code (ABC-DEF-GHI) or PSK token for pairing. + If None, uses a cached session. + proxy_url: WebSocket URL of the aac proxy server. + session: Hex fingerprint (or prefix) of a cached session to use. + If None and no token, auto-selects the single cached session. + timeout: Timeout in seconds for credential responses. + aac_path: Path to the aac CLI binary. + """ + super().__init__() + self.token: str | None = token or os.environ.get("AAC_TOKEN") + self.proxy_url: str = proxy_url + self._session: str | None = session + self._timeout: int = timeout + self._aac_path: str = aac_path + self._paired: bool = False + + @override + def start(self) -> None: + if shutil.which(self._aac_path) is None: + raise RuntimeError( + f"'{self._aac_path}' CLI not found. Install from: https://github.com/bitwarden/agent-access/releases" + ) + if self.token: + self._pair() + + @override + def stop(self) -> None: + self._paired = False + + def _pair(self) -> None: + """Establish the E2E tunnel by pairing with the user's aac listen.""" + assert self.token is not None + cmd: list[str] = [ + self._aac_path, + "connect", + "--token", + self.token, + "--proxy-url", + self.proxy_url, + "--output", + "json", + ] + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) # noqa: S603 + if result.returncode != 0: + try: + data = json.loads(result.stdout) + msg = data.get("error", {}).get("message", result.stderr) + except (json.JSONDecodeError, TypeError): + msg = result.stderr.strip() or result.stdout.strip() + raise RuntimeError(f"aac pairing failed: {msg}") + + try: + data = json.loads(result.stdout) + if not data.get("success"): + msg = data.get("error", {}).get("message", result.stdout.strip()) + raise RuntimeError(f"aac pairing failed: {msg}") + except (json.JSONDecodeError, TypeError): + pass + + self._paired = True + logger.info("[AacVault] Paired successfully via aac tunnel") + + def _request_credential(self, domain: str) -> dict[str, str | None] | None: + """Request a single credential through the E2E tunnel.""" + cmd: list[str] = [ + self._aac_path, + "connect", + "--domain", + domain, + "--proxy-url", + self.proxy_url, + "--output", + "json", + "--timeout", + str(self._timeout), + ] + if self._session: + cmd.extend(["--session", self._session]) + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=self._timeout + 10) # noqa: S603 + if result.returncode != 0: + try: + data = json.loads(result.stdout) + msg = data.get("error", {}).get("message", "unknown error") + except (json.JSONDecodeError, TypeError): + msg = result.stderr.strip() or f"exit code {result.returncode}" + logger.warning(f"[AacVault] Credential request failed for {domain}: {msg}") + return None + + try: + data = json.loads(result.stdout) + except json.JSONDecodeError: + logger.warning(f"[AacVault] Invalid JSON response for {domain}") + return None + + if not data.get("success"): + return None + + return data.get("credential") + + @override + async def _get_credentials_impl(self, url: str) -> CredentialsDict | None: + domain = get_root_domain(url) + if not domain: + return None + + cred = self._request_credential(domain) + if cred is None: + return None + + password = cred.get("password") + if not password: + return None + + result: CredentialsDict = {"password": password} + username = cred.get("username") + if username: + result["username"] = username + totp = cred.get("totp") + if totp: + result["mfa_secret"] = totp + return result + + @override + async def get_credentials_async(self, url: str) -> CredentialsDict | None: + """Override to bypass TOTP generation — aac returns live codes, not secrets.""" + credentials = await self._get_credentials_impl(url) + if credentials is None: + return None + # Track retrieved credentials for screenshot masking + self._retrieved_credentials[url] = credentials + return credentials + + @override + async def _add_credentials(self, url: str, creds: CredentialsDict) -> None: + raise NotImplementedError("aac is read-only — manage credentials in your vault app") + + @override + async def delete_credentials_async(self, url: str) -> None: + raise NotImplementedError("aac is read-only — manage credentials in your vault app") + + @override + async def list_credentials_async(self) -> list[Credential]: + return [] + + @override + async def set_credit_card_async(self, **kwargs: Unpack[CreditCardDict]) -> None: + raise NotImplementedError("Credit card not supported via aac") + + @override + async def get_credit_card_async(self) -> CreditCardDict: + raise NotImplementedError("Credit card not supported via aac") + + @override + async def delete_credit_card_async(self) -> None: + raise NotImplementedError("Credit card not supported via aac") diff --git a/packages/notte-core/src/notte_core/credentials/bitwarden.py b/packages/notte-core/src/notte_core/credentials/bitwarden.py new file mode 100644 index 000000000..23f5a61dc --- /dev/null +++ b/packages/notte-core/src/notte_core/credentials/bitwarden.py @@ -0,0 +1,206 @@ +from __future__ import annotations + +import json +import os +import uuid +from typing import TYPE_CHECKING, Any, Unpack + +from typing_extensions import override + +from notte_core.common.logging import logger +from notte_core.common.resource import SyncResource +from notte_core.credentials.base import BaseVault, Credential, CredentialsDict, CreditCardDict +from notte_core.utils.url import get_root_domain + +if TYPE_CHECKING: + from bitwarden_sdk import BitwardenClient # pyright: ignore[reportMissingTypeStubs] + + +def _get_sdk() -> tuple[Any, Any, Any]: + """Import and return (BitwardenClient, DeviceType, client_settings_from_dict).""" + try: + from bitwarden_sdk import BitwardenClient as Cls # pyright: ignore[reportMissingTypeStubs] + from bitwarden_sdk import DeviceType as DT # pyright: ignore[reportMissingTypeStubs] + from bitwarden_sdk import client_settings_from_dict as csfd # pyright: ignore[reportMissingTypeStubs] + + return Cls, DT, csfd + except ImportError: + raise ImportError( + "bitwarden-sdk is required for BitwardenVault. Install it with: pip install bitwarden-sdk" + ) from None + + +class BitwardenVault(BaseVault, SyncResource): + """Vault backed by Bitwarden Secrets Manager via the official Python SDK. + + Requires: pip install bitwarden-sdk + + Credentials are fetched from a Bitwarden Secrets Manager project. + Each secret's value must be a JSON object with the notte credential format: + {"url": "https://...", "password": "...", "username": "...", "email": "...", "mfa_secret": "..."} + + Only `url` and `password` are required. + """ + + def __init__( + self, + access_token: str | None = None, + organization_id: str | None = None, + project_id: str | None = None, + api_url: str = "https://api.bitwarden.com", + identity_url: str = "https://identity.bitwarden.com", + ): + super().__init__() + self._access_token: str = access_token or os.environ.get("BWS_ACCESS_TOKEN", "") + self._organization_id: str | None = organization_id or os.environ.get("BWS_ORGANIZATION_ID") + self._project_id: str | None = project_id + self._api_url: str = api_url + self._identity_url: str = identity_url + self._client: BitwardenClient | None = None + self._secrets_cache: list[Any] | None = None + + @override + def start(self) -> None: + if not self._access_token: + raise ValueError("Bitwarden access token required. Set BWS_ACCESS_TOKEN env var or pass access_token=") + + bw_client_cls, device_type, settings_fn = _get_sdk() + client: BitwardenClient = bw_client_cls( + settings_fn( + { + "apiUrl": self._api_url, + "deviceType": device_type.SDK, + "identityUrl": self._identity_url, + "userAgent": "notte", + } + ) + ) + _ = client.auth().login_access_token(self._access_token) + self._client = client + self._secrets_cache = self._fetch_secrets() + logger.info(f"[BitwardenVault] Loaded {len(self._secrets_cache)} secrets") + + @override + def stop(self) -> None: + self._client = None + self._secrets_cache = None + + def _fetch_secrets(self) -> list[Any]: + assert self._client is not None + if not self._organization_id: + raise ValueError( + "organization_id required to list secrets. Set BWS_ORGANIZATION_ID or pass organization_id=" + ) + + response = self._client.secrets().list(self._organization_id) + if response.data is None: + return [] + + secret_ids = [s.id for s in response.data.data] + if not secret_ids: + return [] + + full_secrets = self._client.secrets().get_by_ids(secret_ids) + if full_secrets.data is None: + return [] + + return list(full_secrets.data.data) + + def _parse_secret(self, secret: Any) -> tuple[str, CredentialsDict] | None: + """Parse a BWS secret into (url, CredentialsDict). Returns None if invalid.""" + try: + data = json.loads(secret.value) + except (json.JSONDecodeError, TypeError): + logger.warning(f"[BitwardenVault] Skipping secret '{secret.key}': invalid JSON value") + return None + + url = data.get("url") + password = data.get("password") + if not url or not password: + logger.warning(f"[BitwardenVault] Skipping secret '{secret.key}': missing url or password") + return None + + creds: CredentialsDict = {"password": password} + if data.get("username"): + creds["username"] = data["username"] + if data.get("email"): + creds["email"] = data["email"] + if data.get("mfa_secret"): + creds["mfa_secret"] = data["mfa_secret"] + return url, creds + + @override + async def _get_credentials_impl(self, url: str) -> CredentialsDict | None: + secrets = self._secrets_cache if self._secrets_cache is not None else self._fetch_secrets() + target_domain = get_root_domain(url) + for secret in secrets: + parsed = self._parse_secret(secret) + if parsed is None: + continue + secret_url, creds = parsed + if get_root_domain(secret_url) == target_domain: + return creds + return None + + @override + async def _add_credentials(self, url: str, creds: CredentialsDict) -> None: + assert self._client is not None + if not self._organization_id: + raise ValueError("organization_id required to add credentials") + if not self._project_id: + raise ValueError("project_id required to add credentials") + value = json.dumps({"url": url, **creds}) + _ = self._client.secrets().create( + uuid.UUID(self._organization_id), + url, + value, + None, + [uuid.UUID(self._project_id)], + ) + self._secrets_cache = None + + @override + async def delete_credentials_async(self, url: str) -> None: + assert self._client is not None + secrets = self._secrets_cache if self._secrets_cache is not None else self._fetch_secrets() + target_domain = get_root_domain(url) + for secret in secrets: + parsed = self._parse_secret(secret) + if parsed is None: + continue + secret_url, _ = parsed + if get_root_domain(secret_url) == target_domain: + _ = self._client.secrets().delete([secret.id]) + self._secrets_cache = None + return + raise ValueError(f"No credentials found for {url}") + + @override + async def list_credentials_async(self) -> list[Credential]: + secrets = self._secrets_cache if self._secrets_cache is not None else self._fetch_secrets() + credentials: list[Credential] = [] + for secret in secrets: + parsed = self._parse_secret(secret) + if parsed is None: + continue + secret_url, creds = parsed + credentials.append( + Credential( + url=secret_url, + username=creds.get("username"), + email=creds.get("email"), + ) + ) + return credentials + + @override + async def set_credit_card_async(self, **kwargs: Unpack[CreditCardDict]) -> None: + raise NotImplementedError("Credit card storage not supported in BitwardenVault") + + @override + async def get_credit_card_async(self) -> CreditCardDict: + raise NotImplementedError("Credit card storage not supported in BitwardenVault") + + @override + async def delete_credit_card_async(self) -> None: + raise NotImplementedError("Credit card storage not supported in BitwardenVault") diff --git a/pyproject.toml b/pyproject.toml index c5791f3f2..89be08bc8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ integrations = ["notte-integrations==1.4.4.dev"] eval = ["notte-eval==1.4.4.dev"] mcp = ["notte-mcp==1.4.4.dev"] +bitwarden = ["bitwarden-sdk>=2.0.0"] [tool.uv.sources] notte-core = { workspace = true } diff --git a/tests/integration/test_aac_vault_integration.py b/tests/integration/test_aac_vault_integration.py new file mode 100644 index 000000000..b792ca59c --- /dev/null +++ b/tests/integration/test_aac_vault_integration.py @@ -0,0 +1,47 @@ +"""Integration tests for AacVault — validates the aac CLI binary is installed and responds correctly. + +These tests run by default when the `aac` CLI is available. They do NOT require +a running `aac listen` session — they verify CLI availability, error handling, +and response parsing against the real binary. +""" + +import asyncio +import shutil + +import pytest +from notte_core.credentials.aac import AacVault + +AAC_AVAILABLE = shutil.which("aac") is not None + +pytestmark = pytest.mark.skipif(not AAC_AVAILABLE, reason="aac CLI not installed") + + +def test_aac_vault_starts_without_token() -> None: + vault = AacVault(token=None) + vault.start() + assert vault._paired is False + vault.stop() + + +def test_aac_vault_list_credentials_returns_empty() -> None: + vault = AacVault(token=None) + vault.start() + creds = asyncio.run(vault.list_credentials_async()) + assert creds == [] + vault.stop() + + +def test_aac_vault_get_credentials_returns_none_without_session() -> None: + """Without a paired session, credential requests should return None gracefully.""" + vault = AacVault(token=None, timeout=5) + vault.start() + creds = asyncio.run(vault.get_credentials_async("https://example.com")) + assert creds is None + vault.stop() + + +def test_aac_vault_pairing_fails_with_invalid_token() -> None: + """An invalid token should raise RuntimeError, not hang.""" + vault = AacVault(token="INVALID99", timeout=10) + with pytest.raises(RuntimeError, match="aac pairing failed"): + vault.start() diff --git a/tests/integration/test_bitwarden_vault_integration.py b/tests/integration/test_bitwarden_vault_integration.py new file mode 100644 index 000000000..e84e9a8fe --- /dev/null +++ b/tests/integration/test_bitwarden_vault_integration.py @@ -0,0 +1,59 @@ +"""Integration tests for BitwardenVault — validates the bitwarden-sdk works correctly. + +Tests that require a real BWS_ACCESS_TOKEN + BWS_ORGANIZATION_ID are skipped unless +both env vars are set. SDK availability is checked at import time. +""" + +import asyncio +import os + +import pytest + +try: + from bitwarden_sdk import BitwardenClient # noqa: F401 + + _HAS_SDK = True +except ImportError: + _HAS_SDK = False + +from notte_core.credentials.bitwarden import BitwardenVault + +BWS_TOKEN = os.environ.get("BWS_ACCESS_TOKEN", "") +BWS_ORG_ID = os.environ.get("BWS_ORGANIZATION_ID", "") +BWS_AUTHENTICATED = _HAS_SDK and bool(BWS_TOKEN) and bool(BWS_ORG_ID) + +pytestmark = pytest.mark.skipif(not _HAS_SDK, reason="bitwarden-sdk not installed") + + +def test_bws_vault_validates_missing_token() -> None: + vault = BitwardenVault(access_token="") + with pytest.raises(ValueError, match="access token required"): + vault.start() + + +def test_bws_vault_start_fails_with_invalid_token() -> None: + vault = BitwardenVault( + access_token="invalid-token", # pragma: allowlist secret + organization_id="invalid-org", + ) + with pytest.raises(Exception): + vault.start() + + +@pytest.mark.skipif(not BWS_AUTHENTICATED, reason="BWS_ACCESS_TOKEN and BWS_ORGANIZATION_ID not set") +def test_list_secrets() -> None: + with BitwardenVault() as vault: + creds = asyncio.run(vault.list_credentials_async()) + assert isinstance(creds, list) + + +@pytest.mark.skipif(not BWS_AUTHENTICATED, reason="BWS_ACCESS_TOKEN and BWS_ORGANIZATION_ID not set") +def test_get_credentials_for_known_domain() -> None: + with BitwardenVault() as vault: + creds = asyncio.run(vault.list_credentials_async()) + if len(creds) == 0: + pytest.skip("No secrets found in BWS project") + first_url = creds[0].url + result = asyncio.run(vault.get_credentials_async(first_url)) + assert result is not None + assert "password" in result # pragma: allowlist secret diff --git a/tests/test_aac_vault.py b/tests/test_aac_vault.py new file mode 100644 index 000000000..ae96baa54 --- /dev/null +++ b/tests/test_aac_vault.py @@ -0,0 +1,212 @@ +import asyncio +import json +import os +import stat +import tempfile + +import pytest +from notte_core.credentials.aac import AacVault + + +def _make_fake_aac(credential: dict | None = None, fail: bool = False) -> str: + """Create a fake aac script that simulates the aac CLI.""" + f = tempfile.NamedTemporaryFile(mode="w", suffix=".sh", delete=False) + + if fail: + f.write( + "#!/bin/bash\n" + 'echo \'{"error":{"code":"general_error","message":"connection failed"},"success":false}\'\n' + "exit 2\n" + ) + else: + cred = credential or { + "username": "octocat", + "password": "gh-secret-123", # pragma: allowlist secret + "totp": "654321", + "uri": "https://github.com", + "notes": None, + } + success_response = json.dumps( + { + "credential": cred, + "domain": "github.com", + "success": True, + } + ) + pairing_response = json.dumps({"success": True}) + # Return pairing response if --token is present, credential otherwise + f.write( + "#!/bin/bash\n" + f'if echo "$@" | grep -q "\\-\\-domain"; then\n' + f" echo '{success_response}'\n" + f" exit 0\n" + f"fi\n" + f"echo '{pairing_response}'\n" + f"exit 0\n" + ) + + f.close() + os.chmod(f.name, stat.S_IRWXU) + return f.name + + +@pytest.fixture() +def fake_aac(): + path = _make_fake_aac() + yield path + os.unlink(path) + + +@pytest.fixture() +def fake_aac_fail(): + path = _make_fake_aac(fail=True) + yield path + os.unlink(path) + + +@pytest.fixture() +def vault(fake_aac: str): + v = AacVault(token="ABC-DEF-123", aac_path=fake_aac) + v.start() + yield v + v.stop() + + +class TestAacVaultGetCredentials: + def test_fetches_credential_by_domain(self, vault: AacVault) -> None: + creds = asyncio.run(vault.get_credentials_async("https://github.com/login")) + assert creds is not None + assert creds["password"] == "gh-secret-123" # pragma: allowlist secret + assert creds["username"] == "octocat" + + def test_returns_totp_as_mfa_secret(self, vault: AacVault) -> None: + creds = asyncio.run(vault.get_credentials_async("https://github.com")) + assert creds is not None + assert creds["mfa_secret"] == "654321" + + def test_bypasses_totp_generation(self, vault: AacVault) -> None: + """AacVault overrides get_credentials_async to skip TOTP().now() — + aac returns live codes, not base32 secrets.""" + creds = asyncio.run(vault.get_credentials_async("https://github.com")) + assert creds is not None + # The raw TOTP code should be returned as-is, not processed + assert creds["mfa_secret"] == "654321" + + def test_tracks_credentials_for_screenshot_masking(self, vault: AacVault) -> None: + asyncio.run(vault.get_credentials_async("https://github.com")) + past = vault.past_credentials() + assert len(past) == 1 + assert "https://github.com" in past + + def test_returns_none_on_failure(self, fake_aac_fail: str) -> None: + vault = AacVault(token=None, aac_path=fake_aac_fail) + vault.start() # no pairing since token=None + creds = asyncio.run(vault.get_credentials_async("https://github.com")) + assert creds is None + vault.stop() + + def test_handles_credential_without_totp(self) -> None: + path = _make_fake_aac( + credential={ + "username": "user", + "password": "pass", # pragma: allowlist secret + "totp": None, + "uri": "https://example.com", + "notes": None, + } + ) + try: + vault = AacVault(token="ABC-DEF-123", aac_path=path) + vault.start() + creds = asyncio.run(vault.get_credentials_async("https://example.com")) + assert creds is not None + assert creds["password"] == "pass" # pragma: allowlist secret + assert "mfa_secret" not in creds + vault.stop() + finally: + os.unlink(path) + + def test_handles_credential_without_username(self) -> None: + path = _make_fake_aac( + credential={ + "username": None, + "password": "pass", # pragma: allowlist secret + "totp": None, + "uri": None, + "notes": None, + } + ) + try: + vault = AacVault(token="ABC-DEF-123", aac_path=path) + vault.start() + creds = asyncio.run(vault.get_credentials_async("https://example.com")) + assert creds is not None + assert creds["password"] == "pass" # pragma: allowlist secret + assert "username" not in creds + vault.stop() + finally: + os.unlink(path) + + +class TestAacVaultPairing: + def test_pairs_on_start_when_token_provided(self, fake_aac: str) -> None: + vault = AacVault(token="ABC-DEF-123", aac_path=fake_aac) + vault.start() + assert vault._paired is True + vault.stop() + + def test_skips_pairing_when_no_token(self, fake_aac: str) -> None: + vault = AacVault(token=None, aac_path=fake_aac) + vault.start() + assert vault._paired is False + vault.stop() + + def test_pairing_failure_raises(self, fake_aac_fail: str) -> None: + vault = AacVault(token="ABC-DEF-123", aac_path=fake_aac_fail) + with pytest.raises(RuntimeError, match="aac pairing failed"): + vault.start() + + def test_reads_token_from_env(self, fake_aac: str) -> None: + os.environ["AAC_TOKEN"] = "ENV-TOK-123" + try: + vault = AacVault(aac_path=fake_aac) + assert vault.token == "ENV-TOK-123" + finally: + del os.environ["AAC_TOKEN"] + + +class TestAacVaultLifecycle: + def test_start_validates_aac_exists(self) -> None: + vault = AacVault(token="test", aac_path="/nonexistent/aac") + with pytest.raises(RuntimeError, match="CLI not found"): + vault.start() + + def test_context_manager(self, fake_aac: str) -> None: + with AacVault(token="ABC-DEF-123", aac_path=fake_aac) as vault: + assert vault._paired is True + assert vault._paired is False + + +class TestAacVaultReadOnly: + def test_add_credentials_raises(self, vault: AacVault) -> None: + with pytest.raises(NotImplementedError, match="read-only"): + asyncio.run(vault._add_credentials("https://example.com", {"password": "test"})) # pragma: allowlist secret + + def test_delete_credentials_raises(self, vault: AacVault) -> None: + with pytest.raises(NotImplementedError, match="read-only"): + asyncio.run(vault.delete_credentials_async("https://example.com")) + + def test_list_credentials_returns_empty(self, vault: AacVault) -> None: + creds = asyncio.run(vault.list_credentials_async()) + assert creds == [] + + def test_credit_card_not_supported(self, vault: AacVault) -> None: + with pytest.raises(NotImplementedError): + asyncio.run( + vault.set_credit_card_async( + card_holder_name="Test", + card_number="4242", + card_cvv="123", + card_full_expiration="12/30", + ) + ) diff --git a/tests/test_bitwarden_vault.py b/tests/test_bitwarden_vault.py new file mode 100644 index 000000000..5398bf8ab --- /dev/null +++ b/tests/test_bitwarden_vault.py @@ -0,0 +1,178 @@ +import asyncio +import json +from unittest.mock import MagicMock, patch + +import pytest +from notte_core.credentials.base import Credential +from notte_core.credentials.bitwarden import BitwardenVault + + +def _make_secret_response(secret_id: str, key: str, value: dict, project_id: str = "proj-1") -> MagicMock: + s = MagicMock() + s.id = secret_id + s.key = key + s.value = json.dumps(value) + s.project_id = project_id + s.note = "" + return s + + +GITHUB_SECRET = _make_secret_response( + "00000000-0000-0000-0000-000000000001", + "GitHub Login", + { + "url": "https://github.com/login", + "password": "gh-pass-123", # pragma: allowlist secret + "username": "octocat", + "email": "octocat@github.com", + }, +) + +NOTTE_SECRET = _make_secret_response( + "00000000-0000-0000-0000-000000000002", + "Notte", + { + "url": "https://app.notte.cc", + "password": "notte-pass", # pragma: allowlist secret + "email": "user@notte.cc", + "mfa_secret": "JBSWY3DPEHPK3PXP", # pragma: allowlist secret + }, +) + +INVALID_SECRET = _make_secret_response("00000000-0000-0000-0000-000000000003", "Bad Secret", {}) +# Override value to be invalid JSON +INVALID_SECRET.value = "not-valid-json" + +MISSING_PASSWORD_SECRET = _make_secret_response( + "00000000-0000-0000-0000-000000000004", + "No Password", # pragma: allowlist secret + {"url": "https://example.com", "username": "test"}, +) + +ALL_SECRETS = [GITHUB_SECRET, NOTTE_SECRET, INVALID_SECRET, MISSING_PASSWORD_SECRET] + + +def _mock_client(secrets: list[MagicMock] | None = None) -> MagicMock: + """Create a mock BitwardenClient with preset secrets.""" + if secrets is None: + secrets = ALL_SECRETS + + client = MagicMock() + + # Mock auth + client.auth.return_value.login_access_token.return_value = MagicMock() + + # Mock secrets().list() -> returns identifiers + list_response = MagicMock() + list_data = MagicMock() + list_data.data = [MagicMock(id=s.id) for s in secrets] + list_response.data = list_data + client.secrets.return_value.list.return_value = list_response + + # Mock secrets().get_by_ids() -> returns full secrets + full_response = MagicMock() + full_data = MagicMock() + full_data.data = secrets + full_response.data = full_data + client.secrets.return_value.get_by_ids.return_value = full_response + + return client + + +def _mock_get_sdk(secrets: list[MagicMock] | None = None): + """Return a mock _get_sdk that returns (ClientClass, DeviceType, settings_fn).""" + client = _mock_client(secrets) + client_cls = MagicMock(return_value=client) + device_type = MagicMock() + settings_fn = MagicMock() + return client_cls, device_type, settings_fn + + +@pytest.fixture() +def vault(): + with patch("notte_core.credentials.bitwarden._get_sdk", return_value=_mock_get_sdk()): + v = BitwardenVault( + access_token="fake-token", # pragma: allowlist secret + organization_id="org-1", + ) + v.start() + yield v + v.stop() + + +class TestBitwardenVaultGetCredentials: + def test_finds_credentials_by_domain(self, vault: BitwardenVault) -> None: + creds = asyncio.run(vault.get_credentials_async("https://github.com/whatever")) + assert creds is not None + assert creds["password"] == "gh-pass-123" # pragma: allowlist secret + assert creds["username"] == "octocat" + + def test_finds_credentials_with_subdomain(self, vault: BitwardenVault) -> None: + creds = asyncio.run(vault.get_credentials_async("https://app.notte.cc/dashboard")) + assert creds is not None + assert creds["password"] == "notte-pass" # pragma: allowlist secret + + def test_returns_none_for_unknown_domain(self, vault: BitwardenVault) -> None: + creds = asyncio.run(vault.get_credentials_async("https://unknown-site.com")) + assert creds is None + + def test_includes_mfa_secret(self, vault: BitwardenVault) -> None: + creds = asyncio.run(vault.get_credentials_async("https://notte.cc")) + assert creds is not None + assert "mfa_secret" in creds + + def test_tracks_retrieved_credentials(self, vault: BitwardenVault) -> None: + asyncio.run(vault.get_credentials_async("https://github.com")) + past = vault.past_credentials() + assert len(past) == 1 + + +class TestBitwardenVaultListCredentials: + def test_lists_valid_credentials_only(self, vault: BitwardenVault) -> None: + cred_list = asyncio.run(vault.list_credentials_async()) + assert len(cred_list) == 2 + urls = {c.url for c in cred_list} + assert "https://github.com/login" in urls + assert "https://app.notte.cc" in urls + + def test_returns_credential_objects(self, vault: BitwardenVault) -> None: + cred_list = asyncio.run(vault.list_credentials_async()) + assert all(isinstance(c, Credential) for c in cred_list) + + +class TestBitwardenVaultLifecycle: + def test_start_validates_sdk_installed(self) -> None: + with patch("notte_core.credentials.bitwarden._get_sdk", side_effect=ImportError("bitwarden-sdk is required")): + vault = BitwardenVault(access_token="test") # pragma: allowlist secret + with pytest.raises(ImportError, match="bitwarden-sdk is required"): + vault.start() + + def test_start_validates_token(self) -> None: + vault = BitwardenVault(access_token="") + with pytest.raises(ValueError, match="access token required"): + vault.start() + + def test_context_manager(self) -> None: + with patch("notte_core.credentials.bitwarden._get_sdk", return_value=_mock_get_sdk()): + with BitwardenVault(access_token="test", organization_id="org-1") as vault: # pragma: allowlist secret + creds = asyncio.run(vault.list_credentials_async()) + assert len(creds) == 2 + + +class TestBitwardenVaultReadOnly: + def test_credit_card_not_supported(self, vault: BitwardenVault) -> None: + with pytest.raises(NotImplementedError): + asyncio.run( + vault.set_credit_card_async( + card_holder_name="Test", + card_number="4242", + card_cvv="123", + card_full_expiration="12/30", + ) + ) + + with pytest.raises(NotImplementedError): + asyncio.run(vault.get_credit_card_async()) + + with pytest.raises(NotImplementedError): + asyncio.run(vault.delete_credit_card_async()) diff --git a/uv.lock b/uv.lock index dd2b1a47b..9e2744fc9 100644 --- a/uv.lock +++ b/uv.lock @@ -342,6 +342,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/94/fe/3aed5d0be4d404d12d36ab97e2f1791424d9ca39c2f754a6285d59a3b01d/beautifulsoup4-4.14.2-py3-none-any.whl", hash = "sha256:5ef6fa3a8cbece8488d66985560f97ed091e22bbc4e9c2338508a9d5de6d4515", size = 106392, upload-time = "2025-09-29T10:05:43.771Z" }, ] +[[package]] +name = "bitwarden-sdk" +version = "2.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "python-dateutil" }, +] +wheels = [ + { url = "https://files.pythonhosted.org/packages/c8/66/177264df153e6714fa3d18fcac45374cc7ff481dd347acb5b02592f9d45a/bitwarden_sdk-2.0.0-cp39-abi3-macosx_10_12_x86_64.whl", hash = "sha256:054ccdda8fbf615d0e582e1e75c7c316bf291ed03b27efd774325ecc3b0fb0b8", size = 3593062, upload-time = "2026-02-05T23:00:22.595Z" }, + { url = "https://files.pythonhosted.org/packages/42/b1/88c0d624b16d974bbcc3636d68cee15d9908980dc05c881b1ff46a19509d/bitwarden_sdk-2.0.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:3cc41cae078d086b1eaeb9e4f781454cfba7b77119f70908d074bed259dce8a6", size = 3397507, upload-time = "2026-02-05T23:00:24.683Z" }, + { url = "https://files.pythonhosted.org/packages/77/93/8905bd312f40a0c1c7fabd939a3271380ccad959b9f99eda452f254fabfa/bitwarden_sdk-2.0.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a49739ebe37103df677e263637fdabc61682f10a5f4518fb22b130d530fffbcd", size = 3192519, upload-time = "2026-02-05T23:00:26.342Z" }, + { url = "https://files.pythonhosted.org/packages/55/03/55ea33d637e74bf7e50f39c361b82c8f03630d6f35ae8944d3a4a8399585/bitwarden_sdk-2.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1c0e2dd8d8a3db2f0cb29d6dcd55fd578bd66839f666b8e1712712b6e9791021", size = 3397007, upload-time = "2026-02-05T23:00:28.572Z" }, + { url = "https://files.pythonhosted.org/packages/bf/17/a96033bc9a55521a9d2a4022d69f72da302df4ac83eb182379fec9df102c/bitwarden_sdk-2.0.0-cp39-abi3-win_amd64.whl", hash = "sha256:9b9a523dab81cca56151481bd0dce07bd58695fd14b4c5bcab520c7a24fc16c3", size = 3347145, upload-time = "2026-02-05T23:00:30.569Z" }, +] + [[package]] name = "black" version = "25.9.0" @@ -2828,6 +2843,9 @@ dependencies = [ ] [package.optional-dependencies] +bitwarden = [ + { name = "bitwarden-sdk" }, +] eval = [ { name = "notte-eval" }, ] @@ -2870,6 +2888,7 @@ lint = [ [package.metadata] requires-dist = [ + { name = "bitwarden-sdk", marker = "extra == 'bitwarden'", specifier = ">=2.0.0" }, { name = "google-auth", specifier = ">=2.39.0" }, { name = "notte-agent", editable = "packages/notte-agent" }, { name = "notte-browser", editable = "packages/notte-browser" }, @@ -2880,7 +2899,7 @@ requires-dist = [ { name = "notte-sdk", editable = "packages/notte-sdk" }, { name = "toml", specifier = ">=0.10.2" }, ] -provides-extras = ["integrations", "eval", "mcp"] +provides-extras = ["integrations", "eval", "mcp", "bitwarden"] [package.metadata.requires-dev] dev = [ @@ -2977,6 +2996,9 @@ dependencies = [ ] [package.optional-dependencies] +bitwarden = [ + { name = "bitwarden-sdk" }, +] observability = [ { name = "opentelemetry-exporter-otlp" }, { name = "pyroscope-io" }, @@ -2993,6 +3015,7 @@ tempo = [ [package.metadata] requires-dist = [ { name = "aiohttp", specifier = "~=3.11.13" }, + { name = "bitwarden-sdk", marker = "extra == 'bitwarden'", specifier = ">=2.0.0" }, { name = "cryptography", specifier = ">=43.0.3" }, { name = "loguru", specifier = ">=0.7.3" }, { name = "nest-asyncio", specifier = ">=1.6.0" }, @@ -3016,7 +3039,7 @@ requires-dist = [ { name = "tldextract", specifier = ">=5.3.0" }, { name = "toml", specifier = ">=0.10.2" }, ] -provides-extras = ["tempo", "pyroscope", "observability"] +provides-extras = ["bitwarden", "tempo", "pyroscope", "observability"] [[package]] name = "notte-eval"