Designing a High-Throughput Algorithmic Trading Platform on AWS: Architecture for 15,000 Concurrent Users

A production-realistic architecture for a cloud-native algorithmic trading platform: 15,000 concurrent users, sub-second order acknowledgement, real-time market data streaming, CQRS, event sourcing, and the async-sync bridge that holds it all together.

The hardest problems in financial software are not algorithmic. They are architectural. A trading platform must be correct before it is fast, consistent before it is clever, and auditable before it is elegant. It must also bridge a fundamental mismatch that sits at the heart of every trading system ever built: users interact synchronously (they click “buy” and expect a response), but the systems underneath are inherently asynchronous. Broker APIs deliver fills via callbacks. Market data arrives as a continuous push stream. Risk calculations happen in parallel pipelines. Position state is eventually consistent by nature.

This post describes a complete architecture for a theoretical but production-realistic algorithmic trading platform built on AWS. The system serves 15,000 concurrent users, handles order execution with sub-second acknowledgement, streams real-time market data, enforces pre-trade risk controls, and maintains consistent position state across distributed services. I will walk through every layer, the patterns that make it work, and the tradeoffs involved.


The Core Tension: Synchronous Users, Asynchronous Markets

Before any diagram, the central problem deserves a precise statement.

When a user submits an order through the web UI:

  1. Their browser makes a synchronous HTTP POST. They expect a response immediately.
  2. The backend must validate the order, run pre-trade risk checks, and publish it to a processing queue.
  3. The Order Execution Service picks it up and submits it to the broker via the Broker Gateway.
  4. The broker’s API is inherently callback-based. The submission call returns immediately. The execution confirmation (fill, partial fill, or rejection) arrives later via a callback on a different thread or connection.
  5. The user, meanwhile, is watching their order status widget update in real time.

There is no single correct answer to this problem, but there is a well-defined architectural response: separate the acknowledgement of receipt from the acknowledgement of execution, use correlation IDs to track state through the async pipeline, and serve position and order state from a materialized view rather than from the live event stream.

These three ideas appear in every layer of what follows.


System Overview

flowchart TD
    CDN[CloudFront CDN\nStatic Assets] --> Users([15,000 Concurrent Users])
    Users --> WAF[AWS WAF]
    WAF --> ALB[External ALB\nTLS 1.3]
    ALB --> WebTier[Web Tier\nECS Fargate\n8 tasks min / 40 max]

    WebTier --> SessionCache[(ElastiCache Redis\nSession + Rate Limit)]
    WebTier --> APIGW[Internal API Gateway]

    APIGW --> OMS[Order Management\nService]
    APIGW --> MDS[Market Data\nService]
    APIGW --> PS[Position\nService]
    APIGW --> RS[Risk\nService]

    OMS --> MSK[(Amazon MSK\nKafka)]
    MDS --> Kinesis[(Amazon Kinesis\nData Streams)]
    RS --> MSK

    MSK --> ExecEngine[Execution\nEngine]
    MSK --> AuditSvc[Audit\nService]
    MSK --> NotifSvc[Notification\nService]

    ExecEngine --> BrokerGW[Broker\nGateway]
    BrokerGW --> ExtBroker([External Broker\nCallback API])

    BrokerGW --> MSK
    Kinesis --> MDS

    PS --> DDB[(DynamoDB\nPosition State)]
    OMS --> Aurora[(Aurora PostgreSQL\nOrder Ledger)]
    AuditSvc --> S3[(S3\nAudit Archive)]

    NotifSvc --> Redis2[(ElastiCache Redis\nPub/Sub)]
    WebTier --> Redis2

The eight primary services each have a single, bounded responsibility. This is not microservices for its own sake. Each boundary maps to a genuinely different consistency model, scaling profile, and operational concern.

| Service | Responsibility | Consistency Model |
| --- | --- | --- |
| Web Tier | User sessions, SSE, REST API | Stateless |
| Order Management | Order lifecycle, validation | Strong (transactional) |
| Risk Service | Pre-trade checks | Eventually consistent read, sync write |
| Execution Engine | Broker submission, fill handling | At-least-once with idempotency |
| Broker Gateway | Async/sync bridge to broker API | Stateful, single-writer |
| Market Data Service | Stream ingestion and distribution | Best-effort, ordered |
| Position Service | Position and P&L materialization | Eventually consistent |
| Audit Service | Immutable event log | Append-only |

SOLID Principles in Practice
#

Before going deeper into each service, it is worth stating which design principles govern the system boundaries. These are not decorative references. Each principle resolves a specific architectural decision.

Single Responsibility Principle: The Broker Gateway has exactly one job: bridging between the asyncio event loop and the broker’s callback-based API. It does not validate orders, does not compute risk, and does not update positions. Any logic beyond the bridge belongs elsewhere.

Open/Closed Principle: The Execution Engine is closed for modification but open for extension via a BrokerAdapter protocol. Adding support for a new broker (Interactive Brokers, Alpaca, Binance) means implementing the adapter interface, not touching the engine. The Strategy Engine works identically for signals, using a SignalGenerator protocol that strategies implement.

Liskov Substitution Principle: All broker adapters are interchangeable at the BrokerAdapter interface boundary. A paper trading adapter, a live trading adapter, and a replay adapter all behave identically from the Execution Engine’s perspective. Substitution is verified by a shared contract test suite.

Interface Segregation Principle: The Order Management Service exposes three separate interfaces: one for order placement (write-side), one for order query (read-side), and one for internal state change events consumed by downstream services. Clients that only query orders never depend on the write interface.

Dependency Inversion Principle: No service imports a concrete broker SDK, a specific database driver, or a specific message queue client directly. All infrastructure dependencies are injected via abstract interfaces. The Execution Engine’s unit tests run entirely in memory with a fake broker adapter.
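As a sketch of what that looks like in practice, here is a hypothetical in-memory adapter (all names illustrative) that satisfies the BrokerAdapter protocol and reports an instant fill, so Execution Engine tests need no network:

```python
class FakeBrokerAdapter:
    """In-memory test double satisfying the BrokerAdapter protocol
    (hypothetical; names are illustrative). Orders are recorded and a full
    fill is reported immediately via the registered callback."""

    def __init__(self):
        self.submitted: list[str] = []
        self._on_fill = None
        self._on_reject = None

    def set_callbacks(self, on_fill, on_reject) -> None:
        self._on_fill = on_fill
        self._on_reject = on_reject

    def submit_order(self, order_id: str, symbol: str,
                     side: str, qty: int, order_type: str) -> None:
        self.submitted.append(order_id)
        # Simulate an instant full fill at a fixed price
        self._on_fill(order_id, qty, 100.0, "2024-01-01T00:00:00Z")

    def cancel_order(self, order_id: str) -> None:
        self.submitted.remove(order_id)
```

A paper-trading or replay adapter follows the same shape, which is what makes substitution at the interface boundary testable by one shared contract suite.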


The Async/Sync Bridge: The Broker Gateway

This is the most technically interesting component in the system, and the one most commonly implemented incorrectly.

Broker APIs (Interactive Brokers TWS, most FIX implementations) are inherently callback-based. You submit an order, and the confirmation arrives sometime later on a different callback. The naive implementation uses threads and blocking queues. The modern correct implementation uses Python’s asyncio with explicit Future bridging and call_soon_threadsafe.

Pattern 1: Sync-to-Async (Order Submission)

The user submits an order via HTTP POST. The web tier calls the Order Management Service synchronously. The OMS validates, persists the order, and publishes a command to Kafka. This is the sync-to-async transition: a synchronous user action converted into an asynchronous command. The HTTP response carries an order ID and status PENDING. The user’s order status widget subscribes via SSE for subsequent state changes.
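A minimal, framework-agnostic sketch of that handler logic (the function and parameter names are assumptions, not the real endpoint):

```python
async def place_order_endpoint(payload: dict, handler) -> tuple[int, dict]:
    """Sketch of the web-tier order endpoint. `handler` stands in for the
    OMS command handler: it validates, risk-checks, persists, publishes
    the command to Kafka, and returns the new order ID."""
    order_id = await handler(payload)
    # 202 Accepted: receipt is acknowledged now; execution status
    # arrives later over the user's SSE channel
    return 202, {"order_id": order_id, "status": "PENDING"}
```

The 202 status is the point: the HTTP response acknowledges receipt, never execution.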

broker_gateway/gateway.py
import asyncio
from dataclasses import dataclass
from typing import Protocol

class BrokerAdapter(Protocol):
    """Abstract broker interface. Concrete adapters implement this."""
    def submit_order(self, order_id: str, symbol: str,
                     side: str, qty: int, order_type: str) -> None: ...
    def cancel_order(self, order_id: str) -> None: ...
    def set_callbacks(self, on_fill: callable, on_reject: callable) -> None: ...

@dataclass
class OrderFill:
    order_id: str
    filled_qty: int
    avg_price: float
    timestamp: str

class BrokerTimeoutError(Exception):
    """Raised when no broker confirmation arrives within the timeout."""

class OrderRejectedException(Exception):
    """Raised when the broker rejects the order."""
    def __init__(self, order_id: str, reason: str):
        super().__init__(f"order {order_id} rejected: {reason}")
        self.order_id = order_id
        self.reason = reason

class BrokerGateway:
    """
    Bridges the broker's callback-based API (running on a separate thread
    or event loop) into asyncio Futures consumable by the Execution Engine.

    This solves the sync-to-async problem: the Execution Engine awaits
    a Future. The Future is resolved when the broker fires its callback,
    using call_soon_threadsafe to cross the thread boundary safely.
    """

    def __init__(self, adapter: BrokerAdapter):
        self._adapter = adapter
        self._pending: dict[str, asyncio.Future[OrderFill]] = {}
        self._loop: asyncio.AbstractEventLoop | None = None

    def start(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._adapter.set_callbacks(
            on_fill=self._on_fill_callback,
            on_reject=self._on_reject_callback,
        )

    async def submit(self, order_id: str, symbol: str,
                     side: str, qty: int, order_type: str,
                     timeout: float = 10.0) -> OrderFill:
        """
        Submits order to broker and awaits confirmation.
        Returns when the broker confirms the fill or raises on timeout/reject.
        """
        loop = asyncio.get_running_loop()
        future: asyncio.Future[OrderFill] = loop.create_future()
        self._pending[order_id] = future

        # Non-blocking submission
        self._adapter.submit_order(order_id, symbol, side, qty, order_type)

        try:
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            self._pending.pop(order_id, None)
            raise BrokerTimeoutError(order_id)

    def _on_fill_callback(self, order_id: str, filled_qty: int,
                           avg_price: float, timestamp: str) -> None:
        """
        Called by the broker SDK on its own thread.
        Must NOT touch asyncio primitives directly.
        call_soon_threadsafe schedules the resolution in the event loop.
        """
        future = self._pending.pop(order_id, None)
        if future and not future.done():
            fill = OrderFill(order_id, filled_qty, avg_price, timestamp)
            self._loop.call_soon_threadsafe(future.set_result, fill)

    def _on_reject_callback(self, order_id: str, reason: str) -> None:
        future = self._pending.pop(order_id, None)
        if future and not future.done():
            exc = OrderRejectedException(order_id, reason)
            self._loop.call_soon_threadsafe(future.set_exception, exc)

The critical detail is call_soon_threadsafe. The broker SDK fires callbacks from its own thread. Calling future.set_result() directly from a foreign thread would cause race conditions inside the asyncio internals. call_soon_threadsafe schedules the resolution to happen inside the event loop on its own thread, making the operation safe.
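The handoff can be demonstrated in isolation with a few lines (illustrative only): a foreign thread resolves a Future owned by the running loop.

```python
import asyncio
import threading

async def await_broker_fill() -> str:
    """Minimal demonstration of the thread-to-loop handoff: a foreign
    thread (standing in for the broker SDK) resolves an asyncio Future
    via call_soon_threadsafe."""
    loop = asyncio.get_running_loop()
    future: asyncio.Future[str] = loop.create_future()

    def broker_callback() -> None:
        # Runs on the broker SDK's thread; never call future.set_result
        # here directly -- schedule it onto the loop instead.
        loop.call_soon_threadsafe(future.set_result, "FILLED")

    threading.Thread(target=broker_callback).start()
    return await future
```

Calling `future.set_result("FILLED")` directly inside `broker_callback` would mutate loop state from the wrong thread; the threadsafe variant also wakes the loop if it is idle.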

Pattern 2: Async-to-Sync (Position Queries)

The opposite problem is equally common. Market data and order fills arrive as an async stream. A user’s position query needs a consistent, synchronous answer.

The solution is the materialized view pattern. The Position Service subscribes to fill events from Kafka, updates a DynamoDB table in near-real-time, and serves position queries directly from DynamoDB. The user never queries the live event stream. They query a pre-computed, indexed snapshot.

sequenceDiagram
    participant BrokerGW as Broker Gateway
    participant Kafka as Amazon MSK (Kafka)
    participant PosSvc as Position Service
    participant DDB as DynamoDB
    participant WebTier as Web Tier
    participant User

    BrokerGW->>Kafka: publish order.filled event
    Kafka->>PosSvc: consume order.filled
    PosSvc->>DDB: update_item (atomic increment qty, recalc P&L)
    DDB-->>PosSvc: ok

    Note over PosSvc,DDB: Async pipeline, eventually consistent

    User->>WebTier: GET /api/positions
    WebTier->>PosSvc: getPositions(userId)
    PosSvc->>DDB: query by userId
    DDB-->>PosSvc: position snapshot
    PosSvc-->>WebTier: positions
    WebTier-->>User: 200 OK (consistent snapshot)

    Note over WebTier,User: Sync request served from materialized view

The async-to-sync bridge here is simply a Kafka consumer that writes to a queryable store. The key decisions are: the DynamoDB write must be idempotent (the event carries a unique fill ID used as the idempotency key), and the write must be atomic for quantity and P&L fields to avoid partial updates.
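Stripped of the AWS specifics, the bridge reduces to a projector like this sketch (a plain dict stands in for DynamoDB; field names are illustrative):

```python
class PositionProjector:
    """Sketch of the async-to-sync bridge: consume fill events, maintain a
    queryable snapshot. Idempotency: each unique fill_id is applied once,
    so redelivered events are no-ops."""

    def __init__(self):
        self._positions: dict[tuple[str, str], int] = {}
        self._applied: set[str] = set()

    def apply(self, event: dict) -> None:
        if event["fill_id"] in self._applied:
            return  # redelivered event, already materialized
        signed = event["qty"] if event["side"] == "buy" else -event["qty"]
        key = (event["user_id"], event["symbol"])
        self._positions[key] = self._positions.get(key, 0) + signed
        self._applied.add(event["fill_id"])

    def query(self, user_id: str, symbol: str) -> int:
        # Synchronous read served entirely from the materialized view
        return self._positions.get((user_id, symbol), 0)
```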

Pattern 3: Correlation ID Request-Response

For operations that require a synchronous-feeling response from an inherently async system (such as requesting a real-time quote before order submission), the system uses the correlation ID pattern over Kafka.

sequenceDiagram
    participant WebTier as Web Tier
    participant OMS as Order Management Service
    participant Kafka
    participant ExecEngine as Execution Engine
    participant BrokerGW as Broker Gateway

    WebTier->>OMS: POST /orders (sync HTTP)
    OMS->>OMS: validate + persist (status=PENDING)
    OMS->>Kafka: publish order.submitted (correlation_id=X)
    OMS-->>WebTier: 202 Accepted (order_id, status=PENDING)

    Note over WebTier: User UI subscribes via SSE for status updates

    Kafka->>ExecEngine: consume order.submitted
    ExecEngine->>BrokerGW: await submit(order, timeout=10s)
    BrokerGW->>BrokerGW: await Future, resolved via call_soon_threadsafe

    BrokerGW-->>ExecEngine: OrderFill
    ExecEngine->>Kafka: publish order.filled (correlation_id=X)

    Kafka->>OMS: consume order.filled -> update order status
    Kafka->>PosSvc: consume order.filled -> update positions
    Kafka->>NotifSvc: consume order.filled -> push to user SSE
    NotifSvc-->>WebTier: SSE event: order X filled
    WebTier-->>User: real-time status update

The user submits an order and immediately gets a 202 Accepted with an order ID. They do not wait for broker execution. The SSE channel pushes the fill notification when it arrives. From the user’s perspective, the experience is near-real-time. From the system’s perspective, every component is doing exactly what it does best.


Order Management Service: Command and Query Separation

The Order Management Service implements CQRS (Command Query Responsibility Segregation) explicitly. Write operations (place order, cancel order, amend order) go through a command handler that persists to Aurora PostgreSQL and publishes events to Kafka. Read operations (get order, list orders, order history) query a read model maintained by a projection process consuming the same Kafka events.

flowchart LR
    subgraph Write Side
        CMD[Command Handler] --> Validate[Validator]
        Validate --> RiskCheck[Risk Pre-Check\nSync call to Risk Service]
        RiskCheck --> Persist[(Aurora PostgreSQL\nOrder Ledger)]
        RiskCheck --> Publish[Kafka Publisher]
    end

    subgraph Read Side
        Projection[Projection Process\nKafka Consumer] --> ReadDB[(Aurora Read Replica\nOrder Read Model)]
        QueryHandler[Query Handler] --> ReadDB
    end

    subgraph External
        Publish --> KafkaTopic[(order.commands\ntopic)]
        KafkaTopic --> Projection
    end
oms/commands.py
from dataclasses import dataclass, field
from enum import Enum
from typing import Protocol
import uuid

class OrderSide(str, Enum):
    BUY = "buy"
    SELL = "sell"

class OrderType(str, Enum):
    MARKET = "market"
    LIMIT = "limit"
    STOP = "stop"

@dataclass(frozen=True)
class PlaceOrderCommand:
    """Immutable command. Implements the Command Pattern."""
    user_id: str
    symbol: str
    side: OrderSide
    qty: int
    order_type: OrderType
    limit_price: float | None = None
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

class CommandHandler(Protocol):
    """ISP: command handler interface segregated from query interface."""
    async def handle(self, command: PlaceOrderCommand) -> str: ...

class PlaceOrderHandler:
    """
    Single Responsibility: coordinates validation, risk check, and persistence.
    Dependency Inversion: depends on abstractions, not concrete implementations.
    """

    def __init__(
        self,
        validator: "OrderValidator",
        risk_client: "RiskServiceClient",
        repository: "OrderRepository",
        publisher: "KafkaPublisher",
    ):
        self._validator = validator
        self._risk = risk_client
        self._repo = repository
        self._publisher = publisher

    async def handle(self, cmd: PlaceOrderCommand) -> str:
        # Validate structure
        self._validator.validate(cmd)

        # Synchronous pre-trade risk check (blocking, must pass before persist)
        risk_result = await self._risk.check(cmd)
        if not risk_result.approved:
            raise RiskViolationError(risk_result.reason)

        # Persist with PENDING status
        order_id = await self._repo.insert(cmd)

        # Publish command event (fire-and-forget, broker will confirm later)
        await self._publisher.publish("order.submitted", {
            "order_id": order_id,
            "correlation_id": cmd.correlation_id,
            "user_id": cmd.user_id,
            "symbol": cmd.symbol,
            "side": cmd.side,
            "qty": cmd.qty,
            "order_type": cmd.order_type,
            "limit_price": cmd.limit_price,
        })

        return order_id

The pre-trade risk check is deliberately synchronous here. The order must not be persisted or published if it violates risk limits. This is the one place in the order flow where the system blocks.


Risk Engine: Real-Time Pre-Trade Controls
#

The Risk Engine enforces limits before any order reaches the broker. It maintains limit tables in Redis (fast reads) and subscribes to position and market data events to keep computed exposures current.

flowchart TD
    subgraph Risk Engine
        RiskAPI[Risk API\nSync gRPC]
        LimitStore[(Redis\nLimit Tables)]
        ExposureCalc[Exposure Calculator\nAsync Consumer]
        RulesEngine[Rules Engine\nOpen/Closed via Rule Protocol]

        RiskAPI --> RulesEngine
        RulesEngine --> LimitStore
        ExposureCalc --> LimitStore
    end

    OMS[Order Management\nService] -->|gRPC check| RiskAPI
    MSK[(Kafka)] -->|order.filled\nposition.updated| ExposureCalc
    MktData[(Kinesis)] -->|price.updated| ExposureCalc

Risk rules follow the Open/Closed Principle: the engine executes any object implementing the RiskRule protocol. Adding a new rule requires no modification to the engine itself.

risk/engine.py
from dataclasses import dataclass
from typing import Protocol

@dataclass(frozen=True)
class RuleResult:
    """Outcome of a single rule evaluation."""
    approved: bool
    reason: str = ""

@dataclass(frozen=True)
class RiskResult:
    """Aggregate outcome returned by the engine."""
    approved: bool
    reason: str = ""

class RiskRule(Protocol):
    """Open for extension: implement this to add new risk controls."""
    name: str
    async def evaluate(self, cmd: "PlaceOrderCommand",
                       context: "RiskContext") -> RuleResult: ...

class MaxOrderSizeRule:
    name = "max_order_size"

    async def evaluate(self, cmd, context):
        limit = await context.limits.get(cmd.user_id, "max_order_size")
        notional = cmd.qty * context.last_price(cmd.symbol)
        if notional > limit:
            return RuleResult(approved=False,
                              reason=f"Order notional {notional:.2f} exceeds limit {limit:.2f}")
        return RuleResult(approved=True)

class MaxConcentrationRule:
    name = "max_concentration"

    async def evaluate(self, cmd, context):
        current_exposure = await context.exposure(cmd.user_id, cmd.symbol)
        new_qty = current_exposure.qty + (cmd.qty if cmd.side == "buy" else -cmd.qty)
        portfolio_value = await context.portfolio_value(cmd.user_id)
        concentration = abs(new_qty * context.last_price(cmd.symbol)) / portfolio_value
        if concentration > 0.20:  # 20% max single-name concentration
            return RuleResult(approved=False,
                              reason=f"Concentration {concentration:.1%} exceeds 20% limit")
        return RuleResult(approved=True)

class RiskEngine:
    """Closed for modification. Add rules by passing them at construction."""

    def __init__(self, rules: list[RiskRule], context_factory: "RiskContextFactory"):
        self._rules = rules
        self._context_factory = context_factory

    async def check(self, cmd: "PlaceOrderCommand") -> "RiskResult":
        context = await self._context_factory.build(cmd.user_id)
        for rule in self._rules:
            result = await rule.evaluate(cmd, context)
            if not result.approved:
                return RiskResult(approved=False, reason=f"[{rule.name}] {result.reason}")
        return RiskResult(approved=True)

The risk check runs in under 5 milliseconds for well-warmed Redis caches. If Redis is unavailable, the engine fails closed: all orders are rejected until the cache recovers. This is the correct failure mode for a risk system.
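A fail-closed wrapper of that kind can be sketched as follows (names illustrative; RiskResult is redefined locally so the example is self-contained):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class RiskResult:
    approved: bool
    reason: str = ""

class FailClosedRiskCheck:
    """Wrapper sketch enforcing the fail-closed policy: any infrastructure
    error during a check becomes a rejection, never an approval."""

    def __init__(self, engine):
        self._engine = engine

    async def check(self, cmd) -> RiskResult:
        try:
            return await self._engine.check(cmd)
        except Exception as exc:
            # Redis outage, timeout, anything unexpected: reject the order
            return RiskResult(approved=False,
                              reason=f"risk engine unavailable: {exc}")
```

The asymmetry is deliberate: a rejected good order costs one trade; an approved bad order can cost the firm.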


Market Data Architecture: Kinesis and Fan-Out

Market data is the highest-throughput component. At peak, the system ingests price updates for thousands of symbols across multiple exchanges simultaneously. The architecture separates ingestion (raw feed to Kinesis) from distribution (fan-out to subscribers).

flowchart TD
    subgraph Ingestion
        Feed1[Exchange Feed A\nWebSocket] --> Normalizer1[Normalizer Lambda]
        Feed2[Exchange Feed B\nFIX] --> Normalizer2[Normalizer Lambda]
        Feed3[Exchange Feed C\nREST Poll] --> Normalizer3[Normalizer Lambda]
        Normalizer1 --> Kinesis[(Kinesis Data Streams\n8 shards)]
        Normalizer2 --> Kinesis
        Normalizer3 --> Kinesis
    end

    subgraph Distribution
        Kinesis --> MDSConsumer[Market Data Service\nKinesis Consumer]
        MDSConsumer --> PriceCache[(Redis\nLast Price Cache\nTTL 5s)]
        MDSConsumer --> CandleAgg[Candle Aggregator\n1m 5m 1h OHLCV]
        MDSConsumer --> RiskFeed[Risk Engine Feed\nvia SQS]
        MDSConsumer --> SSEBroadcast[SSE Broadcaster\nper-symbol channels]
    end

    subgraph User Delivery
        SSEBroadcast --> RedisPS[(Redis Pub/Sub\nper-symbol channels)]
        WebTier[Web Tier\nSSE endpoint] --> RedisPS
        Users([Users with\nactive subscriptions]) --> WebTier
    end

The normalizer lambdas convert exchange-specific formats into a canonical PriceUpdate schema before writing to Kinesis. Each shard handles a non-overlapping range of symbols (partitioned by symbol hash). This guarantees ordering per-symbol while enabling horizontal parallelism.
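A normalizer might look like the following sketch (the raw field names, stream name, and handler signature are assumptions):

```python
import json

def normalize_exchange_a(raw: dict) -> dict:
    """Map one exchange's wire format onto the canonical PriceUpdate
    schema. The short field names on the right are illustrative."""
    return {
        "symbol": raw["s"],
        "bid": float(raw["b"]),
        "ask": float(raw["a"]),
        "last": float(raw["p"]),
        "volume": int(raw["v"]),
        "timestamp_ms": int(raw["t"]),
        "exchange": "A",
    }

def handler(event: dict, kinesis_client, stream_name: str = "market-data") -> int:
    """Lambda-style normalizer sketch. PartitionKey=symbol routes every
    update for a symbol to the same shard, preserving per-symbol order."""
    for raw in event["records"]:
        update = normalize_exchange_a(raw)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(update).encode(),
            PartitionKey=update["symbol"],
        )
    return len(event["records"])
```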

market_data/consumer.py
import asyncio
import json
from dataclasses import dataclass

@dataclass(frozen=True)
class PriceUpdate:
    symbol: str
    bid: float
    ask: float
    last: float
    volume: int
    timestamp_ms: int
    exchange: str

class MarketDataConsumer:
    """
    Async consumer for Kinesis records.
    Applies the Observer Pattern: registered handlers are called for each update.
    Handlers are segregated by interest (ISP): risk only subscribes to price,
    candle builder subscribes to price+volume, SSE broadcast subscribes to all.
    """

    def __init__(self, kinesis_client, redis_client, publisher):
        self._kinesis = kinesis_client
        self._redis = redis_client
        self._publisher = publisher
        self._handlers: list[callable] = []

    def register(self, handler: callable) -> None:
        """Open/Closed: add handlers without modifying the consumer."""
        self._handlers.append(handler)

    async def run(self, stream_name: str, shard_id: str) -> None:
        iterator = await self._get_shard_iterator(stream_name, shard_id)
        while True:
            records = await self._kinesis.get_records(ShardIterator=iterator,
                                                       Limit=500)
            for record in records["Records"]:
                update = PriceUpdate(**json.loads(record["Data"]))
                await self._process(update)

            iterator = records["NextShardIterator"]
            if not records["Records"]:
                await asyncio.sleep(0.1)  # Kinesis polling interval

    async def _process(self, update: PriceUpdate) -> None:
        # Update last price cache (used by risk engine)
        await self._redis.setex(
            f"price:{update.symbol}",
            5,  # TTL: stale prices are dangerous
            f"{update.last}",
        )

        # Fan out to all registered handlers concurrently
        await asyncio.gather(
            *[handler(update) for handler in self._handlers],
            return_exceptions=True,  # One failing handler must not block others
        )

The return_exceptions=True in asyncio.gather is not optional here. A bug in the candle aggregator must not prevent the risk feed from receiving price updates. Each handler’s failures are logged but do not propagate.
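One way to implement that isolation, sketched with illustrative names:

```python
import asyncio
import logging

logger = logging.getLogger("market_data")

async def fan_out(update, handlers) -> int:
    """Handler-isolation sketch: run every handler, log the ones that
    failed, and report how many succeeded. A failure in one handler
    never suppresses delivery to the others."""
    results = await asyncio.gather(
        *[handler(update) for handler in handlers],
        return_exceptions=True,
    )
    succeeded = 0
    for handler, result in zip(handlers, results):
        if isinstance(result, BaseException):
            logger.error("handler %s failed: %s", handler.__name__, result)
        else:
            succeeded += 1
    return succeeded
```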


Web Tier: Scaling to 15,000 Concurrent Users

Serving 15,000 concurrent users requires thinking carefully about what “concurrent” actually means. Most of those connections are idle most of the time: a user with the trading dashboard open is maintaining an SSE connection for live data but is not actively making API calls. The connection count is therefore much higher than the request rate.

flowchart TD
    CF[CloudFront\nStatic SPA\nCache-Control: immutable] --> ALB
    ALB[External ALB\nHTTP/2\nSticky for SSE] --> ECS

    subgraph ECS Fargate Cluster
        ECS[Web Service\n8 min / 40 max tasks\n2 vCPU / 4 GB each]
        ECS --> Uvicorn[Uvicorn ASGI\n4 workers per task]
    end

    ECS --> SessionRedis[(ElastiCache Redis\nSession Store\nCluster Mode 3 shards)]
    ECS --> RateLimit[(ElastiCache Redis\nRate Limiter\nSliding Window per user)]
    ECS --> SSERedis[(ElastiCache Redis\nPub/Sub\nMarket Data Fan-out)]
    ECS --> Services[Internal Services\nvia Service Discovery]

    subgraph Autoscaling
        CW[CloudWatch\nMetrics] --> ASG[ECS Service\nAutoscaling]
        ASG --> ECS
    end

Key decisions for the web tier at this scale:

ASGI over WSGI. Django with Uvicorn in ASGI mode handles concurrent SSE connections efficiently. A synchronous WSGI server would spawn a thread per connection, exhausting resources at this concurrency level. ASGI uses cooperative multitasking: waiting on SSE or database I/O yields the event loop to other requests.

SSE over WebSocket for market data. SSE is unidirectional (server to client), which is all market data delivery needs. It works over HTTP/2, survives load balancer reconnects, and is simpler to implement and debug than WebSocket at this scale. Order submission uses regular POST requests.

Sticky sessions on ALB only for SSE. SSE connections are long-lived and tied to a specific task. ALB duration-based stickiness (1-hour cookie) ensures SSE connections are not routed to a different task mid-stream. Normal API requests use round-robin.

Rate limiting at the web tier, not the services. Each task enforces a per-user sliding window rate limit via Redis before proxying to internal services. This prevents individual users from saturating downstream services with automated scripts.

web/sse.py
import asyncio
import json
from django.http import StreamingHttpResponse

async def market_data_stream(request, symbols: str):
    """
    SSE endpoint. Each active subscription is a long-lived async generator.
    The ALB sticky session ensures this generator stays on one task.
    """
    symbol_list = symbols.split(",")
    user_id = request.user.id
    pubsub = request.app.redis.pubsub()

    # Subscribe to per-symbol Redis channels
    channels = [f"mkt:{sym}" for sym in symbol_list]
    await pubsub.subscribe(*channels)

    async def event_generator():
        try:
            # Send initial snapshot from cache
            for symbol in symbol_list:
                price = await request.app.redis.get(f"price:{symbol}")
                if price:
                    yield f"data: {json.dumps({'symbol': symbol, 'price': float(price)})}\n\n"

            # Stream live updates; emit a heartbeat whenever the channel
            # is idle. (Sleeping inside a pubsub.listen() loop would delay
            # message delivery and only heartbeat after traffic.)
            while True:
                message = await pubsub.get_message(
                    ignore_subscribe_messages=True, timeout=15.0)
                if message is not None:
                    yield f"data: {message['data'].decode()}\n\n"
                else:
                    # Heartbeat to detect dead connections and keep the
                    # ALB idle timeout from closing the stream
                    yield ": heartbeat\n\n"

        except asyncio.CancelledError:
            pass
        finally:
            await pubsub.unsubscribe(*channels)

    response = StreamingHttpResponse(
        event_generator(),
        content_type="text/event-stream",
    )
    response["Cache-Control"] = "no-cache"
    response["X-Accel-Buffering"] = "no"
    return response

The comment-only SSE line (: heartbeat) serves two purposes: it lets the server detect dead connections through proxies that silently drop idle streams, and it keeps the ALB idle timeout from terminating the connection. The heartbeat interval must be shorter than the ALB idle timeout (60 seconds by default); 15 to 30 seconds is a safe choice for SSE.

Autoscaling policy. The ECS service scales on a composite metric: average CPU utilization above 60% OR average active connection count above 3,000 per task triggers scale-out. Scale-in requires both metrics to drop below their thresholds for 5 consecutive minutes to prevent thrashing.


Event Sourcing and the Audit Service
#

Every order, fill, cancellation, amendment, and risk decision is an event. The Audit Service consumes all Kafka topics and writes immutable records to S3 in Parquet format, partitioned by date and event type. This serves two purposes: regulatory compliance (many jurisdictions require 7-year retention of trading records) and event replay (any service’s state can be reconstructed from scratch by replaying its events).

audit/consumer.py
import pyarrow as pa
import pyarrow.parquet as pq
from dataclasses import asdict

class AuditConsumer:
    """
    Append-only event log. No updates, no deletes.
    Implements the Repository Pattern over S3 as the backing store.
    """

    def __init__(self, s3_client, bucket: str):
        self._s3 = s3_client
        self._bucket = bucket
        self._buffer: list[dict] = []
        self._flush_interval_s = 30  # a background timer task (not shown) calls _flush() on this interval
        self._max_buffer_size = 10_000

    async def consume(self, event: dict) -> None:
        self._buffer.append(event)
        if len(self._buffer) >= self._max_buffer_size:
            await self._flush()

    async def _flush(self) -> None:
        if not self._buffer:
            return

        table = pa.Table.from_pylist(self._buffer)
        key = self._partition_key(self._buffer[0])

        # Write to S3 as Parquet
        buf = pa.BufferOutputStream()
        pq.write_table(table, buf, compression="snappy")

        await self._s3.put_object(
            Bucket=self._bucket,
            Key=key,
            Body=buf.getvalue().to_pybytes(),
        )
        self._buffer.clear()

    def _partition_key(self, event: dict) -> str:
        # Assumes each consumer instance is bound to a single topic, so
        # every buffered event shares the same type partition
        ts = event["timestamp"][:10]  # YYYY-MM-DD
        event_type = event["type"]
        return f"events/dt={ts}/type={event_type}/{event['id']}.parquet"

Parquet with Snappy compression achieves roughly 10:1 compression on JSON event data. The S3 partition scheme (dt=YYYY-MM-DD/type=event_type) is compatible with Athena for ad-hoc SQL queries without any ETL pipeline.


Data Architecture: Aurora, DynamoDB, and Redis
#

Different data has different characteristics. Using one database for everything is the most common architectural mistake in trading platforms.

flowchart LR
    subgraph Transactional
        Aurora[(Aurora PostgreSQL\nServerless v2\nWriter + 2 Read Replicas)]
        OMS --> Aurora
        Note1[Orders, users, accounts\nStrong consistency\nACID transactions]
    end

    subgraph Hot State
        DDB[(DynamoDB\nOn-Demand\nDAX cache optional)]
        PosSvc --> DDB
        Note2[Positions, open orders\nms-level reads\nAtomic increments]
    end

    subgraph Cache
        Redis[(ElastiCache Redis\nCluster Mode\n3 primary shards)]
        Note3[Last prices, sessions\nrate limits, pub/sub\nTTL enforced]
    end

    subgraph Cold Archive
        S3[(S3 + Glacier\nIntelligent Tiering)]
        Note4[Audit events, backfill\nhistorical data\nAthena queryable]
    end

Aurora PostgreSQL handles order lifecycle, user accounts, and transaction history. Writes go to the writer instance. Query-heavy operations (order history, account statements) use read replicas. Aurora Serverless v2 auto-scales between 0.5 and 64 ACUs (Aurora Capacity Units) based on load, eliminating over-provisioning costs during off-market hours.

DynamoDB handles positions and open order state. The position record for a user-symbol pair is updated atomically on every fill using UpdateItem with ADD operations. This is the correct way to handle concurrent fill updates without transactions: atomic increment semantics prevent lost updates even with multiple Position Service consumers running in parallel.

position/repository.py
from datetime import datetime, timezone
from decimal import Decimal


class PositionRepository:
    """
    Dependency Inversion: depends on an injected DynamoDB table resource,
    not on boto3 directly.
    Atomic update pattern: no read-modify-write race conditions.
    """

    def __init__(self, table):
        self._table = table

    async def apply_fill(self, user_id: str, symbol: str,
                         fill_qty: int, avg_price: float,
                         fill_id: str, side: str) -> None:
        signed_qty = fill_qty if side == "buy" else -fill_qty
        # DynamoDB rejects Python floats; numeric attributes must be Decimal.
        notional = Decimal(str(fill_qty * avg_price))

        await self._table.update_item(
            Key={"pk": f"USER#{user_id}", "sk": f"POS#{symbol}"},
            UpdateExpression=(
                "ADD qty :q, total_cost :c "
                "SET last_updated = :ts, last_fill_id = :fid"
            ),
            ExpressionAttributeValues={
                ":q": signed_qty,
                ":c": notional if side == "buy" else -notional,
                ":ts": datetime.now(timezone.utc).isoformat(),
                ":fid": fill_id,
            },
            # Idempotency: skip if this fill was the last one applied,
            # which guards against immediate redelivery of the same event.
            ConditionExpression=(
                "attribute_not_exists(last_fill_id) OR last_fill_id <> :fid"
            ),
        )

The ConditionExpression provides idempotency. If the consumer crashes after applying a fill but before committing the Kafka offset, the same fill event is redelivered. The condition prevents it from being applied twice.
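Note that the repository stores a signed quantity and a running total cost rather than an average price, so the average entry price is derived at read time. A minimal sketch of that derivation (the `PositionSnapshot` helper is illustrative, not part of the repository above):

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class PositionSnapshot:
    """Read-side view derived from the qty / total_cost counters."""
    qty: int
    total_cost: Decimal

    @property
    def avg_price(self) -> Decimal:
        # A flat position has no meaningful average entry price.
        if self.qty == 0:
            return Decimal("0")
        return self.total_cost / Decimal(self.qty)


# Two buy fills: 100 @ 10.00 and 50 @ 13.00 -> average entry price 11.00
pos = PositionSnapshot(qty=150, total_cost=Decimal("1650.00"))
assert pos.avg_price == Decimal("11")
```

Keeping only the two additive counters in DynamoDB is what makes the `ADD` expression sufficient: averages are not additive, but quantity and cost are.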


Circuit Breaker and Resilience Patterns
#

A trading platform that fails open is worse than one that fails closed. Every external call (broker gateway, risk service, market data) is wrapped in a circuit breaker.

resilience/circuit_breaker.py
import time
from enum import Enum


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitState(Enum):
    CLOSED = "closed"        # Normal: calls pass through
    OPEN = "open"            # Tripped: calls fail fast
    HALF_OPEN = "half_open"  # Recovery probe

class CircuitBreaker:
    """
    Single Responsibility: manages open/closed state for a single dependency.
    Fails fast when the dependency is unhealthy rather than queuing up calls.
    """

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout_s: float = 30.0,
                 success_threshold: int = 2):
        self._failure_threshold = failure_threshold
        self._recovery_timeout = recovery_timeout_s
        self._success_threshold = success_threshold
        self._failures = 0
        self._successes = 0
        self._last_failure_time: float | None = None
        self._state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._last_failure_time > self._recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                self._successes = 0
            else:
                raise CircuitOpenError("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        if self._state == CircuitState.HALF_OPEN:
            self._successes += 1
            if self._successes >= self._success_threshold:
                self._state = CircuitState.CLOSED
                self._failures = 0
                self._successes = 0
        else:
            self._failures = 0

    def _on_failure(self):
        self._failures += 1
        self._last_failure_time = time.monotonic()
        # A failure during the recovery probe re-opens the circuit immediately.
        if (self._state == CircuitState.HALF_OPEN
                or self._failures >= self._failure_threshold):
            self._state = CircuitState.OPEN

When the broker gateway's circuit breaker trips open, the Execution Engine immediately starts publishing orders to a dead_letter Kafka topic with BROKER_UNAVAILABLE status, and the notification service pushes alerts to affected users. The system degrades gracefully rather than failing silently.
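Wiring that degradation path together might look like the following sketch. It is self-contained: `publish_dead_letter` and the in-memory `dead_letters` list are hypothetical stand-ins for the real Kafka producer, and `broken_gateway` simulates a broker call behind an open circuit.

```python
import asyncio


class CircuitOpenError(Exception):
    """Stand-in for the breaker's fail-fast exception."""


# Hypothetical stand-in for the dead_letter Kafka topic.
dead_letters: list[dict] = []


async def publish_dead_letter(order: dict, reason: str) -> None:
    # Real system: produce to the dead_letter topic with the failure status.
    dead_letters.append({**order, "status": reason})


async def submit_order(gateway_call, order: dict) -> dict:
    """Submit via the breaker-wrapped gateway; degrade to dead-letter
    with BROKER_UNAVAILABLE when the circuit is open."""
    try:
        return await gateway_call(order)
    except CircuitOpenError:
        await publish_dead_letter(order, "BROKER_UNAVAILABLE")
        return {**order, "status": "BROKER_UNAVAILABLE"}


async def broken_gateway(order: dict) -> dict:
    raise CircuitOpenError("Circuit breaker is OPEN")


result = asyncio.run(submit_order(broken_gateway, {"order_id": "o-1"}))
assert result["status"] == "BROKER_UNAVAILABLE"
assert dead_letters[0]["order_id"] == "o-1"
```

The key property is that the user still gets a definitive answer synchronously; the order is parked, not lost.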


Infrastructure as Code
#

The full system deploys via Terraform. Each service has its own ECS task definition, and the shared infrastructure (MSK, ElastiCache, Aurora, VPC) is managed by a separate foundation module.

flowchart TD
    subgraph VPC 10.0.0.0/16
        subgraph Public Subnets
            ALBExt[External ALB]
        end
        subgraph Private App Subnets
            ECSCluster[ECS Cluster\n8 services]
            ALBInt[Internal ALB\nservice mesh]
        end
        subgraph Private Data Subnets
            Aurora[(Aurora\nMulti-AZ)]
            DDB_EP[DynamoDB VPC\nEndpoint]
            Redis[(ElastiCache\nMulti-AZ)]
            MSK[(Amazon MSK\n3 broker nodes\n3 AZs)]
        end
    end

    IGW[Internet Gateway] --> ALBExt
    ECSCluster --> ALBInt
    ALBInt --> ECSCluster
    ECSCluster --> Aurora
    ECSCluster --> Redis
    ECSCluster --> MSK
    ECSCluster --> DDB_EP
    ECSCluster --> Kinesis_EP[Kinesis VPC\nEndpoint]
    ECSCluster --> S3_EP[S3 VPC\nEndpoint]

MSK runs 3 broker nodes across 3 Availability Zones with a replication factor of 3 for all critical topics (order.submitted, order.filled, position.updated). The market.price topic uses replication factor 2 and acks=1 (the producer does not wait for follower replication) because occasional price update loss is acceptable. Missing a fill event is not.

Topics and their configurations:

| Topic            | Partitions | Replication | Acks | Retention |
|------------------|------------|-------------|------|-----------|
| order.submitted  | 12         | 3           | all  | 7 days    |
| order.filled     | 12         | 3           | all  | 7 days    |
| position.updated | 12         | 3           | all  | 7 days    |
| market.price     | 64         | 2           | 1    | 1 hour    |
| audit.events     | 4          | 3           | all  | 30 days   |
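The per-topic acks choice can live in one config table so producers cannot drift from it. A minimal sketch, deliberately independent of any specific Kafka client library:

```python
# Per-topic producer settings, mirroring the topic table above.
# "acks" = "all" waits for all in-sync replicas; "1" waits only for the leader.
TOPIC_CONFIG: dict[str, dict] = {
    "order.submitted":  {"partitions": 12, "replication": 3, "acks": "all"},
    "order.filled":     {"partitions": 12, "replication": 3, "acks": "all"},
    "position.updated": {"partitions": 12, "replication": 3, "acks": "all"},
    "market.price":     {"partitions": 64, "replication": 2, "acks": "1"},
    "audit.events":     {"partitions": 4,  "replication": 3, "acks": "all"},
}


def producer_acks(topic: str) -> str:
    """Fail closed: an unknown topic defaults to the strongest guarantee."""
    return TOPIC_CONFIG.get(topic, {"acks": "all"})["acks"]


assert producer_acks("market.price") == "1"
assert producer_acks("order.filled") == "all"
assert producer_acks("some.new.topic") == "all"
```

The returned value would be passed as the producer's `acks` setting when creating the client for that topic.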

Observability
#

A distributed system at this complexity is impossible to operate without structured observability from day one.

Structured logging: every log line is JSON with trace_id, user_id, service, level, and correlation_id. The correlation ID that tracks an order from submission to fill appears in every log line in the chain. A single Athena query on the S3 log archive can reconstruct the full timeline of any order.
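A minimal sketch of such a formatter using only the standard library (the field names follow the convention above; the formatter itself is illustrative, not a specific logging framework):

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render every record as one JSON object with the correlation fields."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "ts": self.formatTime(record),
            "level": record.levelname,
            "service": getattr(record, "service", "unknown"),
            "trace_id": getattr(record, "trace_id", None),
            "user_id": getattr(record, "user_id", None),
            "correlation_id": getattr(record, "correlation_id", None),
            "message": record.getMessage(),
        })


logger = logging.getLogger("oms")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# extra= attaches the correlation fields to this record
logger.info("order accepted",
            extra={"service": "oms", "correlation_id": "corr-123"})
```

Because every line is one JSON object, the S3 log archive stays directly queryable from Athena with no parsing step.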

Distributed tracing: AWS X-Ray traces span the web tier, OMS, Risk Engine, and Execution Engine. The Kafka message headers carry the trace context so that the span continues on the consumer side, not just the producer side.

Custom metrics: beyond the standard ECS/Lambda/Aurora metrics, every service publishes application-level metrics via CloudWatch Embedded Metric Format:

  • Order submission latency (p50, p95, p99)
  • Risk check latency
  • Broker gateway fill latency (from submission to fill callback)
  • Position update lag (time between fill event and materialized view consistency)
  • Kafka consumer lag per consumer group
  • SSE connection count per web tier task

The broker gateway fill latency metric deserves special attention. A spike here indicates broker-side issues before they cause visible order failures. Alerting on p95 fill latency above 2 seconds triggers an on-call notification and automatically increases the Broker Gateway’s timeout parameter from 10 seconds to 30 seconds, giving the broker more time before the circuit opens.
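Emitting an application metric in Embedded Metric Format is just printing one structured JSON line to stdout; CloudWatch extracts the metric from the log stream. A sketch for the fill-latency metric (the namespace and metric names are illustrative):

```python
import json
import time


def emf_fill_latency(latency_ms: float, service: str = "broker-gateway") -> str:
    """Build one CloudWatch Embedded Metric Format log line."""
    doc = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [{
                "Namespace": "TradingPlatform",  # illustrative namespace
                "Dimensions": [["Service"]],
                "Metrics": [{"Name": "FillLatencyMs", "Unit": "Milliseconds"}],
            }],
        },
        # Dimension and metric values live at the document root.
        "Service": service,
        "FillLatencyMs": latency_ms,
    }
    return json.dumps(doc)


# In a real service this line is written to stdout for CloudWatch to pick up.
line = emf_fill_latency(137.5)
assert json.loads(line)["FillLatencyMs"] == 137.5
```

The same log line doubles as a structured log entry, so the metric and its context are never out of sync.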


Design Patterns Summary
#

| Pattern            | Where Applied                                | Why |
|--------------------|----------------------------------------------|-----|
| Command Pattern    | PlaceOrderCommand, CancelOrderCommand        | Encapsulates a user's intent as an immutable object, decouples sender from handler |
| Observer Pattern   | Market Data Consumer, Risk Feed              | Multiple handlers notified of price updates without coupling to each other |
| Adapter Pattern    | BrokerAdapter protocol, exchange normalizers | Broker-specific APIs converted to a uniform interface |
| Repository Pattern | OrderRepository, PositionRepository          | Persistence logic isolated from business logic, injectable and testable |
| Circuit Breaker    | All external calls                           | Fail fast, prevent cascading failures, degrade gracefully |
| CQRS               | Order Management Service                     | Separate write and read models, independent scaling, consistent reads from projections |
| Materialized View  | Position Service                             | Async fills materialized into a queryable snapshot, async-to-sync bridge |
| Correlation ID     | Order flow end-to-end                        | Traces a synchronous user action through an asynchronous pipeline |

Lessons from This Design
#

Correctness over performance. The system accepts a 200-250ms overhead for pre-trade risk checks on every order. This is not a performance problem. An order that bypasses risk controls due to a latency optimization is an existential problem. Correctness constraints are not negotiable.

The async boundary must be explicit. Every place where the system crosses from synchronous to asynchronous (HTTP to Kafka, Kafka to broker callback, broker callback to asyncio Future) is an explicit, named seam in the code. Implicit async transitions are where subtle bugs live: off-by-one in event ordering, race conditions in Future resolution, lost events in restart scenarios.
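The broker-callback-to-Future seam mentioned above can be made explicit as a small registry: the submit path parks a Future keyed by order id, and the broker callback resolves it. A simplified single-loop sketch (a real gateway would also route the callback through `loop.call_soon_threadsafe`, since broker callbacks usually arrive on a different thread):

```python
import asyncio


class FillBridge:
    """Explicit async-sync seam: submit awaits a Future that the
    broker's fill callback later resolves."""

    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future] = {}

    def register(self, order_id: str) -> asyncio.Future:
        fut = asyncio.get_running_loop().create_future()
        self._pending[order_id] = fut
        return fut

    def on_broker_callback(self, order_id: str, fill: dict) -> None:
        # Idempotent: a duplicate callback for a resolved order is ignored.
        fut = self._pending.pop(order_id, None)
        if fut is not None and not fut.done():
            fut.set_result(fill)


async def demo() -> dict:
    bridge = FillBridge()
    fut = bridge.register("o-42")
    # Simulate the broker's asynchronous fill callback arriving later.
    asyncio.get_running_loop().call_later(
        0.01, bridge.on_broker_callback, "o-42", {"status": "FILLED"})
    # The HTTP handler awaits here with a hard timeout.
    return await asyncio.wait_for(fut, timeout=1.0)


result = asyncio.run(demo())
assert result["status"] == "FILLED"
```

Naming the seam (`FillBridge`) is the point: the place where the callback world meets the await world is one class, not a convention scattered across handlers.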

Idempotency is not optional. At-least-once delivery (the only safe guarantee for Kafka without significant performance overhead) means every consumer must be idempotent. The position update’s ConditionExpression idempotency key, the fill event’s deduplication in the OMS, and the audit event’s content-addressable S3 key all follow the same principle: applying the same event twice must produce the same result as applying it once.
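The content-addressable audit key mentioned above can be as simple as hashing the canonical event payload, so a redelivered event overwrites the same S3 object instead of creating a duplicate. A sketch (the key layout is illustrative):

```python
import hashlib
import json


def audit_key(event: dict) -> str:
    """Derive an S3 key from event content: same event -> same key,
    so at-least-once delivery rewrites one object rather than duplicating."""
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    date = event["timestamp"][:10]  # YYYY-MM-DD, matching the dt= partitions
    return f"audit/dt={date}/{digest}.json"


evt = {"id": "e-1", "type": "order.filled",
       "timestamp": "2024-03-15T14:30:00Z"}
# Redelivery of the identical event produces the identical key.
assert audit_key(evt) == audit_key(dict(evt))
```

Canonicalizing with `sort_keys` matters: two JSON serializations of the same event must hash identically, or the scheme silently loses its idempotency.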

Fail closed, not open. Risk systems, circuit breakers, and rate limiters all fail in the direction that prevents dangerous action. If the risk service is unreachable, orders are rejected. If the broker circuit breaks open, orders queue in dead letter. If the Redis session store is down, users are logged out. Availability is important, but a financial system that makes uncontrolled decisions under failure is worse than one that refuses to act.

Scale the read path, not the write path. 15,000 users do far more reading (viewing positions, checking order status, watching market data) than writing (submitting orders). The CQRS pattern, Aurora read replicas, DynamoDB as a position cache, and the materialized view pattern all serve the same goal: the write path is small, well-controlled, and transactional. The read path scales horizontally with zero contention.


Want to go deeper on AI integration, platform engineering, or backend systems? I offer 1:1 coaching sessions tailored to your background and goals. Check out the coaching page.
