feat(metadata-db,controller): add idempotency key for job deduplication #1910
shiyasmohd wants to merge 1 commit into main from
Conversation
LNSD left a comment:
Please, check my comments 🙂
```rust
fn idempotency_key(job_kind: JobKind<'_>, reference: &HashReference) -> IdempotencyKey<'static> {
    let input = format!("{}:{}", job_kind.as_str(), reference);
    let hash = datasets_common::hash::hash(input);
    // SAFETY: The hash is a validated 64-char hex string produced by our hash function.
    IdempotencyKey::from_owned_unchecked(hash.into_inner())
}
```
Right now the idempotency_key() function lives in the controller scheduler, which means the controller owns the knowledge of how to compute the key for each job kind. I think this responsibility belongs closer to the worker's job crates themselves. Each crate already owns its job_kind.rs with JOB_KIND and MaterializeRawJobKind and other similar types.
Could we add a job_key.rs module (or similar) to both worker-datasets-raw and worker-datasets-derived that each exposes a function to compute their own idempotency key? Something like:
```rust
// In worker-datasets-raw/src/job_key.rs
pub fn idempotency_key(reference: &HashReference) -> IdempotencyKey<'static> { ... }

// In worker-datasets-derived/src/job_key.rs
pub fn idempotency_key(reference: &HashReference) -> IdempotencyKey<'static> { ... }
```
Each would use its own JOB_KIND constant internally, so the key format ({job_kind}:{reference}) stays the same, but the computation is co-located with the job definition rather than centralized in the controller.
This way:
- Each worker crate owns its full job identity (kind + key computation)
- The controller just calls `worker_datasets_raw::job_key::idempotency_key(...)` without needing to know the hashing format
- Adding a new worker type in the future doesn't require touching the controller's scheduler
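To make the suggestion concrete, here is a minimal, self-contained sketch of what a worker-owned `job_key.rs` could look like. All of the types below (`HashReference`, `IdempotencyKey`) and the hashing step (`DefaultHasher` standing in for `datasets_common::hash::hash`) are simplified stand-ins, not the real crate definitions:

```rust
// Hypothetical sketch: stand-in types, not the real crate definitions.
use std::collections::hash_map::DefaultHasher;
use std::fmt;
use std::hash::{Hash, Hasher};

pub struct HashReference {
    pub namespace: String,
    pub name: String,
    pub manifest_hash: String,
}

impl fmt::Display for HashReference {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}/{}@{}", self.namespace, self.name, self.manifest_hash)
    }
}

#[derive(Debug, PartialEq)]
pub struct IdempotencyKey(pub String);

// Each worker crate would use its own JOB_KIND constant here.
pub const JOB_KIND: &str = "materialize-raw";

/// Key format stays `{job_kind}:{reference}`, but the computation is
/// co-located with the job definition instead of the controller.
pub fn idempotency_key(reference: &HashReference) -> IdempotencyKey {
    let input = format!("{}:{}", JOB_KIND, reference);
    // Stand-in hash; the real code uses datasets_common::hash::hash.
    let mut hasher = DefaultHasher::new();
    input.hash(&mut hasher);
    IdempotencyKey(format!("{:016x}", hasher.finish()))
}

fn main() {
    let r = HashReference {
        namespace: "_".into(),
        name: "anvil_rpc".into(),
        manifest_hash: "abc123".into(),
    };
    // Same reference always yields the same key.
    assert_eq!(idempotency_key(&r), idempotency_key(&r));
    println!("{:?}", idempotency_key(&r));
}
```

The controller would then only depend on the function, not on the key format.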
```rust
async fn schedule_job(
    &self,
    job_kind: JobKind<'_>,
    dataset_reference: HashReference,
    job_descriptor: JobDescriptor,
    worker_id: Option<NodeSelector>,
```
I have a question about one scenario.
The key is computed from {job_kind}:{namespace}/{name}@{manifest_hash}, which doesn't include job options like end_block, parallelism, or worker_id. This means if a user deploys foo/bar@v1 with end_block: 1000, and then deploys the same dataset with end_block: 2000 while the first job is still active, the second request silently returns the existing job ID. The caller gets back a 200 with a job that's running with end_block: 1000. No error, no indication that their end_block: 2000 was ignored. This goes against the principle of least surprise.
This also means there's no way to update the end_block of a running job through a single deploy call. The caller would need to manually stop the job first, then redeploy, but nothing in the API response indicates that.
I would suggest that, when the key matches an active job but the descriptor differs, we return a "conflict" error explaining that an active job with different options exists and must be stopped first. At least then the caller knows what happened.
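A minimal sketch of the suggested behaviour, using illustrative stand-in types (`JobDescriptor`, `ScheduleOutcome`, `ScheduleError`, and an in-memory store in place of the real metadata DB), none of which match the actual codebase:

```rust
// Hypothetical sketch: detect a key match with a differing descriptor
// and surface it as a conflict instead of a silent idempotent hit.
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct JobDescriptor {
    end_block: u64,
    parallelism: u32,
}

#[derive(Debug, PartialEq)]
enum ScheduleOutcome {
    Created(u64),  // new job
    Existing(u64), // idempotent hit: same key, same descriptor
}

#[derive(Debug, PartialEq)]
enum ScheduleError {
    // Same key but different options: caller must stop the job first.
    Conflict { active_job_id: u64 },
}

struct Scheduler {
    next_id: u64,
    by_key: HashMap<String, (u64, JobDescriptor)>,
}

impl Scheduler {
    fn schedule(&mut self, key: &str, desc: JobDescriptor) -> Result<ScheduleOutcome, ScheduleError> {
        if let Some((id, existing)) = self.by_key.get(key) {
            return if *existing == desc {
                Ok(ScheduleOutcome::Existing(*id))
            } else {
                // Surface the mismatch instead of silently returning
                // the active job with stale options.
                Err(ScheduleError::Conflict { active_job_id: *id })
            };
        }
        let id = self.next_id;
        self.next_id += 1;
        self.by_key.insert(key.to_string(), (id, desc));
        Ok(ScheduleOutcome::Created(id))
    }
}

fn main() {
    let mut s = Scheduler { next_id: 1, by_key: HashMap::new() };
    let d1 = JobDescriptor { end_block: 1000, parallelism: 4 };
    assert_eq!(s.schedule("k", d1.clone()), Ok(ScheduleOutcome::Created(1)));
    // Identical redeploy: idempotent, returns the existing job.
    assert_eq!(s.schedule("k", d1), Ok(ScheduleOutcome::Existing(1)));
    // Re-deploy with a different end_block: conflict, not a silent 200.
    let d2 = JobDescriptor { end_block: 2000, parallelism: 4 };
    assert_eq!(s.schedule("k", d2), Err(ScheduleError::Conflict { active_job_id: 1 }));
}
```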
I've just realised that the job descriptor is meant to be immutable in the current design. That's problematic for the "different job options" scenario. We should find a better way to handle this.
Can you research how we can preserve the immutability property of the job ledger while allowing job rescheduling with different options, without creating multiple job instances?
Maybe move the job descriptor to the "scheduled" event and propagate that info. Idk.
Having multiple jobs for the same logical dataset is in many situations a feature, not a bug, so be careful how deeply you constrain this. Graph Node originally bolted de-duplication of subgraphs into DB primary key constraints, and undoing it was quite a bit of work.
The current design uses an idempotency key. Can you describe the use cases where multiple dataset materialization jobs are desirable?
It allows resyncs without user intervention and without downtime. Situations where a resync is desirable:
To cover this scenario, the idempotency key would be composed of: job kind + dataset reference + physical table revision ID (a.k.a. location ID). This also extends the current set of use cases of the deploy endpoint: materialize a dataset in a custom location.
This is a more "open-ended" suggestion, but I think we can accommodate it with the flexible idempotency key and job descriptor design.
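A tiny sketch of the extended key shape. The function name, string-typed parameters, and plain concatenation (in place of the real hashing step) are all illustrative assumptions:

```rust
// Hypothetical sketch: job kind + dataset reference + physical table
// revision (location) ID. Concatenation stands in for the real hash.
fn idempotency_key(job_kind: &str, reference: &str, location_id: &str) -> String {
    format!("{}:{}:{}", job_kind, reference, location_id)
}

fn main() {
    // Same dataset, different physical location: distinct keys, so a
    // resync can run alongside the job serving the current location.
    let active = idempotency_key("materialize-raw", "_/anvil_rpc@abc123", "loc-1");
    let resync = idempotency_key("materialize-raw", "_/anvil_rpc@abc123", "loc-2");
    assert_ne!(active, resync);
    println!("{active} vs {resync}");
}
```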
Yes, adding the physical ID to the idempotency key completely addresses my concern.
Summary
Introduces idempotency keys for dataset deployment jobs so that repeated deploy requests for the same dataset version return the same job ID instead of creating duplicates. As a result, there is one job ID per dataset.
Changes
Database (metadata-db)
- Adds an `idempotency_key` column to `jobs` with a unique constraint; existing rows get `legacy:{job_id}` and the column enforces `NOT NULL`
- Adds an `IdempotencyKey` newtype for type-safe handling of keys
- Adds `get_by_idempotency_key` for lookups
- Uses `ON CONFLICT (idempotency_key) DO UPDATE` so duplicate keys update the descriptor and return the existing job ID

Controller
- Computes the key as `hash(job_kind:namespace/name@manifest_hash)` (e.g. `materialize-raw:_/anvil_rpc@abc123...`)
- `end_block`, `parallelism`, and `worker_id` are not part of the key, so re-deploying with different parameters while a job is active returns the same job ID

Tests
- `it_idempotency.rs` for metadata-db idempotency behavior
- `it_admin_api_datasets_deploy.rs` verifies that redeploy with a different `end_block` returns the same job ID when the job is still active