I problemi più difficili nel software finanziario non sono algoritmici. Sono architetturali. Una piattaforma di trading deve essere corretta prima di essere veloce, consistente prima di essere elegante, e auditabile prima di tutto il resto. Deve anche colmare un disallineamento fondamentale al cuore di ogni sistema di trading mai costruito: gli utenti interagiscono in modo sincrono (cliccano “compra” e si aspettano una risposta), ma i sistemi sottostanti sono intrinsecamente asincroni. Le API dei broker consegnano i fill tramite callback. I dati di mercato arrivano come uno stream di push continuo. I calcoli di rischio avvengono in pipeline parallele. Lo stato delle posizioni è per natura eventually consistent.
Questo articolo descrive un’architettura completa per una piattaforma di trading algoritmico teorica ma production-realistica costruita su AWS. Il sistema serve 15.000 utenti concorrenti, gestisce l’esecuzione degli ordini con acknowledgement sotto il secondo, fa streaming real-time dei dati di mercato, applica controlli di rischio pre-trade e mantiene uno stato coerente delle posizioni su servizi distribuiti. Descriverò ogni layer, i pattern che lo fanno funzionare e i tradeoff coinvolti.
La Tensione Centrale: Utenti Sincroni, Mercati Asincroni#
Prima di qualsiasi diagramma, il problema centrale merita una formulazione precisa.
Quando un utente invia un ordine tramite la UI web:
- Il browser effettua una HTTP POST sincrona. L’utente si aspetta una risposta immediata.
- Il backend deve validare l’ordine, eseguire i controlli di rischio pre-trade e pubblicarlo su una coda di elaborazione.
- L’Execution Engine lo raccoglie e lo invia al broker tramite il Broker Gateway.
- L’API del broker è intrinsecamente callback-based. La chiamata di invio ritorna immediatamente. La conferma di esecuzione (fill, fill parziale, o rifiuto) arriva in seguito tramite un callback su un thread o una connessione diversi.
- Nel frattempo, l’utente sta osservando il widget dello stato dell’ordine aggiornarsi in tempo reale.
Non esiste una risposta unica e corretta a questo problema, ma esiste una risposta architetturale ben definita: separare l’acknowledgement di ricezione dall’acknowledgement di esecuzione, usare correlation ID per tracciare lo stato attraverso la pipeline asincrona, e servire lo stato di posizioni e ordini da una materialized view piuttosto che dallo stream di eventi live.
Queste tre idee compaiono in ogni layer di ciò che segue.
Panoramica del Sistema#
flowchart TD
CDN[CloudFront CDN\nStatic Assets] --> Users([15,000 Concurrent Users])
Users --> WAF[AWS WAF]
WAF --> ALB[External ALB\nTLS 1.3]
ALB --> WebTier[Web Tier\nECS Fargate\n8 tasks min / 40 max]
WebTier --> SessionCache[(ElastiCache Redis\nSession + Rate Limit)]
WebTier --> APIGW[Internal API Gateway]
APIGW --> OMS[Order Management\nService]
APIGW --> MDS[Market Data\nService]
APIGW --> PS[Position\nService]
APIGW --> RS[Risk\nService]
OMS --> MSK[(Amazon MSK\nKafka)]
MDS --> Kinesis[(Amazon Kinesis\nData Streams)]
RS --> MSK
MSK --> ExecEngine[Execution\nEngine]
MSK --> AuditSvc[Audit\nService]
MSK --> NotifSvc[Notification\nService]
ExecEngine --> BrokerGW[Broker\nGateway]
BrokerGW --> ExtBroker([External Broker\nCallback API])
BrokerGW --> MSK
Kinesis --> MDS
PS --> DDB[(DynamoDB\nPosition State)]
OMS --> Aurora[(Aurora PostgreSQL\nOrder Ledger)]
AuditSvc --> S3[(S3\nAudit Archive)]
NotifSvc --> Redis2[(ElastiCache Redis\nPub/Sub)]
WebTier --> Redis2
Gli otto servizi primari hanno ognuno una responsabilità singola e delimitata. Non si tratta di microservizi per il gusto di farlo. Ogni confine corrisponde a un modello di consistenza, un profilo di scaling e una preoccupazione operativa genuinamente diversi.
| Servizio | Responsabilità | Modello di Consistenza |
|---|---|---|
| Web Tier | Sessioni utente, SSE, REST API | Stateless |
| Order Management | Ciclo di vita degli ordini, validazione | Strong (transazionale) |
| Risk Service | Controlli pre-trade | Eventually consistent in lettura, sync in scrittura |
| Execution Engine | Invio al broker, gestione dei fill | At-least-once con idempotenza |
| Broker Gateway | Bridge async/sync verso l’API del broker | Stateful, single-writer |
| Market Data Service | Ingestione e distribuzione dello stream | Best-effort, ordinato |
| Position Service | Materializzazione di posizioni e P&L | Eventually consistent |
| Audit Service | Log eventi immutabile | Append-only |
Principi SOLID in Pratica#
Prima di approfondire ogni servizio, vale la pena dichiarare quali principi di design governano i confini del sistema. Non sono riferimenti decorativi. Ogni principio risolve una specifica decisione architetturale.
Single Responsibility Principle: il Broker Gateway ha esattamente un compito: fare da bridge tra l’event loop asyncio e l’API callback-based del broker. Non valida ordini, non calcola il rischio, non aggiorna posizioni. Qualsiasi logica oltre il bridge appartiene altrove.
Open/Closed Principle: l’Execution Engine è chiuso alla modifica ma aperto all’estensione tramite un protocollo BrokerAdapter. Aggiungere supporto per un nuovo broker (Interactive Brokers, Alpaca, Binance) significa implementare l’interfaccia dell’adapter, non toccare il motore. La stessa logica vale per lo Strategy Engine, che usa un protocollo SignalGenerator che le strategie implementano.
Liskov Substitution Principle: tutti i broker adapter sono intercambiabili al confine dell’interfaccia BrokerAdapter. Un adapter per il paper trading, uno per il trading live e uno per il replay si comportano in modo identico dalla prospettiva dell’Execution Engine. La sostituzione è verificata da una suite di contract test condivisa.
Interface Segregation Principle: l’Order Management Service espone tre interfacce separate: una per il piazzamento degli ordini (lato scrittura), una per le query sugli ordini (lato lettura) e una per gli eventi di cambio di stato interno consumati dai servizi downstream. I client che eseguono solo query sugli ordini non dipendono mai dall’interfaccia di scrittura.
Dependency Inversion Principle: nessun servizio importa direttamente un SDK broker concreto, uno specifico driver di database o un client specifico per le code di messaggi. Tutte le dipendenze infrastrutturali sono iniettate tramite interfacce astratte. I unit test dell’Execution Engine girano interamente in memoria con un broker adapter finto.
Il Bridge Async/Sync: il Broker Gateway#
Questo è il componente tecnicamente più interessante del sistema, e quello più comunemente implementato in modo sbagliato.
Le API dei broker (Interactive Brokers TWS, la maggior parte delle implementazioni FIX) sono intrinsecamente callback-based. Si invia un ordine, e la conferma arriva in seguito su un callback diverso. L’implementazione ingenua usa thread e blocking queue. L’implementazione moderna e corretta usa asyncio di Python con esplicit Future bridging e call_soon_threadsafe.
Pattern 1: Da Sincrono ad Asincrono (Invio Ordine)#
L’utente invia un ordine tramite HTTP POST. Il web tier chiama l’Order Management Service in modo sincrono. L’OMS valida, persiste l’ordine e pubblica un comando su Kafka. Questa è la transizione sync-to-async: un’azione utente sincrona convertita in un comando asincrono. La risposta HTTP porta un order ID e lo stato PENDING. Il widget di stato dell’ordine dell’utente si iscrive tramite SSE per i successivi cambiamenti di stato.
import asyncio
from dataclasses import dataclass
from typing import Protocol
class BrokerAdapter(Protocol):
"""Abstract broker interface. Concrete adapters implement this."""
def submit_order(self, order_id: str, symbol: str,
side: str, qty: int, order_type: str) -> None: ...
def cancel_order(self, order_id: str) -> None: ...
def set_callbacks(self, on_fill: callable, on_reject: callable) -> None: ...
@dataclass
class OrderFill:
order_id: str
filled_qty: int
avg_price: float
timestamp: str
class BrokerGateway:
"""
Bridges the broker's callback-based API (running on a separate thread
or event loop) into asyncio Futures consumable by the Execution Engine.
This solves the sync-to-async problem: the Execution Engine awaits
a Future. The Future is resolved when the broker fires its callback,
using call_soon_threadsafe to cross the thread boundary safely.
"""
def __init__(self, adapter: BrokerAdapter):
self._adapter = adapter
self._pending: dict[str, asyncio.Future[OrderFill]] = {}
self._loop: asyncio.AbstractEventLoop | None = None
def start(self, loop: asyncio.AbstractEventLoop) -> None:
self._loop = loop
self._adapter.set_callbacks(
on_fill=self._on_fill_callback,
on_reject=self._on_reject_callback,
)
async def submit(self, order_id: str, symbol: str,
side: str, qty: int, order_type: str,
timeout: float = 10.0) -> OrderFill:
"""
Submits order to broker and awaits confirmation.
Returns when the broker confirms the fill or raises on timeout/reject.
"""
loop = asyncio.get_running_loop()
future: asyncio.Future[OrderFill] = loop.create_future()
self._pending[order_id] = future
# Non-blocking submission
self._adapter.submit_order(order_id, symbol, side, qty, order_type)
try:
return await asyncio.wait_for(future, timeout=timeout)
except asyncio.TimeoutError:
self._pending.pop(order_id, None)
raise BrokerTimeoutError(order_id)
def _on_fill_callback(self, order_id: str, filled_qty: int,
avg_price: float, timestamp: str) -> None:
"""
Called by the broker SDK on its own thread.
Must NOT touch asyncio primitives directly.
call_soon_threadsafe schedules the resolution in the event loop.
"""
future = self._pending.pop(order_id, None)
if future and not future.done():
fill = OrderFill(order_id, filled_qty, avg_price, timestamp)
self._loop.call_soon_threadsafe(future.set_result, fill)
def _on_reject_callback(self, order_id: str, reason: str) -> None:
future = self._pending.pop(order_id, None)
if future and not future.done():
exc = OrderRejectedException(order_id, reason)
self._loop.call_soon_threadsafe(future.set_exception, exc)Il dettaglio critico è call_soon_threadsafe. L’SDK del broker scatena i callback dal proprio thread. Chiamare future.set_result() direttamente da un thread esterno causerebbe race condition negli internals di asyncio. call_soon_threadsafe pianifica la risoluzione all’interno dell’event loop sul suo thread, rendendo l’operazione sicura.
Pattern 2: Da Asincrono a Sincrono (Query delle Posizioni)#
Il problema opposto è altrettanto comune. I dati di mercato e i fill degli ordini arrivano come stream asincrono. Una query sulle posizioni di un utente richiede una risposta coerente e sincrona.
La soluzione è il pattern della materialized view. Il Position Service si iscrive agli eventi di fill da Kafka, aggiorna una tabella DynamoDB in quasi-real-time e serve le query sulle posizioni direttamente da DynamoDB. L’utente non interroga mai lo stream di eventi live. Interroga uno snapshot pre-calcolato e indicizzato.
sequenceDiagram
participant BrokerGW as Broker Gateway
participant Kafka as Amazon MSK (Kafka)
participant PosSvc as Position Service
participant DDB as DynamoDB
participant WebTier as Web Tier
participant User
BrokerGW->>Kafka: publish order.filled event
Kafka->>PosSvc: consume order.filled
PosSvc->>DDB: update_item (atomic increment qty, recalc P&L)
DDB-->>PosSvc: ok
Note over PosSvc,DDB: Async pipeline, eventually consistent
User->>WebTier: GET /api/positions
WebTier->>PosSvc: getPositions(userId)
PosSvc->>DDB: query by userId
DDB-->>PosSvc: position snapshot
PosSvc-->>WebTier: positions
WebTier-->>User: 200 OK (consistent snapshot)
Note over WebTier,User: Sync request served from materialized view
Il bridge async-to-sync qui è semplicemente un consumer Kafka che scrive su uno store interrogabile. Le decisioni chiave sono: la scrittura su DynamoDB deve essere idempotente (l’evento porta un fill ID univoco usato come chiave di idempotenza), e la scrittura deve essere atomica per i campi di quantità e P&L per evitare aggiornamenti parziali.
Pattern 3: Request-Response con Correlation ID#
Per le operazioni che richiedono una risposta con l’aspetto sincrono da un sistema intrinsecamente asincrono (come richiedere una quotazione real-time prima dell’invio dell’ordine), il sistema usa il pattern del correlation ID su Kafka.
sequenceDiagram
participant WebTier as Web Tier
participant OMS as Order Management Service
participant Kafka
participant ExecEngine as Execution Engine
participant BrokerGW as Broker Gateway
WebTier->>OMS: POST /orders (sync HTTP)
OMS->>OMS: validate + persist (status=PENDING)
OMS->>Kafka: publish order.submitted (correlation_id=X)
OMS-->>WebTier: 202 Accepted (order_id, status=PENDING)
Note over WebTier: User UI subscribes via SSE for status updates
Kafka->>ExecEngine: consume order.submitted
ExecEngine->>BrokerGW: await submit(order, timeout=10s)
BrokerGW->>BrokerGW: call_soon_threadsafe awaits fill callback
BrokerGW-->>ExecEngine: OrderFill
ExecEngine->>Kafka: publish order.filled (correlation_id=X)
Kafka->>OMS: consume order.filled -> update order status
Kafka->>PosSvc: consume order.filled -> update positions
Kafka->>NotifSvc: consume order.filled -> push to user SSE
NotifSvc-->>WebTier: SSE event: order X filled
WebTier-->>User: real-time status update
L’utente invia un ordine e riceve immediatamente un 202 Accepted con un order ID. Non aspetta l’esecuzione del broker. Il canale SSE invia la notifica di fill quando arriva. Dal punto di vista dell’utente, l’esperienza è quasi real-time. Dal punto di vista del sistema, ogni componente fa esattamente ciò per cui è ottimizzato.
Order Management Service: Separazione di Comando e Query#
L’Order Management Service implementa CQRS (Command Query Responsibility Segregation) in modo esplicito. Le operazioni di scrittura (piazzare un ordine, cancellarlo, modificarlo) passano attraverso un command handler che persiste su Aurora PostgreSQL e pubblica eventi su Kafka. Le operazioni di lettura (ottenere un ordine, listare gli ordini, storico degli ordini) interrogano un read model mantenuto da un processo di proiezione che consuma gli stessi eventi Kafka.
flowchart LR
subgraph Write Side
CMD[Command Handler] --> Validate[Validator]
Validate --> RiskCheck[Risk Pre-Check\nSync call to Risk Service]
RiskCheck --> Persist[(Aurora PostgreSQL\nOrder Ledger)]
RiskCheck --> Publish[Kafka Publisher]
end
subgraph Read Side
Projection[Projection Process\nKafka Consumer] --> ReadDB[(Aurora Read Replica\nOrder Read Model)]
QueryHandler[Query Handler] --> ReadDB
end
subgraph External
Publish --> KafkaTopic[(order.commands\ntopic)]
KafkaTopic --> Projection
end
from dataclasses import dataclass, field
from enum import Enum
from typing import Protocol
import uuid
class OrderSide(str, Enum):
BUY = "buy"
SELL = "sell"
class OrderType(str, Enum):
MARKET = "market"
LIMIT = "limit"
STOP = "stop"
@dataclass(frozen=True)
class PlaceOrderCommand:
"""Immutable command. Implements the Command Pattern."""
user_id: str
symbol: str
side: OrderSide
qty: int
order_type: OrderType
limit_price: float | None = None
correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
class CommandHandler(Protocol):
"""ISP: command handler interface segregated from query interface."""
async def handle(self, command: PlaceOrderCommand) -> str: ...
class PlaceOrderHandler:
"""
Single Responsibility: coordinates validation, risk check, and persistence.
Dependency Inversion: depends on abstractions, not concrete implementations.
"""
def __init__(
self,
validator: "OrderValidator",
risk_client: "RiskServiceClient",
repository: "OrderRepository",
publisher: "KafkaPublisher",
):
self._validator = validator
self._risk = risk_client
self._repo = repository
self._publisher = publisher
async def handle(self, cmd: PlaceOrderCommand) -> str:
# Validate structure
self._validator.validate(cmd)
# Synchronous pre-trade risk check (blocking, must pass before persist)
risk_result = await self._risk.check(cmd)
if not risk_result.approved:
raise RiskViolationError(risk_result.reason)
# Persist with PENDING status
order_id = await self._repo.insert(cmd)
# Publish command event (fire-and-forget, broker will confirm later)
await self._publisher.publish("order.submitted", {
"order_id": order_id,
"correlation_id": cmd.correlation_id,
"user_id": cmd.user_id,
"symbol": cmd.symbol,
"side": cmd.side,
"qty": cmd.qty,
"order_type": cmd.order_type,
"limit_price": cmd.limit_price,
})
return order_idIl controllo di rischio pre-trade è deliberatamente sincrono qui. L’ordine non deve essere persistito né pubblicato se viola i limiti di rischio. Questo è l’unico punto nel flusso degli ordini in cui il sistema si blocca.
Risk Engine: Controlli Pre-Trade in Tempo Reale#
Il Risk Engine applica i limiti prima che qualsiasi ordine raggiunga il broker. Mantiene le tabelle dei limiti in Redis (letture veloci) e si iscrive agli eventi di posizione e dati di mercato per tenere aggiornate le esposizioni calcolate.
flowchart TD
subgraph Risk Engine
RiskAPI[Risk API\nSync gRPC]
LimitStore[(Redis\nLimit Tables)]
ExposureCalc[Exposure Calculator\nAsync Consumer]
RulesEngine[Rules Engine\nOpen/Closed via Rule Protocol]
RiskAPI --> RulesEngine
RulesEngine --> LimitStore
ExposureCalc --> LimitStore
end
OMS[Order Management\nService] -->|gRPC check| RiskAPI
MSK[(Kafka)] -->|order.filled\nposition.updated| ExposureCalc
MktData[(Kinesis)] -->|price.updated| ExposureCalc
Le regole di rischio seguono l’Open/Closed Principle: il motore esegue qualsiasi oggetto che implementa il protocollo RiskRule. Aggiungere una nuova regola non richiede alcuna modifica al motore stesso.
from typing import Protocol
class RiskRule(Protocol):
"""Open for extension: implement this to add new risk controls."""
name: str
async def evaluate(self, cmd: "PlaceOrderCommand",
context: "RiskContext") -> "RuleResult": ...
class MaxOrderSizeRule:
name = "max_order_size"
async def evaluate(self, cmd, context):
limit = await context.limits.get(cmd.user_id, "max_order_size")
notional = cmd.qty * context.last_price(cmd.symbol)
if notional > limit:
return RuleResult(approved=False,
reason=f"Order notional {notional:.2f} exceeds limit {limit:.2f}")
return RuleResult(approved=True)
class MaxConcentrationRule:
name = "max_concentration"
async def evaluate(self, cmd, context):
current_exposure = await context.exposure(cmd.user_id, cmd.symbol)
new_qty = current_exposure.qty + (cmd.qty if cmd.side == "buy" else -cmd.qty)
portfolio_value = await context.portfolio_value(cmd.user_id)
concentration = abs(new_qty * context.last_price(cmd.symbol)) / portfolio_value
if concentration > 0.20: # 20% max single-name concentration
return RuleResult(approved=False,
reason=f"Concentration {concentration:.1%} exceeds 20% limit")
return RuleResult(approved=True)
class RiskEngine:
"""Closed for modification. Add rules by passing them at construction."""
def __init__(self, rules: list[RiskRule], context_factory: "RiskContextFactory"):
self._rules = rules
self._context_factory = context_factory
async def check(self, cmd: "PlaceOrderCommand") -> "RiskResult":
context = await self._context_factory.build(cmd.user_id)
for rule in self._rules:
result = await rule.evaluate(cmd, context)
if not result.approved:
return RiskResult(approved=False, reason=f"[{rule.name}] {result.reason}")
return RiskResult(approved=True)Il controllo di rischio gira in meno di 5 millisecondi per cache Redis ben scaldate. Se Redis non è disponibile, il motore fallisce in modalità chiusa: tutti gli ordini vengono rifiutati fino al recupero della cache. Questa è la modalità di fallimento corretta per un sistema di rischio.
Architettura dei Dati di Mercato: Kinesis e Fan-Out#
I dati di mercato sono il componente ad alto throughput. Al picco, il sistema ingestisce aggiornamenti di prezzo per migliaia di simboli su più exchange simultaneamente. L’architettura separa l’ingestione (feed grezzo su Kinesis) dalla distribuzione (fan-out agli iscritti).
flowchart TD
subgraph Ingestion
Feed1[Exchange Feed A\nWebSocket] --> Normalizer1[Normalizer Lambda]
Feed2[Exchange Feed B\nFIX] --> Normalizer2[Normalizer Lambda]
Feed3[Exchange Feed C\nREST Poll] --> Normalizer3[Normalizer Lambda]
Normalizer1 --> Kinesis[(Kinesis Data Streams\n8 shards)]
Normalizer2 --> Kinesis
Normalizer3 --> Kinesis
end
subgraph Distribution
Kinesis --> MDSConsumer[Market Data Service\nKinesis Consumer]
MDSConsumer --> PriceCache[(Redis\nLast Price Cache\nTTL 5s)]
MDSConsumer --> CandleAgg[Candle Aggregator\n1m 5m 1h OHLCV]
MDSConsumer --> RiskFeed[Risk Engine Feed\nvia SQS]
MDSConsumer --> SSEBroadcast[SSE Broadcaster\nper-symbol channels]
end
subgraph User Delivery
SSEBroadcast --> RedisPS[(Redis Pub/Sub\nper-symbol channels)]
WebTier[Web Tier\nSSE endpoint] --> RedisPS
Users([Users with\nactive subscriptions]) --> WebTier
end
Le Lambda normalizzatrici convertono i formati specifici di ciascun exchange in uno schema PriceUpdate canonico prima di scrivere su Kinesis. Ogni shard gestisce un range non sovrapposto di simboli (partizionati per hash del simbolo). Questo garantisce l’ordinamento per simbolo abilitando al contempo il parallelismo orizzontale.
import asyncio
import json
from dataclasses import dataclass
@dataclass(frozen=True)
class PriceUpdate:
symbol: str
bid: float
ask: float
last: float
volume: int
timestamp_ms: int
exchange: str
class MarketDataConsumer:
"""
Async consumer for Kinesis records.
Applies the Observer Pattern: registered handlers are called for each update.
Handlers are segregated by interest (ISP): risk only subscribes to price,
candle builder subscribes to price+volume, SSE broadcast subscribes to all.
"""
def __init__(self, kinesis_client, redis_client, publisher):
self._kinesis = kinesis_client
self._redis = redis_client
self._publisher = publisher
self._handlers: list[callable] = []
def register(self, handler: callable) -> None:
"""Open/Closed: add handlers without modifying the consumer."""
self._handlers.append(handler)
async def run(self, stream_name: str, shard_id: str) -> None:
iterator = await self._get_shard_iterator(stream_name, shard_id)
while True:
records = await self._kinesis.get_records(ShardIterator=iterator,
Limit=500)
for record in records["Records"]:
update = PriceUpdate(**json.loads(record["Data"]))
await self._process(update)
iterator = records["NextShardIterator"]
if not records["Records"]:
await asyncio.sleep(0.1) # Kinesis polling interval
async def _process(self, update: PriceUpdate) -> None:
# Update last price cache (used by risk engine)
await self._redis.setex(
f"price:{update.symbol}",
5, # TTL: stale prices are dangerous
f"{update.last}",
)
# Fan out to all registered handlers concurrently
await asyncio.gather(
*[handler(update) for handler in self._handlers],
return_exceptions=True, # One failing handler must not block others
)Il return_exceptions=True in asyncio.gather non è opzionale. Un bug nell’aggregatore di candele non deve impedire al feed di rischio di ricevere aggiornamenti di prezzo. I fallimenti di ogni handler vengono loggati ma non propagati.
Web Tier: Scalare a 15.000 Utenti Concorrenti#
Servire 15.000 utenti concorrenti richiede di pensare attentamente a cosa significhi davvero “concorrente”. La maggior parte di quelle connessioni è inattiva la maggior parte del tempo: un utente con la dashboard di trading aperta mantiene una connessione SSE per i dati live ma non sta effettivamente effettuando chiamate API. Il numero di connessioni è quindi molto più alto del tasso di richieste.
flowchart TD
CF[CloudFront\nStatic SPA\nCache-Control: immutable] --> ALB
ALB[External ALB\nHTTP/2\nSticky for SSE] --> ECS
subgraph ECS Fargate Cluster
ECS[Web Service\n8 min / 40 max tasks\n2 vCPU / 4 GB each]
ECS --> Uvicorn[Uvicorn ASGI\n4 workers per task]
end
ECS --> SessionRedis[(ElastiCache Redis\nSession Store\nCluster Mode 3 shards)]
ECS --> RateLimit[(ElastiCache Redis\nRate Limiter\nSliding Window per user)]
ECS --> SSERedis[(ElastiCache Redis\nPub/Sub\nMarket Data Fan-out)]
ECS --> Services[Internal Services\nvia Service Discovery]
subgraph Autoscaling
CW[CloudWatch\nMetrics] --> ASG[ECS Service\nAutoscaling]
ASG --> ECS
end
Decisioni chiave per il web tier a questa scala:
ASGI invece di WSGI. Django con Uvicorn in modalità ASGI gestisce connessioni SSE concorrenti in modo efficiente. Un server WSGI sincrono creerebbe un thread per connessione, esaurendo le risorse a questo livello di concorrenza. ASGI usa il multitasking cooperativo: l’attesa su SSE o I/O su database cede l’event loop ad altre richieste.
SSE invece di WebSocket per i dati di mercato. SSE è unidirezionale (dal server al client), che è tutto ciò di cui la consegna dei dati di mercato ha bisogno. Funziona su HTTP/2, sopravvive ai reconnect del load balancer ed è più semplice da implementare e debuggare rispetto a WebSocket a questa scala. L’invio degli ordini usa normali richieste POST.
Sticky session sull’ALB solo per SSE. Le connessioni SSE sono di lunga durata e legate a un task specifico. La duration-based stickiness dell’ALB (cookie da 1 ora) garantisce che le connessioni SSE non vengano instradata verso un task diverso a metà stream. Le normali richieste API usano il round-robin.
Rate limiting al web tier, non ai servizi. Ogni task applica un rate limit a finestra scorrevole per utente tramite Redis prima di fare proxy verso i servizi interni. Questo impedisce ai singoli utenti di saturare i servizi downstream con script automatizzati.
import asyncio
import json
from django.http import StreamingHttpResponse
async def market_data_stream(request, symbols: str):
"""
SSE endpoint. Each active subscription is a long-lived async generator.
The ALB sticky session ensures this generator stays on one task.
"""
symbol_list = symbols.split(",")
user_id = request.user.id
pubsub = request.app.redis.pubsub()
# Subscribe to per-symbol Redis channels
channels = [f"mkt:{sym}" for sym in symbol_list]
await pubsub.subscribe(*channels)
async def event_generator():
try:
# Send initial snapshot from cache
for symbol in symbol_list:
price = await request.app.redis.get(f"price:{symbol}")
if price:
yield f"data: {json.dumps({'symbol': symbol, 'price': float(price)})}\n\n"
# Stream live updates
async for message in pubsub.listen():
if message["type"] == "message":
yield f"data: {message['data'].decode()}\n\n"
# Heartbeat to detect dead connections
yield ": heartbeat\n\n"
await asyncio.sleep(15)
except asyncio.CancelledError:
pass
finally:
await pubsub.unsubscribe(*channels)
response = StreamingHttpResponse(
event_generator(),
content_type="text/event-stream",
)
response["Cache-Control"] = "no-cache"
response["X-Accel-Buffering"] = "no"
return responseLa riga SSE solo-commento (: heartbeat) ha due scopi: rileva le connessioni morte sui proxy che scadono le connessioni inattive, e impedisce all’idle timeout dell’ALB di terminare lo stream. L’intervallo deve essere inferiore all’idle timeout dell’ALB (default 60 secondi, raccomandati 30 secondi per SSE).
Policy di autoscaling. Il servizio ECS scala su una metrica composita: CPU media sopra il 60% O numero medio di connessioni attive sopra 3.000 per task innesca lo scale-out. Lo scale-in richiede che entrambe le metriche scendano sotto le soglie per 5 minuti consecutivi per prevenire il thrashing.
Event Sourcing e il Servizio di Audit#
Ogni ordine, fill, cancellazione, modifica e decisione di rischio è un evento. Il Servizio di Audit consuma tutti i topic Kafka e scrive record immutabili su S3 in formato Parquet, partizionati per data e tipo di evento. Questo serve due scopi: conformità normativa (molte giurisdizioni richiedono una conservazione di 7 anni dei record di trading) e replay degli eventi (lo stato di qualsiasi servizio può essere ricostruito da zero riproducendo i suoi eventi).
import pyarrow as pa
import pyarrow.parquet as pq
from dataclasses import asdict
class AuditConsumer:
"""
Append-only event log. No updates, no deletes.
Implements the Repository Pattern over S3 as the backing store.
"""
def __init__(self, s3_client, bucket: str):
self._s3 = s3_client
self._bucket = bucket
self._buffer: list[dict] = []
self._flush_interval_s = 30
self._max_buffer_size = 10_000
async def consume(self, event: dict) -> None:
self._buffer.append(event)
if len(self._buffer) >= self._max_buffer_size:
await self._flush()
async def _flush(self) -> None:
if not self._buffer:
return
table = pa.Table.from_pylist(self._buffer)
key = self._partition_key(self._buffer[0])
# Write to S3 as Parquet
buf = pa.BufferOutputStream()
pq.write_table(table, buf, compression="snappy")
await self._s3.put_object(
Bucket=self._bucket,
Key=key,
Body=buf.getvalue().to_pybytes(),
)
self._buffer.clear()
def _partition_key(self, event: dict) -> str:
ts = event["timestamp"][:10] # YYYY-MM-DD
event_type = event["type"]
return f"events/dt={ts}/type={event_type}/{event['id']}.parquet"Parquet con compressione Snappy ottiene circa 10:1 di compressione sui dati JSON degli eventi. Lo schema di partizione S3 (dt=YYYY-MM-DD/type=event_type) è compatibile con Athena per query SQL ad-hoc senza nessuna pipeline ETL.
Architettura dei Dati: Aurora, DynamoDB e Redis#
Dati diversi hanno caratteristiche diverse. Usare un unico database per tutto è l’errore architetturale più comune nelle piattaforme di trading.
flowchart LR
subgraph Transactional
Aurora[(Aurora PostgreSQL\nServerless v2\nWriter + 2 Read Replicas)]
OMS --> Aurora
Note1[Orders, users, accounts\nStrong consistency\nACID transactions]
end
subgraph Hot State
DDB[(DynamoDB\nOn-Demand\nDAX cache optional)]
PosSvc --> DDB
Note2[Positions, open orders\nms-level reads\nAtomic increments]
end
subgraph Cache
Redis[(ElastiCache Redis\nCluster Mode\n3 primary shards)]
Note3[Last prices, sessions\nrate limits, pub/sub\nTTL enforced]
end
subgraph Cold Archive
S3[(S3 + Glacier\nIntelligent Tiering)]
Note4[Audit events, backfill\nhistorical data\nAthena queryable]
end
Aurora PostgreSQL gestisce il ciclo di vita degli ordini, gli account utente e lo storico delle transazioni. Le scritture vanno all’istanza writer. Le operazioni intensive di lettura (storico ordini, estratti conto) usano le read replica. Aurora Serverless v2 scala automaticamente le ACU (Aurora Capacity Unit) tra 0,5 e 64 ACU in base al carico, eliminando i costi di over-provisioning nelle ore fuori mercato.
DynamoDB gestisce le posizioni e lo stato degli ordini aperti. Il record di posizione per una coppia utente-simbolo viene aggiornato atomicamente a ogni fill usando UpdateItem con operazioni ADD. Questo è il modo corretto di gestire aggiornamenti di fill concorrenti senza transazioni: la semantica di incremento atomico previene gli aggiornamenti persi anche con più consumer del Position Service in esecuzione in parallelo.
class PositionRepository:
"""
Dependency Inversion: depends on abstract DynamoDB client, not boto3 directly.
Atomic update pattern: no read-modify-write race conditions.
"""
def __init__(self, table):
self._table = table
async def apply_fill(self, user_id: str, symbol: str,
fill_qty: int, avg_price: float,
fill_id: str, side: str) -> None:
signed_qty = fill_qty if side == "buy" else -fill_qty
notional = fill_qty * avg_price
await self._table.update_item(
Key={"pk": f"USER#{user_id}", "sk": f"POS#{symbol}"},
UpdateExpression=(
"ADD qty :q, total_cost :c "
"SET last_updated = :ts, last_fill_id = :fid"
),
ExpressionAttributeValues={
":q": signed_qty,
":c": notional if side == "buy" else -notional,
":ts": datetime.utcnow().isoformat(),
":fid": fill_id,
":prev_fid": fill_id,
},
# Idempotency: skip if this fill was already applied
ConditionExpression="attribute_not_exists(last_fill_id) OR last_fill_id <> :prev_fid",
)La ConditionExpression fornisce l’idempotenza. Se il consumer si blocca dopo aver applicato un fill ma prima di fare commit dell’offset Kafka, lo stesso evento di fill viene riconsegnato. La condizione impedisce che venga applicato due volte.
Circuit Breaker e Pattern di Resilienza#
Una piattaforma di trading che fallisce in modalità aperta è peggio di una che fallisce in modalità chiusa. Ogni chiamata esterna (broker gateway, servizio di rischio, dati di mercato) è avvolta in un circuit breaker.
import time
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # Normal: calls pass through
OPEN = "open" # Tripped: calls fail fast
HALF_OPEN = "half_open" # Recovery probe
class CircuitBreaker:
"""
Single Responsibility: manages open/closed state for a single dependency.
Fails fast when the dependency is unhealthy rather than queuing up calls.
"""
def __init__(self, failure_threshold: int = 5,
recovery_timeout_s: float = 30.0,
success_threshold: int = 2):
self._failure_threshold = failure_threshold
self._recovery_timeout = recovery_timeout_s
self._success_threshold = success_threshold
self._failures = 0
self._successes = 0
self._last_failure_time: float | None = None
self._state = CircuitState.CLOSED
async def call(self, func, *args, **kwargs):
if self._state == CircuitState.OPEN:
if time.monotonic() - self._last_failure_time > self._recovery_timeout:
self._state = CircuitState.HALF_OPEN
else:
raise CircuitOpenError("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as exc:
self._on_failure()
raise
def _on_success(self):
if self._state == CircuitState.HALF_OPEN:
self._successes += 1
if self._successes >= self._success_threshold:
self._state = CircuitState.CLOSED
self._failures = 0
self._successes = 0
else:
self._failures = 0
def _on_failure(self):
self._failures += 1
self._last_failure_time = time.monotonic()
if self._failures >= self._failure_threshold:
self._state = CircuitState.OPENQuando il circuit breaker del broker gateway si apre, l’Execution Engine inizia immediatamente a pubblicare gli ordini su un topic Kafka dead_letter con stato BROKER_UNAVAILABLE, e il servizio di notifica invia alert agli utenti coinvolti. Il sistema degrada in modo controllato invece di fallire silenziosamente.
Infrastructure as Code#
Il sistema completo si distribuisce tramite Terraform. Ogni servizio ha la propria task definition ECS, e l’infrastruttura condivisa (MSK, ElastiCache, Aurora, VPC) è gestita da un modulo foundation separato.
flowchart TD
subgraph VPC 10.0.0.0/16
subgraph Public Subnets
ALBExt[External ALB]
end
subgraph Private App Subnets
ECSCluster[ECS Cluster\n8 services]
ALBInt[Internal ALB\nservice mesh]
end
subgraph Private Data Subnets
Aurora[(Aurora\nMulti-AZ)]
DDB_EP[DynamoDB VPC\nEndpoint]
Redis[(ElastiCache\nMulti-AZ)]
MSK[(Amazon MSK\n3 broker nodes\n3 AZs)]
end
end
IGW[Internet Gateway] --> ALBExt
ECSCluster --> ALBInt
ALBInt --> ECSCluster
ECSCluster --> Aurora
ECSCluster --> Redis
ECSCluster --> MSK
ECSCluster --> DDB_EP
Kinesis_EP[Kinesis VPC\nEndpoint] --> ECSCluster
S3_EP[S3 VPC\nEndpoint] --> ECSCluster
MSK gira su 3 nodi broker su 3 Availability Zone con un replication factor di 3 per tutti i topic critici (order.submitted, order.filled, position.updated). Il topic market.price usa replication factor 2 e acks=1 (il producer non aspetta la replica sui follower) perché la perdita occasionale di aggiornamenti di prezzo è accettabile. Perdere un evento di fill non lo è.
Topic e configurazioni:
| Topic | Partizioni | Replica | Acks | Retention |
|---|---|---|---|---|
| order.submitted | 12 | 3 | all | 7 giorni |
| order.filled | 12 | 3 | all | 7 giorni |
| position.updated | 12 | 3 | all | 7 giorni |
| market.price | 64 | 2 | 1 | 1 ora |
| audit.events | 4 | 3 | all | 30 giorni |
Osservabilità#
Un sistema distribuito di questa complessità è impossibile da operare senza osservabilità strutturata fin dal primo giorno.
Logging strutturato: ogni riga di log è JSON con trace_id, user_id, service, level e correlation_id. Il correlation ID che traccia un ordine dalla sottomissione al fill appare in ogni riga di log della catena. Una singola query Athena sull’archivio log S3 può ricostruire la timeline completa di qualsiasi ordine.
Distributed tracing: i trace X-Ray di AWS coprono il web tier, OMS, Risk Engine ed Execution Engine. Gli header del messaggio Kafka trasportano il contesto del trace in modo che lo span continui sul lato consumer, non solo sul lato producer.
Metriche custom: oltre alle metriche standard di ECS/Lambda/Aurora, ogni servizio pubblica metriche a livello applicativo tramite CloudWatch Embedded Metric Format:
- Latenza di sottomissione ordine (p50, p95, p99)
- Latenza del controllo di rischio
- Latenza di fill del broker gateway (dalla sottomissione al callback di fill)
- Lag di aggiornamento posizione (tempo tra l’evento di fill e la consistenza della materialized view)
- Consumer lag di Kafka per consumer group
- Numero di connessioni SSE per task del web tier
La metrica di latenza di fill del broker gateway merita attenzione particolare. Un picco qui indica problemi lato broker prima che causino fallimenti visibili degli ordini. Un alert su latenza di fill p95 sopra 2 secondi scatena una notifica on-call e aumenta automaticamente il parametro timeout del Broker Gateway da 10 a 30 secondi, dando al broker più tempo prima che il circuit si apra.
Riepilogo dei Design Pattern#
| Pattern | Dove Applicato | Perche |
|---|---|---|
| Command Pattern | PlaceOrderCommand, CancelOrderCommand | Incapsula l’intento dell’utente come oggetto immutabile, disaccoppia mittente e handler |
| Observer Pattern | Market Data Consumer, Risk Feed | Piu handler notificati degli aggiornamenti di prezzo senza accoppiamento reciproco |
| Adapter Pattern | Protocollo BrokerAdapter, normalizzatori exchange | API specifiche del broker convertite in un’interfaccia uniforme |
| Repository Pattern | OrderRepository, PositionRepository | Logica di persistenza isolata dalla logica di business, iniettabile e testabile |
| Circuit Breaker | Tutte le chiamate esterne | Fail fast, prevenzione dei fallimenti a cascata, degradazione controllata |
| CQRS | Order Management Service | Modelli di scrittura e lettura separati, scaling indipendente, letture consistenti dalle proiezioni |
| Materialized View | Position Service | Fill asincroni materializzati in uno snapshot interrogabile, bridge async-to-sync |
| Correlation ID | Flusso ordini end-to-end | Traccia un’azione utente sincrona attraverso una pipeline asincrona |
Lezioni da Questo Design#
Correttezza prima della performance. Il sistema accetta un overhead di 200-250ms per i controlli di rischio pre-trade su ogni ordine. Non si tratta di un problema di performance. Un ordine che bypassa i controlli di rischio per un’ottimizzazione della latenza e un problema esistenziale. I vincoli di correttezza non sono negoziabili.
Il confine asincrono deve essere esplicito. Ogni punto in cui il sistema attraversa da sincrono ad asincrono (HTTP a Kafka, Kafka a broker callback, broker callback a asyncio Future) e una giuntura esplicita e nominata nel codice. Le transizioni async implicite sono il luogo in cui vivono i bug sottili: off-by-one nell’ordinamento degli eventi, race condition nella risoluzione dei Future, eventi persi negli scenari di restart.
L’idempotenza non e opzionale. La consegna at-least-once (l’unica garanzia sicura per Kafka senza overhead significativo di performance) significa che ogni consumer deve essere idempotente. La chiave di idempotenza della ConditionExpression dell’aggiornamento di posizione, la deduplicazione dell’evento di fill nell’OMS e la chiave S3 content-addressable dell’evento di audit seguono tutti lo stesso principio: applicare lo stesso evento due volte deve produrre lo stesso risultato che applicarlo una volta.
Fallire in modalita chiusa, non aperta. I sistemi di rischio, i circuit breaker e i rate limiter falliscono tutti nella direzione che impedisce azioni pericolose. Se il servizio di rischio non e raggiungibile, gli ordini vengono rifiutati. Se il circuit del broker si apre, gli ordini vanno in dead letter. Se il session store Redis e down, gli utenti vengono disconnessi. La disponibilita e importante, ma un sistema finanziario che prende decisioni non controllate sotto fallimento e peggio di uno che si rifiuta di agire.
Scalare il percorso di lettura, non quello di scrittura. 15.000 utenti fanno molto piu lettura (visualizzare posizioni, controllare lo stato degli ordini, guardare i dati di mercato) che scrittura (inviare ordini). Il pattern CQRS, le read replica di Aurora, DynamoDB come position cache e il pattern della materialized view servono tutti lo stesso obiettivo: il percorso di scrittura e piccolo, ben controllato e transazionale. Il percorso di lettura scala orizzontalmente senza contesa.
Riferimenti#
- AWS MSK (Managed Kafka)
- DynamoDB Atomic Counters
- asyncio call_soon_threadsafe
- CQRS Pattern (Martin Fowler)
- Circuit Breaker Pattern
- Event Sourcing Pattern
- Aurora Serverless v2
- Kinesis Data Streams
Vuoi approfondire l’integrazione AI, il platform engineering o i sistemi backend? Offro sessioni di coaching 1:1 calibrate in base alla tua esperienza e ai tuoi obiettivi. Dai un’occhiata alla pagina coaching.