Skip to content

Commit d6d0dcf

Browse files
committed
Stat correction when success replaces failure + stats debug logging
- Store stat delta on record_error (when acknowledged); apply correction in record_success when deleting an error report so aggregates stay correct. - Add error-report-deltas key; BuildStatusRecorder passes delta_for(test). - Add [stats] debug logging (record_error, record_success, record_stats, store_error_report_delta, apply_error_report_delta_correction) when --debug-log is set.
1 parent 63cc2f0 commit d6d0dcf

4 files changed

Lines changed: 61 additions & 4 deletions

File tree

ruby/lib/ci/queue/build_record.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def queue_exhausted?
1818
@queue.exhausted?
1919
end
2020

21-
def record_error(id, payload)
21+
def record_error(id, payload, stat_delta: nil)
2222
error_reports[id] = payload
2323
true
2424
end

ruby/lib/ci/queue/redis/build_record.rb

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,23 +56,34 @@ def record_warning(type, attributes)
5656
redis.rpush(key('warnings'), Marshal.dump([type, attributes]))
5757
end
5858

59-
def record_error(id, payload)
59+
def record_error(id, payload, stat_delta: nil)
6060
# Run acknowledge first so we know whether we're the first to ack
6161
acknowledged = @queue.acknowledge(id, error: payload)
62+
stats_logger&.info("[stats] record_error test_id=#{id.inspect} acknowledged=#{acknowledged} worker_id=#{config.worker_id} stat_delta=#{stat_delta.inspect}")
6263

6364
if acknowledged
6465
# We were the first to ack; another worker already ack'd would get falsy from SADD
6566
@queue.increment_test_failed
67+
# Only the acknowledging worker's stats include this failure (others skip increment when ack=false).
68+
# Store so we can subtract it if another worker records success later.
69+
store_error_report_delta(id, stat_delta) if stat_delta && stat_delta.any?
6670
end
6771
# Return so caller can roll back local counter when not acknowledged
6872
!!acknowledged
6973
end
7074

7175
def record_success(id, skip_flaky_record: false)
72-
acknowledged, error_reports_deleted_count, requeued_count = redis.multi do |transaction|
76+
acknowledged, error_reports_deleted_count, requeued_count, delta_json = redis.multi do |transaction|
7377
@queue.acknowledge(id, pipeline: transaction)
7478
transaction.hdel(key('error-reports'), id)
7579
transaction.hget(key('requeues-count'), id)
80+
transaction.hget(key('error-report-deltas'), id)
81+
end
82+
stats_logger&.info("[stats] record_success test_id=#{id.inspect} acknowledged=#{acknowledged} worker_id=#{config.worker_id} error_reports_deleted=#{error_reports_deleted_count} has_delta=#{!!delta_json}")
83+
# When we're replacing a failure, subtract the (single) acknowledging worker's stat contribution
84+
if error_reports_deleted_count.to_i > 0 && delta_json
85+
apply_error_report_delta_correction(delta_json)
86+
redis.hdel(key('error-report-deltas'), id)
7687
end
7788
record_flaky(id) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0)
7889
!!acknowledged
@@ -84,6 +95,7 @@ def record_requeue(id)
8495

8596
def record_stats(stats = nil, pipeline: nil)
8697
return unless stats
98+
stats_logger&.info("[stats] record_stats worker_id=#{config.worker_id} stats=#{stats.inspect}")
8799
if pipeline
88100
stats.each do |stat_name, stat_value|
89101
pipeline.hset(key(stat_name), config.worker_id, stat_value)
@@ -146,6 +158,38 @@ def reset_stats(stat_names)
146158
def key(*args)
147159
KeyShortener.key(config.build_id, *args)
148160
end
161+
162+
def store_error_report_delta(test_id, stat_delta)
163+
# Only the acknowledging worker's stats include this test; store their delta for correction on success
164+
payload = { 'worker_id' => config.worker_id }.merge(stat_delta)
165+
stats_logger&.info("[stats] store_error_report_delta test_id=#{test_id.inspect} worker_id=#{config.worker_id} payload=#{payload.inspect}")
166+
redis.hset(key('error-report-deltas'), test_id, JSON.generate(payload))
167+
redis.expire(key('error-report-deltas'), config.redis_ttl)
168+
end
169+
170+
def apply_error_report_delta_correction(delta_json)
171+
delta = JSON.parse(delta_json)
172+
worker_id = delta.delete('worker_id')
173+
stats_logger&.info("[stats] apply_error_report_delta_correction worker_id=#{worker_id.inspect} subtracting=#{delta.inspect}")
174+
return if worker_id.nil? || delta.empty?
175+
176+
redis.pipelined do |pipeline|
177+
delta.each do |stat_name, value|
178+
next unless value.is_a?(Numeric) || value.to_s.match?(/\A-?\d+\.?\d*\z/)
179+
180+
pipeline.hincrbyfloat(key(stat_name), worker_id, -value.to_f)
181+
pipeline.expire(key(stat_name), config.redis_ttl)
182+
end
183+
end
184+
end
185+
186+
def stats_logger
187+
return @stats_logger if defined?(@stats_logger)
188+
return @stats_logger = nil unless config.respond_to?(:debug_log) && config.debug_log
189+
190+
require 'logger'
191+
@stats_logger = Logger.new(config.debug_log).tap { |l| l.level = Logger::INFO }
192+
end
149193
end
150194
end
151195
end

ruby/lib/ci/queue/redis/key_shortener.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ module KeyShortener
1212
'queue' => 'q',
1313
'owners' => 'o',
1414
'error-reports' => 'e',
15+
'error-report-deltas' => 'ed',
1516
'requeues-count' => 'rc',
1617
'assertions' => 'a',
1718
'errors' => 'er',

ruby/lib/minitest/queue/build_status_recorder.rb

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def record(test)
4343
test_id = "#{test.klass}##{test.name}"
4444

4545
acknowledged = if (test.failure || test.error?) && !test.skipped?
46-
build.record_error(test_id, dump(test))
46+
build.record_error(test_id, dump(test), stat_delta: delta_for(test))
4747
elsif test.requeued?
4848
build.record_requeue(test_id)
4949
else
@@ -67,6 +67,18 @@ def record(test)
6767

6868
private
6969

70+
def delta_for(test)
71+
h = { 'assertions' => 1, 'errors' => 0, 'failures' => 0, 'skips' => 0, 'requeues' => 0, 'total_time' => test.time.to_f }
72+
if (test.failure || test.error?) && !test.skipped?
73+
test.error? ? h['errors'] = 1 : h['failures'] = 1
74+
elsif test.requeued?
75+
h['requeues'] = 1
76+
elsif test.skipped?
77+
h['skips'] = 1
78+
end
79+
h
80+
end
81+
7082
def dump(test)
7183
ErrorReport.new(self.class.failure_formatter.new(test).to_h).dump
7284
end

0 commit comments

Comments
 (0)