diff --git a/test/clients/consumer/messages-stream.test.ts b/test/clients/consumer/messages-stream.test.ts index 0ed2d42..3b049d4 100644 --- a/test/clients/consumer/messages-stream.test.ts +++ b/test/clients/consumer/messages-stream.test.ts @@ -48,7 +48,8 @@ import { mockedErrorMessage, mockedOperationId, mockMetadata, - mockMethod + mockMethod, + waitFor } from '../../helpers.ts' const defaultRetryDelay = 500 @@ -750,6 +751,10 @@ test('should support consume-transform-produce patterns', async t => { } await transaction.commit() + await waitFor(async () => { + const committedOffsets = await consumer.listCommittedOffsets({ topics: [{ topic, partitions: [0] }] }) + deepStrictEqual(committedOffsets.get(topic), [10n]) + }) await consumer.close(true) // Now consume again to verify all messages were committed @@ -765,7 +770,7 @@ test('should support consume-transform-produce patterns', async t => { stream2.on('data', message => resolve(message.key)) // This will return immediately if there are messages - deepStrictEqual(await executeWithTimeout(promise, 1000, 'timeout'), 'timeout') + deepStrictEqual(await executeWithTimeout(promise, 3000, 'timeout'), 'timeout') }) test('should support maxFetches option', async t => {