diff --git a/ddtrace/internal/datadog/profiling/dd_wrapper/include/sample.hpp b/ddtrace/internal/datadog/profiling/dd_wrapper/include/sample.hpp index e0bf18adbdf..30e7cab50c4 100644 --- a/ddtrace/internal/datadog/profiling/dd_wrapper/include/sample.hpp +++ b/ddtrace/internal/datadog/profiling/dd_wrapper/include/sample.hpp @@ -183,6 +183,15 @@ class Sample // This is useful when the Sample object is embedded and will be destroyed later bool export_sample(); + // Flip the sign on the heap_space value so this sample is emitted as a + // tombstone (negative delta) by a subsequent export_sample() call. + // NOT idempotent: each call toggles the sign, so calling twice returns the + // original positive value. Heap-tracker callers must apply this exactly + // once per REMOVE event (see the tombstone_applied guard in + // _memalloc_heap.cpp) so retries on transient libdatadog rejection emit a + // stable negative value instead of toggling sign on every attempt. + void negate_heap_space(); + static ProfileBorrow profile_borrow(); static void postfork_child(); static void cleanup(); diff --git a/ddtrace/internal/datadog/profiling/dd_wrapper/src/sample.cpp b/ddtrace/internal/datadog/profiling/dd_wrapper/src/sample.cpp index 598e35e8c8f..771e8ccd839 100644 --- a/ddtrace/internal/datadog/profiling/dd_wrapper/src/sample.cpp +++ b/ddtrace/internal/datadog/profiling/dd_wrapper/src/sample.cpp @@ -431,6 +431,20 @@ Datadog::Sample::export_sample() return ProfilerState::get().profile_state.collect(sample, endtime_ns); } +void +Datadog::Sample::negate_heap_space() +{ + // Flip the sign on heap_space so a subsequent export_sample() emits this + // sample as a tombstone. libdatadog accepts negative i64 values + // (sample.rs Value is &[i64] with no sign validation). + if (0U != (type_mask & SampleType::Heap)) { + const size_t heap_space_idx = ProfilerState::get().profile_state.val().heap_space; + if (heap_space_idx < values.size()) { + values[heap_space_idx] = -values[heap_space_idx]; + } + } +} + bool Datadog::Sample::push_cputime(int64_t cputime, int64_t count) { diff --git a/ddtrace/profiling/collector/_memalloc_heap.cpp b/ddtrace/profiling/collector/_memalloc_heap.cpp index 74dc9725638..485dce5c6ef 100644 --- a/ddtrace/profiling/collector/_memalloc_heap.cpp +++ b/ddtrace/profiling/collector/_memalloc_heap.cpp @@ -128,6 +128,35 @@ class heap_tracker_t void postfork_child(); private: + /* Delta-export change event. ADD events reference the live traceback in + * allocs_m via raw pointer; REMOVE events own the extracted unique_ptr. + * + * The raw pointer in ADD is valid until export because the traceback is + * owned by either allocs_m or a subsequent REMOVE event in pending_changes + * (both are cleared together at export time). REMOVE events emit a + * negative tombstone — heap_space is negated lazily at emit time so an + * earlier ADD that aliases the same traceback (same ptr, same snapshot + * interval) reads the original positive value. `tombstone_applied` guards + * against re-negation when an emit fails and we retry. */ + struct change_event + { + enum Kind : uint8_t + { + ADD, + REMOVE + }; + void* ptr; + traceback_t* tb; + std::unique_ptr owner; // set for REMOVE, null for ADD + Kind kind; + bool tombstone_applied = false; // REMOVE only; true after heap_space is negated + uint8_t failed_attempts = 0; // export retries counter; events exceeding MAX are dropped + }; + + /* Cap retries on a single event before dropping it, so a persistent + * libdatadog rejection cannot grow pending_changes without bound. */ + static constexpr uint8_t MAX_EXPORT_RETRIES = 2; + uint32_t next_sample_size_no_cpython(uint32_t sample_size); /* This function is called from heap_tracker_t::postfork_child() as part of @@ -161,6 +190,31 @@ class heap_tracker_t /* Initial capacity of the allocations map */ static constexpr size_t INITAL_ALLOC_MAP_CAPACITY = 512; + + /* Delta export state. pending_changes accumulates ADD/REMOVE events between + * snapshots; export_heap_no_cpython drains it and emits the deltas. + * + * Pre-reserved at construction so steady-state push_back is allocation-free. + * On pathological bursts the vector may reallocate. The reallocation goes + * through ::operator new (system malloc), NOT PyMem_Malloc, so it does not + * reenter the Python allocator hooks where untrack_no_cpython runs from. + * (Reentry would also be caught by memalloc_reentrant_guard_t earlier in + * the call.) Dropping events at the push site instead would risk + * use-after-clear on a queued ADD whose traceback gets pool_put'd by a + * matching REMOVE that overflowed — letting the vector grow is the safe + * option. */ + static constexpr size_t PENDING_CHANGES_RESERVE = 4096; + std::vector pending_changes; + + /* Pair-collapse side map: ptr -> index of its pending ADD event in + * pending_changes. When untrack arrives for a ptr that has a still-pending + * ADD (i.e., the allocation was tracked and freed within the same snapshot + * interval), both events are canceled before reaching libdatadog — the ADD + * is marked `collapsed`, no REMOVE is queued, and the traceback returns to + * the pool immediately. Saves ~2 Profile_add2 calls per churn pair, which + * dominates CPU on alloc-then-free hot loops (see PR #18125 review). */ + static constexpr size_t PENDING_ADD_IDX_RESERVE = 1024; + HeapMapType pending_add_idx; }; // Pool implementation @@ -223,6 +277,8 @@ heap_tracker_t::heap_tracker_t(uint32_t sample_size_val) pool.reserve(POOL_CAPACITY); // Pre-allocate map capacity to avoid rehashing during ramp-up. allocs_m.reserve(INITAL_ALLOC_MAP_CAPACITY); + pending_changes.reserve(PENDING_CHANGES_RESERVE); + pending_add_idx.reserve(PENDING_ADD_IDX_RESERVE); } void @@ -231,9 +287,48 @@ heap_tracker_t::untrack_no_cpython(void* ptr) memalloc_gil_debug_guard_t guard(gil_guard); auto node = allocs_m.extract(ptr); - if (!node.empty()) { - pool_put_no_cpython(std::move(node.mapped())); + if (node.empty()) { + return; + } + auto owner = std::move(node.mapped()); + + /* Pair-collapse: if a still-pending ADD targets this same ptr, the + * allocation was tracked AND freed within this snapshot interval — both + * events would aggregate to a zero bucket on the backend anyway, so skip + * both and reclaim the traceback immediately. Eliminates the dominant + * libdatadog cost in alloc-then-free hot loops. + * + * Swap-and-pop keeps pending_changes compact instead of leaving the ADD + * marked dead in place: the last element moves into the freed slot, and + * if it's also a tracked ADD, its index entry in pending_add_idx is + * rewritten to point at the new position. Avoids paying the export-time + * iteration cost over a buffer that fills up with skipped entries under + * sustained alloc/free churn. */ + auto it = pending_add_idx.find(ptr); + if (it != pending_add_idx.end()) { + const size_t add_idx = it->second; + const size_t last_idx = pending_changes.size() - 1; + if (add_idx != last_idx) { + pending_changes[add_idx] = std::move(pending_changes[last_idx]); + // The moved element kept its own pending_add_idx entry pointing at + // last_idx; rewrite it if the moved element was an indexed ADD. + if (pending_changes[add_idx].kind == change_event::ADD) { + pending_add_idx[pending_changes[add_idx].ptr] = add_idx; + } + } + pending_changes.pop_back(); + pending_add_idx.erase(it); + pool_put_no_cpython(std::move(owner)); + return; } + + /* No pending ADD — the allocation was tracked in an earlier snapshot + * interval and the backend already integrated its positive contribution. + * Queue a REMOVE so the next export emits a negative tombstone. heap_space + * negation is deferred to emit time so any same-snapshot raw-ptr aliasing + * sees the original positive value first. */ + auto* raw = owner.get(); + pending_changes.push_back({ ptr, raw, std::move(owner), change_event::REMOVE }); } bool @@ -266,11 +361,18 @@ heap_tracker_t::add_sample_no_cpython(void* ptr, std::unique_ptr tb memalloc_gil_debug_guard_t guard(gil_guard); auto [it, inserted] = allocs_m.insert_or_assign(ptr, std::move(tb)); - (void)it; // Unused, but needed for structured binding /* This should always be a new insertion. If not, we failed to properly untrack a previous allocation. */ assert(inserted && "add_sample: found existing entry for key that should have been removed"); + /* Record ADD event referencing the live traceback. The raw pointer remains + * valid until export: either the entry stays in allocs_m, or a subsequent + * untrack moves ownership into a REMOVE event in pending_changes — both + * are cleared in the same export pass. Index this event in pending_add_idx + * so a same-snapshot-interval free can find and collapse it. */ + pending_changes.push_back({ ptr, it->second.get(), nullptr, change_event::ADD }); + pending_add_idx[ptr] = pending_changes.size() - 1; + // Get ready for the next sample reset_sampling_state_no_cpython(); } @@ -280,10 +382,74 @@ heap_tracker_t::export_heap_no_cpython() { memalloc_gil_debug_guard_t guard(gil_guard); - /* Iterate over live samples and export them */ - for (const auto& [ptr, tb] : allocs_m) { - (void)ptr; // Suppress unused variable warning - tb->sample.export_sample(); + /* Delta emission: ADDs reference live tracebacks via raw pointer (kept + * alive by either allocs_m or a later REMOVE event in this buffer). + * REMOVEs own the extracted unique_ptr and emit a negative tombstone; + * heap_space is negated lazily here (guarded by tombstone_applied) so an + * earlier ADD aliasing the same traceback emits the original positive + * value before the REMOVE flips the sign, and so retries on libdatadog + * rejection don't toggle the sign each attempt. + * + * On libdatadog rejection, retain the event for retry on the next snapshot + * — without this, an ADD or REMOVE that Profile_add2 refuses is silently + * dropped, causing permanent drift in the backend's running heap state. + * Bounded by MAX_EXPORT_RETRIES so a persistent rejection cannot grow + * pending_changes without bound. + * + * No periodic resync is emitted today: a full-snapshot would not carry a + * wire-format marker telling the backend to reset accumulated state, so + * the backend would double-count instead of recovering from drift. If a + * delta upload is dropped, the backend's state diverges from live until + * the next profiler restart (which clears allocs_m and pending_changes). + * Adding a resync requires either backend-side reset signaling or + * emitting compensating negative+positive pairs for every live entry — + * both are out of scope for v1. */ + + /* In-place compaction with a write index. Moving into a fresh vector and + * swapping would discard the constructor-side reserve (PENDING_CHANGES_RESERVE) + * on every export, forcing the hook path to re-grow the buffer on the next + * churn burst. Keeping the same storage preserves capacity. */ + size_t write_idx = 0; + for (size_t read_idx = 0; read_idx < pending_changes.size(); ++read_idx) { + auto& evt = pending_changes[read_idx]; + if (evt.kind == change_event::REMOVE && !evt.tombstone_applied) { + evt.tb->sample.negate_heap_space(); + evt.tombstone_applied = true; + } + if (evt.tb->sample.export_sample()) { + // Success: recycle the REMOVE traceback; ADDs leave their + // tracebacks in allocs_m where they were. + if (evt.kind == change_event::REMOVE) { + pool_put_no_cpython(std::move(evt.owner)); + } + continue; + } + // libdatadog rejected: retry next snapshot, up to MAX_EXPORT_RETRIES. + if (evt.failed_attempts >= MAX_EXPORT_RETRIES) { + if (evt.kind == change_event::REMOVE) { + pool_put_no_cpython(std::move(evt.owner)); + } + continue; + } + ++evt.failed_attempts; + if (write_idx != read_idx) { + pending_changes[write_idx] = std::move(evt); + } + ++write_idx; + } + pending_changes.resize(write_idx); + /* Rebuild the side map for the compacted pending_changes vector. The old + * map's indices referenced positions before compaction; without rebuilding, + * retained ADDs would lose their pair-collapse capability on the next + * snapshot (a REMOVE arriving for one of these ptrs would queue a tombstone + * instead of canceling the still-unsuccessful ADD). The cost is O(write_idx), + * which is bounded — retained events only appear on libdatadog rejection + * and are capped at MAX_EXPORT_RETRIES per event. */ + pending_add_idx.clear(); + for (size_t i = 0; i < pending_changes.size(); ++i) { + if (pending_changes[i].kind == change_event::ADD) { + pending_add_idx[pending_changes[i].ptr] = i; + } } Datadog::Sample::profile_borrow().stats().set_heap_tracker_size(allocs_m.size()); @@ -310,6 +476,12 @@ heap_tracker_t::postfork_child() // Profile::postfork_child() pool.clear(); + // Pending delta events may contain raw pointers into allocs_m and owned + // tracebacks; clear before dropping allocs_m so the raw pointers in any + // ADD events do not get stranded. Side map indices alias these events. + pending_changes.clear(); + pending_add_idx.clear(); + // Allocations map may contain data from the parent process, and also // traceback_t objects may reference invalid Profile state. allocs_m.clear(); diff --git a/tests/profiling/collector/test_memalloc.py b/tests/profiling/collector/test_memalloc.py index 2cdb123ccc3..630bf44df45 100644 --- a/tests/profiling/collector/test_memalloc.py +++ b/tests/profiling/collector/test_memalloc.py @@ -627,15 +627,19 @@ def test_memory_collector_python_interface_with_allocation_tracking(tmp_path: Pa assert alloc_space_idx >= 0, "alloc-space sample type not found in profile" assert alloc_count_idx >= 0, "alloc-samples sample type not found in profile" - # Validate all samples have valid values + # Validate all samples have valid values. Heap deltas can be negative + # (tombstones for freed allocations). pprof aggregation by stack can + # also produce all-zero buckets when an ADD and REMOVE pair targets + # the same stack within one upload window — those buckets are + # legitimate cancelled-delta artifacts, so skip them in the loop. for sample in final_profile.sample: - # Check that at least one value type is non-zero - has_heap = sample.value[heap_space_idx] > 0 - has_alloc = sample.value[alloc_space_idx] > 0 - assert has_heap or has_alloc, "Sample should have either heap-space or alloc-space > 0" - assert sample.value[alloc_count_idx] >= 0, ( - f"alloc-samples should be non-negative, got {sample.value[alloc_count_idx]}" - ) + heap = sample.value[heap_space_idx] + alloc = sample.value[alloc_space_idx] + count = sample.value[alloc_count_idx] + if heap == 0 and alloc == 0 and count == 0: + continue # cancelled ADD+REMOVE pair aggregated to zero bucket + assert heap != 0 or alloc > 0, "Sample should have either heap-space != 0 or alloc-space > 0" + assert count >= 0, f"alloc-samples should be non-negative, got {count}" # Get live samples (heap-space > 0) live_samples = [s for s in final_profile.sample if s.value[heap_space_idx] > 0] @@ -718,52 +722,68 @@ def test_memory_collector_python_interface_with_allocation_tracking_no_deletion( assert alloc_space_idx >= 0, "alloc-space sample type not found in profile" assert alloc_count_idx >= 0, "alloc-samples sample type not found in profile" - # Since no objects were deleted, heap samples should accumulate (first_batch + second_batch) - # Count heap samples in both profiles - after_first_heap_samples = [s for s in after_first_batch_profile.sample if s.value[heap_space_idx] > 0] - final_heap_samples = [s for s in final_profile.sample if s.value[heap_space_idx] > 0] - - assert len(final_heap_samples) > len(after_first_heap_samples), ( - f"Final should have more heap samples than after first batch (nothing deleted). " - f"Got final={len(final_heap_samples)}, after_first={len(after_first_heap_samples)}" + # With delta semantics, each upload carries only the changes since the + # previous one. The after-first-batch profile contains positive samples + # from batch one; the final profile contains positive samples from + # batch two. Verify each batch's deltas land in the corresponding + # upload rather than expecting cumulative live state in the final. + after_first_one_samples = [ + s + for s in after_first_batch_profile.sample + if s.value[heap_space_idx] > 0 and has_function_in_profile_sample(after_first_batch_profile, s, one) + ] + final_two_samples = [ + s + for s in final_profile.sample + if s.value[heap_space_idx] > 0 and has_function_in_profile_sample(final_profile, s, two) + ] + assert len(after_first_one_samples) > 0, ( + f"After-first-batch upload should contain positive deltas from batch one, " + f"got {len(after_first_one_samples)}" + ) + assert len(final_two_samples) > 0, ( + f"Final upload should contain positive deltas from batch two, got {len(final_two_samples)}" ) - # Validate all samples in final profile have valid values - for sample in final_profile.sample: - has_heap = sample.value[heap_space_idx] > 0 - has_alloc = sample.value[alloc_space_idx] > 0 - assert has_heap or has_alloc, "Sample should have either heap-space or alloc-space > 0" - assert sample.value[alloc_count_idx] >= 0, ( - f"alloc-samples should be non-negative, got {sample.value[alloc_count_idx]}" - ) - - # Get live samples (heap-space > 0) - live_samples = [s for s in final_profile.sample if s.value[heap_space_idx] > 0] - - batch_one_live_samples = [ - sample for sample in live_samples if has_function_in_profile_sample(final_profile, sample, one) + # Neither batch's own allocations were freed, so the buckets attributed + # to `one` and `two` should not carry negative tombstones in the final + # upload. (Negatives may appear from unrelated frees that happen + # incidentally inside the Python runtime — those use different stacks.) + one_negatives_final = [ + s + for s in final_profile.sample + if s.value[heap_space_idx] < 0 and has_function_in_profile_sample(final_profile, s, one) ] - - batch_two_live_samples = [ - sample for sample in live_samples if has_function_in_profile_sample(final_profile, sample, two) + two_negatives_final = [ + s + for s in final_profile.sample + if s.value[heap_space_idx] < 0 and has_function_in_profile_sample(final_profile, s, two) ] - - assert len(batch_one_live_samples) > 0, ( - f"Should have live samples from batch one, got {len(batch_one_live_samples)}" + assert len(one_negatives_final) == 0, ( + f"Final upload should not have negative tombstones for batch one (not freed), " + f"got {len(one_negatives_final)}" ) - assert len(batch_two_live_samples) > 0, ( - f"Should have live samples from batch two, got {len(batch_two_live_samples)}" + assert len(two_negatives_final) == 0, ( + f"Final upload should not have negative tombstones for batch two (not freed), " + f"got {len(two_negatives_final)}" ) - # batch_one samples were reported in first snapshot, so alloc-space should be 0 in later snapshots - # batch_two samples are new allocations, so alloc-space should be > 0 - batch_one_valid = all( - sample.value[heap_space_idx] > 0 and sample.value[alloc_space_idx] == 0 for sample in batch_one_live_samples - ) - assert batch_one_valid, "Batch one samples should have heap-space > 0 and alloc-space == 0 (already reported)" + # Skip all-zero samples (cancelled ADD+REMOVE pair artifacts from pprof + # aggregation) when validating individual sample values. + for sample in final_profile.sample: + heap = sample.value[heap_space_idx] + alloc = sample.value[alloc_space_idx] + count = sample.value[alloc_count_idx] + if heap == 0 and alloc == 0 and count == 0: + continue + assert heap != 0 or alloc > 0, "Sample should have either heap-space != 0 or alloc-space > 0" + assert count >= 0, f"alloc-samples should be non-negative, got {count}" + # Sanity-check that batch two's positive samples (the new deltas in the + # final upload) carry both heap-space and alloc-space > 0, since they + # represent newly-tracked allocations rather than pre-existing live state. batch_two_valid = all( - sample.value[heap_space_idx] > 0 and sample.value[alloc_space_idx] > 0 for sample in batch_two_live_samples + sample.value[heap_space_idx] > 0 and sample.value[alloc_space_idx] > 0 for sample in final_two_samples ) assert batch_two_valid, "Batch two samples should have heap-space > 0 and alloc-space > 0 (new allocations)" @@ -1031,6 +1051,281 @@ def test_heap_stress() -> None: _memalloc.stop() +def _alloc_in_named_function(n: int, size: int) -> list[Union[tuple[None, ...], bytearray]]: + """Allocate n objects of `size`. Named so the heap profile can attribute samples back to it.""" + return [one(size) for _ in range(n)] + + +def test_delta_export_emits_negative_tombstones_after_free(tmp_path: Path) -> None: + """After freeing tracked allocations, the next snapshot should emit + negative-value tombstones for each freed sample so the backend can + integrate deltas to current state. Without negative samples the backend + would see only positive live state and never observe frees. + """ + output_filename = _setup_profiling_prelude(tmp_path, "test_delta_export_emits_negative_tombstones_after_free") + mc = memalloc.MemoryCollector(heap_sample_size=1024) + + with mc: + live = _alloc_in_named_function(2_000, 1024) + mc.snapshot_and_parse_pprof(output_filename) + del live[:] + gc.collect() + profile_drained = mc.snapshot_and_parse_pprof(output_filename, assert_samples=False) + + heap_space_idx = pprof_utils.get_sample_type_index(profile_drained, "heap-space") + assert heap_space_idx >= 0 + + negative_samples = [ + s + for s in profile_drained.sample + if s.value[heap_space_idx] < 0 and has_function_in_profile_sample(profile_drained, s, _alloc_in_named_function) + ] + assert len(negative_samples) > 0, ( + "expected at least one negative-value tombstone from the allocator after free, got 0" + ) + + +def test_delta_export_steady_state_skips_unchanged_samples(tmp_path: Path) -> None: + """When nothing is freed between snapshots, the second snapshot should + emit only the new allocations since the previous snapshot. A bug that + re-emits unchanged live entries every snapshot would double-count them + on the backend's running heap state. + """ + output_filename_first = _setup_profiling_prelude(tmp_path, "test_delta_export_steady_state_first") + mc = memalloc.MemoryCollector(heap_sample_size=1024) + + with mc: + # First batch establishes the baseline live set. + baseline = _alloc_in_named_function(2_000, 1024) + profile_first = mc.snapshot_and_parse_pprof(output_filename_first) + + # Second snapshot with no allocations or frees in between. + output_filename_second = _setup_profiling_prelude(tmp_path, "test_delta_export_steady_state_second") + profile_second = mc.snapshot_and_parse_pprof(output_filename_second, assert_samples=False) + + # Keep baseline alive through both snapshots so frees do not affect the result. + assert len(baseline) == 2_000 + + heap_space_idx_first = pprof_utils.get_sample_type_index(profile_first, "heap-space") + heap_space_idx_second = pprof_utils.get_sample_type_index(profile_second, "heap-space") + + first_live = [ + s + for s in profile_first.sample + if s.value[heap_space_idx_first] > 0 + and has_function_in_profile_sample(profile_first, s, _alloc_in_named_function) + ] + second_live = [ + s + for s in profile_second.sample + if s.value[heap_space_idx_second] > 0 + and has_function_in_profile_sample(profile_second, s, _alloc_in_named_function) + ] + + assert len(first_live) > 0, "first snapshot should have allocator samples" + # Second snapshot should contain ZERO positive samples from the allocator: pending_changes + # was cleared at the first export, and no new allocations from _alloc_in_named_function + # happened in between. (pprof aggregates same-stack samples; a re-emitted bucket would + # show up here.) A non-zero count indicates the delta path is incorrectly re-emitting + # unchanged live entries. + assert len(second_live) == 0, ( + f"second snapshot re-emitted unchanged live samples: first={len(first_live)}, " + f"second={len(second_live)} (delta path may be falling back to full snapshot)" + ) + + +def test_delta_export_churn_net_heap_is_zero(tmp_path: Path) -> None: + """Under a balanced alloc/free workload, the delta path must emit enough + negative tombstones to cancel every positive ADD that targets the same + stack. pprof aggregates same-stack samples into a single bucket whose + value is the sum; if the delta path is working, the aggregated heap-space + sum for `_alloc_in_named_function` is zero (positives + negatives cancel). + If REMOVEs were not emitted, the sum would be strongly positive. + """ + output_filename = _setup_profiling_prelude(tmp_path, "test_delta_export_churn_net_heap_is_zero") + mc = memalloc.MemoryCollector(heap_sample_size=256) + + with mc: + for _ in range(50): + batch = _alloc_in_named_function(500, 256) + del batch + gc.collect() + profile = mc.snapshot_and_parse_pprof(output_filename, assert_samples=False) + + heap_space_idx = pprof_utils.get_sample_type_index(profile, "heap-space") + allocator_heap_sum = sum( + s.value[heap_space_idx] + for s in profile.sample + if has_function_in_profile_sample(profile, s, _alloc_in_named_function) + ) + # The allocator stack should net to zero after balanced churn. A strongly + # positive sum indicates the delta path failed to emit REMOVE tombstones. + assert allocator_heap_sum == 0, ( + f"net heap-space for allocator stack should be 0 after balanced churn, " + f"got {allocator_heap_sum} (delta path may have skipped REMOVE tombstones)" + ) + + +def test_delta_export_no_resync_for_steady_state(tmp_path: Path) -> None: + """The delta path emits NO periodic resync today. Past the first snapshot + that drains the initial ADDs, repeated snapshots of an unchanging heap + must not re-emit positive samples from the allocator — emitting a plain + full snapshot without a backend reset marker would double-count live + allocations in any backend that integrates deltas (see Copilot review on + PR #18125). Take 15 snapshots without modifying the heap; only the first + should carry positive allocator samples. + """ + mc = memalloc.MemoryCollector(heap_sample_size=1024) + + profiles = [] + with mc: + live = _alloc_in_named_function(2_000, 1024) + + # Take well past the old RESYNC_INTERVAL (10) so a stale resync would + # surface here if it ever sneaks back in. + for i in range(15): + output_filename = _setup_profiling_prelude(tmp_path, f"test_no_resync_{i}") + profiles.append(mc.snapshot_and_parse_pprof(output_filename, assert_samples=False)) + + # Keep live alive across every snapshot so any positive samples must + # come from a re-emit, not from a fresh allocation. + assert len(live) == 2_000 + + heap_space_idx = pprof_utils.get_sample_type_index(profiles[0], "heap-space") + # The first snapshot drains the initial ADD events — expect positive samples. + first_live = [ + s + for s in profiles[0].sample + if s.value[heap_space_idx] > 0 and has_function_in_profile_sample(profiles[0], s, _alloc_in_named_function) + ] + assert len(first_live) > 0, "first snapshot should carry the initial ADD deltas" + + # Every snapshot after the first must have zero positive samples from the + # allocator stack — pending_changes is empty between snapshots, so the + # delta path emits nothing. + for idx in range(1, len(profiles)): + idx_in_this = pprof_utils.get_sample_type_index(profiles[idx], "heap-space") + re_emitted = [ + s + for s in profiles[idx].sample + if s.value[idx_in_this] > 0 and has_function_in_profile_sample(profiles[idx], s, _alloc_in_named_function) + ] + assert len(re_emitted) == 0, ( + f"snapshot {idx} re-emitted {len(re_emitted)} allocator samples — " + "a periodic resync without a backend reset marker would double-count live state" + ) + + +def test_delta_export_net_heap_is_zero_across_snapshots(tmp_path: Path) -> None: + """The delta path must converge to a net heap-space of zero across snapshots + after every tracked allocation has been freed: every ADD positive must be + matched by a REMOVE tombstone reaching the backend. A silent drop on a + libdatadog rejection (Profile_add2 returns false) would leave one half of + an ADD/REMOVE pair missing and the net non-zero. + + The retain-on-failure logic guards against that: failed events stay in + pending_changes and are retried on subsequent snapshots up to + MAX_EXPORT_RETRIES. See chatgpt-codex-connector P2 review on PR #18125. + + This test exercises the success path of the retry code on a modest + workload (so the run completes in seconds even when libdatadog does + reject). If libdatadog never rejects at this scale, the assertion still + catches any other delta-emission bug that would skew the sum. + """ + mc = memalloc.MemoryCollector(heap_sample_size=256) + + def accumulate_heap_space(profile) -> int: + idx = pprof_utils.get_sample_type_index(profile, "heap-space") + return sum( + s.value[idx] for s in profile.sample if has_function_in_profile_sample(profile, s, _alloc_in_named_function) + ) + + total_heap_space_for_allocator = 0 + with mc: + live = _alloc_in_named_function(5_000, 512) + + # Snapshot while allocations are live so ADD positives reach the + # backend. Without this the alloc+free pair below would pair-collapse + # before any snapshot ran, and the net-zero assertion would pass + # trivially without exercising the ADD/REMOVE round trip. + output_filename = _setup_profiling_prelude(tmp_path, "test_delta_net_zero_pre") + profile = mc.snapshot_and_parse_pprof(output_filename, assert_samples=False) + total_heap_space_for_allocator += accumulate_heap_space(profile) + + del live + gc.collect() + + # Three more snapshots: first drains the queued REMOVE tombstones; any + # retained events from rejection get retried on the next two passes + # (bounded by MAX_EXPORT_RETRIES = 2 in C++). + for i in range(3): + output_filename = _setup_profiling_prelude(tmp_path, f"test_delta_net_zero_{i}") + profile = mc.snapshot_and_parse_pprof(output_filename, assert_samples=False) + total_heap_space_for_allocator += accumulate_heap_space(profile) + + assert total_heap_space_for_allocator == 0, ( + f"net heap-space across all snapshots should be 0 (positives + tombstones cancel), " + f"got {total_heap_space_for_allocator} (delta path may drop failed exports)" + ) + + +def test_delta_export_heavy_churn_stays_consistent(tmp_path: Path) -> None: + """Under sustained allocation pressure the delta path must not crash, hang, + or corrupt state. Hook-side push_backs into pending_changes can grow the + buffer past its initial reserve when many live allocations accumulate + between snapshots — the reallocation goes through system malloc (operator + new), NOT through PyMem_Malloc, so it does not reenter untrack_no_cpython. + + This test exercises that growth path by holding ~5000 live allocations + (above PENDING_CHANGES_RESERVE = 4096 in C++) before the first snapshot, + so every ADD sits in the buffer until export drains it. Pair-collapse + cannot fire because nothing is freed yet. It then verifies the visible + invariants: the profiler does not crash, snapshots succeed, freeing the + batch queues REMOVE tombstones that the next snapshot drains, and a final + no-op snapshot stays empty. + """ + output_filename = _setup_profiling_prelude(tmp_path, "test_delta_export_heavy_churn_stays_consistent") + # Aggressive sampling so most allocations register as events, exercising + # the hook-side push_back code paths added by this change. + mc = memalloc.MemoryCollector(heap_sample_size=64) + + with mc: + # Keep ~5000 live allocations alive so pending_changes grows past + # PENDING_CHANGES_RESERVE before the first snapshot drains it. + live = _alloc_in_named_function(5_000, 256) + profile = mc.snapshot_and_parse_pprof(output_filename, assert_samples=False) + + # Free everything; this queues a wave of REMOVEs into the now-drained + # pending_changes. The second snapshot drains those tombstones. + del live + gc.collect() + profile2 = mc.snapshot_and_parse_pprof(output_filename, assert_samples=False) + + # Third snapshot exercises the post-drain steady state: pending_changes + # was cleared by the second snapshot and no new churn occurred, so this + # one should emit nothing under the delta path. + profile3 = mc.snapshot_and_parse_pprof(output_filename, assert_samples=False) + + # Profiler survived and produced parseable profiles. + assert profile is not None + assert profile2 is not None + assert profile3 is not None + + # The third snapshot must not re-emit positive allocator samples; the + # first two drained any pending events and nothing new was tracked between + # the second and third snapshots, so any positive samples there would + # indicate a stuck buffer or a sneak resync re-emission. + heap_space_idx_3 = pprof_utils.get_sample_type_index(profile3, "heap-space") + re_emitted = [ + s + for s in profile3.sample + if s.value[heap_space_idx_3] > 0 and has_function_in_profile_sample(profile3, s, _alloc_in_named_function) + ] + assert len(re_emitted) == 0, ( + f"third snapshot leaked positive samples from the allocator stack: got {len(re_emitted)}" + ) + + @pytest.mark.parametrize("heap_sample_size", (0, 512 * 1024, 1024 * 1024, 2048 * 1024, 4096 * 1024)) def test_memalloc_speed(benchmark, heap_sample_size) -> None: if heap_sample_size: