In this guide, we go beyond the basics of Python multithreading and dive into a backend scenario: designing a scalable log ingestion system. You’ll learn how to use threading, Queue, RLock, and Event to build thread-safe pipelines step by step.
Python's multithreading is often misunderstood, especially when people expect real parallelism. Used in the right scenarios, though, it can be incredibly powerful, particularly in I/O-bound systems where you need to juggle multiple tasks without getting stuck waiting on any one of them.
In this post, we'll build a backend system step by step using threading, queue, and advanced synchronization tools. We'll go from basic thread creation all the way to a file ingestion pipeline for a log monitoring system.
Multithreading allows your program to run multiple operations at the same time, or at least appear to. It's like having multiple workers doing tasks at once. But in Python, there's a catch:
Python's GIL (Global Interpreter Lock) prevents multiple native threads from executing Python bytecode at the same time. So threads won't make CPU-bound code any faster, but they are great for I/O-bound code, because the GIL is released while a thread waits on the disk or the network.
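To make that concrete, here's a minimal sketch that simulates five I/O-bound downloads with time.sleep (fake_download is just an illustration, not part of the log pipeline we build below). The five threads finish in roughly one second instead of five:
import threading
import time

def fake_download(name):
    # Simulated I/O wait; the GIL is released while sleeping,
    # so the other threads keep running.
    time.sleep(1)
    print(f"{name} done")

start = time.perf_counter()
threads = [threading.Thread(target=fake_download, args=(f"file-{i}",)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Elapsed: {time.perf_counter() - start:.1f}s")  # ~1s instead of ~5s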
Python provides the threading module to create and manage threads. Important terms:
Thread: a unit of execution.
start(): begins thread execution.
join(): waits for a thread to finish.
daemon: a background thread that exits when the main program ends.
Lock, RLock: synchronization primitives for safe access to shared data.
Queue: a thread-safe way to send data between threads.
Imagine you're building a backend service that processes log files generated by different microservices in real time. These logs come in from multiple sources and are stored temporarily on disk. Your job is to detect new log files as they land on disk, process them concurrently, and keep per-service statistics as lines are parsed.
This is I/O-bound and fits perfectly with a multithreading solution.
Let's start building it!
Let's start with the simplest possible thread:
import threading

def greet():
    print("Hello from thread!")

thread = threading.Thread(target=greet)
thread.start()
thread.join()
# Hello from thread!
What's happening?
Thread(target=...): tells Python what function to run in the thread.
.start(): starts running that function in a new thread.
.join(): pauses the main thread until the new one finishes.
This is the minimum viable thread. In real apps, you'll typically run threads indefinitely or until a signal.
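For instance, here's a minimal sketch of a daemon thread (the heartbeat function is just an illustration, not part of our pipeline). It runs in the background and is stopped automatically when the main program exits, so it never blocks shutdown:
import threading
import time

def heartbeat():
    # Loops forever; because the thread is a daemon,
    # it won't keep the program alive on its own.
    while True:
        print("still alive")
        time.sleep(1)

t = threading.Thread(target=heartbeat, daemon=True)
t.start()
time.sleep(3)  # the main thread does its real work here
# When the main thread exits, the daemon thread is stopped with it.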
We'll use queue.Queue for passing file paths from the watcher to the worker threads. Queues are great because they're thread-safe by default.
import os
import threading
import time
import queue

log_queue = queue.Queue()

def file_watcher(path_to_watch):
    seen_files = set()
    while True:
        for fname in os.listdir(path_to_watch):
            full_path = os.path.join(path_to_watch, fname)
            if full_path not in seen_files:
                # New file: hand it to the workers via the queue
                log_queue.put(full_path)
                seen_files.add(full_path)
        time.sleep(1)  # poll the directory once per second
seen_files: a set used to avoid reprocessing the same files.
queue.put(): adds a file path into the queue to be processed.
Let's create multiple worker threads to process files in parallel:
def process_log():
    while True:
        file_path = log_queue.get()  # blocks until a file path is available
        try:
            with open(file_path, 'r') as f:
                for line in f:
                    parse_and_store(line)
        finally:
            log_queue.task_done()  # tell the queue this item is finished

threads = []
for _ in range(4):
    t = threading.Thread(target=process_log, daemon=True)
    t.start()
    threads.append(t)
What this does:
Each of the four daemon worker threads pulls file paths off the queue and parses them line by line.
task_done() lets the queue know that the job is complete.
Using multiple threads here allows us to process several files concurrently without writing complex thread management logic.
join() and task_done()
Let's keep the system alive while the workers do their job:
watcher_thread = threading.Thread(target=file_watcher, args=("/logs",), daemon=True)
watcher_thread.start()
# Wait for logs to be processed (optional for daemon)
log_queue.join()
log_queue.join() blocks until all items in the queue have been processed (i.e., task_done() was called for each).
Suppose we want to keep track of how many log entries each microservice produced. Threads will share this dictionary.
from collections import defaultdict

lock = threading.RLock()
file_stats = defaultdict(int)

def parse_and_store(line):
    # extract service name from log line
    service = extract_service_name(line)
    with lock:
        file_stats[service] += 1
    # Send to DB or API (omitted here)
Why use RLock?
A plain Lock works fine unless the same thread might re-acquire the lock it already holds, for example indirectly through a nested function call. RLock (a reentrant lock) avoids deadlock in those cases. We hold the lock around every update to file_stats to avoid race conditions.
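To see why re-entrancy matters, here's a small self-contained sketch (record and record_malformed are hypothetical helpers, not part of the pipeline above). The second function calls the first while already holding the lock, which would deadlock with a plain Lock but works with an RLock:
import threading
from collections import defaultdict

stats_lock = threading.RLock()  # reentrant: the same thread may acquire it again
file_stats = defaultdict(int)

def record(service):
    with stats_lock:
        file_stats[service] += 1

def record_malformed(service):
    with stats_lock:
        file_stats["malformed_total"] += 1
        record(service)  # nested acquire by the same thread; fine with RLock

record_malformed("auth-service")
print(dict(file_stats))  # {'malformed_total': 1, 'auth-service': 1}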
Wrap processing in try/except to catch and retry errors:
try:
    process_log_file(file_path)  # stands in for the per-file work shown earlier
except Exception as e:
    print(f"Error: {e}, re-queuing {file_path}")
    log_queue.put(file_path)  # push the file back onto the queue for another attempt
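Re-queuing forever can spin on a permanently broken file, so one possible refinement (my own sketch, not part of the original design) is to cap the number of attempts. It assumes the watcher enqueues (path, attempts) tuples, e.g. log_queue.put((full_path, 0)), and reuses log_queue and parse_and_store from earlier:
MAX_RETRIES = 3

def process_log_with_retries():
    while True:
        file_path, attempts = log_queue.get()  # queue items are (path, attempts) tuples
        try:
            with open(file_path, 'r') as f:
                for line in f:
                    parse_and_store(line)
        except Exception as e:
            if attempts + 1 < MAX_RETRIES:
                print(f"Error: {e}, retry {attempts + 1} for {file_path}")
                log_queue.put((file_path, attempts + 1))
            else:
                print(f"Giving up on {file_path} after {MAX_RETRIES} attempts")
        finally:
            log_queue.task_done()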
Instead of running forever, let’s use an event to stop threads:
stop_event = threading.Event()

def file_watcher(path_to_watch):
    while not stop_event.is_set():
        # file detection logic
        ...

# To stop
stop_event.set()
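One detail: a worker blocked inside log_queue.get() never re-checks the event. A common workaround, sketched here as one possible change to our worker loop (reusing log_queue, stop_event, and parse_and_store from earlier), is to poll the queue with a timeout:
def process_log():
    while not stop_event.is_set():
        try:
            # Time out regularly so the loop can re-check stop_event
            # instead of blocking forever on an empty queue.
            file_path = log_queue.get(timeout=1)
        except queue.Empty:
            continue
        try:
            with open(file_path, 'r') as f:
                for line in f:
                    parse_and_store(line)
        finally:
            log_queue.task_done()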
To avoid overloading resources:
sema = threading.Semaphore(3)

def process_log():
    while True:
        with sema:
            file_path = log_queue.get()
            # processing logic
This limits concurrent processing to 3 threads at any time.
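Note that in this version each thread holds a permit even while it is only waiting on log_queue.get(). If the goal is to cap just the heavy part of the work, say open file handles or database connections, one alternative sketch (an assumption about intent, not the original snippet; it reuses log_queue and parse_and_store) acquires the semaphore after the get:
sema = threading.Semaphore(3)

def process_log():
    while True:
        file_path = log_queue.get()
        try:
            with sema:
                # At most 3 threads run this block at the same time.
                with open(file_path, 'r') as f:
                    for line in f:
                        parse_and_store(line)
        finally:
            log_queue.task_done()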
Because of the GIL, multithreading in Python isn't about raw speed, but it's extremely useful for I/O-bound systems like our file ingestion engine. Combined with queues, locks, and coordination tools like events and semaphores, it lets you build fast, scalable, and safe background systems.
Tips for production:
Use Queue instead of managing a shared list or dict across threads.