From Naive API Client to Production-Ready: Adding Circuit Breakers and Retry Logic

The Scenario

You ask your AI to write a Python script that fetches customer data from an external REST API and writes it to a database. The model produces a clean, readable script — but it only works on a sunny day. The moment the API is slow, rate-limited, or down, the script crashes with an unhandled exception and takes your entire pipeline with it.

The Raw AI Draft

Here is what a model like GPT-4 or Claude typically generates on the first attempt. It works, it reads well, and it will destroy your production system.

Before — The Naive AI Draft
import requests
import sqlite3

def fetch_customers():
  response = requests.get("https://api.example.com/customers",
                          headers={"Authorization": "Bearer sk-1234567890abcdef"})
  data = response.json()

  conn = sqlite3.connect("customers.db")
  cursor = conn.cursor()
  cursor.execute("CREATE TABLE IF NOT EXISTS customers (id TEXT, name TEXT, email TEXT)")

  for customer in data:
      cursor.execute("INSERT INTO customers VALUES (?, ?, ?)",
                     (customer["id"], customer["name"], customer["email"]))

  conn.commit()
  conn.close()
  print(f"Imported {len(data)} customers")

fetch_customers()

The Code Smells

⚠️Code Smells — What's Wrong Here?
  • Hardcoded API key in source code"Bearer sk-1234567890abcdef" is committed to version control. Anyone with repo access has your API key. This is a security incident waiting to happen.
  • No error handling around the HTTP request — A network timeout, DNS failure, or 500 response will raise an unhandled exception and crash the script with a stack trace.
  • No retry logic for transient failures — If the API returns a 429 (rate limit) or 503 (temporarily unavailable), the script dies instead of waiting and retrying.
  • No request timeout configuredrequests.get() with no timeout can hang indefinitely, blocking your entire pipeline on a single slow response.
  • Blind INSERT without conflict handling — Running the script twice duplicates all rows. There is no idempotency — every execution corrupts the data further.
  • Database connection never closed on error — If the INSERT loop throws an exception, conn.close() is never called, leaking the database connection.
  • No loggingprint() with no timestamp, level, or structure. In production, you need to know when something failed and how badly, not just that it printed a line.
  • No circuit breaker — If the API is down, the script hammers it repeatedly on every scheduled run, making the outage worse and potentially getting your IP banned.

The Best Practices

Environment Variables for Secrets. API keys, database paths, and endpoint URLs must come from environment variables or a secrets manager — never from source code. This prevents accidental exposure via version control and enables different configurations per environment (dev/staging/prod).

Exponential Backoff with Jitter. When a request fails with a transient error (429, 503, timeout), wait before retrying — and double the wait time with each attempt. This gives the remote service time to recover instead of flooding it with retry storms. Adding random jitter prevents the "thundering herd" problem when multiple clients retry simultaneously.

The Circuit Breaker Pattern. Track consecutive failures to a service. After a threshold (e.g., 5 failures), "open" the circuit — refuse all requests immediately instead of wasting time on calls that will fail. After a cooldown period, allow one test request through ("half-open"). If it succeeds, close the circuit and resume normal operation. This protects your system from cascading failures.

Idempotent Database Operations. Use INSERT OR REPLACE (SQLite) or ON CONFLICT ... DO UPDATE (PostgreSQL) so that re-running the script produces the same result. Scripts in production will be re-run — after crashes, during recovery, or on schedule. Idempotency must be the default.

Structured Logging. Replace print() with Python's logging module. Include timestamps, log levels (INFO, WARNING, ERROR), and contextual data. This makes production log analysis possible with tools like Datadog, ELK, or even basic grep.

Connection Lifecycle Management. Use context managers (with statements) or try/finally blocks to guarantee that database connections, HTTP clients, and file handles are closed — even when exceptions occur.

The Refactored Code

After — Production-Ready
import httpx
import sqlite3
import os
import time
import logging
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

# Configuration from environment — never hardcode secrets
API_URL = os.environ["CUSTOMER_API_URL"]
API_KEY = os.environ["CUSTOMER_API_KEY"]
DB_PATH = os.getenv("DB_PATH", "customers.db")

