diff --git a/ingestify/application/secrets_manager.py b/ingestify/application/secrets_manager.py index f5dbabe..2f0bf4a 100644 --- a/ingestify/application/secrets_manager.py +++ b/ingestify/application/secrets_manager.py @@ -11,6 +11,7 @@ class SecretsManager: def __init__(self): self._aws_client = None + self._gcp_client = None @property def aws_client(self): @@ -18,8 +19,26 @@ def aws_client(self): self._aws_client = boto3.client("secretsmanager") return self._aws_client + @property + def gcp_client(self): + if not self._gcp_client: + try: + from google.cloud import secretmanager + except ImportError as e: + raise ConfigurationError( + "google-cloud-secret-manager is required for vault+gcp:// " + "secrets. Install with: pip install google-cloud-secret-manager" + ) from e + self._gcp_client = secretmanager.SecretManagerServiceClient() + return self._gcp_client + def load_as_dict(self, url: str) -> dict: - """Load a secret from the supported vault. In this case only AWS Secrets Manager""" + """Load a JSON secret from a supported vault. + + Supported schemes: + vault+aws:// (AWS Secrets Manager) + vault+gcp:/// (GCP Secret Manager) + """ parts = urlparse(url) if parts.scheme == "vault+aws": secret_id = parts.netloc + parts.path @@ -35,12 +54,32 @@ def load_as_dict(self, url: str) -> dict: except JSONDecodeError: raise Exception(f"Secret url '{url}' could not be parsed.") + elif parts.scheme == "vault+gcp": + project = parts.netloc + secret_name = parts.path.lstrip("/") + if not project or not secret_name: + raise ConfigurationError( + f"Invalid GCP secret URL '{url}'. " + f"Expected format: vault+gcp:///" + ) + name = f"projects/{project}/secrets/{secret_name}/versions/latest" + try: + response = self.gcp_client.access_secret_version(request={"name": name}) + except Exception as err: + raise ConfigurationError( + f"Couldn't load GCP secret '{url}': {err}" + ) from err + try: + secrets = json.loads(response.payload.data.decode("utf-8")) + except JSONDecodeError: + raise Exception(f"Secret url '{url}' could not be parsed.") + else: raise Exception(f"Secret url '{url}' is not supported.") return secrets def supports(self, url: str): - return url.startswith("vault+aws://") + return url.startswith("vault+aws://") or url.startswith("vault+gcp://") def load_as_db_url(self, secret_uri: str): """Load the secret and return it as a database url."""