Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## [Unreleased]

### Added

- Implement `FiberScheduler#fiber_interrupt` (#283).

## [1.24.0] - 2026-05-12

### Added
Expand Down
37 changes: 14 additions & 23 deletions lib/rage/fiber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,23 +111,7 @@ def __get_id
end

# @private
def __block_channel(force = false)
@__block_channel_i ||= 0
@__block_channel_i += 1 if force

"block:#{object_id}:#{@__block_channel_i}"
end

# @private
def __await_channel(force = false)
@__fiber_channel_i ||= 0
@__fiber_channel_i += 1 if force

"await:#{object_id}:#{@__fiber_channel_i}"
end

# @private
attr_accessor :__awaited_fileno
attr_accessor :__awaited_fileno, :__wait_generation, :__block_channel, :__await_channel

# @private
# pause a fiber and resume in the next iteration of the event loop
Expand Down Expand Up @@ -156,7 +140,10 @@ class << self
# @note This method should only be used when multiple fibers have to be processed in parallel. There's no need to use `Fiber.await` for single IO calls.
def self.await(fibers)
f, fibers = Fiber.current, Array(fibers)
await_channel = f.__await_channel(true)

f.__wait_generation ||= 0
gen = (f.__wait_generation += 1)
channel = f.__await_channel = "await:#{f.object_id}:#{gen}"

Rage::Telemetry.tracer.span_core_fiber_await(fibers:) do
# check which fibers are alive (i.e. have yielded) and which have errored out
Expand All @@ -179,17 +166,21 @@ def self.await(fibers)
end

# wait on async fibers; resume right away if one of the fibers errors out
Iodine.subscribe(await_channel) do |_, err|
if err == AWAIT_ERROR_MESSAGE
f.resume
Iodine.subscribe(channel) do |_, err|
done = if err == AWAIT_ERROR_MESSAGE
true
else
num_wait_for -= 1
f.resume if num_wait_for == 0
num_wait_for == 0
end

if done
Iodine.defer { Iodine.unsubscribe(channel) }
f.resume if f.alive? && gen == f.__wait_generation
end
end

Fiber.defer(-1)
Iodine.defer { Iodine.unsubscribe(await_channel) }

# if num_wait_for is not 0 means we exited prematurely because of an error
if num_wait_for > 0
Expand Down
25 changes: 19 additions & 6 deletions lib/rage/fiber_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ def initialize

def io_wait(io, events, timeout = nil)
f = Fiber.current
::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) }
gen = (f.__wait_generation += 1)

::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) do |err|
f.resume(err) if f.alive? && gen == f.__wait_generation
end

err = Fiber.defer(io.fileno)
if err == false || (err && err < 0)
Expand Down Expand Up @@ -91,13 +95,16 @@ def address_resolve(hostname)
end

def block(_blocker, timeout = nil)
f, fulfilled, channel = Fiber.current, false, Fiber.current.__block_channel(true)
f, fulfilled = Fiber.current, false

gen = (f.__wait_generation += 1)
channel = f.__block_channel = "block:#{f.object_id}:#{gen}"

resume_fiber_block = proc do
unless fulfilled
fulfilled = true
::Iodine.defer { ::Iodine.unsubscribe(channel) }
f.resume if f.alive?
f.resume if f.alive? && gen == f.__wait_generation
end
end

Expand All @@ -110,7 +117,12 @@ def block(_blocker, timeout = nil)
end

def unblock(_blocker, fiber)
::Iodine.publish(fiber.__block_channel, "", Iodine::PubSub::PROCESS)
::Iodine.publish(fiber.__block_channel, "", Iodine::PubSub::PROCESS) if fiber.__block_channel
end

def fiber_interrupt(fiber, exception)
fiber.__wait_generation += 1
fiber.raise(exception)
end

def fiber(&block)
Expand All @@ -131,13 +143,14 @@ def fiber(&block)
Fiber.current.__set_result(block.call)
end
# send a message for `Fiber.await` to work
Iodine.publish(parent.__await_channel, "", Iodine::PubSub::PROCESS) if parent.alive?
Iodine.publish(parent.__await_channel, "", Iodine::PubSub::PROCESS) if parent.__await_channel
rescue Exception => e
Fiber.current.__set_err(e)
Iodine.publish(parent.__await_channel, Fiber::AWAIT_ERROR_MESSAGE, Iodine::PubSub::PROCESS) if parent.alive?
Iodine.publish(parent.__await_channel, Fiber::AWAIT_ERROR_MESSAGE, Iodine::PubSub::PROCESS) if parent.__await_channel
end
end

fiber.__wait_generation = 0
fiber.resume

fiber
Expand Down
159 changes: 159 additions & 0 deletions spec/fiber_scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -356,4 +356,163 @@
-> { raise e }
end
end

context "fiber interruption" do
before do
skip("skipping on Ruby < 4") if Gem::Version.create(RUBY_VERSION) < Gem::Version.create(4)
end

it "correctly interrupts a fiber" do
within_reactor do
error = begin
r, _ = IO.pipe

child = Fiber.schedule do
r.gets
end

