9 changes: 9 additions & 0 deletions ingestify/application/dataset_store.py
@@ -190,6 +190,15 @@ def with_file_cache(self):
def save_ingestion_job_summary(self, ingestion_job_summary):
self.dataset_repository.save_ingestion_job_summary(ingestion_job_summary)

def get_dataset_last_modified_at_map(
self, provider: str, dataset_type: str
) -> "DatasetLastModifiedAtMap":
return self.dataset_repository.get_dataset_last_modified_at_map(
bucket=self.bucket,
provider=provider,
dataset_type=dataset_type,
)

def get_dataset_collection(
self,
dataset_type: Optional[str] = None,
22 changes: 21 additions & 1 deletion ingestify/application/loader.py
@@ -240,6 +240,11 @@ def collect(
def run(self, selectors, dry_run: bool = False):
"""Execute the collected selectors."""
ingestion_job_prefix = str(uuid.uuid1())

# Build a cache of existing dataset timestamps per (provider, dataset_type).
# Used as a fast pre-check to skip datasets that are already up-to-date.
last_modified_at_cache: dict[tuple, "DatasetLastModifiedAtMap"] = {}

for ingestion_job_idx, (ingestion_plan, selector) in enumerate(selectors):
logger.info(
f"Discovering datasets from {ingestion_plan.source.__class__.__name__} using selector {selector}"
@@ -253,12 +258,27 @@ def run(self, selectors, dry_run: bool = False):
selector=selector,
)

# Lazily load the timestamps cache per (provider, dataset_type)
cache_key = (
ingestion_plan.source.provider,
ingestion_plan.dataset_type,
)
if cache_key not in last_modified_at_cache:
last_modified_at_cache[
cache_key
] = self.store.get_dataset_last_modified_at_map(
provider=cache_key[0],
dataset_type=cache_key[1],
)

with TaskExecutor(
dry_run=dry_run,
processes=ingestion_plan.source.max_concurrency,
) as task_executor:
for ingestion_job_summary in ingestion_job.execute(
self.store, task_executor=task_executor
self.store,
task_executor=task_executor,
last_modified_at_map=last_modified_at_cache[cache_key],
):
# TODO: handle task_summaries
# Summarize to a IngestionJobSummary, and save to a database. This Summary can later be used in a
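For context, the caching added to run() above is a plain lazily-filled dictionary keyed on (provider, dataset_type), populated at most once per key for the whole run. A minimal standalone sketch of the same pattern, where fetch_map stands in for store.get_dataset_last_modified_at_map (names are illustrative, not part of the PR):

from datetime import datetime

def fetch_map(provider: str, dataset_type: str) -> dict[str, datetime]:
    # Stand-in for the repository call; in the PR this hits the database once.
    print(f"loading timestamps for {provider}/{dataset_type}")
    return {}

cache: dict[tuple[str, str], dict[str, datetime]] = {}

def get_cached_map(provider: str, dataset_type: str) -> dict[str, datetime]:
    key = (provider, dataset_type)
    if key not in cache:
        # Load lazily, only the first time this (provider, dataset_type) is seen.
        cache[key] = fetch_map(provider, dataset_type)
    return cache[key]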
3 changes: 3 additions & 0 deletions ingestify/domain/models/dataset/dataset.py
@@ -11,6 +11,9 @@
from ..base import BaseModel


DatasetLastModifiedAtMap = dict[str, datetime]


class Dataset(BaseModel):
bucket: str # This must be set by the DatasetRepository
dataset_id: str
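The new DatasetLastModifiedAtMap alias maps a serialized dataset identifier to that dataset's last-modified timestamp. A representative entry, assuming the key follows the identifier's JSON-style key (the exact string is produced by Identifier.key / key_from_dict, so the shape below is an assumption for illustration):

from datetime import datetime, timezone

DatasetLastModifiedAtMap = dict[str, datetime]  # same alias as in dataset.py

example_map: DatasetLastModifiedAtMap = {
    # Key format is assumed here; see the key_from_dict note further down.
    '{"item_id": 1}': datetime(2024, 1, 1, tzinfo=timezone.utc),
}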
15 changes: 14 additions & 1 deletion ingestify/domain/models/dataset/dataset_repository.py
@@ -1,8 +1,9 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional, List, Union

from .collection import DatasetCollection
from .dataset import Dataset
from .dataset import Dataset, DatasetLastModifiedAtMap
from .dataset_state import DatasetState
from .selector import Selector

@@ -23,6 +24,18 @@ def get_dataset_collection(
) -> DatasetCollection:
pass

def get_dataset_last_modified_at_map(
self,
bucket: str,
provider: str,
dataset_type: str,
) -> DatasetLastModifiedAtMap:
"""Return {identifier_json: last_modified_at} for all datasets matching
the given provider and dataset_type. Used as a fast pre-check to skip
datasets that are already up-to-date without loading the full
dataset+revision+file graph."""
return {}

@abstractmethod
def destroy(self, dataset: Dataset):
pass
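The base class ships a non-abstract default that returns an empty map, so existing repository implementations keep working and simply never fast-skip. A hedged sketch of how a concrete in-memory repository might satisfy the same contract (class and attribute names are assumptions, not ingestify API):

from datetime import datetime

class InMemoryDatasetRepository:
    def __init__(self):
        # Each entry is assumed to expose .bucket, .provider, .dataset_type,
        # .identifier.key and .last_modified_at.
        self.datasets = []

    def get_dataset_last_modified_at_map(
        self, bucket: str, provider: str, dataset_type: str
    ) -> dict[str, datetime]:
        return {
            dataset.identifier.key: dataset.last_modified_at
            for dataset in self.datasets
            if dataset.bucket == bucket
            and dataset.provider == provider
            and dataset.dataset_type == dataset_type
        }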
42 changes: 37 additions & 5 deletions ingestify/domain/models/ingestion/ingestion_job.py
@@ -24,6 +24,7 @@
DatasetResource,
)
from ingestify.domain.models.resources.batch_loader import BatchLoader
from ingestify.domain.models.dataset.dataset import DatasetLastModifiedAtMap
from ingestify.domain.models.task.task_summary import TaskSummary
from ingestify.exceptions import SaveError, IngestifyError
from ingestify.utils import TaskExecutor, chunker
@@ -346,7 +347,10 @@ def __init__(
self.selector = selector

def execute(
self, store: DatasetStore, task_executor: TaskExecutor
self,
store: DatasetStore,
task_executor: TaskExecutor,
last_modified_at_map: Optional[DatasetLastModifiedAtMap] = None,
) -> Iterator[IngestionJobSummary]:
is_first_chunk = True
ingestion_job_summary = IngestionJobSummary.new(ingestion_job=self)
@@ -429,12 +433,42 @@ def execute(
yield ingestion_job_summary
return

# Fast pre-check: skip datasets that are definitely up-to-date
# based on the cached timestamps. Only resources that might need
# work proceed to the full get_dataset_collection check.
skipped_tasks = 0
if last_modified_at_map:
pending_batch = []
for dataset_resource in batch:
identifier = Identifier.create_from_selector(
self.selector, **dataset_resource.dataset_resource_id
)
ts = last_modified_at_map.get(identifier.key)
if ts is not None:
# Dataset exists — check if all files are up-to-date
max_file_modified = max(
f.last_modified for f in dataset_resource.files.values()
)
if ts >= max_file_modified:
skipped_tasks += 1
continue
pending_batch.append(dataset_resource)
batch = pending_batch

if not batch:
logger.info(
f"Discovered {skipped_tasks + len(batch)} datasets from "
f"{self.ingestion_plan.source.__class__.__name__} "
f"using selector {self.selector} => nothing to do "
f"({skipped_tasks} skipped via pre-check)"
)
ingestion_job_summary.increase_skipped_tasks(skipped_tasks)
continue

dataset_identifiers = [
Identifier.create_from_selector(
self.selector, **dataset_resource.dataset_resource_id
)
# We have to pass the data_spec_versions here as a Source can add some
# extra data to the identifier which is retrieved in a certain data format
for dataset_resource in batch
]

@@ -449,8 +483,6 @@ def execute(
selector=dataset_identifiers,
)

skipped_tasks = 0

task_set = TaskSet()

with ingestion_job_summary.record_timing("build_task_set"):
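The pre-check itself reduces to one comparison: a discovered resource is skipped only when the dataset is already known and none of its files is newer than the stored timestamp. A self-contained sketch of that decision (function name and sample datetimes are illustrative):

from datetime import datetime, timezone
from typing import Optional

def is_up_to_date(
    cached_last_modified: Optional[datetime], file_timestamps: list[datetime]
) -> bool:
    if cached_last_modified is None:
        # Unknown dataset: never skip, let the full collection check handle it.
        return False
    return cached_last_modified >= max(file_timestamps)

older = datetime(2024, 1, 1, tzinfo=timezone.utc)
newer = datetime(2024, 1, 2, tzinfo=timezone.utc)
assert is_up_to_date(newer, [older]) is True    # stored copy is current: skip
assert is_up_to_date(older, [newer]) is False   # source has a newer file: process
assert is_up_to_date(None, [newer]) is False    # never seen before: process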
28 changes: 27 additions & 1 deletion ingestify/infra/store/dataset/sqlalchemy/repository.py
@@ -1,4 +1,5 @@
import itertools
import json
import logging
import uuid
from typing import Optional, Union, List
@@ -38,7 +39,7 @@
from ingestify.domain.models.ingestion.ingestion_job_summary import IngestionJobSummary
from ingestify.domain.models.task.task_summary import TaskSummary
from ingestify.exceptions import IngestifyError
from ingestify.utils import get_concurrency
from ingestify.utils import get_concurrency, key_from_dict

from .tables import get_tables

@@ -516,6 +517,31 @@ def _debug_query(self, q: Query):
)
logger.debug(f"Running query: {text_}")

def get_dataset_last_modified_at_map(
self,
bucket: str,
provider: str,
dataset_type: str,
) -> dict:
with self.session:
query = (
self.session.query(
self.dataset_table.c.identifier,
self.dataset_table.c.last_modified_at,
)
.filter(self.dataset_table.c.bucket == bucket)
.filter(self.dataset_table.c.provider == provider)
.filter(self.dataset_table.c.dataset_type == dataset_type)
)
return {
key_from_dict(
row.identifier
if isinstance(row.identifier, dict)
else json.loads(row.identifier)
): row.last_modified_at
for row in query
}

def get_dataset_collection(
self,
bucket: str,
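Identifiers live in a JSON column, and depending on the backend a row can come back as either a dict or a raw JSON string, hence the isinstance branch above. A small illustration of that normalization (key_from_dict lives in ingestify.utils; the sorted-JSON key used here is an assumption for the sketch, not its real implementation):

import json
from datetime import datetime, timezone

def key_from_dict(d: dict) -> str:
    # Assumed behaviour: a stable, order-independent key for an identifier dict.
    return json.dumps(d, sort_keys=True)

rows = [
    {"identifier": {"item_id": 1},
     "last_modified_at": datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {"identifier": '{"item_id": 2}',
     "last_modified_at": datetime(2024, 1, 2, tzinfo=timezone.utc)},
]
timestamps = {
    key_from_dict(
        row["identifier"]
        if isinstance(row["identifier"], dict)
        else json.loads(row["identifier"])
    ): row["last_modified_at"]
    for row in rows
}
assert '{"item_id": 1}' in timestamps and '{"item_id": 2}' in timestamps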
66 changes: 66 additions & 0 deletions ingestify/tests/test_fast_skip.py
@@ -0,0 +1,66 @@
"""Tests for fast skip pre-check."""
from ingestify import Source, DatasetResource
from ingestify.domain import DataSpecVersionCollection, DraftFile, Selector
from ingestify.domain.models.dataset.collection_metadata import (
DatasetCollectionMetadata,
)
from ingestify.domain.models.fetch_policy import FetchPolicy
from ingestify.domain.models.ingestion.ingestion_plan import IngestionPlan
from ingestify.utils import utcnow


def loader(file_resource, current_file, **kwargs):
return DraftFile.from_input("data", data_feed_key="f1")


class SimpleSource(Source):
provider = "test_provider"

def find_datasets(
self, dataset_type, data_spec_versions, dataset_collection_metadata, **kwargs
):
for i in range(5):
r = DatasetResource(
dataset_resource_id={"item_id": i},
provider=self.provider,
dataset_type="test",
name=f"item-{i}",
)
r.add_file(
last_modified=utcnow(),
data_feed_key="f1",
data_spec_version="v1",
file_loader=loader,
)
yield r


def _setup(engine):
source = SimpleSource("s")
dsv = DataSpecVersionCollection.from_dict({"default": {"v1"}})
engine.add_ingestion_plan(
IngestionPlan(
source=source,
fetch_policy=FetchPolicy(),
dataset_type="test",
selectors=[Selector.build({}, data_spec_versions=dsv)],
data_spec_versions=dsv,
)
)


def test_timestamps_cache_matches_identifiers(engine):
"""Keys from get_dataset_last_modified_at_map match Identifier.key."""
_setup(engine)
engine.run()

timestamps = engine.store.get_dataset_last_modified_at_map(
provider="test_provider", dataset_type="test"
)
datasets = engine.store.get_dataset_collection(
provider="test_provider", dataset_type="test"
)

assert len(timestamps) == len(datasets) == 5
for dataset in datasets:
assert dataset.identifier.key in timestamps