Salta al contenuto principale

Progettare una Piattaforma di Trading Algoritmico ad Alto Throughput su AWS: Architettura per 15.000 Utenti Concorrenti

Indice dei contenuti
Un’architettura production-realistica per una piattaforma di trading algoritmico cloud-native: 15.000 utenti concorrenti, acknowledgement degli ordini sotto il secondo, streaming real-time dei dati di mercato, CQRS, event sourcing, e il bridge async-sync che tiene tutto insieme.

I problemi più difficili nel software finanziario non sono algoritmici. Sono architetturali. Una piattaforma di trading deve essere corretta prima di essere veloce, consistente prima di essere elegante, e auditabile prima di tutto il resto. Deve anche colmare un disallineamento fondamentale al cuore di ogni sistema di trading mai costruito: gli utenti interagiscono in modo sincrono (cliccano “compra” e si aspettano una risposta), ma i sistemi sottostanti sono intrinsecamente asincroni. Le API dei broker consegnano i fill tramite callback. I dati di mercato arrivano come uno stream di push continuo. I calcoli di rischio avvengono in pipeline parallele. Lo stato delle posizioni è per natura eventually consistent.

Questo articolo descrive un’architettura completa per una piattaforma di trading algoritmico teorica ma production-realistica costruita su AWS. Il sistema serve 15.000 utenti concorrenti, gestisce l’esecuzione degli ordini con acknowledgement sotto il secondo, fa streaming real-time dei dati di mercato, applica controlli di rischio pre-trade e mantiene uno stato coerente delle posizioni su servizi distribuiti. Descriverò ogni layer, i pattern che lo fanno funzionare e i tradeoff coinvolti.


La Tensione Centrale: Utenti Sincroni, Mercati Asincroni
#

Prima di qualsiasi diagramma, il problema centrale merita una formulazione precisa.

Quando un utente invia un ordine tramite la UI web:

  1. Il browser effettua una HTTP POST sincrona. L’utente si aspetta una risposta immediata.
  2. Il backend deve validare l’ordine, eseguire i controlli di rischio pre-trade e pubblicarlo su una coda di elaborazione.
  3. L’Execution Engine lo raccoglie e lo invia al broker tramite il Broker Gateway.
  4. L’API del broker è intrinsecamente callback-based. La chiamata di invio ritorna immediatamente. La conferma di esecuzione (fill, fill parziale, o rifiuto) arriva in seguito tramite un callback su un thread o una connessione diversi.
  5. Nel frattempo, l’utente sta osservando il widget dello stato dell’ordine aggiornarsi in tempo reale.

Non esiste una risposta unica e corretta a questo problema, ma esiste una risposta architetturale ben definita: separare l’acknowledgement di ricezione dall’acknowledgement di esecuzione, usare correlation ID per tracciare lo stato attraverso la pipeline asincrona, e servire lo stato di posizioni e ordini da una materialized view piuttosto che dallo stream di eventi live.

Queste tre idee compaiono in ogni layer di ciò che segue.


Panoramica del Sistema
#

flowchart TD
    CDN[CloudFront CDN\nStatic Assets] --> Users([15,000 Concurrent Users])
    Users --> WAF[AWS WAF]
    WAF --> ALB[External ALB\nTLS 1.3]
    ALB --> WebTier[Web Tier\nECS Fargate\n8 tasks min / 40 max]

    WebTier --> SessionCache[(ElastiCache Redis\nSession + Rate Limit)]
    WebTier --> APIGW[Internal API Gateway]

    APIGW --> OMS[Order Management\nService]
    APIGW --> MDS[Market Data\nService]
    APIGW --> PS[Position\nService]
    APIGW --> RS[Risk\nService]

    OMS --> MSK[(Amazon MSK\nKafka)]
    MDS --> Kinesis[(Amazon Kinesis\nData Streams)]
    RS --> MSK

    MSK --> ExecEngine[Execution\nEngine]
    MSK --> AuditSvc[Audit\nService]
    MSK --> NotifSvc[Notification\nService]

    ExecEngine --> BrokerGW[Broker\nGateway]
    BrokerGW --> ExtBroker([External Broker\nCallback API])

    BrokerGW --> MSK
    Kinesis --> MDS

    PS --> DDB[(DynamoDB\nPosition State)]
    OMS --> Aurora[(Aurora PostgreSQL\nOrder Ledger)]
    AuditSvc --> S3[(S3\nAudit Archive)]

    NotifSvc --> Redis2[(ElastiCache Redis\nPub/Sub)]
    WebTier --> Redis2

Gli otto servizi primari hanno ognuno una responsabilità singola e delimitata. Non si tratta di microservizi per il gusto di farlo. Ogni confine corrisponde a un modello di consistenza, un profilo di scaling e una preoccupazione operativa genuinamente diversi.

ServizioResponsabilitàModello di Consistenza
Web TierSessioni utente, SSE, REST APIStateless
Order ManagementCiclo di vita degli ordini, validazioneStrong (transazionale)
Risk ServiceControlli pre-tradeEventually consistent in lettura, sync in scrittura
Execution EngineInvio al broker, gestione dei fillAt-least-once con idempotenza
Broker GatewayBridge async/sync verso l’API del brokerStateful, single-writer
Market Data ServiceIngestione e distribuzione dello streamBest-effort, ordinato
Position ServiceMaterializzazione di posizioni e P&LEventually consistent
Audit ServiceLog eventi immutabileAppend-only

Principi SOLID in Pratica
#

Prima di approfondire ogni servizio, vale la pena dichiarare quali principi di design governano i confini del sistema. Non sono riferimenti decorativi. Ogni principio risolve una specifica decisione architetturale.

