Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/producer.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ Options:

It also supports all the constructor options of `Base`.

When `idempotent` is enabled, `retries` defaults to `Number.MAX_SAFE_INTEGER` when not set, and it is raised to `Number.MAX_SAFE_INTEGER` for values less than or equal to `1` to avoid unsafe idempotent retry behavior.

The default partitioning behavior for keyed messages uses the library's own murmur2 normalization. If you need compatibility with Java/kafkajs partitioning, use `compatibilityPartitioner`:

```typescript
Expand Down
4 changes: 3 additions & 1 deletion src/clients/producer/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ export class Producer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
if (options.idempotent) {
options.maxInflights = 1
options.acks = ProduceAcks.ALL
options.retries = Number.MAX_SAFE_INTEGER
if (options.retries === undefined || (typeof options.retries === 'number' && options.retries <= 1)) {
options.retries = Number.MAX_SAFE_INTEGER
}
} else {
options.idempotent = false
}
Expand Down
27 changes: 26 additions & 1 deletion test/clients/producer/producer.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { deepStrictEqual, ok, rejects, strictEqual } from 'node:assert'
import { test } from 'node:test'
import * as Prometheus from 'prom-client'
import { kConnections } from '../../../src/clients/base/base.ts'
import { kConnections, kOptions } from '../../../src/clients/base/base.ts'
import {
baseMetadataChannel,
type ClientDiagnosticEvent,
Expand Down Expand Up @@ -143,6 +143,31 @@ test('constructor should validat other options even when not in strict mode', ()
}
})

test('constructor should keep custom retries when idempotent is enabled', t => {
const producer = createProducer(t, {
idempotent: true,
retries: 7
})

strictEqual((producer[kOptions] as { retries: number }).retries, 7)
})

test('constructor should default retries to safe value only when not set', t => {
const producer = createProducer(t, { idempotent: true })

strictEqual((producer[kOptions] as { retries: number }).retries, Number.MAX_SAFE_INTEGER)
})

test('constructor should set retries to safe value when idempotent and too low', t => {
const producerZero = createProducer(t, { idempotent: true, retries: 0 })
const producerOne = createProducer(t, { idempotent: true, retries: 1 })
const producerNone = createProducer(t, { idempotent: true })

strictEqual((producerZero[kOptions] as { retries: number }).retries, Number.MAX_SAFE_INTEGER)
strictEqual((producerOne[kOptions] as { retries: number }).retries, Number.MAX_SAFE_INTEGER)
strictEqual((producerNone[kOptions] as { retries: number }).retries, Number.MAX_SAFE_INTEGER)
})

test('constructor should emit warnings for experimental APIs', () => {
const warnings: string[] = []
const originalEmitWarning = process.emitWarning
Expand Down
Loading