Python's multithreading is often misunderstood, especially when people expect real parallelism. But when used correctly, it can be incredibly powerful in the right scenarios, particularly I/O-bound systems where you need to handle multiple tasks simultaneously without getting stuck waiting.
In this post, we’ll build a backend system step-by-step using threading, queue, and advanced synchronization tools. We’ll go from basic thread creation all the way to building a file ingestion pipeline for a log monitoring system.
Multithreading allows your program to run multiple operations at the same time, or at least appear to. It's like having multiple workers doing tasks at once. But in Python, there's a catch:
Python's Global Interpreter Lock (GIL) prevents multiple native threads from executing Python bytecode at the same time. So:
- For CPU-heavy work: Multithreading doesn't help much. Use multiprocessing instead.
- For I/O-bound work (reading files, making HTTP calls): Multithreading shines, because while one thread is waiting for I/O, another can run.
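A quick way to see the I/O-bound payoff is to push several simulated network calls through a thread pool. The fetch function and URLs below are hypothetical stand-ins; a sleep plays the role of a slow HTTP request:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(url):
    # Hypothetical stand-in for a slow network call (e.g. an HTTP request).
    time.sleep(0.2)
    return f"response from {url}"

urls = [f"https://example.com/{i}" for i in range(4)]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(fetch, urls))
elapsed = time.perf_counter() - start
# The four 0.2 s "calls" overlap, so the total stays well under the
# 0.8 s a sequential loop would take.
```

While each thread sleeps (waiting on "I/O"), the GIL is released, so the other threads make progress — exactly the situation where Python threads pay off.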
Python provides the threading module to create and manage threads. Important terms:
- Thread: A unit of execution.
- start(): Begins thread execution.
- join(): Waits for a thread to finish.
- daemon: A background thread that exits when the main program ends.
- Lock, RLock: Synchronization primitives for safe access to shared data.
- Queue: A thread-safe way to send data between threads.
Imagine you're building a backend service that processes log files generated by different microservices in real-time. These logs come in from multiple sources and are stored temporarily on disk. Your job is to:
- Watch a directory for new log files.
- Read and parse them in chunks.
- Send parsed data to an API or database.
- Avoid overloading the system with too many threads.
This is I/O-bound and fits perfectly with a multithreading solution.
Let's build it!
Let's start with the simplest possible thread:
```python
import threading

def greet():
    print("Hello from thread!")

thread = threading.Thread(target=greet)
thread.start()
thread.join()
# Hello from thread!
```

What's happening?

- Thread(target=...): Tells Python what function to run in the thread.
- .start(): Starts running that function in a new thread.
- .join(): Pauses the main thread until the new one finishes.
This is the minimum viable thread. In real apps, you’ll typically run threads indefinitely or until a signal.
We'll use queue.Queue for passing file paths from the watcher to the worker threads. Queues are great because they're thread-safe by default.
```python
import os
import threading
import time
import queue

log_queue = queue.Queue()

def file_watcher(path_to_watch):
    seen_files = set()
    while True:
        for fname in os.listdir(path_to_watch):
            full_path = os.path.join(path_to_watch, fname)
            if full_path not in seen_files:
                log_queue.put(full_path)
                seen_files.add(full_path)
        time.sleep(1)
```

- We maintain a seen_files set to avoid reprocessing the same files.
- log_queue.put() adds a file path into the queue to be processed.
- This thread runs continuously and places new tasks into the queue.
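Here is a self-contained sketch of the watcher in action. It adds a stop event (not in the version above) purely so the loop can be shut down cleanly, and polls a temporary directory instead of a real log folder:

```python
import os
import queue
import tempfile
import threading
import time

log_queue = queue.Queue()
stop = threading.Event()

def file_watcher(path_to_watch):
    seen_files = set()
    while not stop.is_set():
        for fname in os.listdir(path_to_watch):
            full_path = os.path.join(path_to_watch, fname)
            if full_path not in seen_files:
                log_queue.put(full_path)
                seen_files.add(full_path)
        stop.wait(0.05)  # poll interval; wait() returns early once stop is set

tmpdir = tempfile.mkdtemp()
watcher = threading.Thread(target=file_watcher, args=(tmpdir,), daemon=True)
watcher.start()

# Drop a "log file" into the watched directory and let the watcher find it.
with open(os.path.join(tmpdir, "a.log"), "w") as f:
    f.write("hello\n")
time.sleep(0.3)

stop.set()
watcher.join(timeout=1)
found = log_queue.get_nowait()
```

Using stop.wait(poll_interval) instead of time.sleep() doubles as an interruptible sleep: the watcher wakes up immediately when asked to stop.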
Let’s create multiple worker threads to process files in parallel:
```python
def process_log():
    while True:
        file_path = log_queue.get()
        try:
            with open(file_path, 'r') as f:
                for line in f:
                    parse_and_store(line)
        finally:
            log_queue.task_done()

threads = []
for _ in range(4):
    t = threading.Thread(target=process_log, daemon=True)
    t.start()
    threads.append(t)
```

What this does:
- Each worker thread pulls a file from the queue.
- Opens and processes the file line by line.
- Calls a function to parse and store the data.
- task_done() lets the queue know that the job is complete.
Using multiple threads here allows us to process several files concurrently without writing complex thread management logic.
Let’s keep the system alive while the workers do their job:
```python
watcher_thread = threading.Thread(target=file_watcher, args=("/logs",), daemon=True)
watcher_thread.start()

# Wait for logs to be processed (optional for daemon)
log_queue.join()
```

- join() blocks until all items in the queue have been processed (i.e., task_done() was called for each).
- This ensures the script doesn't exit prematurely when you run it as a one-time job.
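Putting the pieces together, here is a runnable end-to-end sketch. It feeds the queue from two temporary files rather than a watcher thread, uses a trivial stand-in for parse_and_store, and adds None sentinels (not in the snippets above) so each worker can shut down cleanly:

```python
import os
import queue
import tempfile
import threading

log_queue = queue.Queue()
processed = []
processed_lock = threading.Lock()

def parse_and_store(line):
    # Stand-in for real parsing: just record the line.
    with processed_lock:
        processed.append(line.strip())

def process_log():
    while True:
        file_path = log_queue.get()
        try:
            if file_path is None:   # sentinel: this worker should exit
                break
            with open(file_path, 'r') as f:
                for line in f:
                    parse_and_store(line)
        finally:
            log_queue.task_done()

# Create two throwaway log files and enqueue them.
tmpdir = tempfile.mkdtemp()
for i in range(2):
    path = os.path.join(tmpdir, f"svc{i}.log")
    with open(path, "w") as f:
        f.write(f"svc{i} started\nsvc{i} ready\n")
    log_queue.put(path)

workers = [threading.Thread(target=process_log, daemon=True) for _ in range(2)]
for t in workers:
    t.start()
for _ in workers:
    log_queue.put(None)   # one sentinel per worker
log_queue.join()          # returns once every put() has a matching task_done()
```

The sentinel pattern is one common way to stop queue-driven workers; the stop-event approach shown later in this post is another.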
Suppose we want to keep track of how many log entries each microservice produced. Threads will share this dictionary.
```python
from collections import defaultdict

lock = threading.RLock()
file_stats = defaultdict(int)

def parse_and_store(line):
    # Extract the service name from the log line
    service = extract_service_name(line)
    with lock:
        file_stats[service] += 1
    # Send to DB or API (omitted here)
```

Why use RLock?

- Lock works fine unless the same thread might re-acquire the lock, e.g. indirectly through a function call. RLock (a reentrant lock) avoids deadlock in those cases.
- Always use locks to guard shared data like file_stats to avoid race conditions.
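To make the reentrancy point concrete, here is a tiny sketch (record and record_with_alias are hypothetical helpers): the outer function holds the lock and calls a helper that acquires it again — a plain Lock would deadlock here, but an RLock allows nested acquisition within the same thread:

```python
import threading
from collections import defaultdict

lock = threading.RLock()
file_stats = defaultdict(int)

def record(service):
    with lock:               # second acquisition when called from below
        file_stats[service] += 1

def record_with_alias(service, alias):
    with lock:               # first acquisition
        record(service)      # re-enters the same lock: fine with RLock
        record(alias)

record_with_alias("auth", "auth-legacy")
```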
Wrap processing in try/except to catch and retry errors:

```python
try:
    process_log_file(file_path)
except Exception as e:
    print(f"Error: {e}, re-queuing {file_path}")
    log_queue.put(file_path)
```

In production you'd also cap the number of retries, so a permanently broken file doesn't circulate forever.

Instead of running forever, let's use an event to stop threads:
```python
stop_event = threading.Event()

def file_watcher(path_to_watch):
    while not stop_event.is_set():
        # file detection logic
        ...

# To stop:
stop_event.set()
```

To avoid overloading resources:
```python
sema = threading.Semaphore(3)

def process_log():
    while True:
        file_path = log_queue.get()
        with sema:
            # processing logic
            ...
```

This limits concurrent processing to 3 threads at any time. (Acquiring the semaphore after get() means a thread never holds a permit while it's merely waiting for work.)
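You can verify the cap empirically. The sketch below (with a hypothetical work function standing in for file processing) tracks how many threads are inside the semaphore at once and records the peak:

```python
import threading
import time

sema = threading.Semaphore(3)
active = 0
peak = 0
counter_lock = threading.Lock()

def work():
    global active, peak
    with sema:                       # at most 3 threads past this point
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)             # simulate file processing
        with counter_lock:
            active -= 1

threads = [threading.Thread(target=work) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# peak never exceeds 3, even with 8 worker threads
```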
Multithreading in Python isn’t about raw speed, thanks to the GIL, but it’s extremely useful for I/O-bound systems like our file ingestion engine. When used with queues, locks, and coordination tools like events and semaphores, you can build fast, scalable, and safe background systems.
Tips for production:
- Use Queue instead of managing a list or dict across threads.
- Always guard shared mutable state.
- Avoid over-engineering: threads are powerful, but also tricky to debug.