Single Responsibility Principle: il Broker Gateway ha esattamente un compito: fare da bridge tra l’event loop asyncio e l’API callback-based del broker. Non valida ordini, non calcola il rischio, non aggiorna posizioni. Qualsiasi logica oltre il bridge appartiene altrove.

Open/Closed Principle: l’Execution Engine è chiuso alla modifica ma aperto all’estensione tramite un protocollo BrokerAdapter. Aggiungere supporto per un nuovo broker (Interactive Brokers, Alpaca, Binance) significa implementare l’interfaccia dell’adapter, non toccare il motore. La stessa logica vale per lo Strategy Engine, che usa un protocollo SignalGenerator che le strategie implementano.

Liskov Substitution Principle: tutti i broker adapter sono intercambiabili al confine dell’interfaccia BrokerAdapter. Un adapter per il paper trading, uno per il trading live e uno per il replay si comportano in modo identico dalla prospettiva dell’Execution Engine. La sostituzione è verificata da una suite di contract test condivisa.

Interface Segregation Principle: l’Order Management Service espone tre interfacce separate: una per il piazzamento degli ordini (lato scrittura), una per le query sugli ordini (lato lettura) e una per gli eventi di cambio di stato interno consumati dai servizi downstream. I client che eseguono solo query sugli ordini non dipendono mai dall’interfaccia di scrittura.

Dependency Inversion Principle: nessun servizio importa direttamente un SDK broker concreto, uno specifico driver di database o un client specifico per le code di messaggi. Tutte le dipendenze infrastrutturali sono iniettate tramite interfacce astratte. I unit test dell’Execution Engine girano interamente in memoria con un broker adapter finto.


Il Bridge Async/Sync: il Broker Gateway
#

Questo è il componente tecnicamente più interessante del sistema, e quello più comunemente implementato in modo sbagliato.

Le API dei broker (Interactive Brokers TWS, la maggior parte delle implementazioni FIX) sono intrinsecamente callback-based. Si invia un ordine, e la conferma arriva in seguito su un callback diverso. L’implementazione ingenua usa thread e blocking queue. L’implementazione moderna e corretta usa asyncio di Python con esplicit Future bridging e call_soon_threadsafe.

Pattern 1: Da Sincrono ad Asincrono (Invio Ordine)
#

L’utente invia un ordine tramite HTTP POST. Il web tier chiama l’Order Management Service in modo sincrono. L’OMS valida, persiste l’ordine e pubblica un comando su Kafka. Questa è la transizione sync-to-async: un’azione utente sincrona convertita in un comando asincrono. La risposta HTTP porta un order ID e lo stato PENDING. Il widget di stato dell’ordine dell’utente si iscrive tramite SSE per i successivi cambiamenti di stato.

broker_gateway/gateway.py
import asyncio
from dataclasses import dataclass
from typing import Protocol

class BrokerAdapter(Protocol):
    """Abstract broker interface. Concrete adapters implement this."""
    def submit_order(self, order_id: str, symbol: str,
                     side: str, qty: int, order_type: str) -> None: ...
    def cancel_order(self, order_id: str) -> None: ...
    def set_callbacks(self, on_fill: callable, on_reject: callable) -> None: ...

@dataclass
class OrderFill:
    order_id: str
    filled_qty: int
    avg_price: float
    timestamp: str

