From e7fc2d667cb5974e0ba7b30b9f8093a966ea6759 Mon Sep 17 00:00:00 2001 From: vaahtokarkki Date: Sun, 7 Jun 2026 15:43:21 +0000 Subject: [PATCH] feat(db): initialize only staging tables for processing All main tables should already be present in the db, so we only need to initialize the staging tables. --- .../scripts/initialize_processing_tables.py | 47 +++--- .../db/src/pinta_db_utils/postgis/raster.py | 57 +++---- .../postgis/test_raster.py | 147 +++++++++++++++++- .../pipelines/test_blast2dem_pipelines.py | 2 +- .../test_rasterio_to_postgis_pipeline.py | 2 +- 5 files changed, 193 insertions(+), 62 deletions(-) diff --git a/components/db/scripts/initialize_processing_tables.py b/components/db/scripts/initialize_processing_tables.py index 5cd8f1c4..1f244ac9 100644 --- a/components/db/scripts/initialize_processing_tables.py +++ b/components/db/scripts/initialize_processing_tables.py @@ -3,7 +3,7 @@ # This file is part of the Pinta. # Licensed under the MIT License; see the repository LICENSE file. -"""Materialize the canonical `processing` schema raster tables. +"""Materialize the canonical `processing` schema staging raster tables. Meant to be run by CI for maintaining documentation. 
""" @@ -27,12 +27,8 @@ _TABLE = "dem" -def _exists(inspector: sa.Inspector, table_name: str) -> bool: - return inspector.has_table(table_name, schema=_SCHEMA) - - def main() -> None: - """Create the canonical processing-schema raster tables if missing.""" + """Create canonical processing-schema raster staging tables if missing.""" credentials = db_utils.get_primary_processing_worker_credentials( os.environ["DB_PRIMARY_NAME"] ) @@ -41,32 +37,35 @@ def main() -> None: with sqlmodel.Session(engine) as session: inspector = sa.inspect(engine) - if _exists(inspector, _TABLE): - logger.info("%s.%s already exists, skipping", _SCHEMA, _TABLE) - else: - raster.initialize_raster_table(session, _SCHEMA, _TABLE) - logger.info("created %s.%s", _SCHEMA, _TABLE) + if not inspector.has_table(_TABLE, schema=_SCHEMA): + msg = ( + f"{_SCHEMA}.{_TABLE} is missing; main raster tables must " + "come from the template database" + ) + raise SystemExit(msg) - inspector = sa.inspect(engine) overview_names = [ raster.OVERVIEW_TABLE_NAME.format(level=level, table_name=_TABLE) for level in raster.DEFAULT_OVERVIEW_LEVELS ] - existing = [name for name in overview_names if _exists(inspector, name)] - if len(existing) == len(overview_names): - logger.info( - "all overview tables in %s already exist, skipping", _SCHEMA - ) - elif existing: + missing = [ + name + for name in overview_names + if not inspector.has_table(name, schema=_SCHEMA) + ] + if missing: msg = ( - f"partial overview tables present in {_SCHEMA} ({existing}); " - "drop them manually before re-running" + f"overview tables missing in {_SCHEMA} ({missing}); main " + "overview tables must come from the template database" ) raise SystemExit(msg) - else: - raster.initialize_overview_tables(session, _SCHEMA, _TABLE) - session.commit() - logger.info("created overview tables in %s", _SCHEMA) + + raster.initialize_raster_table(session, _SCHEMA, _TABLE) + logger.info("initialized staging tables for %s.%s", _SCHEMA, _TABLE) + + 
raster.initialize_overview_tables(session, _SCHEMA, _TABLE) + session.commit() + logger.info("initialized overview staging tables in %s", _SCHEMA) finally: engine.dispose() diff --git a/components/db/src/pinta_db_utils/postgis/raster.py b/components/db/src/pinta_db_utils/postgis/raster.py index 9285b106..e9d6be02 100644 --- a/components/db/src/pinta_db_utils/postgis/raster.py +++ b/components/db/src/pinta_db_utils/postgis/raster.py @@ -44,9 +44,10 @@ def initialize_raster_table( staging_tables: int = 1, extra_columns: abc.Callable[[], list[sa.Column]] | None = None, ) -> None: - """Initialize a raster table with optional staging tables. + """Initialize staging tables for an existing raster table. - Creates a main table and staging tables (when specified) with: + The main raster table is expected to already exist in the database, usually + from the template database. Creates staging tables (when specified) with: - rid: serial primary key - rast: raster column - Additional custom columns (optional) @@ -60,16 +61,7 @@ def initialize_raster_table( TOAST tuple target optimized TOAST chunk size. Staging tables are created as UNLOGGED with autovacuum disabled for better performance. """ - table_created = _create_raster_table( - session, - schema, - table_name, - extra_columns=extra_columns() if extra_columns else None, - ) - if table_created: - constraints.add_raster_constraints( - session, schema, table_name, pixel_size=env.DEM_PIXEL_SIZE - ) + _ensure_raster_table_exists(session, schema, table_name) for i in range(staging_tables): staging_name = f"{table_name}_p{i}" @@ -94,27 +86,20 @@ def initialize_overview_tables( table_name: str, staging_tables: int = 1, ) -> None: - """Initialize, register and index overview tables with optional staging tables. + """Initialize staging tables for existing overview tables. - Creates a main table and staging tables with: + The main overview tables are expected to already exist in the database, + usually from the template database. 
Creates staging tables with: - rid: serial primary key - rast: raster column - - The main overview tables are also registered against the reference raster - table with PostGIS overview constraints and receive raster envelope indexes. """ + _ensure_raster_table_exists(session, schema, table_name) for level in DEFAULT_OVERVIEW_LEVELS: overview_name = OVERVIEW_TABLE_NAME.format(level=level, table_name=table_name) - table_created = _create_raster_table( - session, - schema, - overview_name, - ) - if table_created: - constraints.add_raster_constraints( - session, schema, overview_name, pixel_size=env.DEM_PIXEL_SIZE * level - ) + _ensure_raster_table_exists(session, schema, overview_name) + for level in DEFAULT_OVERVIEW_LEVELS: + overview_name = OVERVIEW_TABLE_NAME.format(level=level, table_name=table_name) for i in range(staging_tables): staging_name = f"{overview_name}_p{i}" _create_raster_table( @@ -127,8 +112,6 @@ def initialize_overview_tables( session, schema, staging_name, pixel_size=env.DEM_PIXEL_SIZE * level ) - _register_overview_table(session, schema, table_name, overview_name, level) - _create_raster_index(session, schema, overview_name) session.commit() @@ -334,6 +317,24 @@ def _create_raster_table( return True +def _ensure_raster_table_exists( + session: sqlmodel.Session, + schema: str, + table_name: str, +) -> None: + """Raise if a raster table expected from the template database is missing.""" + inspector = sa.inspect(session.connection()) + if inspector.has_table(table_name, schema=schema): + return + + msg = ( + f"Expected raster table {schema}.{table_name} to exist in the database. " + "Main raster tables must be created by the template database; " + "initialization only creates staging tables." 
+ ) + raise ValueError(msg) + + def _create_raster_index( session: sqlmodel.Session, schema: str, diff --git a/components/db/test_integration_db/postgis/test_raster.py b/components/db/test_integration_db/postgis/test_raster.py index d51e2845..205e8e82 100644 --- a/components/db/test_integration_db/postgis/test_raster.py +++ b/components/db/test_integration_db/postgis/test_raster.py @@ -7,8 +7,56 @@ import sqlalchemy as sa import sqlmodel +from pinta_common import env from pinta_db.primary_db.schema import Schema -from pinta_db_utils.postgis import raster +from pinta_db_utils.postgis import constraints, raster + + +def _create_template_raster_table( + session: sqlmodel.Session, + schema: str, + table_name: str, + extra_columns: list[sa.Column] | None = None, + pixel_size: int = env.DEM_PIXEL_SIZE, +) -> None: + """Create a main raster table to simulate the template database.""" + created = raster._create_raster_table( + session, + schema, + table_name, + extra_columns=extra_columns, + ) + if created: + constraints.add_raster_constraints(session, schema, table_name, pixel_size) + session.commit() + + +def _create_template_overview_tables( + session: sqlmodel.Session, + schema: str, + table_name: str, +) -> list[str]: + overview_table_names = [ + raster.OVERVIEW_TABLE_NAME.format(level=level, table_name=table_name) + for level in raster.DEFAULT_OVERVIEW_LEVELS + ] + for level, overview_table_name in zip( + raster.DEFAULT_OVERVIEW_LEVELS, + overview_table_names, + strict=True, + ): + _create_template_raster_table( + session, + schema, + overview_table_name, + pixel_size=env.DEM_PIXEL_SIZE * level, + ) + raster._register_overview_table( + session, schema, table_name, overview_table_name, level + ) + raster._create_raster_index(session, schema, overview_table_name) + session.commit() + return overview_table_names def _assert_table_exists( @@ -152,6 +200,8 @@ def test_initialize_raster_table( """Test creating a raster table with varying numbers of staging tables.""" 
table_name = "test_raster_table" schema = Schema.PROCESSING.value + _create_template_raster_table(processing_worker_db, schema, table_name) + raster.initialize_raster_table( table_name=table_name, schema=schema, @@ -189,6 +239,7 @@ def test_test_merge_staging_tables_with_no_staging_tables_creates_rast_index( ): table_name = "test_raster_merge_no_staging" schema = Schema.PROCESSING.value + _create_template_raster_table(processing_worker_db, schema, table_name) raster.initialize_raster_table( table_name=table_name, @@ -215,6 +266,7 @@ def test_merge_staging_tables(processing_worker_db: sqlmodel.Session): schema = Schema.PROCESSING.value staging_tables = 3 rows_per_staging = 1 + _create_template_raster_table(processing_worker_db, schema, table_name) # Initialize table with staging tables raster.initialize_raster_table( @@ -279,6 +331,7 @@ def test_merge_staging_tables_uses_main_table_rid_sequence( """Test merging staging tables assigns rids from the main table sequence.""" table_name = "test_raster_merge_rid_sequence" schema = Schema.PROCESSING.value + _create_template_raster_table(processing_worker_db, schema, table_name) raster.initialize_raster_table( table_name=table_name, @@ -343,6 +396,13 @@ def extra_columns() -> list[sa.Column]: sa.Column("is_private", sa.Boolean()), ] + _create_template_raster_table( + processing_worker_db, + schema, + table_name, + extra_columns=extra_columns(), + ) + raster.initialize_raster_table( table_name=table_name, schema=schema, @@ -381,6 +441,7 @@ def test_initialize_raster_table_twice( """Test calling initialize_raster_table twice.""" table_name = "test_initialize_table_twice" schema = Schema.PROCESSING.value + _create_template_raster_table(processing_worker_db, schema, table_name) # Initialize twice raster.initialize_raster_table( @@ -401,9 +462,15 @@ def test_initialize_raster_table_twice( def test_initialize_overview_tables( processing_worker_db: sqlmodel.Session, staging_tables: int ): - """Test creating, registering and 
indexing overview tables.""" + """Test creating overview staging tables for existing overview tables.""" table_name = "test_raster_overview" schema = Schema.PROCESSING.value + _create_template_raster_table(processing_worker_db, schema, table_name) + overview_table_names = _create_template_overview_tables( + processing_worker_db, + schema, + table_name, + ) raster.initialize_raster_table( table_name=table_name, @@ -418,11 +485,6 @@ def test_initialize_overview_tables( staging_tables=staging_tables, ) - overview_table_names = [ - raster.OVERVIEW_TABLE_NAME.format(level=level, table_name=table_name) - for level in raster.DEFAULT_OVERVIEW_LEVELS - ] - for overview_table_name in overview_table_names: _assert_table_exists(processing_worker_db, schema, overview_table_name) _assert_table_has_default_columns( @@ -457,6 +519,7 @@ def test_initialize_overview_tables( def test_add_raster_constraints(processing_worker_db: sqlmodel.Session): table_name = "dem" schema = Schema.PROCESSING.value + _create_template_raster_table(processing_worker_db, schema, table_name) raster.initialize_raster_table( table_name=table_name, schema=schema, @@ -484,9 +547,11 @@ def test_add_raster_constraints(processing_worker_db: sqlmodel.Session): def test_register_overview(processing_worker_db: sqlmodel.Session): - """Test overview tables are registered during initialization.""" + """Test existing overview table registrations remain available.""" table_name = "dem" schema = Schema.PROCESSING.value + _create_template_raster_table(processing_worker_db, schema, table_name) + _create_template_overview_tables(processing_worker_db, schema, table_name) raster.initialize_raster_table( table_name=table_name, schema=schema, @@ -520,3 +585,69 @@ def test_register_overview(processing_worker_db: sqlmodel.Session): _assert_table_index_count( processing_worker_db, schema, overview_name, expected_count=2 ) + + +def test_initialize_raster_table_does_not_create_staging_when_main_table_is_missing( + 
processing_worker_db: sqlmodel.Session, +): + table_name = "test_missing_main_table" + schema = Schema.PROCESSING.value + staging_tables = 2 + + with pytest.raises(ValueError, match=rf"{schema}\.{table_name}"): + raster.initialize_raster_table( + table_name=table_name, + schema=schema, + session=processing_worker_db, + staging_tables=staging_tables, + ) + + _assert_staging_tables_does_not_exist(processing_worker_db, schema, table_name) + + +def test_initialize_overview_tables_does_not_create_staging_when_overview_is_missing( + processing_worker_db: sqlmodel.Session, +): + table_name = "test_missing_overview_table" + schema = Schema.PROCESSING.value + staging_tables = 2 + _create_template_raster_table(processing_worker_db, schema, table_name) + missing_level = raster.DEFAULT_OVERVIEW_LEVELS[-1] + + for level in raster.DEFAULT_OVERVIEW_LEVELS: + if level == missing_level: + continue + overview_name = raster.OVERVIEW_TABLE_NAME.format( + level=level, + table_name=table_name, + ) + _create_template_raster_table( + processing_worker_db, + schema, + overview_name, + pixel_size=env.DEM_PIXEL_SIZE * level, + ) + + missing_overview_name = raster.OVERVIEW_TABLE_NAME.format( + level=missing_level, + table_name=table_name, + ) + + with pytest.raises(ValueError, match=rf"{schema}\.{missing_overview_name}"): + raster.initialize_overview_tables( + table_name=table_name, + schema=schema, + session=processing_worker_db, + staging_tables=staging_tables, + ) + + for level in raster.DEFAULT_OVERVIEW_LEVELS: + overview_name = raster.OVERVIEW_TABLE_NAME.format( + level=level, + table_name=table_name, + ) + _assert_staging_tables_does_not_exist( + processing_worker_db, + schema, + overview_name, + ) diff --git a/components/processing/test_integration_processing/pipelines/test_blast2dem_pipelines.py b/components/processing/test_integration_processing/pipelines/test_blast2dem_pipelines.py index 7ff3dcdd..ea5e0711 100644 --- 
a/components/processing/test_integration_processing/pipelines/test_blast2dem_pipelines.py +++ b/components/processing/test_integration_processing/pipelines/test_blast2dem_pipelines.py @@ -55,7 +55,7 @@ def test_blast2dem_to_geotiff( def test_blast2dem_to_postgis( processing_worker_session: "Session", ) -> None: - table_name = "test_blast2dem" + table_name = "dem" schema = "processing" staging_tables = 2 ol_2_name = f"o_2_{table_name}" diff --git a/components/processing/test_integration_processing/pipelines/test_rasterio_to_postgis_pipeline.py b/components/processing/test_integration_processing/pipelines/test_rasterio_to_postgis_pipeline.py index d14b1937..290b0d6f 100644 --- a/components/processing/test_integration_processing/pipelines/test_rasterio_to_postgis_pipeline.py +++ b/components/processing/test_integration_processing/pipelines/test_rasterio_to_postgis_pipeline.py @@ -185,7 +185,7 @@ def test_rasterio_to_postgis( processing_worker_session: "Session", ) -> None: - table_name = "test_raster_ol" + table_name = "dem" schema = "processing" staging_tables = 2 ol_2_name = f"o_2_{table_name}"