From Blocking Loop to Async Pipeline: Parallelizing API Calls with asyncio
The Scenario
You ask your AI to write a Python script that fetches product details from an external API for 500 product IDs. The model produces a tidy for loop that calls the API once per product, waits for the response, then moves to the next. Each call takes roughly 200ms. The math is brutal: 500 calls at 200ms each is 100 seconds of wall-clock time — spent entirely waiting on network I/O while your CPU sits idle.
The Raw AI Draft
Here is what a model like GPT-4 or Claude typically generates on the first attempt. It looks clean, it runs correctly, and it is catastrophically slow.
```python
import requests

def fetch_all_products(product_ids):
    results = []
    for product_id in product_ids:
        response = requests.get(
            f"https://api.example.com/products/{product_id}",
            headers={"Authorization": "Bearer sk-1234567890abcdef"}
        )
        data = response.json()
        results.append(data)
        print(f"Fetched product {product_id}")
    return results

product_ids = list(range(1, 501))
products = fetch_all_products(product_ids)
print(f"Done. Fetched {len(products)} products.")
```

The Code Smells
- Sequential I/O in a loop — Every `requests.get()` call blocks the entire program until the response arrives. With 500 calls at 200ms each, the script spends 100 seconds doing nothing but waiting on network I/O. The CPU is idle for 99.9% of the runtime.
- No concurrency control whatsoever — There is no mechanism to run multiple requests in parallel. Python's `requests` library is synchronous by design, so the only option within this code is serial execution.
- Hardcoded API key in source code — `"Bearer sk-1234567890abcdef"` is embedded directly in the script and will end up in version control. Anyone with repo access has the API key.
- No error handling for failed requests — If any single API call returns a 500, 429, or times out, the entire script crashes with an unhandled exception. All 499 remaining products are lost.
- No request timeout — `requests.get()` with no timeout will hang indefinitely on a slow or unresponsive server. One stalled connection blocks the entire pipeline forever.
- No rate limit awareness — If the API responds with a 429 (Too Many Requests), the script does not pause or back off. It crashes, and if restarted, it will hit the same limit immediately.
- Results accumulate in memory with no structure — Raw dictionaries are appended to a list with no error tracking. There is no way to distinguish successful fetches from failures after the fact.
- No logging — `print()` with no timestamp, level, or structure. In production, you cannot filter, aggregate, or alert on print statements.
The Best Practices
Asynchronous I/O with asyncio. When your workload is I/O-bound — waiting on network responses, database queries, or file reads — asyncio lets you run hundreds of operations concurrently on a single thread. Instead of blocking on each HTTP response, your program yields control back to the event loop while waiting, allowing other requests to proceed. For 500 API calls at 200ms each, this collapses wall-clock time from 100 seconds (serial) to roughly 5 seconds (with 20 concurrent connections).
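The wall-clock collapse is easy to demonstrate without a real API. The sketch below simulates 20 calls that each "wait" 200ms via `asyncio.sleep` (a stand-in for network latency); run concurrently, the batch finishes in roughly the time of a single call rather than the sum of all of them. The function names here (`fake_fetch`, `fetch_batch`) are illustrative, not part of the article's pipeline.

```python
import asyncio
import time

async def fake_fetch(product_id: int) -> dict:
    # Simulate a 200ms network round trip without blocking the event loop
    await asyncio.sleep(0.2)
    return {"id": product_id}

async def fetch_batch(n: int) -> tuple[list[dict], float]:
    start = time.perf_counter()
    # All n "calls" wait concurrently on a single thread
    results = await asyncio.gather(*(fake_fetch(i) for i in range(n)))
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(fetch_batch(20))
print(f"{len(results)} calls in {elapsed:.2f}s")  # ~0.2s, not ~4s
```

Twenty sequential 200ms waits would take about 4 seconds; here the event loop overlaps them all.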
Bounded Concurrency with Semaphores. Unbounded concurrency is as dangerous as no concurrency. If you fire 500 requests simultaneously, you will overwhelm the remote API, exhaust your local file descriptor limit, and likely get rate-limited or banned. asyncio.Semaphore acts as a gatekeeper: it allows at most N coroutines to execute a critical section at the same time. The remaining coroutines wait until a slot opens. This gives you the throughput benefits of parallelism without the blast radius of an uncontrolled stampede.
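A minimal sketch of the gatekeeper behavior: 50 coroutines race through a `Semaphore(5)`, and a shared counter records how many are ever inside the gated section at once. The peak never exceeds the semaphore's limit. The tracking dictionary is safe here because coroutines on one event loop only interleave at `await` points.

```python
import asyncio

async def guarded_task(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        # Count how many coroutines are inside the gated section right now
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.05)  # stand-in for an HTTP request
        state["in_flight"] -= 1

async def run_demo() -> int:
    sem = asyncio.Semaphore(5)  # at most 5 tasks in the critical section
    state = {"in_flight": 0, "peak": 0}
    await asyncio.gather(*(guarded_task(sem, state) for _ in range(50)))
    return state["peak"]

peak = asyncio.run(run_demo())
print(f"Peak concurrency: {peak}")  # never exceeds 5
```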
asyncio.gather for Parallel Execution. asyncio.gather() takes a collection of coroutines and schedules them all onto the event loop. It returns a list of results in the same order as the input — making it easy to correlate responses back to their original requests. Combined with a semaphore, it provides a clean pattern: create all tasks eagerly, gate their actual execution, and collect results as they complete.
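The ordering guarantee is worth seeing directly: even when coroutines finish in a random order, `gather` returns results in the order the coroutines were passed in, so position alone correlates each response to its request.

```python
import asyncio
import random

async def fetch(pid: int) -> int:
    # Finish in a random order to show gather still preserves input order
    await asyncio.sleep(random.uniform(0, 0.05))
    return pid * 10

async def main() -> list[int]:
    return await asyncio.gather(*(fetch(pid) for pid in [3, 1, 2]))

ordered = asyncio.run(main())
print(ordered)  # [30, 10, 20] — matches the input order, not completion order
```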
Structured Result Types. Instead of returning raw dictionaries with no error context, wrap each API response in a dataclass that tracks success, failure, and the original request identifier. This makes downstream processing robust: you can filter failures, retry specific IDs, generate reports, and log statistics without guessing at the shape of the data.
httpx.AsyncClient for Modern HTTP. The httpx library provides both synchronous and asynchronous HTTP clients with an identical API surface. httpx.AsyncClient supports HTTP/2, configurable timeouts, automatic connection pooling, and cookie persistence. It is the recommended replacement for requests in any async Python project due to its native async/await support.
Retry Logic with Exponential Backoff. Transient failures — 429 rate limits, 503 service unavailable, DNS hiccups — are normal in distributed systems. Retrying immediately just adds more load to a struggling service. Exponential backoff (wait 0.5s, then 1s, then 2s) gives the remote service progressively more time to recover. Respecting the Retry-After header lets the API server itself dictate the cooldown.
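The backoff schedule itself is a one-line formula. A minimal helper (the name `backoff_delay` and the cap are my own additions, not from the refactored code below) makes the doubling explicit and clamps the delay so a long retry chain cannot wait unboundedly:

```python
def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    # Exponential backoff: base * 2^attempt, clamped to a ceiling
    return min(base * (2 ** attempt), cap)

delays = [backoff_delay(a) for a in range(3)]
print(delays)  # [0.5, 1.0, 2.0]
```

Production retry loops often add random jitter on top of this schedule so that many clients backing off together do not retry in synchronized waves.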
The Refactored Code
```python
import asyncio
import os
import logging
from dataclasses import dataclass

import httpx

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

# Configuration from environment — never hardcode secrets or endpoints
API_URL = os.environ["PRODUCT_API_URL"]
API_KEY = os.environ["PRODUCT_API_KEY"]

# Concurrency controls — protect the remote service and your own system
MAX_CONCURRENCY = 20    # At most 20 requests in flight at once
REQUEST_TIMEOUT = 10.0  # Per-request timeout in seconds
MAX_RETRIES = 3         # Retry transient failures up to 3 times
BACKOFF_BASE = 0.5      # Base delay for exponential backoff


@dataclass
class FetchResult:
    """Encapsulates the outcome of a single API call."""
    product_id: int
    data: dict | None = None
    error: str | None = None
    success: bool = True


async def fetch_product(
    client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
    product_id: int,
) -> FetchResult:
    """Fetch a single product with bounded concurrency and retry logic."""
    async with semaphore:
        # Semaphore limits how many coroutines enter this block simultaneously
        for attempt in range(MAX_RETRIES):
            try:
                response = await client.get(f"{API_URL}/products/{product_id}")
                response.raise_for_status()
                return FetchResult(product_id=product_id, data=response.json())
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429:
                    # Rate limited — back off and retry
                    wait = float(e.response.headers.get(
                        "Retry-After", BACKOFF_BASE * (2 ** attempt)
                    ))
                    logger.warning(
                        f"Rate limited on product {product_id}. "
                        f"Waiting {wait}s (attempt {attempt + 1}/{MAX_RETRIES})"
                    )
                    await asyncio.sleep(wait)
                elif e.response.status_code >= 500:
                    # Server error — retry with backoff
                    wait = BACKOFF_BASE * (2 ** attempt)
                    logger.warning(
                        f"Server error {e.response.status_code} for product {product_id}. "
                        f"Retrying in {wait}s"
                    )
                    await asyncio.sleep(wait)
                else:
                    # Client error (4xx) — do not retry, this is our fault
                    logger.error(f"Client error for product {product_id}: {e.response.status_code}")
                    return FetchResult(
                        product_id=product_id, error=str(e), success=False
                    )
            except httpx.TransportError as e:
                # Network failure — retry with backoff
                wait = BACKOFF_BASE * (2 ** attempt)
                logger.warning(
                    f"Transport error for product {product_id}: {e}. "
                    f"Retrying in {wait}s"
                )
                await asyncio.sleep(wait)

        # All retries exhausted for this product
        logger.error(f"All {MAX_RETRIES} attempts failed for product {product_id}")
        return FetchResult(
            product_id=product_id,
            error=f"Failed after {MAX_RETRIES} retries",
            success=False,
        )


async def fetch_all_products(product_ids: list[int]) -> list[FetchResult]:
    """Fetch all products concurrently with bounded parallelism."""
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=httpx.Timeout(REQUEST_TIMEOUT, connect=5.0),
    ) as client:
        # Launch all coroutines at once — the semaphore gates actual execution
        tasks = [
            fetch_product(client, semaphore, pid)
            for pid in product_ids
        ]
        results = await asyncio.gather(*tasks)

    succeeded = sum(1 for r in results if r.success)
    failed = sum(1 for r in results if not r.success)
    logger.info(f"Completed: {succeeded} succeeded, {failed} failed out of {len(results)} total")
    return list(results)


if __name__ == "__main__":
    import time

    product_ids = list(range(1, 501))
    start = time.perf_counter()
    results = asyncio.run(fetch_all_products(product_ids))
    elapsed = time.perf_counter() - start
    logger.info(f"Total time: {elapsed:.2f}s")
```

The Benchmarks
| Metric | Before | After | Improvement |
|---|---|---|---|
| Total time for 500 API calls (200ms each) | ~100 seconds | ~5 seconds (20 concurrent) | 20x faster |
| Survives single API failure | No — crashes | Yes — isolates and reports | 100% recovery |
| Handles rate limiting (429) | Crashes | Backs off and retries | Automatic recovery |
| CPU utilization during I/O wait | Blocked idle | Event loop serves other tasks | Near-zero waste |
| Secret exposure risk | Key in source code | Environment variables | Eliminated |
The Prompt Tip
Write a Python script that fetches data from a REST API for a list of 500 IDs concurrently. Requirements: use httpx.AsyncClient instead of requests. Use asyncio with a Semaphore set to 20 to limit concurrent connections. Use asyncio.gather to schedule all requests and collect results. Read the API URL and API key from environment variables. Implement retry logic with exponential backoff for 429 rate limits (respect the Retry-After header), 5xx server errors, and network timeouts. Wrap each result in a dataclass with product_id, data, error, and success fields so failures are tracked individually instead of crashing the script. Set explicit request timeouts via httpx.Timeout. Log progress with Python's logging module instead of print. Measure and log the total elapsed time.