Fiber.schedule do
r.close # This will interrupt the child fiber.
end

Fiber.await(child)
rescue => e
e
end

-> { expect(error).to be_a(IOError) }
end
end
end

context "with wait generation tracking" do
it "initializes __wait_generation to 0 for scheduled fibers" do
within_reactor do
fiber = Fiber.schedule { Fiber.current.__wait_generation }
result = Fiber.await(fiber)

-> { expect(result).to eq([0]) }
end
end

it "increments __wait_generation on each block call" do
queue1 = Queue.new
queue2 = Queue.new

Thread.new do
sleep 0.2
queue1 << "first"
sleep 0.2
queue2 << "second"
end

within_reactor do
gen_before = Fiber.current.__wait_generation
queue1.pop
gen_after_first = Fiber.current.__wait_generation
queue2.pop
gen_after_second = Fiber.current.__wait_generation

-> do
expect(gen_after_first).to eq(gen_before + 1)
expect(gen_after_second).to eq(gen_before + 2)
end
end
end

it "increments __wait_generation on Fiber.await" do
within_reactor do
gen_before = Fiber.current.__wait_generation

Fiber.await(Fiber.schedule { sleep 0.1 })
gen_after_first = Fiber.current.__wait_generation

Fiber.await(Fiber.schedule { sleep 0.1 })
gen_after_second = Fiber.current.__wait_generation

-> do
expect(gen_after_first).to eq(gen_before + 1)
expect(gen_after_second).to eq(gen_before + 2)
end
end
end

it "prevents stale resume when fiber moves to new wait state" do
queue1 = Queue.new
queue2 = Queue.new
old_channel = nil
results = []

within_reactor do
fiber = Fiber.schedule do
f = Fiber.current

# Start a thread that will:
# 1. Unblock the first wait
# 2. Capture the old channel
# 3. Try to publish to the old channel while fiber is in second wait
Thread.new do
sleep 0.1
old_channel = f.__block_channel
queue1 << "first"

sleep 0.2
# Try to publish to the old channel (simulating a stale unblock)
# This shouldn't resume the fiber because generation has changed
Iodine.publish(old_channel, "", Iodine::PubSub::PROCESS)
end

results << queue1.pop
results << "starting second wait"

# Start second unblock thread
Thread.new { sleep 0.5; queue2 << "second" }

# Now block on second queue - stale resume from old_channel should be ignored
results << queue2.pop
results
end

result = Fiber.await(fiber)
-> { expect(result.first).to eq(["first", "starting second wait", "second"]) }
end
end

it "fiber_interrupt increments generation before raising" do
test_fiber = nil
generation_before = nil
generation_in_rescue = nil

within_reactor do
test_fiber = Fiber.schedule do
generation_before = Fiber.current.__wait_generation

begin
sleep 10 # Block for a long time
rescue => e
generation_in_rescue = Fiber.current.__wait_generation
raise e
end
end

# Give the fiber time to start sleeping
sleep 0.1

# Interrupt the fiber
Fiber.scheduler.fiber_interrupt(test_fiber, RuntimeError.new("interrupted"))

# Wait for fiber to process the exception
sleep 0.1

-> do
# generation_before is 0 (after init)
# sleep increments to 1
# fiber_interrupt increments to 2
expect(generation_in_rescue).to eq(generation_before + 2)
expect(test_fiber.__get_err).to be_a(RuntimeError)
expect(test_fiber.__get_err.message).to eq("interrupted")
end
end
end
end
end
71 changes: 71 additions & 0 deletions spec/fiber_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,75 @@
-> {}
end
end

it "correctly processes killed fibers" do
within_reactor do
f = Fiber.schedule { sleep 30 }

Fiber.schedule do
sleep 1
f.kill
end

result = Fiber.await(f)

-> { expect(result).to eq([nil]) }
end
end

it "correctly processes killed fibers" do
within_reactor do
f1 = Fiber.schedule { sleep 30 }

f2 = Fiber.schedule do
sleep 0.5
"test result"
end

Fiber.schedule do
sleep 1
f1.kill
end

result = Fiber.await([f1, f2])

-> { expect(result).to eq([nil, "test result"]) }
end
end

context "with wait generation tracking" do
it "uses unique await channels for each Fiber.await call" do
within_reactor do
channels = []

Fiber.await(Fiber.schedule { sleep 0.05 })
channels << Fiber.current.__await_channel

Fiber.await(Fiber.schedule { sleep 0.05 })
channels << Fiber.current.__await_channel

Fiber.await(Fiber.schedule { sleep 0.05 })
channels << Fiber.current.__await_channel

-> { expect(channels.uniq.size).to eq(3) }
end
end

it "increments __wait_generation for each await" do
within_reactor do
generations = [Fiber.current.__wait_generation]

Fiber.await(Fiber.schedule { sleep 0.05 })
generations << Fiber.current.__wait_generation

Fiber.await(Fiber.schedule { sleep 0.05 })
generations << Fiber.current.__wait_generation

-> do
expect(generations[1]).to eq(generations[0] + 1)
expect(generations[2]).to eq(generations[0] + 2)
end
end
end
end
end