class BrokerGateway:
    """
    Bridges the broker's callback-based API (running on a separate thread
    or event loop) into asyncio Futures consumable by the Execution Engine.

    This solves the sync-to-async problem: the Execution Engine awaits
    a Future. The Future is resolved when the broker fires its callback,
    using call_soon_threadsafe to cross the thread boundary safely.
    """

    def __init__(self, adapter: BrokerAdapter):
        self._adapter = adapter
        self._pending: dict[str, asyncio.Future[OrderFill]] = {}
        self._loop: asyncio.AbstractEventLoop | None = None

    def start(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._adapter.set_callbacks(
            on_fill=self._on_fill_callback,
            on_reject=self._on_reject_callback,
        )

    async def submit(self, order_id: str, symbol: str,
                     side: str, qty: int, order_type: str,
                     timeout: float = 10.0) -> OrderFill:
        """
        Submits order to broker and awaits confirmation.
        Returns when the broker confirms the fill or raises on timeout/reject.
        """
        loop = asyncio.get_running_loop()
        future: asyncio.Future[OrderFill] = loop.create_future()
        self._pending[order_id] = future

        # Non-blocking submission
        self._adapter.submit_order(order_id, symbol, side, qty, order_type)

        try:
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            self._pending.pop(order_id, None)
            raise BrokerTimeoutError(order_id)

    def _on_fill_callback(self, order_id: str, filled_qty: int,
                           avg_price: float, timestamp: str) -> None:
        """
        Called by the broker SDK on its own thread.
        Must NOT touch asyncio primitives directly.
        call_soon_threadsafe schedules the resolution in the event loop.
        """
        future = self._pending.pop(order_id, None)
        if future and not future.done():
            fill = OrderFill(order_id, filled_qty, avg_price, timestamp)
            self._loop.call_soon_threadsafe(future.set_result, fill)

    def _on_reject_callback(self, order_id: str, reason: str) -> None:
        future = self._pending.pop(order_id, None)
        if future and not future.done():
            exc = OrderRejectedException(order_id, reason)
            self._loop.call_soon_threadsafe(future.set_exception, exc)

Il dettaglio critico è call_soon_threadsafe. L’SDK del broker scatena i callback dal proprio thread. Chiamare future.set_result() direttamente da un thread esterno causerebbe race condition negli internals di asyncio. call_soon_threadsafe pianifica la risoluzione all’interno dell’event loop sul suo thread, rendendo l’operazione sicura.

Pattern 2: Da Asincrono a Sincrono (Query delle Posizioni)
#

Il problema opposto è altrettanto comune. I dati di mercato e i fill degli ordini arrivano come stream asincrono. Una query sulle posizioni di un utente richiede una risposta coerente e sincrona.

La soluzione è il pattern della materialized view. Il Position Service si iscrive agli eventi di fill da Kafka, aggiorna una tabella DynamoDB in quasi-real-time e serve le query sulle posizioni direttamente da DynamoDB. L’utente non interroga mai lo stream di eventi live. Interroga uno snapshot pre-calcolato e indicizzato.

sequenceDiagram
    participant BrokerGW as Broker Gateway
    participant Kafka as Amazon MSK (Kafka)
    participant PosSvc as Position Service
    participant DDB as DynamoDB
    participant WebTier as Web Tier
    participant User

    BrokerGW->>Kafka: publish order.filled event
    Kafka->>PosSvc: consume order.filled
    PosSvc->>DDB: update_item (atomic increment qty, recalc P&L)
    DDB-->>PosSvc: ok

    Note over PosSvc,DDB: Async pipeline, eventually consistent

    User->>WebTier: GET /api/positions
    WebTier->>PosSvc: getPositions(userId)
    PosSvc->>DDB: query by userId
    DDB-->>PosSvc: position snapshot
    PosSvc-->>WebTier: positions
    WebTier-->>User: 200 OK (consistent snapshot)

    Note over WebTier,User: Sync request served from materialized view

Il bridge async-to-sync qui è semplicemente un consumer Kafka che scrive su uno store interrogabile. Le decisioni chiave sono: la scrittura su DynamoDB deve essere idempotente (l’evento porta un fill ID univoco usato come chiave di idempotenza), e la scrittura deve essere atomica per i campi di quantità e P&L per evitare aggiornamenti parziali.

Pattern 3: Request-Response con Correlation ID
#

Per le operazioni che richiedono una risposta con l’aspetto sincrono da un sistema intrinsecamente asincrono (come richiedere una quotazione real-time prima dell’invio dell’ordine), il sistema usa il pattern del correlation ID su Kafka.

sequenceDiagram
    participant WebTier as Web Tier
    participant OMS as Order Management Service
    participant Kafka
    participant ExecEngine as Execution Engine
    participant BrokerGW as Broker Gateway

    WebTier->>OMS: POST /orders (sync HTTP)
    OMS->>OMS: validate + persist (status=PENDING)
    OMS->>Kafka: publish order.submitted (correlation_id=X)
    OMS-->>WebTier: 202 Accepted (order_id, status=PENDING)

    Note over WebTier: User UI subscribes via SSE for status updates

    Kafka->>ExecEngine: consume order.submitted
    ExecEngine->>BrokerGW: await submit(order, timeout=10s)
    BrokerGW->>BrokerGW: call_soon_threadsafe awaits fill callback

    BrokerGW-->>ExecEngine: OrderFill
    ExecEngine->>Kafka: publish order.filled (correlation_id=X)

    Kafka->>OMS: consume order.filled -> update order status
    Kafka->>PosSvc: consume order.filled -> update positions
    Kafka->>NotifSvc: consume order.filled -> push to user SSE
    NotifSvc-->>WebTier: SSE event: order X filled
    WebTier-->>User: real-time status update

L’utente invia un ordine e riceve immediatamente un 202 Accepted con un order ID. Non aspetta l’esecuzione del broker. Il canale SSE invia la notifica di fill quando arriva. Dal punto di vista dell’utente, l’esperienza è quasi real-time. Dal punto di vista del sistema, ogni componente fa esattamente ciò per cui è ottimizzato.


Order Management Service: Separazione di Comando e Query
#

L’Order Management Service implementa CQRS (Command Query Responsibility Segregation) in modo esplicito. Le operazioni di scrittura (piazzare un ordine, cancellarlo, modificarlo) passano attraverso un command handler che persiste su Aurora PostgreSQL e pubblica eventi su Kafka. Le operazioni di lettura (ottenere un ordine, listare gli ordini, storico degli ordini) interrogano un read model mantenuto da un processo di proiezione che consuma gli stessi eventi Kafka.

flowchart LR
    subgraph Write Side
        CMD[Command Handler] --> Validate[Validator]
        Validate --> RiskCheck[Risk Pre-Check\nSync call to Risk Service]
        RiskCheck --> Persist[(Aurora PostgreSQL\nOrder Ledger)]
        RiskCheck --> Publish[Kafka Publisher]
    end

    subgraph Read Side
        Projection[Projection Process\nKafka Consumer] --> ReadDB[(Aurora Read Replica\nOrder Read Model)]
        QueryHandler[Query Handler] --> ReadDB
    end

    subgraph External
        Publish --> KafkaTopic[(order.commands\ntopic)]
        KafkaTopic --> Projection
    end
oms/commands.py
from dataclasses import dataclass, field
from enum import Enum
from typing import Protocol
import uuid

class OrderSide(str, Enum):
    BUY = "buy"
    SELL = "sell"

class OrderType(str, Enum):
    MARKET = "market"
    LIMIT = "limit"
    STOP = "stop"

@dataclass(frozen=True)
class PlaceOrderCommand:
    """Immutable command. Implements the Command Pattern."""
    user_id: str
    symbol: str
    side: OrderSide
    qty: int
    order_type: OrderType
    limit_price: float | None = None
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

class CommandHandler(Protocol):
    """ISP: command handler interface segregated from query interface."""
    async def handle(self, command: PlaceOrderCommand) -> str: ...

class PlaceOrderHandler:
    """
    Single Responsibility: coordinates validation, risk check, and persistence.
    Dependency Inversion: depends on abstractions, not concrete implementations.
    """

    def __init__(
        self,
        validator: "OrderValidator",
        risk_client: "RiskServiceClient",
        repository: "OrderRepository",
        publisher: "KafkaPublisher",
    ):
        self._validator = validator
        self._risk = risk_client
        self._repo = repository
        self._publisher = publisher

    async def handle(self, cmd: PlaceOrderCommand) -> str:
        # Validate structure
        self._validator.validate(cmd)

        # Synchronous pre-trade risk check (blocking, must pass before persist)
        risk_result = await self._risk.check(cmd)
        if not risk_result.approved:
            raise RiskViolationError(risk_result.reason)

        # Persist with PENDING status
        order_id = await self._repo.insert(cmd)

        # Publish command event (fire-and-forget, broker will confirm later)
        await self._publisher.publish("order.submitted", {
            "order_id": order_id,
            "correlation_id": cmd.correlation_id,
            "user_id": cmd.user_id,
            "symbol": cmd.symbol,
            "side": cmd.side,
            "qty": cmd.qty,
            "order_type": cmd.order_type,
            "limit_price": cmd.limit_price,
        })

        return order_id

Il controllo di rischio pre-trade è deliberatamente sincrono qui. L’ordine non deve essere persistito né pubblicato se viola i limiti di rischio. Questo è l’unico punto nel flusso degli ordini in cui il sistema si blocca.


Risk Engine: Controlli Pre-Trade in Tempo Reale
#

Il Risk Engine applica i limiti prima che qualsiasi ordine raggiunga il broker. Mantiene le tabelle dei limiti in Redis (letture veloci) e si iscrive agli eventi di posizione e dati di mercato per tenere aggiornate le esposizioni calcolate.

flowchart TD
    subgraph Risk Engine
        RiskAPI[Risk API\nSync gRPC]
        LimitStore[(Redis\nLimit Tables)]
        ExposureCalc[Exposure Calculator\nAsync Consumer]
        RulesEngine[Rules Engine\nOpen/Closed via Rule Protocol]

        RiskAPI --> RulesEngine
        RulesEngine --> LimitStore
        ExposureCalc --> LimitStore
    end

    OMS[Order Management\nService] -->|gRPC check| RiskAPI
    MSK[(Kafka)] -->|order.filled\nposition.updated| ExposureCalc
    MktData[(Kinesis)] -->|price.updated| ExposureCalc

Le regole di rischio seguono l’Open/Closed Principle: il motore esegue qualsiasi oggetto che implementa il protocollo RiskRule. Aggiungere una nuova regola non richiede alcuna modifica al motore stesso.

risk/engine.py
from typing import Protocol

class RiskRule(Protocol):
    """Open for extension: implement this to add new risk controls."""
    name: str
    async def evaluate(self, cmd: "PlaceOrderCommand",
                       context: "RiskContext") -> "RuleResult": ...

class MaxOrderSizeRule:
    name = "max_order_size"

    async def evaluate(self, cmd, context):
        limit = await context.limits.get(cmd.user_id, "max_order_size")
        notional = cmd.qty * context.last_price(cmd.symbol)
        if notional > limit:
            return RuleResult(approved=False,
                              reason=f"Order notional {notional:.2f} exceeds limit {limit:.2f}")
        return RuleResult(approved=True)

class MaxConcentrationRule:
    name = "max_concentration"

    async def evaluate(self, cmd, context):
        current_exposure = await context.exposure(cmd.user_id, cmd.symbol)
        new_qty = current_exposure.qty + (cmd.qty if cmd.side == "buy" else -cmd.qty)
        portfolio_value = await context.portfolio_value(cmd.user_id)
        concentration = abs(new_qty * context.last_price(cmd.symbol)) / portfolio_value
        if concentration > 0.20:  # 20% max single-name concentration
            return RuleResult(approved=False,
                              reason=f"Concentration {concentration:.1%} exceeds 20% limit")
        return RuleResult(approved=True)

class RiskEngine:
    """Closed for modification. Add rules by passing them at construction."""

    def __init__(self, rules: list[RiskRule], context_factory: "RiskContextFactory"):
        self._rules = rules
        self._context_factory = context_factory

    async def check(self, cmd: "PlaceOrderCommand") -> "RiskResult":
        context = await self._context_factory.build(cmd.user_id)
        for rule in self._rules:
            result = await rule.evaluate(cmd, context)
            if not result.approved:
                return RiskResult(approved=False, reason=f"[{rule.name}] {result.reason}")
        return RiskResult(approved=True)

Il controllo di rischio gira in meno di 5 millisecondi per cache Redis ben scaldate. Se Redis non è disponibile, il motore fallisce in modalità chiusa: tutti gli ordini vengono rifiutati fino al recupero della cache. Questa è la modalità di fallimento corretta per un sistema di rischio.


Architettura dei Dati di Mercato: Kinesis e Fan-Out
#

I dati di mercato sono il componente ad alto throughput. Al picco, il sistema ingestisce aggiornamenti di prezzo per migliaia di simboli su più exchange simultaneamente. L’architettura separa l’ingestione (feed grezzo su Kinesis) dalla distribuzione (fan-out agli iscritti).

flowchart TD
    subgraph Ingestion
        Feed1[Exchange Feed A\nWebSocket] --> Normalizer1[Normalizer Lambda]
        Feed2[Exchange Feed B\nFIX] --> Normalizer2[Normalizer Lambda]
        Feed3[Exchange Feed C\nREST Poll] --> Normalizer3[Normalizer Lambda]
        Normalizer1 --> Kinesis[(Kinesis Data Streams\n8 shards)]
        Normalizer2 --> Kinesis
        Normalizer3 --> Kinesis
    end

    subgraph Distribution
        Kinesis --> MDSConsumer[Market Data Service\nKinesis Consumer]
        MDSConsumer --> PriceCache[(Redis\nLast Price Cache\nTTL 5s)]
        MDSConsumer --> CandleAgg[Candle Aggregator\n1m 5m 1h OHLCV]
        MDSConsumer --> RiskFeed[Risk Engine Feed\nvia SQS]
        MDSConsumer --> SSEBroadcast[SSE Broadcaster\nper-symbol channels]
    end

    subgraph User Delivery
        SSEBroadcast --> RedisPS[(Redis Pub/Sub\nper-symbol channels)]
        WebTier[Web Tier\nSSE endpoint] --> RedisPS
        Users([Users with\nactive subscriptions]) --> WebTier
    end

Le Lambda normalizzatrici convertono i formati specifici di ciascun exchange in uno schema PriceUpdate canonico prima di scrivere su Kinesis. Ogni shard gestisce un range non sovrapposto di simboli (partizionati per hash del simbolo). Questo garantisce l’ordinamento per simbolo abilitando al contempo il parallelismo orizzontale.

market_data/consumer.py
import asyncio
import json
from dataclasses import dataclass

@dataclass(frozen=True)
class PriceUpdate:
    symbol: str
    bid: float
    ask: float
    last: float
    volume: int
    timestamp_ms: int
    exchange: str

class MarketDataConsumer:
    """
    Async consumer for Kinesis records.
    Applies the Observer Pattern: registered handlers are called for each update.
    Handlers are segregated by interest (ISP): risk only subscribes to price,
    candle builder subscribes to price+volume, SSE broadcast subscribes to all.
    """

    def __init__(self, kinesis_client, redis_client, publisher):
        self._kinesis = kinesis_client
        self._redis = redis_client
        self._publisher = publisher
        self._handlers: list[callable] = []

    def register(self, handler: callable) -> None:
        """Open/Closed: add handlers without modifying the consumer."""
        self._handlers.append(handler)

    async def run(self, stream_name: str, shard_id: str) -> None:
        iterator = await self._get_shard_iterator(stream_name, shard_id)
        while True:
            records = await self._kinesis.get_records(ShardIterator=iterator,
                                                       Limit=500)
            for record in records["Records"]:
                update = PriceUpdate(**json.loads(record["Data"]))
                await self._process(update)

            iterator = records["NextShardIterator"]
            if not records["Records"]:
                await asyncio.sleep(0.1)  # Kinesis polling interval

    async def _process(self, update: PriceUpdate) -> None:
        # Update last price cache (used by risk engine)
        await self._redis.setex(
            f"price:{update.symbol}",
            5,  # TTL: stale prices are dangerous
            f"{update.last}",
        )

        # Fan out to all registered handlers concurrently
        await asyncio.gather(
            *[handler(update) for handler in self._handlers],
            return_exceptions=True,  # One failing handler must not block others
        )

Il return_exceptions=True in asyncio.gather non è opzionale. Un bug nell’aggregatore di candele non deve impedire al feed di rischio di ricevere aggiornamenti di prezzo. I fallimenti di ogni handler vengono loggati ma non propagati.


Web Tier: Scalare a 15.000 Utenti Concorrenti
#

Servire 15.000 utenti concorrenti richiede di pensare attentamente a cosa significhi davvero “concorrente”. La maggior parte di quelle connessioni è inattiva la maggior parte del tempo: un utente con la dashboard di trading aperta mantiene una connessione SSE per i dati live ma non sta effettivamente effettuando chiamate API. Il numero di connessioni è quindi molto più alto del tasso di richieste.

flowchart TD
    CF[CloudFront\nStatic SPA\nCache-Control: immutable] --> ALB
    ALB[External ALB\nHTTP/2\nSticky for SSE] --> ECS

    subgraph ECS Fargate Cluster
        ECS[Web Service\n8 min / 40 max tasks\n2 vCPU / 4 GB each]
        ECS --> Uvicorn[Uvicorn ASGI\n4 workers per task]
    end

    ECS --> SessionRedis[(ElastiCache Redis\nSession Store\nCluster Mode 3 shards)]
    ECS --> RateLimit[(ElastiCache Redis\nRate Limiter\nSliding Window per user)]
    ECS --> SSERedis[(ElastiCache Redis\nPub/Sub\nMarket Data Fan-out)]
    ECS --> Services[Internal Services\nvia Service Discovery]

    subgraph Autoscaling
        CW[CloudWatch\nMetrics] --> ASG[ECS Service\nAutoscaling]
        ASG --> ECS
    end

Decisioni chiave per il web tier a questa scala:

ASGI invece di WSGI. Django con Uvicorn in modalità ASGI gestisce connessioni SSE concorrenti in modo efficiente. Un server WSGI sincrono creerebbe un thread per connessione, esaurendo le risorse a questo livello di concorrenza. ASGI usa il multitasking cooperativo: l’attesa su SSE o I/O su database cede l’event loop ad altre richieste.

SSE invece di WebSocket per i dati di mercato. SSE è unidirezionale (dal server al client), che è tutto ciò di cui la consegna dei dati di mercato ha bisogno. Funziona su HTTP/2, sopravvive ai reconnect del load balancer ed è più semplice da implementare e debuggare rispetto a WebSocket a questa scala. L’invio degli ordini usa normali richieste POST.

Sticky session sull’ALB solo per SSE. Le connessioni SSE sono di lunga durata e legate a un task specifico. La duration-based stickiness dell’ALB (cookie da 1 ora) garantisce che le connessioni SSE non vengano instradata verso un task diverso a metà stream. Le normali richieste API usano il round-robin.

Rate limiting al web tier, non ai servizi. Ogni task applica un rate limit a finestra scorrevole per utente tramite Redis prima di fare proxy verso i servizi interni. Questo impedisce ai singoli utenti di saturare i servizi downstream con script automatizzati.

web/sse.py
import asyncio
import json
from django.http import StreamingHttpResponse

async def market_data_stream(request, symbols: str):
    """
    SSE endpoint. Each active subscription is a long-lived async generator.
    The ALB sticky session ensures this generator stays on one task.
    """
    symbol_list = symbols.split(",")
    user_id = request.user.id
    pubsub = request.app.redis.pubsub()

    # Subscribe to per-symbol Redis channels
    channels = [f"mkt:{sym}" for sym in symbol_list]
    await pubsub.subscribe(*channels)

    async def event_generator():
        try:
            # Send initial snapshot from cache
            for symbol in symbol_list:
                price = await request.app.redis.get(f"price:{symbol}")
                if price:
                    yield f"data: {json.dumps({'symbol': symbol, 'price': float(price)})}\n\n"

            # Stream live updates
            async for message in pubsub.listen():
                if message["type"] == "message":
                    yield f"data: {message['data'].decode()}\n\n"

                # Heartbeat to detect dead connections
                yield ": heartbeat\n\n"
                await asyncio.sleep(15)

        except asyncio.CancelledError:
            pass
        finally:
            await pubsub.unsubscribe(*channels)

    response = StreamingHttpResponse(
        event_generator(),
        content_type="text/event-stream",
    )
    response["Cache-Control"] = "no-cache"
    response["X-Accel-Buffering"] = "no"
    return response

La riga SSE solo-commento (: heartbeat) ha due scopi: rileva le connessioni morte sui proxy che scadono le connessioni inattive, e impedisce all’idle timeout dell’ALB di terminare lo stream. L’intervallo deve essere inferiore all’idle timeout dell’ALB (default 60 secondi, raccomandati 30 secondi per SSE).

Policy di autoscaling. Il servizio ECS scala su una metrica composita: CPU media sopra il 60% O numero medio di connessioni attive sopra 3.000 per task innesca lo scale-out. Lo scale-in richiede che entrambe le metriche scendano sotto le soglie per 5 minuti consecutivi per prevenire il thrashing.


Event Sourcing e il Servizio di Audit
#

Ogni ordine, fill, cancellazione, modifica e decisione di rischio è un evento. Il Servizio di Audit consuma tutti i topic Kafka e scrive record immutabili su S3 in formato Parquet, partizionati per data e tipo di evento. Questo serve due scopi: conformità normativa (molte giurisdizioni richiedono una conservazione di 7 anni dei record di trading) e replay degli eventi (lo stato di qualsiasi servizio può essere ricostruito da zero riproducendo i suoi eventi).

audit/consumer.py
import pyarrow as pa
import pyarrow.parquet as pq
from dataclasses import asdict

class AuditConsumer:
    """
    Append-only event log. No updates, no deletes.
    Implements the Repository Pattern over S3 as the backing store.
    """

    def __init__(self, s3_client, bucket: str):
        self._s3 = s3_client
        self._bucket = bucket
        self._buffer: list[dict] = []
        self._flush_interval_s = 30
        self._max_buffer_size = 10_000

    async def consume(self, event: dict) -> None:
        self._buffer.append(event)
        if len(self._buffer) >= self._max_buffer_size:
            await self._flush()

    async def _flush(self) -> None:
        if not self._buffer:
            return

        table = pa.Table.from_pylist(self._buffer)
        key = self._partition_key(self._buffer[0])

        # Write to S3 as Parquet
        buf = pa.BufferOutputStream()
        pq.write_table(table, buf, compression="snappy")

        await self._s3.put_object(
            Bucket=self._bucket,
            Key=key,
            Body=buf.getvalue().to_pybytes(),
        )
        self._buffer.clear()

    def _partition_key(self, event: dict) -> str:
        ts = event["timestamp"][:10]  # YYYY-MM-DD
        event_type = event["type"]
        return f"events/dt={ts}/type={event_type}/{event['id']}.parquet"

Parquet con compressione Snappy ottiene circa 10:1 di compressione sui dati JSON degli eventi. Lo schema di partizione S3 (dt=YYYY-MM-DD/type=event_type) è compatibile con Athena per query SQL ad-hoc senza nessuna pipeline ETL.


Architettura dei Dati: Aurora, DynamoDB e Redis
#

Dati diversi hanno caratteristiche diverse. Usare un unico database per tutto è l’errore architetturale più comune nelle piattaforme di trading.

flowchart LR
    subgraph Transactional
        Aurora[(Aurora PostgreSQL\nServerless v2\nWriter + 2 Read Replicas)]
        OMS --> Aurora
        Note1[Orders, users, accounts\nStrong consistency\nACID transactions]
    end

    subgraph Hot State
        DDB[(DynamoDB\nOn-Demand\nDAX cache optional)]
        PosSvc --> DDB
        Note2[Positions, open orders\nms-level reads\nAtomic increments]
    end

    subgraph Cache
        Redis[(ElastiCache Redis\nCluster Mode\n3 primary shards)]
        Note3[Last prices, sessions\nrate limits, pub/sub\nTTL enforced]
    end

    subgraph Cold Archive
        S3[(S3 + Glacier\nIntelligent Tiering)]
        Note4[Audit events, backfill\nhistorical data\nAthena queryable]
    end

Aurora PostgreSQL gestisce il ciclo di vita degli ordini, gli account utente e lo storico delle transazioni. Le scritture vanno all’istanza writer. Le operazioni intensive di lettura (storico ordini, estratti conto) usano le read replica. Aurora Serverless v2 scala automaticamente le ACU (Aurora Capacity Unit) tra 0,5 e 64 ACU in base al carico, eliminando i costi di over-provisioning nelle ore fuori mercato.

DynamoDB gestisce le posizioni e lo stato degli ordini aperti. Il record di posizione per una coppia utente-simbolo viene aggiornato atomicamente a ogni fill usando UpdateItem con operazioni ADD. Questo è il modo corretto di gestire aggiornamenti di fill concorrenti senza transazioni: la semantica di incremento atomico previene gli aggiornamenti persi anche con più consumer del Position Service in esecuzione in parallelo.

position/repository.py
class PositionRepository:
    """
    Dependency Inversion: depends on abstract DynamoDB client, not boto3 directly.
    Atomic update pattern: no read-modify-write race conditions.
    """

    def __init__(self, table):
        self._table = table

    async def apply_fill(self, user_id: str, symbol: str,
                         fill_qty: int, avg_price: float,
                         fill_id: str, side: str) -> None:
        signed_qty = fill_qty if side == "buy" else -fill_qty
        notional = fill_qty * avg_price

        await self._table.update_item(
            Key={"pk": f"USER#{user_id}", "sk": f"POS#{symbol}"},
            UpdateExpression=(
                "ADD qty :q, total_cost :c "
                "SET last_updated = :ts, last_fill_id = :fid"
            ),
            ExpressionAttributeValues={
                ":q": signed_qty,
                ":c": notional if side == "buy" else -notional,
                ":ts": datetime.utcnow().isoformat(),
                ":fid": fill_id,
                ":prev_fid": fill_id,
            },
            # Idempotency: skip if this fill was already applied
            ConditionExpression="attribute_not_exists(last_fill_id) OR last_fill_id <> :prev_fid",
        )

La ConditionExpression fornisce l’idempotenza. Se il consumer si blocca dopo aver applicato un fill ma prima di fare commit dell’offset Kafka, lo stesso evento di fill viene riconsegnato. La condizione impedisce che venga applicato due volte.


Circuit Breaker e Pattern di Resilienza
#

Una piattaforma di trading che fallisce in modalità aperta è peggio di una che fallisce in modalità chiusa. Ogni chiamata esterna (broker gateway, servizio di rischio, dati di mercato) è avvolta in un circuit breaker.

resilience/circuit_breaker.py
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"      # Normal: calls pass through
    OPEN = "open"          # Tripped: calls fail fast
    HALF_OPEN = "half_open"  # Recovery probe

class CircuitBreaker:
    """
    Single Responsibility: manages open/closed state for a single dependency.
    Fails fast when the dependency is unhealthy rather than queuing up calls.
    """

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout_s: float = 30.0,
                 success_threshold: int = 2):
        self._failure_threshold = failure_threshold
        self._recovery_timeout = recovery_timeout_s
        self._success_threshold = success_threshold
        self._failures = 0
        self._successes = 0
        self._last_failure_time: float | None = None
        self._state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._last_failure_time > self._recovery_timeout:
                self._state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as exc:
            self._on_failure()
            raise

    def _on_success(self):
        if self._state == CircuitState.HALF_OPEN:
            self._successes += 1
            if self._successes >= self._success_threshold:
                self._state = CircuitState.CLOSED
                self._failures = 0
                self._successes = 0
        else:
            self._failures = 0

    def _on_failure(self):
        self._failures += 1
        self._last_failure_time = time.monotonic()
        if self._failures >= self._failure_threshold:
            self._state = CircuitState.OPEN

