From b7996cb8b0d075c0566c2a23a9d3aeb37a17d834 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Tue, 5 May 2026 15:44:12 +0100 Subject: [PATCH 1/5] Update info log --- lib/rage/deferred/scheduler.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/rage/deferred/scheduler.rb b/lib/rage/deferred/scheduler.rb index a055ccab..9a520836 100644 --- a/lib/rage/deferred/scheduler.rb +++ b/lib/rage/deferred/scheduler.rb @@ -7,8 +7,8 @@ def self.start(tasks) return if tasks.empty? Rage::Internal.pick_a_worker(lock_path: LOCK_PATH) do - Rage.logger.info " Worker PID #{Process.pid} is managing scheduled tasks" - register_timers tasks + puts("INFO: #{Process.pid} is managing scheduled tasks.") if Rage.logger.info? + register_timers(tasks) end end From e4df687babfbc01d8c38738be9fc93bec9269cae Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Tue, 5 May 2026 15:45:40 +0100 Subject: [PATCH 2/5] Correctly schedule tasks after code reloading in development --- lib/rage/deferred/scheduler.rb | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/rage/deferred/scheduler.rb b/lib/rage/deferred/scheduler.rb index 9a520836..f0b8876e 100644 --- a/lib/rage/deferred/scheduler.rb +++ b/lib/rage/deferred/scheduler.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true +# @private class Rage::Deferred::Scheduler LOCK_PATH = "/tmp/rage_deferred_scheduler.lock" @@ -14,8 +15,12 @@ def self.start(tasks) def self.register_timers(tasks) tasks.each do |entry| - Iodine.run_every((entry[:interval] * 1000).to_i) do - entry[:task].enqueue + interval = (entry[:interval] * 1000).to_i + + if Rage.env.development? + Iodine.run_every(interval) { Object.const_get(entry[:task].name).enqueue } + else + Iodine.run_every(interval) { entry[:task].enqueue } end end end From 1b72d140eeabbd9c3d823af67ff25c8b73ea5a0e Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Tue, 5 May 2026 17:07:59 +0100 Subject: [PATCH 3/5] Update worker election logic update `pick_a_worker` to use the `purpose` argument instead of `lock_path` - callers shouldn't know the method's implementation details (e.g. that it uses file locks). Additionally, the code is updated to work correctly with multiple instances (e.g. different Rage apps are running on the same server on different ports) and all lock files are now stored in an array to protect from GC --- lib/rage/deferred/scheduler.rb | 4 +--- lib/rage/internal.rb | 20 ++++++++++++++------ lib/rage/pubsub/adapters/redis.rb | 2 +- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/lib/rage/deferred/scheduler.rb b/lib/rage/deferred/scheduler.rb index f0b8876e..df8246f1 100644 --- a/lib/rage/deferred/scheduler.rb +++ b/lib/rage/deferred/scheduler.rb @@ -2,12 +2,10 @@ # @private class Rage::Deferred::Scheduler - LOCK_PATH = "/tmp/rage_deferred_scheduler.lock" - def self.start(tasks) return if tasks.empty? - Rage::Internal.pick_a_worker(lock_path: LOCK_PATH) do + Rage::Internal.pick_a_worker(purpose: "deferred-scheduler") do puts("INFO: #{Process.pid} is managing scheduled tasks.") if Rage.logger.info? register_timers(tasks) end diff --git a/lib/rage/internal.rb b/lib/rage/internal.rb index b0d09430..9d4ba856 100644 --- a/lib/rage/internal.rb +++ b/lib/rage/internal.rb @@ -64,16 +64,20 @@ def stream_name_for(streamables) name_segments.join(":") end + LOCK_FILE_SUFFIX = rand(0x100000000).to_s(36) + # Pick a worker process to execute a block of code. # This is useful for ensuring that certain code is only executed by a single worker in a multi-worker setup, e.g. for broadcasting messages to known streams or for running periodic tasks. # @yield The block of code to be executed by the picked worker - def pick_a_worker(lock_path: nil, &block) - @lock_file, lock_path = Tempfile.new.yield_self { |f| [f, f.path] } unless lock_path - + def pick_a_worker(purpose:, &block) attempt = proc do - worker_lock = File.open(lock_path, File::CREAT | File::WRONLY) - if worker_lock.flock(File::LOCK_EX | File::LOCK_NB) - @worker_lock = worker_lock + lock_path = Pathname.new(Dir.tmpdir).join("rage-#{purpose}-lock-#{LOCK_FILE_SUFFIX}") + + lock_file = File.open(lock_path, File::CREAT | File::WRONLY) + + if lock_file.flock(File::LOCK_EX | File::LOCK_NB) + Iodine.on_state(:on_finish) { File.unlink(lock_file) } + worker_locks << lock_file block.call end end @@ -83,6 +87,10 @@ def pick_a_worker(lock_path: nil, &block) private + def worker_locks + @worker_locks ||= [] + end + def dynamic_name_seed @dynamic_name_seed ||= ("a".."j").to_a.permutation end diff --git a/lib/rage/pubsub/adapters/redis.rb b/lib/rage/pubsub/adapters/redis.rb index 1f4220f4..d5dbaf3b 100644 --- a/lib/rage/pubsub/adapters/redis.rb +++ b/lib/rage/pubsub/adapters/redis.rb @@ -40,7 +40,7 @@ def initialize(config) @trimming_strategy = redis_version < Gem::Version.create("6.2.0") ? :maxlen : :minid - Rage::Internal.pick_a_worker do + Rage::Internal.pick_a_worker(purpose: "redis-pubsub") do puts("INFO: #{Process.pid} is managing Redis subscriptions.") if Rage.logger.info? poll end From 06e07979ae506336e6ded02777342e6049198c82 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Tue, 5 May 2026 19:15:22 +0100 Subject: [PATCH 4/5] Update docs --- lib/rage/configuration.rb | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/rage/configuration.rb b/lib/rage/configuration.rb index 7335350a..80111237 100644 --- a/lib/rage/configuration.rb +++ b/lib/rage/configuration.rb @@ -691,11 +691,18 @@ def initialize @schedule_blocks = [] end - # Stores the scheduling block for later execution + # Schedule a periodic task to run at a fixed interval. + # @example + # Rage.configure do + # config.deferred.schedule do + # every 5.minutes, task: ClearCache + # end + # end def schedule(&block) @schedule_blocks << block end + # @private # Evaluates all stored schedule blocks and returns the collected tasks. # Called at boot time after all app constants are loaded. def scheduled_tasks From d8061961504785fe990a11ed5c1490284446c936 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Tue, 5 May 2026 20:15:57 +0100 Subject: [PATCH 5/5] Update tests --- spec/deferred/scheduler_spec.rb | 20 +++++++++++--- spec/internal_spec.rb | 47 +++++++++++++++++++-------------- 2 files changed, 43 insertions(+), 24 deletions(-) diff --git a/spec/deferred/scheduler_spec.rb b/spec/deferred/scheduler_spec.rb index 1448d985..0dc383d1 100644 --- a/spec/deferred/scheduler_spec.rb +++ b/spec/deferred/scheduler_spec.rb @@ -3,7 +3,7 @@ RSpec.describe Rage::Deferred::Scheduler do let(:task) { double("Rage::Deferred::Task") } let(:tasks) { [{ interval: 60, task: task }] } - let(:logger) { double("Logger", info: nil) } + let(:logger) { double("Logger", info: nil, info?: true) } before do allow(Rage).to receive(:logger).and_return(logger) @@ -15,7 +15,7 @@ describe ".start" do it "does not start when no tasks are configured" do described_class.start([]) - expect(Iodine).not_to have_received(:run_every) + expect(Rage::Internal).not_to have_received(:pick_a_worker) end it "registers timers when leader is elected" do @@ -40,14 +40,26 @@ end it "calls enqueue on the task when timer fires" do + allow(Rage.env).to receive(:development?).and_return(false) + allow(Iodine).to receive(:run_every).with(60_000) { |&block| block.call } + described_class.start(tasks) + expect(task).to have_received(:enqueue) + end + + it "reloads the task in development" do + allow(Rage.env).to receive(:development?).and_return(true) + + allow(task).to receive(:name).and_return("TestTask") + stub_const("TestTask", task) + allow(Iodine).to receive(:run_every).with(60_000) { |&block| block.call } described_class.start(tasks) expect(task).to have_received(:enqueue) end - it "passes the correct lock path to pick_a_worker" do + it "passes the correct purpose to pick_a_worker" do described_class.start(tasks) - expect(Rage::Internal).to have_received(:pick_a_worker).with(lock_path: Rage::Deferred::Scheduler::LOCK_PATH) + expect(Rage::Internal).to have_received(:pick_a_worker).with(purpose: "deferred-scheduler") end end end diff --git a/spec/internal_spec.rb b/spec/internal_spec.rb index bc46ba09..47b18abb 100644 --- a/spec/internal_spec.rb +++ b/spec/internal_spec.rb @@ -97,21 +97,29 @@ describe ".pick_a_worker" do let(:on_start_callbacks) { [] } + let(:on_finish_callbacks) { [] } before do allow(Iodine).to receive(:running?).and_return(false) allow(Iodine).to receive(:on_state).with(:on_start) do |&block| on_start_callbacks << block end + + allow(Iodine).to receive(:on_state).with(:on_finish) do |&block| + on_finish_callbacks << block + end + + allow(File).to receive(:open).and_return(lock_file) end + let(:lock_file) { Tempfile.create } + after do - described_class.instance_variable_set(:@lock_file, nil) - described_class.instance_variable_set(:@worker_lock, nil) + described_class.send(:worker_locks).clear end it "registers a callback" do - described_class.pick_a_worker { "work" } + described_class.pick_a_worker(purpose: "test") { "work" } expect(Iodine).to have_received(:on_state).with(:on_start) expect(on_start_callbacks.size).to eq(1) @@ -120,27 +128,30 @@ it "executes the block immediately when Iodine is already running" do allow(Iodine).to receive(:running?).and_return(true) executed = false - described_class.pick_a_worker { executed = true } + described_class.pick_a_worker(purpose: "test") { executed = true } expect(executed).to be(true) end it "creates a lock file" do - described_class.pick_a_worker { "work" } + described_class.pick_a_worker(purpose: "test") { "work" } + + on_start_callbacks.first.call - lock_file = described_class.instance_variable_get(:@lock_file) expect(File.exist?(lock_file.path)).to be(true) end - it "uses provided lock_path instead of creating a new tempfile" do - lock_path = Tempfile.new.path - expect(Tempfile).not_to receive(:new) - described_class.pick_a_worker(lock_path: lock_path) { "work" } + it "deletes the lock file" do + described_class.pick_a_worker(purpose: "test") { "work" } + on_start_callbacks.first.call + on_finish_callbacks.first.call + + expect(File.exist?(lock_file.path)).to be(false) end it "executes the block when lock is acquired" do executed = false - described_class.pick_a_worker { executed = true } + described_class.pick_a_worker(purpose: "test") { executed = true } on_start_callbacks.first.call @@ -148,19 +159,17 @@ end it "stores the worker lock when acquired" do - described_class.pick_a_worker { "work" } + described_class.pick_a_worker(purpose: "test") { "work" } on_start_callbacks.first.call - worker_lock = described_class.instance_variable_get(:@worker_lock) - expect(worker_lock).to be_a(File) + worker_lock = described_class.send(:worker_locks).first + expect(worker_lock).to eq(lock_file) end it "does not execute the block when lock cannot be acquired" do executed = false - described_class.pick_a_worker { executed = true } - - lock_file = described_class.instance_variable_get(:@lock_file) + described_class.pick_a_worker(purpose: "test") { executed = true } external_lock = File.new(lock_file.path) external_lock.flock(File::LOCK_EX | File::LOCK_NB) @@ -174,9 +183,7 @@ it "allows only one worker to execute the block" do execution_count = 0 - described_class.pick_a_worker { execution_count += 1 } - - lock_file = described_class.instance_variable_get(:@lock_file) + described_class.pick_a_worker(purpose: "test") { execution_count += 1 } on_start_callbacks.first.call expect(execution_count).to eq(1)