From 94b0f385036213f30bc9bd0c1bad926d6f98863a Mon Sep 17 00:00:00 2001 From: Saadiq Mohiuddin Date: Thu, 19 Feb 2026 13:46:14 -0700 Subject: [PATCH 1/2] use keycloak service account for DAG run --- scripts/promote_collection.py | 31 ++++++++++++++------------ scripts/promote_dataset.py | 41 +++++++++++++++++++++++------------ scripts/requirements.txt | 3 ++- 3 files changed, 46 insertions(+), 29 deletions(-) diff --git a/scripts/promote_collection.py b/scripts/promote_collection.py index abc26e69..1b78c4b6 100644 --- a/scripts/promote_collection.py +++ b/scripts/promote_collection.py @@ -5,8 +5,7 @@ import sys import os import uuid -from base64 import b64encode - +import requests def trigger_collection_dag(payload: Dict[str, Any], stage: str): """ @@ -16,34 +15,38 @@ def trigger_collection_dag(payload: Dict[str, Any], stage: str): if stage == "staging": api_url_env = "STAGING_SM2A_API_URL" - username_env = "STAGING_SM2A_ADMIN_USERNAME" - password_env = "STAGING_SM2A_ADMIN_PASSWORD" + token_url = f"https://{os.getenv('KEYCLOAK_STAGING_URL')}/realms/veda/protocol/openid-connect/token" + client_id = "airflow-webserver-fab" + client_secret = os.getenv("KEYCLOAK_STAGING_SM2A_FAB_CLIENT_SECRET") elif stage == "production": api_url_env = "SM2A_API_URL" - username_env = "SM2A_ADMIN_USERNAME" - password_env = "SM2A_ADMIN_PASSWORD" + token_url = f"https://{os.getenv('KEYCLOAK_PROD_URL')}/realms/veda/protocol/openid-connect/token" + client_id = "airflow-webserver-fab" + client_secret = os.getenv("KEYCLOAK_PROD_SM2A_FAB_CLIENT_SECRET") else: raise ValueError( f"Invalid stage provided: {stage}. Must be 'staging' or 'production'." 
) base_api_url = os.getenv(api_url_env) - username = os.getenv(username_env) - password = os.getenv(password_env) - if not all([base_api_url, username, password]): + response = requests.post(token_url, data={ + "client_id": client_id, + "client_secret": client_secret, + "grant_type": "client_credentials", + }) + access_token = response.json()["access_token"] + + if not all([base_api_url, access_token]): raise ValueError( f"Missing one or more environment variables: " f"stage is None={stage is None}, " - f"username is None={username_env is None}, " - f"password is None={password_env is None}" + f"access_token is None={access_token is None}" ) - api_token = b64encode(f"{username}:{password}".encode()).decode() - headers = { "Content-Type": "application/json", - "Authorization": "Basic " + api_token, + "Authorization": "Bearer " + access_token, } body = { diff --git a/scripts/promote_dataset.py b/scripts/promote_dataset.py index e2ea38fd..dd45c413 100644 --- a/scripts/promote_dataset.py +++ b/scripts/promote_dataset.py @@ -5,7 +5,7 @@ import sys import os import uuid -from base64 import b64encode +import requests class MissingFieldError(Exception): @@ -35,21 +35,27 @@ def validate_discovery_item_config(item: Dict[str, Any]) -> Dict[str, Any]: def publish_to_staging(payload): base_api_url = os.getenv("STAGING_SM2A_API_URL") dataset_pipeline_dag = os.getenv("DATASET_DAG_NAME", "veda_dataset_pipeline") - username = os.getenv("STAGING_SM2A_ADMIN_USERNAME") - password = os.getenv("STAGING_SM2A_ADMIN_PASSWORD") - api_token = b64encode(f"{username}:{password}".encode()).decode() + token_url = f"https://{os.getenv('KEYCLOAK_STAGING_URL')}/realms/veda/protocol/openid-connect/token" + client_id = "airflow-webserver-fab" + client_secret = os.getenv("KEYCLOAK_STAGING_SM2A_FAB_CLIENT_SECRET") - if not base_api_url or not api_token: + response = requests.post(token_url, data={ + "client_id": client_id, + "client_secret": client_secret, + "grant_type": "client_credentials", + }) + 
access_token = response.json()["access_token"] + + if not base_api_url or not access_token: raise ValueError( - "STAGING_SM2A_API_URL or STAGING_SM2A_ADMIN_USERNAME" - + " or STAGING_SM2A_ADMIN_PASSWORD is not" + "STAGING_SM2A_API_URL or KEYCLOAK_STAGING_SM2A_FAB_CLIENT_SECRET is not" + " set in the environment variables." ) headers = { "Content-Type": "application/json", - "Authorization": "Basic " + api_token, + "Authorization": "Bearer " + access_token, } body = { @@ -76,20 +82,27 @@ def publish_to_staging(payload): def promote_to_production(payload): base_api_url = os.getenv("SM2A_API_URL") promotion_dag = os.getenv("PROMOTION_DAG_NAME", "veda_promotion_pipeline") - username = os.getenv("SM2A_ADMIN_USERNAME") - password = os.getenv("SM2A_ADMIN_PASSWORD") - api_token = b64encode(f"{username}:{password}".encode()).decode() + token_url = f"https://{os.getenv('KEYCLOAK_PROD_URL')}/realms/veda/protocol/openid-connect/token" + client_id = "airflow-webserver-fab" + client_secret = os.getenv("KEYCLOAK_PROD_SM2A_FAB_CLIENT_SECRET") + + response = requests.post(token_url, data={ + "client_id": client_id, + "client_secret": client_secret, + "grant_type": "client_credentials", + }) + access_token = response.json()["access_token"] - if not base_api_url or not api_token: + if not base_api_url or not access_token: raise ValueError( - "SM2A_API_URL or SM2A_ADMIN_USERNAME or SM2A_ADMIN_PASSWORD is not" + "SM2A_API_URL or KEYCLOAK_PROD_SM2A_FAB_CLIENT_SECRET is not" + " set in the environment variables."
) headers = { "Content-Type": "application/json", - "Authorization": "Basic " + api_token, + "Authorization": "Bearer " + access_token, } payload["conf"]["transfer"] = True diff --git a/scripts/requirements.txt b/scripts/requirements.txt index 4818cc54..1c6d8b40 100644 --- a/scripts/requirements.txt +++ b/scripts/requirements.txt @@ -1 +1,2 @@ -pyyaml \ No newline at end of file +pyyaml +requests From a6819d794c96ce035813aa3b4e7a115f3d38cdc1 Mon Sep 17 00:00:00 2001 From: Saadiq Mohiuddin Date: Thu, 19 Feb 2026 13:50:04 -0700 Subject: [PATCH 2/2] format --- scripts/promote_collection.py | 14 +++++++++----- scripts/promote_dataset.py | 26 ++++++++++++++++---------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/scripts/promote_collection.py b/scripts/promote_collection.py index 1b78c4b6..cb780c11 100644 --- a/scripts/promote_collection.py +++ b/scripts/promote_collection.py @@ -7,6 +7,7 @@ import uuid import requests + def trigger_collection_dag(payload: Dict[str, Any], stage: str): """ Triggers the veda_collection_pipeline DAG in either staging or production SM2A. 
@@ -30,11 +31,14 @@ def trigger_collection_dag(payload: Dict[str, Any], stage: str): base_api_url = os.getenv(api_url_env) - response = requests.post(token_url, data={ - "client_id": client_id, - "client_secret": client_secret, - "grant_type": "client_credentials", - }) + response = requests.post( + token_url, + data={ + "client_id": client_id, + "client_secret": client_secret, + "grant_type": "client_credentials", + }, + ) access_token = response.json()["access_token"] if not all([base_api_url, access_token]): diff --git a/scripts/promote_dataset.py b/scripts/promote_dataset.py index dd45c413..6ee4b633 100644 --- a/scripts/promote_dataset.py +++ b/scripts/promote_dataset.py @@ -40,11 +40,14 @@ def publish_to_staging(payload): client_id = "airflow-webserver-fab" client_secret = os.getenv("KEYCLOAK_STAGING_SM2A_FAB_CLIENT_SECRET") - response = requests.post(token_url, data={ - "client_id": client_id, - "client_secret": client_secret, - "grant_type": "client_credentials", - }) + response = requests.post( + token_url, + data={ + "client_id": client_id, + "client_secret": client_secret, + "grant_type": "client_credentials", + }, + ) access_token = response.json()["access_token"] if not base_api_url or not access_token: @@ -87,11 +90,14 @@ def promote_to_production(payload): client_id = "airflow-webserver-fab" client_secret = os.getenv("KEYCLOAK_PROD_SM2A_FAB_CLIENT_SECRET") - response = requests.post(token_url, data={ - "client_id": client_id, - "client_secret": client_secret, - "grant_type": "client_credentials", - }) + response = requests.post( + token_url, + data={ + "client_id": client_id, + "client_secret": client_secret, + "grant_type": "client_credentials", + }, + ) access_token = response.json()["access_token"] if not base_api_url or not access_token: