29 changes: 24 additions & 5 deletions site/docs/reference/Connectors/capture-connectors/apache-kafka.md
This connector captures streaming data from Apache Kafka topics.

## Supported message formats

This connector supports Kafka messages encoded in **Avro**, **Protobuf**, or **JSON** format.

For Avro and Protobuf messages, the connector must be configured to use a [schema
registry](https://docs.confluent.io/platform/current/schema-registry/index.html). Schema references (`import` statements) are supported for Protobuf schemas, but not for Avro or JSON schemas.

Protobuf field names are serialized using the proto field name (typically snake_case), not the lowerCamelCase names of the default Protobuf JSON mapping.
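To make the distinction concrete, here is a minimal sketch (illustrative Python, not connector code) of what the canonical Protobuf JSON mapping would do to a field name, versus the proto name this connector keeps:

```python
def proto_json_name(field_name: str) -> str:
    """Canonical Protobuf JSON mapping: snake_case -> lowerCamelCase."""
    head, *rest = field_name.split("_")
    return head + "".join(part.capitalize() for part in rest)

# The default JSON mapping would rename the field:
print(proto_json_name("user_id"))  # userId
# This connector instead captures the field under its proto name, "user_id".
```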

JSON messages may be read without a schema registry. If the JSON messages were
encoded with a JSON schema, configuring a schema registry is recommended to
enable discovery of collection keys if the message key has an associated schema.
- The endpoint to use for connecting to the schema registry
- Username for authentication
- Password for authentication
- For Avro and JSON schemas: flat schemas only (no schema references / `$ref`). Protobuf schemas support references (`import`).
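For reference, a Confluent-compatible schema registry serves schemas over a REST API. The sketch below (illustrative Python; the function name and example endpoint are hypothetical, not part of the connector) shows how the endpoint, username, and password above combine into an authenticated request for a topic's latest value schema:

```python
import base64

def registry_request(endpoint: str, topic: str, username: str, password: str):
    """Build the URL and Basic-auth header for fetching a topic's latest
    value schema from a Confluent-compatible schema registry (sketch)."""
    url = f"{endpoint.rstrip('/')}/subjects/{topic}-value/versions/latest"
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return url, {"Authorization": f"Basic {token}"}

# Hypothetical endpoint and credentials, for illustration only:
url, headers = registry_request("https://registry.example.com", "orders", "KEY", "SECRET")
```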

:::tip
If you are using the Confluent Cloud Schema Registry, your schema registry
key](../../../concepts/collections.md#keys), with the following additional considerations:
- Keys may contain nested fields, such as types with nested Avro records

If a topic has a registered key schema but it does not fit these requirements,
the default collection key of `partition` and `offset` will be used instead.

### Captured document structure

Each captured document is constructed from the Kafka message's key, value, and metadata:

- **Message value** fields become top-level fields in the captured document.
- **Message key** fields are merged as top-level fields. If there is a name collision between key and value fields, the key field takes precedence.
- If the key is a **scalar type** (not an object/record), it is captured under a synthetic `_key` field.
- If a key is present but has **no schema**, it is captured as a base64-encoded string under `_key`.
- A **null message payload** (absent value) is interpreted as a deletion tombstone (`_meta.op` is set to `"d"`).
- **Metadata** is captured under the `_meta` object, including:
- `topic` — the Kafka topic name
- `partition` — the partition number
- `offset` — the message offset
- `op` — the operation type (`"u"` for upsert, `"d"` for delete)
- `headers` — message headers (if present), as key-value pairs
- `timestamp` — the message timestamp (if available), either creation time or log append time
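The assembly rules above can be sketched as follows (illustrative Python operating on already-decoded keys and values; the function name and signature are hypothetical, not the connector's actual implementation):

```python
import base64

def build_document(topic, partition, offset, key, value,
                   key_has_schema=True, headers=None, timestamp=None):
    """Assemble a captured document from a Kafka message (sketch)."""
    meta = {"topic": topic, "partition": partition, "offset": offset,
            "op": "d" if value is None else "u"}  # null payload = tombstone
    if headers is not None:
        meta["headers"] = headers
    if timestamp is not None:
        meta["timestamp"] = timestamp

    # Value fields become top-level fields of the document.
    doc = dict(value) if value is not None else {}
    if key is not None:
        if not key_has_schema:
            # A schemaless key is captured as a base64 string under _key.
            doc["_key"] = base64.b64encode(key).decode()
        elif isinstance(key, dict):
            # Object keys merge in at the top level; key fields win collisions.
            doc.update(key)
        else:
            # Scalar keys go under a synthetic _key field.
            doc["_key"] = key
    doc["_meta"] = meta
    return doc

# An upsert whose object key collides with the value on "id" (key wins):
doc = build_document("orders", 0, 42, {"id": 7}, {"id": 1, "total": 9.5})
# A tombstone (null payload) is captured as a deletion:
tomb = build_document("orders", 0, 43, {"id": 7}, None)
```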

### Authentication and connection security
