Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions integration_tests/cdk/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ def __init__(
},
removal_policy=RemovalPolicy.DESTROY,
pgstac_version=PGSTAC_VERSION,
parameters={
"shared_preload_libraries": "pg_cron",
},
custom_resource_properties={
"context": "TRUE",
"update_collection_extent": "TRUE",
"use_queue": "TRUE",
"pg_cron_schedule": "*/10 * * * *", # every 10 minutes (default)
},
)

assert pgstac_db.security_group
Expand Down
170 changes: 157 additions & 13 deletions lib/database/bootstrapper_runtime/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
logger = logging.getLogger("eoapi-bootstrap")


DEFAULT_PG_CRON_SCHEDULE = "*/10 * * * *" # every 10 minutes


def send(
event,
context,
Expand Down Expand Up @@ -143,30 +146,151 @@ def register_extensions(cursor) -> None:
###############################################################################
# PgSTAC Customization
###############################################################################
def customization(cursor, params) -> None:
def _is_enabled(params: dict, key: str) -> bool:
"""Parse a string boolean CloudFormation custom resource property.

CloudFormation custom resource properties are always strings, so boolean
flags are passed as ``"TRUE"`` or ``"FALSE"``. Callers must check that the
key is present before calling this function; absent keys are a no-op and
should not reach this function.

Args:
params: ResourceProperties dict from the CloudFormation event.
key: Property name to look up.

Returns:
True if the property value is ``"TRUE"`` (case-insensitive).
"""
CUSTOMIZED YOUR PGSTAC DATABASE
return str(params.get(key, "FALSE")).upper() == "TRUE"


def _apply_pgstac_setting(cursor, name: str, enabled: bool) -> None:
    """Write or clear a single row in the pgstac_settings table.

    ``enabled=True`` upserts the value ``'on'``; ``enabled=False`` deletes the
    row so pgstac falls back to its built-in default. Callers that want a
    no-op for an absent param must simply not call this function.

    Args:
        cursor: Database cursor with ``search_path`` including the pgstac schema.
        name: Setting name (primary key in pgstac_settings).
        enabled: Whether to enable or disable the setting.
    """
    if not enabled:
        # Removing the row reverts the setting to pgstac's built-in default.
        cursor.execute(
            sql.SQL("DELETE FROM pgstac_settings WHERE name = {name};").format(
                name=sql.Literal(name)
            )
        )
        return

    upsert = sql.SQL(
        "INSERT INTO pgstac_settings (name, value) VALUES ({name}, 'on') "
        "ON CONFLICT ON CONSTRAINT pgstac_settings_pkey DO UPDATE SET value = excluded.value;"
    ).format(name=sql.Literal(name))
    cursor.execute(upsert)


def customization(cursor, params) -> None:
    """Apply pgSTAC database customizations from the custom resource params.

    Every flag uses honest boolean semantics:

    - ``"TRUE"`` → enable (upsert ``'on'`` in pgstac_settings)
    - ``"FALSE"`` → disable (delete from pgstac_settings, reverting to pgstac default)
    - absent → no-op (the database is left exactly as-is)

    Because absent keys are never touched, redeploying without changing config
    never silently alters a manually-managed setting.

    ref: https://github.com/stac-utils/pgstac/blob/main/docs/src/pgstac.md
    """
    # Processed in a fixed order; each setting is independent of the others.
    for setting in (
        "context",
        # Partial index speeding up lookups of searches of type `mosaic`.
        "mosaic_index",
        # Automatic spatial/temporal extent updates on item ingest. For large
        # ingests, combine with use_queue=TRUE to reduce transaction overhead.
        "update_collection_extent",
        # Asynchronous queue processing for extent updates. Requires pg_cron
        # to periodically invoke CALL pgstac.run_queued_queries().
        "use_queue",
    ):
        if setting not in params:
            continue  # absent → leave the database untouched

        enabled = _is_enabled(params, setting)
        if setting == "mosaic_index":
            # mosaic_index is a physical index rather than a pgstac_settings row.
            statement = (
                "CREATE INDEX IF NOT EXISTS searches_mosaic ON searches ((true)) "
                "WHERE metadata->>'type'='mosaic';"
                if enabled
                else "DROP INDEX IF EXISTS searches_mosaic;"
            )
            cursor.execute(sql.SQL(statement))
        else:
            _apply_pgstac_setting(cursor, setting, enabled)


def unregister_pg_cron(cursor) -> None:
    """Remove the run_queued_queries pg_cron job if it exists.

    Safe to call even if pg_cron is not installed or the job does not exist.

    Args:
        cursor: Database cursor connected to the ``postgres`` database as a superuser.
    """
    # Only query cron.job when the extension is actually installed; otherwise
    # the `cron` schema does not exist and the unschedule query would error.
    cursor.execute(sql.SQL("SELECT 1 FROM pg_extension WHERE extname = 'pg_cron';"))
    if cursor.fetchone():
        # cron.unschedule is a no-op result set if no job matches the name.
        cursor.execute(
            sql.SQL(
                "SELECT cron.unschedule(jobid) FROM cron.job "
                "WHERE jobname = 'pgstac-run-queued-queries';"
            )
        )


def register_pg_cron(cursor, db_name: str, schedule: str) -> None:
    """Install pg_cron and (re)schedule the run_queued_queries job.

    pg_cron can only be installed in the ``postgres`` database, so the cursor
    must be connected there as a superuser; ``cron.schedule_in_database`` then
    targets the pgSTAC database. The job is upserted by name, making repeated
    bootstrap runs idempotent.

    pg_cron must already be listed in ``shared_preload_libraries`` in the RDS
    parameter group, or the CREATE EXTENSION below fails.

    Side effect: sets ``search_path = pgstac, public`` as the default for all
    connections to ``db_name`` via ``ALTER DATABASE``. pg_cron background
    worker sessions start with no ``search_path``, and prepending
    ``SET search_path`` to the cron command would open a transaction that
    prevents ``run_queued_queries()`` from issuing its own ``COMMIT``.

    Args:
        cursor: Database cursor connected to the ``postgres`` database as a superuser.
        db_name: Name of the pgSTAC database where run_queued_queries will run.
        schedule: Cron schedule expression (e.g. ``"*/5 * * * *"``).
    """
    cursor.execute(sql.SQL("CREATE EXTENSION IF NOT EXISTS pg_cron;"))

    # Default search_path so cron worker sessions can resolve pgstac objects.
    set_search_path = sql.SQL(
        "ALTER DATABASE {db_name} SET search_path TO pgstac, public;"
    ).format(db_name=sql.Identifier(db_name))
    cursor.execute(set_search_path)

    schedule_job = sql.SQL(
        "SELECT cron.schedule_in_database({job_name}, {schedule}, 'CALL pgstac.run_queued_queries();', {db_name});"
    ).format(
        job_name=sql.Literal("pgstac-run-queued-queries"),
        schedule=sql.Literal(schedule),
        db_name=sql.Literal(db_name),
    )
    cursor.execute(schedule_job)


def handler(event, context):
"""Lambda Handler."""
print(f"Handling {event}")
Expand Down Expand Up @@ -213,6 +337,26 @@ def handler(event, context):
password=eoapi_params["password"],
)

