From 8c2d7a4b5e2b15f4ad791a98ebba61555ed4be01 Mon Sep 17 00:00:00 2001
From: Kavya Bhat
Date: Wed, 25 Mar 2026 14:32:13 -0600
Subject: [PATCH] Broadcast watermark while reading from filesource

---
 .../src/single_file_custom/source.rs | 101 +++++++++++++++++-
 1 file changed, 99 insertions(+), 2 deletions(-)

diff --git a/crates/arroyo-connectors/src/single_file_custom/source.rs b/crates/arroyo-connectors/src/single_file_custom/source.rs
index 8b5c697b6..03fcd7768 100644
--- a/crates/arroyo-connectors/src/single_file_custom/source.rs
+++ b/crates/arroyo-connectors/src/single_file_custom/source.rs
@@ -9,7 +9,7 @@ use arroyo_operator::SourceFinishType;
 use arroyo_rpc::formats::{BadData, Format, Framing};
 use arroyo_rpc::grpc::rpc::{StopMode, TableConfig};
 use arroyo_rpc::ControlMessage;
-use arroyo_types::UserError;
+use arroyo_types::{SignalMessage, UserError, Watermark};
 use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
@@ -175,8 +175,11 @@ impl SingleFileCustomSourceFunc {
                             }
                         }
                         None => {
-                            collector.flush_buffer().await?;
                             info!("Finished reading {} records from {}", self.records_read, self.path);
+                            // Emit a far-future watermark to flush all pending windows
+                            let flush_ts = SystemTime::now() + Duration::from_secs(60 * 60 * 24 * 365);
+                            collector.broadcast(SignalMessage::Watermark(Watermark::EventTime(flush_ts))).await;
+                            collector.flush_buffer().await?;
                             return Ok(SourceFinishType::Final);
                         }
                     }
@@ -190,6 +193,88 @@ impl SingleFileCustomSourceFunc {
         }
     }
 
+    /// Returns the earliest and latest event time found in column `ts_idx` of `batch`.
+    ///
+    /// Values are normalized to nanoseconds since the Unix epoch. Arrow timestamp
+    /// columns (millisecond, microsecond, nanosecond) are scaled by their unit;
+    /// `Int64` columns are scaled according to `self.ts_format` (`UnixMillis` or
+    /// `UnixSeconds`). Any other column type or format yields `(None, None)`.
+    ///
+    /// Null slots are skipped, as are values whose nanosecond form does not fit
+    /// in an `i64`, so neither can distort the bounds. A batch with no usable
+    /// value yields `(None, None)`.
+    fn get_min_max_timestamp(
+        &self,
+        batch: &RecordBatch,
+        ts_idx: usize,
+    ) -> (Option<SystemTime>, Option<SystemTime>) {
+        use arrow::array::AsArray;
+        use arrow::datatypes::{
+            DataType, Int64Type, TimeUnit, TimestampMicrosecondType,
+            TimestampMillisecondType, TimestampNanosecondType,
+        };
+
+        /// Folds non-null values, scaled to nanoseconds, into `(min, max)` in one pass.
+        fn nanos_min_max(
+            values: impl Iterator<Item = Option<i64>>,
+            nanos_per_unit: i64,
+        ) -> (Option<i64>, Option<i64>) {
+            values
+                .flatten()
+                // `checked_mul` drops values that would overflow instead of wrapping.
+                .filter_map(|raw| raw.checked_mul(nanos_per_unit))
+                .fold((None, None), |(lo, hi): (Option<i64>, Option<i64>), ns| {
+                    (
+                        Some(lo.map_or(ns, |cur| cur.min(ns))),
+                        Some(hi.map_or(ns, |cur| cur.max(ns))),
+                    )
+                })
+        }
+
+        let ts_array = batch.column(ts_idx);
+
+        let (min_nanos, max_nanos) = match ts_array.data_type() {
+            DataType::Timestamp(TimeUnit::Millisecond, _) => nanos_min_max(
+                ts_array.as_primitive::<TimestampMillisecondType>().iter(),
+                1_000_000,
+            ),
+            DataType::Timestamp(TimeUnit::Microsecond, _) => nanos_min_max(
+                ts_array.as_primitive::<TimestampMicrosecondType>().iter(),
+                1_000,
+            ),
+            DataType::Timestamp(TimeUnit::Nanosecond, _) => nanos_min_max(
+                ts_array.as_primitive::<TimestampNanosecondType>().iter(),
+                1,
+            ),
+            DataType::Int64 => {
+                let nanos_per_unit = match self.ts_format {
+                    TsFormat::UnixMillis => Some(1_000_000),
+                    TsFormat::UnixSeconds => Some(1_000_000_000),
+                    _ => None,
+                };
+                match nanos_per_unit {
+                    Some(scale) => {
+                        nanos_min_max(ts_array.as_primitive::<Int64Type>().iter(), scale)
+                    }
+                    None => (None, None),
+                }
+            }
+            _ => (None, None),
+        };
+
+        // Pre-epoch instants are negative, so go through `unsigned_abs` rather than
+        // casting to `u64`, which would wrap them into the far future.
+        let to_systime = |nanos: i64| {
+            let offset = Duration::from_nanos(nanos.unsigned_abs());
+            if nanos >= 0 {
+                UNIX_EPOCH.checked_add(offset)
+            } else {
+                UNIX_EPOCH.checked_sub(offset)
+            }
+        };
+        (min_nanos.and_then(to_systime), max_nanos.and_then(to_systime))
+    }
+
     async fn run_parquet(
         &mut self,
         ctx: &mut SourceContext,
@@ -256,9 +341,21 @@ impl SingleFileCustomSourceFunc {
                 batch = stream.next() => {
                     match batch {
                         Some(Ok(batch)) => {
+                            // Broadcast min timestamp BEFORE collecting so windows aren't dropped as late
+                            let (min_ts, max_ts) = self.get_min_max_timestamp(&batch, ts_idx);
+                            if let Some(ts) = min_ts {
+                                collector.broadcast(SignalMessage::Watermark(Watermark::EventTime(ts))).await;
+                            }
+
                             // Extract timestamps from the batch and add _timestamp column
                             let out_batch = self.add_timestamp_to_batch(&batch, ts_idx)?;
                             collector.collect(out_batch).await;
+
+                            // Advance watermark to max timestamp AFTER collecting
+                            if let Some(ts) = max_ts {
+                                collector.broadcast(SignalMessage::Watermark(Watermark::EventTime(ts))).await;
+                            }
+
                             self.records_read += batch.num_rows() as u64;
 
                             if self.records_read % LOG_INTERVAL == 0 {