diff --git a/components/backend/src/pinta_backend_test_utils/stub_airflow.py b/components/backend/src/pinta_backend_test_utils/stub_airflow.py index b00d3a72..53f2ac5f 100644 --- a/components/backend/src/pinta_backend_test_utils/stub_airflow.py +++ b/components/backend/src/pinta_backend_test_utils/stub_airflow.py @@ -9,13 +9,14 @@ from typing import Annotated, Any, TypedDict import fastapi +from pinta_common import constants STUB_BEARER_TOKEN = "stub-token" # noqa: S105 STUB_AIRFLOW_USERNAME = "pinta-backend" STUB_AIRFLOW_PASSWORD = "stub-password" # noqa: S105 -HELLO_WORLD_DAG_ID = "print_hello_world" -HELLO_WORLD_TAG = "hello_world" +HELLO_WORLD_DAG_ID = constants.DAG_ID_HELLO_WORLD +HELLO_WORLD_TAG = constants.DAG_ID_HELLO_WORLD class StubAirflowState(TypedDict): diff --git a/components/dags/pyproject.toml b/components/dags/pyproject.toml index 4974499b..4fe9ffb6 100644 --- a/components/dags/pyproject.toml +++ b/components/dags/pyproject.toml @@ -5,6 +5,7 @@ description = "DAG component of the elevation production system" readme = "README.md" requires-python = ">=3.12" dependencies = [ + "pinta-db", "pinta-processing", ] [project.optional-dependencies] @@ -22,8 +23,8 @@ module-name = "pinta_dags" include = ["py.typed"] [tool.uv.sources] -pinta-db = { workspace = true } pinta-processing = { workspace = true } +pinta-db = { workspace = true } [tool.uv] default-groups = ["dev", "lint"] diff --git a/components/dags/src/pinta_dags/dags/calculate_reference_dem.py b/components/dags/src/pinta_dags/dags/calculate_reference_dem.py index adfdd00a..a3296929 100644 --- a/components/dags/src/pinta_dags/dags/calculate_reference_dem.py +++ b/components/dags/src/pinta_dags/dags/calculate_reference_dem.py @@ -6,6 +6,7 @@ from typing import cast from airflow.sdk import DAG, Param, TriggerRule, Variable, dag, task +from pinta_common import constants from pinta_dags import config @@ -60,6 +61,7 @@ def create_calculate_reference_dem_dag( # noqa: C901, PLR0915 ) -> DAG: @dag( dag_id=dag_id, + tags=[dag_id], dag_display_name="Calculate reference dem", schedule=None, params={ @@ -70,6 +72,7 @@ def create_calculate_reference_dem_dag( # noqa: C901, PLR0915 description=("Production area id as UUID"), ) }, + is_paused_upon_creation=False, ) def calculate_reference_dem_dag() -> None: # noqa: C901, PLR0915 @@ -217,7 +220,8 @@ def initialize_dem_tables( max_active_tis_per_dag=_get_max_parallel_pipelines(), ) def blast2dem( # noqa: PLR0913 - connection_uri: str, + primary_connection_uri: str, + job_connection_uri: str, input_path: str, schema: str, table: str, @@ -233,10 +237,17 @@ def blast2dem( # noqa: PLR0913 import sqlmodel from pinta_processing import pipelines - engine = sqlalchemy.create_engine(connection_uri) - with sqlmodel.Session(engine) as session: + with ( + sqlmodel.Session( + sqlalchemy.create_engine(primary_connection_uri) + ) as primary_session, + sqlmodel.Session( + sqlalchemy.create_engine(job_connection_uri) + ) as job_session, + ): pipeline = pipelines.blast2dem_to_postgis( - session=session, + primary_session=primary_session, + job_session=job_session, input_path=Path(input_path), schema=schema, table_name=table, @@ -298,7 +309,8 @@ def merge_dem_staging_tables( pixel_size = "{{ var.value.pinta_db_dem_pixel_size }}" blast2dem_task = blast2dem.partial( - connection_uri=job_db_uri, + primary_connection_uri=primary_connection_uri, + job_connection_uri=job_db_uri, schema=DB_SCHEMA, table=DB_TABLE, step=pixel_size, @@ -329,6 +341,6 @@ def merge_dem_staging_tables( return calculate_reference_dem_dag() -DAG_ID = "calculate_reference_dem" +DAG_ID = constants.DAG_ID_CALCULATE_REFERENCE_DEM globals()[DAG_ID] = create_calculate_reference_dem_dag(dag_id=DAG_ID) diff --git a/components/dags/src/pinta_dags/dags/load_dem.py b/components/dags/src/pinta_dags/dags/load_dem.py index cd7cf708..a8c66511 100644 --- a/components/dags/src/pinta_dags/dags/load_dem.py +++ b/components/dags/src/pinta_dags/dags/load_dem.py @@ -6,6 +6,7 @@ from typing import Any from airflow.sdk import DAG, Param, Variable, dag, task +from pinta_common import constants from pinta_dags import config @@ -167,6 +168,6 @@ def merge_dem_staging_tables(connection_uri: str, staging_tables: int) -> None: return load_dem_from_files_dag() -DAG_ID = "load_dem_from_files" +DAG_ID = constants.DAG_ID_LOAD_DEM globals()[DAG_ID] = load_dem_dag(dag_id=DAG_ID) diff --git a/components/dags/src/pinta_dags/dags/print_hello_world.py b/components/dags/src/pinta_dags/dags/print_hello_world.py index 20b44b4b..e9b424eb 100644 --- a/components/dags/src/pinta_dags/dags/print_hello_world.py +++ b/components/dags/src/pinta_dags/dags/print_hello_world.py @@ -6,6 +6,7 @@ """Example DAG with external venv task & connection URI variable.""" from airflow.sdk import DAG, Param, chain, dag, task +from pinta_common import constants from pinta_dags import config @@ -18,7 +19,7 @@ def create_print_hello_world_dag( dag_id=dag_id, dag_display_name="Print hello world", schedule=None, - tags=["hello_world"], + tags=[dag_id], params={ "name": Param( "World", @@ -55,6 +56,6 @@ def hello_world_task_docker(connection_uri: str, name: str) -> None: return hello_world_dag() -DAG_ID = "print_hello_world" +DAG_ID = constants.DAG_ID_HELLO_WORLD globals()[DAG_ID] = create_print_hello_world_dag(dag_id=DAG_ID) diff --git a/components/dags/src/pinta_dags/dags/process_production_areas.py b/components/dags/src/pinta_dags/dags/process_production_areas.py index c132f58c..5a78d2e6 100644 --- a/components/dags/src/pinta_dags/dags/process_production_areas.py +++ b/components/dags/src/pinta_dags/dags/process_production_areas.py @@ -8,6 +8,7 @@ import datetime from airflow.sdk import DAG, chain, dag, task +from pinta_common import constants from pinta_dags import config from pinta_dags.sensors.folder_hash_sensor import FolderHashSensor @@ -70,6 +71,6 @@ def store_checksums(changed_folders: list[dict[str, str]]) -> None: return process_production_areas_dag() -DAG_ID = "process_production_areas" +DAG_ID = constants.DAG_ID_PROCESS_PRODUCTION_AREAS globals()[DAG_ID] = create_process_production_areas_dag(dag_id=DAG_ID) diff --git a/components/db/pyproject.toml b/components/db/pyproject.toml index 3b76efb2..2d21740c 100644 --- a/components/db/pyproject.toml +++ b/components/db/pyproject.toml @@ -57,7 +57,7 @@ requires = ["uv_build>=0.10.9,<0.12.0"] build-backend = "uv_build" [tool.uv.build-backend] -module-name = "pinta_db" +module-name = ["pinta_db", "pinta_common"] include = ["py.typed"] [tool.ruff] diff --git a/components/db/src/pinta_common/constants.py b/components/db/src/pinta_common/constants.py new file mode 100644 index 00000000..5487423e --- /dev/null +++ b/components/db/src/pinta_common/constants.py @@ -0,0 +1,10 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. + +# DAG ids +DAG_ID_HELLO_WORLD = "print_hello_world" +DAG_ID_LOAD_DEM = "load_dem_from_files" +DAG_ID_PROCESS_PRODUCTION_AREAS = "process_production_areas" +DAG_ID_CALCULATE_REFERENCE_DEM = "calculate_reference_dem" diff --git a/components/db/src/pinta_common/py.typed b/components/db/src/pinta_common/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/components/e2e/pyproject.toml b/components/e2e/pyproject.toml index 7d7479f9..b403368f 100644 --- a/components/e2e/pyproject.toml +++ b/components/e2e/pyproject.toml @@ -63,3 +63,4 @@ testpaths = [ ] addopts = "-n auto --dist loadgroup --maxprocesses=4 --import-mode=importlib" timeout = 180 +timeout_method = "thread" diff --git a/components/e2e/test_e2e/conftest.py b/components/e2e/test_e2e/conftest.py index 72047513..8996cd80 100644 --- a/components/e2e/test_e2e/conftest.py +++ b/components/e2e/test_e2e/conftest.py @@ -9,6 +9,8 @@ import pytest import qgis.utils +import sqlmodel +from pinta_db.primary_db.models.management import ProductionArea from pinta_db_test_utils import db_utils from pinta_db_utils import engine_utils from pinta_e2e_utils import constants @@ -178,3 +180,19 @@ def processed_production_areas( f"DAG run finished with state={state}\n" + airflow_client.describe_failed_run(run) ) + + +@pytest.fixture +def reduce_point_cloud_tiles( + processed_production_areas: None, + db: "Session", +) -> None: + """Delete most point cloud tiles in the test database.""" + production_area = db.exec(sqlmodel.select(ProductionArea)).first() + assert production_area is not None + + for tile in sorted(production_area.tiles, key=lambda tile: tile.file_path)[2:]: + db.delete(tile) + db.commit() + + assert len(db.exec(sqlmodel.select(ProductionArea)).first().tiles) == 2 diff --git a/components/e2e/test_e2e/test_dem_workflows.py b/components/e2e/test_e2e/test_dem_workflows.py index 6e02afaa..ac0b8af6 100644 --- a/components/e2e/test_e2e/test_dem_workflows.py +++ b/components/e2e/test_e2e/test_dem_workflows.py @@ -3,6 +3,7 @@ # This file is part of the Pinta. # Licensed under the MIT License; see the repository LICENSE file. +import functools import typing import pytest @@ -17,7 +18,7 @@ @pytest.mark.xdist_group("airflow") -@pytest.mark.usefixtures("processed_production_areas") +@pytest.mark.usefixtures("processed_production_areas", "reduce_point_cloud_tiles") def test_reference_dem_workflow( qgis_plugin: "Plugin", qtbot: "QtBot", m_error_dialog: "MagicMock" ) -> None: @@ -31,12 +32,11 @@ def test_reference_dem_workflow( feature = next(production_area_layer.getFeatures()) layers.run_layer_action(production_area_layer, action, feature) - def check_state() -> None: + def check_state(statuses: list[str]) -> None: production_area_layer.reload() updated_feature = next(production_area_layer.getFeatures()) - assert updated_feature["processing_status"] == "queued" + assert updated_feature["processing_status"] in statuses m_error_dialog.assert_not_called() - qtbot.waitUntil(check_state, timeout=30000) - - # TODO: check for success when using the correct DAG + qtbot.waitUntil(functools.partial(check_state, ["queued", "started"]), timeout=5000) + qtbot.waitUntil(functools.partial(check_state, ["completed"]), timeout=40000) diff --git a/components/e2e/test_e2e/test_workflow_trigger.py b/components/e2e/test_e2e/test_workflow_trigger.py index 775e240f..c90110b3 100644 --- a/components/e2e/test_e2e/test_workflow_trigger.py +++ b/components/e2e/test_e2e/test_workflow_trigger.py @@ -5,10 +5,10 @@ import pytest import requests +from pinta_common import constants from pinta_e2e_utils.airflow_client import AirflowClient, DagRun -HELLO_WORLD_TAG = "hello_world" -HELLO_WORLD_DAG_ID = "print_hello_world" +HELLO_WORLD_TAG = constants.DAG_ID_HELLO_WORLD DAG_RUN_TIMEOUT_S = 300.0 @@ -25,7 +25,7 @@ def test_print_hello_world_runs_to_success( ) assert response.status_code == 202, response.text dag = DagRun.from_api(response.json()) - assert dag.id == HELLO_WORLD_DAG_ID + assert dag.id == HELLO_WORLD_TAG assert dag.run_id.startswith("manual__") state = airflow_client.wait_for_dag_run(dag, timeout=DAG_RUN_TIMEOUT_S) diff --git a/components/processing/src/pinta_processing/pipelines.py b/components/processing/src/pinta_processing/pipelines.py index be75c9be..ca5e177b 100644 --- a/components/processing/src/pinta_processing/pipelines.py +++ b/components/processing/src/pinta_processing/pipelines.py @@ -92,7 +92,8 @@ def blast2dem_to_geotiff( # noqa: PLR0913 def blast2dem_to_postgis( # noqa: PLR0913 - session: Session, + primary_session: Session, + job_session: Session, input_path: Path, schema: str, table_name: str, @@ -105,7 +106,7 @@ def blast2dem_to_postgis( # noqa: PLR0913 """Read LAS/LAZ with blast2dem and write to PostGIS with overviews.""" bounds = tm35_map_sheet_utils.calculate_sheet_bounds_for_tile(input_path.stem) neighbor_paths = find_intersecting_tiles.find_neighboring_tm35_laz_files( - input_path, DEFAULT_BUFFERED, session + input_path, DEFAULT_BUFFERED, primary_session ) LOGGER.debug("Found %d neighbor tiles", len(neighbor_paths)) neighbors_param = {} @@ -129,9 +130,9 @@ def blast2dem_to_postgis( # noqa: PLR0913 extra_lastools_params=extra_lastools_params, ), # Calculate and write overviews - *_generate_overview_stages(schema, table_name, session, staging_tables), + *_generate_overview_stages(schema, table_name, job_session, staging_tables), # Write original data - writer.RasterPostgisWriter(schema, table_name, session, staging_tables), + writer.RasterPostgisWriter(schema, table_name, job_session, staging_tables), ] ) diff --git a/components/processing/test_integration_processing/pipelines/test_blast2dem_pipelines.py b/components/processing/test_integration_processing/pipelines/test_blast2dem_pipelines.py index 652c5b31..7b38598c 100644 --- a/components/processing/test_integration_processing/pipelines/test_blast2dem_pipelines.py +++ b/components/processing/test_integration_processing/pipelines/test_blast2dem_pipelines.py @@ -78,7 +78,8 @@ def test_blast2dem_to_postgis( input_path = pinta_utils.get_test_data_path(_LAZ_FILE) pipeline = pipelines.blast2dem_to_postgis( - session=processing_worker_session, + primary_session=processing_worker_session, + job_session=processing_worker_session, input_path=input_path, schema=schema, table_name=table_name, diff --git a/components/processing/test_processing/pinta_processing/test_pipelines.py b/components/processing/test_processing/pinta_processing/test_pipelines.py index 0d080755..89b6286c 100644 --- a/components/processing/test_processing/pinta_processing/test_pipelines.py +++ b/components/processing/test_processing/pinta_processing/test_pipelines.py @@ -19,7 +19,8 @@ def test_blast2dem_to_postgis_uses_extra_param_defaults(mocker: MockerFixture) - ) pipelines.blast2dem_to_postgis( - session=MagicMock(), + primary_session=MagicMock(), + job_session=MagicMock(), input_path=Path("/tmp/dir/N5122B4_1.laz"), schema="test", table_name="test", @@ -46,7 +47,8 @@ def test_blast2dem_to_postgis_override_extra_param_defaults( ) pipelines.blast2dem_to_postgis( - session=MagicMock(), + primary_session=MagicMock(), + job_session=MagicMock(), input_path=Path("/tmp/dir/N5122B4_1.laz"), schema="test", table_name="test", diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py b/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py index 61127e13..4fd8ef56 100644 --- a/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py +++ b/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py @@ -23,6 +23,7 @@ from typing import Any, NoReturn, ParamSpec, TypeVar import requests +from pinta_common import constants from qgis.core import QgsSettings from qgis.PyQt.QtCore import QLocale from qgis_plugin_tools.tools.i18n import tr @@ -89,8 +90,8 @@ def start_reference_dem_workflow(self, production_area_id: str) -> None: """Starts a DEM update workflow for the given production area.""" # TODO: correct signature and params self._start_workflow( - "hello_world", - {"name": production_area_id}, + constants.DAG_ID_CALCULATE_REFERENCE_DEM, + {"id": production_area_id}, production_area_id=production_area_id, ) MsgBar.info( diff --git a/components/qgis_plugin/test_qgis/processing/test_api_client.py b/components/qgis_plugin/test_qgis/processing/test_api_client.py index 3f41e1cf..1f888219 100644 --- a/components/qgis_plugin/test_qgis/processing/test_api_client.py +++ b/components/qgis_plugin/test_qgis/processing/test_api_client.py @@ -20,11 +20,16 @@ import pytest import requests +from pinta_common import constants from pytest_mock import MockerFixture from pinta_qgis_plugin import exceptions from pinta_qgis_plugin.api import api_client +EXPECTED_URL = ( + f"http://example.test/workflows/{constants.DAG_ID_CALCULATE_REFERENCE_DEM}" +) + @pytest.fixture def client() -> api_client.PintaAPIClient: @@ -66,8 +71,8 @@ def test_start_reference_dem_workflow_posts_workflow_payload( client.start_reference_dem_workflow("area-1") mock_post.assert_called_once_with( - "http://example.test/workflows/hello_world", - json={"parameters": {"name": "area-1"}, "production_area_id": "area-1"}, + EXPECTED_URL, + json={"parameters": {"id": "area-1"}, "production_area_id": "area-1"}, timeout=10, ) mock_post.return_value.raise_for_status.assert_called_once_with() @@ -118,7 +123,7 @@ def test_start_reference_dem_workflow_http_error_without_json_detail_uses_fallba response = MagicMock(spec=requests.Response) response.json.side_effect = requests.exceptions.JSONDecodeError("err", "", 0) response.text = "500 internal" - response.url = "http://example.test/workflows/hello_world" + response.url = "http://example.test/workflows/" http_error = requests.exceptions.HTTPError(response=response) mock_post.return_value.raise_for_status.side_effect = http_error @@ -135,7 +140,7 @@ def test_start_reference_dem_workflow_http_error_with_none_detail_uses_fallback( response = MagicMock(spec=requests.Response) response.json.return_value = {"detail": None} response.text = "boom" - response.url = "http://example.test/workflows/hello_world" + response.url = EXPECTED_URL http_error = requests.exceptions.HTTPError(response=response) mock_post.return_value.raise_for_status.side_effect = http_error diff --git a/uv.lock b/uv.lock index d137bc86..c1f0f179 100644 --- a/uv.lock +++ b/uv.lock @@ -2367,6 +2367,7 @@ name = "pinta-dags" version = "0.0.0" source = { editable = "components/dags" } dependencies = [ + { name = "pinta-db" }, { name = "pinta-processing" }, ] @@ -2396,6 +2397,7 @@ lint = [ [package.metadata] requires-dist = [ { name = "apache-airflow", extras = ["docker", "postgres", "standard"], marker = "extra == 'airflow'", specifier = "==3.1.8" }, + { name = "pinta-db", editable = "components/db" }, { name = "pinta-db", extras = ["psycopg"], marker = "extra == 'airflow'", editable = "components/db" }, { name = "pinta-processing", editable = "components/processing" }, ]