diff --git a/CHANGELOG.md b/CHANGELOG.md index dc7e8d7..5e40a30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog All notable changes to this project will be documented in this file. +## [1.8.0] - 2026-03-25 +### Added +- Ability to compress data on publisher level and decompress on consumer + ## [1.7.0] - 2025-08-19 ### Added - Ability to specify a custom job class for publishing via `publishing_job_class_callable` config. diff --git a/Gemfile.lock b/Gemfile.lock index ba5a85b..575a135 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -8,9 +8,11 @@ GIT PATH remote: . specs: - rabbit_messaging (1.7.0) + rabbit_messaging (1.8.0) bunny (~> 2.0) kicks + msgpack + zlib GEM remote: https://rubygems.org/ @@ -66,6 +68,7 @@ GEM logger (1.6.5) method_source (1.1.0) minitest (5.25.4) + msgpack (1.8.0) parallel (1.26.3) parser (3.3.7.1) ast (~> 2.4.1) @@ -158,6 +161,7 @@ GEM unicode-display_width (3.1.4) unicode-emoji (~> 4.0, >= 4.0.4) unicode-emoji (4.0.4) + zlib (3.2.3) PLATFORMS arm64-darwin diff --git a/lib/rabbit.rb b/lib/rabbit.rb index 36d0daa..3847829 100644 --- a/lib/rabbit.rb +++ b/lib/rabbit.rb @@ -4,6 +4,7 @@ require "rabbit/daemon" require "rabbit/publishing" require "rabbit/event_handler" +require "rabbit/compressor" require "rabbit/extensions/bunny/channel" diff --git a/lib/rabbit/compressor.rb b/lib/rabbit/compressor.rb new file mode 100644 index 0000000..44089ab --- /dev/null +++ b/lib/rabbit/compressor.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +require "msgpack" +require "zlib" +require "base64" + +module Rabbit::Compressor + Error = Class.new(StandardError) + UncompressingError = Class.new(Error) + + extend self + + delegate :decode64, :strict_encode64, to: Base64 + + def dump(data, msgpack_options: {}, with_base64: false) + dumped = Zlib::Deflate.deflate(MessagePack.pack(data, msgpack_options)) + + return dumped unless with_base64 + + strict_encode64(dumped) + end + + def load(data, msgpack_options: {}, with_base64: false) + return {} unless data + + data = decode64(data) if with_base64 + + MessagePack.unpack(Zlib::Inflate.inflate(data), msgpack_options) + rescue Zlib::Error, MessagePack::UnpackError => error + raise UncompressingError, "Unable to uncompress data, #{error}" + end +end diff --git a/lib/rabbit/helper.rb b/lib/rabbit/helper.rb index 1bf6610..60d07fd 100644 --- a/lib/rabbit/helper.rb +++ b/lib/rabbit/helper.rb @@ -2,16 +2,22 @@ module Rabbit module Helper - def self.generate_message(message_part, parts, index) + def self.generate_message(message_part, parts, index, compressed: false) if parts == 1 - message_part + format(message_part, compressed) elsif index.zero? - "#{message_part}..." + "#{format(message_part, compressed)}..." elsif index == parts - 1 - "...#{message_part}" + "...#{format(message_part, compressed)}" else - "...#{message_part}..." + "...#{format(message_part, compressed)}..." end end + + def self.format(message_part, compressed) + return message_part unless compressed + + "message bytes #{message_part.bytesize}" + end end end diff --git a/lib/rabbit/publishing.rb b/lib/rabbit/publishing.rb index 3007308..2fa41c7 100644 --- a/lib/rabbit/publishing.rb +++ b/lib/rabbit/publishing.rb @@ -66,17 +66,35 @@ def log(message) message.event, message.confirm_select? ? "confirm" : "no-confirm" ] - message_parts = JSON.dump(message.data) - .scan(/.{1,#{Rabbit.config.logger_message_size_limit}}/) + return log_compressed(message.dumped_data, metadata: metadata) if message.compress - message_parts.each_with_index do |message_part, index| - message = Rabbit::Helper.generate_message(message_part, message_parts.size, index) - @logger.debug "#{metadata.join ' / '}: #{message}" - end + log_by_parts(message, metadata: metadata) end def reinitialize_channels_pool MUTEX.synchronize { @pool = ChannelsPool.new(create_client) } end + + def log_compressed(message_for_publish, metadata:) + formatted_message = Rabbit::Helper.generate_message( + message_for_publish, 1, 0, compressed: true + ) + + @logger.debug "#{metadata.join ' / '}: #{formatted_message}" + end + + def log_by_parts(message_for_publish, metadata:) + message_parts = + message_for_publish + .dumped_data + .scan(/.{1,#{Rabbit.config.logger_message_size_limit}}/) + + message_parts.each_with_index do |message_part, index| + formatted_message = Rabbit::Helper.generate_message( + message_part, message_parts.size, index + ) + @logger.debug "#{metadata.join ' / '}: #{formatted_message}" + end + end end end diff --git a/lib/rabbit/publishing/message.rb b/lib/rabbit/publishing/message.rb index 6c867d7..1622f5b 100644 --- a/lib/rabbit/publishing/message.rb +++ b/lib/rabbit/publishing/message.rb @@ -2,7 +2,7 @@ module Rabbit::Publishing class Message - attr_accessor :routing_key, :event, :data, + attr_accessor :routing_key, :event, :data, :compress, :confirm_select, :realtime, :headers, :message_id attr_reader :exchange_name @@ -27,14 +27,16 @@ def initialize( self.realtime = realtime self.headers = headers self.message_id = message_id + self.compress = headers.with_indifferent_access.fetch(:compress, false) end def to_hash instance_variables.each_with_object({}) do |var, hash| key = var.to_s.delete("@").to_sym + next if key == :compress value = instance_variable_get(var) hash[key] = value - end.merge(data: JSON.parse(data.to_json)) + end.merge(data: data_for_hash) end def to_s @@ -55,9 +57,11 @@ def basic_publish_args app_id: Rabbit.config.app_name, headers: headers, message_id: message_id, - } + }.tap do |ops| + ops[:content_encoding] = "gzip" if compress + end - [JSON.dump(data), real_exchange_name, routing_key.to_s, options] + [dumped_data, real_exchange_name, routing_key.to_s, options] end def exchange_name=(names) @@ -67,5 +71,22 @@ def exchange_name=(names) def real_exchange_name [Rabbit.config.group_id, Rabbit.config.project_id, *exchange_name].join(".") end + + def dumped_data + return JSON.dump(data) unless compress + # NOTE: when compress true and realtime false it means data from job + # already has been compressed and encoded in base64 + return Rabbit::Compressor.dump(data) if realtime + + Rabbit::Compressor.decode64(data) + end + + private + + def data_for_hash + return JSON.parse(data.to_json) unless compress + + Rabbit::Compressor.dump(data, with_base64: true) + end end end diff --git a/lib/rabbit/receiving/message.rb b/lib/rabbit/receiving/message.rb index 26dccff..4ce26b9 100644 --- a/lib/rabbit/receiving/message.rb +++ b/lib/rabbit/receiving/message.rb @@ -4,7 +4,7 @@ module Rabbit::Receiving class Message - attr_accessor :group_id, :project_id, :message_id, + attr_accessor :group_id, :project_id, :message_id, :compress, :event, :arguments, :original_message attr_reader :data @@ -15,6 +15,7 @@ def self.build(message, arguments) group_id: group_id, project_id: project_id, event: arguments.fetch(:type), + compress: arguments.dig(:headers, "compress") || false, data: message, message_id: arguments.fetch(:message_id, nil), arguments: arguments, @@ -28,12 +29,14 @@ def initialize( event: nil, data: nil, arguments: nil, - original_message: nil + original_message: nil, + compress: false ) self.group_id = group_id self.project_id = project_id self.message_id = message_id self.event = event + self.compress = compress self.data = data unless data.nil? self.arguments = arguments self.original_message = original_message @@ -41,7 +44,7 @@ def initialize( def data=(value) self.original_message = value - parsed = JSON.parse(value).deep_symbolize_keys + parsed = parsed_data(value) @data = parsed rescue JSON::ParserError => error mark_as_malformed!("JSON::ParserError: #{error.message}") @@ -60,7 +63,16 @@ def attributes data: data, arguments: arguments, original_message: original_message, + compress: compress, } end + + private + + def parsed_data(value) + return JSON.parse(value).deep_symbolize_keys unless compress + + Rabbit::Compressor.load(value, msgpack_options: { symbolize_keys: true }, with_base64: true) + end end end diff --git a/lib/rabbit/receiving/worker.rb b/lib/rabbit/receiving/worker.rb index 1741397..23aec44 100644 --- a/lib/rabbit/receiving/worker.rb +++ b/lib/rabbit/receiving/worker.rb @@ -4,6 +4,7 @@ require "rabbit" require "rabbit/receiving/receive" +require "base64" class Rabbit::Receiving::Worker include Sneakers::Worker @@ -29,8 +30,10 @@ def work_with_params(message, delivery_info, arguments) end def receive_message(message, delivery_info, arguments) + compress = arguments.dig(:headers, "compress") || false + Rabbit::Receiving::Receive.new( - message: message.dup.force_encoding("UTF-8"), + message: prepare_message_for_receiving(message.dup, compress), delivery_info: delivery_info, arguments: arguments, ).call @@ -49,4 +52,12 @@ def reinitialize_connection @queue.instance_variable_set(:@banny, nil) run end + + private + + def prepare_message_for_receiving(message, compress) + return Base64.strict_encode64(message.b) if compress + + message.force_encoding("UTF-8") + end end diff --git a/lib/rabbit/version.rb b/lib/rabbit/version.rb index 4e7dd30..118303a 100644 --- a/lib/rabbit/version.rb +++ b/lib/rabbit/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Rabbit - VERSION = "1.7.0" + VERSION = "1.8.0" end diff --git a/rabbit_messaging.gemspec b/rabbit_messaging.gemspec index 2a1212e..4d60416 100644 --- a/rabbit_messaging.gemspec +++ b/rabbit_messaging.gemspec @@ -21,4 +21,6 @@ Gem::Specification.new do |spec| spec.add_dependency "bunny", "~> 2.0" spec.add_dependency "kicks" + spec.add_dependency "msgpack" + spec.add_dependency "zlib" end diff --git a/spec/units/rabbit/publishing/message_spec.rb b/spec/units/rabbit/publishing/message_spec.rb index cf0e56b..e0e7cfc 100644 --- a/spec/units/rabbit/publishing/message_spec.rb +++ b/spec/units/rabbit/publishing/message_spec.rb @@ -25,12 +25,14 @@ { event: :ping, routing_key: :nah, - data: { foo: :bar }, + data: incoming_data, exchange_name: :fanout, - headers: { "foo" => "bar" }, + headers: headers, message_id: "super-uuid", } end + let(:headers) { { "foo" => "bar", "compress" => false } } + let(:incoming_data) { { foo: :bar } } its(:basic_publish_args) do is_expected.to eq [ @@ -41,11 +43,33 @@ type: "ping", content_type: "application/json", app_id: "test_group_id.test_project_id", - headers: { "foo" => "bar" }, + headers: { "foo" => "bar", "compress" => false }, message_id: "super-uuid", } ] end + + context "when message should be compressed" do + let(:headers) { super().merge({ "compress" => true }) } + let(:packed_data) { Rabbit::Compressor.dump({ foo: :bar }) } + let(:incoming_data) { Rabbit::Compressor.dump({ foo: :bar }, with_base64: true) } + + its(:basic_publish_args) do + is_expected.to eq [ + packed_data, "test_group_id.test_project_id.fanout", "nah", + { + mandatory: true, + persistent: true, + type: "ping", + content_type: "application/json", + content_encoding: "gzip", + app_id: "test_group_id.test_project_id", + headers: { "foo" => "bar", "compress" => true }, + message_id: "super-uuid", + } + ] + end + end end context "IPAddr" do diff --git a/spec/units/rabbit/receiving_spec.rb b/spec/units/rabbit/receiving_spec.rb index a9301ae..32c4661 100644 --- a/spec/units/rabbit/receiving_spec.rb +++ b/spec/units/rabbit/receiving_spec.rb @@ -6,7 +6,9 @@ let(:worker) { Rabbit::Receiving::Worker.new } let(:message) { { hello: "world", foo: "bar" }.to_json } let(:delivery_info) { { exchange: "some exchange", routing_key: "some_key" } } - let(:arguments) { { type: event, app_id: "some_group.some_app", message_id: "uuid" } } + let(:arguments) do + { type: event, app_id: "some_group.some_app", message_id: "uuid", headers: headers } + end let(:event) { "some_successful_event" } let(:job_class) { Rabbit::Receiving::Job } let(:job_configs) { {} } @@ -15,6 +17,9 @@ let(:before_hook) { double("before hook") } let(:after_hook) { double("after hook") } let(:message_info) { arguments.merge(delivery_info.slice(:exchange, :routing_key)) } + let(:headers) { {} } + let(:before_hook_args) { [message, message_info] } + let(:after_hook_args) { [message, message_info] } def expect_job_queue_to_be_set expect(job_class).to receive(:set).with(queue: queue, **job_configs) @@ -41,8 +46,8 @@ def expect_notification end def expect_hooks_to_be_called - expect(before_hook).to receive(:call).with(message, message_info) - expect(after_hook).to receive(:call).with(message, message_info) + expect(before_hook).to receive(:call).with(*before_hook_args) + expect(after_hook).to receive(:call).with(*after_hook_args) end before do @@ -55,8 +60,8 @@ def expect_hooks_to_be_called allow(job_class).to receive(:set).with(queue: queue, **job_configs).and_call_original - allow(before_hook).to receive(:call).with(message, message_info) - allow(after_hook).to receive(:call).with(message, message_info) + allow(before_hook).to receive(:call).with(*before_hook_args) + allow(after_hook).to receive(:call).with(*after_hook_args) handler.ignore_queue_conversion = conversion end @@ -135,6 +140,39 @@ def call; end run_receive end + context "message has been compressed" do + let(:headers) { super().merge("compress" => true) } + let(:message) { Rabbit::Compressor.dump({ hello: "world", foo: "bar" }) } + let(:before_hook_args) { [Base64.strict_encode64(message), message_info] } + let(:after_hook_args) { [Base64.strict_encode64(message), message_info] } + + it "performs job successfully" do + expect(Rabbit.config.exception_notifier).not_to receive(:call) + + expect_job_queue_to_be_set + expect_some_handler_to_be_called + + run_receive + end + + context "when data has deep inheritance" do + let(:message) do + Zlib::Deflate.deflate(MessagePack.pack({ hello: "world", foo: { inherited: "bar" } })) + end + + it "performs job successfully" do + expect_job_queue_to_be_set + expect_any_instance_of(handler).to receive(:call) do |instance| + expect(instance.hello).to eq("world") + expect(instance.data).to eq(hello: "world", foo: { inherited: "bar" }) + expect(instance.message_info).to include(message_info) + end + + run_receive + end + end + end + context "custom job configuration" do let(:job_configs) { Hash[some: :kek, pek: 123] } diff --git a/spec/units/rabbit_spec.rb b/spec/units/rabbit_spec.rb index 40e1d14..cfc4992 100644 --- a/spec/units/rabbit_spec.rb +++ b/spec/units/rabbit_spec.rb @@ -6,13 +6,28 @@ exchange_name: "some_exchange", routing_key: "some_queue", event: "some_event", - data: { hello: :world }, + data: message_data, realtime: realtime, - headers: { "foo" => "bar" }, + headers: { "foo" => "bar", "compress" => compress }, message_id: "uuid", } end let(:additional_params) { {} } + let(:compress) { false } + let(:expected_data_for_publish_job) { { "hello" => "world" } } + let(:message_data) { { hello: :world } } + let(:basic_publish_data) { message_data.to_json } + let(:basic_publish_expected_args) do + { + mandatory: true, + persistent: true, + type: "some_event", + content_type: "application/json", + app_id: "test_group_id.test_project_id", + headers: { "foo" => "bar", "compress" => compress }, + message_id: "uuid", + } + end before do Rabbit.config.queue_name_conversion = -> (queue) { "#{queue}_prepared" } @@ -36,18 +51,10 @@ expect(channel).to receive(:confirm_select).once allow(channel).to receive(:wait_for_confirms).and_return(true) expect(channel).to receive(:basic_publish).with( - { hello: :world }.to_json, + basic_publish_data, "test_group_id.test_project_id.some_exchange", "some_queue", - match( - mandatory: true, - persistent: true, - type: "some_event", - content_type: "application/json", - app_id: "test_group_id.test_project_id", - headers: { "foo" => "bar" }, - message_id: "uuid", - ), + match(basic_publish_expected_args), ) end @@ -58,11 +65,11 @@ perform_params = { routing_key: "some_queue", event: "some_event", - data: { "hello" => "world" }, + data: expected_data_for_publish_job, exchange_name: %w[some_exchange], confirm_select: true, realtime: realtime, - headers: { "foo" => "bar" }, + headers: { "foo" => "bar", "compress" => compress }, message_id: "uuid", } expect_any_instance_of(ActiveJob::ConfiguredJob) @@ -72,14 +79,20 @@ expect(job_class).not_to receive(:perform_later) end - expect(publish_logger).to receive(:debug).with(<<~MSG.strip) - test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar"} / some_event / \ + if compress + expect(publish_logger).to receive(:debug).with(expected_log_compressed_message) + else + # rubocop:disable Layout/LineLength + expect(publish_logger).to receive(:debug).with(<<~MSG.strip) + test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":#{compress}} / some_event / \ confirm: {"hello":"... - MSG - expect(publish_logger).to receive(:debug).with(<<~MSG.strip) - test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar"} / some_event / \ + MSG + expect(publish_logger).to receive(:debug).with(<<~MSG.strip) + test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":#{compress}} / some_event / \ confirm: ...world"} - MSG + MSG + # rubocop:enable Layout/LineLength + end described_class.publish(**message_options, **additional_params) end @@ -129,10 +142,12 @@ end expect(channel).to receive(:basic_publish).exactly(max_retries + 1).times + # rubocop:disable Layout/LineLength expect(publish_logger).to receive(:debug).with(<<~MSG.strip).once - test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar"} / some_event / \ - confirm: {"hello":"world"} + test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":#{compress}} / some_event / \ + confirm: {"hello":"world"} MSG + # rubocop:enable Layout/LineLength expect { described_class.publish(**message_options) }.not_to raise_error end @@ -210,6 +225,38 @@ include_examples "publishes" end + context "when data should be compressed" do + let(:realtime) { false } + let(:compress) { true } + let(:expect_to_use_job) { true } + let(:expected_queue) { "default_prepared" } + let(:job_class) { Rabbit::Publishing::Job } + let(:expected_data_for_publish_job) do + Rabbit::Compressor.dump({ "hello" => "world" }, with_base64: true) + end + let(:basic_publish_data) { Rabbit::Compressor.dump(message_data) } + let(:basic_publish_expected_args) do + super().merge(content_encoding: "gzip") + end + # rubocop:disable Layout/LineLength + let(:expected_log_compressed_message) do + <<~MSG.strip + test_group_id.test_project_id.some_exchange / some_queue / {"foo":"bar","compress":#{compress}} / some_event / \ + confirm: message bytes 21 + MSG + end + # rubocop:enable Layout/LineLength + + it_behaves_like "publishes" + + context "when data should be sent immediately" do + let(:realtime) { true } + let(:expect_to_use_job) { false } + + it_behaves_like "publishes" + end + end + describe "config" do describe "#read_queue" do specify { expect(Rabbit.config.read_queue).to eq("test_group_id.test_project_id") }