diff --git a/lib/rage/configuration.rb b/lib/rage/configuration.rb index 7335350a..80111237 100644 --- a/lib/rage/configuration.rb +++ b/lib/rage/configuration.rb @@ -691,11 +691,18 @@ def initialize @schedule_blocks = [] end - # Stores the scheduling block for later execution + # Schedule a periodic task to run at a fixed interval. + # @example + # Rage.configure do + # config.deferred.schedule do + # every 5.minutes, task: ClearCache + # end + # end def schedule(&block) @schedule_blocks << block end + # @private # Evaluates all stored schedule blocks and returns the collected tasks. # Called at boot time after all app constants are loaded. def scheduled_tasks diff --git a/lib/rage/deferred/scheduler.rb b/lib/rage/deferred/scheduler.rb index a055ccab..df8246f1 100644 --- a/lib/rage/deferred/scheduler.rb +++ b/lib/rage/deferred/scheduler.rb @@ -1,21 +1,24 @@ # frozen_string_literal: true +# @private class Rage::Deferred::Scheduler - LOCK_PATH = "/tmp/rage_deferred_scheduler.lock" - def self.start(tasks) return if tasks.empty? - Rage::Internal.pick_a_worker(lock_path: LOCK_PATH) do - Rage.logger.info " Worker PID #{Process.pid} is managing scheduled tasks" - register_timers tasks + Rage::Internal.pick_a_worker(purpose: "deferred-scheduler") do + puts("INFO: #{Process.pid} is managing scheduled tasks.") if Rage.logger.info? + register_timers(tasks) end end def self.register_timers(tasks) tasks.each do |entry| - Iodine.run_every((entry[:interval] * 1000).to_i) do - entry[:task].enqueue + interval = (entry[:interval] * 1000).to_i + + if Rage.env.development? + Iodine.run_every(interval) { Object.const_get(entry[:task].name).enqueue } + else + Iodine.run_every(interval) { entry[:task].enqueue } end end end diff --git a/lib/rage/internal.rb b/lib/rage/internal.rb index b0d09430..9d4ba856 100644 --- a/lib/rage/internal.rb +++ b/lib/rage/internal.rb @@ -64,16 +64,20 @@ def stream_name_for(streamables) name_segments.join(":") end + LOCK_FILE_SUFFIX = rand(0x100000000).to_s(36) + # Pick a worker process to execute a block of code. # This is useful for ensuring that certain code is only executed by a single worker in a multi-worker setup, e.g. for broadcasting messages to known streams or for running periodic tasks. # @yield The block of code to be executed by the picked worker - def pick_a_worker(lock_path: nil, &block) - @lock_file, lock_path = Tempfile.new.yield_self { |f| [f, f.path] } unless lock_path - + def pick_a_worker(purpose:, &block) attempt = proc do - worker_lock = File.open(lock_path, File::CREAT | File::WRONLY) - if worker_lock.flock(File::LOCK_EX | File::LOCK_NB) - @worker_lock = worker_lock + lock_path = Pathname.new(Dir.tmpdir).join("rage-#{purpose}-lock-#{LOCK_FILE_SUFFIX}") + + lock_file = File.open(lock_path, File::CREAT | File::WRONLY) + + if lock_file.flock(File::LOCK_EX | File::LOCK_NB) + Iodine.on_state(:on_finish) { File.unlink(lock_file) } + worker_locks << lock_file block.call end end @@ -83,6 +87,10 @@ def pick_a_worker(lock_path: nil, &block) private + def worker_locks + @worker_locks ||= [] + end + def dynamic_name_seed @dynamic_name_seed ||= ("a".."j").to_a.permutation end diff --git a/lib/rage/pubsub/adapters/redis.rb b/lib/rage/pubsub/adapters/redis.rb index 1f4220f4..d5dbaf3b 100644 --- a/lib/rage/pubsub/adapters/redis.rb +++ b/lib/rage/pubsub/adapters/redis.rb @@ -40,7 +40,7 @@ def initialize(config) @trimming_strategy = redis_version < Gem::Version.create("6.2.0") ? :maxlen : :minid - Rage::Internal.pick_a_worker do + Rage::Internal.pick_a_worker(purpose: "redis-pubsub") do puts("INFO: #{Process.pid} is managing Redis subscriptions.") if Rage.logger.info? poll end diff --git a/spec/deferred/scheduler_spec.rb b/spec/deferred/scheduler_spec.rb index 1448d985..0dc383d1 100644 --- a/spec/deferred/scheduler_spec.rb +++ b/spec/deferred/scheduler_spec.rb @@ -3,7 +3,7 @@ RSpec.describe Rage::Deferred::Scheduler do let(:task) { double("Rage::Deferred::Task") } let(:tasks) { [{ interval: 60, task: task }] } - let(:logger) { double("Logger", info: nil) } + let(:logger) { double("Logger", info: nil, info?: true) } before do allow(Rage).to receive(:logger).and_return(logger) @@ -15,7 +15,7 @@ describe ".start" do it "does not start when no tasks are configured" do described_class.start([]) - expect(Iodine).not_to have_received(:run_every) + expect(Rage::Internal).not_to have_received(:pick_a_worker) end it "registers timers when leader is elected" do @@ -40,14 +40,26 @@ end it "calls enqueue on the task when timer fires" do + allow(Rage.env).to receive(:development?).and_return(false) + allow(Iodine).to receive(:run_every).with(60_000) { |&block| block.call } + described_class.start(tasks) + expect(task).to have_received(:enqueue) + end + + it "reloads the task in development" do + allow(Rage.env).to receive(:development?).and_return(true) + + allow(task).to receive(:name).and_return("TestTask") + stub_const("TestTask", task) + allow(Iodine).to receive(:run_every).with(60_000) { |&block| block.call } described_class.start(tasks) expect(task).to have_received(:enqueue) end - it "passes the correct lock path to pick_a_worker" do + it "passes the correct purpose to pick_a_worker" do described_class.start(tasks) - expect(Rage::Internal).to have_received(:pick_a_worker).with(lock_path: Rage::Deferred::Scheduler::LOCK_PATH) + expect(Rage::Internal).to have_received(:pick_a_worker).with(purpose: "deferred-scheduler") end end end diff --git a/spec/internal_spec.rb b/spec/internal_spec.rb index bc46ba09..47b18abb 100644 --- a/spec/internal_spec.rb +++ b/spec/internal_spec.rb @@ -97,21 +97,29 @@ describe ".pick_a_worker" do let(:on_start_callbacks) { [] } + let(:on_finish_callbacks) { [] } before do allow(Iodine).to receive(:running?).and_return(false) allow(Iodine).to receive(:on_state).with(:on_start) do |&block| on_start_callbacks << block end + + allow(Iodine).to receive(:on_state).with(:on_finish) do |&block| + on_finish_callbacks << block + end + + allow(File).to receive(:open).and_return(lock_file) end + let(:lock_file) { Tempfile.create } + after do - described_class.instance_variable_set(:@lock_file, nil) - described_class.instance_variable_set(:@worker_lock, nil) + described_class.send(:worker_locks).clear end it "registers a callback" do - described_class.pick_a_worker { "work" } + described_class.pick_a_worker(purpose: "test") { "work" } expect(Iodine).to have_received(:on_state).with(:on_start) expect(on_start_callbacks.size).to eq(1) @@ -120,27 +128,30 @@ it "executes the block immediately when Iodine is already running" do allow(Iodine).to receive(:running?).and_return(true) executed = false - described_class.pick_a_worker { executed = true } + described_class.pick_a_worker(purpose: "test") { executed = true } expect(executed).to be(true) end it "creates a lock file" do - described_class.pick_a_worker { "work" } + described_class.pick_a_worker(purpose: "test") { "work" } + + on_start_callbacks.first.call - lock_file = described_class.instance_variable_get(:@lock_file) expect(File.exist?(lock_file.path)).to be(true) end - it "uses provided lock_path instead of creating a new tempfile" do - lock_path = Tempfile.new.path - expect(Tempfile).not_to receive(:new) - described_class.pick_a_worker(lock_path: lock_path) { "work" } + it "deletes the lock file" do + described_class.pick_a_worker(purpose: "test") { "work" } + on_start_callbacks.first.call + on_finish_callbacks.first.call + + expect(File.exist?(lock_file.path)).to be(false) end it "executes the block when lock is acquired" do executed = false - described_class.pick_a_worker { executed = true } + described_class.pick_a_worker(purpose: "test") { executed = true } on_start_callbacks.first.call @@ -148,19 +159,17 @@ end it "stores the worker lock when acquired" do - described_class.pick_a_worker { "work" } + described_class.pick_a_worker(purpose: "test") { "work" } on_start_callbacks.first.call - worker_lock = described_class.instance_variable_get(:@worker_lock) - expect(worker_lock).to be_a(File) + worker_lock = described_class.send(:worker_locks).first + expect(worker_lock).to eq(lock_file) end it "does not execute the block when lock cannot be acquired" do executed = false - described_class.pick_a_worker { executed = true } - - lock_file = described_class.instance_variable_get(:@lock_file) + described_class.pick_a_worker(purpose: "test") { executed = true } external_lock = File.new(lock_file.path) external_lock.flock(File::LOCK_EX | File::LOCK_NB) @@ -174,9 +183,7 @@ it "allows only one worker to execute the block" do execution_count = 0 - described_class.pick_a_worker { execution_count += 1 } - - lock_file = described_class.instance_variable_get(:@lock_file) + described_class.pick_a_worker(purpose: "test") { execution_count += 1 } on_start_callbacks.first.call expect(execution_count).to eq(1)