From 673356fb3ed660be3f6e8653568d2bc105a042be Mon Sep 17 00:00:00 2001 From: hrodmn Date: Sat, 21 Mar 2026 06:36:53 -0500 Subject: [PATCH] feat: add pgstac settings for use_queue and update_collection_extent --- integration_tests/cdk/app.py | 9 + lib/database/bootstrapper_runtime/handler.py | 170 +++++++++++++++++-- lib/database/index.ts | 43 ++++- 3 files changed, 204 insertions(+), 18 deletions(-) diff --git a/integration_tests/cdk/app.py b/integration_tests/cdk/app.py index aeb21c2..84fd2fb 100644 --- a/integration_tests/cdk/app.py +++ b/integration_tests/cdk/app.py @@ -99,6 +99,15 @@ def __init__( }, removal_policy=RemovalPolicy.DESTROY, pgstac_version=PGSTAC_VERSION, + parameters={ + "shared_preload_libraries": "pg_cron", + }, + custom_resource_properties={ + "context": "TRUE", + "update_collection_extent": "TRUE", + "use_queue": "TRUE", + "pg_cron_schedule": "*/10 * * * *", # every 10 minutes (default) + }, ) assert pgstac_db.security_group diff --git a/lib/database/bootstrapper_runtime/handler.py b/lib/database/bootstrapper_runtime/handler.py index f60d06b..0b03582 100644 --- a/lib/database/bootstrapper_runtime/handler.py +++ b/lib/database/bootstrapper_runtime/handler.py @@ -17,6 +17,9 @@ logger = logging.getLogger("eoapi-bootstrap") +DEFAULT_PG_CRON_SCHEDULE = "*/10 * * * *" # every 10 minutes + + def send( event, context, @@ -143,30 +146,151 @@ def register_extensions(cursor) -> None: ############################################################################### # PgSTAC Customization ############################################################################### -def customization(cursor, params) -> None: +def _is_enabled(params: dict, key: str) -> bool: + """Parse a string boolean CloudFormation custom resource property. + + CloudFormation custom resource properties are always strings, so boolean + flags are passed as ``"TRUE"`` or ``"FALSE"``. Callers must check that the + key is present before calling this function; absent keys are a no-op and + should not reach this function. + + Args: + params: ResourceProperties dict from the CloudFormation event. + key: Property name to look up. + + Returns: + True if the property value is ``"TRUE"`` (case-insensitive). """ - CUSTOMIZED YOUR PGSTAC DATABASE + return str(params.get(key, "FALSE")).upper() == "TRUE" + + +def _apply_pgstac_setting(cursor, name: str, enabled: bool) -> None: + """Upsert or delete a row in the pgstac_settings table. + + When ``enabled`` is True, upserts ``'on'``. When False, deletes the row so + pgstac reverts to its built-in default. When the setting is absent from + params entirely, this function should not be called — absent means no-op. + + Args: + cursor: Database cursor with ``search_path`` including the pgstac schema. + name: Setting name (primary key in pgstac_settings). + enabled: Whether to enable or disable the setting. + """ + if enabled: + cursor.execute( + sql.SQL( + "INSERT INTO pgstac_settings (name, value) VALUES ({name}, 'on') " + "ON CONFLICT ON CONSTRAINT pgstac_settings_pkey DO UPDATE SET value = excluded.value;" + ).format(name=sql.Literal(name)) + ) + else: + cursor.execute( + sql.SQL("DELETE FROM pgstac_settings WHERE name = {name};").format( + name=sql.Literal(name) + ) + ) + + +def customization(cursor, params) -> None: + """Apply pgSTAC database customizations based on the provided params. + + Each setting follows honest boolean semantics: + + - ``"TRUE"`` → enable (upsert ``'on'`` in pgstac_settings) + - ``"FALSE"`` → disable (delete from pgstac_settings, reverting to pgstac default) + - absent → no-op (the database is left exactly as-is) + + This means the flags are safe to use as true booleans, and upgrading without + changing config never silently alters a manually-managed setting. ref: https://github.com/stac-utils/pgstac/blob/main/docs/src/pgstac.md + """ + if "context" in params: + _apply_pgstac_setting(cursor, "context", _is_enabled(params, "context")) + + if "mosaic_index" in params: + if _is_enabled(params, "mosaic_index"): + cursor.execute( + sql.SQL( + "CREATE INDEX IF NOT EXISTS searches_mosaic ON searches ((true)) " + "WHERE metadata->>'type'='mosaic';" + ) + ) + else: + cursor.execute(sql.SQL("DROP INDEX IF EXISTS searches_mosaic;")) + + # Enable automatic spatial/temporal extent updates on item ingest. + # For large ingests, combine with use_queue=TRUE to reduce transaction overhead. + if "update_collection_extent" in params: + _apply_pgstac_setting( + cursor, + "update_collection_extent", + _is_enabled(params, "update_collection_extent"), + ) + + # Enable asynchronous queue processing for extent updates. + # Requires pg_cron to periodically invoke CALL pgstac.run_queued_queries(). + if "use_queue" in params: + _apply_pgstac_setting(cursor, "use_queue", _is_enabled(params, "use_queue")) + + +def unregister_pg_cron(cursor) -> None: + """Remove the run_queued_queries pg_cron job if it exists. + + Safe to call even if pg_cron is not installed or the job does not exist. + Args: + cursor: Database cursor connected to the ``postgres`` database as a superuser. """ - if str(params.get("context", "FALSE")).upper() == "TRUE": - # Add CONTEXT=ON - pgstac_settings = """ - INSERT INTO pgstac_settings (name, value) - VALUES ('context', 'on') - ON CONFLICT ON CONSTRAINT pgstac_settings_pkey DO UPDATE SET value = excluded.value;""" - cursor.execute(sql.SQL(pgstac_settings)) - - if str(params.get("mosaic_index", "TRUE")).upper() == "TRUE": - # Create index of searches with `mosaic`` type + cursor.execute(sql.SQL("SELECT 1 FROM pg_extension WHERE extname = 'pg_cron';")) + if cursor.fetchone(): cursor.execute( sql.SQL( - "CREATE INDEX IF NOT EXISTS searches_mosaic ON searches ((true)) WHERE metadata->>'type'='mosaic';" + "SELECT cron.unschedule(jobid) FROM cron.job " + "WHERE jobname = 'pgstac-run-queued-queries';" ) ) +def register_pg_cron(cursor, db_name: str, schedule: str) -> None: + """Install the pg_cron extension and schedule run_queued_queries. + + pg_cron can only be installed in the ``postgres`` database, so the cursor + must be connected there as a superuser. ``cron.schedule_in_database`` is + used to run the job against the pgSTAC database. The job is upserted by + name so repeated bootstrap runs are safe. + + pg_cron must be listed in ``shared_preload_libraries`` in the RDS parameter + group before this will succeed. + + Side effect: sets ``search_path = pgstac, public`` as the default for all + connections to ``db_name`` via ``ALTER DATABASE``. This is required because + pg_cron background worker sessions start with no ``search_path``, and + prepending ``SET search_path`` to the cron command would open a transaction + that prevents ``run_queued_queries()`` from issuing its own ``COMMIT``. + + Args: + cursor: Database cursor connected to the ``postgres`` database as a superuser. + db_name: Name of the pgSTAC database where run_queued_queries will run. + schedule: Cron schedule expression (e.g. ``"*/5 * * * *"``). + """ + cursor.execute(sql.SQL("CREATE EXTENSION IF NOT EXISTS pg_cron;")) + cursor.execute( + sql.SQL("ALTER DATABASE {db_name} SET search_path TO pgstac, public;").format( + db_name=sql.Identifier(db_name), + ) + ) + cursor.execute( + sql.SQL( + "SELECT cron.schedule_in_database({job_name}, {schedule}, 'CALL pgstac.run_queued_queries();', {db_name});" + ).format( + job_name=sql.Literal("pgstac-run-queued-queries"), + schedule=sql.Literal(schedule), + db_name=sql.Literal(db_name), + ) + ) + + def handler(event, context): """Lambda Handler.""" print(f"Handling {event}") @@ -213,6 +337,26 @@ def handler(event, context): password=eoapi_params["password"], ) + if "use_queue" in params: + if _is_enabled(params, "use_queue"): + schedule = params.get( + "pg_cron_schedule", DEFAULT_PG_CRON_SCHEDULE + ) + print( + f"Scheduling pg_cron job 'pgstac-run-queued-queries' " + f"with schedule '{schedule}'..." + ) + register_pg_cron( + cursor=cur, + db_name=eoapi_params["dbname"], + schedule=schedule, + ) + else: + print( + "Removing pg_cron job 'pgstac-run-queued-queries' if present..." + ) + unregister_pg_cron(cursor=cur) + # Install postgis and pgstac on the eoapi database with # superuser permissions print(f"Connecting to eoAPI '{eoapi_params['dbname']}' database...") diff --git a/lib/database/index.ts b/lib/database/index.ts index a2d60a4..e074424 100644 --- a/lib/database/index.ts +++ b/lib/database/index.ts @@ -25,7 +25,6 @@ import * as path from "path"; const instanceSizes: Record = require("./instance-memory.json"); let defaultPgSTACCustomOptions: { [key: string]: any } = { - context: "FALSE", mosaic_index: "TRUE", }; @@ -135,6 +134,7 @@ export class PgStacDatabase extends Construct { props.instanceType?.toString() || "m5.large", props.parameters, ); + const parameterGroup = new rds.ParameterGroup(this, "parameterGroup", { engine: props.engine, parameters: { @@ -151,10 +151,11 @@ export class PgStacDatabase extends Construct { }, }); + const { parameters: _parameters, ...dbProps } = props; this.db = new rds.DatabaseInstance(this, "db", { instanceIdentifier: Stack.of(this).stackName, parameterGroup, - ...props, + ...dbProps, }); this.pgstacVersion = props.pgstacVersion || DEFAULT_PGSTAC_VERSION; @@ -423,10 +424,42 @@ export interface PgStacDatabaseProps extends rds.DatabaseInstanceProps { readonly maintenanceWindow?: ssm.CfnMaintenanceWindow; /** - * Lambda function Custom Resource properties. A custom resource property is going to be created - * to trigger the boostrapping lambda function. This parameter allows the user to specify additional properties - * on top of the defaults ones. + * Additional properties passed to the bootstrapper Lambda as CloudFormation + * Custom Resource properties. These are merged with the defaults and forwarded + * to the handler as `event["ResourceProperties"]`. + * + * ## Supported pgSTAC settings + * + * Each setting follows honest boolean semantics: `"TRUE"` enables, `"FALSE"` + * disables (reverts to pgSTAC's built-in default), and omitting the key + * entirely is a no-op — the database is left as-is. This means upgrading + * without changing config never silently alters a manually-managed setting. + * + * | Property | Default | Description | + * |----------------------------|----------|-------------| + * | `context` | (unset) | Enable `CONTEXT=ON` in pgSTAC (item count on search). | + * | `mosaic_index` | `"TRUE"` | Create a partial index on searches of type `mosaic`. Set `"FALSE"` to drop it. | + * | `update_collection_extent` | (unset) | Automatically update collection spatial/temporal extents on item ingest. Combine with `use_queue` to reduce per-transaction overhead. | + * | `use_queue` | (unset) | Process extent updates asynchronously via an internal queue. `"TRUE"` installs pg_cron and schedules the drain job; `"FALSE"` removes the job. Requires `pg_cron` (see below). | + * | `pg_cron_schedule` | `"*\/10 * * * *"` | Cron schedule for `CALL run_queued_queries()`. Only used when `use_queue` is `"TRUE"`. | + * + * ## pg_cron requirement + * + * When `use_queue` is `"TRUE"`, the bootstrapper installs the `pg_cron` + * extension and schedules a job to periodically drain the queue. + * `pg_cron` must be present in `shared_preload_libraries` **before** + * deployment or the bootstrap will fail. Add it via `props.parameters`: + * + * ``` + * parameters: { shared_preload_libraries: "pg_cron" } + * ``` * + * @example + * customResourceProperties: { + * update_collection_extent: "TRUE", + * use_queue: "TRUE", + * pg_cron_schedule: "*\/10 * * * *", + * } */ readonly customResourceProperties?: { [key: string]: any;