
Python Async/Await in Practice: asyncio, FastAPI, and Common Pitfalls

Async/await is not magic. It is cooperative multitasking for I/O-bound work, built on a single-threaded event loop. Getting it wrong does not crash your program; it silently makes it slower and harder to debug. This post covers what the event loop actually does, how to use asyncio correctly, the FastAPI async model, and the mistakes that cost teams weeks.

Python’s async/await syntax landed in Python 3.5 and became genuinely production-ready in Python 3.7 with asyncio.run. Today it powers FastAPI, aiohttp, and most of the modern Python async ecosystem. But a large fraction of the async code I see in the wild has subtle bugs: missing await, calls to blocking libraries inside async functions, or CPU-heavy work that brings the event loop to a halt.

This post is a systematic walkthrough of how async Python actually works, when it helps, when it does not, and the patterns that make async code correct and maintainable.

The Event Loop: What It Actually Does

The event loop is a single OS thread that runs an infinite loop. On each iteration it checks: are any I/O operations complete? If yes, resume the coroutine that was waiting for them.

sequenceDiagram
    participant EL as Event Loop
    participant C1 as Coroutine 1
    participant C2 as Coroutine 2
    participant IO as OS / Network

    EL->>C1: resume
    C1->>IO: await socket.read()
    C1-->>EL: suspend (waiting for IO)
    EL->>C2: resume
    C2->>IO: await asyncio.sleep(1)
    C2-->>EL: suspend
    IO-->>EL: socket data ready
    EL->>C1: resume with data
    C1-->>EL: done

The critical insight: there is no parallelism here. Only one coroutine runs at any given instant. The speedup comes from the fact that while coroutine 1 is waiting for a network response, the event loop runs coroutine 2 instead of blocking the thread. If coroutine 2 does CPU work for 500ms instead of awaiting something, coroutine 1’s latency grows by 500ms regardless of how fast the network is.
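A minimal sketch of that failure mode; the synchronous 0.2s sleep stands in for CPU-heavy work, and the file name is illustrative:

blocked_loop.py
```python
import asyncio
import time


async def blocker() -> None:
    # Synchronous sleep stands in for CPU-bound work:
    # it never yields control back to the event loop.
    time.sleep(0.2)


async def pinger() -> float:
    start = time.monotonic()
    await asyncio.sleep(0.01)  # should resume after ~10ms
    return time.monotonic() - start


async def main() -> float:
    ping = asyncio.create_task(pinger())
    await asyncio.sleep(0)  # let pinger start and suspend
    await blocker()         # freezes the loop for 200ms
    return await ping


elapsed = asyncio.run(main())
print(f"pinger waited {elapsed:.2f}s for a 0.01s sleep")
```

On a typical run, elapsed comes out near 0.2s, not 0.01s: the timer fired on schedule, but the loop could not resume pinger until blocker returned.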

Basic Coroutines

An async def function is a coroutine function. Calling it returns a coroutine object; it does not execute anything until you await it or schedule it.

basics.py
import asyncio


async def fetch_user(user_id: int) -> dict:
    # Simulate a database query.
    await asyncio.sleep(0.05)
    return {"id": user_id, "name": "Alice"}


async def main() -> None:
    user = await fetch_user(1)
    print(user)


asyncio.run(main())
Warning

Calling fetch_user(1) without await executes nothing and returns a coroutine object. When that object is garbage-collected, Python emits RuntimeWarning: coroutine 'fetch_user' was never awaited, but in production the warning is easy to miss. This is one of the hardest bugs to spot in async code: the function appears to run, yet its body never executes and its I/O never happens.

asyncio.gather: Running Coroutines Concurrently

The most common real-world async pattern is fetching from multiple sources concurrently and waiting for all of them to finish. asyncio.gather is the primary tool for this.

gather_example.py
import asyncio
import time


async def fetch_from_service(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} responded after {delay}s"


async def main() -> None:
    start = time.monotonic()

    # Sequential: total time = 0.3 + 0.2 + 0.4 = 0.9s
    # r1 = await fetch_from_service("users", 0.3)
    # r2 = await fetch_from_service("orders", 0.2)
    # r3 = await fetch_from_service("inventory", 0.4)

    # Concurrent with gather: total time = max(0.3, 0.2, 0.4) = ~0.4s
    r1, r2, r3 = await asyncio.gather(
        fetch_from_service("users", 0.3),
        fetch_from_service("orders", 0.2),
        fetch_from_service("inventory", 0.4),
    )

    elapsed = time.monotonic() - start
    print(f"All done in {elapsed:.2f}s")
    print(r1, r2, r3)


asyncio.run(main())
Note

asyncio.gather returns results in the same order as the input coroutines, regardless of which finished first. If any coroutine raises an exception, gather cancels the rest and re-raises it by default. Pass return_exceptions=True to collect exceptions as values instead of propagating them.
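A quick sketch of the return_exceptions=True behavior described above (the file name and service functions are illustrative):

gather_exceptions.py
```python
import asyncio


async def ok() -> str:
    await asyncio.sleep(0.01)
    return "ok"


async def boom() -> str:
    raise ValueError("service down")


async def main() -> list:
    # Failures come back in-place as values instead of
    # cancelling the remaining coroutines.
    return await asyncio.gather(ok(), boom(), return_exceptions=True)


results = asyncio.run(main())
print(results)  # ['ok', ValueError('service down')]
```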

asyncio.create_task vs gather
#

asyncio.gather runs coroutines concurrently and waits for all of them. asyncio.create_task schedules a coroutine to run in the background and returns a Task handle immediately.

tasks_vs_gather.py
import asyncio


async def background_job() -> None:
    await asyncio.sleep(1)
    print("background job done")


async def main() -> None:
    # Fire-and-forget: start the job but don't wait for it.
    task = asyncio.create_task(background_job())

    # Do other work while background_job runs.
    await asyncio.sleep(0.1)
    print("main work done")

    # Optionally wait for it before shutdown.
    await task


asyncio.run(main())

Use create_task when you want to fire-and-forget or manage task lifecycles explicitly. Use gather when you need all results before continuing.

Important

Always keep a reference to tasks created with create_task. The event loop holds only a weak reference to a task, so if nothing else references it, the task can be garbage-collected before it completes and silently disappear. The common pattern is to keep tasks in a set and use task.add_done_callback(tasks.discard) to clean them up.
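The pattern from the warning, sketched; spawn and background_tasks are illustrative names:

task_references.py
```python
import asyncio

background_tasks: set[asyncio.Task] = set()


async def job(n: int) -> None:
    await asyncio.sleep(0.01)


def spawn(coro) -> asyncio.Task:
    # Hold a strong reference so the task cannot be garbage-collected
    # mid-flight; drop it automatically once the task finishes.
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task


async def main() -> None:
    for i in range(3):
        spawn(job(i))
    await asyncio.gather(*background_tasks)
    await asyncio.sleep(0)  # let the done callbacks run
    assert not background_tasks  # the set cleaned itself up


asyncio.run(main())
```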

Blocking the Event Loop: The Most Common Mistake

The entire async model collapses if you call a blocking function inside a coroutine. time.sleep, requests.get, any CPU-heavy computation, or a synchronous database driver will freeze the event loop for every other coroutine for the duration of that call.

blocking_mistake.py
import asyncio
import time
import requests  # synchronous HTTP library


async def bad_handler() -> str:
    # This blocks the ENTIRE event loop for 2 seconds.
    # No other coroutine can run while this sleeps.
    time.sleep(2)
    response = requests.get("https://api.example.com/data")  # also blocking
    return response.text


async def good_handler() -> str:
    # Non-blocking sleep: yields control to the event loop.
    await asyncio.sleep(2)
    # Non-blocking HTTP: use an async library.
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com/data") as response:
            return await response.text()

When you must call a blocking function (legacy library, CPU work, no async alternative), use run_in_executor (or the simpler asyncio.to_thread, available since Python 3.9) to offload it to a thread pool:

run_in_executor.py
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)


def blocking_cpu_work(n: int) -> int:
    # Stand-in for blocking or CPU-bound work.
    time.sleep(0.5)
    return n * n


async def main() -> None:
    loop = asyncio.get_running_loop()
    # Run blocking_cpu_work in a thread without freezing the event loop.
    result = await loop.run_in_executor(executor, blocking_cpu_work, 42)
    print(result)


asyncio.run(main())
Tip

For true CPU-bound parallelism (image processing, ML inference, cryptography), use ProcessPoolExecutor instead of ThreadPoolExecutor. Threads share the GIL; processes do not.
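A sketch of the process-pool variant. The worker function must live at module level so it can be pickled for the child process; hash_rounds is an illustrative stand-in for real CPU work:

process_pool.py
```python
import asyncio
import hashlib
from concurrent.futures import ProcessPoolExecutor


def hash_rounds(data: bytes, iterations: int) -> bytes:
    # Genuinely CPU-bound: repeated SHA-256 hashing.
    h = data
    for _ in range(iterations):
        h = hashlib.sha256(h).digest()
    return h


async def main() -> None:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=2) as pool:
        # Each call runs in its own process, bypassing the GIL.
        digests = await asyncio.gather(
            loop.run_in_executor(pool, hash_rounds, b"a", 50_000),
            loop.run_in_executor(pool, hash_rounds, b"b", 50_000),
        )
    print(len(digests), "digests computed")


if __name__ == "__main__":
    asyncio.run(main())
```

The `__main__` guard matters: on platforms that spawn worker processes, the module is re-imported in each child, and an unguarded asyncio.run would recurse.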

Async Context Managers and Async Generators

Any resource that needs setup and teardown around I/O can implement __aenter__ and __aexit__.

async_context.py
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator


@asynccontextmanager
async def managed_connection(host: str) -> AsyncGenerator[dict, None]:
    print(f"connecting to {host}")
    await asyncio.sleep(0.01)  # simulate async connect
    conn = {"host": host, "open": True}
    try:
        yield conn
    finally:
        conn["open"] = False
        print(f"closed connection to {host}")


async def streamed_results(query: str) -> AsyncGenerator[dict, None]:
    # Async generator: yields rows as they arrive from the database.
    for i in range(5):
        await asyncio.sleep(0.01)
        yield {"row": i, "query": query}


async def main() -> None:
    async with managed_connection("db.internal") as conn:
        async for row in streamed_results("SELECT * FROM users"):
            print(row)


asyncio.run(main())

FastAPI: A Real Async Endpoint

FastAPI is built on Starlette, an ASGI framework, and is typically served by uvicorn (optionally with uvloop). Async endpoint handlers run directly on the event loop, which lets you make multiple I/O calls concurrently within a single request.

app/main.py
import asyncio
import httpx
from fastapi import FastAPI, HTTPException

app = FastAPI()

SERVICES = {
    "user": "https://internal.example.com/users",
    "order": "https://internal.example.com/orders",
    "inventory": "https://internal.example.com/inventory",
}


async def fetch_json(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url, timeout=2.0)
    response.raise_for_status()
    return response.json()


@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: str) -> dict:
    async with httpx.AsyncClient() as client:
        try:
            user, orders, inventory = await asyncio.gather(
                fetch_json(client, f"{SERVICES['user']}/{user_id}"),
                fetch_json(client, f"{SERVICES['order']}?user_id={user_id}"),
                fetch_json(client, f"{SERVICES['inventory']}?user_id={user_id}"),
            )
        except Exception as exc:
            raise HTTPException(status_code=502, detail=str(exc)) from exc

    return {"user": user, "orders": orders, "inventory": inventory}

This endpoint fires three HTTP requests simultaneously. Total response time is bounded by the slowest service, not by the sum of all three latencies.

Note

FastAPI allows both sync and async handlers. If you define a handler as def (not async def), FastAPI runs it in a thread pool automatically. This means you can safely use synchronous database drivers in sync handlers. The mistake is defining an async def handler and then calling a blocking library inside it.

When Async is NOT Faster

Async gives you concurrency, not speed. It helps when:

  • You are waiting for I/O (network, database, filesystem).
  • You have many concurrent requests that would otherwise idle threads.

It does not help when:

  • Your work is CPU-bound (hashing, compression, ML inference).
  • You have very few concurrent requests (the overhead of the event loop scheduling costs more than it saves).
  • All your I/O is already fast (sub-millisecond in-process operations).

Three quick sketches make the contrast concrete. First, sequential blocking I/O:

sync_fetch.py
import time
import requests

def fetch_all_sync(urls: list[str]) -> list[str]:
    results = []
    for url in urls:
        # Each call waits for the previous one to complete.
        r = requests.get(url, timeout=2)
        results.append(r.text[:50])
    return results

# 10 URLs x 200ms each = ~2000ms total
start = time.monotonic()
fetch_all_sync(["https://httpbin.org/delay/0.2"] * 10)
print(f"sync: {time.monotonic() - start:.2f}s")  # ~2.0s

The same ten URLs fetched concurrently:

async_fetch.py
import asyncio
import time
import aiohttp

async def fetch_all_async(urls: list[str]) -> list[str]:
    async with aiohttp.ClientSession() as session:
        async def fetch(url: str) -> str:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=2)) as r:
                text = await r.text()
                return text[:50]

        return await asyncio.gather(*[fetch(url) for url in urls])

# 10 URLs x 200ms each, all concurrent = ~200ms total
start = time.monotonic()
asyncio.run(fetch_all_async(["https://httpbin.org/delay/0.2"] * 10))
print(f"async: {time.monotonic() - start:.2f}s")  # ~0.2s

And CPU-bound work, where concurrency alone buys nothing:

cpu_bound.py
import asyncio
import hashlib

def hash_work(data: bytes, iterations: int) -> bytes:
    # CPU-bound: no I/O to yield on.
    h = data
    for _ in range(iterations):
        h = hashlib.sha256(h).digest()
    return h

async def wrong_approach() -> None:
    # Calling CPU work directly blocks the event loop for the
    # entire duration; there is no await to yield on.
    hash_work(b"data1", 100_000)
    hash_work(b"data2", 100_000)

async def better_approach() -> None:
    # asyncio.to_thread keeps the event loop responsive, but the
    # GIL still serializes the hashing. For true CPU parallelism,
    # use ProcessPoolExecutor.
    await asyncio.gather(
        asyncio.to_thread(hash_work, b"data1", 100_000),
        asyncio.to_thread(hash_work, b"data2", 100_000),
    )

Common Mistakes

Missing await on a coroutine call
result = some_async_function() without await gives you a coroutine object, not the result. The function body never executes. Python emits a RuntimeWarning when the discarded coroutine is garbage-collected, but in production logs these warnings are often swallowed. Always await coroutine calls, or explicitly schedule them with create_task.
Calling time.sleep or requests.get inside async functions
Any blocking call inside an async function freezes the entire event loop. Under a FastAPI server, this means all concurrent requests freeze until your one blocking call returns. Replace time.sleep with await asyncio.sleep, and replace requests with httpx (async mode) or aiohttp.
Calling asyncio.run inside a running loop
asyncio.run creates and runs a new event loop. Calling it from within an already-running loop (e.g., from a Jupyter notebook or from inside a FastAPI handler) raises RuntimeError: asyncio.run() cannot be called from a running event loop. Use await directly, or in Jupyter use the nest_asyncio library as a workaround.
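A sketch of the fix: inside a coroutine, await instead of nesting asyncio.run (inner and outer are illustrative names):

```python
import asyncio


async def inner() -> int:
    await asyncio.sleep(0.01)
    return 42


async def outer() -> int:
    # WRONG here: asyncio.run(inner()) raises
    # "RuntimeError: asyncio.run() cannot be called from a running event loop"
    # Inside a running loop, just await the coroutine:
    return await inner()


print(asyncio.run(outer()))  # 42
```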
Using async without an async-compatible library
Making your handler async def while using a synchronous ORM (like SQLAlchemy Core without async support) or a synchronous Redis client gives you the overhead of the event loop with none of the benefits. Audit every I/O library in your async service: if it is synchronous, run it in a thread pool or replace it with an async-native alternative.
Not handling task exceptions
A task created with create_task that raises an unhandled exception will not propagate that exception to the parent coroutine. The exception is stored in the task object and Python emits a warning when the task is garbage-collected. Always attach a done callback or await the task at some point to surface exceptions.
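A sketch of both ways to surface a background task's exception; flaky and log_task_error are illustrative names:

task_exceptions.py
```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def flaky() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("boom")


def log_task_error(task: asyncio.Task) -> None:
    # Done callback: runs when the task finishes, whatever the outcome.
    if not task.cancelled() and task.exception() is not None:
        logger.error("background task failed: %r", task.exception())


async def main() -> None:
    task = asyncio.create_task(flaky())
    task.add_done_callback(log_task_error)
    await asyncio.sleep(0.05)  # the task fails here; the callback logs it
    try:
        await task  # awaiting a failed task re-raises its exception
    except RuntimeError as exc:
        print(f"caught: {exc}")


asyncio.run(main())
```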

If you want to go deeper on any of this, I offer 1:1 coaching sessions for engineers working on AI integration, cloud architecture, and platform engineering. Book a session (50 EUR / 60 min) or reach out at manuel.fedele+website@gmail.com.
