From e0844436dbea52f6967e606487340cebfab60b0d Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Mon, 4 May 2026 19:02:42 +0100 Subject: [PATCH 1/5] Implement `FiberScheduler#fiber_interrupt` --- lib/rage/fiber.rb | 36 +++----- lib/rage/fiber_scheduler.rb | 25 ++++-- spec/fiber_scheduler_spec.rb | 159 +++++++++++++++++++++++++++++++++++ spec/fiber_spec.rb | 48 ++++++++++- spec/telemetry/spans_spec.rb | 1 + 5 files changed, 237 insertions(+), 32 deletions(-) diff --git a/lib/rage/fiber.rb b/lib/rage/fiber.rb index eb9ed97d..b9757f6d 100644 --- a/lib/rage/fiber.rb +++ b/lib/rage/fiber.rb @@ -111,23 +111,7 @@ def __get_id end # @private - def __block_channel(force = false) - @__block_channel_i ||= 0 - @__block_channel_i += 1 if force - - "block:#{object_id}:#{@__block_channel_i}" - end - - # @private - def __await_channel(force = false) - @__fiber_channel_i ||= 0 - @__fiber_channel_i += 1 if force - - "await:#{object_id}:#{@__fiber_channel_i}" - end - - # @private - attr_accessor :__awaited_fileno + attr_accessor :__awaited_fileno, :__wait_generation, :__block_channel, :__await_channel # @private # pause a fiber and resume in the next iteration of the event loop @@ -156,7 +140,9 @@ class << self # @note This method should only be used when multiple fibers have to be processed in parallel. There's no need to use `Fiber.await` for single IO calls. def self.await(fibers) f, fibers = Fiber.current, Array(fibers) - await_channel = f.__await_channel(true) + + gen = (f.__wait_generation += 1) + channel = f.__await_channel = "await:#{f.object_id}:#{gen}" Rage::Telemetry.tracer.span_core_fiber_await(fibers:) do # check which fibers are alive (i.e. have yielded) and which have errored out @@ -179,17 +165,21 @@ def self.await(fibers) end # wait on async fibers; resume right away if one of the fibers errors out - Iodine.subscribe(await_channel) do |_, err| - if err == AWAIT_ERROR_MESSAGE - f.resume + Iodine.subscribe(channel) do |_, err| + done = if err == AWAIT_ERROR_MESSAGE + true else num_wait_for -= 1 - f.resume if num_wait_for == 0 + num_wait_for == 0 + end + + if done + Iodine.defer { Iodine.unsubscribe(channel) } + f.resume if f.alive? && gen == f.__wait_generation end end Fiber.defer(-1) - Iodine.defer { Iodine.unsubscribe(await_channel) } # if num_wait_for is not 0 means we exited prematurely because of an error if num_wait_for > 0 diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 5bec50c0..65e0a48e 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -12,7 +12,11 @@ def initialize def io_wait(io, events, timeout = nil) f = Fiber.current - ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) } + gen = (f.__wait_generation += 1) + + ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) do |err| + f.resume(err) if f.alive? && gen == f.__wait_generation + end err = Fiber.defer(io.fileno) if err == false || (err && err < 0) @@ -91,13 +95,16 @@ def address_resolve(hostname) end def block(_blocker, timeout = nil) - f, fulfilled, channel = Fiber.current, false, Fiber.current.__block_channel(true) + f, fulfilled = Fiber.current, false + + gen = (f.__wait_generation += 1) + channel = f.__block_channel = "block:#{f.object_id}:#{gen}" resume_fiber_block = proc do unless fulfilled fulfilled = true ::Iodine.defer { ::Iodine.unsubscribe(channel) } - f.resume if f.alive? + f.resume if f.alive? && gen == f.__wait_generation end end @@ -110,7 +117,12 @@ def block(_blocker, timeout = nil) end def unblock(_blocker, fiber) - ::Iodine.publish(fiber.__block_channel, "", Iodine::PubSub::PROCESS) + ::Iodine.publish(fiber.__block_channel, "", Iodine::PubSub::PROCESS) if fiber.__block_channel + end + + def fiber_interrupt(fiber, exception) + fiber.__wait_generation += 1 + fiber.raise(exception) end def fiber(&block) @@ -131,13 +143,14 @@ def fiber(&block) Fiber.current.__set_result(block.call) end # send a message for `Fiber.await` to work - Iodine.publish(parent.__await_channel, "", Iodine::PubSub::PROCESS) if parent.alive? + Iodine.publish(parent.__await_channel, "", Iodine::PubSub::PROCESS) if parent.__await_channel rescue Exception => e Fiber.current.__set_err(e) - Iodine.publish(parent.__await_channel, Fiber::AWAIT_ERROR_MESSAGE, Iodine::PubSub::PROCESS) if parent.alive? + Iodine.publish(parent.__await_channel, Fiber::AWAIT_ERROR_MESSAGE, Iodine::PubSub::PROCESS) if parent.__await_channel end end + fiber.__wait_generation = 0 fiber.resume fiber diff --git a/spec/fiber_scheduler_spec.rb b/spec/fiber_scheduler_spec.rb index f5f49ba9..3ef9fb4c 100644 --- a/spec/fiber_scheduler_spec.rb +++ b/spec/fiber_scheduler_spec.rb @@ -356,4 +356,163 @@ -> { raise e } end end + + context "fiber interruption" do + before do + skip("skipping on Ruby < 4") if Gem::Version.create(RUBY_VERSION) < Gem::Version.create(4) + end + + it "correctly interrupts a fiber" do + within_reactor do + error = begin + r, w = IO.pipe + + child = Fiber.schedule do + r.gets + end + + Fiber.schedule do + r.close # This will interrupt the child fiber. + end + + Fiber.await(child) + rescue => e + e + end + + -> { expect(error).to be_a(IOError) } + end + end + end + + context "with wait generation tracking" do + it "initializes __wait_generation to 0 for scheduled fibers" do + within_reactor do + fiber = Fiber.schedule { Fiber.current.__wait_generation } + result = Fiber.await(fiber) + + -> { expect(result).to eq([0]) } + end + end + + it "increments __wait_generation on each block call" do + queue1 = Queue.new + queue2 = Queue.new + + Thread.new do + sleep 0.2 + queue1 << "first" + sleep 0.2 + queue2 << "second" + end + + within_reactor do + gen_before = Fiber.current.__wait_generation + queue1.pop + gen_after_first = Fiber.current.__wait_generation + queue2.pop + gen_after_second = Fiber.current.__wait_generation + + -> do + expect(gen_after_first).to eq(gen_before + 1) + expect(gen_after_second).to eq(gen_before + 2) + end + end + end + + it "increments __wait_generation on Fiber.await" do + within_reactor do + gen_before = Fiber.current.__wait_generation + + Fiber.await(Fiber.schedule { sleep 0.1 }) + gen_after_first = Fiber.current.__wait_generation + + Fiber.await(Fiber.schedule { sleep 0.1 }) + gen_after_second = Fiber.current.__wait_generation + + -> do + expect(gen_after_first).to eq(gen_before + 1) + expect(gen_after_second).to eq(gen_before + 2) + end + end + end + + it "prevents stale resume when fiber moves to new wait state" do + queue1 = Queue.new + queue2 = Queue.new + old_channel = nil + results = [] + + within_reactor do + fiber = Fiber.schedule do + f = Fiber.current + + # Start a thread that will: + # 1. Unblock the first wait + # 2. Capture the old channel + # 3. Try to publish to the old channel while fiber is in second wait + Thread.new do + sleep 0.1 + old_channel = f.__block_channel + queue1 << "first" + + sleep 0.2 + # Try to publish to the old channel (simulating a stale unblock) + # This shouldn't resume the fiber because generation has changed + Iodine.publish(old_channel, "", Iodine::PubSub::PROCESS) + end + + results << queue1.pop + results << "starting second wait" + + # Start second unblock thread + Thread.new { sleep 0.5; queue2 << "second" } + + # Now block on second queue - stale resume from old_channel should be ignored + results << queue2.pop + results + end + + result = Fiber.await(fiber) + -> { expect(result.first).to eq(["first", "starting second wait", "second"]) } + end + end + + it "fiber_interrupt increments generation before raising" do + test_fiber = nil + generation_before = nil + generation_in_rescue = nil + + within_reactor do + test_fiber = Fiber.schedule do + generation_before = Fiber.current.__wait_generation + + begin + sleep 10 # Block for a long time + rescue => e + generation_in_rescue = Fiber.current.__wait_generation + raise e + end + end + + # Give the fiber time to start sleeping + sleep 0.1 + + # Interrupt the fiber + Fiber.scheduler.fiber_interrupt(test_fiber, RuntimeError.new("interrupted")) + + # Wait for fiber to process the exception + sleep 0.1 + + -> do + # generation_before is 0 (after init) + # sleep increments to 1 + # fiber_interrupt increments to 2 + expect(generation_in_rescue).to eq(generation_before + 2) + expect(test_fiber.__get_err).to be_a(RuntimeError) + expect(test_fiber.__get_err.message).to eq("interrupted") + end + end + end + end end diff --git a/spec/fiber_spec.rb b/spec/fiber_spec.rb index a1954984..0840ea00 100644 --- a/spec/fiber_spec.rb +++ b/spec/fiber_spec.rb @@ -108,16 +108,22 @@ end it "correctly watches on an empty list" do - expect(Fiber.await([])).to eq([]) + within_reactor do + result = Fiber.await([]) + -> { expect(result).to eq([]) } + end end it "correctly watches on terminated fibers" do within_reactor do fiber = Fiber.schedule { 125 } + result_1 = Fiber.await(fiber) + result_2 = Fiber.await(fiber) + -> do - expect(Fiber.await(fiber)).to eq([125]) - expect(Fiber.await(fiber)).to eq([125]) + expect(result_1).to eq([125]) + expect(result_2).to eq([125]) end end end @@ -193,4 +199,40 @@ -> {} end end + + context "with wait generation tracking" do + it "uses unique await channels for each Fiber.await call" do + within_reactor do + channels = [] + + Fiber.await(Fiber.schedule { sleep 0.05 }) + channels << Fiber.current.__await_channel + + Fiber.await(Fiber.schedule { sleep 0.05 }) + channels << Fiber.current.__await_channel + + Fiber.await(Fiber.schedule { sleep 0.05 }) + channels << Fiber.current.__await_channel + + -> { expect(channels.uniq.size).to eq(3) } + end + end + + it "increments __wait_generation for each await" do + within_reactor do + generations = [Fiber.current.__wait_generation] + + Fiber.await(Fiber.schedule { sleep 0.05 }) + generations << Fiber.current.__wait_generation + + Fiber.await(Fiber.schedule { sleep 0.05 }) + generations << Fiber.current.__wait_generation + + -> do + expect(generations[1]).to eq(generations[0] + 1) + expect(generations[2]).to eq(generations[0] + 2) + end + end + end + end end diff --git a/spec/telemetry/spans_spec.rb b/spec/telemetry/spans_spec.rb index f2cd831c..ec4bcfcd 100644 --- a/spec/telemetry/spans_spec.rb +++ b/spec/telemetry/spans_spec.rb @@ -43,6 +43,7 @@ def self.test_span(**) expect(verifier).to receive(:call).with({ id: "core.fiber.await", name: "Fiber.await", fibers: [f1, f2] }) + Fiber.current.__wait_generation = 0 Fiber.await([f1, f2]) end end From 6bdb56a0fc1ec071f827d90ee7152cf201d4089e Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Mon, 4 May 2026 19:30:23 +0100 Subject: [PATCH 2/5] Fix rubocop --- spec/fiber_scheduler_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/fiber_scheduler_spec.rb b/spec/fiber_scheduler_spec.rb index 3ef9fb4c..3f8bdc91 100644 --- a/spec/fiber_scheduler_spec.rb +++ b/spec/fiber_scheduler_spec.rb @@ -365,12 +365,12 @@ it "correctly interrupts a fiber" do within_reactor do error = begin - r, w = IO.pipe + r, _ = IO.pipe child = Fiber.schedule do r.gets end - + Fiber.schedule do r.close # This will interrupt the child fiber. end From b8922ea0cdd307a10a99be3adf6173c3bf076755 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Mon, 4 May 2026 19:32:34 +0100 Subject: [PATCH 3/5] Ensure generation counter is initialized in `Fiber.await` technically, it should always be initialized if called inside a request fiber. But there could still be cases when it's not, for example in a handshake span. It wouldn't make much sense, but just to be safe --- lib/rage/fiber.rb | 1 + spec/fiber_spec.rb | 12 +++--------- spec/telemetry/spans_spec.rb | 1 - 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/lib/rage/fiber.rb b/lib/rage/fiber.rb index b9757f6d..a7e02d08 100644 --- a/lib/rage/fiber.rb +++ b/lib/rage/fiber.rb @@ -141,6 +141,7 @@ class << self def self.await(fibers) f, fibers = Fiber.current, Array(fibers) + f.__wait_generation ||= 0 gen = (f.__wait_generation += 1) channel = f.__await_channel = "await:#{f.object_id}:#{gen}" diff --git a/spec/fiber_spec.rb b/spec/fiber_spec.rb index 0840ea00..dbe22ec0 100644 --- a/spec/fiber_spec.rb +++ b/spec/fiber_spec.rb @@ -108,22 +108,16 @@ end it "correctly watches on an empty list" do - within_reactor do - result = Fiber.await([]) - -> { expect(result).to eq([]) } - end + expect(Fiber.await([])).to eq([]) end it "correctly watches on terminated fibers" do within_reactor do fiber = Fiber.schedule { 125 } - result_1 = Fiber.await(fiber) - result_2 = Fiber.await(fiber) - -> do - expect(result_1).to eq([125]) - expect(result_2).to eq([125]) + expect(Fiber.await(fiber)).to eq([125]) + expect(Fiber.await(fiber)).to eq([125]) end end end diff --git a/spec/telemetry/spans_spec.rb b/spec/telemetry/spans_spec.rb index ec4bcfcd..f2cd831c 100644 --- a/spec/telemetry/spans_spec.rb +++ b/spec/telemetry/spans_spec.rb @@ -43,7 +43,6 @@ def self.test_span(**) expect(verifier).to receive(:call).with({ id: "core.fiber.await", name: "Fiber.await", fibers: [f1, f2] }) - Fiber.current.__wait_generation = 0 Fiber.await([f1, f2]) end end From 6992e68b3db7288f60d284ca5f0eda3976a72d01 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Mon, 4 May 2026 19:33:15 +0100 Subject: [PATCH 4/5] Update changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 41155363..6045196c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## [Unreleased] +### Added + +- Implement `FiberScheduler#fiber_interrupt` (#283). + ## [1.24.0] - 2026-05-12 ### Added From 75cb29a42c404d21f65bed41f5b9e71850b94577 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Wed, 6 May 2026 18:19:18 +0100 Subject: [PATCH 5/5] Add Fiber#kill tests --- spec/fiber_spec.rb | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/spec/fiber_spec.rb b/spec/fiber_spec.rb index dbe22ec0..3116cd9b 100644 --- a/spec/fiber_spec.rb +++ b/spec/fiber_spec.rb @@ -194,6 +194,41 @@ end end + it "correctly processes killed fibers" do + within_reactor do + f = Fiber.schedule { sleep 30 } + + Fiber.schedule do + sleep 1 + f.kill + end + + result = Fiber.await(f) + + -> { expect(result).to eq([nil]) } + end + end + + it "correctly processes killed fibers" do + within_reactor do + f1 = Fiber.schedule { sleep 30 } + + f2 = Fiber.schedule do + sleep 0.5 + "test result" + end + + Fiber.schedule do + sleep 1 + f1.kill + end + + result = Fiber.await([f1, f2]) + + -> { expect(result).to eq([nil, "test result"]) } + end + end + context "with wait generation tracking" do it "uses unique await channels for each Fiber.await call" do within_reactor do