43 changes: 29 additions & 14 deletions docs/producer.md
@@ -14,23 +14,38 @@ Creates a new producer with type `Producer<Key, Value, HeaderKey, HeaderValue>`.

Options:

| Property | Type | Description |
| ----------------------- | --------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `producerId` | `bigint` | Producer ID. |
| `producerEpoch` | `number` | Producer epoch. |
| `idempotent` | `boolean` | Idempotency of the producer. Required for transactions. |
| `transactionalId` | `string` | Transactional ID for the producer. If not specified, a random UUID is generated. Required when using transactions to ensure the same ID is used across restarts. |
| `acks` | `number` | Acknowledgement to wait before returning.<br/><br/>Valid values are defined in the `ProduceAcks` enumeration. |
| `compression` | `string` | Compression algorithm to use before sending messages to the broker.<br/><br/>Valid values are: `snappy`, `lz4`, `gzip`, `zstd` |
| `partitioner` | `(message: MessageToProduce<Key, Value, HeaderKey, HeaderValue>) => number` | Partitioner to use to assign a partition to messages that lack it.<br/><br/>It is a function that receives a message and should return the partition number. |
| `autocreateTopics` | `boolean` | Whether to ask brokers to auto-create missing topics during produce requests. |
| `repeatOnStaleMetadata` | `boolean` | Whether to retry a produce operation when the system detects outdated topic or broker information.<br/><br/>Default is `true`. |
| `serializers` | `Serializers<Key, Value, HeaderKey, HeaderValue>` | Object that specifies which serialisers to use.<br/><br/>The object should only contain one or more of the `key`, `value`, `headerKey` and `headerValue` properties.<br/><br/>**Note:** Should not be provided when using a `registry`. |
| `beforeSerialization` | `BeforeSerializationHook<Key, Value, HeaderKey, HeaderValue>` | Hook function called before serialization of each message component (key, value, headers).<br/><br/>**Experimental:** Does not follow semver and may change in minor/patch releases.<br/><br/>**Note:** Should not be provided when using a `registry`. |
| `registry` | `AbstractSchemaRegistry<Key, Value, HeaderKey, HeaderValue>` | Schema registry instance for automatic serialization with schema management. See the [Confluent Schema Registry](./confluent-schema-registry.md) guide for details.<br/><br/>**Experimental:** Does not follow semver and may change in minor/patch releases.<br/><br/>**Note:** When provided, do not use `serializers` or `beforeSerialization`. |
| Property | Type | Description |
| ----------------------- | ------------------------------------------------------------------------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `producerId` | `bigint` | Producer ID. |
| `producerEpoch` | `number` | Producer epoch. |
| `idempotent` | `boolean` | Idempotency of the producer. Required for transactions. |
| `transactionalId` | `string` | Transactional ID for the producer. If not specified, a random UUID is generated. Required when using transactions to ensure the same ID is used across restarts. |
| `acks` | `number` | Acknowledgement to wait before returning.<br/><br/>Valid values are defined in the `ProduceAcks` enumeration. |
| `compression` | `string` | Compression algorithm to use before sending messages to the broker.<br/><br/>Valid values are: `snappy`, `lz4`, `gzip`, `zstd` |
| `partitioner` | `(message: MessageToProduce<Key, Value, HeaderKey, HeaderValue>, key?: Buffer \| undefined) => number` | Partitioner to use to assign a partition to messages that lack it.<br/><br/>It is a function that receives a message and an optional serialized key and should return the partition number. |
| `autocreateTopics` | `boolean` | Whether to ask brokers to auto-create missing topics during produce requests. |
| `repeatOnStaleMetadata` | `boolean` | Whether to retry a produce operation when the system detects outdated topic or broker information.<br/><br/>Default is `true`. |
| `serializers` | `Serializers<Key, Value, HeaderKey, HeaderValue>` | Object that specifies which serialisers to use.<br/><br/>The object should only contain one or more of the `key`, `value`, `headerKey` and `headerValue` properties.<br/><br/>**Note:** Should not be provided when using a `registry`. |
| `beforeSerialization` | `BeforeSerializationHook<Key, Value, HeaderKey, HeaderValue>` | Hook function called before serialization of each message component (key, value, headers).<br/><br/>**Experimental:** Does not follow semver and may change in minor/patch releases.<br/><br/>**Note:** Should not be provided when using a `registry`. |
| `registry` | `AbstractSchemaRegistry<Key, Value, HeaderKey, HeaderValue>` | Schema registry instance for automatic serialization with schema management. See the [Confluent Schema Registry](./confluent-schema-registry.md) guide for details.<br/><br/>**Experimental:** Does not follow semver and may change in minor/patch releases.<br/><br/>**Note:** When provided, do not use `serializers` or `beforeSerialization`. |

It also supports all the constructor options of `Base`.

The default partitioning behavior for keyed messages uses the library's own murmur2 normalization. If you need compatibility with Java/kafkajs partitioning, use `compatibilityPartitioner`:

```typescript
import { Producer, stringSerializers, compatibilityPartitioner } from '@platformatic/kafka'

const producer = new Producer({
clientId: 'my-producer',
bootstrapBrokers: ['localhost:9092'],
serializers: stringSerializers,
partitioner: compatibilityPartitioner
})
```

The `compatibilityPartitioner` implementation uses a Java-style `toPositive` step before partition normalization.
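
To illustrate why the two partitioners can disagree, here is a minimal sketch that applies both normalizations to a hypothetical signed 32-bit hash. The hash value, the partition count of 3, and the final modulo step are assumptions chosen for illustration; they are not taken from the library.

```typescript
// Hypothetical signed 32-bit hash; a real value would come from murmur2(key).
const sketchHash: number = -1234567890

// Default-style normalization: reinterpret the bits as an unsigned 32-bit integer.
const unsignedHash = sketchHash >>> 0

// Java-style toPositive: clear the sign bit and keep the lower 31 bits.
const toPositiveHash = sketchHash & 0x7fffffff

// Assumed final step: reduce modulo the number of partitions (3 is arbitrary).
const sketchPartitionCount = 3
const defaultStylePartition = unsignedHash % sketchPartitionCount
const compatibilityStylePartition = toPositiveHash % sketchPartitionCount

console.log({ unsignedHash, toPositiveHash, defaultStylePartition, compatibilityStylePartition })
```

For a hash with the sign bit set, the two normalized values differ by 2^31, so the resulting partitions can differ for the same key.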

Note: `zstd` is not available in Node.js `v20`.

## Basic Methods
1 change: 1 addition & 0 deletions src/clients/producer/index.ts
@@ -1,4 +1,5 @@
export * from './options.ts'
export * from './partitioners.ts'
export * from './producer-stream.ts'
export * from './producer.ts'
export * from './types.ts'
18 changes: 18 additions & 0 deletions src/clients/producer/partitioners.ts
@@ -0,0 +1,18 @@
import { murmur2 } from '../../protocol/murmur2.ts'
import { type MessageToProduce } from '../../protocol/records.ts'

const compatibilityMurmur2Mask = 0x7fffffff

export function defaultPartitioner<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderValue = Buffer> (
_: MessageToProduce<Key, Value, HeaderKey, HeaderValue>,
key?: Buffer | undefined
): number {
return Buffer.isBuffer(key) ? murmur2(key) >>> 0 : 0
Member

I'm a bit unconvinced on this logic. The original code did not seem to handle anything but buffers.

Contributor Author

It still does; `key` is just a buffer.
The problem is that, to avoid breaking the existing `Partitioner` contract, I needed to pass it as a second argument.

}

export function compatibilityPartitioner<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderValue = Buffer> (
_: MessageToProduce<Key, Value, HeaderKey, HeaderValue>,
key?: Buffer | undefined
): number {
return Buffer.isBuffer(key) ? murmur2(key) & compatibilityMurmur2Mask : 0
Thank you 👍

}
6 changes: 3 additions & 3 deletions src/clients/producer/producer.ts
@@ -24,7 +24,6 @@ import {
} from '../../diagnostic.ts'
import { GenericError, type ProtocolError, UserError } from '../../errors.ts'
import { type Connection } from '../../network/connection.ts'
- import { murmur2 } from '../../protocol/murmur2.ts'
import {
type CreateRecordsBatchOptions,
type Message,
@@ -73,6 +72,7 @@ import {
producerStreamOptionsValidator,
sendOptionsValidator
} from './options.ts'
+ import { defaultPartitioner } from './partitioners.ts'
import { ProducerStream } from './producer-stream.ts'
import { Transaction } from './transaction.ts'
import {
@@ -897,9 +897,9 @@ export class Producer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa

if (typeof message.partition !== 'number') {
if (partitioner) {
- partition = partitioner(message)
+ partition = partitioner(message, key)
} else if (key) {
- partition = murmur2(key) >>> 0
+ partition = defaultPartitioner(message, key)
} else {
// Use the roundrobin
partition = this.#partitionsRoundRobin.postIncrement(topic, 1, 0)
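
The selection order visible in this hunk can be sketched in isolation as follows. This is a simplified stand-in, not the producer's actual code: the `SketchMessage` and `SketchPartitioner` types, the `selectPartition` helper, and the round-robin counter are hypothetical, `Uint8Array` stands in for `Buffer`, and the assumption that an explicit `message.partition` is used as-is is inferred from the `typeof message.partition !== 'number'` guard.

```typescript
// Stand-in types; the real code uses MessageToProduce and Buffer.
interface SketchMessage { topic: string, partition?: number }
type SketchPartitioner = (message: SketchMessage, key?: Uint8Array) => number

function selectPartition (
  message: SketchMessage,
  key: Uint8Array | undefined,
  partitioner: SketchPartitioner | undefined,
  keyedDefault: SketchPartitioner,
  nextRoundRobin: (topic: string) => number
): number {
  // 1. An explicit partition on the message is assumed to win.
  if (typeof message.partition === 'number') return message.partition
  // 2. A user-supplied partitioner receives the message and the serialized key.
  if (partitioner) return partitioner(message, key)
  // 3. Keyed messages fall back to the default keyed partitioner.
  if (key) return keyedDefault(message, key)
  // 4. Unkeyed messages are spread round-robin per topic.
  return nextRoundRobin(message.topic)
}

// Hypothetical helpers used only to exercise the sketch.
const roundRobinCounters = new Map<string, number>()
const sketchRoundRobin = (topic: string): number => {
  const current = roundRobinCounters.get(topic) ?? 0
  roundRobinCounters.set(topic, current + 1)
  return current
}
const keyLengthDefault: SketchPartitioner = (_message, key) => (key ? key.length : 0)

console.log(selectPartition({ topic: 'demo' }, new Uint8Array([1, 2, 3]), undefined, keyLengthDefault, sketchRoundRobin))
```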
3 changes: 2 additions & 1 deletion src/clients/producer/types.ts
@@ -18,7 +18,8 @@ export interface ProduceResult {
}

export type Partitioner<Key, Value, HeaderKey, HeaderValue> = (
- message: MessageToProduce<Key, Value, HeaderKey, HeaderValue>
+ message: MessageToProduce<Key, Value, HeaderKey, HeaderValue>,
+ key?: Buffer | undefined
) => number
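
Because the new `key` parameter is optional and comes second, a partitioner written against the old single-argument shape should still type-check, while new partitioners can read the serialized key directly. The sketch below shows both cases with local stand-in types; `DemoMessage`, `DemoPartitioner`, and both partitioner functions are hypothetical, and `Uint8Array` is used in place of `Buffer` so the snippet runs without Node typings.

```typescript
// Local stand-ins for the library's MessageToProduce and Partitioner types.
interface DemoMessage { topic: string, key?: string }
type DemoPartitioner = (message: DemoMessage, key?: Uint8Array) => number

// Hypothetical partitioner that uses the serialized key bytes.
// Messages without a key are sent to partition 0.
const byteSumPartitioner: DemoPartitioner = (_message, key) => {
  if (key === undefined) return 0
  return key.reduce((sum, byte) => sum + byte, 0)
}

// A single-argument function remains assignable to the two-argument type,
// so partitioners written before this change keep compiling.
const legacyPartitioner: DemoPartitioner = message => message.topic.length

console.log(byteSumPartitioner({ topic: 'demo' }, new Uint8Array([1, 2, 3])))
console.log(legacyPartitioner({ topic: 'demo' }))
```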

export interface ProducerStreamReport {
42 changes: 42 additions & 0 deletions test/clients/producer/partitioners.test.ts
@@ -0,0 +1,42 @@
import { strictEqual } from 'node:assert'
import { test } from 'node:test'
import { compatibilityPartitioner, murmur2, ProduceAcks, stringSerializers } from '../../../src/index.ts'
import { createProducer, createTopic } from '../../helpers.ts'

test('compatibilityPartitioner should match Java/KafkaJs positive-toPositive behavior', () => {
const message = { topic: 'topic', key: Buffer.from('partition-key') }
const expected = murmur2(message.key) & 0x7fffffff

strictEqual(compatibilityPartitioner(message, message.key), expected)
strictEqual(compatibilityPartitioner(message, message.key) > -1, true)
})

test('compatibilityPartitioner should normalize string keys using UTF-8 bytes', () => {
const message = { topic: 'topic', key: 'string-key' }

strictEqual(compatibilityPartitioner(message, Buffer.from(message.key)), murmur2(message.key) & 0x7fffffff)
})

test('compatibilityPartitioner should produce expected partition inside produce flow', async t => {
const producer = createProducer<string, string, string, string>(t, {
serializers: stringSerializers,
partitioner: compatibilityPartitioner
})

const testTopic = await createTopic(t, true, 4)
const key = 'compat-topic-key'
const expectedPartition = compatibilityPartitioner({ topic: testTopic, key }, Buffer.from(key)) % 4

const result = await producer.send({
messages: [
{
topic: testTopic,
key,
value: 'payload'
}
],
acks: ProduceAcks.LEADER
})

strictEqual(result.offsets![0].partition, expectedPartition)
})
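
One property that may be relevant to the produce-flow test: assuming the producer reduces the partitioner result modulo the partition count (as the `% 4` in the expected value suggests), the unsigned (`>>> 0`) and `toPositive` (`& 0x7fffffff`) normalizations differ by either 0 or 2^31, so they select the same partition whenever the partition count is a power of two. The sketch below checks this on a few arbitrary hash values; the sample values and the `partitionsAgree` helper are illustrative and not part of the library.

```typescript
// Arbitrary signed 32-bit values standing in for murmur2 outputs.
const sampleHashes: number[] = [-1234567890, -1, 42, -2147483648, 2147483647]

// True when both normalizations pick the same partition for this hash.
function partitionsAgree (hash: number, partitionCount: number): boolean {
  return (hash >>> 0) % partitionCount === (hash & 0x7fffffff) % partitionCount
}

// With 4 partitions (a power of two) every sample agrees.
const agreeOnFour = sampleHashes.every(hash => partitionsAgree(hash, 4))
// With 3 partitions at least one sample disagrees.
const agreeOnThree = sampleHashes.every(hash => partitionsAgree(hash, 3))

console.log({ agreeOnFour, agreeOnThree })
```

Under that assumption, a topic with a partition count that is not a power of two would make an integration test sensitive to which normalization is in use.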