Generator Pipelines
Deep dive · part of Python Generators
Generators compose into lazy data pipelines: each stage pulls items from the previous one, so the stages themselves hold only the current item even when the input is gigabytes of log files, sensor streams, or database cursor rows. Nothing materializes the full dataset unless you call list() on it.
This is the Python equivalent of Unix pipes: read_lines -> filter_errors -> parse_json -> aggregate runs as nested for loops without temporary lists. itertools and generator expressions complete the toolkit for slicing and limiting infinite streams.
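As a rough sketch of the four-stage chain named above, the following uses an in-memory list in place of a real file. The record fields ("level", "service") and the sample data are assumptions made up for illustration.

```python
import json
from collections import Counter

def read_lines(lines):
    # Source stage: an in-memory iterable stands in for a file here.
    for line in lines:
        yield line.rstrip("\n")

def filter_errors(lines):
    # Filter stage: pass through only lines that mention ERROR.
    for line in lines:
        if "ERROR" in line:
            yield line

def parse_json(lines):
    # Transform stage: decode each surviving line into a dict.
    for line in lines:
        yield json.loads(line)

def aggregate(records):
    # Sink stage: count records per service; only the counts stay in memory.
    counts = Counter()
    for record in records:
        counts[record["service"]] += 1
    return counts

# Hypothetical sample input, one JSON document per line.
sample = [
    '{"level": "ERROR", "service": "api"}\n',
    '{"level": "INFO", "service": "api"}\n',
    '{"level": "ERROR", "service": "db"}\n',
    '{"level": "ERROR", "service": "api"}\n',
]

error_counts = aggregate(parse_json(filter_errors(read_lines(sample))))
print(dict(error_counts))
```

No stage runs until aggregate starts pulling, and each line passes through all three upstream stages before the next line is read.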
yield suspends the function while preserving its local state; next() or a for loop resumes it until it raises StopIteration.
Generator expressions such as (x*2 for x in it) are single-use iterators, just like the objects returned by generator functions.
itertools.islice, takewhile, chain, and groupby build pipelines without custom classes.
send(value) pushes data into a generator, enabling coroutine-style transducers (see coroutine-basics).
Calling .close() on a generator raises GeneratorExit at the paused yield, so finally blocks run; throw() injects an exception at the same point, which is useful for testing cleanup.
Pipeline stages should accept and return iterators for composability, not lists.
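To make the close() and throw() behaviour concrete, here is a minimal sketch. The read_items stage and the events list are hypothetical; the finally block stands in for releasing a file or cursor.

```python
events = []

def read_items(items):
    # Hypothetical source stage with cleanup in a finally block.
    try:
        for item in items:
            yield item
    finally:
        events.append("source cleaned up")

stage = read_items(["a", "b", "c"])
first = next(stage)   # runs the body up to the first yield
stage.close()         # raises GeneratorExit at the paused yield, so finally runs
leftover = list(stage)  # a closed generator is finished and yields nothing more

# throw() injects an exception at the paused yield; finally still runs,
# and the exception propagates back to the caller.
stage2 = read_items([1, 2])
next(stage2)
try:
    stage2.throw(ValueError("injected"))
except ValueError as exc:
    events.append(f"caller saw: {exc}")

print(first, leftover, events)
```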
Profile before calling list() on a pipeline: the moment you materialize, you lose the memory advantage. Generators alone do not parallelize CPU-bound stages; consider multiprocessing pools feeding queues for those, or asyncio for I/O-bound stages.
Error handling: if stage three raises, stages one and two have already consumed the items they pulled, and that input is not replayed. Design idempotent stages, or use itertools.tee() sparingly, keeping in mind that it buffers every item one branch has read and the other has not.
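The partial-consumption point can be observed directly. In this sketch (stage names and the consumed list are illustrative assumptions), the source records every item it hands downstream, and a later stage fails partway through.

```python
def numbers(source, consumed):
    # Source stage that records every item it hands downstream.
    for item in source:
        consumed.append(item)
        yield item

def parse_ints(items):
    # Stage that raises ValueError on input that is not an integer.
    for item in items:
        yield int(item)

consumed = []
pipeline = parse_ints(numbers(["1", "2", "oops", "4"], consumed))

results = []
try:
    for value in pipeline:
        results.append(value)
except ValueError:
    pass  # the failing item has already been pulled from the source

remaining = list(pipeline)  # a generator that raised is finished

print(results)    # values produced before the failure
print(consumed)   # items already pulled upstream, including the bad one
print(remaining)
```

The item after the failure is never read, and the pipeline cannot be resumed; recovery means building a new pipeline over whatever input is still available.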
Structured logging pipelines often tag each record dict early, then filter, then serialize once at the sink generator.
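One possible shape for such a logging pipeline is sketched below. The field names (source, line_no, message) and the helper names are assumptions chosen for the example, not a fixed convention.

```python
import json

def tag(lines, source):
    # Tag stage: wrap each raw line in a dict as early as possible.
    for number, line in enumerate(lines, start=1):
        yield {"source": source, "line_no": number, "message": line}

def only_level(records, level):
    # Filter stage: operates on dicts, not on raw strings.
    for record in records:
        if record["message"].startswith(level):
            yield record

def to_json(records):
    # Sink-side stage: serialize exactly once, at the end of the chain.
    for record in records:
        yield json.dumps(record, sort_keys=True)

raw = ["INFO ok", "ERROR boom", "ERROR again"]  # hypothetical input lines
serialized = list(to_json(only_level(tag(raw, "app.log"), "ERROR")))
for row in serialized:
    print(row)
```

Keeping records as dicts until the sink means intermediate stages never re-parse strings, and records that get filtered out are never serialized.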
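Avoid building huge lists inside every stage; doing so defeats laziness.
Do not reuse a consumed generator: a second pass yields nothing, so downstream stages come up empty. Re-create the generator or split it with itertools.tee().
Avoid mixing return with yield in ways that confuse callers: a returned value surfaces only as StopIteration.value, which a for loop discards.
Do not ignore encoding when streaming text files, especially on Windows; pass encoding="utf-8" to open().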
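Two of these pitfalls are easy to reproduce. The count_up generator below is a made-up example: it shows an exhausted generator producing nothing on a second pass, and a return value that is visible only through StopIteration.value.

```python
def count_up(limit):
    # Yields 0..limit-1, then returns a summary value.
    for i in range(limit):
        yield i
    return "done"

gen = count_up(3)
first_pass = list(gen)    # consumes the generator
second_pass = list(gen)   # already exhausted, so this is empty

# The return value is carried by StopIteration; list() and for discard it.
gen2 = count_up(1)
next(gen2)
try:
    next(gen2)
except StopIteration as stop:
    returned = stop.value

print(first_pass, second_pass, returned)
```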
Name pipeline functions as verbs (read_lines, only_errors) and compose at the bottom of the module.
Use context managers for file opens inside the first generator, not around the whole pipeline.
Cap infinite streams with itertools.islice or a take(n) helper.
Document expected item type per stage (str line vs dict record) for maintainers.
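A small sketch of the last two practices: a take(n) helper built on itertools.islice, with type hints documenting what each stage accepts and yields. The helper and stage names here are hypothetical.

```python
from itertools import count, islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def take(items: Iterable[T], n: int) -> Iterator[T]:
    # Hypothetical helper: yield at most n items, then stop pulling upstream.
    return islice(items, n)

def evens(numbers: Iterable[int]) -> Iterator[int]:
    # Filter stage: int in, int out.
    for number in numbers:
        if number % 2 == 0:
            yield number

# count(1) is infinite; take() caps it so list() terminates.
first_evens = list(take(evens(count(1)), 4))
print(first_evens)
```

The annotations cost nothing at runtime and tell a maintainer at a glance whether a stage expects str lines or parsed records.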
Re-read the examples below with these ideas in mind; change variable names and inputs to match your own project.
The program below reads a large log lazily: read_lines streams the file, only_errors filters it, and first_n stops after ten matches. Run the code, then change names or values to see how the output shifts.
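# Example: Read big log lazily
# Run in the REPL or save as a .py file and execute with python.
def read_lines(path):
    # Source stage: stream the file one line at a time.
    with open(path, encoding="utf-8") as f:
        for line in f:
            yield line.rstrip()

def only_errors(lines):
    # Filter stage: pass through only lines containing "ERROR".
    for line in lines:
        if "ERROR" in line:
            yield line

def first_n(it, n):
    # Limit stage: stop after n items.
    for i, x in enumerate(it):
        if i >= n:
            break
        yield x

# Assumes app.log exists in the current directory.
for row in first_n(only_errors(read_lines("app.log")), 10):
    print(row)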
This sample composes a generator expression with itertools.takewhile to cut off an infinite stream. Paste it into the REPL or save it as a .py file before you continue to the next block.
# Example: Compose with itertools
# Run in the REPL or save as a .py file and execute with python.
from itertools import takewhile

def naturals():
    # Infinite counter: 1, 2, 3, ...
    n = 1
    while True:
        yield n
        n += 1

squares = (x*x for x in naturals())
small = takewhile(lambda x: x < 200, squares)  # stops at the first square >= 200
print(list(small))  # squares from 1 up to 196
Here is a hands-on illustration of send() into a generator: a running averager that receives one value per send() call and yields the mean so far. Predict the three printed values first, then execute the snippet and compare.
# Example: send() into a generator
# Run in the REPL or save as a .py file and execute with python.
def averager():
    total, n = 0, 0
    while True:
        # Yield the current mean, then wait for the next value from send().
        x = yield (total / n if n else None)
        total += x
        n += 1

g = averager()
next(g)  # prime: advance to the first yield
print(g.send(10))  # 10.0
print(g.send(20))  # 15.0
print(g.send(30))  # 20.0
The program below demonstrates a lazy pipeline end to end, including a small sample log so it runs as-is. Read the comments on each line, run the code, then change names or values to see how the output shifts.
# Chain generators so only one item is in flight at a time
from pathlib import Path  # setup sample log

def read_lines(path):  # source stage
    with open(path, encoding="utf-8") as fh:  # open file
        for line in fh:  # stream
            yield line.rstrip()  # strip newline

def only_errors(lines):  # filter stage
    for line in lines:  # pull upstream
        if "ERROR" in line:  # match level
            yield line  # pass downstream

p = Path("app.log")
p.write_text("INFO ok\nERROR boom\n", encoding="utf-8")
for row in only_errors(read_lines(p)):  # consume pipeline
    print(row)  # ERROR boom
This sample combines itertools.takewhile and itertools.islice to bound an infinite stream in a small, runnable script. Paste it into the REPL or save it as a .py file before you continue to the next block.
# itertools composes infinite or large streams
from itertools import islice, takewhile  # helpers

def naturals():  # infinite counter
    n = 1  # start
    while True:  # endless
        yield n  # produce
        n += 1  # step

squares = (x * x for x in naturals())  # generator expression stage
small = takewhile(lambda v: v < 200, squares)  # stop when >=200
print(list(islice(small, 5)))  # first five under 200: [1, 4, 9, 16, 25]