Skip to content

Commit 673356f

Browse files
committed
feat: add pgstac settings for use_queue and update_collection_extent
1 parent 70e8437 commit 673356f

3 files changed

Lines changed: 204 additions & 18 deletions

File tree

integration_tests/cdk/app.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,15 @@ def __init__(
9999
},
100100
removal_policy=RemovalPolicy.DESTROY,
101101
pgstac_version=PGSTAC_VERSION,
102+
parameters={
103+
"shared_preload_libraries": "pg_cron",
104+
},
105+
custom_resource_properties={
106+
"context": "TRUE",
107+
"update_collection_extent": "TRUE",
108+
"use_queue": "TRUE",
109+
"pg_cron_schedule": "*/10 * * * *", # every 10 minutes (default)
110+
},
102111
)
103112

104113
assert pgstac_db.security_group

lib/database/bootstrapper_runtime/handler.py

Lines changed: 157 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
logger = logging.getLogger("eoapi-bootstrap")
1818

1919

20+
DEFAULT_PG_CRON_SCHEDULE = "*/10 * * * *" # every 10 minutes
21+
22+
2023
def send(
2124
event,
2225
context,
@@ -143,30 +146,151 @@ def register_extensions(cursor) -> None:
143146
###############################################################################
144147
# PgSTAC Customization
145148
###############################################################################
146-
def customization(cursor, params) -> None:
149+
def _is_enabled(params: dict, key: str) -> bool:
150+
"""Parse a string boolean CloudFormation custom resource property.
151+
152+
CloudFormation custom resource properties are always strings, so boolean
153+
flags are passed as ``"TRUE"`` or ``"FALSE"``. Callers must check that the
154+
key is present before calling this function; absent keys are a no-op and
155+
should not reach this function.
156+
157+
Args:
158+
params: ResourceProperties dict from the CloudFormation event.
159+
key: Property name to look up.
160+
161+
Returns:
162+
True if the property value is ``"TRUE"`` (case-insensitive).
147163
"""
148-
CUSTOMIZED YOUR PGSTAC DATABASE
164+
return str(params.get(key, "FALSE")).upper() == "TRUE"
165+
166+
167+
def _apply_pgstac_setting(cursor, name: str, enabled: bool) -> None:
168+
"""Upsert or delete a row in the pgstac_settings table.
169+
170+
When ``enabled`` is True, upserts ``'on'``. When False, deletes the row so
171+
pgstac reverts to its built-in default. When the setting is absent from
172+
params entirely, this function should not be called — absent means no-op.
173+
174+
Args:
175+
cursor: Database cursor with ``search_path`` including the pgstac schema.
176+
name: Setting name (primary key in pgstac_settings).
177+
enabled: Whether to enable or disable the setting.
178+
"""
179+
if enabled:
180+
cursor.execute(
181+
sql.SQL(
182+
"INSERT INTO pgstac_settings (name, value) VALUES ({name}, 'on') "
183+
"ON CONFLICT ON CONSTRAINT pgstac_settings_pkey DO UPDATE SET value = excluded.value;"
184+
).format(name=sql.Literal(name))
185+
)
186+
else:
187+
cursor.execute(
188+
sql.SQL("DELETE FROM pgstac_settings WHERE name = {name};").format(
189+
name=sql.Literal(name)
190+
)
191+
)
192+
193+
194+
def customization(cursor, params) -> None:
195+
"""Apply pgSTAC database customizations based on the provided params.
196+
197+
Each setting follows honest boolean semantics:
198+
199+
- ``"TRUE"`` → enable (upsert ``'on'`` in pgstac_settings)
200+
- ``"FALSE"`` → disable (delete from pgstac_settings, reverting to pgstac default)
201+
- absent → no-op (the database is left exactly as-is)
202+
203+
This means the flags are safe to use as true booleans, and upgrading without
204+
changing config never silently alters a manually-managed setting.
149205
150206
ref: https://github.com/stac-utils/pgstac/blob/main/docs/src/pgstac.md
207+
"""
208+
if "context" in params:
209+
_apply_pgstac_setting(cursor, "context", _is_enabled(params, "context"))
210+
211+
if "mosaic_index" in params:
212+
if _is_enabled(params, "mosaic_index"):
213+
cursor.execute(
214+
sql.SQL(
215+
"CREATE INDEX IF NOT EXISTS searches_mosaic ON searches ((true)) "
216+
"WHERE metadata->>'type'='mosaic';"
217+
)
218+
)
219+
else:
220+
cursor.execute(sql.SQL("DROP INDEX IF EXISTS searches_mosaic;"))
221+
222+
# Enable automatic spatial/temporal extent updates on item ingest.
223+
# For large ingests, combine with use_queue=TRUE to reduce transaction overhead.
224+
if "update_collection_extent" in params:
225+
_apply_pgstac_setting(
226+
cursor,
227+
"update_collection_extent",
228+
_is_enabled(params, "update_collection_extent"),
229+
)
230+
231+
# Enable asynchronous queue processing for extent updates.
232+
# Requires pg_cron to periodically invoke CALL pgstac.run_queued_queries().
233+
if "use_queue" in params:
234+
_apply_pgstac_setting(cursor, "use_queue", _is_enabled(params, "use_queue"))
235+
236+
237+
def unregister_pg_cron(cursor) -> None:
238+
"""Remove the run_queued_queries pg_cron job if it exists.
239+
240+
Safe to call even if pg_cron is not installed or the job does not exist.
151241
242+
Args:
243+
cursor: Database cursor connected to the ``postgres`` database as a superuser.
152244
"""
153-
if str(params.get("context", "FALSE")).upper() == "TRUE":
154-
# Add CONTEXT=ON
155-
pgstac_settings = """
156-
INSERT INTO pgstac_settings (name, value)
157-
VALUES ('context', 'on')
158-
ON CONFLICT ON CONSTRAINT pgstac_settings_pkey DO UPDATE SET value = excluded.value;"""
159-
cursor.execute(sql.SQL(pgstac_settings))
160-
161-
if str(params.get("mosaic_index", "TRUE")).upper() == "TRUE":
162-
# Create index of searches with `mosaic`` type
245+
cursor.execute(sql.SQL("SELECT 1 FROM pg_extension WHERE extname = 'pg_cron';"))
246+
if cursor.fetchone():
163247
cursor.execute(
164248
sql.SQL(
165-
"CREATE INDEX IF NOT EXISTS searches_mosaic ON searches ((true)) WHERE metadata->>'type'='mosaic';"
249+
"SELECT cron.unschedule(jobid) FROM cron.job "
250+
"WHERE jobname = 'pgstac-run-queued-queries';"
166251
)
167252
)
168253

169254

255+
def register_pg_cron(cursor, db_name: str, schedule: str) -> None:
256+
"""Install the pg_cron extension and schedule run_queued_queries.
257+
258+
pg_cron can only be installed in the ``postgres`` database, so the cursor
259+
must be connected there as a superuser. ``cron.schedule_in_database`` is
260+
used to run the job against the pgSTAC database. The job is upserted by
261+
name so repeated bootstrap runs are safe.
262+
263+
pg_cron must be listed in ``shared_preload_libraries`` in the RDS parameter
264+
group before this will succeed.
265+
266+
Side effect: sets ``search_path = pgstac, public`` as the default for all
267+
connections to ``db_name`` via ``ALTER DATABASE``. This is required because
268+
pg_cron background worker sessions start with no ``search_path``, and
269+
prepending ``SET search_path`` to the cron command would open a transaction
270+
that prevents ``run_queued_queries()`` from issuing its own ``COMMIT``.
271+
272+
Args:
273+
cursor: Database cursor connected to the ``postgres`` database as a superuser.
274+
db_name: Name of the pgSTAC database where run_queued_queries will run.
275+
schedule: Cron schedule expression (e.g. ``"*/5 * * * *"``).
276+
"""
277+
cursor.execute(sql.SQL("CREATE EXTENSION IF NOT EXISTS pg_cron;"))
278+
cursor.execute(
279+
sql.SQL("ALTER DATABASE {db_name} SET search_path TO pgstac, public;").format(
280+
db_name=sql.Identifier(db_name),
281+
)
282+
)
283+
cursor.execute(
284+
sql.SQL(
285+
"SELECT cron.schedule_in_database({job_name}, {schedule}, 'CALL pgstac.run_queued_queries();', {db_name});"
286+
).format(
287+
job_name=sql.Literal("pgstac-run-queued-queries"),
288+
schedule=sql.Literal(schedule),
289+
db_name=sql.Literal(db_name),
290+
)
291+
)
292+
293+
170294
def handler(event, context):
171295
"""Lambda Handler."""
172296
print(f"Handling {event}")
@@ -213,6 +337,26 @@ def handler(event, context):
213337
password=eoapi_params["password"],
214338
)
215339

