
NATS Messaging in Go: Core, JetStream, and Production Patterns

NATS is a single-binary messaging system with sub-millisecond latency, built for cloud-native workloads. It is not Kafka, and it is not Redis Streams. Knowing when to choose it and what it cannot do is as important as knowing how to use it. This post covers core NATS pub/sub, queue groups, request/reply, JetStream persistence, pull consumers, and authentication.

Kafka, NATS, Redis Streams, and RabbitMQ all describe themselves as “messaging systems.” They are not interchangeable. Kafka is optimised for high-throughput event log storage with long-term replay. Redis Streams is a good fit when you already have Redis and need a lightweight consumer group. RabbitMQ is the right choice when you need complex routing and AMQP semantics.

NATS occupies a different niche: it is extremely lightweight (a single binary under 20MB), starts in milliseconds, delivers sub-millisecond round-trip latency, and handles automatic load balancing across consumers out of the box. For control planes, microservice meshes, IoT telemetry, and internal RPC-style communication, NATS is often the cleanest choice. For durable event sourcing or guaranteed exactly-once delivery, NATS JetStream extends the core with persistence and acknowledgement.

Architecture Overview

flowchart TD
    Publisher[Publisher] -->|Publish| Server[NATS Server]
    Server -->|Fan-out| Sub1[Subscriber A]
    Server -->|Fan-out| Sub2[Subscriber B]
    Server -->|Load balanced| QG[Queue Group\nworkers.orders]
    QG --> W1[Worker 1]
    QG --> W2[Worker 2]
    Server -->|Persisted| JS[(JetStream\nStream)]
    JS -->|Push / Pull| Consumer[Durable Consumer]

Connection Setup with Production Options

Install the client:

go get github.com/nats-io/nats.go

A production connection needs TLS, a reconnect policy, and an error handler. Relying on the default bare nats.Connect("nats://localhost:4222") is fine for local development and nothing else.

internal/messaging/client.go
package messaging

import (
    "crypto/tls"
    "log/slog"
    "time"

    "github.com/nats-io/nats.go"
)

func NewConn(serverURL string, opts ...nats.Option) (*nats.Conn, error) {
    defaults := []nats.Option{
        // Reconnect up to 10 times with 2-second intervals.
        nats.MaxReconnects(10),
        nats.ReconnectWait(2 * time.Second),

        // Log connection lifecycle events.
        nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
            slog.Warn("nats disconnected", "err", err)
        }),
        nats.ReconnectHandler(func(nc *nats.Conn) {
            slog.Info("nats reconnected", "url", nc.ConnectedUrl())
        }),
        nats.ClosedHandler(func(nc *nats.Conn) {
            slog.Error("nats connection closed permanently")
        }),

        // TLS for production; remove for local dev.
        nats.Secure(&tls.Config{MinVersion: tls.VersionTLS12}),
    }

    merged := append(defaults, opts...)
    return nats.Connect(serverURL, merged...)
}
Important

NATS core is a fire-and-forget system. If no subscriber is listening when a message is published, the message is gone. For any use case that requires durability, use JetStream. The handlers above only log lifecycle events; they do not need to re-subscribe. The nats.go client re-establishes core subscriptions automatically after a reconnect, but anything published to those subjects while the client was disconnected is lost. JetStream consumers are server-side state, so a durable consumer keeps its position across reconnects.
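
The fixed 2-second ReconnectWait used above is one possible reconnect policy. A growing delay is a common alternative when many clients reconnect at once. The sketch below is a standard-library-only illustration of a capped exponential backoff; the function name reconnectDelay and the 250 ms / 8 s bounds are assumptions made for this example. nats.go exposes a CustomReconnectDelay option that accepts a function of this shape, but check the client documentation for how it counts attempts before relying on it.

```go
package main

import (
	"fmt"
	"time"
)

// reconnectDelay returns a capped exponential backoff:
// 250ms, 500ms, 1s, 2s, 4s, then 8s for every later attempt.
// The name and the bounds are illustrative, not part of nats.go.
func reconnectDelay(attempts int) time.Duration {
	const (
		base     = 250 * time.Millisecond
		maxDelay = 8 * time.Second
		maxShift = 5 // base << 5 == maxDelay
	)
	if attempts < 0 {
		attempts = 0
	}
	if attempts > maxShift {
		attempts = maxShift
	}
	d := base << uint(attempts)
	if d > maxDelay {
		d = maxDelay
	}
	return d
}

func main() {
	for attempt := 0; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: wait %s\n", attempt, reconnectDelay(attempt))
	}
}
```

Production code would usually add random jitter on top of this so that clients do not reconnect in lockstep; it is left out here to keep the output deterministic.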

Core Pub/Sub: Subject Hierarchies and Wildcards

NATS subjects are dot-separated strings. Two wildcards let you build flexible routing:

  • * matches exactly one token: service.*.events matches service.orders.events and service.payments.events but not service.orders.internal.events.
  • > matches one or more tokens and must be the last token of the subject: logs.> matches logs.api and logs.api.error, but not logs itself.
internal/messaging/subscribe.go
package messaging

import (
    "log/slog"

    "github.com/nats-io/nats.go"
)

// SubscribeEvents subscribes to all service event subjects.
func SubscribeEvents(nc *nats.Conn) (*nats.Subscription, error) {
    return nc.Subscribe("service.*.events", func(msg *nats.Msg) {
        slog.Info("received event",
            "subject", msg.Subject,
            "data", string(msg.Data),
        )
    })
}

// SubscribeAllLogs subscribes to the entire logs hierarchy.
func SubscribeAllLogs(nc *nats.Conn) (*nats.Subscription, error) {
    return nc.Subscribe("logs.>", func(msg *nats.Msg) {
        slog.Info("log entry", "subject", msg.Subject, "data", string(msg.Data))
    })
}
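
The wildcard rules above can be sketched as a small matcher in plain Go. This is only an illustration of the semantics, not the server's actual matching code; the function name subjectMatches is made up for this example, and it assumes well-formed subjects without empty tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches pattern, where "*" matches
// exactly one token and ">" (valid only as the last token) matches one or
// more remaining tokens. Illustrative helper, not part of nats.go.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the final pattern token and needs at least
			// one subject token left to consume.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // subject is shorter than the pattern
		}
		if tok != "*" && tok != s[i] {
			return false // literal token mismatch
		}
	}
	// Without ">", pattern and subject must have the same token count.
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("service.*.events", "service.orders.events"))          // true
	fmt.Println(subjectMatches("service.*.events", "service.orders.internal.events")) // false
	fmt.Println(subjectMatches("logs.>", "logs.api.error"))                           // true
	fmt.Println(subjectMatches("logs.>", "logs"))                                     // false
}
```
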
Note

Subject naming convention matters. Use lowercase, dot-separated nouns: orders.created, payments.processed, inventory.updated. Avoid generic names like events or messages at the top level, as they make wildcard subscriptions ambiguous and debugging harder.
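
One way to keep the convention honest is to validate subjects before publishing. The helper below is a hypothetical sketch using only the standard library: validSubject is not a nats.go function, and the exact rules it enforces (lowercase letters, digits, hyphen and underscore per token, at least two tokens, no wildcards) are assumptions chosen for this example rather than limits imposed by NATS.

```go
package main

import (
	"fmt"
	"strings"
)

// validSubject checks a publish subject against an illustrative naming policy:
// at least two dot-separated tokens, each made of lowercase letters, digits,
// '-' or '_'. Wildcards are rejected because they only belong in subscriptions.
func validSubject(subject string) bool {
	tokens := strings.Split(subject, ".")
	if len(tokens) < 2 {
		return false // a bare top-level name such as "events" is too generic
	}
	for _, tok := range tokens {
		if tok == "" {
			return false // leading, trailing, or doubled dots
		}
		for _, r := range tok {
			ok := (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' || r == '_'
			if !ok {
				return false
			}
		}
	}
	return true
}

func main() {
	for _, s := range []string{"orders.created", "events", "Orders.Created", "orders..created", "orders.*"} {
		fmt.Printf("%-18s valid=%v\n", s, validSubject(s))
	}
}
```

Only the first subject in the list passes; the others fail for being single-token, mixed-case, containing an empty token, and containing a wildcard respectively.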

Queue Subscriptions: Load Balancing

A queue subscription (also called a queue group) is the primary production pattern for distributing work across multiple instances of the same service. Any message published to the subject is delivered to exactly one member of the group.

internal/messaging/queue.go
package messaging

import (
    "encoding/json"
    "fmt"
    "log/slog"

    "github.com/nats-io/nats.go"
)

type OrderEvent struct {
    OrderID string `json:"order_id"`
    Amount  int64  `json:"amount"`
}

// StartOrderWorker subscribes to the queue group "workers.orders".
// Multiple instances of this function across different processes
// will share the message load automatically.
func StartOrderWorker(nc *nats.Conn) (*nats.Subscription, error) {
    return nc.QueueSubscribe("orders.created", "workers.orders", func(msg *nats.Msg) {
        var event OrderEvent
        if err := json.Unmarshal(msg.Data, &event); err != nil {
            slog.Error("failed to unmarshal order event", "err", err)
            return
        }
        slog.Info("processing order", "order_id", event.OrderID, "amount", event.Amount)
        // process...
    })
}

// PublishOrder publishes an order event to the orders.created subject.
func PublishOrder(nc *nats.Conn, event OrderEvent) error {
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal order event: %w", err)
    }
    return nc.Publish("orders.created", data)
}

Request/Reply: Synchronous RPC over NATS

NATS has a built-in request/reply pattern. The publisher sends a message with a reply subject embedded; any subscriber that responds to that reply subject completes the call. nc.Request handles this transparently with a timeout.

internal/messaging/rpc.go
package messaging

import (
    "encoding/json"
    "fmt"
    "time"

    "github.com/nats-io/nats.go"
)

type PriceRequest struct {
    ProductID string `json:"product_id"`
}

type PriceResponse struct {
    Price int64  `json:"price"`
    Error string `json:"error,omitempty"`
}

// RequestPrice sends a synchronous price lookup via NATS request/reply.
func RequestPrice(nc *nats.Conn, productID string) (PriceResponse, error) {
    payload, err := json.Marshal(PriceRequest{ProductID: productID})
    if err != nil {
        return PriceResponse{}, fmt.Errorf("marshal price request: %w", err)
    }

    msg, err := nc.Request("pricing.lookup", payload, 2*time.Second)
    if err != nil {
        return PriceResponse{}, fmt.Errorf("price request: %w", err)
    }

    var resp PriceResponse
    if err := json.Unmarshal(msg.Data, &resp); err != nil {
        return PriceResponse{}, fmt.Errorf("price response unmarshal: %w", err)
    }
    return resp, nil
}

// ServePricing registers a responder for price lookups.
func ServePricing(nc *nats.Conn) (*nats.Subscription, error) {
    return nc.Subscribe("pricing.lookup", func(msg *nats.Msg) {
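        var req PriceRequest
        if err := json.Unmarshal(msg.Data, &req); err != nil {
            // Tell the caller the request was malformed instead of pricing garbage.
            data, _ := json.Marshal(PriceResponse{Error: "invalid request"})
            _ = msg.Respond(data)
            return
        }

        // In reality, look up the price for req.ProductID from a database.
        resp := PriceResponse{Price: 4999}
        data, _ := json.Marshal(resp) // marshalling this struct cannot fail
        _ = msg.Respond(data)         // best effort: the requester may have timed out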
    })
}

JetStream: Persistence, Replay, and Acknowledgement

JetStream is NATS’ built-in persistence layer. It adds durable streams, consumer groups, replay, and at-least-once or exactly-once delivery guarantees. A stream captures messages matching one or more subjects and retains them according to your retention policy.

internal/messaging/jetstream.go
package messaging

import (
    "context"
    "fmt"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

// SetupStream creates (or updates) a JetStream stream for order events.
func SetupStream(ctx context.Context, nc *nats.Conn) (jetstream.JetStream, error) {
    js, err := jetstream.New(nc)
    if err != nil {
        return nil, fmt.Errorf("jetstream context: %w", err)
    }

    _, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
        Name:        "ORDERS",
        Subjects:    []string{"orders.>"},
        MaxAge:      7 * 24 * time.Hour, // retain for 7 days
        Storage:     jetstream.FileStorage,
        Replicas:    1, // set to 3 for clustered deployments
        Retention:   jetstream.LimitsPolicy,
        Discard:     jetstream.DiscardOld,
    })
    if err != nil {
        return nil, fmt.Errorf("create stream: %w", err)
    }

    return js, nil
}

Push Consumer: Durable Subscription

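A push consumer has messages delivered automatically by the server. It is the closest analogue to a regular subscription but with durability. With the newer jetstream package used here, CreateOrUpdateConsumer creates a pull consumer under the hood, and Consume layers continuous, callback-based delivery on top of it, so the handler below still behaves like a push subscription.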

internal/messaging/push_consumer.go
package messaging

import (
    "context"
    "fmt"
    "log/slog"
    "time"

    "github.com/nats-io/nats.go/jetstream"
)

// StartPushConsumer creates a durable push consumer for order processing.
func StartPushConsumer(ctx context.Context, js jetstream.JetStream) error {
    consumer, err := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
        Durable:       "order-processor",
        FilterSubject: "orders.created",
        AckPolicy:     jetstream.AckExplicitPolicy,
        MaxDeliver:    5,                // deliver at most 5 times, then stop redelivering
        AckWait:       30 * time.Second, // redeliver if not acked within 30s
    })
    if err != nil {
        return fmt.Errorf("create consumer: %w", err)
    }

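    cc, err := consumer.Consume(func(msg jetstream.Msg) {
        slog.Info("processing order from JetStream", "subject", msg.Subject())

        // Process the message.
        if err := processOrder(msg.Data()); err != nil {
            slog.Error("order processing failed, nacking", "err", err)
            msg.Nak() // ask the server to redeliver
            return
        }
        msg.Ack()
    })
    if err != nil {
        return fmt.Errorf("consume: %w", err)
    }

    // Consume does not block. Stop delivery when the caller cancels ctx.
    go func() {
        <-ctx.Done()
        cc.Stop()
    }()
    return nil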
}

func processOrder(data []byte) error { return nil } // placeholder

Pull Consumer: Controlled Batch Processing

A pull consumer lets your application fetch messages on demand. This is the right model when you need to control throughput or process in batches.

internal/messaging/pull_consumer.go
package messaging

import (
    "context"
    "fmt"
    "log/slog"
    "time"

    "github.com/nats-io/nats.go/jetstream"
)

// RunPullBatch fetches a batch of messages and processes them.
func RunPullBatch(ctx context.Context, js jetstream.JetStream) error {
    consumer, err := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
        Durable:   "order-batch-processor",
        AckPolicy: jetstream.AckExplicitPolicy,
    })
    if err != nil {
        return fmt.Errorf("create pull consumer: %w", err)
    }

    msgs, err := consumer.Fetch(50, jetstream.FetchMaxWait(2*time.Second))
    if err != nil {
        return fmt.Errorf("fetch: %w", err)
    }

    for msg := range msgs.Messages() {
        slog.Info("batch processing", "subject", msg.Subject())
        // process...
        msg.Ack()
    }
    if err := msgs.Error(); err != nil {
        return fmt.Errorf("fetch error: %w", err)
    }
    return nil
}
Tip

Pull consumers are significantly better for workloads with variable processing latency. With a push consumer, if your handler is slow, messages pile up in the in-memory channel buffer. With pull consumers, you control the fetch rate explicitly.

Authentication: Username/Password and NKey

internal/messaging/auth.go
package messaging

import "github.com/nats-io/nats.go"

// ConnectWithPassword connects using username/password (accounts-based auth).
func ConnectWithPassword(serverURL, user, password string) (*nats.Conn, error) {
    return nats.Connect(serverURL,
        nats.UserInfo(user, password),
        nats.MaxReconnects(10),
    )
}

// ConnectWithNKey connects using an NKey seed file.
// NKeys use Ed25519. No shared secret is transmitted over the wire.
func ConnectWithNKey(serverURL, nkeySeedPath string) (*nats.Conn, error) {
    opt, err := nats.NkeyOptionFromSeed(nkeySeedPath)
    if err != nil {
        return nil, err
    }
    return nats.Connect(serverURL, opt, nats.MaxReconnects(10))
}
Note

NKeys are the preferred authentication method for production deployments. They use Ed25519 key pairs. The seed (private key) never leaves the client; the server only needs the public key. There is no shared secret stored on the server to rotate or leak, although the seed file itself must still be protected like any private key.
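
The idea behind NKey authentication is a challenge-response: the server sends a random nonce, the client signs it with its private key, and the server verifies the signature with the public key it has on file. The sketch below shows that exchange with the standard library's crypto/ed25519 only. It is not the NATS wire protocol and does not use the nkeys seed encoding; newKeyPair, signNonce and verifyNonce are names invented for this illustration.

```go
package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"fmt"
)

// newKeyPair generates an Ed25519 key pair. In NATS terms the private half
// corresponds to the seed and stays on the client.
func newKeyPair() (ed25519.PublicKey, ed25519.PrivateKey) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		panic(err) // only fails if the system random source is broken
	}
	return pub, priv
}

// signNonce is the client side: sign the server's challenge.
func signNonce(priv ed25519.PrivateKey, nonce []byte) []byte {
	return ed25519.Sign(priv, nonce)
}

// verifyNonce is the server side: it needs only the public key.
func verifyNonce(pub ed25519.PublicKey, nonce, sig []byte) bool {
	return ed25519.Verify(pub, nonce, sig)
}

func main() {
	pub, priv := newKeyPair()

	nonce := make([]byte, 16)
	if _, err := rand.Read(nonce); err != nil {
		panic(err)
	}

	sig := signNonce(priv, nonce)
	fmt.Println("signature valid for this nonce:", verifyNonce(pub, nonce, sig)) // true

	// A captured signature is useless against a fresh challenge.
	nonce[0] ^= 0xFF
	fmt.Println("signature valid for another nonce:", verifyNonce(pub, nonce, sig)) // false
}
```

Only the signature and the public key cross the wire, which is why nothing an eavesdropper captures can be replayed on a later connection.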

Comparison: NATS vs Alternatives

| Feature | NATS Core | JetStream | Kafka | Redis Streams |
| --- | --- | --- | --- | --- |
| Delivery guarantee | At-most-once | At-least-once / exactly-once | At-least-once | At-least-once |
| Persistence | None | File / Memory | Log (days to forever) | Memory / RDB |
| Max throughput | Multi-million msg/s | Millions msg/s | Millions msg/s | Millions msg/s |
| Latency (p50) | sub-ms | 1-5 ms | 5-15 ms | sub-ms |
| Operational complexity | Very low | Low | High | Low (if Redis already used) |
| Replay | No | Yes | Yes | Yes |
| Consumer groups | Queue groups | Yes (durable consumers) | Yes (consumer groups) | Yes |
| Best for | Internal RPC, control planes | Microservice events, IoT | Event sourcing, audit logs | Lightweight queues in Redis-heavy stacks |

Common Mistakes

Using NATS Core where you need delivery guarantees
NATS Core is not a queue. If a subscriber is offline when a message is published, the message is gone. Teams that start with core pub/sub for “simple” use cases and then discover they need delivery guarantees face a significant refactor. Decide upfront: if the message must be processed at least once, start with JetStream.
Not handling reconnection for JetStream consumers
JetStream push consumers using the newer jetstream package (v2 API) handle reconnection automatically. But if you are using the older js.Subscribe API with nats.Durable, you must re-subscribe in the reconnect handler. The newer API is the right choice for any new code.
Subject naming without hierarchy
Flat subject names like events or orders give you no wildcard flexibility and make it impossible to route by service or event type. Always use at least two levels: <domain>.<event>. Three levels (<domain>.<aggregate>.<event>) scale better for large systems.
Synchronous publishing in hot paths
nc.Publish is asynchronous (fire and forget). js.Publish (JetStream) is synchronous by default: it waits for a server acknowledgement (publish ack), so each call costs a full round trip. In hot paths, use js.PublishAsync and wait on the channel returned by js.PublishAsyncComplete() so that acks are collected in bulk instead of being serialised one message at a time.
