diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py
index a98d425..03ebade 100644
--- a/ingestify/application/dataset_store.py
+++ b/ingestify/application/dataset_store.py
@@ -190,6 +190,15 @@ def with_file_cache(self):
     def save_ingestion_job_summary(self, ingestion_job_summary):
         self.dataset_repository.save_ingestion_job_summary(ingestion_job_summary)
 
+    def get_dataset_last_modified_at_map(
+        self, provider: str, dataset_type: str
+    ) -> "DatasetLastModifiedAtMap":
+        return self.dataset_repository.get_dataset_last_modified_at_map(
+            bucket=self.bucket,
+            provider=provider,
+            dataset_type=dataset_type,
+        )
+
     def get_dataset_collection(
         self,
         dataset_type: Optional[str] = None,
diff --git a/ingestify/application/loader.py b/ingestify/application/loader.py
index 095b5b7..f302a5c 100644
--- a/ingestify/application/loader.py
+++ b/ingestify/application/loader.py
@@ -240,6 +240,11 @@ def collect(
     def run(self, selectors, dry_run: bool = False):
         """Execute the collected selectors."""
         ingestion_job_prefix = str(uuid.uuid1())
+
+        # Build a cache of existing dataset timestamps per (provider, dataset_type).
+        # Used as a fast pre-check to skip datasets that are already up-to-date.
+        last_modified_at_cache: dict[tuple, "DatasetLastModifiedAtMap"] = {}
+
         for ingestion_job_idx, (ingestion_plan, selector) in enumerate(selectors):
             logger.info(
                 f"Discovering datasets from {ingestion_plan.source.__class__.__name__} using selector {selector}"
@@ -253,12 +258,27 @@ def run(self, selectors, dry_run: bool = False):
                 selector=selector,
             )
 
+            # Lazily load the timestamps cache per (provider, dataset_type)
+            cache_key = (
+                ingestion_plan.source.provider,
+                ingestion_plan.dataset_type,
+            )
+            if cache_key not in last_modified_at_cache:
+                last_modified_at_cache[
+                    cache_key
+                ] = self.store.get_dataset_last_modified_at_map(
+                    provider=cache_key[0],
+                    dataset_type=cache_key[1],
+                )
+
             with TaskExecutor(
                 dry_run=dry_run,
                 processes=ingestion_plan.source.max_concurrency,
             ) as task_executor:
                 for ingestion_job_summary in ingestion_job.execute(
-                    self.store, task_executor=task_executor
+                    self.store,
+                    task_executor=task_executor,
+                    last_modified_at_map=last_modified_at_cache[cache_key],
                 ):
                     # TODO: handle task_summaries
                     # Summarize to a IngestionJobSummary, and save to a database. This Summary can later be used in a
diff --git a/ingestify/domain/models/dataset/dataset.py b/ingestify/domain/models/dataset/dataset.py
index 1d118f2..bf30304 100644
--- a/ingestify/domain/models/dataset/dataset.py
+++ b/ingestify/domain/models/dataset/dataset.py
@@ -11,6 +11,9 @@
 
 from ..base import BaseModel
 
+DatasetLastModifiedAtMap = dict[str, datetime]
+
+
 class Dataset(BaseModel):
     bucket: str  # This must be set by the DatasetRepository
     dataset_id: str
diff --git a/ingestify/domain/models/dataset/dataset_repository.py b/ingestify/domain/models/dataset/dataset_repository.py
index d85b7a8..4076e78 100644
--- a/ingestify/domain/models/dataset/dataset_repository.py
+++ b/ingestify/domain/models/dataset/dataset_repository.py
@@ -1,8 +1,9 @@
 from abc import ABC, abstractmethod
+from datetime import datetime
 from typing import Optional, List, Union
 
 from .collection import DatasetCollection
-from .dataset import Dataset
+from .dataset import Dataset, DatasetLastModifiedAtMap
 from .dataset_state import DatasetState
 from .selector import Selector
 
@@ -23,6 +24,18 @@ def get_dataset_collection(
     ) -> DatasetCollection:
         pass
 
+    def get_dataset_last_modified_at_map(
+        self,
+        bucket: str,
+        provider: str,
+        dataset_type: str,
+    ) -> DatasetLastModifiedAtMap:
+        """Return {identifier_json: last_modified_at} for all datasets matching
+        the given provider and dataset_type. Used as a fast pre-check to skip
+        datasets that are already up-to-date without loading the full
+        dataset+revision+file graph."""
+        return {}
+
     @abstractmethod
     def destroy(self, dataset: Dataset):
         pass
diff --git a/ingestify/domain/models/ingestion/ingestion_job.py b/ingestify/domain/models/ingestion/ingestion_job.py
index 32df9f1..0dca1b0 100644
--- a/ingestify/domain/models/ingestion/ingestion_job.py
+++ b/ingestify/domain/models/ingestion/ingestion_job.py
@@ -24,6 +24,7 @@
     DatasetResource,
 )
 from ingestify.domain.models.resources.batch_loader import BatchLoader
+from ingestify.domain.models.dataset.dataset import DatasetLastModifiedAtMap
 from ingestify.domain.models.task.task_summary import TaskSummary
 from ingestify.exceptions import SaveError, IngestifyError
 from ingestify.utils import TaskExecutor, chunker
@@ -346,7 +347,10 @@ def __init__(
         self.selector = selector
 
     def execute(
-        self, store: DatasetStore, task_executor: TaskExecutor
+        self,
+        store: DatasetStore,
+        task_executor: TaskExecutor,
+        last_modified_at_map: Optional[DatasetLastModifiedAtMap] = None,
     ) -> Iterator[IngestionJobSummary]:
         is_first_chunk = True
         ingestion_job_summary = IngestionJobSummary.new(ingestion_job=self)
@@ -429,12 +433,42 @@ def execute(
                     yield ingestion_job_summary
                     return
 
+            # Fast pre-check: skip datasets that are definitely up-to-date
+            # based on the cached timestamps. Only resources that might need
+            # work proceed to the full get_dataset_collection check.
+            skipped_tasks = 0
+            if last_modified_at_map:
+                pending_batch = []
+                for dataset_resource in batch:
+                    identifier = Identifier.create_from_selector(
+                        self.selector, **dataset_resource.dataset_resource_id
+                    )
+                    ts = last_modified_at_map.get(identifier.key)
+                    if ts is not None:
+                        # Dataset exists — check if all files are up-to-date
+                        max_file_modified = max(
+                            f.last_modified for f in dataset_resource.files.values()
+                        )
+                        if ts >= max_file_modified:
+                            skipped_tasks += 1
+                            continue
+                    pending_batch.append(dataset_resource)
+                batch = pending_batch
+
+            if not batch:
+                logger.info(
+                    f"Discovered {skipped_tasks + len(batch)} datasets from "
+                    f"{self.ingestion_plan.source.__class__.__name__} "
+                    f"using selector {self.selector} => nothing to do "
+                    f"({skipped_tasks} skipped via pre-check)"
+                )
+                ingestion_job_summary.increase_skipped_tasks(skipped_tasks)
+                continue
+
             dataset_identifiers = [
                 Identifier.create_from_selector(
                     self.selector, **dataset_resource.dataset_resource_id
                 )
-                # We have to pass the data_spec_versions here as a Source can add some
-                # extra data to the identifier which is retrieved in a certain data format
                 for dataset_resource in batch
             ]
 
@@ -449,8 +483,6 @@ def execute(
                 selector=dataset_identifiers,
             )
 
-            skipped_tasks = 0
-
             task_set = TaskSet()
 
             with ingestion_job_summary.record_timing("build_task_set"):
diff --git a/ingestify/infra/store/dataset/sqlalchemy/repository.py b/ingestify/infra/store/dataset/sqlalchemy/repository.py
index 281d312..3be5e03 100644
--- a/ingestify/infra/store/dataset/sqlalchemy/repository.py
+++ b/ingestify/infra/store/dataset/sqlalchemy/repository.py
@@ -1,4 +1,5 @@
 import itertools
+import json
 import logging
 import uuid
 from typing import Optional, Union, List
@@ -38,7 +39,7 @@
 from ingestify.domain.models.ingestion.ingestion_job_summary import IngestionJobSummary
 from ingestify.domain.models.task.task_summary import TaskSummary
 from ingestify.exceptions import IngestifyError
-from ingestify.utils import get_concurrency
+from ingestify.utils import get_concurrency, key_from_dict
 
 from .tables import get_tables
 
@@ -516,6 +517,31 @@ def _debug_query(self, q: Query):
         )
         logger.debug(f"Running query: {text_}")
 
+    def get_dataset_last_modified_at_map(
+        self,
+        bucket: str,
+        provider: str,
+        dataset_type: str,
+    ) -> dict:
+        with self.session:
+            query = (
+                self.session.query(
+                    self.dataset_table.c.identifier,
+                    self.dataset_table.c.last_modified_at,
+                )
+                .filter(self.dataset_table.c.bucket == bucket)
+                .filter(self.dataset_table.c.provider == provider)
+                .filter(self.dataset_table.c.dataset_type == dataset_type)
+            )
+            return {
+                key_from_dict(
+                    row.identifier
+                    if isinstance(row.identifier, dict)
+                    else json.loads(row.identifier)
+                ): row.last_modified_at
+                for row in query
+            }
+
     def get_dataset_collection(
         self,
         bucket: str,
diff --git a/ingestify/tests/test_fast_skip.py b/ingestify/tests/test_fast_skip.py
new file mode 100644
index 0000000..fa4ea8a
--- /dev/null
+++ b/ingestify/tests/test_fast_skip.py
@@ -0,0 +1,66 @@
+"""Tests for fast skip pre-check."""
+from ingestify import Source, DatasetResource
+from ingestify.domain import DataSpecVersionCollection, DraftFile, Selector
+from ingestify.domain.models.dataset.collection_metadata import (
+    DatasetCollectionMetadata,
+)
+from ingestify.domain.models.fetch_policy import FetchPolicy
+from ingestify.domain.models.ingestion.ingestion_plan import IngestionPlan
+from ingestify.utils import utcnow
+
+
+def loader(file_resource, current_file, **kwargs):
+    return DraftFile.from_input("data", data_feed_key="f1")
+
+
+class SimpleSource(Source):
+    provider = "test_provider"
+
+    def find_datasets(
+        self, dataset_type, data_spec_versions, dataset_collection_metadata, **kwargs
+    ):
+        for i in range(5):
+            r = DatasetResource(
+                dataset_resource_id={"item_id": i},
+                provider=self.provider,
+                dataset_type="test",
+                name=f"item-{i}",
+            )
+            r.add_file(
+                last_modified=utcnow(),
+                data_feed_key="f1",
+                data_spec_version="v1",
+                file_loader=loader,
+            )
+            yield r
+
+
+def _setup(engine):
+    source = SimpleSource("s")
+    dsv = DataSpecVersionCollection.from_dict({"default": {"v1"}})
+    engine.add_ingestion_plan(
+        IngestionPlan(
+            source=source,
+            fetch_policy=FetchPolicy(),
+            dataset_type="test",
+            selectors=[Selector.build({}, data_spec_versions=dsv)],
+            data_spec_versions=dsv,
+        )
+    )
+
+
+def test_timestamps_cache_matches_identifiers(engine):
+    """Keys from get_dataset_last_modified_at_map match Identifier.key."""
+    _setup(engine)
+    engine.run()
+
+    timestamps = engine.store.get_dataset_last_modified_at_map(
+        provider="test_provider", dataset_type="test"
+    )
+    datasets = engine.store.get_dataset_collection(
+        provider="test_provider", dataset_type="test"
+    )
+
+    assert len(timestamps) == len(datasets) == 5
+    for dataset in datasets:
+        assert dataset.identifier.key in timestamps