if "use_queue" in params:
if _is_enabled(params, "use_queue"):
schedule = params.get(
"pg_cron_schedule", DEFAULT_PG_CRON_SCHEDULE
)
print(
f"Scheduling pg_cron job 'pgstac-run-queued-queries' "
f"with schedule '{schedule}'..."
)
register_pg_cron(
cursor=cur,
db_name=eoapi_params["dbname"],
schedule=schedule,
)
else:
print(
"Removing pg_cron job 'pgstac-run-queued-queries' if present..."
)
unregister_pg_cron(cursor=cur)

# Install postgis and pgstac on the eoapi database with
# superuser permissions
print(f"Connecting to eoAPI '{eoapi_params['dbname']}' database...")
Expand Down
43 changes: 38 additions & 5 deletions lib/database/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import * as path from "path";
const instanceSizes: Record<string, number> = require("./instance-memory.json");

let defaultPgSTACCustomOptions: { [key: string]: any } = {
context: "FALSE",
mosaic_index: "TRUE",
};

Expand Down Expand Up @@ -135,6 +134,7 @@ export class PgStacDatabase extends Construct {
props.instanceType?.toString() || "m5.large",
props.parameters,
);

const parameterGroup = new rds.ParameterGroup(this, "parameterGroup", {
engine: props.engine,
parameters: {
Expand All @@ -151,10 +151,11 @@ export class PgStacDatabase extends Construct {
},
});

const { parameters: _parameters, ...dbProps } = props;
this.db = new rds.DatabaseInstance(this, "db", {
instanceIdentifier: Stack.of(this).stackName,
parameterGroup,
...props,
...dbProps,
});

this.pgstacVersion = props.pgstacVersion || DEFAULT_PGSTAC_VERSION;
Expand Down Expand Up @@ -423,10 +424,42 @@ export interface PgStacDatabaseProps extends rds.DatabaseInstanceProps {
readonly maintenanceWindow?: ssm.CfnMaintenanceWindow;

/**
* Lambda function Custom Resource properties. A custom resource property is going to be created
* to trigger the bootstrapping lambda function. This parameter allows the user to specify additional properties
* on top of the default ones.
* Additional properties passed to the bootstrapper Lambda as CloudFormation
* Custom Resource properties. These are merged with the defaults and forwarded
* to the handler as `event["ResourceProperties"]`.
*
* ## Supported pgSTAC settings
*
* Each setting follows honest boolean semantics: `"TRUE"` enables, `"FALSE"`
* disables (reverts to pgSTAC's built-in default), and omitting the key
* entirely is a no-op — the database is left as-is. This means upgrading
* without changing config never silently alters a manually-managed setting.
*
* | Property | Default | Description |
* |----------------------------|----------|-------------|
* | `context` | (unset) | Enable `CONTEXT=ON` in pgSTAC (item count on search). |
* | `mosaic_index` | `"TRUE"` | Create a partial index on searches of type `mosaic`. Set `"FALSE"` to drop it. |
* | `update_collection_extent` | (unset) | Automatically update collection spatial/temporal extents on item ingest. Combine with `use_queue` to reduce per-transaction overhead. |
* | `use_queue` | (unset) | Process extent updates asynchronously via an internal queue. `"TRUE"` installs pg_cron and schedules the drain job; `"FALSE"` removes the job. Requires `pg_cron` (see below). |
* | `pg_cron_schedule` | `"*\/10 * * * *"` | Cron schedule for `CALL run_queued_queries()`. Only used when `use_queue` is `"TRUE"`. |
*
* ## pg_cron requirement
*
* When `use_queue` is `"TRUE"`, the bootstrapper installs the `pg_cron`
* extension and schedules a job to periodically drain the queue.
* `pg_cron` must be present in `shared_preload_libraries` **before**
* deployment or the bootstrap will fail. Add it via `props.parameters`:
*
* ```
* parameters: { shared_preload_libraries: "pg_cron" }
* ```
*
* @example
* customResourceProperties: {
* update_collection_extent: "TRUE",
* use_queue: "TRUE",
* pg_cron_schedule: "*\/10 * * * *",
* }
*/
readonly customResourceProperties?: {
[key: string]: any;
Expand Down
Loading