A lightweight Kafka-protocol-compatible server library and tool for Go. It accepts produce requests from standard Kafka producers without requiring a full Kafka cluster.
- Accepts produce requests from any Kafka producer (kafka-console-producer, librdkafka, franz-go, etc.)
- Pull-based API: call
ReadBatchto receive messages, then acknowledge - SASL authentication (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512)
- TLS and mutual TLS (mTLS)
- Topic filtering
- Idempotent producer support
The cmd/ksink tool forwards received messages to an output sink:
# Write raw message values (binary) to stdout, one per line (default)
go run ./cmd/ksink --addr :9092
# Write to a file instead of stdout
go run ./cmd/ksink --addr :9092 --output messages.bin
# Write JSON lines to a file
go run ./cmd/ksink --addr :9092 --output messages.jsonl --output-format jsonl
# Send JSON lines via HTTP POST to an HTTP endpoint
go run ./cmd/ksink --addr :9092 --output http://localhost:8080/ingest --output-format jsonl
# Send JSON lines via HTTPS POST to a remote endpoint
go run ./cmd/ksink --addr :9092 --output https://example.com/ingest --output-format jsonl
# Print the JSON schema for the jsonl output format
go run ./cmd/ksink json-schemaUse --output-format to control how messages are serialized:
| Format | Description |
|---|---|
binary |
Raw message value bytes, newline-delimited (default) |
jsonl |
JSON lines with per-field UTF-8/base64 encoding options |
kcat |
kcat-compatible format string (requires --output-format-string) |
Use --output-separator to set the delimiter appended after each binary
message (default: \n). Common escape sequences (\n, \r, \t, \0) are
interpreted. Use --output-separator-hex for hex-encoded binary delimiters
(e.g. 0a for newline, 00 for null byte). Use --no-separator to clear
the delimiter entirely. Separator options only apply to binary format; jsonl
is always newline-delimited and HTTP output is delimited by HTTP requests.
For JSON output, use --output-json-base64-key and/or
--output-json-base64-value to base64-encode those fields when payloads are
not safe to emit as UTF-8.
When using HTTP output, Kafka message metadata is sent as HTTP headers:
X-Kafka-Topic, X-Kafka-Partition, X-Kafka-Offset, X-Kafka-Key
(base64-encoded), X-Kafka-Header-{name}, X-Kafka-Timestamp
(Unix milliseconds), and X-Kafka-Client-Addr.
# Binary output with no separator to a file
go run ./cmd/ksink --no-separator --output data.bin
# Binary values, one per line
go run ./cmd/ksink --output messages.bin
# JSON with base64-encoded key/value for binary payloads
go run ./cmd/ksink --output-format jsonl --output-json-base64-key --output-json-base64-value --output messages.jsonl
# Binary delimiter using hex encoding (null byte)
go run ./cmd/ksink --output-separator-hex "00" --output messages.bin
# kcat-compatible output formatting
go run ./cmd/ksink --output-format kcat \
--output-format-string 'Topic: %t Partition: %p Offset: %o\nKey: %k\nValue: %s\n---\n'go get github.com/marre/ksinkpackage main
import (
"context"
"fmt"
"log"
"github.com/marre/ksink"
)
func main() {
srv, err := ksink.New(ksink.Config{
Address: ":9092",
})
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
if err := srv.Start(ctx); err != nil {
log.Fatal(err)
}
defer srv.Close(ctx)
log.Printf("Listening on %s", srv.Addr())
for {
msgs, ack, err := srv.ReadBatch(ctx)
if err != nil {
log.Fatal(err)
}
for _, msg := range msgs {
fmt.Printf("topic=%s key=%s value=%s\n", msg.Topic, msg.Key, msg.Value)
}
ack(nil) // acknowledge successful processing
}
}Then send messages using any Kafka producer:
echo "hello world" | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic testcfg := ksink.Config{
Address: ":9092", // Listen address
AdvertisedAddress: "myhost:9092", // Address advertised to clients
Topics: []string{"events"},// Restrict to specific topics
CertFile: "server.pem", // TLS certificate
KeyFile: "server-key.pem", // TLS private key
MTLSAuth: "require_and_verify",
MTLSCAsFiles: []string{"ca.pem"},
SASL: []ksink.SASLCredential{
{Mechanism: "PLAIN", Username: "user", Password: "pass"},
},
Timeout: 30 * time.Second,
IdleTimeout: 60 * time.Second,
MaxMessageBytes: 1048576,
IdempotentWrite: false,
}