Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions redis/_entry_helpers.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
--- Extract the test identifier from a queue entry.
-- The identifier is the part of `value` that precedes the first occurrence of
-- `delimiter`. The delimiter is matched as plain text, not as a Lua pattern,
-- so characters such as `.` or `|` need no escaping.
-- @param value      string  the full queue entry
-- @param delimiter  string|nil  separator between the test id and the rest of
--                   the entry; nil or "" means entries carry no suffix
-- @return string  the prefix before the first delimiter, or `value` unchanged
--                 when there is no delimiter or it does not occur in `value`
local function test_id_from_entry(value, delimiter)
  -- An empty string is truthy in Lua and a plain find for "" matches at
  -- position 1, which would truncate every entry to "". Treat it exactly
  -- like a missing delimiter instead.
  if not delimiter or delimiter == "" then
    return value
  end

  local pos = string.find(value, delimiter, 1, true)
  if pos then
    return string.sub(value, 1, pos - 1)
  end

  return value
end
15 changes: 8 additions & 7 deletions redis/acknowledge.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ local processed_key = KEYS[2]
local owners_key = KEYS[3]
local error_reports_key = KEYS[4]

local test = ARGV[1]
local error = ARGV[2]
local ttl = ARGV[3]
redis.call('zrem', zset_key, test)
redis.call('hdel', owners_key, test) -- Doesn't matter if it was reclaimed by another workers
local acknowledged = redis.call('sadd', processed_key, test) == 1
local entry = ARGV[1]
local test_id = ARGV[2]
local error = ARGV[3]
local ttl = ARGV[4]
redis.call('zrem', zset_key, entry)
redis.call('hdel', owners_key, entry) -- Doesn't matter if it was reclaimed by another workers
local acknowledged = redis.call('sadd', processed_key, test_id) == 1

if acknowledged and error ~= "" then
redis.call('hset', error_reports_key, test, error)
redis.call('hset', error_reports_key, test_id, error)
redis.call('expire', error_reports_key, ttl)
end

Expand Down
13 changes: 9 additions & 4 deletions redis/heartbeat.lua
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
-- @include _entry_helpers

local zset_key = KEYS[1]
local processed_key = KEYS[2]
local owners_key = KEYS[3]
local worker_queue_key = KEYS[4]

local current_time = ARGV[1]
local test = ARGV[2]
local entry = ARGV[2]
local entry_delimiter = ARGV[3]

local test_id = test_id_from_entry(entry, entry_delimiter)

-- already processed, we do not need to bump the timestamp
if redis.call('sismember', processed_key, test) == 1 then
if redis.call('sismember', processed_key, test_id) == 1 then
return false
end

-- we're still the owner of the test, we can bump the timestamp
if redis.call('hget', owners_key, test) == worker_queue_key then
return redis.call('zadd', zset_key, current_time, test)
if redis.call('hget', owners_key, entry) == worker_queue_key then
return redis.call('zadd', zset_key, current_time, entry)
end
23 changes: 12 additions & 11 deletions redis/requeue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ local error_reports_key = KEYS[7]

local max_requeues = tonumber(ARGV[1])
local global_max_requeues = tonumber(ARGV[2])
local test = ARGV[3]
local offset = ARGV[4]
local entry = ARGV[3]
local test_id = ARGV[4]
local offset = ARGV[5]

if redis.call('hget', owners_key, test) == worker_queue_key then
redis.call('hdel', owners_key, test)
if redis.call('hget', owners_key, entry) == worker_queue_key then
redis.call('hdel', owners_key, entry)
end

if redis.call('sismember', processed_key, test) == 1 then
if redis.call('sismember', processed_key, test_id) == 1 then
return false
end

Expand All @@ -24,23 +25,23 @@ if global_requeues and global_requeues >= tonumber(global_max_requeues) then
return false
end

local requeues = tonumber(redis.call('hget', requeues_count_key, test))
local requeues = tonumber(redis.call('hget', requeues_count_key, test_id))
if requeues and requeues >= max_requeues then
return false
end

redis.call('hincrby', requeues_count_key, '___total___', 1)
redis.call('hincrby', requeues_count_key, test, 1)
redis.call('hincrby', requeues_count_key, test_id, 1)

redis.call('hdel', error_reports_key, test)
redis.call('hdel', error_reports_key, test_id)

local pivot = redis.call('lrange', queue_key, -1 - offset, 0 - offset)[1]
if pivot then
redis.call('linsert', queue_key, 'BEFORE', pivot, test)
redis.call('linsert', queue_key, 'BEFORE', pivot, entry)
else
redis.call('lpush', queue_key, test)
redis.call('lpush', queue_key, entry)
end

redis.call('zrem', zset_key, test)
redis.call('zrem', zset_key, entry)

return true
6 changes: 5 additions & 1 deletion redis/reserve_lost.lua
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
-- @include _entry_helpers

