Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Changelog
All notable changes to this project will be documented in this file.

## [1.8.0] - 2026-03-25
### Added
- Ability to compress data on publisher level and decompress on consumer

## [1.7.0] - 2025-08-19
### Added
- Ability to specify a custom job class for publishing via `publishing_job_class_callable` config.
Expand Down
6 changes: 5 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ GIT
PATH
remote: .
specs:
rabbit_messaging (1.7.0)
rabbit_messaging (1.8.0)
bunny (~> 2.0)
kicks
msgpack
zlib

GEM
remote: https://rubygems.org/
Expand Down Expand Up @@ -66,6 +68,7 @@ GEM
logger (1.6.5)
method_source (1.1.0)
minitest (5.25.4)
msgpack (1.8.0)
parallel (1.26.3)
parser (3.3.7.1)
ast (~> 2.4.1)
Expand Down Expand Up @@ -158,6 +161,7 @@ GEM
unicode-display_width (3.1.4)
unicode-emoji (~> 4.0, >= 4.0.4)
unicode-emoji (4.0.4)
zlib (3.2.3)

PLATFORMS
arm64-darwin
Expand Down
1 change: 1 addition & 0 deletions lib/rabbit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "rabbit/daemon"
require "rabbit/publishing"
require "rabbit/event_handler"
require "rabbit/compressor"

require "rabbit/extensions/bunny/channel"

Expand Down
32 changes: 32 additions & 0 deletions lib/rabbit/compressor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

require "msgpack"
require "zlib"
require "base64"

module Rabbit::Compressor
Error = Class.new(StandardError)
UncompressingError = Class.new(Error)

extend self

delegate :decode64, :strict_encode64, to: Base64

def dump(data, msgpack_options: {}, with_base64: false)
dumped = Zlib::Deflate.deflate(MessagePack.pack(data, msgpack_options))

return dumped unless with_base64

strict_encode64(dumped)
end

def load(data, msgpack_options: {}, with_base64: false)
return {} unless data

data = decode64(data) if with_base64

MessagePack.unpack(Zlib::Inflate.inflate(data), msgpack_options)
rescue Zlib::Error, MessagePack::UnpackError => error
raise UncompressingError, "Unable to uncompress data, #{error}"
end
end
16 changes: 11 additions & 5 deletions lib/rabbit/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@

module Rabbit
module Helper
def self.generate_message(message_part, parts, index)
def self.generate_message(message_part, parts, index, compressed: false)
if parts == 1
message_part
format(message_part, compressed)
elsif index.zero?
"#{message_part}..."
"#{format(message_part, compressed)}..."
elsif index == parts - 1
"...#{message_part}"
"...#{format(message_part, compressed)}"
else
"...#{message_part}..."
"...#{format(message_part, compressed)}..."
end
end

def self.format(message_part, compressed)
return message_part unless compressed

"message bytes #{message_part.bytesize}"
end
end
end
30 changes: 24 additions & 6 deletions lib/rabbit/publishing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,35 @@ def log(message)
message.event, message.confirm_select? ? "confirm" : "no-confirm"
]

