Problem: Not connected to the Kafka server via SSH tunnel.
Solution:
- Make sure you've established the SSH tunnel first:
  `ssh -L <local_port>:localhost:<remote_port> <user>@<remote_server> -NTf`
- Verify the tunnel is active: `lsof -i :<local_port>` (should show an ssh process)
- Use the same port number in your `bootstrap_servers` parameter
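As a programmatic alternative to `lsof`, a short Python check can confirm that something is listening on the tunnel's local port before any Kafka client is created. This is a sketch; 9092 is just the local port used in this guide's examples.

```python
import socket

def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if something is listening on host:port (e.g. an SSH tunnel)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        return s.connect_ex((host, port)) == 0

# Run this before creating a KafkaProducer/KafkaConsumer:
if not port_open("localhost", 9092):
    print("SSH tunnel is not up -- run the ssh -L command first")
```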
```
% ERROR: Failed to query metadata for topic <topic_name>: Local: Broker transport failure
Connect to ipv6#[::1]:9092 failed: Connection refused
```
Problem: SSH tunnel not established before running kcat.
Solution:
- Always establish SSH tunnel BEFORE running kcat commands
- Use: `ssh -o ServerAliveInterval=60 -L 9092:localhost:9092 <user>@<remote_server> -NTf`
- Verify the connection with: `kcat -b localhost:9092 -L` (should list topics)
Problem: Another SSH tunnel is already using that port, or previous tunnel wasn't killed.
Solution:
- Find and kill the existing tunnel: `lsof -ti:<local_port> | xargs kill -9`
- Or use a different local port in your SSH command
Problem: Trying to send string directly instead of bytes, or consumer trying to decode already-decoded data.
Solution:
- For the Producer: use `value_serializer=lambda v: json.dumps(v).encode('utf-8')` (or `value_serializer=lambda m: dumps(m).encode('utf-8')` if you imported `dumps` directly)
- For the Consumer: if the producer used a serializer, the consumer needs `value_deserializer=lambda m: loads(m.decode('utf-8'))`. Otherwise, manually decode: `message.value.decode('utf-8')`
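The serializer/deserializer pairing can be verified locally with no broker involved. A minimal sketch (the sample message is made up):

```python
import json

# The same lambdas you would pass to kafka-python:
#   KafkaProducer(value_serializer=serialize, ...)
#   KafkaConsumer(..., value_deserializer=deserialize)
serialize = lambda v: json.dumps(v).encode("utf-8")    # dict -> UTF-8 JSON bytes
deserialize = lambda m: json.loads(m.decode("utf-8"))  # bytes -> dict

# A message survives the round trip unchanged:
msg = {"sensor": "temp-1", "reading": 21.5}
assert deserialize(serialize(msg)) == msg
```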
Problem: auto_offset_reset setting or consumer group behavior.
Solution:
- Use `auto_offset_reset='earliest'` to read from the beginning
- Use `auto_offset_reset='latest'` to read only new messages
- If using the same consumer group, Kafka remembers your offset. Either:
  - Use a different `group_id` each time, OR
  - Set `auto_offset_reset='earliest'` and `enable_auto_commit=False` for testing
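One way to combine these settings for repeatable testing is shown below: a sketch of keyword arguments for `kafka.KafkaConsumer(...)`, assuming the tunnel's local port is 9092 (the `debug-` group-id prefix is just illustrative).

```python
import uuid

# Keyword arguments for kafka.KafkaConsumer(...) when you want to re-read a
# topic from the beginning on every run:
debug_consumer_config = {
    "bootstrap_servers": "localhost:9092",        # local end of the SSH tunnel
    "auto_offset_reset": "earliest",              # start at the oldest message
    "enable_auto_commit": False,                  # don't persist offsets between runs
    "group_id": f"debug-{uuid.uuid4().hex[:8]}",  # fresh group => no remembered offset
}
```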
Problem: Topic hasn't been created yet, or wrong topic name.
Solution:
- Make sure you ran the producer code first to create the topic
- Verify the topic exists: `kcat -b localhost:9092 -L` (lists all topics)
- Check that the topic name spelling matches exactly (case-sensitive)
Problem: Consumer code trying to decode when value_deserializer already decoded the message.
Solution:
- If using `value_deserializer` in the consumer, `message.value` is already a dict; there is no need to decode or call `loads`
- If NOT using a deserializer, then decode: `message.value.decode('utf-8')`, then `loads(...)`
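If you're unsure which case applies, a small helper can normalize both. This is a hypothetical convenience function for illustration, not part of kafka-python:

```python
import json

def to_dict(value):
    """Normalize a Kafka message value to a dict, whether or not a
    value_deserializer was configured on the consumer."""
    if isinstance(value, dict):   # deserializer already ran
        return value
    if isinstance(value, bytes):  # raw bytes: decode first
        value = value.decode("utf-8")
    return json.loads(value)      # parse the JSON string
```

Usage: `record = to_dict(message.value)` works the same way in both consumer setups.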
Problem: kafka-python not installed or wrong Python environment.
Solution:
- Activate your virtual environment: `source <env_name>/bin/activate`
- Install: `pip install kafka-python` or `pip install -r requirements.txt`
- Verify: `python -c "from kafka import KafkaProducer; print('OK')"`
Problem: kcat not installed.
Solution:
- macOS: `brew install kcat`
- Ubuntu/Debian: `sudo apt-get install kcat`
- Windows: use WSL or pair with someone on Mac/Linux for this deliverable
- Always check the SSH tunnel first: `lsof -i :<your_port>` should show an ssh process
- Test the connection: try `kcat -b localhost:<port> -L` to list topics before running Python code
- Check the topic name: make sure producer and consumer use the exact same topic name
- Restart the consumer: if the consumer seems stuck, stop it (Ctrl+C) and restart with a new `group_id`
- Verify the data format: print `message.value` before processing to see what format you're getting
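The environment-level checks in this list can also be scripted. A sketch assuming the tunnel's local port is 9092; it reports problems rather than fixing them:

```python
import importlib.util
import socket

def preflight(port: int = 9092) -> list:
    """Check that the tunnel is up and kafka-python is importable.
    Returns a list of problem descriptions (empty means all clear)."""
    problems = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(2.0)
        if s.connect_ex(("localhost", port)) != 0:
            problems.append(f"nothing listening on localhost:{port} -- is the SSH tunnel up?")
    if importlib.util.find_spec("kafka") is None:
        problems.append("kafka-python is not installed in this environment")
    return problems

print(preflight() or "preflight OK")
```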