feat: Add now() to troubleshoot pipeline latencies #19386
Fixes #18208
Co-authored-by: aho135 <andrewho135@gmail.com>
@abhishekrb19 Curious if you have any thoughts on this old issue: #6513 Seems like the reservations were mainly for using |
`now()` is non-deterministic: replicated streaming tasks and task replays evaluate it to different
values, producing inconsistent results across replicas. Do not use `now()` to overwrite `__time`.
For Kafka, prefer [`kafka.timestamp`](../ingestion/data-formats.md#kafka) as the
`__time` source.
I'm not sure it makes sense to mention Kafka here. The timestamps aren't equivalent: one is the time the event lands on Kafka, and the other is the time the event is ingested into Druid.
|----|-----------|
|now|now() returns the current system timestamp in milliseconds since epoch (1970-01-01 00:00:00 UTC). This function is evaluated for each row at processing time. It's recommended to use this only for troubleshooting issues - see [Using now() in ingestion](#using-now-in-ingestion).|
|timestamp|timestamp(expr[,format-string]) parses string expr into date then returns milliseconds from java epoch. without 'format-string' it's regarded as ISO datetime format |
|unix_timestamp|same with 'timestamp' function but returns seconds instead |
Suggested change:
- |unix_timestamp|same with 'timestamp' function but returns seconds instead |
+ |unix_timestamp|unix_timestamp(expr[,format-string]) same with 'timestamp' function but returns seconds instead |
Just for consistency with other functions
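For readers skimming the thread, the arithmetic behind the documented `now()` row can be sketched in plain Java. This is only an illustration of "current epoch milliseconds minus event time"; `IngestionLagSketch` and `lagMillis` are hypothetical names, not Druid APIs, and the clock is passed in so the behaviour can be checked with a fixed value.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch; none of these names exist in Druid.
final class IngestionLagSketch
{
  // Mirrors the documented now() contract: wall-clock milliseconds since the Unix epoch,
  // minus the event timestamp (also in epoch milliseconds).
  static long lagMillis(LongSupplier nowMillis, long eventTimeMillis)
  {
    return nowMillis.getAsLong() - eventTimeMillis;
  }

  public static void main(String[] args)
  {
    // Pretend the event was produced 1.5 seconds ago.
    long eventTime = System.currentTimeMillis() - 1_500L;
    System.out.println("ingestion_lag_ms=" + lagMillis(System::currentTimeMillis, eventTime));
  }
}
```

Because the clock is read on every call, two replicas processing the same row at different moments would compute different lags, which is the non-determinism the docs text warns about.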
FrankChen021 left a comment
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 1 |
| P3 | 0 |
| Total | 1 |
This is an automated review by Codex GPT-5
{
  // Return analysis indicating this is NOT constant
  // by pretending we have a free variable
  return new BindingAnalysis();
[P2] Prevent now() from being planned as constant
Returning an empty BindingAnalysis makes ExpressionPlanner mark now() and expressions that only depend on now() as CONSTANT. Query-time expression selectors then build a ConstantExprEvalSelector and evaluate now() once when the selector is created, and expression filters/virtual columns also get stable cache keys, so native queries can reuse stale timestamps instead of evaluating at processing time. Add explicit non-deterministic handling, or otherwise keep now() out of constant selector/index/cache optimizations.
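The stale-timestamp risk described in this finding can be reproduced in isolation. The sketch below is not Druid code: `ConstantFoldingSketch`, `RowSelector`, `constantSelector`, and `perRowSelector` are hypothetical stand-ins that only contrast "evaluate once when the selector is created" with "evaluate on every row", using a fake clock in place of `now()`.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch; these types are stand-ins, not Druid's selector classes.
final class ConstantFoldingSketch
{
  interface RowSelector
  {
    long get();
  }

  // Folds the expression once at construction time, as a constant selector would.
  static RowSelector constantSelector(LongSupplier expr)
  {
    final long folded = expr.getAsLong();
    return () -> folded;
  }

  // Re-evaluates the expression each time a row is read.
  static RowSelector perRowSelector(LongSupplier expr)
  {
    return expr::getAsLong;
  }

  public static void main(String[] args)
  {
    long[] clock = {1_000L};            // fake wall clock
    LongSupplier now = () -> clock[0];  // stand-in for now()

    RowSelector folded = constantSelector(now);
    RowSelector live = perRowSelector(now);

    clock[0] = 2_000L; // time passes between selector creation and row processing

    System.out.println(folded.get()); // prints 1000 (stale)
    System.out.println(live.get());   // prints 2000
  }
}
```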
Yes, good catch. This is fixed in the latest commit.
Thanks, this fixes the ExpressionPlan constant trait, but I think the selector path can still fold now() once. NowExpression.analyzeInputs() now reports only the synthetic __dummy__ binding; for normal segments that column has no capabilities and its selector is nil, so ExpressionSelectors.createBindings drops it and returns InputBindings.nilBindings(). makeExprEvalSelector then treats those nil bindings as a constant expression and returns new ConstantExprEvalSelector(plan.getExpression().eval(bindings)), so now() is still evaluated when the selector is created instead of at row/query processing time. The new tests cover the planner only; could you add selector-level coverage and avoid using a missing dummy input as the non-determinism marker?
Ack, updated the code with your non-determinism suggestion.
Thanks, the selector constant-folding path I called out is fixed now: now() carries a non-deterministic analysis flag and ExpressionSelectors skips ConstantExprEvalSelector for it. I think two related constant/cache paths still need the same treatment, though. Expr.asBitmapColumnIndex still treats any expression with no required bindings as a constant bitmap index, so an expression filter around now() can still be evaluated once while building the bitmap. Also, ExpressionDimFilter and ExpressionVirtualColumn cache keys still serialize the stable expression cache key (now() stringifies the same way every time), so cached query results can be reused across wall-clock changes. Could you either make non-deterministic expressions non-cacheable for these query components, or include a non-stable cache key / disable the affected optimizations when analysis.isNonDeterministic() is true?
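To make the "non-stable cache key" option concrete, here is a minimal sketch of one way it could work, assuming the key is just the expression text plus an incrementing counter. `CacheKeySketch`, `stableKey`, and `nonStableKey` are hypothetical; Druid's real cache key builders are not shown here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; not Druid's cache key implementation.
final class CacheKeySketch
{
  private static final AtomicLong NONCE = new AtomicLong();

  // Deterministic expressions: the key depends only on the expression text,
  // so "now() > 0" would stringify to the same bytes every time.
  static byte[] stableKey(String expression)
  {
    return expression.getBytes(StandardCharsets.UTF_8);
  }

  // Non-deterministic expressions: append a fresh nonce so no two builds match.
  static byte[] nonStableKey(String expression)
  {
    byte[] base = stableKey(expression);
    return ByteBuffer.allocate(base.length + Long.BYTES)
                     .put(base)
                     .putLong(NONCE.incrementAndGet())
                     .array();
  }

  public static void main(String[] args)
  {
    // prints true: a stable key lets a cached result be reused across wall-clock changes
    System.out.println(Arrays.equals(stableKey("now() > 0"), stableKey("now() > 0")));
    // prints false: each build produces a distinct key, so nothing is reused
    System.out.println(Arrays.equals(nonStableKey("now() > 0"), nonStableKey("now() > 0")));
  }
}
```

The trade-off is that such expressions effectively opt out of result caching, which is the intent when the result depends on wall-clock time.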
…ns. Fix constant handling.
@aho135 @FrankChen021 - thanks for the reviews and for pointing me to the previous issues. I’ve added validation to restrict |
FrankChen021 left a comment
I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.
Reviewed 17 of 17 changed files.
This is an automated review by Codex GPT-5.5
// Verify the same instability propagates when now() is nested in a parent expression.
Expr nested = Parser.parse("now() > 0", ExprMacroTable.nil());
Assertions.assertFalse(
    java.util.Arrays.equals(nested.getCacheKey(), nested.getCacheKey()),
Suggested change:
- java.util.Arrays.equals(nested.getCacheKey(), nested.getCacheKey()),
+ Arrays.equals(nested.getCacheKey(), nested.getCacheKey()),
`java.util.Arrays` can be imported.
)::get;

if (ColumnHolder.TIME_COLUMN_NAME.equals(name)
    && BuiltInExprMacros.NowExprMacro.containsNow(parsedExpression.get())) {
Would it make sense to just check isNonDeterministic() instead of containsNow?
Yeah, good point, updated. I think we can make this be the case for all non-deterministic expressions (now() being the only one currently)
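A rough sketch of the check being discussed, assuming non-determinism is a flag that propagates from any sub-expression up to its parent. `NonDeterminismSketch`, `Node`, and `validateTransform` are hypothetical names for illustration; only the `__time` column name and the `isNonDeterministic()` idea come from the thread.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; Node is a toy expression tree, not Druid's Expr.
final class NonDeterminismSketch
{
  static final class Node
  {
    final String name;
    final List<Node> args;

    Node(String name, Node... args)
    {
      this.name = name;
      this.args = Arrays.asList(args);
    }

    // An expression is non-deterministic if it is now() or contains one anywhere below it.
    boolean isNonDeterministic()
    {
      if ("now".equals(name)) {
        return true;
      }
      for (Node arg : args) {
        if (arg.isNonDeterministic()) {
          return true;
        }
      }
      return false;
    }
  }

  // Rejects any non-deterministic expression that would populate __time.
  static void validateTransform(String outputColumn, Node expression)
  {
    if ("__time".equals(outputColumn) && expression.isNonDeterministic()) {
      throw new IllegalArgumentException("Non-deterministic expression cannot populate __time");
    }
  }

  public static void main(String[] args)
  {
    Node lag = new Node("minus", new Node("now"), new Node("__time"));
    validateTransform("ingestion_lag_ms", lag); // accepted: output column is not __time
    try {
      validateTransform("__time", lag);
    }
    catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Checking the flag rather than searching for `now()` by name means any future non-deterministic function would be covered without touching the validation.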
/**
 * now() depends on wall-clock time, so its cache key must not be stable across calls; otherwise queries containing
 * now() can hit a stale cached result. NowExpression overrides {@link Expr#decorateCacheKeyBuilder} to mix in
 * nanoTime, which makes any containing Expr produce a fresh key on every {@link Expr#getCacheKey()} call.
Stale comment: the cache key now uses a monotonically increasing long rather than nanoTime.
Removed this test doc altogether.
private Supplier<byte[]> makeCacheKeySupplier()
{
-  return Suppliers.memoize(() -> {
+  return () -> {
Should memoize be conditional depending on whether the expression isNonDeterministic?
It shouldn't be needed, because non-deterministic expressions append an incrementing nonce in decorateCacheKeyBuilder() (as NowExpr does), which makes the cache key unique every time it's built.
I see that this was reverted in the latest commit. Was that intentional?
That's right, it was reverted to the version in master to align with decorateCacheKeyBuilder()
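Since this exchange turns on how memoization and a per-build nonce interact, here is a small standalone sketch of the general behaviour. It is not the Druid code: `MemoizedKeySketch`, `buildKey`, and the hand-rolled `memoize` helper are hypothetical (the real code uses Guava's `Suppliers.memoize`), and the key here is only the nonce.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical sketch of memoization combined with a nonce-based key builder.
final class MemoizedKeySketch
{
  private static final AtomicLong NONCE = new AtomicLong();

  // Stand-in for a key builder that mixes in an incrementing nonce on every build.
  static byte[] buildKey()
  {
    return ByteBuffer.allocate(Long.BYTES).putLong(NONCE.incrementAndGet()).array();
  }

  // Simple, non-thread-safe memoizer: the delegate runs at most once per supplier.
  static <T> Supplier<T> memoize(Supplier<T> delegate)
  {
    return new Supplier<T>()
    {
      private T value;
      private boolean computed;

      @Override
      public T get()
      {
        if (!computed) {
          value = delegate.get();
          computed = true;
        }
        return value;
      }
    };
  }

  public static void main(String[] args)
  {
    Supplier<byte[]> first = memoize(MemoizedKeySketch::buildKey);
    Supplier<byte[]> second = memoize(MemoizedKeySketch::buildKey);

    // prints true: one memoized supplier builds its key once and then reuses it
    System.out.println(Arrays.equals(first.get(), first.get()));
    // prints false: a separately created supplier builds its own key with a new nonce
    System.out.println(Arrays.equals(first.get(), second.get()));
  }
}
```

In this toy model the nonce guarantees uniqueness per build, while memoization decides how often a build happens; how that maps onto expression instance lifetimes in Druid is outside what this sketch shows.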
aho135 left a comment
LGTM, thanks @abhishekrb19!
Fixes #18208
Adds a `now()` expression macro to Druid's native expressions. It returns the current system timestamp in milliseconds since epoch, evaluated per row at processing time. Intended primarily for ingestion-time troubleshooting, e.g., capturing `now() - __time` as an `ingestion_lag_ms` dimension to measure end-to-end pipeline delay. This expression is only allowed for non-`__time` columns.

Release note

Added `now()` expression function that returns the current system timestamp in milliseconds since epoch. Useful at ingestion time for troubleshooting pipeline delays (e.g., `now() - __time`). Note: `now()` is non-deterministic as it evaluates for every row, so it can break idempotency. This can only be added to columns that are not `__time`.

This PR has: