La sintassi async/await di Python e’ arrivata in Python 3.5 ed e’ diventata genuinamente pronta per la produzione in Python 3.7 con asyncio.run. Oggi alimenta FastAPI, aiohttp e la maggior parte del moderno ecosistema async Python. Ma una grande frazione del codice async che vedo in giro ha bug sottili: await mancante, chiamate a librerie bloccanti dentro funzioni async, o lavoro CPU-intensivo che blocca l’event loop.
Questo post e’ un percorso sistematico su come funziona davvero l’async Python, quando aiuta, quando non aiuta, e i pattern che rendono il codice async corretto e manutenibile.
L’Event Loop: Cosa Fa Davvero#
L’event loop e’ un singolo thread OS che esegue un loop infinito. Ad ogni iterazione controlla: ci sono operazioni I/O completate? Se si’, riprendi la coroutine che le stava aspettando.
sequenceDiagram
participant EL as Event Loop
participant C1 as Coroutine 1
participant C2 as Coroutine 2
participant IO as OS / Network
EL->>C1: riprendi
C1->>IO: await socket.read()
C1-->>EL: sospendi (attesa IO)
EL->>C2: riprendi
C2->>IO: await asyncio.sleep(1)
C2-->>EL: sospendi
IO-->>EL: dati socket pronti
EL->>C1: riprendi con dati
C1-->>EL: completata
L’insight critico: non c’e’ parallelismo qui. Solo una coroutine e’ in esecuzione in qualsiasi istante. Il guadagno di velocita’ viene dal fatto che mentre la coroutine 1 aspetta una risposta di rete, l’event loop esegue la coroutine 2 invece di bloccare il thread. Se la coroutine 2 fa lavoro CPU per 500ms invece di fare await su qualcosa, la latenza della coroutine 1 cresce di 500ms indipendentemente da quanto sia veloce la rete.
Coroutine di Base#
Una funzione async def e’ una funzione coroutine. Chiamarla restituisce un oggetto coroutine; non esegue nulla fino a quando non la si await o la si schedula.
import asyncio
async def fetch_user(user_id: int) -> dict:
# Simula una query al database.
await asyncio.sleep(0.05)
return {"id": user_id, "name": "Alice"}
async def main() -> None:
user = await fetch_user(1)
print(user)
asyncio.run(main())Chiamare fetch_user(1) senza await non fa nulla e restituisce un oggetto coroutine. Python emette un RuntimeWarning: coroutine 'fetch_user' was never awaited nelle versioni piu’ recenti, ma il codice piu’ vecchio spesso scarta silenziosamente la coroutine. Questo e’ uno dei bug piu’ difficili da individuare nel codice async: una funzione sembra essere in esecuzione ma il suo risultato non viene mai usato e il suo I/O non avviene mai.
asyncio.gather: Eseguire Coroutine Concorrentemente#
Il pattern async del mondo reale piu’ comune e’ recuperare da piu’ sorgenti in modo concorrente e aspettare che tutte finiscano. asyncio.gather e’ lo strumento principale per questo.
import asyncio
import time
async def fetch_from_service(name: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"{name} ha risposto dopo {delay}s"
async def main() -> None:
start = time.monotonic()
# Sequenziale: tempo totale = 0.3 + 0.2 + 0.4 = 0.9s
# r1 = await fetch_from_service("users", 0.3)
# r2 = await fetch_from_service("orders", 0.2)
# r3 = await fetch_from_service("inventory", 0.4)
# Concorrente con gather: tempo totale = max(0.3, 0.2, 0.4) = ~0.4s
r1, r2, r3 = await asyncio.gather(
fetch_from_service("users", 0.3),
fetch_from_service("orders", 0.2),
fetch_from_service("inventory", 0.4),
)
elapsed = time.monotonic() - start
print(f"Tutto fatto in {elapsed:.2f}s")
print(r1, r2, r3)
asyncio.run(main())asyncio.gather restituisce i risultati nello stesso ordine delle coroutine in input, indipendentemente da quale ha finito per prima. Se una coroutine solleva un’eccezione, gather annulla le altre e la ri-solleva per default. Passa return_exceptions=True per raccogliere le eccezioni come valori invece di propagarle.
asyncio.create_task vs gather#
asyncio.gather esegue le coroutine concorrentemente e aspetta tutte. asyncio.create_task schedula una coroutine per eseguire in background e restituisce immediatamente un handle Task.
import asyncio
async def background_job() -> None:
await asyncio.sleep(1)
print("job in background completato")
async def main() -> None:
# Fire-and-forget: avvia il job ma non aspettarlo.
task = asyncio.create_task(background_job())
# Fai altro lavoro mentre background_job e' in esecuzione.
await asyncio.sleep(0.1)
print("lavoro principale completato")
# Opzionalmente aspettalo prima dello shutdown.
await task
asyncio.run(main())Usa create_task quando vuoi fire-and-forget o gestire esplicitamente il ciclo di vita dei task. Usa gather quando hai bisogno di tutti i risultati prima di continuare.
Mantieni sempre un riferimento ai task creati con create_task. Se l’oggetto task viene raccolto dal garbage collector prima che si completi, Python lo annulla. Il pattern comune e’ tenere i task in un set e usare task.add_done_callback(tasks.discard) per pulirli.
Bloccare l’Event Loop: L’Errore Piu’ Comune#
L’intero modello async crolla se chiami una funzione bloccante dentro una coroutine. time.sleep, requests.get, qualsiasi computazione CPU-intensive, o un driver di database sincrono bloccheranno l’event loop per tutte le altre coroutine per la durata di quella chiamata.
import asyncio
import time
import requests # libreria HTTP sincrona
async def bad_handler() -> str:
# Questo blocca l'INTERO event loop per 2 secondi.
# Nessun'altra coroutine puo' girare mentre questo dorme.
time.sleep(2)
response = requests.get("https://api.example.com/data") # anche bloccante
return response.text
async def good_handler() -> str:
# Sleep non bloccante: cede il controllo all'event loop.
await asyncio.sleep(2)
# HTTP non bloccante: usa una libreria async.
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com/data") as response:
return await response.text()Quando devi chiamare una funzione bloccante (libreria legacy, lavoro CPU, nessuna alternativa async), usa run_in_executor per scaricarla su un thread pool:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
def blocking_cpu_work(n: int) -> int:
# Simula lavoro CPU-bound.
time.sleep(0.5)
return n * n
async def main() -> None:
loop = asyncio.get_running_loop()
# Esegui blocking_cpu_work in un thread senza bloccare l'event loop.
result = await loop.run_in_executor(executor, blocking_cpu_work, 42)
print(result)Per il vero parallelismo CPU-bound (elaborazione immagini, inferenza ML, crittografia), usa ProcessPoolExecutor invece di ThreadPoolExecutor. I thread condividono il GIL; i processi no.
Context Manager Async e Generator Async#
Qualsiasi risorsa che necessita di setup e teardown attorno all’I/O puo’ implementare __aenter__ e __aexit__.
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator
@asynccontextmanager
async def managed_connection(host: str) -> AsyncGenerator[dict, None]:
print(f"connessione a {host}")
await asyncio.sleep(0.01) # simula connessione async
conn = {"host": host, "open": True}
try:
yield conn
finally:
conn["open"] = False
print(f"connessione a {host} chiusa")
async def streamed_results(query: str) -> AsyncGenerator[dict, None]:
# Generator async: restituisce righe man mano che arrivano dal database.
for i in range(5):
await asyncio.sleep(0.01)
yield {"row": i, "query": query}
async def main() -> None:
async with managed_connection("db.internal") as conn:
async for row in streamed_results("SELECT * FROM users"):
print(row)
asyncio.run(main())FastAPI: Un Endpoint Async Reale#
FastAPI e’ costruito su Starlette, che usa un event loop async (uvicorn/uvloop). Gli handler di endpoint async ti permettono di fare piu’ chiamate I/O concorrentemente all’interno di una singola richiesta.
import asyncio
import httpx
from fastapi import FastAPI, HTTPException
app = FastAPI()
SERVICES = {
"user": "https://internal.example.com/users",
"order": "https://internal.example.com/orders",
"inventory": "https://internal.example.com/inventory",
}
async def fetch_json(client: httpx.AsyncClient, url: str) -> dict:
response = await client.get(url, timeout=2.0)
response.raise_for_status()
return response.json()
@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: str) -> dict:
async with httpx.AsyncClient() as client:
try:
user, orders, inventory = await asyncio.gather(
fetch_json(client, f"{SERVICES['user']}/{user_id}"),
fetch_json(client, f"{SERVICES['order']}?user_id={user_id}"),
fetch_json(client, f"{SERVICES['inventory']}?user_id={user_id}"),
return_exceptions=True,
)
except Exception as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
return {"user": user, "orders": orders, "inventory": inventory}Questo endpoint lancia tre richieste HTTP simultaneamente. Il tempo di risposta totale e’ limitato dal servizio piu’ lento, non dalla somma delle tre latenze.
FastAPI permette handler sia sync che async. Se definisci un handler come def (non async def), FastAPI lo esegue automaticamente in un thread pool. Questo significa che puoi usare driver di database sincroni in handler sync. L’errore e’ definire un handler async def e poi chiamare una libreria bloccante al suo interno.
Quando Async NON E’ Piu’ Veloce#
Async ti da concorrenza, non velocita’. Aiuta quando:
- Stai aspettando I/O (rete, database, filesystem).
- Hai molte richieste concorrenti che altrimenti bloccherebbero i thread.
Non aiuta quando:
- Il lavoro e’ CPU-bound (hashing, compressione, inferenza ML).
- Hai pochissime richieste concorrenti (l’overhead dello scheduling dell’event loop costa piu’ di quello che risparmia).
- Il tuo I/O e’ gia’ veloce (operazioni in-process sub-millisecondo).
import time
import requests
def fetch_all_sync(urls: list[str]) -> list[str]:
results = []
for url in urls:
# Ogni chiamata aspetta che la precedente si completi.
r = requests.get(url, timeout=2)
results.append(r.text[:50])
return results
# 10 URL x 200ms ciascuno = ~2000ms totali
start = time.monotonic()
fetch_all_sync(["https://httpbin.org/delay/0.2"] * 10)
print(f"sync: {time.monotonic() - start:.2f}s") # ~2.0simport asyncio
import time
import aiohttp
async def fetch_all_async(urls: list[str]) -> list[str]:
async with aiohttp.ClientSession() as session:
async def fetch(url: str) -> str:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=2)) as r:
text = await r.text()
return text[:50]
return await asyncio.gather(*[fetch(url) for url in urls])
# 10 URL x 200ms ciascuno, tutti concorrenti = ~200ms totali
start = time.monotonic()
asyncio.run(fetch_all_async(["https://httpbin.org/delay/0.2"] * 10))
print(f"async: {time.monotonic() - start:.2f}s") # ~0.2simport asyncio
import hashlib
import time
def hash_work(data: bytes, iterations: int) -> bytes:
# CPU-bound: nessun I/O su cui cedere.
h = data
for _ in range(iterations):
h = hashlib.sha256(h).digest()
return h
async def wrong_approach() -> None:
# Questo blocca l'event loop per tutta la durata.
# asyncio.gather su lavoro CPU non da' nessun beneficio.
await asyncio.gather(
asyncio.to_thread(hash_work, b"data1", 100_000),
asyncio.to_thread(hash_work, b"data2", 100_000),
)
# asyncio.to_thread e' corretto (usa thread pool),
# ma per il vero parallelismo CPU usa ProcessPoolExecutor.Errori Comuni#
await mancante su una chiamata a coroutine
result = some_async_function() senza await ti da un oggetto coroutine, non il risultato. Il corpo della funzione non viene mai eseguito. Python 3.10+ emette un warning, ma nei log di produzione questi sono spesso inghiottiti. Fai sempre await sulle chiamate a coroutine, oppure schedulale esplicitamente con create_task.Chiamare time.sleep o requests.get dentro funzioni async
time.sleep con await asyncio.sleep, e sostituisci requests con httpx (modalita’ async) o aiohttp.Chiamare asyncio.run dentro un loop gia' in esecuzione
asyncio.run crea ed esegue un nuovo event loop. Chiamarlo dall’interno di un loop gia’ in esecuzione (ad es. da un notebook Jupyter o dall’interno di un handler FastAPI) solleva RuntimeError: This event loop is already running. Usa direttamente await, oppure in Jupyter usa la libreria nest_asyncio come soluzione temporanea.Usare async senza una libreria compatibile async
async def mentre usi un ORM sincrono (come SQLAlchemy Core senza supporto async) o un client Redis sincrono ti da l’overhead dell’event loop senza nessuno dei benefici. Verifica ogni libreria I/O nel tuo servizio async: se e’ sincrona, eseguila in un thread pool o sostituiscila con un’alternativa async-nativa.Non gestire le eccezioni dei task
create_task che solleva un’eccezione non gestita non propaghera’ quell’eccezione alla coroutine padre. L’eccezione e’ memorizzata nell’oggetto task e Python emette un warning quando il task viene raccolto dal garbage collector. Aggiungi sempre un done callback o fai await del task ad un certo punto per far emergere le eccezioni.Se vuoi approfondire uno di questi argomenti, offro sessioni di coaching 1:1 per ingegneri che lavorano su integrazione AI, architettura cloud e platform engineering. Prenota una sessione (50 EUR / 60 min) o scrivimi a manuel.fedele+website@gmail.com.