message_parts = JSON.dump(message.data)
.scan(/.{1,#{Rabbit.config.logger_message_size_limit}}/)
return log_compressed(message.dumped_data, metadata: metadata) if message.compress

message_parts.each_with_index do |message_part, index|
message = Rabbit::Helper.generate_message(message_part, message_parts.size, index)
@logger.debug "#{metadata.join ' / '}: #{message}"
end
log_by_parts(message, metadata: metadata)
end

def reinitialize_channels_pool
MUTEX.synchronize { @pool = ChannelsPool.new(create_client) }
end

def log_compressed(message_for_publish, metadata:)
formatted_message = Rabbit::Helper.generate_message(
message_for_publish, 1, 0, compressed: true
)

@logger.debug "#{metadata.join ' / '}: #{formatted_message}"
end

def log_by_parts(message_for_publish, metadata:)
message_parts =
message_for_publish
.dumped_data
.scan(/.{1,#{Rabbit.config.logger_message_size_limit}}/)

message_parts.each_with_index do |message_part, index|
formatted_message = Rabbit::Helper.generate_message(
message_part, message_parts.size, index
)
@logger.debug "#{metadata.join ' / '}: #{formatted_message}"
end
end
end
end
29 changes: 25 additions & 4 deletions lib/rabbit/publishing/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Rabbit::Publishing
class Message
attr_accessor :routing_key, :event, :data,
attr_accessor :routing_key, :event, :data, :compress,
:confirm_select, :realtime, :headers, :message_id
attr_reader :exchange_name

Expand All @@ -27,14 +27,16 @@ def initialize(
self.realtime = realtime
self.headers = headers
self.message_id = message_id
self.compress = headers.with_indifferent_access.fetch(:compress, false)
end

def to_hash
instance_variables.each_with_object({}) do |var, hash|
key = var.to_s.delete("@").to_sym
next if key == :compress
value = instance_variable_get(var)
hash[key] = value
end.merge(data: JSON.parse(data.to_json))
end.merge(data: data_for_hash)
end

def to_s
Expand All @@ -55,9 +57,11 @@ def basic_publish_args
app_id: Rabbit.config.app_name,
headers: headers,
message_id: message_id,
}
}.tap do |ops|
ops[:content_encoding] = "gzip" if compress
end

[JSON.dump(data), real_exchange_name, routing_key.to_s, options]
[dumped_data, real_exchange_name, routing_key.to_s, options]
end

def exchange_name=(names)
Expand All @@ -67,5 +71,22 @@ def exchange_name=(names)
def real_exchange_name
[Rabbit.config.group_id, Rabbit.config.project_id, *exchange_name].join(".")
end

def dumped_data
return JSON.dump(data) unless compress
# NOTE: when compress true and realtime false it means data from job
# already has been compressed and encoded in base64
return Rabbit::Compressor.dump(data) if realtime

Rabbit::Compressor.decode64(data)
end

private

def data_for_hash
return JSON.parse(data.to_json) unless compress

Rabbit::Compressor.dump(data, with_base64: true)
end
end
end
18 changes: 15 additions & 3 deletions lib/rabbit/receiving/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

module Rabbit::Receiving
class Message
attr_accessor :group_id, :project_id, :message_id,
attr_accessor :group_id, :project_id, :message_id, :compress,
:event, :arguments, :original_message
attr_reader :data

Expand All @@ -15,6 +15,7 @@ def self.build(message, arguments)
group_id: group_id,
project_id: project_id,
event: arguments.fetch(:type),
compress: arguments.dig(:headers, "compress") || false,
data: message,
message_id: arguments.fetch(:message_id, nil),
arguments: arguments,
Expand All @@ -28,20 +29,22 @@ def initialize(
event: nil,
data: nil,
arguments: nil,
original_message: nil
original_message: nil,
compress: false
)
self.group_id = group_id
self.project_id = project_id
self.message_id = message_id
self.event = event
self.compress = compress
self.data = data unless data.nil?
self.arguments = arguments
self.original_message = original_message
end

def data=(value)
self.original_message = value
parsed = JSON.parse(value).deep_symbolize_keys
parsed = parsed_data(value)
@data = parsed
rescue JSON::ParserError => error
mark_as_malformed!("JSON::ParserError: #{error.message}")
Expand All @@ -60,7 +63,16 @@ def attributes
data: data,
arguments: arguments,
original_message: original_message,
compress: compress,
}
end

private

def parsed_data(value)
return JSON.parse(value).deep_symbolize_keys unless compress
Comment thread
tycooon marked this conversation as resolved.

Rabbit::Compressor.load(value, msgpack_options: { symbolize_keys: true }, with_base64: true)
end
end
end
13 changes: 12 additions & 1 deletion lib/rabbit/receiving/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

require "rabbit"
require "rabbit/receiving/receive"
require "base64"

class Rabbit::Receiving::Worker
include Sneakers::Worker
Expand All @@ -29,8 +30,10 @@ def work_with_params(message, delivery_info, arguments)
end

def receive_message(message, delivery_info, arguments)
compress = arguments.dig(:headers, "compress") || false

Rabbit::Receiving::Receive.new(
message: message.dup.force_encoding("UTF-8"),
message: prepare_message_for_receiving(message.dup, compress),
delivery_info: delivery_info,
arguments: arguments,
).call
Expand All @@ -49,4 +52,12 @@ def reinitialize_connection
@queue.instance_variable_set(:@banny, nil)
run
end

private

def prepare_message_for_receiving(message, compress)
return Base64.strict_encode64(message.b) if compress

message.force_encoding("UTF-8")
end
end
2 changes: 1 addition & 1 deletion lib/rabbit/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Rabbit
VERSION = "1.7.0"
VERSION = "1.8.0"
end
2 changes: 2 additions & 0 deletions rabbit_messaging.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ Gem::Specification.new do |spec|

spec.add_dependency "bunny", "~> 2.0"
spec.add_dependency "kicks"
spec.add_dependency "msgpack"
spec.add_dependency "zlib"
end
30 changes: 27 additions & 3 deletions spec/units/rabbit/publishing/message_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
{
event: :ping,
routing_key: :nah,
data: { foo: :bar },
data: incoming_data,
exchange_name: :fanout,
headers: { "foo" => "bar" },
headers: headers,
message_id: "super-uuid",
}
end
let(:headers) { { "foo" => "bar", "compress" => false } }
let(:incoming_data) { { foo: :bar } }

its(:basic_publish_args) do
is_expected.to eq [
Expand All @@ -41,11 +43,33 @@
type: "ping",
content_type: "application/json",
app_id: "test_group_id.test_project_id",
headers: { "foo" => "bar" },
headers: { "foo" => "bar", "compress" => false },
message_id: "super-uuid",
}
]
end

context "when message should be compressed" do
let(:headers) { super().merge({ "compress" => true }) }
let(:packed_data) { Rabbit::Compressor.dump({ foo: :bar }) }
let(:incoming_data) { Rabbit::Compressor.dump({ foo: :bar }, with_base64: true) }

its(:basic_publish_args) do
is_expected.to eq [
packed_data, "test_group_id.test_project_id.fanout", "nah",
{
mandatory: true,
persistent: true,
type: "ping",
content_type: "application/json",
content_encoding: "gzip",
app_id: "test_group_id.test_project_id",
headers: { "foo" => "bar", "compress" => true },
message_id: "super-uuid",
}
]
end
end
end

context "IPAddr" do
Expand Down
Loading
Loading