# Circuit breaker state
@dataclass
class CircuitBreaker:
  """Prevents cascading failures by stopping calls to a failing service."""
  failure_count: int = 0
  threshold: int = 5
  reset_timeout: float = 60.0
  last_failure_time: float = 0.0
  state: str = "closed"  # closed = normal, open = blocking, half-open = testing

  def record_failure(self) -> None:
      self.failure_count += 1
      self.last_failure_time = time.time()
      if self.failure_count >= self.threshold:
          self.state = "open"
          logger.warning("Circuit breaker OPEN — too many failures, blocking requests")

  def record_success(self) -> None:
      self.failure_count = 0
      self.state = "closed"

  def can_execute(self) -> bool:
      if self.state == "closed":
          return True
      if self.state == "open":
          # Check if enough time has passed to try again
          if time.time() - self.last_failure_time > self.reset_timeout:
              self.state = "half-open"
              logger.info("Circuit breaker HALF-OPEN — testing with one request")
              return True
          return False
      return True  # half-open: allow one test request


def fetch_with_retry(client: httpx.Client, url: str, breaker: CircuitBreaker,
                   max_retries: int = 3, backoff_base: float = 1.0) -> dict:
  """Fetch URL with exponential backoff and circuit breaker protection."""
  if not breaker.can_execute():
      raise RuntimeError("Circuit breaker is OPEN — refusing request to protect system")

  for attempt in range(max_retries):
      try:
          response = client.get(url)
          response.raise_for_status()

          # Success: reset circuit breaker
          breaker.record_success()
          return response.json()

      except httpx.HTTPStatusError as e:
          if e.response.status_code == 429:
              # Rate limited: respect Retry-After header
              wait = int(e.response.headers.get("Retry-After", backoff_base * (2 ** attempt)))
              logger.warning(f"Rate limited. Waiting {wait}s (attempt {attempt + 1}/{max_retries})")
              time.sleep(wait)
          elif e.response.status_code >= 500:
              # Server error: retry with backoff
              wait = backoff_base * (2 ** attempt)
              logger.warning(f"Server error {e.response.status_code}. Retrying in {wait}s")
              breaker.record_failure()
              time.sleep(wait)
          else:
              # Client error (4xx): do not retry, this is our fault
              logger.error(f"Client error {e.response.status_code}: {e.response.text}")
              raise

      except httpx.TransportError as e:
          # Network-level failure: timeout, DNS, connection refused
          wait = backoff_base * (2 ** attempt)
          logger.warning(f"Transport error: {e}. Retrying in {wait}s")
          breaker.record_failure()
          time.sleep(wait)

  raise RuntimeError(f"All {max_retries} retry attempts exhausted for {url}")


def import_customers() -> int:
  """Fetch customers from API and upsert into local database."""
  breaker = CircuitBreaker()

  with httpx.Client(
      headers={"Authorization": f"Bearer {API_KEY}"},
      timeout=httpx.Timeout(10.0, connect=5.0),
  ) as client:
      data = fetch_with_retry(client, f"{API_URL}/customers", breaker)

  conn = sqlite3.connect(DB_PATH)
  try:
      cursor = conn.cursor()
      cursor.execute("""
          CREATE TABLE IF NOT EXISTS customers (
              id TEXT PRIMARY KEY,
              name TEXT NOT NULL,
              email TEXT NOT NULL
          )
      """)
      # Upsert instead of blind insert — idempotent on re-runs
      for customer in data:
          cursor.execute(
              "INSERT OR REPLACE INTO customers (id, name, email) VALUES (?, ?, ?)",
              (customer["id"], customer["name"], customer["email"]),
          )
      conn.commit()
      count = len(data)
      logger.info(f"Successfully imported {count} customers")
      return count
  finally:
      conn.close()


if __name__ == "__main__":
  import_customers()

The Benchmarks

📊Production Resilience Metrics
MetricBeforeAfterImprovement
Survives API timeoutNo — crashesYes — retries with backoff∞ (from broken to working)
Handles rate limiting (429)CrashesWaits and retries100% recovery
Idempotent re-runsDuplicates all rowsUpserts cleanlyZero data corruption
Secret exposure riskKey in source codeEnvironment variablesEliminated
Connection leak riskLeaks on errorGuaranteed cleanupEliminated

The Prompt Tip

💡Prompt Tip — Feed This to Your AI

Write a Python script that fetches data from a REST API and stores it in a SQLite database. Requirements: use httpx instead of requests. Read the API URL and API key from environment variables. Add a circuit breaker class that tracks failures, opens after 5 consecutive failures, and resets after 60 seconds. Implement retry logic with exponential backoff that handles 429 rate limits (respecting the Retry-After header), 5xx server errors, and network timeouts. Set explicit request timeouts. Use INSERT OR REPLACE for idempotent database writes. Use Python's logging module instead of print. Manage all connections with context managers or try/finally blocks. Add type hints and docstrings to all functions.