Quando il circuit breaker del broker gateway si apre, l’Execution Engine inizia immediatamente a pubblicare gli ordini su un topic Kafka dead_letter con stato BROKER_UNAVAILABLE, e il servizio di notifica invia alert agli utenti coinvolti. Il sistema degrada in modo controllato invece di fallire silenziosamente.


Infrastructure as Code
#

Il sistema completo si distribuisce tramite Terraform. Ogni servizio ha la propria task definition ECS, e l’infrastruttura condivisa (MSK, ElastiCache, Aurora, VPC) è gestita da un modulo foundation separato.

flowchart TD
    subgraph VPC 10.0.0.0/16
        subgraph Public Subnets
            ALBExt[External ALB]
        end
        subgraph Private App Subnets
            ECSCluster[ECS Cluster\n8 services]
            ALBInt[Internal ALB\nservice mesh]
        end
        subgraph Private Data Subnets
            Aurora[(Aurora\nMulti-AZ)]
            DDB_EP[DynamoDB VPC\nEndpoint]
            Redis[(ElastiCache\nMulti-AZ)]
            MSK[(Amazon MSK\n3 broker nodes\n3 AZs)]
        end
    end

    IGW[Internet Gateway] --> ALBExt
    ECSCluster --> ALBInt
    ALBInt --> ECSCluster
    ECSCluster --> Aurora
    ECSCluster --> Redis
    ECSCluster --> MSK
    ECSCluster --> DDB_EP
    Kinesis_EP[Kinesis VPC\nEndpoint] --> ECSCluster
    S3_EP[S3 VPC\nEndpoint] --> ECSCluster

