Async Producer Consumer

Deep dive · part of Python Asyncio

An asyncio.Queue decouples producers from consumers. Producers put work items; consumers get them and process. This is the canonical pattern for back-pressured async pipelines.

asyncio.Queue decouples producers that generate work from consumers that process it, with optional maxsize back-pressure so fast producers do not drown slow consumers. This pattern structures crawlers, worker pools, and stream processors without threads.

Sentinel values (None) or TaskGroup cancellation propagate shutdown; understanding get/put await points helps you avoid deadlocks when all consumers exit early.

Production code combines this topic with logging, tests, and clear module boundaries so refactors stay safe when requirements grow.

Queue(maxsize=n) blocks put() when full and get() when empty—natural throttling.

asyncio.create_task spawns concurrent coroutines; gather waits for completion.

CancelledError must be re-raised after cleanup in except blocks (Python 3.8+ idioms).

task.cancel() requests cooperative cancellation at the next await.

Multiple consumers on one queue load-balance items unpredictably but fairly.

Producer-consumer differs from gather-all URLs: queues buffer uneven speeds.

For graceful shutdown, use asyncio.Event shared between producers and consumers, or send a poison pill per consumer count. TaskGroup (3.11+) simplifies structured concurrency when any child failure should cancel siblings.

Metrics: track queue depth over time—monotonic growth signals consumers are too slow or stuck.

Bridge sync code with asyncio.to_thread for legacy libraries inside consumers without blocking the loop.

When designing shutdown, count consumers and send one sentinel per consumer so every task exits; a single None may starve remaining workers if they block on get() forever.

When shutting down, send one sentinel per consumer so every worker unblocks; a single None starves parallel consumers still waiting on get().

Read the parent tutorial on pythondeck.com for runnable snippets, then reproduce them locally in a virtual environment with pinned dependency versions matching your deployment target.

When pairing with teammates, agree on one idiomatic pattern per concern—mixed styles in one repo slow reviews and invite subtle integration bugs during merges.

Queue size zero (unbounded) letting memory grow without limit under a fast producer.

Forgetting await on put/get, creating coroutine objects that never run.

Swallowing CancelledError without re-raise, leaving tasks half-dead.

Sharing non-thread-safe objects across tasks without documenting single-thread assumption.

Pick maxsize from consumer throughput tests, not arbitrary defaults.

Use asyncio.timeout (3.11+) around consumer work units.

Log task exceptions via gather(return_exceptions=True) during development.

Integration-test shutdown by cancelling producers mid-stream.

Re-read the examples below with these ideas in mind; change variable names and inputs to match your own project.

The program below demonstrates bounded queue. Read the comments on each line, run the code, then change names or values to see how the output shifts.

# Example: Bounded queue
# Run in the REPL or save as a .py file and execute with python.
import asyncio, random

async def producer(q, n):
    for i in range(n):
        await asyncio.sleep(random.uniform(0, 0.1))
        await q.put(i)
    await q.put(None)   # sentinel

async def consumer(q, name):
    while True:
        item = await q.get()
        if item is None:
            await q.put(None)   # let others stop too
            break
        print(name, "->", item)

async def main():
    q = asyncio.Queue(maxsize=4)
    await asyncio.gather(
        producer(q, 10),
        consumer(q, "A"),
        consumer(q, "B"),
    )

asyncio.run(main())

This sample walks through cancel a task in a small, runnable script. Paste it into the REPL or save it as a .py file before you continue to the next block.

# Example: Cancel a task
# Run in the REPL or save as a .py file and execute with python.
import asyncio

async def long_job():
    try:
        for i in range(10):
            print("working", i)
            await asyncio.sleep(0.2)
    except asyncio.CancelledError:
        print("cleanup")
        raise

async def main():
    t = asyncio.create_task(long_job())
    await asyncio.sleep(0.5)
    t.cancel()
    try: await t
    except asyncio.CancelledError: pass

asyncio.run(main())

Here is a hands-on illustration of async queue. Follow the inline comments first; only then execute the snippet and compare the result with what you expected.

# asyncio.Queue decouples producers and consumers
import asyncio  # asyncio

async def producer(q, n):  # enqueue items
    for i in range(n):  # produce n
        await q.put(i)  # await slot
    await q.put(None)  # sentinel stops consumers

async def consumer(q, name):  # dequeue items
    while True:  # until sentinel
        item = await q.get()  # wait for work
        if item is None:  # stop signal
            await q.put(None)  # propagate sentinel
            break  # exit loop
        print(name, item)  # process

async def main():  # setup
    q = asyncio.Queue(maxsize=2)  # bounded back-pressure
    await asyncio.gather(producer(q, 4), consumer(q, "A"))  # run
asyncio.run(main())  # go

The program below demonstrates cancel task. Read the comments on each line, run the code, then change names or values to see how the output shifts.

# Tasks can be cancelled cooperatively
import asyncio  # library

async def long_job():  # cancellable work
    try:  # cancellation protocol
        for i in range(5):  # steps
            print("step", i)  # progress
            await asyncio.sleep(0.1)  # yield
    except asyncio.CancelledError:  # on cancel
        print("cleanup")  # teardown
        raise  # re-raise required

async def main():  # supervisor
    t = asyncio.create_task(long_job())  # schedule
    await asyncio.sleep(0.25)  # let it run briefly
    t.cancel()  # request cancellation
    try: await t  # await completion
    except asyncio.CancelledError: pass  # expected

asyncio.run(main())  # run

« back to Python Asyncio All tutorials