Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
from typing import Annotated, Any, TypedDict

import fastapi
from pinta_common import constants

STUB_BEARER_TOKEN = "stub-token" # noqa: S105
STUB_AIRFLOW_USERNAME = "pinta-backend"
STUB_AIRFLOW_PASSWORD = "stub-password" # noqa: S105

HELLO_WORLD_DAG_ID = "print_hello_world"
HELLO_WORLD_TAG = "hello_world"
HELLO_WORLD_DAG_ID = constants.DAG_ID_HELLO_WORLD
HELLO_WORLD_TAG = constants.DAG_ID_HELLO_WORLD


class StubAirflowState(TypedDict):
Expand Down
3 changes: 2 additions & 1 deletion components/dags/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ description = "DAG component of the elevation production system"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"pinta-db",
"pinta-processing",
]
[project.optional-dependencies]
Expand All @@ -22,8 +23,8 @@ module-name = "pinta_dags"
include = ["py.typed"]

[tool.uv.sources]
pinta-db = { workspace = true }
pinta-processing = { workspace = true }
pinta-db = { workspace = true }

[tool.uv]
default-groups = ["dev", "lint"]
Expand Down
24 changes: 18 additions & 6 deletions components/dags/src/pinta_dags/dags/calculate_reference_dem.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import cast

from airflow.sdk import DAG, Param, TriggerRule, Variable, dag, task
from pinta_common import constants

from pinta_dags import config

Expand Down Expand Up @@ -60,6 +61,7 @@ def create_calculate_reference_dem_dag( # noqa: C901, PLR0915
) -> DAG:
@dag(
dag_id=dag_id,
tags=[dag_id],
dag_display_name="Calculate reference dem",
schedule=None,
params={
Expand All @@ -70,6 +72,7 @@ def create_calculate_reference_dem_dag( # noqa: C901, PLR0915
description=("Production area id as UUID"),
)
},
is_paused_upon_creation=False,
)
def calculate_reference_dem_dag() -> None: # noqa: C901, PLR0915

