[FFL-1720] Evaluation Logging: Aggregation Engine & Test Utilities#3145
[FFL-1720] Evaluation Logging: Aggregation Engine & Test Utilities#3145typotter wants to merge 39 commits intofeature/flags-evaluations-loggingfrom
Conversation
e1704c3 to
95f4c0a
Compare
This stack of pull requests is managed by Graphite. Learn more about stacking. |
95f4c0a to
a0e7df9
Compare
This comment has been minimized.
This comment has been minimized.
...oid-flags/src/main/kotlin/com/datadog/android/flags/internal/aggregation/AggregationStats.kt
Outdated
Show resolved
Hide resolved
...ndroid-flags/src/main/kotlin/com/datadog/android/flags/internal/EvaluationEventsProcessor.kt
Outdated
Show resolved
Hide resolved
...ndroid-flags/src/main/kotlin/com/datadog/android/flags/internal/EvaluationEventsProcessor.kt
Outdated
Show resolved
Hide resolved
...ndroid-flags/src/main/kotlin/com/datadog/android/flags/internal/EvaluationEventsProcessor.kt
Show resolved
Hide resolved
...ndroid-flags/src/main/kotlin/com/datadog/android/flags/internal/EvaluationEventsProcessor.kt
Show resolved
Hide resolved
...ndroid-flags/src/main/kotlin/com/datadog/android/flags/internal/EvaluationEventsProcessor.kt
Outdated
Show resolved
Hide resolved
...ndroid-flags/src/main/kotlin/com/datadog/android/flags/internal/EvaluationEventsProcessor.kt
Outdated
Show resolved
Hide resolved
...oid-flags/src/main/kotlin/com/datadog/android/flags/internal/aggregation/AggregationStats.kt
Show resolved
Hide resolved
| // Take atomic snapshot of statistics to prevent torn reads | ||
| val snapshotCount: Int | ||
| val snapshotFirst: Long | ||
| val snapshotLast: Long | ||
| val snapshotMessage: String? | ||
| synchronized(this) { | ||
| snapshotCount = count | ||
| snapshotFirst = firstEvaluation | ||
| snapshotLast = lastEvaluation | ||
| snapshotMessage = lastErrorMessage | ||
| } |
There was a problem hiding this comment.
minor: this piece is confusing. These fields are volatile, so torn read is not possible. Otherwise, this class is not designed to handle concurrent calls to recordEvaluation and toEvaluationEvent as that may leave some events uncounted.
There was a problem hiding this comment.
perhaps this is being overly defensive. Because of the CAS of the map of aggregation stats for flushing, _no other thread should be calling toEvaluationEvent or recordEvaluation on this AggregationStats instance.
Without this synchronized, recordEvaluation could change the computed values while we're getting them for constructing the FlagEvaluation
There was a problem hiding this comment.
With the synchronized CAS snapshotting the aggregationMap for flushing, this method should be protected from another thread calling recordEvaluation which could change the values as they're being grabbed for the FlagEvaluation constructor
Is this too defensive?
...droid-flags/src/main/kotlin/com/datadog/android/flags/internal/aggregation/AggregationKey.kt
Outdated
Show resolved
Hide resolved
...dk-android-flags/src/main/kotlin/com/datadog/android/flags/internal/EvaluationEventWriter.kt
Outdated
Show resolved
Hide resolved
...ndroid-flags/src/main/kotlin/com/datadog/android/flags/internal/EvaluationEventsProcessor.kt
Outdated
Show resolved
Hide resolved
eb13adb to
c924067
Compare
97e4a4e to
eb0ace5
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## feature/flags-evaluations-logging #3145 +/- ##
=====================================================================
+ Coverage 70.81% 70.99% +0.18%
=====================================================================
Files 901 905 +4
Lines 33172 33314 +142
Branches 5596 5618 +22
=====================================================================
+ Hits 23489 23648 +159
+ Misses 8118 8106 -12
+ Partials 1565 1560 -5
🚀 New features to boost your workflow:
|
c924067 to
a031b97
Compare
968dc68 to
80c8de7
Compare
ddeaaa3 to
75827d0
Compare
2eb16ca to
0cfb30a
Compare
0cfb30a to
141cded
Compare
1c0c827 to
00b8f55
Compare
141cded to
4bfc37e
Compare
00b8f55 to
7dba2fe
Compare
4bfc37e to
550f9f0
Compare
e09cbe5 to
66b62f4
Compare
typotter
left a comment
There was a problem hiding this comment.
Thanks Oleksii, as always for your 👀 . ptal
| private val context: EvaluationContext, | ||
| private val service: String?, | ||
| private val rumApplicationId: String?, | ||
| private val rumViewName: String?, |
There was a problem hiding this comment.
nit: view name is duplicated in aggregation key. do we need it here?
...droid-flags/src/main/kotlin/com/datadog/android/flags/internal/aggregation/AggregationKey.kt
Show resolved
Hide resolved
| runtimeDefaultUsed = reason == null || | ||
| reason == ResolutionReason.DEFAULT.name || | ||
| reason == ResolutionReason.ERROR.name |
There was a problem hiding this comment.
minor: the condition I would use here is aggregationKey.variantKey == null. Reasons are somewhat less reliable and it's valid to return runtime default with reasons besides "default" or "error"
| if (shouldFlush) { | ||
| flush() | ||
| } |
There was a problem hiding this comment.
minor: if there are many threads recording evaluations, it's still possible to flush too frequently:
- Thread A records an event,
shouldFlushis true - Thread B records an event,
shouldFlushis true - Thread A flushes all events, aggregation map is now empty
- Thread C records an event,
shouldFlushis false - Thread B tries to flush, flushes a single event (written by C)
Note in my previous message, the size limit is re-checked after acquiring a write lock and holding it through to the flush.
Two ways we can implement this in the current structure: either add a parameter to flush() for it to do an extra check and only flush if the size exceeds the limit. Alternatively, we can make aggregator.record() return drained records instead of a flag—then we can keep this synchronization entirely inside aggregator.
There was a problem hiding this comment.
return drained records instead of a flag—then we can keep this synchronization entirely inside aggregator.
Interesting approach
There was a problem hiding this comment.
yeah, the only downside is that it doesn't cancel scheduled flush holding the same lock, so it's possible for:
- Thread A calls
aggregator.record()which triggers size limit and drains records - Thread B writes more records
- Thread C — scheduled flush triggers
- Thread A flushes returned records
- Thread C continues with scheduled flush and writes just a few records
This is probably less severe than the previous situation with extra flushes due to size limit because there's at most one scheduled flush whereas the number of threads producing evaluations can be much higher.
...flags/src/main/kotlin/com/datadog/android/flags/internal/aggregation/EvaluationAggregator.kt
Outdated
Show resolved
Hide resolved
...flags/src/main/kotlin/com/datadog/android/flags/internal/aggregation/EvaluationAggregator.kt
Show resolved
Hide resolved
| } | ||
| val toDrain = aggregationMap | ||
| aggregationMap = ConcurrentHashMap() | ||
| return toDrain.map { (_, stats) -> stats.toEvaluationEvent() } |
There was a problem hiding this comment.
minor: this mapping is best done outside of write lock
| ) | ||
|
|
||
| if (drainedEvents != null) { | ||
| writer.writeAll(drainedEvents) |
There was a problem hiding this comment.
re-schedule periodic flush here? (don't forget flushMutex, so we don't schedule multiple futures simultaneously)
| * | ||
| * @param events the evaluation events to write to storage | ||
| */ | ||
| fun writeAll(events: List<FlagEvaluation>) |
There was a problem hiding this comment.
nit: as I noted in another review, maybe it can be just write, especially if we don't expect another methods in this interface
| val flagKey: String, | ||
| val variantKey: String? = null, | ||
| val allocationKey: String? = null, | ||
| val targetingKey: String?, |
There was a problem hiding this comment.
Is there specific reason why targetingKey has no default value as other nullable properties of this class?
Should this property be nullable?
There was a problem hiding this comment.
It must be nullable because the user might not pass the targeting key. But null is not a good default value per se as that might hide issues/bugs.
I would suggest removing default values from the rest of the properties. If they are null, it's better to pass that explicitly
| import kotlin.concurrent.withLock | ||
|
|
||
| /** | ||
| * Aggregates flag evaluation events before writing them to storage. |
There was a problem hiding this comment.
It seems that aggregation logic can be done on the backend side, is there a reason we do this on the client side? Seems like doing it on the client side adds some complexity.
Or is it because otherwise we have to send too many events to the intake?
There was a problem hiding this comment.
Yep, the client-side aggregation is to reduce network usage. Flag evaluations can be on the hot path in users' code, and would generate tons of events without aggregation.
| val toDrain = aggregationMap | ||
| aggregationMap = ConcurrentHashMap() |
There was a problem hiding this comment.
this can be simpler:
val toDrain = aggregationMap.toMap()
aggregationMap.clear()
and then aggregationMap can be simply val without @Volatile
| errorMessage = errorMessage | ||
| ) | ||
|
|
||
| mapLock.read { |
There was a problem hiding this comment.
although aggregationMap.putIfAbsent below is a write operation
| existing?.recordEvaluation(timestamp, errorMessage) | ||
| } | ||
|
|
||
| if (aggregationMap.size < maxAggregations) { |
There was a problem hiding this comment.
note:
* Bear in mind that the results of aggregate status methods including
* {@code size}, {@code isEmpty}, and {@code containsValue} are typically
* useful only when a map is not undergoing concurrent updates in other threads.
but probably it is fine is .size call is off by a bit
| private var count: Int = 1 | ||
| private var firstEvaluation: Long = firstTimestamp | ||
| private var lastEvaluation: Long = firstTimestamp | ||
| private var lastErrorMessage: String? = errorMessage |
There was a problem hiding this comment.
I think this can be done simpler, without mutability and synchronisation primitives.
AggregationStats can be just a data class, meaning when update is needed .copy method is called at the usage side.
| FlagEvaluation.Rum( | ||
| application = FlagEvaluation.Application(id = appId), | ||
| view = aggregationKey.viewName?.let { viewName -> | ||
| FlagEvaluation.View(url = viewName) |
There was a problem hiding this comment.
Generally speaking View URL != View Name. Is there a mistake here?
| @Volatile | ||
| private var aggregationMap = ConcurrentHashMap<AggregationKey, AggregationStats>() | ||
|
|
||
| private val mapLock = ReentrantReadWriteLock() |
There was a problem hiding this comment.
if we guard concurrent access to this class from the processor side, why do we need another guard here?

🥞 Evaluation Logging Stacked Pull Requests 🥞
🔲 Integration & Configuration (PR #3147)
🔲 Storage & Network Infrastructure (PR #3146)
👉 Aggregation Engine & Test Utilities (this PR)
☑️ FlagEvaluation Schema (PR #3166)
☑️ Event Schema & Data Models (PR #3144)
☑️ Evaluations Subfeature (PR #3159)
⎿
feature/flags-evaluations-logging(feature branch)Datadog Internal
🎟️ Ticket: FFL-1720 - Implement Evaluation Logging for Android SDK
What does this PR do?
Implements the core evaluations aggregation engine that groups feature flag evaluations by key characteristics and manages periodic flushing from memory to persistent storage (via
StorageBackedFeature/EvaluationsFeature).The
EvaluationEventProcessormanages concurrent access and mutation ofAggregationStatsinstances in memory. Each Stats object is given anAggregationKey- a composite key made of the set of identifiers across the evaluation (targetingKey, variationKey, etc.) and SDK context (rum View ID). This ensures proper evaluation counting under the "same" circumstances.The processor also manages the periodic and size-based triggers for flushing the in-memory aggregations to persistent storage.
Motivation
We need to implement Evaluation Logging to provide comprehensive visibility into all feature flag evaluations, including defaults, errors, and successful matches. This goes beyond exposure logging by capturing aggregated metrics about evaluation frequency, error rates, and runtime default usage across all flags.
Description
This PR adds the core business logic for evaluation logging:
Implementation:
AggregationKey: Defines how evaluations are grouped (by flag, variant, allocation, targeting key, and error code)AggregationStats: Tracks count, first/last timestamps, and last error message for each aggregationEvaluationEventsProcessor: Orchestrates aggregation, manages flush triggers (time-based, size-based, shutdown)EvaluationEventWriter: Interface for persisting evaluation events (implementation in PR [FFL-1720] Evaluation Logging: Storage & Network Infrastructure #3146)Additional Notes
Review checklist (to be filled by reviewers)