MSK gira su 3 nodi broker su 3 Availability Zone con un replication factor di 3 per tutti i topic critici (order.submitted, order.filled, position.updated). Il topic market.price usa replication factor 2 e acks=1 (il producer non aspetta la replica sui follower) perché la perdita occasionale di aggiornamenti di prezzo è accettabile. Perdere un evento di fill non lo è.

Topic e configurazioni:

TopicPartizioniReplicaAcksRetention
order.submitted123all7 giorni
order.filled123all7 giorni
position.updated123all7 giorni
market.price64211 ora
audit.events43all30 giorni

Osservabilità
#

Un sistema distribuito di questa complessità è impossibile da operare senza osservabilità strutturata fin dal primo giorno.

Logging strutturato: ogni riga di log è JSON con trace_id, user_id, service, level e correlation_id. Il correlation ID che traccia un ordine dalla sottomissione al fill appare in ogni riga di log della catena. Una singola query Athena sull’archivio log S3 può ricostruire la timeline completa di qualsiasi ordine.

Distributed tracing: i trace X-Ray di AWS coprono il web tier, OMS, Risk Engine ed Execution Engine. Gli header del messaggio Kafka trasportano il contesto del trace in modo che lo span continui sul lato consumer, non solo sul lato producer.

Metriche custom: oltre alle metriche standard di ECS/Lambda/Aurora, ogni servizio pubblica metriche a livello applicativo tramite CloudWatch Embedded Metric Format:

  • Latenza di sottomissione ordine (p50, p95, p99)
  • Latenza del controllo di rischio
  • Latenza di fill del broker gateway (dalla sottomissione al callback di fill)
  • Lag di aggiornamento posizione (tempo tra l’evento di fill e la consistenza della materialized view)
  • Consumer lag di Kafka per consumer group
  • Numero di connessioni SSE per task del web tier

