Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions ingestify/application/secrets_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,34 @@
class SecretsManager:
def __init__(self):
self._aws_client = None
self._gcp_client = None

@property
def aws_client(self):
if not self._aws_client:
self._aws_client = boto3.client("secretsmanager")
return self._aws_client

@property
def gcp_client(self):
if not self._gcp_client:
try:
from google.cloud import secretmanager
except ImportError as e:
raise ConfigurationError(
"google-cloud-secret-manager is required for vault+gcp:// "
"secrets. Install with: pip install google-cloud-secret-manager"
) from e
self._gcp_client = secretmanager.SecretManagerServiceClient()
return self._gcp_client

def load_as_dict(self, url: str) -> dict:
"""Load a secret from the supported vault. In this case only AWS Secrets Manager"""
"""Load a JSON secret from a supported vault.

Supported schemes:
vault+aws://<secret_id> (AWS Secrets Manager)
vault+gcp://<project>/<secret> (GCP Secret Manager)
"""
parts = urlparse(url)
if parts.scheme == "vault+aws":
secret_id = parts.netloc + parts.path
Expand All @@ -35,12 +54,32 @@ def load_as_dict(self, url: str) -> dict:
except JSONDecodeError:
raise Exception(f"Secret url '{url}' could not be parsed.")

elif parts.scheme == "vault+gcp":
project = parts.netloc
secret_name = parts.path.lstrip("/")
if not project or not secret_name:
raise ConfigurationError(
f"Invalid GCP secret URL '{url}'. "
f"Expected format: vault+gcp://<project>/<secret>"
)
name = f"projects/{project}/secrets/{secret_name}/versions/latest"
try:
response = self.gcp_client.access_secret_version(request={"name": name})
except Exception as err:
raise ConfigurationError(
f"Couldn't load GCP secret '{url}': {err}"
) from err
try:
secrets = json.loads(response.payload.data.decode("utf-8"))
except JSONDecodeError:
raise Exception(f"Secret url '{url}' could not be parsed.")

else:
raise Exception(f"Secret url '{url}' is not supported.")
return secrets

def supports(self, url: str):
return url.startswith("vault+aws://")
return url.startswith("vault+aws://") or url.startswith("vault+gcp://")

def load_as_db_url(self, secret_uri: str):
"""Load the secret and return it as a database url."""
Expand Down
Loading