diff --git a/components/dags/src/pinta_dags/config.py b/components/dags/src/pinta_dags/config.py index 6676818..b2653e1 100644 --- a/components/dags/src/pinta_dags/config.py +++ b/components/dags/src/pinta_dags/config.py @@ -11,6 +11,15 @@ from airflow.sdk import Variable from docker.types import Mount + +def connection_uri_template(conn_id: str) -> str: + """Jinja template for a connection's SQLAlchemy URI with the psycopg3 driver.""" + return ( + f"{{{{ conn.{conn_id}.get_hook().get_uri() " + f"| replace('postgresql://', 'postgresql+psycopg://') }}}}" + ) + + PINTA_COMMON_TASK_ARGS: dict[str, Any] = { "retries": 0, # TODO: Set to something larger in non-local environments "retry_delay": datetime.timedelta(seconds=10), @@ -22,6 +31,7 @@ "docker_url": Variable.get( "pinta_docker_socket_url", "unix:///var/run/docker.sock" ), + "network_mode": Variable.get("pinta_container_network_mode", None), "environment": { "TASK_LOG_LEVEL": "{{ var.value.pinta_processing_task_log_level }}", "DB_SRID": "{{ var.value.pinta_db_srid }}", diff --git a/components/dags/src/pinta_dags/dags/calculate_reference_dem.py b/components/dags/src/pinta_dags/dags/calculate_reference_dem.py index 287a288..adfdd00 100644 --- a/components/dags/src/pinta_dags/dags/calculate_reference_dem.py +++ b/components/dags/src/pinta_dags/dags/calculate_reference_dem.py @@ -275,9 +275,9 @@ def merge_dem_staging_tables( session=session, ) - primary_connection_uri = "{{ conn.pinta_processing_db.get_hook().get_uri() }}" - job_admin_connection_uri = "{{ conn.pinta_job_db_admin.get_hook().get_uri() }}" - job_connection_uri = "{{ conn.pinta_job_db.get_hook().get_uri() }}" + primary_connection_uri = config.connection_uri_template("pinta_processing_db") + job_admin_connection_uri = config.connection_uri_template("pinta_job_db_admin") + job_connection_uri = config.connection_uri_template("pinta_job_db") prod_area_id = "{{ params.id }}" job_database_name = f"job_{prod_area_id}" diff --git a/components/dags/src/pinta_dags/dags/load_dem.py b/components/dags/src/pinta_dags/dags/load_dem.py index 41f05d5..cd7cf70 100644 --- a/components/dags/src/pinta_dags/dags/load_dem.py +++ b/components/dags/src/pinta_dags/dags/load_dem.py @@ -148,7 +148,7 @@ def merge_dem_staging_tables(connection_uri: str, staging_tables: int) -> None: session=session, ) - connection_uri = "{{ conn.pinta_processing_db.get_hook().get_uri() }}" + connection_uri = config.connection_uri_template("pinta_processing_db") staging_tables = _get_staging_tables() files = list_dem_files("{{ params.folder }}") files_to_process = require_dem_files(files) diff --git a/components/dags/src/pinta_dags/dags/print_hello_world.py b/components/dags/src/pinta_dags/dags/print_hello_world.py index dcb08e9..20b44b4 100644 --- a/components/dags/src/pinta_dags/dags/print_hello_world.py +++ b/components/dags/src/pinta_dags/dags/print_hello_world.py @@ -43,11 +43,11 @@ def hello_world_task_docker(connection_uri: str, name: str) -> None: chain( hello_world_task( - "{{ conn.pinta_processing_db.get_hook().sqlalchemy_url }}", + config.connection_uri_template("pinta_processing_db"), "{{ params.name }}", ), hello_world_task_docker( - "{{ conn.pinta_processing_db.get_hook().sqlalchemy_url }}", + config.connection_uri_template("pinta_processing_db"), "{{ params.name }}", ), ) diff --git a/components/dags/src/pinta_dags/dags/process_production_areas.py b/components/dags/src/pinta_dags/dags/process_production_areas.py index ca2833f..c132f58 100644 --- a/components/dags/src/pinta_dags/dags/process_production_areas.py +++ b/components/dags/src/pinta_dags/dags/process_production_areas.py @@ -60,7 +60,7 @@ def store_checksums(changed_folders: list[dict[str, str]]) -> None: changed = check_for_changes.output areas_result = process_areas( - "{{ conn.pinta_processing_db.get_hook().get_uri() }}", + config.connection_uri_template("pinta_processing_db"), "{{ var.value.pinta_container_target_base_path }}", changed, )