Async Producer Consumer
Deep dive · part of Python Asyncio
asyncio.Queue decouples producers, which put work items on the queue, from consumers, which get them and process them. An optional maxsize adds back-pressure so fast producers cannot overwhelm slow consumers. This is the canonical pattern for back-pressured async pipelines, and it structures crawlers, worker pools, and stream processors without threads.
Sentinel values (None) or TaskGroup cancellation propagate shutdown; understanding get/put await points helps you avoid deadlocks when all consumers exit early.
Production code combines this topic with logging, tests, and clear module boundaries so refactors stay safe when requirements grow.
Queue(maxsize=n) suspends await put() while the queue is full, and await get() suspends while it is empty, which gives natural throttling.
asyncio.create_task spawns concurrent coroutines; gather waits for completion.
CancelledError must be re-raised after cleanup in except blocks; since Python 3.8 it derives from BaseException, so a plain except Exception no longer catches it.
task.cancel() requests cooperative cancellation at the next await.
Multiple consumers on one queue share the load: each item goes to whichever consumer is waiting, so the split depends on timing rather than on a fixed assignment.
Producer-consumer differs from gathering all URLs at once: the queue buffers work between stages that run at uneven speeds.
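The throttling behaviour of a bounded queue can be observed directly. The following is a minimal sketch, assuming a queue of size 2 and no consumer running; the function name demo_backpressure and the 0.05 second timeout are illustrative choices, not part of the original tutorial.

```python
import asyncio


async def demo_backpressure():
    q = asyncio.Queue(maxsize=2)
    await q.put("a")
    await q.put("b")  # the queue is now full
    assert q.full()

    # A third put() suspends until a slot is free. With no consumer
    # running, wait_for gives up after the (illustrative) timeout.
    try:
        await asyncio.wait_for(q.put("c"), timeout=0.05)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True

    # Removing one item frees a slot, so the same put() now succeeds.
    first = await q.get()
    await q.put("c")
    return blocked, first, q.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo_backpressure()))
```

In a real pipeline the producer would simply await put() and let the consumer's pace set the rate; the timeout here exists only to make the suspension visible.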
For graceful shutdown, use an asyncio.Event shared between producers and consumers, or send one poison pill per consumer. TaskGroup (3.11+) simplifies structured concurrency when any child failure should cancel siblings.
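One possible shape for the Event-based variant is sketched below. It assumes a single producer that sets the event once it has finished, and consumers that poll get() with a short timeout so they can re-check the event; the 0.05 second poll interval and all function names are assumptions made for illustration.

```python
import asyncio


async def producer(q, stop, n):
    for i in range(n):
        await q.put(i)
    stop.set()  # signal: no more items will be produced


async def consumer(q, stop, seen):
    # Keep going until the producer is done AND the queue is drained.
    while not (stop.is_set() and q.empty()):
        try:
            # Poll with a short timeout so the loop can re-check the event.
            item = await asyncio.wait_for(q.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue
        seen.append(item)


async def main():
    q = asyncio.Queue(maxsize=3)
    stop = asyncio.Event()
    seen = []
    await asyncio.gather(
        producer(q, stop, 8),
        consumer(q, stop, seen),
        consumer(q, stop, seen),
    )
    return sorted(seen)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With several producers, the event should be set only after the last one finishes, for example by a supervisor that awaits all producer tasks first.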
Metrics: track queue depth over time—monotonic growth signals consumers are too slow or stuck.
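A rough sketch of depth tracking follows. It assumes a hypothetical monitor task that samples qsize() at a fixed interval, and an unbounded queue fed by a burst of items so that the growth is easy to see; in production the samples would go to a metrics system rather than a list.

```python
import asyncio


async def monitor(q, samples, interval=0.01):
    # Sample the queue depth at a fixed interval until cancelled.
    while True:
        samples.append(q.qsize())
        await asyncio.sleep(interval)


async def slow_consumer(q):
    while True:
        await q.get()
        await asyncio.sleep(0.01)  # simulated slow processing


async def main():
    q = asyncio.Queue()  # unbounded on purpose so the growth is visible
    samples = []
    background = [
        asyncio.create_task(monitor(q, samples)),
        asyncio.create_task(slow_consumer(q)),
    ]
    for i in range(50):
        await q.put(i)
        await asyncio.sleep(0)  # yield to the loop without a real delay
    samples.append(q.qsize())  # final reading after the burst
    for task in background:
        task.cancel()
    await asyncio.gather(*background, return_exceptions=True)
    return samples


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The later readings are larger than the first one because the consumer handles far fewer items than the producer enqueues in the same time.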
Bridge sync code with asyncio.to_thread for legacy libraries inside consumers without blocking the loop.
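As an illustration, the sketch below wraps a blocking function in asyncio.to_thread (available from Python 3.9) inside a consumer. The function legacy_checksum is a hypothetical stand-in for a call into a synchronous library.

```python
import asyncio
import time


def legacy_checksum(data):
    # Hypothetical stand-in for a blocking call from a synchronous library.
    time.sleep(0.01)
    return sum(data) % 256


async def consumer(q, results):
    while True:
        item = await q.get()
        if item is None:  # sentinel: stop
            break
        # Run the blocking function in a worker thread so the event
        # loop stays free to serve other tasks in the meantime.
        results.append(await asyncio.to_thread(legacy_checksum, item))


async def main():
    q = asyncio.Queue(maxsize=2)
    results = []
    worker = asyncio.create_task(consumer(q, results))
    for payload in (b"abc", b"xyz", None):
        await q.put(payload)
    await worker
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```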
When designing shutdown, count the consumers and send one sentinel per consumer so every worker unblocks and exits. A single None stops only the consumer that receives it and leaves the others blocked on get() forever, unless each consumer puts the sentinel back on the queue before exiting.
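The one-sentinel-per-consumer approach might look like the sketch below. The consumer count of 3, the names, and the shared log list are assumptions chosen to keep the example small.

```python
import asyncio

N_CONSUMERS = 3  # illustrative; must match the number of consumers started


async def producer(q, n, n_consumers):
    for i in range(n):
        await q.put(i)
    # One sentinel per consumer, so each worker receives its own stop signal.
    for _ in range(n_consumers):
        await q.put(None)


async def consumer(q, name, log):
    while True:
        item = await q.get()
        if item is None:
            log.append((name, "stopped"))
            return
        log.append((name, item))
        await asyncio.sleep(0)  # give other consumers a turn


async def main():
    q = asyncio.Queue(maxsize=4)
    log = []
    consumers = [consumer(q, f"C{i}", log) for i in range(N_CONSUMERS)]
    await asyncio.gather(producer(q, 10, N_CONSUMERS), *consumers)
    return log


if __name__ == "__main__":
    for entry in asyncio.run(main()):
        print(entry)
```

Because each consumer returns as soon as it sees a sentinel, no sentinel is consumed twice and no worker is left waiting.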
Read the parent tutorial on pythondeck.com for runnable snippets, then reproduce them locally in a virtual environment with pinned dependency versions matching your deployment target.
When pairing with teammates, agree on one idiomatic pattern per concern—mixed styles in one repo slow reviews and invite subtle integration bugs during merges.
Leaving maxsize at zero (the default, which means unbounded), so memory grows without limit under a fast producer.
Forgetting await on put/get, creating coroutine objects that never run.
Swallowing CancelledError without re-raise, leaving tasks half-dead.
Sharing non-thread-safe objects across tasks without documenting the single-thread assumption.
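The missing-await pitfall is easy to reproduce. In this small sketch, calling put() without await only creates a coroutine object and the queue stays empty until that object is awaited; the variable names are illustrative.

```python
import asyncio


async def main():
    q = asyncio.Queue()

    coro = q.put("job")       # no await: this only creates a coroutine object
    size_before = q.qsize()   # nothing has been enqueued yet

    await coro                # awaiting the coroutine actually runs put()
    size_after = q.qsize()
    return size_before, size_after


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If the coroutine object is dropped instead of awaited, Python normally reports a RuntimeWarning saying the coroutine was never awaited, which is worth treating as an error in tests.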
Pick maxsize from consumer throughput tests, not arbitrary defaults.
Use asyncio.timeout (3.11+) around consumer work units.
Log task exceptions via gather(return_exceptions=True) during development.
Integration-test shutdown by cancelling producers mid-stream.
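To illustrate the logging practice above, here is a small sketch that collects results with gather(return_exceptions=True) and logs any failures. The worker function and the logger name "pipeline" are hypothetical.

```python
import asyncio
import logging

log = logging.getLogger("pipeline")  # hypothetical logger name


async def worker(n):
    await asyncio.sleep(0)
    if n == 2:
        raise ValueError(f"bad item {n}")
    return n * 10


async def main():
    # Exceptions are returned in place of results instead of being raised,
    # so one failing task does not hide the outcome of the others.
    results = await asyncio.gather(
        *(worker(n) for n in range(4)), return_exceptions=True
    )
    for result in results:
        if isinstance(result, Exception):
            log.error("task failed: %r", result)
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```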
Re-read the examples below with these ideas in mind; change variable names and inputs to match your own project.
The program below demonstrates a bounded queue. Read the comments on each line, run the code, then change names or values to see how the output shifts.
# Example: Bounded queue
# Run in the REPL or save as a .py file and execute with python.
import asyncio
import random

async def producer(q, n):
    for i in range(n):
        await asyncio.sleep(random.uniform(0, 0.1))
        await q.put(i)
    await q.put(None)  # sentinel

async def consumer(q, name):
    while True:
        item = await q.get()
        if item is None:
            await q.put(None)  # let others stop too
            break
        print(name, "->", item)

async def main():
    q = asyncio.Queue(maxsize=4)
    await asyncio.gather(
        producer(q, 10),
        consumer(q, "A"),
        consumer(q, "B"),
    )

asyncio.run(main())
This sample walks through cancelling a task in a small, runnable script. Paste it into the REPL or save it as a .py file before you continue to the next block.
# Example: Cancel a task
# Run in the REPL or save as a .py file and execute with python.
import asyncio

async def long_job():
    try:
        for i in range(10):
            print("working", i)
            await asyncio.sleep(0.2)
    except asyncio.CancelledError:
        print("cleanup")
        raise

async def main():
    t = asyncio.create_task(long_job())
    await asyncio.sleep(0.5)
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        pass

asyncio.run(main())
Here is a hands-on illustration of an async queue. Follow the inline comments first; only then execute the snippet and compare the result with what you expected.
# asyncio.Queue decouples producers and consumers
import asyncio  # asyncio

async def producer(q, n):  # enqueue items
    for i in range(n):  # produce n
        await q.put(i)  # await slot
    await q.put(None)  # sentinel stops consumers

async def consumer(q, name):  # dequeue items
    while True:  # until sentinel
        item = await q.get()  # wait for work
        if item is None:  # stop signal
            await q.put(None)  # propagate sentinel
            break  # exit loop
        print(name, item)  # process

async def main():  # setup
    q = asyncio.Queue(maxsize=2)  # bounded back-pressure
    await asyncio.gather(producer(q, 4), consumer(q, "A"))  # run

asyncio.run(main())  # go
The program below demonstrates task cancellation. Read the comments on each line, run the code, then change names or values to see how the output shifts.
# Tasks can be cancelled cooperatively
import asyncio  # library

async def long_job():  # cancellable work
    try:  # cancellation protocol
        for i in range(5):  # steps
            print("step", i)  # progress
            await asyncio.sleep(0.1)  # yield
    except asyncio.CancelledError:  # on cancel
        print("cleanup")  # teardown
        raise  # re-raise required

async def main():  # supervisor
    t = asyncio.create_task(long_job())  # schedule
    await asyncio.sleep(0.25)  # let it run briefly
    t.cancel()  # request cancellation
    try:
        await t  # await completion
    except asyncio.CancelledError:
        pass  # expected

asyncio.run(main())  # run