340+
if "use_queue" in params:
341+
if _is_enabled(params, "use_queue"):
342+
schedule = params.get(
343+
"pg_cron_schedule", DEFAULT_PG_CRON_SCHEDULE
344+
)
345+
print(
346+
f"Scheduling pg_cron job 'pgstac-run-queued-queries' "
347+
f"with schedule '{schedule}'..."
348+
)
349+
register_pg_cron(
350+
cursor=cur,
351+
db_name=eoapi_params["dbname"],
352+
schedule=schedule,
353+
)
354+
else:
355+
print(
356+
"Removing pg_cron job 'pgstac-run-queued-queries' if present..."
357+
)
358+
unregister_pg_cron(cursor=cur)
359+
216360
# Install postgis and pgstac on the eoapi database with
217361
# superuser permissions
218362
print(f"Connecting to eoAPI '{eoapi_params['dbname']}' database...")

lib/database/index.ts

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import * as path from "path";
2525
const instanceSizes: Record<string, number> = require("./instance-memory.json");
2626

2727
let defaultPgSTACCustomOptions: { [key: string]: any } = {
28-
context: "FALSE",
2928
mosaic_index: "TRUE",
3029
};
3130

@@ -135,6 +134,7 @@ export class PgStacDatabase extends Construct {
135134
props.instanceType?.toString() || "m5.large",
136135
props.parameters,
137136
);
137+
138138
const parameterGroup = new rds.ParameterGroup(this, "parameterGroup", {
139139
engine: props.engine,
140140
parameters: {
@@ -151,10 +151,11 @@ export class PgStacDatabase extends Construct {
151151
},
152152
});
153153

154+
const { parameters: _parameters, ...dbProps } = props;
154155
this.db = new rds.DatabaseInstance(this, "db", {
155156
instanceIdentifier: Stack.of(this).stackName,
156157
parameterGroup,
157-
...props,
158+
...dbProps,
158159
});
159160

160161
this.pgstacVersion = props.pgstacVersion || DEFAULT_PGSTAC_VERSION;
@@ -423,10 +424,42 @@ export interface PgStacDatabaseProps extends rds.DatabaseInstanceProps {
423424
readonly maintenanceWindow?: ssm.CfnMaintenanceWindow;
424425

425426
/**
426-
* Lambda function Custom Resource properties. A custom resource property is going to be created
427-
* to trigger the boostrapping lambda function. This parameter allows the user to specify additional properties
428-
* on top of the defaults ones.
427+
* Additional properties passed to the bootstrapper Lambda as CloudFormation
428+
* Custom Resource properties. These are merged with the defaults and forwarded
429+
* to the handler as `event["ResourceProperties"]`.
430+
*
431+
* ## Supported pgSTAC settings
432+
*
433+
* Each setting follows honest boolean semantics: `"TRUE"` enables, `"FALSE"`
434+
* disables (reverts to pgSTAC's built-in default), and omitting the key
435+
* entirely is a no-op — the database is left as-is. This means upgrading
436+
* without changing config never silently alters a manually-managed setting.
437+
*
438+
* | Property | Default | Description |
439+
* |----------------------------|----------|-------------|
440+
* | `context` | (unset) | Enable `CONTEXT=ON` in pgSTAC (item count on search). |
441+
* | `mosaic_index` | `"TRUE"` | Create a partial index on searches of type `mosaic`. Set `"FALSE"` to drop it. |
442+
* | `update_collection_extent` | (unset) | Automatically update collection spatial/temporal extents on item ingest. Combine with `use_queue` to reduce per-transaction overhead. |
443+
* | `use_queue` | (unset) | Process extent updates asynchronously via an internal queue. `"TRUE"` installs pg_cron and schedules the drain job; `"FALSE"` removes the job. Requires `pg_cron` (see below). |
444+
* | `pg_cron_schedule` | `"*\/10 * * * *"` | Cron schedule for `CALL run_queued_queries()`. Only used when `use_queue` is `"TRUE"`. |
445+
*
446+
* ## pg_cron requirement
447+
*
448+
* When `use_queue` is `"TRUE"`, the bootstrapper installs the `pg_cron`
449+
* extension and schedules a job to periodically drain the queue.
450+
* `pg_cron` must be present in `shared_preload_libraries` **before**
451+
* deployment or the bootstrap will fail. Add it via `props.parameters`:
452+
*
453+
* ```
454+
* parameters: { shared_preload_libraries: "pg_cron" }
455+
* ```
429456
*
457+
* @example
458+
* customResourceProperties: {
459+
* update_collection_extent: "TRUE",
460+
* use_queue: "TRUE",
461+
* pg_cron_schedule: "*\/10 * * * *",
462+
* }
430463
*/
431464
readonly customResourceProperties?: {
432465
[key: string]: any;

0 commit comments

Comments
 (0)