La metrica di latenza di fill del broker gateway merita attenzione particolare. Un picco qui indica problemi lato broker prima che causino fallimenti visibili degli ordini. Un alert su latenza di fill p95 sopra 2 secondi scatena una notifica on-call e aumenta automaticamente il parametro timeout del Broker Gateway da 10 a 30 secondi, dando al broker più tempo prima che il circuit si apra.


Riepilogo dei Design Pattern
#

PatternDove ApplicatoPerche
Command PatternPlaceOrderCommand, CancelOrderCommandIncapsula l’intento dell’utente come oggetto immutabile, disaccoppia mittente e handler
Observer PatternMarket Data Consumer, Risk FeedPiu handler notificati degli aggiornamenti di prezzo senza accoppiamento reciproco
Adapter PatternProtocollo BrokerAdapter, normalizzatori exchangeAPI specifiche del broker convertite in un’interfaccia uniforme
Repository PatternOrderRepository, PositionRepositoryLogica di persistenza isolata dalla logica di business, iniettabile e testabile
Circuit BreakerTutte le chiamate esterneFail fast, prevenzione dei fallimenti a cascata, degradazione controllata
CQRSOrder Management ServiceModelli di scrittura e lettura separati, scaling indipendente, letture consistenti dalle proiezioni
Materialized ViewPosition ServiceFill asincroni materializzati in uno snapshot interrogabile, bridge async-to-sync
Correlation IDFlusso ordini end-to-endTraccia un’azione utente sincrona attraverso una pipeline asincrona

