From 996e9df0a1cecbacf68eae725f55f35279b3e1b4 Mon Sep 17 00:00:00 2001 From: JustSnow Date: Mon, 23 Mar 2026 15:39:31 +0300 Subject: [PATCH 01/13] Add compression when publish and receive big messages --- Gemfile.lock | 4 ++++ lib/rabbit.rb | 5 +++- lib/rabbit/compressor.rb | 23 ++++++++++++++++++ lib/rabbit/publishing/message.rb | 25 ++++++++++++++++---- lib/rabbit/receiving/job.rb | 1 + lib/rabbit/receiving/message.rb | 18 +++++++++++--- lib/rabbit/receiving/worker.rb | 12 +++++++++- rabbit_messaging.gemspec | 2 ++ spec/units/rabbit/publishing/message_spec.rb | 23 ++++++++++++++++++ spec/units/rabbit/receiving_spec.rb | 14 +++++++++++ 10 files changed, 118 insertions(+), 9 deletions(-) create mode 100644 lib/rabbit/compressor.rb diff --git a/Gemfile.lock b/Gemfile.lock index ba5a85b..0d32e78 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -11,6 +11,8 @@ PATH rabbit_messaging (1.7.0) bunny (~> 2.0) kicks + msgpack + zlib GEM remote: https://rubygems.org/ @@ -66,6 +68,7 @@ GEM logger (1.6.5) method_source (1.1.0) minitest (5.25.4) + msgpack (1.8.0) parallel (1.26.3) parser (3.3.7.1) ast (~> 2.4.1) @@ -158,6 +161,7 @@ GEM unicode-display_width (3.1.4) unicode-emoji (~> 4.0, >= 4.0.4) unicode-emoji (4.0.4) + zlib (3.2.3) PLATFORMS arm64-darwin diff --git a/lib/rabbit.rb b/lib/rabbit.rb index 36d0daa..e4473f7 100644 --- a/lib/rabbit.rb +++ b/lib/rabbit.rb @@ -4,6 +4,7 @@ require "rabbit/daemon" require "rabbit/publishing" require "rabbit/event_handler" +require "rabbit/compressor" require "rabbit/extensions/bunny/channel" @@ -178,7 +179,8 @@ def publish( realtime: false, headers: {}, message_id: nil, - custom_queue_name: nil + custom_queue_name: nil, + compress: false ) message = Publishing::Message.new( routing_key: routing_key, @@ -189,6 +191,7 @@ def publish( realtime: realtime, headers: headers, message_id: message_id, + compress: compress, ) job_class = config.publishing_job_class_callable publish_job_callable = job_class.is_a?(Proc) ? job_class.call : (job_class || Publishing::Job) diff --git a/lib/rabbit/compressor.rb b/lib/rabbit/compressor.rb new file mode 100644 index 0000000..5f6d584 --- /dev/null +++ b/lib/rabbit/compressor.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +require "msgpack" +require "zlib" + +module Rabbit::Compressor + Error = Class.new(StandardError) + UncompressingError = Class.new(Error) + + extend self + + def dump(data, msgpack_options: {}) + Zlib::Deflate.deflate(MessagePack.pack(data, msgpack_options)) + end + + def load(data, msgpack_options: {}) + return {} unless data + + MessagePack.unpack(Zlib::Inflate.inflate(data), msgpack_options) + rescue Zlib::Error, MessagePack::UnpackError => error + raise UncompressingError, "Unable to uncompress data, #{error}" + end +end diff --git a/lib/rabbit/publishing/message.rb b/lib/rabbit/publishing/message.rb index 6c867d7..59d2580 100644 --- a/lib/rabbit/publishing/message.rb +++ b/lib/rabbit/publishing/message.rb @@ -2,7 +2,7 @@ module Rabbit::Publishing class Message - attr_accessor :routing_key, :event, :data, + attr_accessor :routing_key, :event, :data, :compress, :confirm_select, :realtime, :headers, :message_id attr_reader :exchange_name @@ -17,7 +17,8 @@ def initialize( confirm_select: true, realtime: false, headers: {}, - message_id: nil + message_id: nil, + compress: false ) self.routing_key = routing_key self.event = event&.to_s @@ -27,6 +28,7 @@ def initialize( self.realtime = realtime self.headers = headers self.message_id = message_id + self.compress = compress end def to_hash @@ -34,7 +36,7 @@ def to_hash key = var.to_s.delete("@").to_sym value = instance_variable_get(var) hash[key] = value - end.merge(data: JSON.parse(data.to_json)) + end.merge(data: data_for_hash) end def to_s @@ -55,9 +57,10 @@ def basic_publish_args app_id: Rabbit.config.app_name, headers: headers, message_id: message_id, + compress: compress, } - [JSON.dump(data), real_exchange_name, routing_key.to_s, options] + [dumped_data, real_exchange_name, routing_key.to_s, options] end def exchange_name=(names) @@ -67,5 +70,19 @@ def exchange_name=(names) def real_exchange_name [Rabbit.config.group_id, Rabbit.config.project_id, *exchange_name].join(".") end + + private + + def dumped_data + return JSON.dump(data) unless compress + + Rabbit::Compressor.dump(data) + end + + def data_for_hash + return JSON.parse(data.to_json) unless compress + + Rabbit::Compressor.dump(data) + end end end diff --git a/lib/rabbit/receiving/job.rb b/lib/rabbit/receiving/job.rb index b07869e..81212e2 100644 --- a/lib/rabbit/receiving/job.rb +++ b/lib/rabbit/receiving/job.rb @@ -11,6 +11,7 @@ class Rabbit::Receiving::Job < ActiveJob::Base def perform(message, arguments) + # binding.pry message = Rabbit::Receiving::Message.build(message, arguments) handler = Rabbit::Receiving::HandlerResolver.handler_for(message) handler.new(message).call diff --git a/lib/rabbit/receiving/message.rb b/lib/rabbit/receiving/message.rb index 26dccff..750c6bd 100644 --- a/lib/rabbit/receiving/message.rb +++ b/lib/rabbit/receiving/message.rb @@ -4,7 +4,7 @@ module Rabbit::Receiving class Message - attr_accessor :group_id, :project_id, :message_id, + attr_accessor :group_id, :project_id, :message_id, :compress, :event, :arguments, :original_message attr_reader :data @@ -15,6 +15,7 @@ def self.build(message, arguments) group_id: group_id, project_id: project_id, event: arguments.fetch(:type), + compress: arguments.fetch(:compress, false), data: message, message_id: arguments.fetch(:message_id, nil), arguments: arguments, @@ -28,12 +29,14 @@ def initialize( event: nil, data: nil, arguments: nil, - original_message: nil + original_message: nil, + compress: false ) self.group_id = group_id self.project_id = project_id self.message_id = message_id self.event = event + self.compress = compress self.data = data unless data.nil? self.arguments = arguments self.original_message = original_message @@ -41,7 +44,7 @@ def initialize( def data=(value) self.original_message = value - parsed = JSON.parse(value).deep_symbolize_keys + parsed = parsed_data(value) @data = parsed rescue JSON::ParserError => error mark_as_malformed!("JSON::ParserError: #{error.message}") @@ -60,7 +63,16 @@ def attributes data: data, arguments: arguments, original_message: original_message, + compress: compress, } end + + private + + def parsed_data(value) + return JSON.parse(value).deep_symbolize_keys unless compress + + Rabbit::Compressor.load(value, msgpack_options: { symbolize_keys: true }) + end end end diff --git a/lib/rabbit/receiving/worker.rb b/lib/rabbit/receiving/worker.rb index 1741397..5d7b749 100644 --- a/lib/rabbit/receiving/worker.rb +++ b/lib/rabbit/receiving/worker.rb @@ -29,8 +29,10 @@ def work_with_params(message, delivery_info, arguments) end def receive_message(message, delivery_info, arguments) + compress = arguments.fetch(:compress, false) + Rabbit::Receiving::Receive.new( - message: message.dup.force_encoding("UTF-8"), + message: prepare_message_for_receiving(message.dup, compress), delivery_info: delivery_info, arguments: arguments, ).call @@ -49,4 +51,12 @@ def reinitialize_connection @queue.instance_variable_set(:@banny, nil) run end + + private + + def prepare_message_for_receiving(message, compress) + return message if compress + + message.force_encoding("UTF-8") + end end diff --git a/rabbit_messaging.gemspec b/rabbit_messaging.gemspec index 2a1212e..4d60416 100644 --- a/rabbit_messaging.gemspec +++ b/rabbit_messaging.gemspec @@ -21,4 +21,6 @@ Gem::Specification.new do |spec| spec.add_dependency "bunny", "~> 2.0" spec.add_dependency "kicks" + spec.add_dependency "msgpack" + spec.add_dependency "zlib" end diff --git a/spec/units/rabbit/publishing/message_spec.rb b/spec/units/rabbit/publishing/message_spec.rb index cf0e56b..415c0fe 100644 --- a/spec/units/rabbit/publishing/message_spec.rb +++ b/spec/units/rabbit/publishing/message_spec.rb @@ -41,11 +41,33 @@ type: "ping", content_type: "application/json", app_id: "test_group_id.test_project_id", + compress: false, headers: { "foo" => "bar" }, message_id: "super-uuid", } ] end + + context "when message should be compressed" do + let(:attributes) { super().merge(compress: true) } + let(:packed_data) { Zlib::Deflate.deflate(MessagePack.pack({ foo: :bar })) } + + its(:basic_publish_args) do + is_expected.to eq [ + packed_data, "test_group_id.test_project_id.fanout", "nah", + { + mandatory: true, + persistent: true, + type: "ping", + content_type: "application/json", + app_id: "test_group_id.test_project_id", + compress: true, + headers: { "foo" => "bar" }, + message_id: "super-uuid", + } + ] + end + end end context "IPAddr" do @@ -65,6 +87,7 @@ type: "update", content_type: "application/json", app_id: "test_group_id.test_project_id", + compress: false, headers: {}, message_id: nil, } diff --git a/spec/units/rabbit/receiving_spec.rb b/spec/units/rabbit/receiving_spec.rb index a9301ae..b29651b 100644 --- a/spec/units/rabbit/receiving_spec.rb +++ b/spec/units/rabbit/receiving_spec.rb @@ -135,6 +135,20 @@ def call; end run_receive end + context "message has been compressed" do + let(:arguments) { super().merge(compress: true) } + let(:message) { Zlib::Deflate.deflate(MessagePack.pack({ hello: "world", foo: "bar" })) } + + it "performs job successfully" do + expect(Rabbit.config.exception_notifier).not_to receive(:call) + + expect_job_queue_to_be_set + expect_some_handler_to_be_called + + run_receive + end + end + context "custom job configuration" do let(:job_configs) { Hash[some: :kek, pek: 123] } From 39885b71be9d47e88d699ee6000fe9ced9fe17d7 Mon Sep 17 00:00:00 2001 From: JustSnow Date: Wed, 25 Mar 2026 10:36:55 +0300 Subject: [PATCH 02/13] f --- lib/rabbit/receiving/job.rb | 1 - spec/units/rabbit/receiving_spec.rb | 17 +++++++++++++++++ spec/units/rabbit_spec.rb | 2 ++ 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/lib/rabbit/receiving/job.rb b/lib/rabbit/receiving/job.rb index 81212e2..b07869e 100644 --- a/lib/rabbit/receiving/job.rb +++ b/lib/rabbit/receiving/job.rb @@ -11,7 +11,6 @@ class Rabbit::Receiving::Job < ActiveJob::Base def perform(message, arguments) - # binding.pry message = Rabbit::Receiving::Message.build(message, arguments) handler = Rabbit::Receiving::HandlerResolver.handler_for(message) handler.new(message).call diff --git a/spec/units/rabbit/receiving_spec.rb b/spec/units/rabbit/receiving_spec.rb index b29651b..352aa80 100644 --- a/spec/units/rabbit/receiving_spec.rb +++ b/spec/units/rabbit/receiving_spec.rb @@ -147,6 +147,23 @@ def call; end run_receive end + + context "when data has deep inheritance" do + let(:message) do + Zlib::Deflate.deflate(MessagePack.pack({ hello: "world", foo: { inherited: "bar" } })) + end + + it "performs job successfully" do + expect_job_queue_to_be_set + expect_any_instance_of(handler).to receive(:call) do |instance| + expect(instance.hello).to eq("world") + expect(instance.data).to eq(hello: "world", foo: { inherited: "bar" }) + expect(instance.message_info).to include(message_info) + end + + run_receive + end + end end context "custom job configuration" do diff --git a/spec/units/rabbit_spec.rb b/spec/units/rabbit_spec.rb index 40e1d14..06c5032 100644 --- a/spec/units/rabbit_spec.rb +++ b/spec/units/rabbit_spec.rb @@ -43,6 +43,7 @@ mandatory: true, persistent: true, type: "some_event", + compress: false, content_type: "application/json", app_id: "test_group_id.test_project_id", headers: { "foo" => "bar" }, @@ -61,6 +62,7 @@ data: { "hello" => "world" }, exchange_name: %w[some_exchange], confirm_select: true, + compress: false, realtime: realtime, headers: { "foo" => "bar" }, message_id: "uuid", From 4da4edcfcd8f0c50ba6ba80f7fa746ef987d71e4 Mon Sep 17 00:00:00 2001 From: JustSnow Date: Wed, 25 Mar 2026 10:41:09 +0300 Subject: [PATCH 03/13] f --- CHANGELOG.md | 4 ++++ lib/rabbit/version.rb | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dc7e8d7..5e40a30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All notable changes to this project will be documented in this file. +## [1.8.0] - 2026-03-25 +### Added +- Ability to compress data on publisher level and decompress on consumer + ## [1.7.0] - 2025-08-19 ### Added - Ability to specify a custom job class for publishing via `publishing_job_class_callable` config. diff --git a/lib/rabbit/version.rb b/lib/rabbit/version.rb index 4e7dd30..118303a 100644 --- a/lib/rabbit/version.rb +++ b/lib/rabbit/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Rabbit - VERSION = "1.7.0" + VERSION = "1.8.0" end From 2a31c6878f012fd3a31f538a9d1ac2d0670a2e50 Mon Sep 17 00:00:00 2001 From: JustSnow Date: Wed, 25 Mar 2026 10:44:43 +0300 Subject: [PATCH 04/13] f --- Gemfile.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Gemfile.lock b/Gemfile.lock index 0d32e78..575a135 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -8,7 +8,7 @@ GIT PATH remote: . specs: - rabbit_messaging (1.7.0) + rabbit_messaging (1.8.0) bunny (~> 2.0) kicks msgpack From 80af7bf39890123aacbba9b94c131654507b3070 Mon Sep 17 00:00:00 2001 From: JustSnow Date: Fri, 27 Mar 2026 15:45:44 +0300 Subject: [PATCH 05/13] f --- lib/rabbit/receiving/worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/rabbit/receiving/worker.rb b/lib/rabbit/receiving/worker.rb index 5d7b749..8c2554a 100644 --- a/lib/rabbit/receiving/worker.rb +++ b/lib/rabbit/receiving/worker.rb @@ -55,7 +55,7 @@ def reinitialize_connection private def prepare_message_for_receiving(message, compress) - return message if compress + return message.b if compress message.force_encoding("UTF-8") end From a7c85902a8655745525e3f378e58f816ec4ab9c0 Mon Sep 17 00:00:00 2001 From: JustSnow Date: Fri, 27 Mar 2026 17:39:11 +0300 Subject: [PATCH 06/13] f --- lib/rabbit/receiving/receive.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/rabbit/receiving/receive.rb b/lib/rabbit/receiving/receive.rb index 087e0ee..4fc6581 100644 --- a/lib/rabbit/receiving/receive.rb +++ b/lib/rabbit/receiving/receive.rb @@ -6,16 +6,17 @@ require "rabbit/helper" class Rabbit::Receiving::Receive - attr_accessor :message, :delivery_info, :arguments + attr_accessor :message, :delivery_info, :arguments, :compress def initialize(message: nil, delivery_info: nil, arguments: nil) self.message = message self.delivery_info = delivery_info self.arguments = arguments + self.compress = arguments.fetch(:compress, false) end def call - log! + log! unless compress call_hooks(before_hooks) process_message call_hooks(after_hooks) From 975cdf925ab5102c0234bfcf2732dbaaa49a0ed3 Mon Sep 17 00:00:00 2001 From: JustSnow Date: Wed, 1 Apr 2026 12:40:12 +0300 Subject: [PATCH 07/13] f --- lib/rabbit.rb | 3 +-- lib/rabbit/publishing.rb | 5 +++-- lib/rabbit/publishing/message.rb | 16 +++++++++------- lib/rabbit/receiving/message.rb | 2 +- lib/rabbit/receiving/receive.rb | 2 +- lib/rabbit/receiving/worker.rb | 2 +- spec/units/rabbit/publishing/message_spec.rb | 15 +++++++-------- spec/units/rabbit/receiving_spec.rb | 7 +++++-- spec/units/rabbit_spec.rb | 18 ++++++++++-------- 9 files changed, 38 insertions(+), 32 deletions(-) diff --git a/lib/rabbit.rb b/lib/rabbit.rb index e4473f7..33c8be6 100644 --- a/lib/rabbit.rb +++ b/lib/rabbit.rb @@ -189,9 +189,8 @@ def publish( exchange_name: exchange_name, confirm_select: confirm_select, realtime: realtime, - headers: headers, + headers: headers.merge(compress: compress), message_id: message_id, - compress: compress, ) job_class = config.publishing_job_class_callable publish_job_callable = job_class.is_a?(Proc) ? job_class.call : (job_class || Publishing::Job) diff --git a/lib/rabbit/publishing.rb b/lib/rabbit/publishing.rb index 3007308..e9d13e7 100644 --- a/lib/rabbit/publishing.rb +++ b/lib/rabbit/publishing.rb @@ -66,8 +66,9 @@ def log(message) message.event, message.confirm_select? ? "confirm" : "no-confirm" ] - message_parts = JSON.dump(message.data) - .scan(/.{1,#{Rabbit.config.logger_message_size_limit}}/) + message_parts = message + .dumped_data + .scan(/.{1,#{Rabbit.config.logger_message_size_limit}}/) message_parts.each_with_index do |message_part, index| message = Rabbit::Helper.generate_message(message_part, message_parts.size, index) diff --git a/lib/rabbit/publishing/message.rb b/lib/rabbit/publishing/message.rb index 59d2580..e6ef6b5 100644 --- a/lib/rabbit/publishing/message.rb +++ b/lib/rabbit/publishing/message.rb @@ -17,8 +17,7 @@ def initialize( confirm_select: true, realtime: false, headers: {}, - message_id: nil, - compress: false + message_id: nil ) self.routing_key = routing_key self.event = event&.to_s @@ -28,12 +27,13 @@ def initialize( self.realtime = realtime self.headers = headers self.message_id = message_id - self.compress = compress + self.compress = headers.fetch(:compress, false) end def to_hash instance_variables.each_with_object({}) do |var, hash| key = var.to_s.delete("@").to_sym + next if key == :compress value = instance_variable_get(var) hash[key] = value end.merge(data: data_for_hash) @@ -57,8 +57,10 @@ def basic_publish_args app_id: Rabbit.config.app_name, headers: headers, message_id: message_id, - compress: compress, - } + }.tap do |ops| + ops[:content_encoding] = "gzip" if compress + ops[:headers] = ops[:headers].merge(compress: compress) + end [dumped_data, real_exchange_name, routing_key.to_s, options] end @@ -71,14 +73,14 @@ def real_exchange_name [Rabbit.config.group_id, Rabbit.config.project_id, *exchange_name].join(".") end - private - def dumped_data return JSON.dump(data) unless compress Rabbit::Compressor.dump(data) end + private + def data_for_hash return JSON.parse(data.to_json) unless compress diff --git a/lib/rabbit/receiving/message.rb b/lib/rabbit/receiving/message.rb index 750c6bd..79b400f 100644 --- a/lib/rabbit/receiving/message.rb +++ b/lib/rabbit/receiving/message.rb @@ -15,7 +15,7 @@ def self.build(message, arguments) group_id: group_id, project_id: project_id, event: arguments.fetch(:type), - compress: arguments.fetch(:compress, false), + compress: arguments.dig(:headers, :compress) || false, data: message, message_id: arguments.fetch(:message_id, nil), arguments: arguments, diff --git a/lib/rabbit/receiving/receive.rb b/lib/rabbit/receiving/receive.rb index 4fc6581..9919c73 100644 --- a/lib/rabbit/receiving/receive.rb +++ b/lib/rabbit/receiving/receive.rb @@ -12,7 +12,7 @@ def initialize(message: nil, delivery_info: nil, arguments: nil) self.message = message self.delivery_info = delivery_info self.arguments = arguments - self.compress = arguments.fetch(:compress, false) + self.compress = arguments.dig(:headers, :compress) || false end def call diff --git a/lib/rabbit/receiving/worker.rb b/lib/rabbit/receiving/worker.rb index 8c2554a..0f405f5 100644 --- a/lib/rabbit/receiving/worker.rb +++ b/lib/rabbit/receiving/worker.rb @@ -29,7 +29,7 @@ def work_with_params(message, delivery_info, arguments) end def receive_message(message, delivery_info, arguments) - compress = arguments.fetch(:compress, false) + compress = arguments.dig(:headers, :compress) || false Rabbit::Receiving::Receive.new( message: prepare_message_for_receiving(message.dup, compress), diff --git a/spec/units/rabbit/publishing/message_spec.rb b/spec/units/rabbit/publishing/message_spec.rb index 415c0fe..213a215 100644 --- a/spec/units/rabbit/publishing/message_spec.rb +++ b/spec/units/rabbit/publishing/message_spec.rb @@ -27,10 +27,11 @@ routing_key: :nah, data: { foo: :bar }, exchange_name: :fanout, - headers: { "foo" => "bar" }, + headers: headers, message_id: "super-uuid", } end + let(:headers) { { "foo" => "bar" } } its(:basic_publish_args) do is_expected.to eq [ @@ -41,15 +42,14 @@ type: "ping", content_type: "application/json", app_id: "test_group_id.test_project_id", - compress: false, - headers: { "foo" => "bar" }, + headers: { "foo" => "bar", compress: false }, message_id: "super-uuid", } ] end context "when message should be compressed" do - let(:attributes) { super().merge(compress: true) } + let(:headers) { super().merge({ compress: true }) } let(:packed_data) { Zlib::Deflate.deflate(MessagePack.pack({ foo: :bar })) } its(:basic_publish_args) do @@ -60,9 +60,9 @@ persistent: true, type: "ping", content_type: "application/json", + content_encoding: "gzip", app_id: "test_group_id.test_project_id", - compress: true, - headers: { "foo" => "bar" }, + headers: { "foo" => "bar", compress: true }, message_id: "super-uuid", } ] @@ -87,8 +87,7 @@ type: "update", content_type: "application/json", app_id: "test_group_id.test_project_id", - compress: false, - headers: {}, + headers: { compress: false }, message_id: nil, } ] diff --git a/spec/units/rabbit/receiving_spec.rb b/spec/units/rabbit/receiving_spec.rb index 352aa80..a389e4d 100644 --- a/spec/units/rabbit/receiving_spec.rb +++ b/spec/units/rabbit/receiving_spec.rb @@ -6,7 +6,9 @@ let(:worker) { Rabbit::Receiving::Worker.new } let(:message) { { hello: "world", foo: "bar" }.to_json } let(:delivery_info) { { exchange: "some exchange", routing_key: "some_key" } } - let(:arguments) { { type: event, app_id: "some_group.some_app", message_id: "uuid" } } + let(:arguments) do + { type: event, app_id: "some_group.some_app", message_id: "uuid", headers: headers } + end let(:event) { "some_successful_event" } let(:job_class) { Rabbit::Receiving::Job } let(:job_configs) { {} } @@ -15,6 +17,7 @@ let(:before_hook) { double("before hook") } let(:after_hook) { double("after hook") } let(:message_info) { arguments.merge(delivery_info.slice(:exchange, :routing_key)) } + let(:headers) { {} } def expect_job_queue_to_be_set expect(job_class).to receive(:set).with(queue: queue, **job_configs) @@ -136,7 +139,7 @@ def call; end end context "message has been compressed" do - let(:arguments) { super().merge(compress: true) } + let(:headers) { super().merge(compress: true) } let(:message) { Zlib::Deflate.deflate(MessagePack.pack({ hello: "world", foo: "bar" })) } it "performs job successfully" do diff --git a/spec/units/rabbit_spec.rb b/spec/units/rabbit_spec.rb index 06c5032..31ccdad 100644 --- a/spec/units/rabbit_spec.rb +++ b/spec/units/rabbit_spec.rb @@ -43,10 +43,9 @@ mandatory: true, persistent: true, type: "some_event", - compress: false, content_type: "application/json", app_id: "test_group_id.test_project_id", - headers: { "foo" => "bar" }, + headers: { "foo" => "bar", compress: false }, message_id: "uuid", ), ) @@ -62,9 +61,8 @@ data: { "hello" => "world" }, exchange_name: %w[some_exchange], confirm_select: true, - compress: false, realtime: realtime, - headers: { "foo" => "bar" }, + headers: { "foo" => "bar", compress: false }, message_id: "uuid", } expect_any_instance_of(ActiveJob::ConfiguredJob) @@ -74,14 +72,16 @@ expect(job_class).not_to receive(:perform_later) end + # rubocop:disable Layout/LineLength expect(publish_logger).to receive(:debug).with(<<~MSG.strip) - test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar"} / some_event / \ + test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":false} / some_event / \ confirm: {"hello":"... MSG expect(publish_logger).to receive(:debug).with(<<~MSG.strip) - test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar"} / some_event / \ + test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":false} / some_event / \ confirm: ...world"} MSG + # rubocop:enable Layout/LineLength described_class.publish(**message_options, **additional_params) end @@ -131,10 +131,12 @@ end expect(channel).to receive(:basic_publish).exactly(max_retries + 1).times + # rubocop:disable Layout/LineLength expect(publish_logger).to receive(:debug).with(<<~MSG.strip).once - test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar"} / some_event / \ - confirm: {"hello":"world"} + test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":false} / some_event / \ + confirm: {"hello":"world"} MSG + # rubocop:enable Layout/LineLength expect { described_class.publish(**message_options) }.not_to raise_error end From f4bad61660daf33fdb038e533556290c7d6ad9d8 Mon Sep 17 00:00:00 2001 From: JustSnow Date: Fri, 3 Apr 2026 13:16:30 +0300 Subject: [PATCH 08/13] f --- lib/rabbit.rb | 5 ++--- lib/rabbit/publishing/message.rb | 3 +-- lib/rabbit/receiving/receive.rb | 5 ++--- spec/units/rabbit/publishing/message_spec.rb | 10 +++++----- spec/units/rabbit_spec.rb | 6 +++--- 5 files changed, 13 insertions(+), 16 deletions(-) diff --git a/lib/rabbit.rb b/lib/rabbit.rb index 33c8be6..3847829 100644 --- a/lib/rabbit.rb +++ b/lib/rabbit.rb @@ -179,8 +179,7 @@ def publish( realtime: false, headers: {}, message_id: nil, - custom_queue_name: nil, - compress: false + custom_queue_name: nil ) message = Publishing::Message.new( routing_key: routing_key, @@ -189,7 +188,7 @@ def publish( exchange_name: exchange_name, confirm_select: confirm_select, realtime: realtime, - headers: headers.merge(compress: compress), + headers: headers, message_id: message_id, ) job_class = config.publishing_job_class_callable diff --git a/lib/rabbit/publishing/message.rb b/lib/rabbit/publishing/message.rb index e6ef6b5..0cdd7e6 100644 --- a/lib/rabbit/publishing/message.rb +++ b/lib/rabbit/publishing/message.rb @@ -27,7 +27,7 @@ def initialize( self.realtime = realtime self.headers = headers self.message_id = message_id - self.compress = headers.fetch(:compress, false) + self.compress = headers.with_indifferent_access.fetch(:compress, false) end def to_hash @@ -59,7 +59,6 @@ def basic_publish_args message_id: message_id, }.tap do |ops| ops[:content_encoding] = "gzip" if compress - ops[:headers] = ops[:headers].merge(compress: compress) end [dumped_data, real_exchange_name, routing_key.to_s, options] diff --git a/lib/rabbit/receiving/receive.rb b/lib/rabbit/receiving/receive.rb index 9919c73..087e0ee 100644 --- a/lib/rabbit/receiving/receive.rb +++ b/lib/rabbit/receiving/receive.rb @@ -6,17 +6,16 @@ require "rabbit/helper" class Rabbit::Receiving::Receive - attr_accessor :message, :delivery_info, :arguments, :compress + attr_accessor :message, :delivery_info, :arguments def initialize(message: nil, delivery_info: nil, arguments: nil) self.message = message self.delivery_info = delivery_info self.arguments = arguments - self.compress = arguments.dig(:headers, :compress) || false end def call - log! unless compress + log! call_hooks(before_hooks) process_message call_hooks(after_hooks) diff --git a/spec/units/rabbit/publishing/message_spec.rb b/spec/units/rabbit/publishing/message_spec.rb index 213a215..c6d1ef2 100644 --- a/spec/units/rabbit/publishing/message_spec.rb +++ b/spec/units/rabbit/publishing/message_spec.rb @@ -31,7 +31,7 @@ message_id: "super-uuid", } end - let(:headers) { { "foo" => "bar" } } + let(:headers) { { "foo" => "bar", "compress" => false } } its(:basic_publish_args) do is_expected.to eq [ @@ -42,14 +42,14 @@ type: "ping", content_type: "application/json", app_id: "test_group_id.test_project_id", - headers: { "foo" => "bar", compress: false }, + headers: { "foo" => "bar", "compress" => false }, message_id: "super-uuid", } ] end context "when message should be compressed" do - let(:headers) { super().merge({ compress: true }) } + let(:headers) { super().merge({ "compress" => true }) } let(:packed_data) { Zlib::Deflate.deflate(MessagePack.pack({ foo: :bar })) } its(:basic_publish_args) do @@ -62,7 +62,7 @@ content_type: "application/json", content_encoding: "gzip", app_id: "test_group_id.test_project_id", - headers: { "foo" => "bar", compress: true }, + headers: { "foo" => "bar", "compress" => true }, message_id: "super-uuid", } ] @@ -87,7 +87,7 @@ type: "update", content_type: "application/json", app_id: "test_group_id.test_project_id", - headers: { compress: false }, + headers: {}, message_id: nil, } ] diff --git a/spec/units/rabbit_spec.rb b/spec/units/rabbit_spec.rb index 31ccdad..d535fbe 100644 --- a/spec/units/rabbit_spec.rb +++ b/spec/units/rabbit_spec.rb @@ -8,7 +8,7 @@ event: "some_event", data: { hello: :world }, realtime: realtime, - headers: { "foo" => "bar" }, + headers: { "foo" => "bar", "compress" => false }, message_id: "uuid", } end @@ -45,7 +45,7 @@ type: "some_event", content_type: "application/json", app_id: "test_group_id.test_project_id", - headers: { "foo" => "bar", compress: false }, + headers: { "foo" => "bar", "compress" => false }, message_id: "uuid", ), ) @@ -62,7 +62,7 @@ exchange_name: %w[some_exchange], confirm_select: true, realtime: realtime, - headers: { "foo" => "bar", compress: false }, + headers: { "foo" => "bar", "compress" => false }, message_id: "uuid", } expect_any_instance_of(ActiveJob::ConfiguredJob) From 0fe0fd09975e222b0abd13c31c9e09b3ca40ede6 Mon Sep 17 00:00:00 2001 From: JustSnow Date: Fri, 3 Apr 2026 15:39:36 +0300 Subject: [PATCH 09/13] f --- lib/rabbit/receiving/worker.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/rabbit/receiving/worker.rb b/lib/rabbit/receiving/worker.rb index 0f405f5..ef38c4c 100644 --- a/lib/rabbit/receiving/worker.rb +++ b/lib/rabbit/receiving/worker.rb @@ -29,6 +29,7 @@ def work_with_params(message, delivery_info, arguments) end def receive_message(message, delivery_info, arguments) + arguments = arguments.with_indifferent_access compress = arguments.dig(:headers, :compress) || false Rabbit::Receiving::Receive.new( From 3ebe0d31ea9d6a9f2532b7e49d0e5de590367dda Mon Sep 17 00:00:00 2001 From: JustSnow Date: Fri, 3 Apr 2026 16:08:28 +0300 Subject: [PATCH 10/13] f --- lib/rabbit/receiving/message.rb | 2 +- lib/rabbit/receiving/worker.rb | 3 +-- spec/units/rabbit/receiving_spec.rb | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/rabbit/receiving/message.rb b/lib/rabbit/receiving/message.rb index 79b400f..22ae08d 100644 --- a/lib/rabbit/receiving/message.rb +++ b/lib/rabbit/receiving/message.rb @@ -15,7 +15,7 @@ def self.build(message, arguments) group_id: group_id, project_id: project_id, event: arguments.fetch(:type), - compress: arguments.dig(:headers, :compress) || false, + compress: arguments.dig(:headers, "compress") || false, data: message, message_id: arguments.fetch(:message_id, nil), arguments: arguments, diff --git a/lib/rabbit/receiving/worker.rb b/lib/rabbit/receiving/worker.rb index ef38c4c..44419a6 100644 --- a/lib/rabbit/receiving/worker.rb +++ b/lib/rabbit/receiving/worker.rb @@ -29,8 +29,7 @@ def work_with_params(message, delivery_info, arguments) end def receive_message(message, delivery_info, arguments) - arguments = arguments.with_indifferent_access - compress = arguments.dig(:headers, :compress) || false + compress = arguments.dig(:headers, "compress") || false Rabbit::Receiving::Receive.new( message: prepare_message_for_receiving(message.dup, compress), diff --git a/spec/units/rabbit/receiving_spec.rb b/spec/units/rabbit/receiving_spec.rb index a389e4d..b1fc7fa 100644 --- a/spec/units/rabbit/receiving_spec.rb +++ b/spec/units/rabbit/receiving_spec.rb @@ -139,7 +139,7 @@ def call; end end context "message has been compressed" do - let(:headers) { super().merge(compress: true) } + let(:headers) { super().merge("compress" => true) } let(:message) { Zlib::Deflate.deflate(MessagePack.pack({ hello: "world", foo: "bar" })) } it "performs job successfully" do From e7da21ebcdd914025e5ef8586d8bcb17d5a05b58 Mon Sep 17 00:00:00 2001 From: JustSnow Date: Tue, 7 Apr 2026 10:46:43 +0300 Subject: [PATCH 11/13] f --- lib/rabbit/receiving/message.rb | 3 ++- lib/rabbit/receiving/worker.rb | 3 ++- spec/units/rabbit/receiving_spec.rb | 12 ++++++++---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/lib/rabbit/receiving/message.rb b/lib/rabbit/receiving/message.rb index 22ae08d..8353f6d 100644 --- a/lib/rabbit/receiving/message.rb +++ b/lib/rabbit/receiving/message.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "rabbit/receiving/malformed_message" +require "base64" module Rabbit::Receiving class Message @@ -72,7 +73,7 @@ def attributes def parsed_data(value) return JSON.parse(value).deep_symbolize_keys unless compress - Rabbit::Compressor.load(value, msgpack_options: { symbolize_keys: true }) + Rabbit::Compressor.load(Base64.decode64(value), msgpack_options: { symbolize_keys: true }) end end end diff --git a/lib/rabbit/receiving/worker.rb b/lib/rabbit/receiving/worker.rb index 44419a6..23aec44 100644 --- a/lib/rabbit/receiving/worker.rb +++ b/lib/rabbit/receiving/worker.rb @@ -4,6 +4,7 @@ require "rabbit" require "rabbit/receiving/receive" +require "base64" class Rabbit::Receiving::Worker include Sneakers::Worker @@ -55,7 +56,7 @@ def reinitialize_connection private def prepare_message_for_receiving(message, compress) - return message.b if compress + return Base64.strict_encode64(message.b) if compress message.force_encoding("UTF-8") end diff --git a/spec/units/rabbit/receiving_spec.rb b/spec/units/rabbit/receiving_spec.rb index b1fc7fa..00d8f31 100644 --- a/spec/units/rabbit/receiving_spec.rb +++ b/spec/units/rabbit/receiving_spec.rb @@ -18,6 +18,8 @@ let(:after_hook) { double("after hook") } let(:message_info) { arguments.merge(delivery_info.slice(:exchange, :routing_key)) } let(:headers) { {} } + let(:before_hook_args) { [message, message_info] } + let(:after_hook_args) { [message, message_info] } def expect_job_queue_to_be_set expect(job_class).to receive(:set).with(queue: queue, **job_configs) @@ -44,8 +46,8 @@ def expect_notification end def expect_hooks_to_be_called - expect(before_hook).to receive(:call).with(message, message_info) - expect(after_hook).to receive(:call).with(message, message_info) + expect(before_hook).to receive(:call).with(*before_hook_args) + expect(after_hook).to receive(:call).with(*after_hook_args) end before do @@ -58,8 +60,8 @@ def expect_hooks_to_be_called allow(job_class).to receive(:set).with(queue: queue, **job_configs).and_call_original - allow(before_hook).to receive(:call).with(message, message_info) - allow(after_hook).to receive(:call).with(message, message_info) + allow(before_hook).to receive(:call).with(*before_hook_args) + allow(after_hook).to receive(:call).with(*after_hook_args) handler.ignore_queue_conversion = conversion end @@ -141,6 +143,8 @@ def call; end context "message has been compressed" do let(:headers) { super().merge("compress" => true) } let(:message) { Zlib::Deflate.deflate(MessagePack.pack({ hello: "world", foo: "bar" })) } + let(:before_hook_args) { [Base64.strict_encode64(message), message_info] } + let(:after_hook_args) { [Base64.strict_encode64(message), message_info] } it "performs job successfully" do expect(Rabbit.config.exception_notifier).not_to receive(:call) From 8d95a63bf23923ba779df004cab5c6c09ceec1c0 Mon Sep 17 00:00:00 2001 From: JustSnow Date: Wed, 8 Apr 2026 12:12:45 +0300 Subject: [PATCH 12/13] f --- lib/rabbit/compressor.rb | 15 +++- lib/rabbit/helper.rb | 16 ++-- lib/rabbit/publishing.rb | 6 +- lib/rabbit/publishing/message.rb | 7 +- lib/rabbit/receiving/message.rb | 2 +- spec/units/rabbit/publishing/message_spec.rb | 6 +- spec/units/rabbit/receiving_spec.rb | 2 +- spec/units/rabbit_spec.rb | 79 +++++++++++++++----- 8 files changed, 97 insertions(+), 36 deletions(-) diff --git a/lib/rabbit/compressor.rb b/lib/rabbit/compressor.rb index 5f6d584..44089ab 100644 --- a/lib/rabbit/compressor.rb +++ b/lib/rabbit/compressor.rb @@ -2,6 +2,7 @@ require "msgpack" require "zlib" +require "base64" module Rabbit::Compressor Error = Class.new(StandardError) @@ -9,13 +10,21 @@ module Rabbit::Compressor extend self - def dump(data, msgpack_options: {}) - Zlib::Deflate.deflate(MessagePack.pack(data, msgpack_options)) + delegate :decode64, :strict_encode64, to: Base64 + + def dump(data, msgpack_options: {}, with_base64: false) + dumped = Zlib::Deflate.deflate(MessagePack.pack(data, msgpack_options)) + + return dumped unless with_base64 + + strict_encode64(dumped) end - def load(data, msgpack_options: {}) + def load(data, msgpack_options: {}, with_base64: false) return {} unless data + data = decode64(data) if with_base64 + MessagePack.unpack(Zlib::Inflate.inflate(data), msgpack_options) rescue Zlib::Error, MessagePack::UnpackError => error raise UncompressingError, "Unable to uncompress data, #{error}" diff --git a/lib/rabbit/helper.rb b/lib/rabbit/helper.rb index 1bf6610..ca62bfa 100644 --- a/lib/rabbit/helper.rb +++ b/lib/rabbit/helper.rb @@ -2,16 +2,22 @@ module Rabbit module Helper - def self.generate_message(message_part, parts, index) + def self.generate_message(message_part, parts, index, compressed: false) if parts == 1 - message_part + format(message_part, compressed) elsif index.zero? - "#{message_part}..." + "#{format(message_part, compressed)}..." elsif index == parts - 1 - "...#{message_part}" + "...#{format(message_part, compressed)}" else - "...#{message_part}..." + "...#{format(message_part, compressed)}..." end end + + def self.format(message_part, compressed) + return message_part unless compressed + + "message part bytes #{message_part.bytesize}" + end end end diff --git a/lib/rabbit/publishing.rb b/lib/rabbit/publishing.rb index e9d13e7..0123e30 100644 --- a/lib/rabbit/publishing.rb +++ b/lib/rabbit/publishing.rb @@ -71,8 +71,10 @@ def log(message) .scan(/.{1,#{Rabbit.config.logger_message_size_limit}}/) message_parts.each_with_index do |message_part, index| - message = Rabbit::Helper.generate_message(message_part, message_parts.size, index) - @logger.debug "#{metadata.join ' / '}: #{message}" + formatted_message = Rabbit::Helper.generate_message( + message_part, message_parts.size, index, compressed: message.compress + ) + @logger.debug "#{metadata.join ' / '}: #{formatted_message}" end end diff --git a/lib/rabbit/publishing/message.rb b/lib/rabbit/publishing/message.rb index 0cdd7e6..1622f5b 100644 --- a/lib/rabbit/publishing/message.rb +++ b/lib/rabbit/publishing/message.rb @@ -74,8 +74,11 @@ def real_exchange_name def dumped_data return JSON.dump(data) unless compress + # NOTE: when compress true and realtime false it means data from job + # already has been compressed and encoded in base64 + return Rabbit::Compressor.dump(data) if realtime - Rabbit::Compressor.dump(data) + Rabbit::Compressor.decode64(data) end private @@ -83,7 +86,7 @@ def dumped_data def data_for_hash return JSON.parse(data.to_json) unless compress - Rabbit::Compressor.dump(data) + Rabbit::Compressor.dump(data, with_base64: true) end end end diff --git a/lib/rabbit/receiving/message.rb b/lib/rabbit/receiving/message.rb index 8353f6d..21ea3b0 100644 --- a/lib/rabbit/receiving/message.rb +++ b/lib/rabbit/receiving/message.rb @@ -73,7 +73,7 @@ def attributes def parsed_data(value) return JSON.parse(value).deep_symbolize_keys unless compress - Rabbit::Compressor.load(Base64.decode64(value), msgpack_options: { symbolize_keys: true }) + Rabbit::Compressor.load(value, msgpack_options: { symbolize_keys: true }, with_base64: true) end end end diff --git a/spec/units/rabbit/publishing/message_spec.rb b/spec/units/rabbit/publishing/message_spec.rb index c6d1ef2..e0e7cfc 100644 --- a/spec/units/rabbit/publishing/message_spec.rb +++ b/spec/units/rabbit/publishing/message_spec.rb @@ -25,13 +25,14 @@ { event: :ping, routing_key: :nah, - data: { foo: :bar }, + data: incoming_data, exchange_name: :fanout, headers: headers, message_id: "super-uuid", } end let(:headers) { { "foo" => "bar", "compress" => false } } + let(:incoming_data) { { foo: :bar } } its(:basic_publish_args) do is_expected.to eq [ @@ -50,7 +51,8 @@ context "when message should be compressed" do let(:headers) { super().merge({ "compress" => true }) } - let(:packed_data) { Zlib::Deflate.deflate(MessagePack.pack({ foo: :bar })) } + let(:packed_data) { Rabbit::Compressor.dump({ foo: :bar }) } + let(:incoming_data) { Rabbit::Compressor.dump({ foo: :bar }, with_base64: true) } its(:basic_publish_args) do is_expected.to eq [ diff --git a/spec/units/rabbit/receiving_spec.rb b/spec/units/rabbit/receiving_spec.rb index 00d8f31..32c4661 100644 --- a/spec/units/rabbit/receiving_spec.rb +++ b/spec/units/rabbit/receiving_spec.rb @@ -142,7 +142,7 @@ def call; end context "message has been compressed" do let(:headers) { super().merge("compress" => true) } - let(:message) { Zlib::Deflate.deflate(MessagePack.pack({ hello: "world", foo: "bar" })) } + let(:message) { Rabbit::Compressor.dump({ hello: "world", foo: "bar" }) } let(:before_hook_args) { [Base64.strict_encode64(message), message_info] } let(:after_hook_args) { [Base64.strict_encode64(message), message_info] } diff --git a/spec/units/rabbit_spec.rb b/spec/units/rabbit_spec.rb index d535fbe..ff22e14 100644 --- a/spec/units/rabbit_spec.rb +++ b/spec/units/rabbit_spec.rb @@ -6,13 +6,31 @@ exchange_name: "some_exchange", routing_key: "some_queue", event: "some_event", - data: { hello: :world }, + data: message_data, realtime: realtime, - headers: { "foo" => "bar", "compress" => false }, + headers: { "foo" => "bar", "compress" => compress }, message_id: "uuid", } end let(:additional_params) { {} } + let(:compress) { false } + let(:expected_data_for_publish_job) { { "hello" => "world" } } + let(:message_data) { { hello: :world } } + let(:basic_publish_data) { message_data.to_json } + let(:basic_publish_expected_args) do + { + mandatory: true, + persistent: true, + type: "some_event", + content_type: "application/json", + app_id: "test_group_id.test_project_id", + headers: { "foo" => "bar", "compress" => compress }, + message_id: "uuid", + } + end + let(:logger_first_part_message) { '{"hello":"...' } + let(:logger_second_part_message) { '...world"}' } + let(:logger_message_size_limit) { 10 } before do Rabbit.config.queue_name_conversion = -> (queue) { "#{queue}_prepared" } @@ -31,23 +49,17 @@ allow(channel).to receive(:open?).and_return(true) allow(Rabbit.config).to receive(:publish_logger) { publish_logger } - allow(Rabbit.config).to receive(:logger_message_size_limit).and_return(10) + allow(Rabbit.config).to \ + receive(:logger_message_size_limit) + .and_return(logger_message_size_limit) expect(channel).to receive(:confirm_select).once allow(channel).to receive(:wait_for_confirms).and_return(true) expect(channel).to receive(:basic_publish).with( - { hello: :world }.to_json, + basic_publish_data, "test_group_id.test_project_id.some_exchange", "some_queue", - match( - mandatory: true, - persistent: true, - type: "some_event", - content_type: "application/json", - app_id: "test_group_id.test_project_id", - headers: { "foo" => "bar", "compress" => false }, - message_id: "uuid", - ), + match(basic_publish_expected_args), ) end @@ -58,11 +70,11 @@ perform_params = { routing_key: "some_queue", event: "some_event", - data: { "hello" => "world" }, + data: expected_data_for_publish_job, exchange_name: %w[some_exchange], confirm_select: true, realtime: realtime, - headers: { "foo" => "bar", "compress" => false }, + headers: { "foo" => "bar", "compress" => compress }, message_id: "uuid", } expect_any_instance_of(ActiveJob::ConfiguredJob) @@ -74,12 +86,12 @@ # rubocop:disable Layout/LineLength expect(publish_logger).to receive(:debug).with(<<~MSG.strip) - test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":false} / some_event / \ - confirm: {"hello":"... + test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":#{compress}} / some_event / \ + confirm: #{logger_first_part_message} MSG expect(publish_logger).to receive(:debug).with(<<~MSG.strip) - test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":false} / some_event / \ - confirm: ...world"} + test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":#{compress}} / some_event / \ + confirm: #{logger_second_part_message} MSG # rubocop:enable Layout/LineLength described_class.publish(**message_options, **additional_params) @@ -133,7 +145,7 @@ expect(channel).to receive(:basic_publish).exactly(max_retries + 1).times # rubocop:disable Layout/LineLength expect(publish_logger).to receive(:debug).with(<<~MSG.strip).once - test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":false} / some_event / \ + test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":#{compress}} / some_event / \ confirm: {"hello":"world"} MSG # rubocop:enable Layout/LineLength @@ -214,6 +226,33 @@ include_examples "publishes" end + context "when data should be compressed" do + let(:realtime) { false } + let(:compress) { true } + let(:expect_to_use_job) { true } + let(:expected_queue) { "default_prepared" } + let(:job_class) { Rabbit::Publishing::Job } + let(:expected_data_for_publish_job) do + Rabbit::Compressor.dump({ "hello" => "world" }, with_base64: true) + end + let(:basic_publish_data) { Rabbit::Compressor.dump(message_data) } + let(:basic_publish_expected_args) do + super().merge(content_encoding: "gzip") + end + let(:logger_first_part_message) { "message part bytes 15..." } + let(:logger_second_part_message) { "...message part bytes 6" } + let(:logger_message_size_limit) { 15 } + + it_behaves_like "publishes" + + context "when data should be sent immediately" do + let(:realtime) { true } + let(:expect_to_use_job) { false } + + it_behaves_like "publishes" + end + end + describe "config" do describe "#read_queue" do specify { expect(Rabbit.config.read_queue).to eq("test_group_id.test_project_id") } From a7fe68119f3e72b5a50429055d591e0a098730c5 Mon Sep 17 00:00:00 2001 From: JustSnow Date: Fri, 10 Apr 2026 10:54:16 +0300 Subject: [PATCH 13/13] f --- lib/rabbit/helper.rb | 2 +- lib/rabbit/publishing.rb | 31 ++++++++++++++++++++------- lib/rabbit/receiving/message.rb | 1 - spec/units/rabbit_spec.rb | 38 ++++++++++++++++++--------------- 4 files changed, 45 insertions(+), 27 deletions(-) diff --git a/lib/rabbit/helper.rb b/lib/rabbit/helper.rb index ca62bfa..60d07fd 100644 --- a/lib/rabbit/helper.rb +++ b/lib/rabbit/helper.rb @@ -17,7 +17,7 @@ def self.generate_message(message_part, parts, index, compressed: false) def self.format(message_part, compressed) return message_part unless compressed - "message part bytes #{message_part.bytesize}" + "message bytes #{message_part.bytesize}" end end end diff --git a/lib/rabbit/publishing.rb b/lib/rabbit/publishing.rb index 0123e30..2fa41c7 100644 --- a/lib/rabbit/publishing.rb +++ b/lib/rabbit/publishing.rb @@ -66,20 +66,35 @@ def log(message) message.event, message.confirm_select? ? "confirm" : "no-confirm" ] - message_parts = message - .dumped_data - .scan(/.{1,#{Rabbit.config.logger_message_size_limit}}/) + return log_compressed(message.dumped_data, metadata: metadata) if message.compress + + log_by_parts(message, metadata: metadata) + end + + def reinitialize_channels_pool + MUTEX.synchronize { @pool = ChannelsPool.new(create_client) } + end + + def log_compressed(message_for_publish, metadata:) + formatted_message = Rabbit::Helper.generate_message( + message_for_publish, 1, 0, compressed: true + ) + + @logger.debug "#{metadata.join ' / '}: #{formatted_message}" + end + + def log_by_parts(message_for_publish, metadata:) + message_parts = + message_for_publish + .dumped_data + .scan(/.{1,#{Rabbit.config.logger_message_size_limit}}/) message_parts.each_with_index do |message_part, index| formatted_message = Rabbit::Helper.generate_message( - message_part, message_parts.size, index, compressed: message.compress + message_part, message_parts.size, index ) @logger.debug "#{metadata.join ' / '}: #{formatted_message}" end end - - def reinitialize_channels_pool - MUTEX.synchronize { @pool = ChannelsPool.new(create_client) } - end end end diff --git a/lib/rabbit/receiving/message.rb b/lib/rabbit/receiving/message.rb index 21ea3b0..4ce26b9 100644 --- a/lib/rabbit/receiving/message.rb +++ b/lib/rabbit/receiving/message.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true require "rabbit/receiving/malformed_message" -require "base64" module Rabbit::Receiving class Message diff --git a/spec/units/rabbit_spec.rb b/spec/units/rabbit_spec.rb index ff22e14..cfc4992 100644 --- a/spec/units/rabbit_spec.rb +++ b/spec/units/rabbit_spec.rb @@ -28,9 +28,6 @@ message_id: "uuid", } end - let(:logger_first_part_message) { '{"hello":"...' } - let(:logger_second_part_message) { '...world"}' } - let(:logger_message_size_limit) { 10 } before do Rabbit.config.queue_name_conversion = -> (queue) { "#{queue}_prepared" } @@ -49,9 +46,7 @@ allow(channel).to receive(:open?).and_return(true) allow(Rabbit.config).to receive(:publish_logger) { publish_logger } - allow(Rabbit.config).to \ - receive(:logger_message_size_limit) - .and_return(logger_message_size_limit) + allow(Rabbit.config).to receive(:logger_message_size_limit).and_return(10) expect(channel).to receive(:confirm_select).once allow(channel).to receive(:wait_for_confirms).and_return(true) @@ -84,16 +79,20 @@ expect(job_class).not_to receive(:perform_later) end - # rubocop:disable Layout/LineLength - expect(publish_logger).to receive(:debug).with(<<~MSG.strip) + if compress + expect(publish_logger).to receive(:debug).with(expected_log_compressed_message) + else + # rubocop:disable Layout/LineLength + expect(publish_logger).to receive(:debug).with(<<~MSG.strip) test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":#{compress}} / some_event / \ - confirm: #{logger_first_part_message} - MSG - expect(publish_logger).to receive(:debug).with(<<~MSG.strip) + confirm: {"hello":"... + MSG + expect(publish_logger).to receive(:debug).with(<<~MSG.strip) test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":#{compress}} / some_event / \ - confirm: #{logger_second_part_message} - MSG - # rubocop:enable Layout/LineLength + confirm: ...world"} + MSG + # rubocop:enable Layout/LineLength + end described_class.publish(**message_options, **additional_params) end @@ -239,9 +238,14 @@ let(:basic_publish_expected_args) do super().merge(content_encoding: "gzip") end - let(:logger_first_part_message) { "message part bytes 15..." } - let(:logger_second_part_message) { "...message part bytes 6" } - let(:logger_message_size_limit) { 15 } + # rubocop:disable Layout/LineLength + let(:expected_log_compressed_message) do + <<~MSG.strip + test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":#{compress}} / some_event / \ + confirm: message bytes 21 + MSG + end + # rubocop:enable Layout/LineLength it_behaves_like "publishes"