local zset_key = KEYS[1]
local processed_key = KEYS[2]
local worker_queue_key = KEYS[3]
local owners_key = KEYS[4]

local current_time = ARGV[1]
local timeout = ARGV[2]
local entry_delimiter = ARGV[3]

local lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout)
for _, test in ipairs(lost_tests) do
if redis.call('sismember', processed_key, test) == 0 then
local test_id = test_id_from_entry(test, entry_delimiter)
if redis.call('sismember', processed_key, test_id) == 0 then
redis.call('zadd', zset_key, current_time, test)
redis.call('lpush', worker_queue_key, test)
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
Expand Down
2 changes: 1 addition & 1 deletion ruby/Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
ci-queue (0.82.0)
ci-queue (0.83.0)
logger

GEM
Expand Down
87 changes: 87 additions & 0 deletions ruby/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,93 @@ minitest-queue --queue redis://example.com run -Itest test/**/*_test.rb

Additionally you can configure the requeue settings (see main README) with `--max-requeues` and `--requeue-tolerance`.

#### Lazy loading (opt-in)

By default, all test files are loaded upfront before any tests run. Lazy loading changes this
so that test files are loaded on-demand as each test is dequeued, reducing peak memory usage.
This is supported only by `minitest-queue` (not `rspec-queue`).

```bash
minitest-queue --queue redis://example.com --lazy-load run -Itest test/**/*_test.rb
```

The leader discovers tests from the provided files, streams them to Redis in batches, and
workers start running tests as soon as the first batch arrives. Each worker only loads the
test files it actually needs.

In lazy-load mode, test files are not loaded at startup. If your test suite requires a boot
file (e.g., `test/test_helper.rb` for Rails), list it in `CI_QUEUE_LAZY_LOAD_TEST_HELPERS`
(see below) so that every worker loads it before running tests.

**CLI flags:**

| Flag | Description |
|---|---|
| `--lazy-load` | Enable lazy loading mode |
| `--lazy-load-stream-batch-size SIZE` | Number of tests per batch streamed to Redis (default: 5000) |
| `--lazy-load-stream-timeout SECONDS` | Max time for the leader to finish streaming (default: 300s or `--queue-init-timeout`, whichever is larger) |
| `--test-files FILE` | Read test file paths from FILE (one per line) instead of positional args. Avoids ARG_MAX limits for large suites (36K+ files). |

**Environment variables:**

| Variable | Description |
|---|---|
| `CI_QUEUE_LAZY_LOAD=1` | Enable lazy loading (equivalent to `--lazy-load`) |
| `CI_QUEUE_LAZY_LOAD_STREAM_BATCH_SIZE=N` | Same as `--lazy-load-stream-batch-size` |
| `CI_QUEUE_LAZY_LOAD_STREAM_TIMEOUT=N` | Same as `--lazy-load-stream-timeout` |
| `CI_QUEUE_LAZY_LOAD_TEST_HELPERS=path` | Comma-separated list of helper files to load at startup on all workers (e.g., `test/test_helper.rb`). No CLI equivalent. |

Backward-compatible env var aliases: `CI_QUEUE_STREAM_BATCH_SIZE`, `CI_QUEUE_STREAM_TIMEOUT`, `CI_QUEUE_TEST_HELPERS`.

When `CI_QUEUE_DEBUG=1` is set, file loading stats are printed at the end of the run.

#### Preresolved test names (opt-in)

For large test suites, you can pre-compute the full list of test names on a stable branch
(e.g., `main`) and cache it. On feature branches, ci-queue reads test names from the cache
instead of loading all test files to discover them. This eliminates the upfront discovery
cost and implies lazy-load mode for all workers.

```bash
minitest-queue --queue redis://example.com run \
--preresolved-tests test_names.txt \
-I. -Itest
```

The file format is one test per line: `TestClass#method_name|path/to/test_file.rb`.
The pipe-delimited file path tells ci-queue which file to load when a worker picks up that test.
The leader streams entries directly to Redis without loading any test files.

**Reconciliation**: The cached test list may become stale when test files change between
the cache build and the branch build (methods added, removed, or renamed). To handle this,
pass `--test-files` with a list of changed test files. The leader will discard preresolved
entries for those files and re-discover their current test methods by loading them:

```bash
minitest-queue --queue redis://example.com run \
--preresolved-tests cached_test_names.txt \
--test-files changed_test_files.txt \
-I. -Itest
```

Note: `--test-files` serves double duty. In plain lazy-load mode it provides the list of
test files to discover. In preresolved mode it acts as the reconciliation set.

**Stale entry handling**: Even with reconciliation, some preresolved entries may refer to
test methods that no longer exist (e.g., a helper file changed the set of dynamically
generated methods). By default, these cause an error on the worker. To skip them gracefully
as `Minitest::Skip` instead, set:

| Variable | Description |
|---|---|
| `CI_QUEUE_SKIP_STALE_TESTS=1` | Report stale preresolved entries as skips instead of errors. No CLI equivalent. |

**CLI flags:**

| Flag | Description |
|---|---|
| `--preresolved-tests FILE` | Read pre-computed test names from FILE. Implies `--lazy-load`. No env var equivalent. |
| `--test-files FILE` | In preresolved mode: reconciliation set of changed files to re-discover. |

If you'd like to centralize the error reporting you can do so with:

Expand Down
27 changes: 27 additions & 0 deletions ruby/lib/ci/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
require 'ci/queue/file'
require 'ci/queue/grind'
require 'ci/queue/bisect'
require 'ci/queue/queue_entry'
require 'ci/queue/class_resolver'
require 'ci/queue/file_loader'

module CI
module Queue
Expand All @@ -22,6 +25,18 @@ module Queue
attr_accessor :shuffler, :requeueable

Error = Class.new(StandardError)
ClassNotFoundError = Class.new(Error)

# Raised when loading a file fails. Wraps the underlying exception and keeps
# both the offending path and the original error available to callers.
class FileLoadError < Error
  # file_path:      the path that failed to load
  # original_error: the exception raised while loading it
  attr_reader :file_path, :original_error

  def initialize(file_path, original_error)
    @file_path = file_path
    @original_error = original_error

    cause_summary = "#{original_error.class}: #{original_error.message}"
    super("Failed to load #{file_path}: #{cause_summary}")

    # Reuse the original backtrace so the report points at the real failure site.
    set_backtrace(original_error.backtrace)
  end
end

module Warnings
RESERVED_LOST_TEST = :RESERVED_LOST_TEST
Expand All @@ -48,6 +63,17 @@ def shuffle(tests, random)
end
end

# Whether debug output is enabled via the CI_QUEUE_DEBUG environment variable.
# The variable counts as "on" unless it is unset, blank, "0" or "false"
# (case-insensitive, surrounding whitespace ignored). The answer is memoized
# in @debug after the first call.
def debug?
  unless defined?(@debug)
    setting = ENV['CI_QUEUE_DEBUG'].to_s.strip
    disabled = setting.empty? || %w[0 false].include?(setting.downcase)
    @debug = !disabled
  end

  @debug
end

# Forget the memoized debug flag so the next debug? call re-reads the environment.
# Returns the removed value, or nil when nothing was memoized.
def reset_debug!
  return unless defined?(@debug)

  remove_instance_variable(:@debug)
end

def from_uri(url, config)
uri = URI(url)
implementation = case uri.scheme
Expand All @@ -65,3 +91,4 @@ def from_uri(url, config)
end
end
end

38 changes: 38 additions & 0 deletions ruby/lib/ci/queue/class_resolver.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

module CI
  module Queue
    # Turns a fully-qualified class name (e.g. "Foo::BarTest" or "::Foo::BarTest")
    # into the corresponding Class object, optionally loading a file first when
    # the constant is not defined yet.
    module ClassResolver
      # Resolve +class_name+ to a Class.
      #
      # class_name - String constant path; a leading "::" is accepted.
      # file_path  - optional path handed to +loader+ when the class is not
      #              already defined.
      # loader     - optional object responding to +load_file(path)+; only used
      #              when +file_path+ is also given.
      #
      # Returns the Class.
      # Raises ClassNotFoundError when the name does not resolve to a Class,
      # including after the file has been loaded.
      def self.resolve(class_name, file_path: nil, loader: nil)
        klass = try_direct_lookup(class_name)
        return klass if klass

        if file_path && loader
          loader.load_file(file_path)
          klass = try_direct_lookup(class_name)
          return klass if klass
        end

        raise ClassNotFoundError, "Unable to resolve class #{class_name}"
      end

      # Walk the constant path segment by segment starting at Object, without
      # consulting ancestors (inherit = false), so only constants defined
      # directly in each namespace match.
      #
      # Returns the Class, or nil when any segment is missing, the name is
      # malformed or empty, or the final constant is not a Class.
      def self.try_direct_lookup(class_name)
        parts = class_name.sub(/\A::/, '').split('::')
        # With no segments the loop below never runs and Object itself would be
        # returned as the "resolved" class; an empty name resolves to nothing.
        return nil if parts.empty?

        current = Object

        parts.each do |name|
          return nil unless current.const_defined?(name, false)

          current = current.const_get(name, false)
        end

        return nil unless current.is_a?(Class)

        current
      rescue NameError
        # Covers malformed constant names as well as NoMethodError (a NameError
        # subclass) raised when the name is nil or an intermediate constant is
        # not a Module.
        nil
      end
      private_class_method :try_direct_lookup
    end
  end
end
Loading
Loading