From ddee0d51ea479b468c1b206b5fb8b2e222e3b843 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Mon, 27 Apr 2026 13:07:02 +0100 Subject: [PATCH 1/2] Update Rage::Cable to use the new PubSub module --- lib/rage/cable/adapters/base.rb | 16 -- lib/rage/cable/adapters/redis.rb | 128 -------------- lib/rage/cable/cable.rb | 32 ++-- lib/rage/configuration.rb | 28 +-- spec/cable/adapters/redis_spec.rb | 271 ------------------------------ 5 files changed, 19 insertions(+), 456 deletions(-) delete mode 100644 lib/rage/cable/adapters/base.rb delete mode 100644 lib/rage/cable/adapters/redis.rb delete mode 100644 spec/cable/adapters/redis_spec.rb diff --git a/lib/rage/cable/adapters/base.rb b/lib/rage/cable/adapters/base.rb deleted file mode 100644 index 0ea64fa6..00000000 --- a/lib/rage/cable/adapters/base.rb +++ /dev/null @@ -1,16 +0,0 @@ -# frozen_string_literal: true - -class Rage::Cable::Adapters::Base - def pick_a_worker(&block) - _lock, lock_path = Tempfile.new.yield_self { |file| [file, file.path] } - - Iodine.on_state(:on_start) do - if File.new(lock_path).flock(File::LOCK_EX | File::LOCK_NB) - if Rage.logger.debug? - puts "INFO: #{Process.pid} is managing #{self.class.name.split("::").last} subscriptions." - end - block.call - end - end - end -end diff --git a/lib/rage/cable/adapters/redis.rb b/lib/rage/cable/adapters/redis.rb deleted file mode 100644 index 3016f461..00000000 --- a/lib/rage/cable/adapters/redis.rb +++ /dev/null @@ -1,128 +0,0 @@ -# frozen_string_literal: true - -require "securerandom" - -if !defined?(RedisClient) - fail <<~ERR - - Redis adapter depends on the `redis-client` gem. Add the following line to your Gemfile: - gem "redis-client" - - ERR -end - -class Rage::Cable::Adapters::Redis < Rage::Cable::Adapters::Base - REDIS_STREAM_NAME = "rage:cable:messages" - DEFAULT_REDIS_OPTIONS = { reconnect_attempts: [0.05, 0.1, 0.5] } - REDIS_MIN_VERSION_SUPPORTED = Gem::Version.create(6) - - def initialize(config) - @redis_stream = if (prefix = config.delete(:channel_prefix)) - "#{prefix}:#{REDIS_STREAM_NAME}" - else - REDIS_STREAM_NAME - end - - @redis_config = RedisClient.config(**DEFAULT_REDIS_OPTIONS.merge(config)) - @server_uuid = SecureRandom.uuid - - redis_version = get_redis_version - if redis_version < REDIS_MIN_VERSION_SUPPORTED - raise "Redis adapter only supports Redis 6+. Detected Redis version: #{redis_version}." - end - - @trimming_strategy = redis_version < Gem::Version.create("6.2.0") ? :maxlen : :minid - - pick_a_worker { poll } - end - - def publish(stream_name, data) - message_uuid = SecureRandom.uuid - - publish_redis.call( - "XADD", - @redis_stream, - trimming_method, "~", trimming_value, - "*", - "1", stream_name, - "2", data.to_json, - "3", @server_uuid, - "4", message_uuid - ) - end - - private - - def publish_redis - @publish_redis ||= @redis_config.new_client - end - - def trimming_method - @trimming_strategy == :maxlen ? "MAXLEN" : "MINID" - end - - def trimming_value - @trimming_strategy == :maxlen ? "10000" : ((Time.now.to_f - 5 * 60) * 1000).to_i - end - - def get_redis_version - service_redis = @redis_config.new_client - version = service_redis.call("INFO").match(/redis_version:([[:graph:]]+)/)[1] - - Gem::Version.create(version) - - rescue RedisClient::Error => e - puts "FATAL: Couldn't connect to Redis - all broadcasts will be limited to the current server." - puts e.backtrace.join("\n") - REDIS_MIN_VERSION_SUPPORTED - - ensure - service_redis.close - end - - def error_backoff_intervals - @error_backoff_intervals ||= Enumerator.new do |y| - y << 0.2 << 0.5 << 1 << 2 << 5 - loop { y << 10 } - end - end - - def poll - unless Fiber.scheduler - Fiber.set_scheduler(Rage::FiberScheduler.new) - end - - Iodine.on_state(:start_shutdown) do - @stopping = true - end - - Fiber.schedule do - read_redis = @redis_config.new_client - last_id = (Time.now.to_f * 1000).to_i - last_message_uuid = nil - - loop do - data = read_redis.blocking_call(5, "XREAD", "COUNT", "100", "BLOCK", "5000", "STREAMS", @redis_stream, last_id) - - if data - data[@redis_stream].each do |id, (_, stream_name, _, serialized_data, _, broadcaster_uuid, _, message_uuid)| - if broadcaster_uuid != @server_uuid && message_uuid != last_message_uuid - Rage.cable.__protocol.broadcast(stream_name, JSON.parse(serialized_data)) - end - - last_id = id - last_message_uuid = message_uuid - end - end - - rescue RedisClient::Error => e - Rage.logger.error("Subscriber error: #{e.message} (#{e.class})") - sleep error_backoff_intervals.next - rescue => e - @stopping ? break : raise(e) - else - error_backoff_intervals.rewind - end - end - end -end diff --git a/lib/rage/cable/cable.rb b/lib/rage/cable/cable.rb index 5e66131c..56c0e625 100644 --- a/lib/rage/cable/cable.rb +++ b/lib/rage/cable/cable.rb @@ -29,6 +29,9 @@ # ``` # module Rage::Cable + PUBSUB_BROADCASTER_ID = "cable" + private_constant :PUBSUB_BROADCASTER_ID + # Create a new Cable application. # # @example @@ -36,9 +39,6 @@ module Rage::Cable # run Rage.cable.application # end def self.application - # explicitly initialize the adapter - __adapter - handler = __build_handler(__protocol) accept_response = [0, __protocol.protocol_definition, []] @@ -61,6 +61,14 @@ def self.application chain end + # @private + def self.__initialize + if (adapter = Rage.config.pubsub.adapter) + adapter.add_broadcaster(PUBSUB_BROADCASTER_ID, __protocol) + @__adapter = adapter + end + end + # @private def self.__router @__router ||= Router.new @@ -71,11 +79,6 @@ def self.__protocol @__protocol ||= Rage.config.cable.protocol.tap { |protocol| protocol.init(__router) } end - # @private - def self.__adapter - @__adapter ||= Rage.config.cable.adapter - end - # @private def self.__build_handler(protocol) klass = Class.new do @@ -142,7 +145,7 @@ def log_error(e) def self.broadcast(stream, data) Rage::Telemetry.tracer.span_cable_stream_broadcast(stream:) do __protocol.broadcast(stream, data) - __adapter&.publish(stream, data) + @__adapter&.publish(PUBSUB_BROADCASTER_ID, stream, data) end true @@ -174,11 +177,6 @@ def self.broadcast(stream, data) # end # end - module Adapters - autoload :Base, "rage/cable/adapters/base" - autoload :Redis, "rage/cable/adapters/redis" - end - module Protocols end @@ -191,3 +189,9 @@ module Protocols require_relative "channel" require_relative "connection" require_relative "router" + +if Rage.config.internal.initialized? + Rage::Cable.__initialize +else + Rage.config.after_initialize { Rage::Cable.__initialize } +end diff --git a/lib/rage/configuration.rb b/lib/rage/configuration.rb index 69c759dd..7335350a 100644 --- a/lib/rage/configuration.rb +++ b/lib/rage/configuration.rb @@ -640,33 +640,6 @@ def middlewares end end end - - # @private - def config - @config ||= begin - config_file = Rage.root.join("config/cable.yml") - - if config_file.exist? - yaml = ERB.new(config_file.read).result - YAML.safe_load(yaml, aliases: true, symbolize_names: true)[Rage.env.to_sym] || {} - else - {} - end - end - end - - # @private - def adapter_config - config.except(:adapter) - end - - # @private - def adapter - case config[:adapter] - when "redis" - Rage::Cable::Adapters::Redis.new(adapter_config) - end - end end class PublicFileServer @@ -1056,6 +1029,7 @@ def initialize def config @config ||= begin config_file = Rage.root.join("config/pubsub.yml") + config_file = Rage.root.join("config/cable.yml") unless config_file.exist? config = if config_file.exist? yaml = ERB.new(config_file.read).result diff --git a/spec/cable/adapters/redis_spec.rb b/spec/cable/adapters/redis_spec.rb deleted file mode 100644 index 1b41b158..00000000 --- a/spec/cable/adapters/redis_spec.rb +++ /dev/null @@ -1,271 +0,0 @@ -# frozen_string_literal: true - -require "redis-client" - -RSpec.describe Rage::Cable::Adapters::Redis do - subject { described_class.new(adapter_config) } - let(:adapter_config) { {} } - - let(:mock_redis) { instance_double(RedisClient) } - - before do - allow(RedisClient).to receive(:config).and_return(double(new_client: mock_redis)) - - allow(mock_redis).to receive(:call).with("INFO").and_return("redis_version:7.0.5") - allow(mock_redis).to receive(:close) - - allow(Rage.cable).to receive(:__protocol).and_return(double) - end - - describe "#publish" do - it "adds an entry to the stream" do - expect(mock_redis).to receive(:call).with( - "XADD", - "rage:cable:messages", - "MINID", - "~", - instance_of(Integer), - "*", - "1", - "test-stream", - "2", - "{\"hello\":\"world\"}", - "3", - instance_of(String), - "4", - instance_of(String) - ) - - subject.publish("test-stream", { hello: "world" }) - end - - it "uses the same server UUID" do - server_uuids = [] - - expect(mock_redis).to receive(:call).with( - "XADD", - any_args, - "3", - satisfy { |server_uuid| server_uuids << server_uuid }, - anything, - anything - ).twice - - subject.publish("test-stream", {}) - subject.publish("test-stream", {}) - - expect(server_uuids.uniq.count).to eq(1) - end - - it "uses different message UUIDs" do - message_uuids = [] - - expect(mock_redis).to receive(:call).with( - "XADD", - any_args, - "4", - satisfy { |message_uuid| message_uuids << message_uuid } - ).twice - - subject.publish("test-stream", {}) - subject.publish("test-stream", {}) - - expect(message_uuids.uniq.count).to eq(2) - end - - context "with MAXLEN trimming" do - before do - allow(mock_redis).to receive(:call).with("INFO").and_return("redis_version:6.1.0") - end - - it "adds an entry to the stream" do - expect(mock_redis).to receive(:call).with( - "XADD", - "rage:cable:messages", - "MAXLEN", - "~", - "10000", - "*", - "1", - "test-stream", - "2", - "{\"hello\":\"world\"}", - "3", - instance_of(String), - "4", - instance_of(String) - ) - - subject.publish("test-stream", { hello: "world" }) - end - end - - context "with Redis < 6" do - before do - allow(mock_redis).to receive(:call).with("INFO").and_return("redis_version:4.0.0") - end - - it "raises an error" do - expect { subject }.to raise_error("Redis adapter only supports Redis 6+. Detected Redis version: 4.0.0.") - end - end - - context "with channel_prefix" do - let(:adapter_config) { { channel_prefix: "testing" } } - - it "uses the prefixed stream name" do - expect(RedisClient).to receive(:config) do |config| - expect(config).not_to include(:channel_prefix) - double(new_client: mock_redis) - end - - expect(mock_redis).to receive(:call).with( - "XADD", - "testing:rage:cable:messages", - any_args - ) - - subject.publish("test-stream", {}) - end - end - - context "with custom Redis config" do - let(:adapter_config) { { db: "3" } } - - it "applies the config" do - expect(RedisClient).to receive(:config) do |config| - expect(config).to include({ - reconnect_attempts: instance_of(Array), - db: "3" - }) - double(new_client: mock_redis) - end - - allow(mock_redis).to receive(:call).with( - "XADD", - "rage:cable:messages", - any_args - ) - - subject.publish("test-stream", {}) - end - end - - context "with no config" do - let(:adapter_config) { {} } - - it "uses the default config" do - expect(RedisClient).to receive(:config) do |config| - expect(config).to match({ reconnect_attempts: instance_of(Array) }) - double(new_client: mock_redis) - end - - allow(mock_redis).to receive(:call).with( - "XADD", - "rage:cable:messages", - any_args - ) - - subject.publish("test-stream", {}) - end - end - - context "with no available Redis" do - it "doesn't raise error" do - expect(mock_redis).to receive(:call).with("INFO").and_raise(RedisClient::CannotConnectError) - - allow(STDOUT).to receive(:puts).and_call_original - expect(STDOUT).to receive(:puts).with(/Couldn't connect to Redis/) - - expect { subject }.not_to raise_error - end - end - end - - describe "#poll" do - before do - allow_any_instance_of(described_class).to receive(:pick_a_worker).and_yield - allow(Iodine).to receive(:on_state).with(:start_shutdown).and_yield - end - - it "read entries from the stream" do - expect(mock_redis).to receive(:blocking_call).with( - 5, - "XREAD", - "COUNT", - "100", - "BLOCK", - "5000", - "STREAMS", - "rage:cable:messages", - instance_of(Integer) - ).and_raise - - subject - end - - context "with channel_prefix" do - let(:adapter_config) { { channel_prefix: "testing" } } - - it "uses the prefixed stream name" do - expect(mock_redis).to receive(:blocking_call).with( - 5, - "XREAD", - "COUNT", - "100", - "BLOCK", - "5000", - "STREAMS", - "testing:rage:cable:messages", - instance_of(Integer) - ).and_raise - - subject - end - end - - it "broadcasts the message" do - allow(mock_redis).to receive(:blocking_call).and_invoke( - proc { { "rage:cable:messages" => [["id", ["1", "test-stream", "2", "{\"hello\":\"world\"}", "3", "broadcaster UUID", "4", "message UUID"]]] } }, - proc { raise } - ) - - expect(Rage.cable.__protocol).to receive(:broadcast).with( - "test-stream", { "hello" => "world" } - ).once - - subject - end - - it "ignores messages with duplicate message UUIDs" do - allow(mock_redis).to receive(:blocking_call).and_invoke( - proc { - { "rage:cable:messages" => [ - ["id 1", ["1", "test-stream", "2", "{\"hello\":\"world\"}", "3", "broadcaster UUID", "4", "message UUID"]], - ["id 2", ["1", "test-stream", "2", "{\"hello\":\"world\"}", "3", "broadcaster UUID", "4", "message UUID"]] - ] } - }, - proc { raise } - ) - - expect(Rage.cable.__protocol).to receive(:broadcast).with( - "test-stream", { "hello" => "world" } - ).once - - subject - end - - it "ignores messages from the same broadcaster" do - allow(SecureRandom).to receive(:uuid).and_return("broadcaster UUID") - - allow(mock_redis).to receive(:blocking_call).and_invoke( - proc { { "rage:cable:messages" => [["id", ["1", "test-stream", "2", "{\"hello\":\"world\"}", "3", "broadcaster UUID", "4", "message UUID"]]] } }, - proc { raise } - ) - - expect(Rage.cable.__protocol).not_to receive(:broadcast) - - subject - end - end -end From 8ab7009c7ba7539291c72763be29fe44be3ac90c Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Sun, 3 May 2026 16:20:17 +0100 Subject: [PATCH 2/2] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 04d62db0..dd663efe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ ### Changed - [Deferred] Increase default retry limit to 20 and update default retry backoff to `(attempt**4) + 10 + (rand(15) * attempt)` by [@anuj-pal27](https://github.com/anuj-pal27) (#271). +- Update `Rage::Cable` to use the new `PubSub` module (#281). ## [1.23.0] - 2026-04-15