diff --git a/src/instrumentation.ts b/src/instrumentation.ts index 67df9b1a..8d0bcd3d 100644 --- a/src/instrumentation.ts +++ b/src/instrumentation.ts @@ -36,6 +36,8 @@ export async function register() { const { schedule } = await import("@/server/services/queue"); // Run refreshWebhooks at 3:00am every day (see https://timgit.github.io/pg-boss/#/./api/scheduling) await schedule("0 3 * * *", "refreshWebhooks", {}); + // Re-import all Zetkin data sources at 4:00am every day + await schedule("0 4 * * *", "importZetkinDataSources", {}); logger.info("Started"); } diff --git a/src/server/jobs/importZetkinDataSources.ts b/src/server/jobs/importZetkinDataSources.ts new file mode 100644 index 00000000..455cf995 --- /dev/null +++ b/src/server/jobs/importZetkinDataSources.ts @@ -0,0 +1,24 @@ +import { DataSourceType } from "@/models/DataSource"; +import { findDataSourcesByType } from "@/server/repositories/DataSource"; +import logger from "../services/logger"; +import { enqueue } from "../services/queue"; + +const importZetkinDataSources = async (): Promise => { + const zetkinDataSources = await findDataSourcesByType(DataSourceType.Zetkin); + for (const source of zetkinDataSources) { + try { + await enqueue("importDataSource", source.id, { + dataSourceId: source.id, + }); + } catch (error) { + logger.warn( + `Failed to enqueue import for Zetkin data source ${source.id}`, + { error }, + ); + continue; + } + } + return true; +}; + +export default importZetkinDataSources; diff --git a/src/server/services/worker.ts b/src/server/services/worker.ts index 98ce4bb3..b581c86e 100644 --- a/src/server/services/worker.ts +++ b/src/server/services/worker.ts @@ -2,6 +2,7 @@ import enrichDataRecords from "@/server/jobs/enrichDataRecords"; import enrichDataSource from "@/server/jobs/enrichDataSource"; import importDataRecords from "@/server/jobs/importDataRecords"; import importDataSource from "@/server/jobs/importDataSource"; +import importZetkinDataSources from "@/server/jobs/importZetkinDataSources"; import refreshWebhooks from "@/server/jobs/refreshWebhooks"; import removeEnrichmentColumns from "@/server/jobs/removeEnrichmentColumns"; import tagDataSource from "@/server/jobs/tagDataSource"; @@ -14,6 +15,7 @@ const taskHandlers: Record Promise> = enrichDataRecords, importDataSource, importDataRecords, + importZetkinDataSources, refreshWebhooks, removeEnrichmentColumns, tagDataSource,