From 0b6fa35cf932d20114886c1fcdadf653bdf5ebba Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Thu, 9 Apr 2026 09:50:07 +0200 Subject: [PATCH 1/4] Add fast skip pre-check to avoid loading full datasets for up-to-date entries Before processing batches, loads a lightweight {identifier_key: last_modified_at} dict from the database in a single query (no joins to revision/file tables). Datasets where last_modified_at >= max(file.last_modified) are skipped instantly without the expensive get_dataset_collection call. The cache is built once per (provider, dataset_type) in the loader and reused across selectors within the same run. No false negatives: datasets that might need updating always fall through to the full should_refetch check. --- ingestify/application/dataset_store.py | 7 ++ ingestify/application/loader.py | 22 ++++++- .../models/dataset/dataset_repository.py | 13 ++++ .../domain/models/ingestion/ingestion_job.py | 41 ++++++++++-- .../store/dataset/sqlalchemy/repository.py | 28 +++++++- ingestify/tests/test_fast_skip.py | 66 +++++++++++++++++++ 6 files changed, 170 insertions(+), 7 deletions(-) create mode 100644 ingestify/tests/test_fast_skip.py diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index a98d425..2cf800d 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -190,6 +190,13 @@ def with_file_cache(self): def save_ingestion_job_summary(self, ingestion_job_summary): self.dataset_repository.save_ingestion_job_summary(ingestion_job_summary) + def get_existing_dataset_timestamps(self, provider: str, dataset_type: str) -> dict: + return self.dataset_repository.get_existing_dataset_timestamps( + bucket=self.bucket, + provider=provider, + dataset_type=dataset_type, + ) + def get_dataset_collection( self, dataset_type: Optional[str] = None, diff --git a/ingestify/application/loader.py b/ingestify/application/loader.py index 095b5b7..697b748 100644 --- 
a/ingestify/application/loader.py +++ b/ingestify/application/loader.py @@ -240,6 +240,11 @@ def collect( def run(self, selectors, dry_run: bool = False): """Execute the collected selectors.""" ingestion_job_prefix = str(uuid.uuid1()) + + # Build a cache of existing dataset timestamps per (provider, dataset_type). + # Used as a fast pre-check to skip datasets that are already up-to-date. + existing_timestamps_cache: dict[tuple, dict] = {} + for ingestion_job_idx, (ingestion_plan, selector) in enumerate(selectors): logger.info( f"Discovering datasets from {ingestion_plan.source.__class__.__name__} using selector {selector}" @@ -253,12 +258,27 @@ def run(self, selectors, dry_run: bool = False): selector=selector, ) + # Lazily load the timestamps cache per (provider, dataset_type) + cache_key = ( + ingestion_plan.source.provider, + ingestion_plan.dataset_type, + ) + if cache_key not in existing_timestamps_cache: + existing_timestamps_cache[ + cache_key + ] = self.store.get_existing_dataset_timestamps( + provider=cache_key[0], + dataset_type=cache_key[1], + ) + with TaskExecutor( dry_run=dry_run, processes=ingestion_plan.source.max_concurrency, ) as task_executor: for ingestion_job_summary in ingestion_job.execute( - self.store, task_executor=task_executor + self.store, + task_executor=task_executor, + existing_timestamps=existing_timestamps_cache[cache_key], ): # TODO: handle task_summaries # Summarize to a IngestionJobSummary, and save to a database. 
This Summary can later be used in a diff --git a/ingestify/domain/models/dataset/dataset_repository.py b/ingestify/domain/models/dataset/dataset_repository.py index d85b7a8..674570e 100644 --- a/ingestify/domain/models/dataset/dataset_repository.py +++ b/ingestify/domain/models/dataset/dataset_repository.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from datetime import datetime from typing import Optional, List, Union from .collection import DatasetCollection @@ -23,6 +24,18 @@ def get_dataset_collection( ) -> DatasetCollection: pass + def get_existing_dataset_timestamps( + self, + bucket: str, + provider: str, + dataset_type: str, + ) -> dict[str, datetime]: + """Return {identifier_key: last_modified_at} for all datasets matching + the given provider and dataset_type. Used as a fast pre-check to skip + datasets that are already up-to-date without loading the full + dataset+revision+file graph.""" + return {} + @abstractmethod def destroy(self, dataset: Dataset): pass diff --git a/ingestify/domain/models/ingestion/ingestion_job.py b/ingestify/domain/models/ingestion/ingestion_job.py index 32df9f1..78fb1f2 100644 --- a/ingestify/domain/models/ingestion/ingestion_job.py +++ b/ingestify/domain/models/ingestion/ingestion_job.py @@ -346,7 +346,10 @@ def __init__( self.selector = selector def execute( - self, store: DatasetStore, task_executor: TaskExecutor + self, + store: DatasetStore, + task_executor: TaskExecutor, + existing_timestamps: Optional[dict] = None, ) -> Iterator[IngestionJobSummary]: is_first_chunk = True ingestion_job_summary = IngestionJobSummary.new(ingestion_job=self) @@ -429,12 +432,42 @@ def execute( yield ingestion_job_summary return + # Fast pre-check: skip datasets that are definitely up-to-date + # based on the cached timestamps. Only resources that might need + # work proceed to the full get_dataset_collection check. 
+ skipped_tasks = 0 + if existing_timestamps: + pending_batch = [] + for dataset_resource in batch: + identifier = Identifier.create_from_selector( + self.selector, **dataset_resource.dataset_resource_id + ) + ts = existing_timestamps.get(identifier.key) + if ts is not None: + # Dataset exists — check if all files are up-to-date + max_file_modified = max( + f.last_modified for f in dataset_resource.files.values() + ) + if ts >= max_file_modified: + skipped_tasks += 1 + continue + pending_batch.append(dataset_resource) + batch = pending_batch + + if not batch: + logger.info( + f"Discovered {skipped_tasks + len(batch)} datasets from " + f"{self.ingestion_plan.source.__class__.__name__} " + f"using selector {self.selector} => nothing to do " + f"({skipped_tasks} skipped via pre-check)" + ) + ingestion_job_summary.increase_skipped_tasks(skipped_tasks) + continue + dataset_identifiers = [ Identifier.create_from_selector( self.selector, **dataset_resource.dataset_resource_id ) - # We have to pass the data_spec_versions here as a Source can add some - # extra data to the identifier which is retrieved in a certain data format for dataset_resource in batch ] @@ -449,8 +482,6 @@ def execute( selector=dataset_identifiers, ) - skipped_tasks = 0 - task_set = TaskSet() with ingestion_job_summary.record_timing("build_task_set"): diff --git a/ingestify/infra/store/dataset/sqlalchemy/repository.py b/ingestify/infra/store/dataset/sqlalchemy/repository.py index 281d312..27d3666 100644 --- a/ingestify/infra/store/dataset/sqlalchemy/repository.py +++ b/ingestify/infra/store/dataset/sqlalchemy/repository.py @@ -1,4 +1,5 @@ import itertools +import json import logging import uuid from typing import Optional, Union, List @@ -38,7 +39,7 @@ from ingestify.domain.models.ingestion.ingestion_job_summary import IngestionJobSummary from ingestify.domain.models.task.task_summary import TaskSummary from ingestify.exceptions import IngestifyError -from ingestify.utils import get_concurrency +from 
ingestify.utils import get_concurrency, key_from_dict from .tables import get_tables @@ -516,6 +517,31 @@ def _debug_query(self, q: Query): ) logger.debug(f"Running query: {text_}") + def get_existing_dataset_timestamps( + self, + bucket: str, + provider: str, + dataset_type: str, + ) -> dict: + with self.session: + query = ( + self.session.query( + self.dataset_table.c.identifier, + self.dataset_table.c.last_modified_at, + ) + .filter(self.dataset_table.c.bucket == bucket) + .filter(self.dataset_table.c.provider == provider) + .filter(self.dataset_table.c.dataset_type == dataset_type) + ) + return { + key_from_dict( + row.identifier + if isinstance(row.identifier, dict) + else json.loads(row.identifier) + ): row.last_modified_at + for row in query + } + def get_dataset_collection( self, bucket: str, diff --git a/ingestify/tests/test_fast_skip.py b/ingestify/tests/test_fast_skip.py new file mode 100644 index 0000000..d4924e9 --- /dev/null +++ b/ingestify/tests/test_fast_skip.py @@ -0,0 +1,66 @@ +"""Tests for fast skip pre-check.""" +from ingestify import Source, DatasetResource +from ingestify.domain import DataSpecVersionCollection, DraftFile, Selector +from ingestify.domain.models.dataset.collection_metadata import ( + DatasetCollectionMetadata, +) +from ingestify.domain.models.fetch_policy import FetchPolicy +from ingestify.domain.models.ingestion.ingestion_plan import IngestionPlan +from ingestify.utils import utcnow + + +def loader(file_resource, current_file, **kwargs): + return DraftFile.from_input("data", data_feed_key="f1") + + +class SimpleSource(Source): + provider = "test_provider" + + def find_datasets( + self, dataset_type, data_spec_versions, dataset_collection_metadata, **kwargs + ): + for i in range(5): + r = DatasetResource( + dataset_resource_id={"item_id": i}, + provider=self.provider, + dataset_type="test", + name=f"item-{i}", + ) + r.add_file( + last_modified=utcnow(), + data_feed_key="f1", + data_spec_version="v1", + file_loader=loader, + ) + 
yield r + + +def _setup(engine): + source = SimpleSource("s") + dsv = DataSpecVersionCollection.from_dict({"default": {"v1"}}) + engine.add_ingestion_plan( + IngestionPlan( + source=source, + fetch_policy=FetchPolicy(), + dataset_type="test", + selectors=[Selector.build({}, data_spec_versions=dsv)], + data_spec_versions=dsv, + ) + ) + + +def test_timestamps_cache_matches_identifiers(engine): + """Keys from get_existing_dataset_timestamps match Identifier.key.""" + _setup(engine) + engine.run() + + timestamps = engine.store.get_existing_dataset_timestamps( + provider="test_provider", dataset_type="test" + ) + datasets = engine.store.get_dataset_collection( + provider="test_provider", dataset_type="test" + ) + + assert len(timestamps) == len(datasets) == 5 + for dataset in datasets: + assert dataset.identifier.key in timestamps From 38d730985162b58aea4f623c1d00192a32f77de0 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Thu, 9 Apr 2026 09:58:47 +0200 Subject: [PATCH 2/4] Rename to get_dataset_last_modified_at_map --- ingestify/application/dataset_store.py | 4 ++-- ingestify/application/loader.py | 10 +++++----- ingestify/domain/models/dataset/dataset_repository.py | 2 +- ingestify/domain/models/ingestion/ingestion_job.py | 6 +++--- ingestify/infra/store/dataset/sqlalchemy/repository.py | 2 +- ingestify/tests/test_fast_skip.py | 4 ++-- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index 2cf800d..56e0c06 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -190,8 +190,8 @@ def with_file_cache(self): def save_ingestion_job_summary(self, ingestion_job_summary): self.dataset_repository.save_ingestion_job_summary(ingestion_job_summary) - def get_existing_dataset_timestamps(self, provider: str, dataset_type: str) -> dict: - return self.dataset_repository.get_existing_dataset_timestamps( + def get_dataset_last_modified_at_map(self, 
provider: str, dataset_type: str) -> dict: + return self.dataset_repository.get_dataset_last_modified_at_map( bucket=self.bucket, provider=provider, dataset_type=dataset_type, diff --git a/ingestify/application/loader.py b/ingestify/application/loader.py index 697b748..1d27c50 100644 --- a/ingestify/application/loader.py +++ b/ingestify/application/loader.py @@ -243,7 +243,7 @@ def run(self, selectors, dry_run: bool = False): # Build a cache of existing dataset timestamps per (provider, dataset_type). # Used as a fast pre-check to skip datasets that are already up-to-date. - existing_timestamps_cache: dict[tuple, dict] = {} + last_modified_at_cache: dict[tuple, dict] = {} for ingestion_job_idx, (ingestion_plan, selector) in enumerate(selectors): logger.info( @@ -263,10 +263,10 @@ def run(self, selectors, dry_run: bool = False): ingestion_plan.source.provider, ingestion_plan.dataset_type, ) - if cache_key not in existing_timestamps_cache: - existing_timestamps_cache[ + if cache_key not in last_modified_at_cache: + last_modified_at_cache[ cache_key - ] = self.store.get_existing_dataset_timestamps( + ] = self.store.get_dataset_last_modified_at_map( provider=cache_key[0], dataset_type=cache_key[1], ) @@ -278,7 +278,7 @@ def run(self, selectors, dry_run: bool = False): for ingestion_job_summary in ingestion_job.execute( self.store, task_executor=task_executor, - existing_timestamps=existing_timestamps_cache[cache_key], + last_modified_at_map=last_modified_at_cache[cache_key], ): # TODO: handle task_summaries # Summarize to a IngestionJobSummary, and save to a database. 
This Summary can later be used in a diff --git a/ingestify/domain/models/dataset/dataset_repository.py b/ingestify/domain/models/dataset/dataset_repository.py index 674570e..08af592 100644 --- a/ingestify/domain/models/dataset/dataset_repository.py +++ b/ingestify/domain/models/dataset/dataset_repository.py @@ -24,7 +24,7 @@ def get_dataset_collection( ) -> DatasetCollection: pass - def get_existing_dataset_timestamps( + def get_dataset_last_modified_at_map( self, bucket: str, provider: str, diff --git a/ingestify/domain/models/ingestion/ingestion_job.py b/ingestify/domain/models/ingestion/ingestion_job.py index 78fb1f2..4a2d13e 100644 --- a/ingestify/domain/models/ingestion/ingestion_job.py +++ b/ingestify/domain/models/ingestion/ingestion_job.py @@ -349,7 +349,7 @@ def execute( self, store: DatasetStore, task_executor: TaskExecutor, - existing_timestamps: Optional[dict] = None, + last_modified_at_map: Optional[dict] = None, ) -> Iterator[IngestionJobSummary]: is_first_chunk = True ingestion_job_summary = IngestionJobSummary.new(ingestion_job=self) @@ -436,13 +436,13 @@ def execute( # based on the cached timestamps. Only resources that might need # work proceed to the full get_dataset_collection check. 
skipped_tasks = 0 - if existing_timestamps: + if last_modified_at_map: pending_batch = [] for dataset_resource in batch: identifier = Identifier.create_from_selector( self.selector, **dataset_resource.dataset_resource_id ) - ts = existing_timestamps.get(identifier.key) + ts = last_modified_at_map.get(identifier.key) if ts is not None: # Dataset exists — check if all files are up-to-date max_file_modified = max( diff --git a/ingestify/infra/store/dataset/sqlalchemy/repository.py b/ingestify/infra/store/dataset/sqlalchemy/repository.py index 27d3666..3be5e03 100644 --- a/ingestify/infra/store/dataset/sqlalchemy/repository.py +++ b/ingestify/infra/store/dataset/sqlalchemy/repository.py @@ -517,7 +517,7 @@ def _debug_query(self, q: Query): ) logger.debug(f"Running query: {text_}") - def get_existing_dataset_timestamps( + def get_dataset_last_modified_at_map( self, bucket: str, provider: str, diff --git a/ingestify/tests/test_fast_skip.py b/ingestify/tests/test_fast_skip.py index d4924e9..fa4ea8a 100644 --- a/ingestify/tests/test_fast_skip.py +++ b/ingestify/tests/test_fast_skip.py @@ -50,11 +50,11 @@ def _setup(engine): def test_timestamps_cache_matches_identifiers(engine): - """Keys from get_existing_dataset_timestamps match Identifier.key.""" + """Keys from get_dataset_last_modified_at_map match Identifier.key.""" _setup(engine) engine.run() - timestamps = engine.store.get_existing_dataset_timestamps( + timestamps = engine.store.get_dataset_last_modified_at_map( provider="test_provider", dataset_type="test" ) datasets = engine.store.get_dataset_collection( From 0c5552a4f7abb217b8505bd7b46272b457d3576b Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Thu, 9 Apr 2026 10:10:42 +0200 Subject: [PATCH 3/4] Add DatasetLastModifiedAtMap type alias --- ingestify/application/dataset_store.py | 4 +++- ingestify/application/loader.py | 2 +- ingestify/domain/models/dataset/dataset_repository.py | 4 +++- ingestify/domain/models/ingestion/ingestion_job.py | 3 ++- 4 files changed, 9 
insertions(+), 4 deletions(-) diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index 56e0c06..03ebade 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -190,7 +190,9 @@ def with_file_cache(self): def save_ingestion_job_summary(self, ingestion_job_summary): self.dataset_repository.save_ingestion_job_summary(ingestion_job_summary) - def get_dataset_last_modified_at_map(self, provider: str, dataset_type: str) -> dict: + def get_dataset_last_modified_at_map( + self, provider: str, dataset_type: str + ) -> "DatasetLastModifiedAtMap": return self.dataset_repository.get_dataset_last_modified_at_map( bucket=self.bucket, provider=provider, diff --git a/ingestify/application/loader.py b/ingestify/application/loader.py index 1d27c50..f302a5c 100644 --- a/ingestify/application/loader.py +++ b/ingestify/application/loader.py @@ -243,7 +243,7 @@ def run(self, selectors, dry_run: bool = False): # Build a cache of existing dataset timestamps per (provider, dataset_type). # Used as a fast pre-check to skip datasets that are already up-to-date. 
- last_modified_at_cache: dict[tuple, dict] = {} + last_modified_at_cache: dict[tuple, "DatasetLastModifiedAtMap"] = {} for ingestion_job_idx, (ingestion_plan, selector) in enumerate(selectors): logger.info( diff --git a/ingestify/domain/models/dataset/dataset_repository.py b/ingestify/domain/models/dataset/dataset_repository.py index 08af592..262fb56 100644 --- a/ingestify/domain/models/dataset/dataset_repository.py +++ b/ingestify/domain/models/dataset/dataset_repository.py @@ -7,6 +7,8 @@ from .dataset_state import DatasetState from .selector import Selector +DatasetLastModifiedAtMap = dict[str, datetime] + class DatasetRepository(ABC): @abstractmethod @@ -29,7 +31,7 @@ def get_dataset_last_modified_at_map( bucket: str, provider: str, dataset_type: str, - ) -> dict[str, datetime]: + ) -> DatasetLastModifiedAtMap: """Return {identifier_key: last_modified_at} for all datasets matching the given provider and dataset_type. Used as a fast pre-check to skip datasets that are already up-to-date without loading the full diff --git a/ingestify/domain/models/ingestion/ingestion_job.py b/ingestify/domain/models/ingestion/ingestion_job.py index 4a2d13e..4f5a704 100644 --- a/ingestify/domain/models/ingestion/ingestion_job.py +++ b/ingestify/domain/models/ingestion/ingestion_job.py @@ -24,6 +24,7 @@ DatasetResource, ) from ingestify.domain.models.resources.batch_loader import BatchLoader +from ingestify.domain.models.dataset.dataset_repository import DatasetLastModifiedAtMap from ingestify.domain.models.task.task_summary import TaskSummary from ingestify.exceptions import SaveError, IngestifyError from ingestify.utils import TaskExecutor, chunker @@ -349,7 +350,7 @@ def execute( self, store: DatasetStore, task_executor: TaskExecutor, - last_modified_at_map: Optional[dict] = None, + last_modified_at_map: Optional[DatasetLastModifiedAtMap] = None, ) -> Iterator[IngestionJobSummary]: is_first_chunk = True ingestion_job_summary = IngestionJobSummary.new(ingestion_job=self) From 
dbb7b758d3efa824d6bc19b9eea403c716c56c43 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Thu, 9 Apr 2026 13:20:43 +0200 Subject: [PATCH 4/4] Move DatasetLastModifiedAtMap type alias to dataset.py --- ingestify/domain/models/dataset/dataset.py | 3 +++ ingestify/domain/models/dataset/dataset_repository.py | 4 +--- ingestify/domain/models/ingestion/ingestion_job.py | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/ingestify/domain/models/dataset/dataset.py b/ingestify/domain/models/dataset/dataset.py index 1d118f2..bf30304 100644 --- a/ingestify/domain/models/dataset/dataset.py +++ b/ingestify/domain/models/dataset/dataset.py @@ -11,6 +11,9 @@ from ..base import BaseModel +DatasetLastModifiedAtMap = dict[str, datetime] + + class Dataset(BaseModel): bucket: str # This must be set by the DatasetRepository dataset_id: str diff --git a/ingestify/domain/models/dataset/dataset_repository.py b/ingestify/domain/models/dataset/dataset_repository.py index 262fb56..4076e78 100644 --- a/ingestify/domain/models/dataset/dataset_repository.py +++ b/ingestify/domain/models/dataset/dataset_repository.py @@ -3,12 +3,10 @@ from typing import Optional, List, Union from .collection import DatasetCollection -from .dataset import Dataset +from .dataset import Dataset, DatasetLastModifiedAtMap from .dataset_state import DatasetState from .selector import Selector -DatasetLastModifiedAtMap = dict[str, datetime] - class DatasetRepository(ABC): @abstractmethod diff --git a/ingestify/domain/models/ingestion/ingestion_job.py b/ingestify/domain/models/ingestion/ingestion_job.py index 4f5a704..0dca1b0 100644 --- a/ingestify/domain/models/ingestion/ingestion_job.py +++ b/ingestify/domain/models/ingestion/ingestion_job.py @@ -24,7 +24,7 @@ DatasetResource, ) from ingestify.domain.models.resources.batch_loader import BatchLoader -from ingestify.domain.models.dataset.dataset_repository import DatasetLastModifiedAtMap +from ingestify.domain.models.dataset.dataset import 
DatasetLastModifiedAtMap from ingestify.domain.models.task.task_summary import TaskSummary from ingestify.exceptions import SaveError, IngestifyError from ingestify.utils import TaskExecutor, chunker