---
# Content: CC BY-NC-SA 4.0 | Code: MIT - see /LICENSE.md
title: "Data pipelines"
---
## The cell that does everything {#sec-cell-does-everything}
Return one last time to the monolithic cell from Chapter 1 — the one that loads the data, cleans it, engineers features, trains a model, and draws a plot, all in a single 400-line block. It works, and while you're exploring it even feels efficient: everything is in one place, and you can re-run it with a keystroke. The trouble starts when the work has to happen again. New data arrives and you must retrain, so you re-run all 400 lines, including the expensive load you didn't need to repeat. One feature breaks and you re-read the whole cell to find it. A colleague wants just your cleaning logic and can't take it without the training code welded to it.
A pipeline is the same logic, broken into stages that connect. Each stage does one part of the job, takes its input and produces its output explicitly, and can be run, tested, and reused on its own. Nothing about the computation changes; what changes is that the workflow becomes something you can reason about and re-run in pieces rather than a single indivisible block.
## Stages that compose {#sec-stages-compose}
A stage is just a function with one responsibility: load, validate, clean, engineer features, train. Made pure where possible (Chapter 6), each takes data in and returns data out, and the pipeline is their composition.
```{python}
#| label: staged-pipeline
#| echo: true
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
raw = pd.DataFrame({
    "spend": rng.exponential(50, 1_000),
    "active_days": rng.integers(0, 365, 1_000),
})

def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Drop customers with no active days: they can't have a daily rate."""
    return df[df["active_days"] > 0].copy()

def add_features(df: pd.DataFrame) -> pd.DataFrame:
    """Derive spend per active day."""
    return df.assign(spend_per_day=df["spend"] / df["active_days"])

def summarise(df: pd.DataFrame) -> pd.Series:
    """The model-ready summary the downstream step consumes."""
    return df["spend_per_day"].describe()

# The pipeline is the composition of the stages.
result = summarise(add_features(clean(raw)))
print(result[["count", "mean", "max"]])
```
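The nested call `summarise(add_features(clean(raw)))` reads inside out. Below is a small sketch of two equivalent spellings that list the stages in the order they run. The first uses pandas' `DataFrame.pipe`, which calls a function on the frame. The second folds a plain list of stages with `functools.reduce`; `run_stages` is a hypothetical helper, not part of any library.

```python
from functools import reduce

import numpy as np
import pandas as pd

# The three stages from the cell above, repeated so this block runs on its own.
def clean(df: pd.DataFrame) -> pd.DataFrame:
    return df[df["active_days"] > 0].copy()

def add_features(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(spend_per_day=df["spend"] / df["active_days"])

def summarise(df: pd.DataFrame) -> pd.Series:
    return df["spend_per_day"].describe()

def run_stages(data, stages):
    """Hypothetical helper: feed each stage's output into the next stage."""
    return reduce(lambda acc, stage: stage(acc), stages, data)

rng = np.random.default_rng(42)
raw = pd.DataFrame({
    "spend": rng.exponential(50, 1_000),
    "active_days": rng.integers(0, 365, 1_000),
})

nested = summarise(add_features(clean(raw)))
piped = raw.pipe(clean).pipe(add_features).pipe(summarise)  # df.pipe(f) is f(df)
listed = run_stages(raw, [clean, add_features, summarise])
print(piped.equals(nested), listed.equals(nested))
```

The list form treats the pipeline itself as data: you can print it, slice it to run only the first two stages, or swap one stage for a stub while testing another.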
Each stage can be read in isolation, tested in isolation (Chapter 7), and reused in isolation — `clean` is now available to any other analysis without dragging the feature engineering along. Read top to bottom, the stage names alone document how raw data becomes a model-ready summary, which the monolithic cell never did.
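Below is a sketch of what testing `clean` in isolation can look like. It is written as pytest-style `test_` functions, which is an assumption about your test runner; the functions also run as a plain script. The input is a three-row frame built by hand, so the expected result is obvious at a glance, and no loading, feature engineering, or training is involved.

```python
import pandas as pd

# The `clean` stage from above, repeated so this block runs on its own.
def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Drop customers with no active days."""
    return df[df["active_days"] > 0].copy()

def test_clean_drops_zero_day_customers():
    # One row the stage should drop (active_days == 0), two it should keep.
    df = pd.DataFrame({"spend": [10.0, 0.0, 5.0], "active_days": [3, 0, 1]})
    out = clean(df)
    assert (out["active_days"] > 0).all()
    assert len(out) == 2

def test_clean_does_not_mutate_input():
    df = pd.DataFrame({"spend": [1.0], "active_days": [0]})
    clean(df)
    assert len(df) == 1  # the caller's frame is untouched

if __name__ == "__main__":
    test_clean_drops_zero_day_customers()
    test_clean_does_not_mutate_input()
    print("ok")
```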
::: {.callout-note}
## Data Science Bridge
You already build pipelines — just not at this scale. `Pipeline([("scale", StandardScaler()), ("model", LogisticRegression())])` chains transformers and an estimator into a single object so that preprocessing and fitting move together and, crucially, so that scaling fitted on the training fold can't leak into validation. A data pipeline is that same idea scaled up from the modelling steps to the entire workflow: each stage is a step, and the datasets flowing between them are what the `Pipeline` passes implicitly between transformers.
Where it breaks down: an `sklearn` `Pipeline` lives inside one process and one `.fit()` call, holding everything in memory. A workflow pipeline usually spans processes, persisted files, and even schedules — the cleaning might run nightly and the training weekly — so it needs explicit intermediate artefacts and an orchestrator to run the stages in order, not just method chaining. The principle transfers; the machinery is heavier.
:::
## Idempotency and intermediate artefacts {#sec-idempotency}
Once stages are explicit, two properties make a pipeline cheap to live with. The first is **idempotency**: running a stage twice produces the same result as running it once, with no surprising side effects — so a re-run is always safe. The second is **persisting intermediate artefacts**: a stage writes its output to disk (under `data/interim/` or `data/processed/`, per the previous chapter) so that downstream stages, and future re-runs, can read it instead of recomputing it.
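Below is a minimal sketch of the first property, using two hypothetical stage functions. One appends to its output file, so every re-run changes the result. The other overwrites its output, so running it twice leaves the same file as running it once. Writing to a temporary file and swapping it in with `os.replace` is one common way to keep a crash mid-write from leaving a half-written artefact. That choice is an assumption of this sketch, not something the chapter prescribes.

```python
import os
import tempfile
from pathlib import Path

out_dir = Path(tempfile.mkdtemp())

def log_run_appending(path: Path, line: str) -> None:
    """NOT idempotent: every call appends, so two runs leave two lines."""
    with path.open("a") as f:
        f.write(line + "\n")

def write_output_idempotent(path: Path, text: str) -> None:
    """Idempotent: after any number of calls the file holds exactly `text`."""
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(text)
    # Swap the finished file into place; atomic on POSIX when both paths
    # are on the same filesystem, so readers never see a partial file.
    os.replace(tmp, path)

appended = out_dir / "appended.txt"
replaced = out_dir / "replaced.txt"
for _ in range(2):  # simulate running each stage twice
    log_run_appending(appended, "row")
    write_output_idempotent(replaced, "row\n")

print(appended.read_text().count("row"))  # the re-run doubled the output
print(replaced.read_text().count("row"))  # the re-run was harmless
```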
Together these make re-runs partial rather than all-or-nothing. If the raw load is expensive but unchanged, you compute and cache it once and pay for it again only when its input changes.
```{python}
#| label: cached-stage
#| echo: true
import tempfile
from pathlib import Path

cache_dir = Path(tempfile.mkdtemp())

def build_features(raw: pd.DataFrame,
                   cache: Path = cache_dir / "features.csv") -> pd.DataFrame:
    """Compute features once; reuse the cached artefact on later runs."""
    if cache.exists():
        print("loading cached features")
        return pd.read_csv(cache)
    print("computing features")
    out = add_features(clean(raw))
    out.to_csv(cache, index=False)
    return out

first = build_features(raw)   # computes and caches
second = build_features(raw)  # reads the cache, skips the work
# Compare the values, not the DataFrames: a CSV round-trip doesn't
# preserve the original (gappy) index left by `clean`, so the cached
# and recomputed results match in value even though their indexes differ.
# `allclose` also tolerates any last-digit drift from writing floats as text.
same = np.allclose(first["spend_per_day"].to_numpy(),
                   second["spend_per_day"].to_numpy())
print(f"same values from cache: {same}")
```
The first call computes and writes the artefact; the second reads it back and skips the work entirely. In a real pipeline a build tool such as `make` or Snakemake handles this caching for you, rebuilding a stage's output when its inputs or code change, but the principle is the one shown here: compute once, persist, reuse.
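The cache above checks only whether its file exists, so if `raw` or the stage's code changed, it would keep serving the stale artefact. Below is a minimal sketch of one fix, assuming you are willing to hash the input on every run. The artefact is named after a fingerprint of the input data, computed with `pandas.util.hash_pandas_object`, plus a hand-maintained version tag. That tag, `FEATURES_VERSION`, is hypothetical and stands in for "the code changed". A changed input then maps to a different file name, so it can never hit an old entry.

```python
import hashlib
import tempfile
from pathlib import Path

import numpy as np
import pandas as pd

# Stages from earlier in the chapter, repeated so this block runs on its own.
def clean(df: pd.DataFrame) -> pd.DataFrame:
    return df[df["active_days"] > 0].copy()

def add_features(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(spend_per_day=df["spend"] / df["active_days"])

# Hypothetical version tag: bump it by hand whenever the stage's logic changes,
# so artefacts built by older code stop matching.
FEATURES_VERSION = "v1"

def fingerprint(df: pd.DataFrame) -> str:
    """Short digest of the input's values and index, plus the code version."""
    row_hashes = pd.util.hash_pandas_object(df, index=True).to_numpy()
    digest = hashlib.sha256(row_hashes.tobytes())
    digest.update(FEATURES_VERSION.encode())
    return digest.hexdigest()[:16]

def build_features(raw: pd.DataFrame, cache_dir: Path) -> pd.DataFrame:
    """Like the cached stage above, but the artefact's name encodes its input."""
    cache = cache_dir / f"features-{fingerprint(raw)}.csv"
    if cache.exists():
        print(f"loading cached {cache.name}")
        return pd.read_csv(cache)
    print(f"computing {cache.name}")
    out = add_features(clean(raw))
    out.to_csv(cache, index=False)
    return out

rng = np.random.default_rng(42)
raw = pd.DataFrame({
    "spend": rng.exponential(50, 1_000),
    "active_days": rng.integers(0, 365, 1_000),
})
cache_dir = Path(tempfile.mkdtemp())
build_features(raw, cache_dir)                                 # computes
build_features(raw, cache_dir)                                 # same input: cache hit
build_features(raw.assign(spend=raw["spend"] * 2), cache_dir)  # new input: recomputes
print(len(list(cache_dir.glob("features-*.csv"))))  # one artefact per distinct input
```

Old artefacts are never deleted here, so the cache directory grows with every distinct input; a real tool would also clean up. Hashing the input has a cost of its own, which pays off only when the stage it guards is more expensive than the hash.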
## Validation gates {#sec-validation-gates}
The seams between stages are the right place to check that the data is what the next stage expects. A **validation gate** asserts the schema, ranges, and assumptions at a boundary — the right columns are present, no nulls where they're forbidden, values in a plausible range, no target leaking into the features — and fails loudly when they don't hold. Catching bad data at the boundary turns a cryptic error three stages downstream into a clear message at the point the problem entered.
```python
# A productionised gate, using pandera to declare the contract as a schema
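import pandas as pd
import pandera.pandas as pa

schema = pa.DataFrameSchema({
    "spend": pa.Column(float, pa.Check.ge(0)),
    "active_days": pa.Column(int, pa.Check.in_range(0, 366)),
})

def validate(df: pd.DataFrame) -> pd.DataFrame:
    """Return df unchanged if it meets the contract; raise otherwise."""
    # Raises pandera's SchemaError on the first breach;
    # schema.validate(df, lazy=True) collects every breach before raising.
    return schema.validate(df)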
```
This is testing applied to *data* rather than code (Chapter 7): you're not checking that the cleaning function is correct, but that what arrived at this stage satisfies the contract the next stage relies on. The same exit-code discipline from Chapter 4 applies — a gate that fails should stop the pipeline, not let bad data flow on. Lightweight gates can be plain assertions; `pandera` and Great Expectations formalise them into reusable, documented schemas.
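For comparison, here is a sketch of a lightweight gate in plain pandas that mirrors the `pandera` schema above. `ValidationError` and `validate_customers` are hypothetical names. The gate raises an exception explicitly rather than using `assert`, because Python strips `assert` statements when run with `-O`. Left uncaught, the exception ends the script with a non-zero exit status, so `make` or any orchestrator running it stops at that point.

```python
import pandas as pd

class ValidationError(ValueError):
    """Raised when data crossing a stage boundary breaks its contract."""

def validate_customers(df: pd.DataFrame) -> pd.DataFrame:
    """Plain-pandas gate mirroring the pandera schema above (a sketch)."""
    missing = {"spend", "active_days"} - set(df.columns)
    if missing:  # nothing else can be checked without these columns
        raise ValidationError(f"missing columns: {sorted(missing)}")
    problems = []
    if df[["spend", "active_days"]].isna().any().any():
        problems.append("nulls in spend or active_days")
    if not pd.api.types.is_integer_dtype(df["active_days"]):
        problems.append("active_days is not an integer column")
    n_negative = int((df["spend"] < 0).sum())
    if n_negative:
        problems.append(f"{n_negative} negative spend value(s)")
    if not df["active_days"].between(0, 366).all():
        problems.append("active_days outside 0..366")
    if problems:
        # Report every breach at once, so one run shows the whole picture.
        raise ValidationError("; ".join(problems))
    return df  # returning the frame lets the gate sit inline between stages

good = pd.DataFrame({"spend": [10.0, 0.0], "active_days": [3, 0]})
validate_customers(good)  # passes silently

bad = pd.DataFrame({"spend": [-1.0, 5.0], "active_days": [3, 400]})
try:
    validate_customers(bad)
except ValidationError as err:
    print(f"gate failed: {err}")
```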
## Orchestrating the stages {#sec-orchestration}
Something has to run the stages in the right order, and re-run only what's needed. For a linear or simply branching workflow, the `make` from Chapter 4 is enough — each stage a target that depends on the artefacts of the stages before it:
```makefile
data/processed/features.csv: data/raw/customers.csv src/customer_value/features.py
	python -m customer_value.features

models/model.pkl: data/processed/features.csv src/customer_value/train.py
	python -m customer_value.train
```
Because each target declares its prerequisites, `make` rebuilds a stage only when its output is missing or older than one of them (an input file or the stage's own source), which is the file-level version of the caching above. When pipelines grow complex, scheduled, or need retries and monitoring, purpose-built orchestrators (Snakemake, Prefect, Dagster, Airflow) take over, but the model is the same directed graph of stages. We return to running these automatically in *Continuous integration* and on a schedule in *Deployment*; the companion volume, *Thinking in Uncertainty*, covers the statistical side of pipeline design, such as guarding against train–serve skew.
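To make the directed graph of stages concrete, here is a toy sketch of the two ideas `make` combines. It visits files in dependency order using the standard library's `graphlib.TopologicalSorter`, and it rebuilds a target only if that target is missing or older than a prerequisite. The file names and the `copy_stage` builder are hypothetical stand-ins for real stages. Real tools add parallelism, error handling, and much more.

```python
import os
import tempfile
import time
from graphlib import TopologicalSorter
from pathlib import Path
from typing import Callable

# A rule maps a target to (its prerequisites, a function that builds it).
Rule = tuple[list[Path], Callable[[Path], None]]

def is_stale(target: Path, prereqs: list[Path]) -> bool:
    """make's test: rebuild if the target is missing or any prerequisite is newer."""
    if not target.exists():
        return True
    built_at = target.stat().st_mtime
    return any(p.stat().st_mtime > built_at for p in prereqs)

def run(rules: dict[Path, Rule]) -> list[Path]:
    """Visit every file in dependency order; rebuild stale targets; return them."""
    graph = {target: set(prereqs) for target, (prereqs, _) in rules.items()}
    rebuilt = []
    for node in TopologicalSorter(graph).static_order():
        if node not in rules:  # a source file with no rule, such as the raw data
            continue
        prereqs, build = rules[node]
        if is_stale(node, prereqs):
            build(node)
            rebuilt.append(node)
    return rebuilt

def copy_stage(src: Path) -> Callable[[Path], None]:
    """Hypothetical stand-in for a real stage: writes its input to its target."""
    def build(target: Path) -> None:
        target.write_text(src.read_text())
    return build

work = Path(tempfile.mkdtemp())
raw, features, model = work / "raw.csv", work / "features.csv", work / "model.txt"
raw.write_text("spend,active_days\n10,3\n")
rules = {
    features: ([raw], copy_stage(raw)),
    model: ([features], copy_stage(features)),
}

print([p.name for p in run(rules)])  # nothing built yet: both targets are built
print([p.name for p in run(rules)])  # nothing changed: nothing is rebuilt

# Pretend everything was built an hour ago (file timestamps can be coarse,
# so this keeps the next edit unambiguously newer), then let new data arrive.
hour_ago = time.time() - 3600
for p in (raw, features, model):
    os.utime(p, (hour_ago, hour_ago))
raw.write_text("spend,active_days\n12,3\n")
print([p.name for p in run(rules)])  # raw changed: both downstream stages rebuild
```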
::: {.callout-tip}
## Author's Note
The monolithic cell blurs the stages together, and that blurring feels like a virtue while you're exploring — it's all in front of you, and you can change anything and re-run instantly. The cost is hidden until the work has to repeat: every run becomes all-or-nothing, every change risks the whole, and the only documentation of how data becomes a model is 400 lines you have to read in full.
Explicit stages invert this. The cost of a change becomes *local* — fix the cleaning, re-run from cleaning, keep the cached raw load — and the pipeline becomes legible, because the stage names and their dependencies *are* the documentation of the workflow. None of this argues against the exploratory monolith; that's the right tool for discovery, when you don't yet know what the stages are. The pipeline is what you build once you do — when the workflow has to run again, on new data, reliably, without you in the room to nurse it through 400 lines.
:::
## Summary {#sec-data-pipelines-summary}
A pipeline turns a monolithic analysis into stages you can run, test, and reuse in pieces:
1. **Break the monolith into composable stages.** Each stage is a single-responsibility function taking data in and returning data out; the pipeline is their composition, and every stage is independently testable and reusable.
2. **Make stages idempotent and cache their outputs.** Re-running is then safe and cheap, and you recompute only the stages whose inputs or code have changed.
3. **Validate at the seams.** A gate that checks schema, ranges, and assumptions between stages catches bad data where it enters, not three stages later — testing the data, not the code.
4. **Orchestrate the graph.** `make` runs simple pipelines and rebuilds only what changed; dedicated orchestrators take over when workflows grow scheduled and complex.
The next chapter addresses the values that thread through every stage but should live in none of them: *configuration and secrets*.
## Exercises {#sec-data-pipelines-exercises}
1. Take a monolithic notebook cell or script of your own that does several things and break it into composable stage functions, each taking and returning data. Run them as a pipeline. Which stage turned out to be useful on its own, in a context you hadn't anticipated?
2. Add a validation gate between two stages: assert the schema and ranges the next stage expects (with plain assertions, or with `pandera`), and make it fail loudly on bad input. What specific bad data from a past project would this gate have caught at the boundary?
3. Make one expensive stage idempotent and cached: persist its output to `data/interim/`, and skip the computation when the artefact already exists. Re-run the pipeline twice and confirm the second run skips the cached stage.
4. **Conceptual:** The Data Science Bridge compares a workflow pipeline to an `sklearn` `Pipeline`. Give one way the analogy holds and one way it breaks down. What does a workflow pipeline need that an `sklearn` `Pipeline` does not?
5. **Conceptual:** Not every analysis needs a pipeline framework. Describe a piece of work for which a single script (or even a notebook) is the right tool, and name the signal that tells you a workflow has outgrown it and needs explicit stages and orchestration.