|
17 | 17 | logger = logging.getLogger("eoapi-bootstrap") |
18 | 18 |
|
19 | 19 |
|
| 20 | +DEFAULT_PG_CRON_SCHEDULE = "*/10 * * * *" # every 10 minutes |
| 21 | + |
| 22 | + |
20 | 23 | def send( |
21 | 24 | event, |
22 | 25 | context, |
@@ -143,30 +146,149 @@ def register_extensions(cursor) -> None: |
143 | 146 | ############################################################################### |
144 | 147 | # PgSTAC Customization |
145 | 148 | ############################################################################### |
146 | | -def customization(cursor, params) -> None: |
| 149 | +def _is_enabled(params: dict, key: str) -> bool: |
| 150 | + """Parse a string boolean CloudFormation custom resource property. |
| 151 | +
|
| 152 | + CloudFormation custom resource properties are always strings, so boolean |
| 153 | + flags are passed as ``"TRUE"`` or ``"FALSE"``. Callers must check that the |
| 154 | + key is present before calling this function: an absent key means "no-op" |
| 155 | + to callers, yet this function would report it as disabled (``False``). |
| 156 | +
|
| 157 | + Args: |
| 158 | + params: ResourceProperties dict from the CloudFormation event. |
| 159 | + key: Property name to look up. |
| 160 | +
|
| 161 | + Returns: |
| 162 | + True if the property value is ``"TRUE"`` (case-insensitive). |
147 | 163 | """ |
148 | | - CUSTOMIZED YOUR PGSTAC DATABASE |
| 164 | + return str(params.get(key, "FALSE")).upper() == "TRUE" |
| 165 | + |
| 166 | + |
| 167 | +def _apply_pgstac_setting(cursor, name: str, enabled: bool) -> None: |
| 168 | + """Upsert or delete a row in the pgstac_settings table. |
| 169 | +
|
| 170 | + When ``enabled`` is True, upserts ``'on'``. When False, deletes the row so |
| 171 | + pgstac reverts to its built-in default. When the setting is absent from |
| 172 | + params entirely, this function should not be called — absent means no-op. |
| 173 | +
|
| 174 | + Args: |
| 175 | + cursor: Database cursor with ``search_path`` including the pgstac schema. |
| 176 | + name: Setting name (primary key in pgstac_settings). |
| 177 | + enabled: Whether to enable or disable the setting. |
| 178 | + """ |
| 179 | + if enabled: |
| 180 | + cursor.execute( |
| 181 | + sql.SQL( |
| 182 | + "INSERT INTO pgstac_settings (name, value) VALUES ({name}, 'on') " |
| 183 | + "ON CONFLICT ON CONSTRAINT pgstac_settings_pkey DO UPDATE SET value = excluded.value;" |
| 184 | + ).format(name=sql.Literal(name)) |
| 185 | + ) |
| 186 | + else: |
| 187 | + cursor.execute( |
| 188 | + sql.SQL("DELETE FROM pgstac_settings WHERE name = {name};").format( |
| 189 | + name=sql.Literal(name) |
| 190 | + ) |
| 191 | + ) |
| 192 | + |
| 193 | + |
| 194 | +def customization(cursor, params) -> None: |
| 195 | + """Apply pgSTAC database customizations based on the provided params. |
| 196 | +
|
| 197 | + Each setting follows honest boolean semantics: |
| 198 | +
|
| 199 | + - ``"TRUE"`` → enable (upsert ``'on'`` in pgstac_settings) |
| 200 | + - ``"FALSE"`` → disable (delete from pgstac_settings, reverting to pgstac default) |
| 201 | + - absent → no-op (the database is left exactly as-is) |
| 202 | +
|
| 203 | + This means the flags are safe to use as true booleans, and upgrading without |
| 204 | + changing config never silently alters a manually-managed setting. |
149 | 205 |
|
150 | 206 | ref: https://github.com/stac-utils/pgstac/blob/main/docs/src/pgstac.md |
| 207 | + """ |
| 208 | + if "context" in params: |
| 209 | + _apply_pgstac_setting(cursor, "context", _is_enabled(params, "context")) |
| 210 | + |
| 211 | + if "mosaic_index" in params: |
| 212 | + if _is_enabled(params, "mosaic_index"): |
| 213 | + cursor.execute( |
| 214 | + sql.SQL( |
| 215 | + "CREATE INDEX IF NOT EXISTS searches_mosaic ON searches ((true)) " |
| 216 | + "WHERE metadata->>'type'='mosaic';" |
| 217 | + ) |
| 218 | + ) |
| 219 | + else: |
| 220 | + cursor.execute(sql.SQL("DROP INDEX IF EXISTS searches_mosaic;")) |
| 221 | + |
| 222 | + # Enable automatic spatial/temporal extent updates on item ingest. |
| 223 | + # For large ingests, combine with use_queue=TRUE to reduce transaction overhead. |
| 224 | + if "update_collection_extent" in params: |
| 225 | + _apply_pgstac_setting( |
| 226 | + cursor, "update_collection_extent", _is_enabled(params, "update_collection_extent") |
| 227 | + ) |
| 228 | + |
| 229 | + # Enable asynchronous queue processing for extent updates. |
| 230 | + # Requires pg_cron to periodically invoke CALL pgstac.run_queued_queries(). |
| 231 | + if "use_queue" in params: |
| 232 | + _apply_pgstac_setting(cursor, "use_queue", _is_enabled(params, "use_queue")) |
| 233 | + |
| 234 | + |
| 235 | +def unregister_pg_cron(cursor) -> None: |
| 236 | + """Remove the run_queued_queries pg_cron job if it exists. |
| 237 | +
|
| 238 | + Safe to call even if pg_cron is not installed or the job does not exist. |
151 | 239 |
|
| 240 | + Args: |
| 241 | + cursor: Database cursor connected to the ``postgres`` database as a superuser. |
152 | 242 | """ |
153 | | - if str(params.get("context", "FALSE")).upper() == "TRUE": |
154 | | - # Add CONTEXT=ON |
155 | | - pgstac_settings = """ |
156 | | - INSERT INTO pgstac_settings (name, value) |
157 | | - VALUES ('context', 'on') |
158 | | - ON CONFLICT ON CONSTRAINT pgstac_settings_pkey DO UPDATE SET value = excluded.value;""" |
159 | | - cursor.execute(sql.SQL(pgstac_settings)) |
160 | | - |
161 | | - if str(params.get("mosaic_index", "TRUE")).upper() == "TRUE": |
162 | | - # Create index of searches with `mosaic`` type |
| 243 | + cursor.execute(sql.SQL("SELECT 1 FROM pg_extension WHERE extname = 'pg_cron';")) |
| 244 | + if cursor.fetchone(): |
163 | 245 | cursor.execute( |
164 | 246 | sql.SQL( |
165 | | - "CREATE INDEX IF NOT EXISTS searches_mosaic ON searches ((true)) WHERE metadata->>'type'='mosaic';" |
| 247 | + "SELECT cron.unschedule(jobid) FROM cron.job " |
| 248 | + "WHERE jobname = 'pgstac-run-queued-queries';" |
166 | 249 | ) |
167 | 250 | ) |
168 | 251 |
|
169 | 252 |
|
| 253 | +def register_pg_cron(cursor, db_name: str, schedule: str) -> None: |
| 254 | + """Install the pg_cron extension and schedule run_queued_queries. |
| 255 | +
|
| 256 | + pg_cron can only be installed in the ``postgres`` database, so the cursor |
| 257 | + must be connected there as a superuser. ``cron.schedule_in_database`` is |
| 258 | + used to run the job against the pgSTAC database. The job is upserted by |
| 259 | + name so repeated bootstrap runs are safe. |
| 260 | +
|
| 261 | + pg_cron must be listed in ``shared_preload_libraries`` in the RDS parameter |
| 262 | + group before this will succeed. |
| 263 | +
|
| 264 | + Side effect: sets ``search_path = pgstac, public`` as the default for all |
| 265 | + connections to ``db_name`` via ``ALTER DATABASE``. This is required because |
| 266 | + pg_cron job sessions otherwise run without ``pgstac`` on their ``search_path``, and |
| 267 | + prepending ``SET search_path`` to the cron command would open a transaction |
| 268 | + that prevents ``run_queued_queries()`` from issuing its own ``COMMIT``. |
| 269 | +
|
| 270 | + Args: |
| 271 | + cursor: Database cursor connected to the ``postgres`` database as a superuser. |
| 272 | + db_name: Name of the pgSTAC database where run_queued_queries will run. |
| 273 | + schedule: Cron schedule expression (e.g. ``"*/5 * * * *"``). |
| 274 | + """ |
| 275 | + cursor.execute(sql.SQL("CREATE EXTENSION IF NOT EXISTS pg_cron;")) |
| 276 | + cursor.execute( |
| 277 | + sql.SQL("ALTER DATABASE {db_name} SET search_path TO pgstac, public;").format( |
| 278 | + db_name=sql.Identifier(db_name), |
| 279 | + ) |
| 280 | + ) |
| 281 | + cursor.execute( |
| 282 | + sql.SQL( |
| 283 | + "SELECT cron.schedule_in_database({job_name}, {schedule}, 'CALL pgstac.run_queued_queries();', {db_name});" |
| 284 | + ).format( |
| 285 | + job_name=sql.Literal("pgstac-run-queued-queries"), |
| 286 | + schedule=sql.Literal(schedule), |
| 287 | + db_name=sql.Literal(db_name), |
| 288 | + ) |
| 289 | + ) |
| 290 | + |
| 291 | + |
170 | 292 | def handler(event, context): |
171 | 293 | """Lambda Handler.""" |
172 | 294 | print(f"Handling {event}") |
@@ -213,6 +335,22 @@ def handler(event, context): |
213 | 335 | password=eoapi_params["password"], |
214 | 336 | ) |
215 | 337 |
|
| 338 | + if "use_queue" in params: |
| 339 | + if _is_enabled(params, "use_queue"): |
| 340 | + schedule = params.get("pg_cron_schedule", DEFAULT_PG_CRON_SCHEDULE) |
| 341 | + print( |
| 342 | + f"Scheduling pg_cron job 'pgstac-run-queued-queries' " |
| 343 | + f"with schedule '{schedule}'..." |
| 344 | + ) |
| 345 | + register_pg_cron( |
| 346 | + cursor=cur, |
| 347 | + db_name=eoapi_params["dbname"], |
| 348 | + schedule=schedule, |
| 349 | + ) |
| 350 | + else: |
| 351 | + print("Removing pg_cron job 'pgstac-run-queued-queries' if present...") |
| 352 | + unregister_pg_cron(cursor=cur) |
| 353 | + |
216 | 354 | # Install postgis and pgstac on the eoapi database with |
217 | 355 | # superuser permissions |
218 | 356 | print(f"Connecting to eoAPI '{eoapi_params['dbname']}' database...") |
|
0 commit comments