diff --git a/public/service/message.go b/public/service/message.go index 1e4d87878..c60b16602 100644 --- a/public/service/message.go +++ b/public/service/message.go @@ -6,6 +6,7 @@ import ( "context" "errors" + "github.com/redpanda-data/benthos/v4/internal/batch" "github.com/redpanda-data/benthos/v4/internal/bloblang/mapping" "github.com/redpanda-data/benthos/v4/internal/bloblang/query" "github.com/redpanda-data/benthos/v4/internal/message" @@ -215,6 +216,32 @@ func (m *Message) WithContext(ctx context.Context) *Message { } } +// WithCollapsedCount returns a new message indicating that it is the result of +// collapsing count messages into one. The count is accumulated: if a message +// already has a collapsed count of 3 and WithCollapsedCount(2) is called, the +// result has a collapsed count of 4 (the existing count plus count-1 to avoid +// double-counting the message itself). The count parameter must be >= 1. +// +// This allows downstream components to know how many total messages were +// combined, which is important for accurate output metrics (e.g. output_sent). +// This is useful when implementing processors that combine multiple messages +// into one (such as archive). +func (m *Message) WithCollapsedCount(count int) *Message { + ctx := batch.CtxWithCollapsedCount(m.Context(), count) + return m.WithContext(ctx) +} + +// CollapsedCount returns the actual number of messages that were collapsed into +// the resulting message batch. This value could differ from len(batch) when +// processors that archive batched message parts have been applied. +func (b MessageBatch) CollapsedCount() int { + total := 0 + for _, m := range b { + total += batch.CtxCollapsedCount(m.Context()) + } + return total +} + // AsBytes returns the underlying byte array contents of a message or, if the // contents are a structured type, attempts to marshal the contents as a JSON // document and returns either the byte array result or an error. diff --git a/public/service/message_test.go b/public/service/message_test.go index 0a0b6a000..04dec7bbc 100644 --- a/public/service/message_test.go +++ b/public/service/message_test.go @@ -1083,3 +1083,25 @@ func TestSyncResponseBatched(t *testing.T) { assert.Equal(t, c, string(data)) } } + +func TestMessageWithCollapsedCount(t *testing.T) { + m1 := NewMessage([]byte("foo")) + + // Default collapsed count is 1 + b1 := MessageBatch{m1} + assert.Equal(t, 1, b1.CollapsedCount()) + + // Setting collapsed count to 3 + m2 := m1.WithCollapsedCount(3) + b2 := MessageBatch{m2} + assert.Equal(t, 3, b2.CollapsedCount()) + + // Chaining collapsed counts accumulates + m3 := m2.WithCollapsedCount(2) + b3 := MessageBatch{m3} + assert.Equal(t, 4, b3.CollapsedCount()) + + // Multiple messages in a batch sum their collapsed counts + bAll := MessageBatch{m1, m2, m3} + assert.Equal(t, 8, bAll.CollapsedCount()) +}