Conversation
There was a problem hiding this comment.
Pull request overview
Adds a new daily background job to re-import all Zetkin data sources by scheduling a recurring task through the existing PgBoss worker/queue infrastructure.
Changes:
- Add
importZetkinDataSourcesjob that finds all Zetkin data sources and enqueuesimportDataSourcefor each. - Register the new job handler in the worker task dispatcher.
- Schedule the new job to run daily at 4:00am.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/server/services/worker.ts | Registers importZetkinDataSources as a runnable worker task. |
| src/server/jobs/importZetkinDataSources.ts | Implements the job that enqueues imports for all Zetkin data sources. |
| src/instrumentation.ts | Schedules the daily cron trigger for importZetkinDataSources. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| await schedule("0 3 * * *", "refreshWebhooks", {}); | ||
| // Re-import all Zetkin data sources at 4:00am every day | ||
| await schedule("0 4 * * *", "importZetkinDataSources", {}); |
There was a problem hiding this comment.
schedule() ultimately calls boss.schedule(queue, ...). With pg-boss, the first argument is the schedule/job name and must be unique; scheduling twice with the same name (here: the default queue name) will overwrite/update the existing schedule. As written, the 4:00am schedule will replace the 3:00am refreshWebhooks schedule (or vice-versa), so only one of them will actually run. Use distinct schedule names per task (e.g., incorporate task into the scheduled job name), or change the scheduling approach so multiple cron tasks can coexist on the same queue.
| await schedule("0 3 * * *", "refreshWebhooks", {}); | |
| // Re-import all Zetkin data sources at 4:00am every day | |
| await schedule("0 4 * * *", "importZetkinDataSources", {}); | |
| await schedule("0 3 * * *", "refreshWebhooks", {}, "refreshWebhooksSchedule"); | |
| // Re-import all Zetkin data sources at 4:00am every day | |
| await schedule("0 4 * * *", "importZetkinDataSources", {}, "importZetkinDataSourcesSchedule"); |
| for (const source of zetkinDataSources) { | ||
| try { | ||
| await enqueue("importDataSource", source.id, { | ||
| dataSourceId: source.id, | ||
| }); | ||
| } catch (error) { | ||
| logger.warn( | ||
| `Failed to enqueue import for Zetkin data source ${source.id}`, | ||
| { error }, | ||
| ); | ||
| continue; | ||
| } | ||
| } | ||
| return true; |
There was a problem hiding this comment.
This job swallows any enqueue() error and still returns true, which means the scheduled job will be marked successful even if no imports were enqueued (e.g., if PgBoss is temporarily unavailable). Consider letting unexpected enqueue() failures fail the job (so pg-boss retries), or at least tracking whether any enqueue failed and returning false/throwing at the end to avoid silently skipping a day's imports.
| const importZetkinDataSources = async (): Promise<boolean> => { | ||
| const zetkinDataSources = await findDataSourcesByType(DataSourceType.Zetkin); | ||
| for (const source of zetkinDataSources) { | ||
| try { | ||
| await enqueue("importDataSource", source.id, { | ||
| dataSourceId: source.id, | ||
| }); |
There was a problem hiding this comment.
New scheduled job logic is introduced here, but there’s no test coverage to verify it enqueues importDataSource for each Zetkin data source (and handles failures as intended). Given there are existing tests for other jobs under tests/** (e.g. importDataSource), please add a unit/feature test that seeds Zetkin data sources and asserts the correct enqueue() calls / scheduled behavior.
No description provided.