Lezioni da Questo Design
#

Correttezza prima della performance. Il sistema accetta un overhead di 200-250ms per i controlli di rischio pre-trade su ogni ordine. Non si tratta di un problema di performance. Un ordine che bypassa i controlli di rischio per un’ottimizzazione della latenza e un problema esistenziale. I vincoli di correttezza non sono negoziabili.

Il confine asincrono deve essere esplicito. Ogni punto in cui il sistema attraversa da sincrono ad asincrono (HTTP a Kafka, Kafka a broker callback, broker callback a asyncio Future) e una giuntura esplicita e nominata nel codice. Le transizioni async implicite sono il luogo in cui vivono i bug sottili: off-by-one nell’ordinamento degli eventi, race condition nella risoluzione dei Future, eventi persi negli scenari di restart.

L’idempotenza non e opzionale. La consegna at-least-once (l’unica garanzia sicura per Kafka senza overhead significativo di performance) significa che ogni consumer deve essere idempotente. La chiave di idempotenza della ConditionExpression dell’aggiornamento di posizione, la deduplicazione dell’evento di fill nell’OMS e la chiave S3 content-addressable dell’evento di audit seguono tutti lo stesso principio: applicare lo stesso evento due volte deve produrre lo stesso risultato che applicarlo una volta.

