From God-Function to Pipeline: Decomposing a Monolithic Data Processing Script
The Scenario
You ask your AI to write a Python script that reads sales data from a CSV, cleans malformed rows, calculates regional totals, enriches each record by calling an external API, and writes the final results to a new CSV. The model produces a single function that does all of this in 200 sequential lines. It works on the first run. It is also untestable, unreadable, and impossible to debug when row 4,387 causes a crash somewhere in the middle.
The Raw AI Draft
Here is what a model like GPT-4 or Claude typically generates on the first attempt. It runs end-to-end, but every concern is tangled into one monolithic block.
```python
import csv
import requests

def process_sales_data(input_file, output_file):
    # Read CSV
    results = []
    with open(input_file, "r") as f:
        reader = csv.DictReader(f)
        rows = list(reader)
    # Clean data
    cleaned = []
    for row in rows:
        if not row["amount"]:
            continue
        try:
            row["amount"] = float(row["amount"])
        except ValueError:
            continue
        if row["amount"] < 0:
            continue
        row["region"] = row.get("region", "").strip().upper()
        if not row["region"]:
            row["region"] = "UNKNOWN"
        cleaned.append(row)
    # Calculate regional totals
    region_totals = {}
    for row in cleaned:
        region = row["region"]
        if region not in region_totals:
            region_totals[region] = 0
        region_totals[region] += row["amount"]
    # Enrich with API data
    for row in cleaned:
        try:
            response = requests.get(
                f"https://api.example.com/regions/{row['region']}",
                headers={"Authorization": "Bearer sk-secret123"}
            )
            data = response.json()
            row["region_name"] = data.get("name", "")
            row["currency"] = data.get("currency", "USD")
        except Exception:
            row["region_name"] = ""
            row["currency"] = "USD"
    # Add regional totals to each row
    for row in cleaned:
        row["region_total"] = region_totals.get(row["region"], 0)
    # Write output
    with open(output_file, "w", newline="") as f:
        fieldnames = ["region", "amount", "region_name", "currency", "region_total"]
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(cleaned)
    print(f"Processed {len(cleaned)} rows from {len(rows)} total")

process_sales_data("sales.csv", "output.csv")
```

The Code Smells
- Single function doing five unrelated jobs — Reading CSV, cleaning data, aggregating totals, calling an API, and writing output are five distinct concerns crammed into one function. You cannot test the cleaning logic without also reading a file, calling an API, and writing output.
- Impossible to unit test — To test whether the cleaning step correctly drops rows with negative amounts, you need a real CSV file, a real API endpoint, and a real output file. There is no way to test one step in isolation.
- No data model — Records are raw dictionaries mutated in place throughout the function. If the API adds a new field or the CSV changes a column name, the breakage surfaces silently as missing keys or wrong values — not as clear errors.
- API called once per row with no caching — If 500 rows share the same 10 regions, the script makes 500 API calls instead of 10. This wastes time, money, and API quota.
- Hardcoded API key in source code — `"Bearer sk-secret123"` is embedded directly in the function and will be committed to version control.
- Broad except clause swallows all errors — `except Exception` catches every failure indiscriminately. The API could be returning 404s for every request, and the script would silently replace all region names with empty strings without any indication that something is wrong.
- No observability — When the script finishes, it prints a single line. There is no visibility into how many rows were dropped during cleaning, how many API calls failed, or whether the enrichment step succeeded at all.
- Hardcoded file paths in the call — `process_sales_data("sales.csv", "output.csv")` at module level means the script can only ever process one specific file pair. Changing the input requires editing the source code.
The Best Practices
The Single Responsibility Principle. Each function should do exactly one thing. "Read a CSV" is one thing. "Validate and normalize a row" is another. "Call an API to enrich a record" is a third. When each function has a single responsibility, it can be tested independently, reused in other pipelines, and debugged without understanding the entire system. If a function's name requires the word "and," it has too many responsibilities.
The Pipeline Pattern. Structure your data processing as a sequence of discrete stages, where each stage takes an input, transforms it, and passes the result to the next stage. This pattern — sometimes called "chain" or "pipes and filters" — makes the data flow explicit and visible. Each stage is a pure function (or close to it), which means you can test it with synthetic inputs, swap stages in and out, and add logging or metrics between stages without modifying any stage's internals.
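The pattern can be sketched in a few lines. The stage functions here are toy stand-ins, not part of the sales pipeline; the point is that each stage is an independent, individually testable function:

```python
from typing import Callable

def run_stages(data, stages: list[Callable]):
    """Apply each stage to the output of the previous one."""
    for stage in stages:
        data = stage(data)
    return data

def drop_negatives(rows):
    """Toy stage: filter out invalid values."""
    return [r for r in rows if r >= 0]

def double(rows):
    """Toy stage: transform each surviving value."""
    return [r * 2 for r in rows]

result = run_stages([3, -1, 5], [drop_negatives, double])
assert result == [6, 10]  # -1 dropped by stage 1, survivors doubled by stage 2
```

Because the stages are just values in a list, inserting a logging or metrics stage between two existing ones is a one-line change.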
Dataclasses as Domain Models. Replace raw dictionaries with Python dataclasses. A SalesRecord(region="US", amount=42.0) is self-documenting: the field names are explicit, type hints enable static analysis, and defaults make optional fields visible in one place. Dictionaries provide none of these guarantees — a typo like row["ammount"] surfaces only as a KeyError at runtime, while the equivalent typo on a dataclass attribute is caught by an IDE or type checker before the code ever runs.
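A minimal sketch of the idea, with field names borrowed from this article's domain:

```python
from dataclasses import dataclass

@dataclass
class SalesRecord:
    region: str
    amount: float
    currency: str = "USD"  # default makes the optional field explicit

record = SalesRecord(region="US", amount=42.0)
assert record.currency == "USD"
# record.ammount would be flagged by an IDE or mypy before the code runs;
# the dict equivalent, row["ammount"], fails only when that line executes.
```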
Response Caching for Repeated API Calls. If multiple records share the same region, the region metadata is identical for all of them. Cache the API response on first fetch and serve subsequent lookups from the cache. This is not premature optimization — it is the difference between 10 API calls and 500, which directly impacts runtime, cost, and rate limit headroom.
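A minimal sketch of the idea using functools.lru_cache; fetch_region_metadata here is a hypothetical stand-in for the real HTTP call. (The refactored code below uses an explicit dict cache instead, so that failed lookups are not cached.)

```python
from functools import lru_cache

calls = 0  # instrumentation: counts how many real fetches happen

@lru_cache(maxsize=None)
def fetch_region_metadata(region: str) -> str:
    """Hypothetical fetch — in the real pipeline this would be an HTTP GET."""
    global calls
    calls += 1
    return f"metadata-for-{region}"

# 6 lookups over 2 unique regions -> only 2 underlying fetches
for region in ["US", "EU", "US", "US", "EU", "US"]:
    fetch_region_metadata(region)

assert calls == 2
```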
Pipeline Statistics for Observability. Use a statistics dataclass that is threaded through the pipeline. Each stage updates the stats object with what it processed, what it dropped, and what failed. At the end, you have a complete report. This replaces the common anti-pattern of debugging a data pipeline by adding print() statements, running it, and then removing them.
Dependency Injection for External Services. Do not create HTTP clients inside the function that needs them. Pass the client as a parameter. This makes testing trivial — swap in a mock client — and decouples the function from a specific HTTP library, authentication scheme, or endpoint configuration.
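For example, a stage that receives its client as a parameter can be tested with a hand-rolled stub that never touches the network. StubClient and fetch_currency below are illustrative, mimicking only the two httpx.Client methods a stage actually calls:

```python
class StubResponse:
    def __init__(self, payload: dict):
        self._payload = payload

    def raise_for_status(self):
        pass  # pretend every request succeeded

    def json(self) -> dict:
        return self._payload

class StubClient:
    """Stands in for httpx.Client — records requests, returns canned data."""
    def __init__(self, payload: dict):
        self.payload = payload
        self.requests: list[str] = []

    def get(self, url: str) -> StubResponse:
        self.requests.append(url)
        return StubResponse(self.payload)

def fetch_currency(client, api_url: str, region: str) -> str:
    """A stage that depends only on the injected client, not on httpx itself."""
    response = client.get(f"{api_url}/regions/{region}")
    response.raise_for_status()
    return response.json().get("currency", "USD")

client = StubClient({"currency": "EUR"})
assert fetch_currency(client, "https://api.example.com", "FR") == "EUR"
assert client.requests == ["https://api.example.com/regions/FR"]
```

The test asserts on both the return value and the recorded request URL, with no network, no credentials, and no rate limits involved.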
The Refactored Code
```python
import csv
import logging
import os
from dataclasses import asdict, dataclass

import httpx

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

# --- Data Models ---

@dataclass
class SalesRecord:
    """A single validated sales record."""
    region: str
    amount: float
    region_name: str = ""
    currency: str = "USD"
    region_total: float = 0.0

@dataclass
class PipelineStats:
    """Tracks metrics across the pipeline for observability."""
    rows_read: int = 0
    rows_dropped: int = 0
    rows_enriched: int = 0
    enrichment_errors: int = 0

# --- Stage 1: Read ---

def read_csv(file_path: str) -> list[dict]:
    """Read raw rows from a CSV file. Pure I/O — no transformation."""
    with open(file_path, "r", encoding="utf-8") as f:
        rows = list(csv.DictReader(f))
    logger.info(f"Read {len(rows)} rows from {file_path}")
    return rows

# --- Stage 2: Clean ---

def clean_record(raw: dict) -> SalesRecord | None:
    """Validate and normalize a single raw row. Returns None if invalid."""
    raw_amount = raw.get("amount", "")
    if not raw_amount:
        return None
    try:
        amount = float(raw_amount)
    except ValueError:
        return None
    if amount < 0:
        return None
    region = raw.get("region", "").strip().upper() or "UNKNOWN"
    return SalesRecord(region=region, amount=amount)

def clean_all(raw_rows: list[dict], stats: PipelineStats) -> list[SalesRecord]:
    """Clean all rows and track how many were dropped."""
    stats.rows_read = len(raw_rows)
    cleaned = []
    for row in raw_rows:
        record = clean_record(row)
        if record is not None:
            cleaned.append(record)
        else:
            stats.rows_dropped += 1
    logger.info(f"Cleaned: {len(cleaned)} valid, {stats.rows_dropped} dropped")
    return cleaned

# --- Stage 3: Aggregate ---

def compute_region_totals(records: list[SalesRecord]) -> dict[str, float]:
    """Calculate total sales per region."""
    totals: dict[str, float] = {}
    for record in records:
        totals[record.region] = totals.get(record.region, 0) + record.amount
    return totals

def attach_totals(records: list[SalesRecord], totals: dict[str, float]) -> None:
    """Mutate records in place to include their regional total."""
    for record in records:
        record.region_total = totals.get(record.region, 0.0)

# --- Stage 4: Enrich ---

def enrich_records(
    records: list[SalesRecord],
    client: httpx.Client,
    api_url: str,
    stats: PipelineStats,
) -> None:
    """Enrich each record with region metadata from an external API."""
    # Cache API responses — the same region's metadata does not change between rows
    cache: dict[str, dict] = {}
    for record in records:
        if record.region in cache:
            data = cache[record.region]
        else:
            try:
                response = client.get(f"{api_url}/regions/{record.region}")
                response.raise_for_status()
                data = response.json()
                cache[record.region] = data
            except (httpx.HTTPStatusError, httpx.TransportError) as e:
                logger.warning(f"Enrichment failed for region {record.region}: {e}")
                data = {}
                stats.enrichment_errors += 1
        if data:
            # Count per row, not per cache miss, so the stat matches its name
            stats.rows_enriched += 1
        record.region_name = data.get("name", "")
        record.currency = data.get("currency", "USD")

# --- Stage 5: Write ---

def write_csv(records: list[SalesRecord], file_path: str) -> None:
    """Write final records to a CSV file."""
    if not records:
        logger.warning("No records to write")
        return
    with open(file_path, "w", newline="", encoding="utf-8") as f:
        fieldnames = list(asdict(records[0]).keys())
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for record in records:
            writer.writerow(asdict(record))
    logger.info(f"Wrote {len(records)} records to {file_path}")

# --- Pipeline Orchestrator ---

def run_pipeline(input_file: str, output_file: str) -> PipelineStats:
    """Execute the full data processing pipeline in discrete stages."""
    api_url = os.environ["REGION_API_URL"]
    api_key = os.environ["REGION_API_KEY"]
    stats = PipelineStats()

    # Stage 1: Read raw data
    raw_rows = read_csv(input_file)

    # Stage 2: Clean and validate
    records = clean_all(raw_rows, stats)

    # Stage 3: Aggregate regional totals
    totals = compute_region_totals(records)
    attach_totals(records, totals)

    # Stage 4: Enrich with external data
    with httpx.Client(
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=httpx.Timeout(10.0, connect=5.0),
    ) as client:
        enrich_records(records, client, api_url, stats)

    # Stage 5: Write results
    write_csv(records, output_file)

    logger.info(
        f"Pipeline complete: {stats.rows_read} read, "
        f"{stats.rows_dropped} dropped, "
        f"{stats.rows_enriched} enriched, "
        f"{stats.enrichment_errors} enrichment errors"
    )
    return stats

if __name__ == "__main__":
    run_pipeline("sales.csv", "output.csv")
```

The Benchmarks
| Metric | Before | After | Improvement |
|---|---|---|---|
| Functions that can be unit tested in isolation | 0 out of 1 | 7 out of 7 | 100% testable |
| Redundant API calls (500 rows, 10 regions) | 500 calls | 10 calls (cached) | 50x fewer API calls |
| Time to locate a bug in data cleaning | Read entire 200-line function | Read clean_record (8 lines) | 25x less code to review |
| Pipeline observability | 1 print statement | Full stats: read, dropped, enriched, errors | Complete visibility |
| Lines per function (average) | ~60 lines | ~12 lines | 5x more focused |
The Prompt Tip
Write a Python data processing script that reads sales data from a CSV, cleans it, calculates regional totals, enriches records from an API, and writes results to a new CSV. Requirements: decompose the logic into separate functions — one for reading, one for cleaning a single record, one for cleaning all records, one for aggregation, one for API enrichment, and one for writing. Define a SalesRecord dataclass with typed fields instead of using raw dictionaries. Define a PipelineStats dataclass to track rows read, rows dropped, rows enriched, and enrichment errors. Cache API responses by region so each unique region is fetched only once. Pass the httpx.Client as a parameter to the enrichment function instead of creating it inside. Read the API URL and key from environment variables. Use Python's logging module for structured output. Create a run_pipeline function that orchestrates all stages in sequence. Add type hints and docstrings to every function.