feat(persistence,rest,hfs): FHIR Bulk Data Export ($export) — async kick-off, postgres-s3 multi-instance, Inferno v2.0.0#108
feat(persistence,rest,hfs): FHIR Bulk Data Export ($export) — async kick-off, postgres-s3 multi-instance, Inferno v2.0.0#108aacruzgon wants to merge 83 commits into
Conversation
Free function exposed at crate root that dispatches per FhirVersion to
the existing helios_fhir::{r4,r4b,r5,r6}::get_compartment_params helpers.
Lets persistence reuse the lookup without depending on helios-rest.
…t handler Drops the private get_compartment_params_for_version wrapper in favor of the new shared dispatch on the helios-fhir crate.
Returned by fenced ExportWorkerStorage methods when a stale worker's mutation is rejected because the job has been reclaimed.
- ExportRequest gains until / elements / include_associated_data / patient_refs - ExportManifest gains deleted / link (IG-required) - New StartExportInput bundles kickoff metadata (transaction_time, request_url, owner_subject, fhir_version) - New RawExportManifest / RawManifestEntry: storage-side manifest carrying ExportPartKey rather than wire URLs - New ExportJobMetadata, ExportFileMetadata, ExpiredExportRef - New GroupExportProvider::get_group_members_with_periods (default impl derived from get_group_members) so backends can surface Group.member.period.start for the _since-newly-added filter - BulkExportStorage gains start_export(StartExportInput) signature, RawExportManifest return, get_export_job_metadata, get_export_file_metadata, count_active_exports, list_expired_exports
ExportPartKey (with embedded fencing_token), ExportPartWriter (line + byte counter over a boxed AsyncWrite), FinalizedPart, DownloadUrl, and the ExportOutputStore trait. Decouples 'where the bytes go' from the job-state backend.
…rker - WorkerId, ExportJobLease (with fencing_token), LeaseError - ExportClaimStrategy: claim_next + heartbeat + release - ExportWorkerStorage: every method fenced by (worker_id, fencing_token) so a stale worker cannot mutate progress, file rows, or terminal status after its lease has been reclaimed - BulkExportJobStore marker trait (BulkExportStorage + ExportWorkerStorage + ExportClaimStrategy) for bootstrap-time selection of the job store - DefaultExportWorker drives a claimed job to completion under its lease, applying _typeFilter / _since / _until / _elements, supporting resume from the persisted cursor, and honoring since_newly_added=exclude via Group.member.period.start
…umns bulk_export_jobs: worker_id, lease_expiry, fencing_token, heartbeat_at, owner_subject, request_url, fhir_version + idx_export_jobs_claim. bulk_export_files: part_index, fencing_token + a backfill that assigns 0-based sequential part_index per (job_id, file_type, resource_type) before creating the unique idx_export_files_part. Includes test exercising the duplicate-row backfill case.
- start_export(StartExportInput): persists frozen kickoff metadata - get_export_manifest -> RawExportManifest assembled from rows - get_export_job_metadata / get_export_file_metadata - count_active_exports / list_expired_exports - ExportClaimStrategy via process-local mutex + INSERT/UPDATE - ExportWorkerStorage: every mutation fenced by worker_id + fencing_token (UPDATE … WHERE worker_id=? AND fencing_token=? for terminals, WHERE EXISTS-guarded ON CONFLICT upserts for progress + file rows) - get_group_members_with_periods reads Group.member.period.start - resolve_group_patient_ids flattens nested Groups with a cycle guard - Tests: stale-worker fencing, claim/lifecycle, group-cycle, since_newly_added
ALTER TABLE bulk_export_jobs ADD COLUMN IF NOT EXISTS … for the lease fields, owner_subject, request_url, fhir_version. ALTER bulk_export_files for part_index + fencing_token; ROW_NUMBER() backfill before the unique idx_export_files_part.
PostgresSkipLocked claim strategy (FOR UPDATE SKIP LOCKED inside a transaction), fully-fenced ExportWorkerStorage (every mutation guarded by worker_id + fencing_token), all new BulkExportStorage methods, get_group_members_with_periods + nested-Group flattening with cycle guard. Bind sites use i32 / i64 to match the actual column types on bulk_export_progress / bulk_export_files.
Default impl reports unsupported; AwsS3Client overrides it via PresigningConfig from the AWS SDK. Used by S3OutputStore to mint direct-from-S3 download URLs for the bulk-export manifest.
Reserved for future S3OutputStore integrations; unused now that S3 is output-only and keys live in S3OutputStore::object_key.
S3 is no longer a bulk-export job-state backend; the model is preserved for a future read-modify-write integration.
Reserved for future S3OutputStore integration; unused now that the synchronous BulkExportStorage path has been removed.
S3 is output-only for bulk export — job state lives in SQLite or PostgreSQL. Drops the synchronous start_export / run_export_job path and adds stub PatientExportProvider / GroupExportProvider impls returning UnsupportedCapability so an S3-resource-storage deployment satisfies the trait hierarchy.
ExportOutputStore impl backed by AwsS3Client. open_writer returns a
local scratch tempfile; finalize_part fsyncs + put_object's it to S3
under {tenant}/exports/{job_id}/{file_type}-{rt}-{part}-{token}.ndjson.
download_url either pre-signs (Auto / AlwaysPresigned) or returns an
HFS-served URL (AlwaysToken). delete_job_outputs lists + deletes by
prefix. AccessTokenMode encodes the requires_access_token posture.
bulk_export_start_manifest_and_delete is gone (the impl was removed); bulk_export_invalid_format_and_fetch_batch_cursor is reduced to the fetch_export_batch cursor case which still exercises ExportDataProvider.
postgres_integration_export_claim_skip_locked: claim ordering, fencing token bumps. postgres_integration_export_stale_worker_fenced_out: LeaseLost on every fenced ExportWorkerStorage call after reclaim. postgres_integration_export_count_active_and_expire: count + list filtering. claim_specific helper drains foreign jobs so tests can cope with the shared SHARED_PG container.
…add S3OutputStore round-trip The lifecycle test now exercises the remaining ExportDataProvider surface. Adds test_minio_s3_output_store_round_trip: write → finalize → pre-signed GET → open_reader → idempotent delete against MinIO.
…ort_batch S3 is no longer a bulk-export job-state backend; verify the ExportDataProvider data feed instead.
ExportOutputStore impl backed by tokio::fs. open_writer creates a
.tmp under ${HFS_DATA_DIR}/exports/{tenant}/{job_id}/, finalize_part
fsyncs + atomic rename, download_url returns an HFS-served URL with
requires_access_token=true, open_reader serves the file, and
delete_job_outputs is idempotent. Includes a write→finalize→read→delete
round-trip test.
ExportDataProvider / PatientExportProvider / GroupExportProvider impls returning UnsupportedCapability so MongoDB can satisfy the trait hierarchy without supporting bulk export as a primary.
CompositeStorage gains an export_provider: Option<DynGroupExportProvider> field set by with_full_primary (with the new GroupExportProvider bound on T). Each trait method delegates to the primary or returns UnsupportedCapability when no primary impl is wired in.
Authorizes the HFS-served (requires_access_token=true) download path
using the helios_auth Principal — checks ownership against
job_owner_subject (or system/* wildcard) plus a system/{ResourceType}.rs
scope. Pre-signed downloads bypass HFS and never reach this trait.
bulk_export_jobs: Arc<dyn BulkExportJobStore>, bulk_export_output: Arc<dyn ExportOutputStore>, bulk_export_file_auth: Arc<dyn ExportFileAuth>, plus an Arc<BulkExportConfig>. New with_bulk_export(...) builder and accessors so handlers can reach the subsystem behind feature toggles without touching the resource-storage S type parameter.
Full configuration surface: enabled, backend (embedded|postgres-s3), output_backend (local-fs|s3), output_dir, s3_bucket, requires_access_token (auto|true|false), file_url_ttl_secs, output_ttl_secs, worker_concurrency, disable_local_worker, max_concurrent_per_tenant, batch_size, lease_duration_secs, heartbeat_interval_secs, cleanup_interval_secs, since_newly_added (include|exclude). validate() rejects local-fs + requires_access_token=false (no pre-signed URL capability).
… batch queries The keyset cursor stores timestamps as RFC 3339 strings (e.g. "2026-05-15T22:35:24Z|<id>"). When the second+ batch was fetched the cursor part was pushed into the tokio-postgres param list as a Rust String (TEXT). PostgreSQL's extended query protocol infers the expected type from the column context (TIMESTAMPTZ), so it rejected the TEXT binding with a type error, failing every paginated export job after its first batch. Fix all three cursor sites in fetch_export_batch and fetch_patient_compartment_batch to parse the timestamp with DateTime::parse_from_rfc3339 and push a DateTime<Utc> so the wire type matches the inferred TIMESTAMPTZ.
POST /Patient/$export accepts a `patient` parameter to scope the export to specific patients. The request stores those references in ExportRequest::patient_refs, but the worker's Patient-level branch always called fetch_export_batch (unfiltered), so every resource of each type was returned regardless of which patient was requested. Add a new match arm that fires when ExportLevel::Patient and patient_refs is non-empty: it strips the "Patient/" prefix from each ref and delegates to fetch_patient_compartment_batch, which correctly scopes results to those patients' compartments. The existing arm (no patient filter) is unchanged for generic /Patient/$export calls.
Allow the generated Keycloak client to accept Inferno's five-minute private_key_jwt assertion lifetime, avoiding token endpoint 400s that cascade into export 401s. Expose S256 in SMART discovery so the Inferno SMART Backend Services checks see the expected code_challenge_methods_supported metadata.
Carry the access-token-bearing smart_auth_info emitted by the SMART Backend Services group into the export group so Inferno sends authenticated kickoff requests. Advertise authorization_code in SMART discovery when an authorization endpoint is configured, matching Inferno STU2 well-known expectations.
Add the vital-signs category to the seeded heart-rate Observation so R4 profile validation passes when Inferno applies the Heart Rate profile. Treat file-server TLS checks as known omitted in the HTTP-only CI export file setup.
| ``` | ||
|
|
||
| The full configuration surface (`HFS_BULK_EXPORT_*` env vars, single- vs | ||
| multi-instance recipes, parameter behavior) is documented in `CLAUDE.md`. |
There was a problem hiding this comment.
Should not be documented here - should be in the export README.md
There was a problem hiding this comment.
I have fixed this issue
| The full configuration surface (`HFS_BULK_EXPORT_*` env vars, single- vs | ||
| multi-instance recipes, parameter behavior) is documented in `CLAUDE.md`. | ||
| A docker-compose stack for the multi-instance topology lives at | ||
| `docker/bulk-export/docker-compose.yml`, and a manual Inferno Bulk Data IG |
There was a problem hiding this comment.
Is this provided as an example, or is it used for the GitHub Action workflow tests? If only for workflow tests, we can move this comment to the export README.md and not feature it on the hfs README.md. In the future, we will be providing a library of common configuration examples.
There was a problem hiding this comment.
Yeah it is a provided example, is not part of the GitHub Action workflow test. I have address the documentation for this accordingly.
|
|
||
| /// Deletes the object at the given key. Succeeds even if the key does not | ||
| /// exist. | ||
| /// exist. Reserved for the Phase 2 `S3OutputStore` integration. |
There was a problem hiding this comment.
Is this still required - dead code?
There was a problem hiding this comment.
I have fixed this issue
| } | ||
|
|
||
| /// Key for the JSON state object of a bulk export job. | ||
| #[allow(dead_code)] |
There was a problem hiding this comment.
#[allow(dead_code)] - check these in this file
There was a problem hiding this comment.
I have fixed this issue
| /// Reserved for the Phase 2 `S3OutputStore` integration; the S3 backend is no | ||
| /// longer a bulk-export *job-state* backend (job state lives in SQLite or | ||
| /// PostgreSQL), so this type is currently unused. | ||
| #[allow(dead_code)] |
There was a problem hiding this comment.
#[allow(dead_code)] - check this
There was a problem hiding this comment.
I have fixed this issue
|
|
||
| /// Deletes the object at `key`. Succeeds silently if the key does not exist. | ||
| /// Reserved for the Phase 2 `S3OutputStore` integration. | ||
| #[allow(dead_code)] |
There was a problem hiding this comment.
#[allow(dead_code)] - check
There was a problem hiding this comment.
I have fixed this issue
| {"backend":"sqlite","bulk_mode":"postgres-s3","expectation":"full"}, | ||
| {"backend":"postgres","bulk_mode":"embedded-local","expectation":"full"}, | ||
| {"backend":"postgres","bulk_mode":"postgres-s3","expectation":"full"}, | ||
| {"backend":"sqlite-elasticsearch","bulk_mode":"embedded-local","expectation":"endpoint-unavailable"}, |
There was a problem hiding this comment.
For these backend combinations - why are they unsupported or endpoint-unavailable? Seems like we should be able to support $export on all of the backend types with the exception of s3-only. For s3-only, we should be able to support these $export parameters, but the others will not be feasible without a lot of extra filtering logic that doesn't make much sense to implement at the moment. _outputFormat, _type, _elements, patient, includeAssociatedData, organizeOutputBy, allowPartialManifests
There was a problem hiding this comment.
I have corrected this issue
| serde_json::json!({ | ||
| let mut operations = vec![ | ||
| serde_json::json!({ | ||
| "name": "validate", |
There was a problem hiding this comment.
We don't yet support $validate - why was this added?
There was a problem hiding this comment.
The resource validate already existed before my PR, all I did was refactor the inline operation into what you see right now so it could conditionally append Bulk Data operations: export, patient-export, group-export, Bulk Data instantiates when HFS_BULK_EXPORT_ENABLED=true.
smunini
left a comment
There was a problem hiding this comment.
Good start!! See comments.
S3 no longer owns bulk-export job state after the output-store split. Job rows, progress, leases, file metadata, and manifests live in SQLite or PostgreSQL while S3 stores finalized output objects. Update the persistence README capability notes, S3 backend scope, S3+Elasticsearch guidance, and object model to describe the current S3OutputStore layout.
S3OutputStore calls S3Api::delete_object during export-output cleanup, so the trait method is no longer dead code. Remove the stale dead-code allowance and old Phase 2 note.
S3 no longer stores bulk-export job state, progress, manifests, or output parts under the old bulk/export/jobs keyspace. Remove the unused helper methods for that obsolete object layout.
Bulk-export job state now belongs to SQLite or PostgreSQL. Remove the unused S3 ExportJobState type and update the module docs so S3 models only describe history and bulk-submit state.
The S3Backend helper was only a dead-code wrapper around S3Api::delete_object. S3OutputStore performs cleanup through the S3Api trait directly, so the backend wrapper can be deleted.
Upgrade astral-tokio-tar from 0.6.1 to 0.6.2 in Cargo.lock to clear RUSTSEC-2026-0145, which is pulled in through testcontainers.
# Conflicts: # .github/workflows/bulk-export-smoke.yml
FHIR Bulk Data Export
Implements the FHIR Bulk Data Access IG $export family (system / patient / group) end-to-end, per Discussion #104. Embedded single-instance (SQLite job state + local-FS output + in-process worker pool) is the zero-config default; a multi-instance topology (PostgreSQL job state + S3-compatible output with pre-signed download URLs) is selected at startup with no handler changes. Ships with an external smoke workflow that exercises both topologies on every run and an Inferno Bulk Data IG v2.0.0 conformance workflow against the full SMART Backend Services + Keycloak stack.
Why
Bulk export is the API population-health platforms, payer-provider exchanges, registries, and research/AI pipelines converge on. CRUD + search are not enough once a workload needs every Observation for every patient in a cohort — that's a data-engineering problem (long-running work, durable state, fileserver bandwidth, multi-instance fan-out) rather than a request/response one. The IG defines an asynchronous, manifest-based, NDJSON-over-HTTPS pattern; this PR ships it as a first-class HFS subsystem.
Changes
Persistence — new traits + types (helios-persistence)
Persistence — backends
REST (helios-rest)
<S>, …, BulkExportBundle) sharing the inner build_app.Auth (helios-auth)
helios-fhir
helios-fhirpath
helios-hfs
Ops + docs
Testing
Notes
Implements Discussion #104.