Fallire in modalita chiusa, non aperta. I sistemi di rischio, i circuit breaker e i rate limiter falliscono tutti nella direzione che impedisce azioni pericolose. Se il servizio di rischio non e raggiungibile, gli ordini vengono rifiutati. Se il circuit del broker si apre, gli ordini vanno in dead letter. Se il session store Redis e down, gli utenti vengono disconnessi. La disponibilita e importante, ma un sistema finanziario che prende decisioni non controllate sotto fallimento e peggio di uno che si rifiuta di agire.

Scalare il percorso di lettura, non quello di scrittura. 15.000 utenti fanno molto piu lettura (visualizzare posizioni, controllare lo stato degli ordini, guardare i dati di mercato) che scrittura (inviare ordini). Il pattern CQRS, le read replica di Aurora, DynamoDB come position cache e il pattern della materialized view servono tutti lo stesso obiettivo: il percorso di scrittura e piccolo, ben controllato e transazionale. Il percorso di lettura scala orizzontalmente senza contesa.


Riferimenti
#


Vuoi approfondire l’integrazione AI, il platform engineering o i sistemi backend? Offro sessioni di coaching 1:1 calibrate in base alla tua esperienza e ai tuoi obiettivi. Dai un’occhiata alla pagina coaching.

Articoli correlati