diff --git a/docs/producer.md b/docs/producer.md index 2d612c1..fdc7d5d 100644 --- a/docs/producer.md +++ b/docs/producer.md @@ -14,23 +14,38 @@ Creates a new producer with type `Producer`. Options: -| Property | Type | Description | -| ----------------------- | --------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `producerId` | `bigint` | Producer ID. | -| `producerEpoch` | `number` | Producer epoch. | -| `idempotent` | `boolean` | Idempotency of the producer. Required for transactions. | -| `transactionalId` | `string` | Transactional ID for the producer. If not specified, a random UUID is generated. Required when using transactions to ensure the same ID is used across restarts. | -| `acks` | `number` | Acknowledgement to wait before returning.

Valid values are defined in the `ProduceAcks` enumeration. | -| `compression` | `string` | Compression algorithm to use before sending messages to the broker.

Valid values are: `snappy`, `lz4`, `gzip`, `zstd` | -| `partitioner` | `(message: MessageToProduce) => number` | Partitioner to use to assign a partition to messages that lack it.

It is a function that receives a message and should return the partition number. | -| `autocreateTopics` | `boolean` | Whether to ask brokers to auto-create missing topics during produce requests. | -| `repeatOnStaleMetadata` | `boolean` | Whether to retry a produce operation when the system detects outdated topic or broker information.

Default is `true`. | -| `serializers` | `Serializers` | Object that specifies which serialisers to use.

The object should only contain one or more of the `key`, `value`, `headerKey` and `headerValue` properties.

**Note:** Should not be provided when using a `registry`. | -| `beforeSerialization` | `BeforeSerializationHook` | Hook function called before serialization of each message component (key, value, headers).

**Experimental:** Does not follow semver and may change in minor/patch releases.

**Note:** Should not be provided when using a `registry`. | -| `registry` | `AbstractSchemaRegistry` | Schema registry instance for automatic serialization with schema management. See the [Confluent Schema Registry](./confluent-schema-registry.md) guide for details.

**Experimental:** Does not follow semver and may change in minor/patch releases.

**Note:** When provided, do not use `serializers` or `beforeSerialization`. | +| Property | Type | Description | +| ----------------------- | ---------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `producerId` | `bigint` | Producer ID. | +| `producerEpoch` | `number` | Producer epoch. | +| `idempotent` | `boolean` | Idempotency of the producer. Required for transactions. | +| `transactionalId` | `string` | Transactional ID for the producer. If not specified, a random UUID is generated. Required when using transactions to ensure the same ID is used across restarts. | +| `acks` | `number` | Acknowledgement to wait before returning.

Valid values are defined in the `ProduceAcks` enumeration. | +| `compression` | `string` | Compression algorithm to use before sending messages to the broker.

Valid values are: `snappy`, `lz4`, `gzip`, `zstd` | +| `partitioner` | `(message: MessageToProduce, key: Buffer \| undefined) => number` | Partitioner to use to assign a partition to messages that lack it.

It is a function that receives a message and an optional serialized key and should return the partition number. | +| `autocreateTopics` | `boolean` | Whether to ask brokers to auto-create missing topics during produce requests. | +| `repeatOnStaleMetadata` | `boolean` | Whether to retry a produce operation when the system detects outdated topic or broker information.

Default is `true`. | +| `serializers` | `Serializers` | Object that specifies which serialisers to use.

The object should only contain one or more of the `key`, `value`, `headerKey` and `headerValue` properties.

**Note:** Should not be provided when using a `registry`. | +| `beforeSerialization` | `BeforeSerializationHook` | Hook function called before serialization of each message component (key, value, headers).

**Experimental:** Does not follow semver and may change in minor/patch releases.

**Note:** Should not be provided when using a `registry`. | +| `registry` | `AbstractSchemaRegistry` | Schema registry instance for automatic serialization with schema management. See the [Confluent Schema Registry](./confluent-schema-registry.md) guide for details.

**Experimental:** Does not follow semver and may change in minor/patch releases.