Expand Down Expand Up @@ -217,7 +220,8 @@ def initialize_dem_tables(
max_active_tis_per_dag=_get_max_parallel_pipelines(),
)
def blast2dem( # noqa: PLR0913
connection_uri: str,
primary_connection_uri: str,
job_connection_uri: str,
input_path: str,
schema: str,
table: str,
Expand All @@ -233,10 +237,17 @@ def blast2dem( # noqa: PLR0913
import sqlmodel
from pinta_processing import pipelines

engine = sqlalchemy.create_engine(connection_uri)
with sqlmodel.Session(engine) as session:
with (
sqlmodel.Session(
sqlalchemy.create_engine(primary_connection_uri)
) as primary_session,
sqlmodel.Session(
sqlalchemy.create_engine(job_connection_uri)
) as job_session,
):
pipeline = pipelines.blast2dem_to_postgis(
session=session,
primary_session=primary_session,
job_session=job_session,
input_path=Path(input_path),
schema=schema,
table_name=table,
Expand Down Expand Up @@ -298,7 +309,8 @@ def merge_dem_staging_tables(

pixel_size = "{{ var.value.pinta_db_dem_pixel_size }}"
blast2dem_task = blast2dem.partial(
connection_uri=job_db_uri,
primary_connection_uri=primary_connection_uri,
job_connection_uri=job_db_uri,
schema=DB_SCHEMA,
table=DB_TABLE,
step=pixel_size,
Expand Down Expand Up @@ -329,6 +341,6 @@ def merge_dem_staging_tables(
return calculate_reference_dem_dag()


DAG_ID = "calculate_reference_dem"
DAG_ID = constants.DAG_ID_CALCULATE_REFERENCE_DEM

globals()[DAG_ID] = create_calculate_reference_dem_dag(dag_id=DAG_ID)
3 changes: 2 additions & 1 deletion components/dags/src/pinta_dags/dags/load_dem.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any

from airflow.sdk import DAG, Param, Variable, dag, task
from pinta_common import constants

from pinta_dags import config

Expand Down Expand Up @@ -167,6 +168,6 @@ def merge_dem_staging_tables(connection_uri: str, staging_tables: int) -> None:
return load_dem_from_files_dag()


DAG_ID = "load_dem_from_files"
DAG_ID = constants.DAG_ID_LOAD_DEM

globals()[DAG_ID] = load_dem_dag(dag_id=DAG_ID)
5 changes: 3 additions & 2 deletions components/dags/src/pinta_dags/dags/print_hello_world.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""Example DAG with external venv task & connection URI variable."""

from airflow.sdk import DAG, Param, chain, dag, task
from pinta_common import constants

from pinta_dags import config

Expand All @@ -18,7 +19,7 @@ def create_print_hello_world_dag(
dag_id=dag_id,
dag_display_name="Print hello world",
schedule=None,
tags=["hello_world"],
tags=[dag_id],
params={
"name": Param(
"World",
Expand Down Expand Up @@ -55,6 +56,6 @@ def hello_world_task_docker(connection_uri: str, name: str) -> None:
return hello_world_dag()


DAG_ID = "print_hello_world"
DAG_ID = constants.DAG_ID_HELLO_WORLD

globals()[DAG_ID] = create_print_hello_world_dag(dag_id=DAG_ID)
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import datetime

from airflow.sdk import DAG, chain, dag, task
from pinta_common import constants

from pinta_dags import config
from pinta_dags.sensors.folder_hash_sensor import FolderHashSensor
Expand Down Expand Up @@ -70,6 +71,6 @@ def store_checksums(changed_folders: list[dict[str, str]]) -> None:
return process_production_areas_dag()


DAG_ID = "process_production_areas"
DAG_ID = constants.DAG_ID_PROCESS_PRODUCTION_AREAS

globals()[DAG_ID] = create_process_production_areas_dag(dag_id=DAG_ID)
2 changes: 1 addition & 1 deletion components/db/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ requires = ["uv_build>=0.10.9,<0.12.0"]
build-backend = "uv_build"

[tool.uv.build-backend]
module-name = "pinta_db"
module-name = ["pinta_db", "pinta_common"]
include = ["py.typed"]

[tool.ruff]
Expand Down
10 changes: 10 additions & 0 deletions components/db/src/pinta_common/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright (c) 2026 National Land Survey of Finland
# (https://www.maanmittauslaitos.fi/en).
# This file is part of the Pinta.
# Licensed under the MIT License; see the repository LICENSE file.

# DAG ids
DAG_ID_HELLO_WORLD = "print_hello_world"
DAG_ID_LOAD_DEM = "load_dem_from_files"
DAG_ID_PROCESS_PRODUCTION_AREAS = "process_production_areas"
DAG_ID_CALCULATE_REFERENCE_DEM = "calculate_reference_dem"
Empty file.
1 change: 1 addition & 0 deletions components/e2e/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@ testpaths = [
]
addopts = "-n auto --dist loadgroup --maxprocesses=4 --import-mode=importlib"
timeout = 180
timeout_method = "thread"
18 changes: 18 additions & 0 deletions components/e2e/test_e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import pytest
import qgis.utils
import sqlmodel
from pinta_db.primary_db.models.management import ProductionArea
from pinta_db_test_utils import db_utils
from pinta_db_utils import engine_utils
from pinta_e2e_utils import constants
Expand Down Expand Up @@ -178,3 +180,19 @@ def processed_production_areas(
f"DAG run finished with state={state}\n"
+ airflow_client.describe_failed_run(run)
)


@pytest.fixture
def reduce_point_cloud_tiles(
processed_production_areas: None,
db: "Session",
) -> None:
"""Delete most point cloud tiles in the test database."""
production_area = db.exec(sqlmodel.select(ProductionArea)).first()
assert production_area is not None

for tile in sorted(production_area.tiles, key=lambda tile: tile.file_path)[2:]:
db.delete(tile)
db.commit()

assert len(db.exec(sqlmodel.select(ProductionArea)).first().tiles) == 2
12 changes: 6 additions & 6 deletions components/e2e/test_e2e/test_dem_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# This file is part of the Pinta.
# Licensed under the MIT License; see the repository LICENSE file.

import functools
import typing

import pytest
Expand All @@ -17,7 +18,7 @@


@pytest.mark.xdist_group("airflow")
@pytest.mark.usefixtures("processed_production_areas")
@pytest.mark.usefixtures("processed_production_areas", "reduce_point_cloud_tiles")
def test_reference_dem_workflow(
qgis_plugin: "Plugin", qtbot: "QtBot", m_error_dialog: "MagicMock"
) -> None:
Expand All @@ -31,12 +32,11 @@ def test_reference_dem_workflow(
feature = next(production_area_layer.getFeatures())
layers.run_layer_action(production_area_layer, action, feature)

def check_state() -> None:
def check_state(statuses: list[str]) -> None:
production_area_layer.reload()
updated_feature = next(production_area_layer.getFeatures())
assert updated_feature["processing_status"] == "queued"
assert updated_feature["processing_status"] in statuses

m_error_dialog.assert_not_called()
qtbot.waitUntil(check_state, timeout=30000)

# TODO: check for success when using the correct DAG
qtbot.waitUntil(functools.partial(check_state, ["queued", "started"]), timeout=5000)
qtbot.waitUntil(functools.partial(check_state, ["completed"]), timeout=40000)
6 changes: 3 additions & 3 deletions components/e2e/test_e2e/test_workflow_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

import pytest
import requests
from pinta_common import constants
from pinta_e2e_utils.airflow_client import AirflowClient, DagRun

HELLO_WORLD_TAG = "hello_world"
HELLO_WORLD_DAG_ID = "print_hello_world"
HELLO_WORLD_TAG = constants.DAG_ID_HELLO_WORLD
DAG_RUN_TIMEOUT_S = 300.0


Expand All @@ -25,7 +25,7 @@ def test_print_hello_world_runs_to_success(
)
assert response.status_code == 202, response.text
dag = DagRun.from_api(response.json())
assert dag.id == HELLO_WORLD_DAG_ID
assert dag.id == HELLO_WORLD_TAG
assert dag.run_id.startswith("manual__")

state = airflow_client.wait_for_dag_run(dag, timeout=DAG_RUN_TIMEOUT_S)
Expand Down
9 changes: 5 additions & 4 deletions components/processing/src/pinta_processing/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ def blast2dem_to_geotiff( # noqa: PLR0913


def blast2dem_to_postgis( # noqa: PLR0913
session: Session,
primary_session: Session,
job_session: Session,
input_path: Path,
schema: str,
table_name: str,
Expand All @@ -105,7 +106,7 @@ def blast2dem_to_postgis( # noqa: PLR0913
"""Read LAS/LAZ with blast2dem and write to PostGIS with overviews."""
bounds = tm35_map_sheet_utils.calculate_sheet_bounds_for_tile(input_path.stem)
neighbor_paths = find_intersecting_tiles.find_neighboring_tm35_laz_files(
input_path, DEFAULT_BUFFERED, session
input_path, DEFAULT_BUFFERED, primary_session
)
LOGGER.debug("Found %d neighbor tiles", len(neighbor_paths))
neighbors_param = {}
Expand All @@ -129,9 +130,9 @@ def blast2dem_to_postgis( # noqa: PLR0913
extra_lastools_params=extra_lastools_params,
),
# Calculate and write overviews
*_generate_overview_stages(schema, table_name, session, staging_tables),
*_generate_overview_stages(schema, table_name, job_session, staging_tables),
# Write original data
writer.RasterPostgisWriter(schema, table_name, session, staging_tables),
writer.RasterPostgisWriter(schema, table_name, job_session, staging_tables),
]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ def test_blast2dem_to_postgis(
input_path = pinta_utils.get_test_data_path(_LAZ_FILE)

pipeline = pipelines.blast2dem_to_postgis(
session=processing_worker_session,
primary_session=processing_worker_session,
job_session=processing_worker_session,
input_path=input_path,
schema=schema,
table_name=table_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def test_blast2dem_to_postgis_uses_extra_param_defaults(mocker: MockerFixture) -
)

pipelines.blast2dem_to_postgis(
session=MagicMock(),
primary_session=MagicMock(),
job_session=MagicMock(),
input_path=Path("/tmp/dir/N5122B4_1.laz"),
schema="test",
table_name="test",
Expand All @@ -46,7 +47,8 @@ def test_blast2dem_to_postgis_override_extra_param_defaults(
)

pipelines.blast2dem_to_postgis(
session=MagicMock(),
primary_session=MagicMock(),
job_session=MagicMock(),
input_path=Path("/tmp/dir/N5122B4_1.laz"),
schema="test",
table_name="test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from typing import Any, NoReturn, ParamSpec, TypeVar

import requests
from pinta_common import constants
from qgis.core import QgsSettings
from qgis.PyQt.QtCore import QLocale
from qgis_plugin_tools.tools.i18n import tr
Expand Down Expand Up @@ -89,8 +90,8 @@ def start_reference_dem_workflow(self, production_area_id: str) -> None:
"""Starts a DEM update workflow for the given production area."""
# TODO: correct signature and params
self._start_workflow(
"hello_world",
{"name": production_area_id},
constants.DAG_ID_CALCULATE_REFERENCE_DEM,
{"id": production_area_id},
production_area_id=production_area_id,
)
MsgBar.info(
Expand Down
Loading
Loading