diff --git a/CHANGELOG.md b/CHANGELOG.md index 41155363..6045196c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## [Unreleased] +### Added + +- Implement `FiberScheduler#fiber_interrupt` (#283). + ## [1.24.0] - 2026-05-12 ### Added diff --git a/lib/rage/fiber.rb b/lib/rage/fiber.rb index eb9ed97d..a7e02d08 100644 --- a/lib/rage/fiber.rb +++ b/lib/rage/fiber.rb @@ -111,23 +111,7 @@ def __get_id end # @private - def __block_channel(force = false) - @__block_channel_i ||= 0 - @__block_channel_i += 1 if force - - "block:#{object_id}:#{@__block_channel_i}" - end - - # @private - def __await_channel(force = false) - @__fiber_channel_i ||= 0 - @__fiber_channel_i += 1 if force - - "await:#{object_id}:#{@__fiber_channel_i}" - end - - # @private - attr_accessor :__awaited_fileno + attr_accessor :__awaited_fileno, :__wait_generation, :__block_channel, :__await_channel # @private # pause a fiber and resume in the next iteration of the event loop @@ -156,7 +140,10 @@ class << self # @note This method should only be used when multiple fibers have to be processed in parallel. There's no need to use `Fiber.await` for single IO calls. def self.await(fibers) f, fibers = Fiber.current, Array(fibers) - await_channel = f.__await_channel(true) + + f.__wait_generation ||= 0 + gen = (f.__wait_generation += 1) + channel = f.__await_channel = "await:#{f.object_id}:#{gen}" Rage::Telemetry.tracer.span_core_fiber_await(fibers:) do # check which fibers are alive (i.e. have yielded) and which have errored out @@ -179,17 +166,21 @@ def self.await(fibers) end # wait on async fibers; resume right away if one of the fibers errors out - Iodine.subscribe(await_channel) do |_, err| - if err == AWAIT_ERROR_MESSAGE - f.resume + Iodine.subscribe(channel) do |_, err| + done = if err == AWAIT_ERROR_MESSAGE + true else num_wait_for -= 1 - f.resume if num_wait_for == 0 + num_wait_for == 0 + end + + if done + Iodine.defer { Iodine.unsubscribe(channel) } + f.resume if f.alive? && gen == f.__wait_generation end end Fiber.defer(-1) - Iodine.defer { Iodine.unsubscribe(await_channel) } # if num_wait_for is not 0 means we exited prematurely because of an error if num_wait_for > 0 diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 5bec50c0..65e0a48e 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -12,7 +12,11 @@ def initialize def io_wait(io, events, timeout = nil) f = Fiber.current - ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) } + gen = (f.__wait_generation += 1) + + ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) do |err| + f.resume(err) if f.alive? && gen == f.__wait_generation + end err = Fiber.defer(io.fileno) if err == false || (err && err < 0) @@ -91,13 +95,16 @@ def address_resolve(hostname) end def block(_blocker, timeout = nil) - f, fulfilled, channel = Fiber.current, false, Fiber.current.__block_channel(true) + f, fulfilled = Fiber.current, false + + gen = (f.__wait_generation += 1) + channel = f.__block_channel = "block:#{f.object_id}:#{gen}" resume_fiber_block = proc do unless fulfilled fulfilled = true ::Iodine.defer { ::Iodine.unsubscribe(channel) } - f.resume if f.alive? + f.resume if f.alive? && gen == f.__wait_generation end end @@ -110,7 +117,12 @@ def block(_blocker, timeout = nil) end def unblock(_blocker, fiber) - ::Iodine.publish(fiber.__block_channel, "", Iodine::PubSub::PROCESS) + ::Iodine.publish(fiber.__block_channel, "", Iodine::PubSub::PROCESS) if fiber.__block_channel + end + + def fiber_interrupt(fiber, exception) + fiber.__wait_generation += 1 + fiber.raise(exception) end def fiber(&block) @@ -131,13 +143,14 @@ def fiber(&block) Fiber.current.__set_result(block.call) end # send a message for `Fiber.await` to work - Iodine.publish(parent.__await_channel, "", Iodine::PubSub::PROCESS) if parent.alive? + Iodine.publish(parent.__await_channel, "", Iodine::PubSub::PROCESS) if parent.__await_channel rescue Exception => e Fiber.current.__set_err(e) - Iodine.publish(parent.__await_channel, Fiber::AWAIT_ERROR_MESSAGE, Iodine::PubSub::PROCESS) if parent.alive? + Iodine.publish(parent.__await_channel, Fiber::AWAIT_ERROR_MESSAGE, Iodine::PubSub::PROCESS) if parent.__await_channel end end + fiber.__wait_generation = 0 fiber.resume fiber diff --git a/spec/fiber_scheduler_spec.rb b/spec/fiber_scheduler_spec.rb index f5f49ba9..3f8bdc91 100644 --- a/spec/fiber_scheduler_spec.rb +++ b/spec/fiber_scheduler_spec.rb @@ -356,4 +356,163 @@ -> { raise e } end end + + context "fiber interruption" do + before do + skip("skipping on Ruby < 4") if Gem::Version.create(RUBY_VERSION) < Gem::Version.create(4) + end + + it "correctly interrupts a fiber" do + within_reactor do + error = begin + r, _ = IO.pipe + + child = Fiber.schedule do + r.gets + end + + Fiber.schedule do + r.close # This will interrupt the child fiber. + end + + Fiber.await(child) + rescue => e + e + end + + -> { expect(error).to be_a(IOError) } + end + end + end + + context "with wait generation tracking" do + it "initializes __wait_generation to 0 for scheduled fibers" do + within_reactor do + fiber = Fiber.schedule { Fiber.current.__wait_generation } + result = Fiber.await(fiber) + + -> { expect(result).to eq([0]) } + end + end + + it "increments __wait_generation on each block call" do + queue1 = Queue.new + queue2 = Queue.new + + Thread.new do + sleep 0.2 + queue1 << "first" + sleep 0.2 + queue2 << "second" + end + + within_reactor do + gen_before = Fiber.current.__wait_generation + queue1.pop + gen_after_first = Fiber.current.__wait_generation + queue2.pop + gen_after_second = Fiber.current.__wait_generation + + -> do + expect(gen_after_first).to eq(gen_before + 1) + expect(gen_after_second).to eq(gen_before + 2) + end + end + end + + it "increments __wait_generation on Fiber.await" do + within_reactor do + gen_before = Fiber.current.__wait_generation + + Fiber.await(Fiber.schedule { sleep 0.1 }) + gen_after_first = Fiber.current.__wait_generation + + Fiber.await(Fiber.schedule { sleep 0.1 }) + gen_after_second = Fiber.current.__wait_generation + + -> do + expect(gen_after_first).to eq(gen_before + 1) + expect(gen_after_second).to eq(gen_before + 2) + end + end + end + + it "prevents stale resume when fiber moves to new wait state" do + queue1 = Queue.new + queue2 = Queue.new + old_channel = nil + results = [] + + within_reactor do + fiber = Fiber.schedule do + f = Fiber.current + + # Start a thread that will: + # 1. Unblock the first wait + # 2. Capture the old channel + # 3. Try to publish to the old channel while fiber is in second wait + Thread.new do + sleep 0.1 + old_channel = f.__block_channel + queue1 << "first" + + sleep 0.2 + # Try to publish to the old channel (simulating a stale unblock) + # This shouldn't resume the fiber because generation has changed + Iodine.publish(old_channel, "", Iodine::PubSub::PROCESS) + end + + results << queue1.pop + results << "starting second wait" + + # Start second unblock thread + Thread.new { sleep 0.5; queue2 << "second" } + + # Now block on second queue - stale resume from old_channel should be ignored + results << queue2.pop + results + end + + result = Fiber.await(fiber) + -> { expect(result.first).to eq(["first", "starting second wait", "second"]) } + end + end + + it "fiber_interrupt increments generation before raising" do + test_fiber = nil + generation_before = nil + generation_in_rescue = nil + + within_reactor do + test_fiber = Fiber.schedule do + generation_before = Fiber.current.__wait_generation + + begin + sleep 10 # Block for a long time + rescue => e + generation_in_rescue = Fiber.current.__wait_generation + raise e + end + end + + # Give the fiber time to start sleeping + sleep 0.1 + + # Interrupt the fiber + Fiber.scheduler.fiber_interrupt(test_fiber, RuntimeError.new("interrupted")) + + # Wait for fiber to process the exception + sleep 0.1 + + -> do + # generation_before is 0 (after init) + # sleep increments to 1 + # fiber_interrupt increments to 2 + expect(generation_in_rescue).to eq(generation_before + 2) + expect(test_fiber.__get_err).to be_a(RuntimeError) + expect(test_fiber.__get_err.message).to eq("interrupted") + end + end + end + end end diff --git a/spec/fiber_spec.rb b/spec/fiber_spec.rb index a1954984..3116cd9b 100644 --- a/spec/fiber_spec.rb +++ b/spec/fiber_spec.rb @@ -193,4 +193,75 @@ -> {} end end + + it "correctly processes killed fibers" do + within_reactor do + f = Fiber.schedule { sleep 30 } + + Fiber.schedule do + sleep 1 + f.kill + end + + result = Fiber.await(f) + + -> { expect(result).to eq([nil]) } + end + end + + it "correctly processes killed fibers" do + within_reactor do + f1 = Fiber.schedule { sleep 30 } + + f2 = Fiber.schedule do + sleep 0.5 + "test result" + end + + Fiber.schedule do + sleep 1 + f1.kill + end + + result = Fiber.await([f1, f2]) + + -> { expect(result).to eq([nil, "test result"]) } + end + end + + context "with wait generation tracking" do + it "uses unique await channels for each Fiber.await call" do + within_reactor do + channels = [] + + Fiber.await(Fiber.schedule { sleep 0.05 }) + channels << Fiber.current.__await_channel + + Fiber.await(Fiber.schedule { sleep 0.05 }) + channels << Fiber.current.__await_channel + + Fiber.await(Fiber.schedule { sleep 0.05 }) + channels << Fiber.current.__await_channel + + -> { expect(channels.uniq.size).to eq(3) } + end + end + + it "increments __wait_generation for each await" do + within_reactor do + generations = [Fiber.current.__wait_generation] + + Fiber.await(Fiber.schedule { sleep 0.05 }) + generations << Fiber.current.__wait_generation + + Fiber.await(Fiber.schedule { sleep 0.05 }) + generations << Fiber.current.__wait_generation + + -> do + expect(generations[1]).to eq(generations[0] + 1) + expect(generations[2]).to eq(generations[0] + 2) + end + end + end + end end