From God-Function to Pipeline: Decomposing a Monolithic Data Processing Script

The Scenario

You ask your AI to write a Python script that reads sales data from a CSV, cleans malformed rows, calculates regional totals, enriches each record by calling an external API, and writes the final results to a new CSV. The model produces a single function that does all of this in 200 sequential lines. It works on the first run. It is also untestable, unreadable, and impossible to debug when row 4,387 causes a crash somewhere in the middle.

The Raw AI Draft

Here is what a model like GPT-4 or Claude typically generates on the first attempt. It runs end-to-end, but every concern is tangled into one monolithic block.

Before — The Naive AI Draft
import csv
import requests

def process_sales_data(input_file, output_file):
  # Read CSV
  results = []
  with open(input_file, "r") as f:
      reader = csv.DictReader(f)
      rows = list(reader)

  # Clean data
  cleaned = []
  for row in rows:
      if not row["amount"]:
          continue
      try:
          row["amount"] = float(row["amount"])
      except ValueError:
          continue
      if row["amount"] < 0:
          continue
      row["region"] = row.get("region", "").strip().upper()
      if not row["region"]:
          row["region"] = "UNKNOWN"
      cleaned.append(row)

  # Calculate regional totals
  region_totals = {}
  for row in cleaned:
      region = row["region"]
      if region not in region_totals:
          region_totals[region] = 0
      region_totals[region] += row["amount"]

  # Enrich with API data
  for row in cleaned:
      try:
          response = requests.get(
              f"https://api.example.com/regions/{row['region']}",
              headers={"Authorization": "Bearer sk-secret123"}
          )
          data = response.json()
          row["region_name"] = data.get("name", "")
          row["currency"] = data.get("currency", "USD")
      except Exception:
          row["region_name"] = ""
          row["currency"] = "USD"

  # Add regional totals to each row
  for row in cleaned:
      row["region_total"] = region_totals.get(row["region"], 0)

  # Write output
  with open(output_file, "w", newline="") as f:
      fieldnames = ["region", "amount", "region_name", "currency", "region_total"]
      writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
      writer.writeheader()
      writer.writerows(cleaned)

  print(f"Processed {len(cleaned)} rows from {len(rows)} total")

process_sales_data("sales.csv", "output.csv")

The Code Smells

⚠️Code Smells — What's Wrong Here?
  • Single function doing five unrelated jobs — Reading CSV, cleaning data, aggregating totals, calling an API, and writing output are five distinct concerns crammed into one function. You cannot test the cleaning logic without also reading a file, calling an API, and writing output.
  • Impossible to unit test — To test whether the cleaning step correctly drops rows with negative amounts, you need a real CSV file, a real API endpoint, and a real output file. There is no way to test one step in isolation.
  • No data model — Records are raw dictionaries mutated in place throughout the function. If the API adds a new field or the CSV changes a column name, the breakage surfaces silently as missing keys or wrong values — not as clear errors.
  • API called once per row with no caching — If 500 rows share the same 10 regions, the script makes 500 API calls instead of 10. This wastes time, money, and API quota.
  • Hardcoded API key in source code — "Bearer sk-secret123" is embedded directly in the function and will be committed to version control.
  • Broad except clause swallows all errors — except Exception catches every runtime failure indiscriminately: connection errors, JSON decode errors, missing keys. The API could be returning 404s for every request, and the script would silently replace all region names with empty strings without any indication something is wrong.
  • No observability — When the script finishes, it prints a single line. There is no visibility into how many rows were dropped during cleaning, how many API calls failed, or whether the enrichment step succeeded at all.
  • Hardcoded file paths in the call — process_sales_data("sales.csv", "output.csv") at module level means the script can only ever process one specific file pair. Changing the input requires editing the source code.

The Best Practices

The Single Responsibility Principle. Each function should do exactly one thing. "Read a CSV" is one thing. "Validate and normalize a row" is another. "Call an API to enrich a record" is a third. When each function has a single responsibility, it can be tested independently, reused in other pipelines, and debugged without understanding the entire system. If a function's name requires the word "and," it has too many responsibilities.
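The "and" test can be made concrete with a small, hypothetical sketch (the helper names below are illustrative, not from the original script): the combined function forces every test to exercise both behaviors, while the split versions can each be tested with a single value.

```python
# Smell: the name needs "and", so the function is doing two jobs at once.
def parse_amount_and_normalize_region(row: dict) -> dict:
    row["amount"] = float(row["amount"])
    row["region"] = row.get("region", "").strip().upper() or "UNKNOWN"
    return row

# Single-responsibility versions: each one is trivially testable on its own.
def parse_amount(raw: str) -> float:
    """Convert a raw amount string to a float."""
    return float(raw)

def normalize_region(raw: str) -> str:
    """Uppercase and strip a region code, defaulting to UNKNOWN."""
    return raw.strip().upper() or "UNKNOWN"
```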

The Pipeline Pattern. Structure your data processing as a sequence of discrete stages, where each stage takes an input, transforms it, and passes the result to the next stage. This pattern — sometimes called "chain" or "pipes and filters" — makes the data flow explicit and visible. Each stage is a pure function (or close to it), which means you can test it with synthetic inputs, swap stages in and out, and add logging or metrics between stages without modifying any stage's internals.
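A minimal sketch of the pattern, using toy stages rather than the sales logic: each stage is a small function, and the orchestrator simply feeds one stage's output to the next.

```python
def parse(rows: list[str]) -> list[float]:
    """Stage 1: convert raw strings to floats, dropping unparseable values."""
    out = []
    for r in rows:
        try:
            out.append(float(r))
        except ValueError:
            pass
    return out

def drop_negatives(values: list[float]) -> list[float]:
    """Stage 2: filter out negative values."""
    return [v for v in values if v >= 0]

def total(values: list[float]) -> float:
    """Stage 3: aggregate."""
    return sum(values)

def run(rows: list[str]) -> float:
    """Orchestrator: each stage's output feeds the next stage's input."""
    return total(drop_negatives(parse(rows)))
```

Because each stage is a pure function of its input, you can assert on any intermediate result with synthetic data, or insert a logging step between any two stages without touching their internals.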

Dataclasses as Domain Models. Replace raw dictionaries with Python dataclasses. A SalesRecord(region="US", amount=42.0) is self-documenting: the field names are explicit, type hints enable static analysis, and default values enforce invariants. Dictionaries provide none of these guarantees — a typo like row["ammount"] only surfaces as a KeyError at runtime (or as a silent None from .get()) instead of being caught ahead of time by an IDE or type checker.

Response Caching for Repeated API Calls. If multiple records share the same region, the region metadata is identical for all of them. Cache the API response on first fetch and serve subsequent lookups from the cache. This is not premature optimization — it is the difference between 10 API calls and 500, which directly impacts runtime, cost, and rate limit headroom.

Pipeline Statistics for Observability. Use a statistics dataclass that is threaded through the pipeline. Each stage updates the stats object with what it processed, what it dropped, and what failed. At the end, you have a complete report. This replaces the common anti-pattern of debugging a data pipeline by adding print() statements, running it, and then removing them.
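Threading the stats object looks like this in miniature — each stage receives the same mutable dataclass and records what it did:

```python
from dataclasses import dataclass

@dataclass
class PipelineStats:
    """Shared counters updated by each pipeline stage."""
    rows_read: int = 0
    rows_dropped: int = 0

def clean(rows: list[str], stats: PipelineStats) -> list[float]:
    """Toy cleaning stage: parse floats, counting what was read and dropped."""
    stats.rows_read = len(rows)
    out = []
    for r in rows:
        try:
            out.append(float(r))
        except ValueError:
            stats.rows_dropped += 1
    return out
```

After the run, the stats object is a complete, inspectable report rather than a scattering of print statements.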

Dependency Injection for External Services. Do not create HTTP clients inside the function that needs them. Pass the client as a parameter. This makes testing trivial — swap in a mock client — and decouples the function from a specific HTTP library, authentication scheme, or endpoint configuration.
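Because the enrichment function only needs an object with a .get() method returning something response-like, a test can inject a hand-rolled fake. The fake classes below are illustrative stand-ins, not part of httpx:

```python
class FakeResponse:
    """Mimics the small slice of the response API the function uses."""
    def __init__(self, payload: dict):
        self._payload = payload

    def json(self) -> dict:
        return self._payload

    def raise_for_status(self) -> None:
        pass  # pretend every request succeeded

class FakeClient:
    """Stands in for httpx.Client in tests; records every requested URL."""
    def __init__(self):
        self.requested: list[str] = []

    def get(self, url: str) -> FakeResponse:
        self.requested.append(url)
        return FakeResponse({"name": "Testland", "currency": "USD"})

def fetch_region_name(client, api_url: str, region: str) -> str:
    """The function under test never knows it was handed a fake."""
    response = client.get(f"{api_url}/regions/{region}")
    response.raise_for_status()
    return response.json().get("name", "")
```

The test makes no network calls, needs no credentials, and can assert exactly which URLs were requested.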

The Refactored Code

After — Production-Ready
import csv
import os
import logging
from dataclasses import dataclass, field, asdict

import httpx

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


# --- Data Models ---

@dataclass
class SalesRecord:
  """A single validated sales record."""
  region: str
  amount: float
  region_name: str = ""
  currency: str = "USD"
  region_total: float = 0.0


@dataclass
class PipelineStats:
  """Tracks metrics across the pipeline for observability."""
  rows_read: int = 0
  rows_dropped: int = 0
  rows_enriched: int = 0
  enrichment_errors: int = 0


# --- Stage 1: Read ---

def read_csv(file_path: str) -> list[dict]:
  """Read raw rows from a CSV file. Pure I/O — no transformation."""
  with open(file_path, "r", encoding="utf-8") as f:
      rows = list(csv.DictReader(f))
  logger.info(f"Read {len(rows)} rows from {file_path}")
  return rows


# --- Stage 2: Clean ---

def clean_record(raw: dict) -> SalesRecord | None:
  """Validate and normalize a single raw row. Returns None if invalid."""
  raw_amount = raw.get("amount", "")
  if not raw_amount:
      return None
  try:
      amount = float(raw_amount)
  except ValueError:
      return None
  if amount < 0:
      return None

  region = raw.get("region", "").strip().upper() or "UNKNOWN"
  return SalesRecord(region=region, amount=amount)


def clean_all(raw_rows: list[dict], stats: PipelineStats) -> list[SalesRecord]:
  """Clean all rows and track how many were dropped."""
  stats.rows_read = len(raw_rows)
  cleaned = []
  for row in raw_rows:
      record = clean_record(row)
      if record is not None:
          cleaned.append(record)
      else:
          stats.rows_dropped += 1
  logger.info(f"Cleaned: {len(cleaned)} valid, {stats.rows_dropped} dropped")
  return cleaned


# --- Stage 3: Aggregate ---

def compute_region_totals(records: list[SalesRecord]) -> dict[str, float]:
  """Calculate total sales per region."""
  totals: dict[str, float] = {}
  for record in records:
      totals[record.region] = totals.get(record.region, 0) + record.amount
  return totals


def attach_totals(records: list[SalesRecord], totals: dict[str, float]) -> None:
  """Mutate records in place to include their regional total."""
  for record in records:
      record.region_total = totals.get(record.region, 0.0)


# --- Stage 4: Enrich ---

def enrich_records(
  records: list[SalesRecord],
  client: httpx.Client,
  api_url: str,
  stats: PipelineStats,
) -> None:
  """Enrich each record with region metadata from an external API."""
  # Cache API responses — same region data does not change between rows.
  # Failed lookups are cached as {} so a bad region is not retried per row.
  cache: dict[str, dict] = {}
  for record in records:
      if record.region not in cache:
          try:
              response = client.get(f"{api_url}/regions/{record.region}")
              response.raise_for_status()
              cache[record.region] = response.json()
          except (httpx.HTTPStatusError, httpx.TransportError) as e:
              logger.warning(f"Enrichment failed for region {record.region}: {e}")
              cache[record.region] = {}
              stats.enrichment_errors += 1

      data = cache[record.region]
      if data:
          stats.rows_enriched += 1
      record.region_name = data.get("name", "")
      record.currency = data.get("currency", "USD")


# --- Stage 5: Write ---

def write_csv(records: list[SalesRecord], file_path: str) -> None:
  """Write final records to a CSV file."""
  if not records:
      logger.warning("No records to write")
      return
  with open(file_path, "w", newline="", encoding="utf-8") as f:
      fieldnames = list(asdict(records[0]).keys())
      writer = csv.DictWriter(f, fieldnames=fieldnames)
      writer.writeheader()
      for record in records:
          writer.writerow(asdict(record))
  logger.info(f"Wrote {len(records)} records to {file_path}")


# --- Pipeline Orchestrator ---

def run_pipeline(input_file: str, output_file: str) -> PipelineStats:
  """Execute the full data processing pipeline in discrete stages."""
  api_url = os.environ["REGION_API_URL"]
  api_key = os.environ["REGION_API_KEY"]

  stats = PipelineStats()

  # Stage 1: Read raw data
  raw_rows = read_csv(input_file)

  # Stage 2: Clean and validate
  records = clean_all(raw_rows, stats)

  # Stage 3: Aggregate regional totals
  totals = compute_region_totals(records)
  attach_totals(records, totals)

  # Stage 4: Enrich with external data
  with httpx.Client(
      headers={"Authorization": f"Bearer {api_key}"},
      timeout=httpx.Timeout(10.0, connect=5.0),
  ) as client:
      enrich_records(records, client, api_url, stats)

  # Stage 5: Write results
  write_csv(records, output_file)

  logger.info(
      f"Pipeline complete: {stats.rows_read} read, "
      f"{stats.rows_dropped} dropped, "
      f"{stats.rows_enriched} enriched, "
      f"{stats.enrichment_errors} enrichment errors"
  )
  return stats


if __name__ == "__main__":
  run_pipeline("sales.csv", "output.csv")

The Benchmarks

📊Code Quality and Maintainability Metrics
Metric | Before | After | Improvement
Functions that can be unit tested in isolation | 0 out of 1 | 7 out of 7 | 100% testable
Redundant API calls (500 rows, 10 regions) | 500 calls | 10 calls (cached) | 50x fewer API calls
Time to locate a bug in data cleaning | Read entire 200-line function | Read clean_record (8 lines) | 25x less code to review
Pipeline observability | 1 print statement | Full stats: read, dropped, enriched, errors | Complete visibility
Lines per function (average) | ~60 lines | ~12 lines | 5x more focused

The Prompt Tip

💡Prompt Tip — Feed This to Your AI

Write a Python data processing script that reads sales data from a CSV, cleans it, calculates regional totals, enriches records from an API, and writes results to a new CSV. Requirements: decompose the logic into separate functions — one for reading, one for cleaning a single record, one for cleaning all records, one for aggregation, one for API enrichment, and one for writing. Define a SalesRecord dataclass with typed fields instead of using raw dictionaries. Define a PipelineStats dataclass to track rows read, rows dropped, rows enriched, and enrichment errors. Cache API responses by region so each unique region is fetched only once. Pass the httpx.Client as a parameter to the enrichment function instead of creating it inside. Read the API URL and key from environment variables. Use Python's logging module for structured output. Create a run_pipeline function that orchestrates all stages in sequence. Add type hints and docstrings to every function.