**Note:** When provided, do not use `serializers` or `beforeSerialization`. | It also supports all the constructor options of `Base`. +The default partitioning behavior for keyed messages uses the library's own murmur2 normalization. If you need compatibility with Java/kafkajs partitioning, use `compatibilityPartitioner`: + +```typescript +import { Producer, stringSerializers, compatibilityPartitioner } from '@platformatic/kafka' + +const producer = new Producer({ + clientId: 'my-producer', + bootstrapBrokers: ['localhost:9092'], + serializers: stringSerializers, + partitioner: compatibilityPartitioner +}) +``` + +The `compatibilityPartitioner` implementation uses a Java-style `toPositive` step before partition normalization. + Notes: `zstd` is not available in node `v20` ## Basic Methods diff --git a/src/clients/producer/index.ts b/src/clients/producer/index.ts index 42c77f8..2aa721c 100644 --- a/src/clients/producer/index.ts +++ b/src/clients/producer/index.ts @@ -1,4 +1,5 @@ export * from './options.ts' +export * from './partitioners.ts' export * from './producer-stream.ts' export * from './producer.ts' export * from './types.ts' diff --git a/src/clients/producer/partitioners.ts b/src/clients/producer/partitioners.ts new file mode 100644 index 0000000..b06fb03 --- /dev/null +++ b/src/clients/producer/partitioners.ts @@ -0,0 +1,18 @@ +import { murmur2 } from '../../protocol/murmur2.ts' +import { type MessageToProduce } from '../../protocol/records.ts' + +const compatibilityMurmur2Mask = 0x7fffffff + +export function defaultPartitioner ( + _: MessageToProduce, + key?: Buffer | undefined +): number { + return Buffer.isBuffer(key) ? murmur2(key) >>> 0 : 0 +} + +export function compatibilityPartitioner ( + _: MessageToProduce, + key?: Buffer | undefined +): number { + return Buffer.isBuffer(key) ? 
murmur2(key) & compatibilityMurmur2Mask : 0 +} diff --git a/src/clients/producer/producer.ts b/src/clients/producer/producer.ts index e7d6b46..db5a423 100644 --- a/src/clients/producer/producer.ts +++ b/src/clients/producer/producer.ts @@ -24,7 +24,6 @@ import { } from '../../diagnostic.ts' import { GenericError, type ProtocolError, UserError } from '../../errors.ts' import { type Connection } from '../../network/connection.ts' -import { murmur2 } from '../../protocol/murmur2.ts' import { type CreateRecordsBatchOptions, type Message, @@ -73,6 +72,7 @@ import { producerStreamOptionsValidator, sendOptionsValidator } from './options.ts' +import { defaultPartitioner } from './partitioners.ts' import { ProducerStream } from './producer-stream.ts' import { Transaction } from './transaction.ts' import { @@ -897,9 +897,9 @@ export class Producer>> 0 + partition = defaultPartitioner(message, key) } else { // Use the roundrobin partition = this.#partitionsRoundRobin.postIncrement(topic, 1, 0) diff --git a/src/clients/producer/types.ts b/src/clients/producer/types.ts index 81437c3..e8f0c29 100644 --- a/src/clients/producer/types.ts +++ b/src/clients/producer/types.ts @@ -18,7 +18,8 @@ export interface ProduceResult { } export type Partitioner = ( - message: MessageToProduce + message: MessageToProduce, + key?: Buffer | undefined ) => number export interface ProducerStreamReport { diff --git a/test/clients/producer/partitioners.test.ts b/test/clients/producer/partitioners.test.ts new file mode 100644 index 0000000..f3b7e74 --- /dev/null +++ b/test/clients/producer/partitioners.test.ts @@ -0,0 +1,42 @@ +import { strictEqual } from 'node:assert' +import { test } from 'node:test' +import { compatibilityPartitioner, murmur2, ProduceAcks, stringSerializers } from '../../../src/index.ts' +import { createProducer, createTopic } from '../../helpers.ts' + +test('compatibilityPartitioner should match Java/KafkaJs positive-toPositive behavior', () => { + const message = { topic: 'topic', 
key: Buffer.from('partition-key') } + const expected = murmur2(message.key) & 0x7fffffff + + strictEqual(compatibilityPartitioner(message, message.key), expected) + strictEqual(compatibilityPartitioner(message, message.key) > -1, true) +}) + +test('compatibilityPartitioner should normalize string keys using UTF-8 bytes', () => { + const message = { topic: 'topic', key: 'string-key' } + + strictEqual(compatibilityPartitioner(message, Buffer.from(message.key)), murmur2(message.key) & 0x7fffffff) +}) + +test('compatibilityPartitioner should produce expected partition inside produce flow', async t => { + const producer = createProducer(t, { + serializers: stringSerializers, + partitioner: compatibilityPartitioner + }) + + const testTopic = await createTopic(t, true, 4) + const key = 'compat-topic-key' + const expectedPartition = compatibilityPartitioner({ topic: testTopic, key }, Buffer.from(key)) % 4 + + const result = await producer.send({ + messages: [ + { + topic: testTopic, + key, + value: 'payload' + } + ], + acks: ProduceAcks.LEADER + }) + + strictEqual(result.offsets![0].partition, expectedPartition) +})