Kafka, NATS, Redis Streams, and RabbitMQ all describe themselves as “messaging systems.” They are not interchangeable. Kafka is optimised for high-throughput event log storage with long-term replay. Redis Streams is a good fit when you already have Redis and need a lightweight consumer group. RabbitMQ is the right choice when you need complex routing and AMQP semantics.
NATS occupies a different niche: it is extremely lightweight (a single binary under 20MB), starts in milliseconds, delivers sub-millisecond round-trip latency, and handles automatic load balancing across consumers out of the box. For control planes, microservice meshes, IoT telemetry, and internal RPC-style communication, NATS is often the cleanest choice. For durable event sourcing or guaranteed exactly-once delivery, NATS JetStream extends the core with persistence and acknowledgement.
## Architecture Overview
```mermaid
flowchart TD
    Publisher[Publisher] -->|Publish| Server[NATS Server]
    Server -->|Fan-out| Sub1[Subscriber A]
    Server -->|Fan-out| Sub2[Subscriber B]
    Server -->|Load balanced| QG[Queue Group\nworkers.orders]
    QG --> W1[Worker 1]
    QG --> W2[Worker 2]
    Server -->|Persisted| JS[(JetStream\nStream)]
    JS -->|Push / Pull| Consumer[Durable Consumer]
```
## Connection Setup with Production Options
Install the client:
```bash
go get github.com/nats-io/nats.go
```

A production connection needs TLS, a reconnect policy, and an error handler. The bare default `nats.Connect("nats://localhost:4222")` is fine for local development and nothing else.
```go
package messaging

import (
	"crypto/tls"
	"log/slog"
	"time"

	"github.com/nats-io/nats.go"
)

// NewConn opens a NATS connection with production-grade defaults
// that callers can override via opts.
func NewConn(serverURL string, opts ...nats.Option) (*nats.Conn, error) {
	defaults := []nats.Option{
		// Reconnect up to 10 times with 2-second intervals.
		nats.MaxReconnects(10),
		nats.ReconnectWait(2 * time.Second),
		// Log connection lifecycle events.
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
			slog.Warn("nats disconnected", "err", err)
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			slog.Info("nats reconnected", "url", nc.ConnectedUrl())
		}),
		nats.ClosedHandler(func(nc *nats.Conn) {
			slog.Error("nats connection closed permanently")
		}),
		// TLS for production; remove for local dev.
		nats.Secure(&tls.Config{MinVersion: tls.VersionTLS12}),
	}
	merged := append(defaults, opts...)
	return nats.Connect(serverURL, merged...)
}
```

NATS core is a fire-and-forget system: if no subscriber is listening when a message is published, the message is gone. For any use case that requires durability, use JetStream. Note that core subscriptions are re-established automatically by the client after a reconnect, and JetStream consumers need no re-subscription at all because they are server-side state.
## Core Pub/Sub: Subject Hierarchies and Wildcards
NATS subjects are dot-separated strings. Two wildcards let you build flexible routing:
- `*` matches exactly one token: `service.*.events` matches `service.orders.events` and `service.payments.events` but not `service.orders.internal.events`.
- `>` matches one or more tokens: `logs.>` matches everything under `logs.`.
```go
package messaging

import (
	"log/slog"

	"github.com/nats-io/nats.go"
)

// SubscribeEvents subscribes to all service event subjects.
func SubscribeEvents(nc *nats.Conn) (*nats.Subscription, error) {
	return nc.Subscribe("service.*.events", func(msg *nats.Msg) {
		slog.Info("received event",
			"subject", msg.Subject,
			"data", string(msg.Data),
		)
	})
}

// SubscribeAllLogs subscribes to the entire logs hierarchy.
func SubscribeAllLogs(nc *nats.Conn) (*nats.Subscription, error) {
	return nc.Subscribe("logs.>", func(msg *nats.Msg) {
		slog.Info("log entry", "subject", msg.Subject, "data", string(msg.Data))
	})
}
```

Subject naming convention matters. Use lowercase, dot-separated nouns: `orders.created`, `payments.processed`, `inventory.updated`. Avoid generic names like `events` or `messages` at the top level; they make wildcard subscriptions ambiguous and debugging harder.
## Queue Subscriptions: Load Balancing
A queue subscription (also called a queue group) is the primary production pattern for distributing work across multiple instances of the same service. Any message published to the subject is delivered to exactly one member of the group.
```go
package messaging

import (
	"encoding/json"
	"fmt"
	"log/slog"

	"github.com/nats-io/nats.go"
)

type OrderEvent struct {
	OrderID string `json:"order_id"`
	Amount  int64  `json:"amount"`
}

// StartOrderWorker subscribes to the queue group "workers.orders".
// Multiple instances of this function across different processes
// will share the message load automatically.
func StartOrderWorker(nc *nats.Conn) (*nats.Subscription, error) {
	return nc.QueueSubscribe("orders.created", "workers.orders", func(msg *nats.Msg) {
		var event OrderEvent
		if err := json.Unmarshal(msg.Data, &event); err != nil {
			slog.Error("failed to unmarshal order event", "err", err)
			return
		}
		slog.Info("processing order", "order_id", event.OrderID, "amount", event.Amount)
		// process...
	})
}

// PublishOrder sends an order event.
func PublishOrder(nc *nats.Conn, event OrderEvent) error {
	data, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("marshal order event: %w", err)
	}
	return nc.Publish("orders.created", data)
}
```

## Request/Reply: Synchronous RPC over NATS
NATS has a built-in request/reply pattern. The publisher sends a message with a reply subject embedded; any subscriber that responds to that reply subject completes the call. nc.Request handles this transparently with a timeout.
```go
package messaging

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

type PriceRequest struct {
	ProductID string `json:"product_id"`
}

type PriceResponse struct {
	Price int64  `json:"price"`
	Error string `json:"error,omitempty"`
}

// RequestPrice sends a synchronous price lookup via NATS request/reply.
func RequestPrice(nc *nats.Conn, productID string) (PriceResponse, error) {
	payload, err := json.Marshal(PriceRequest{ProductID: productID})
	if err != nil {
		return PriceResponse{}, fmt.Errorf("price request marshal: %w", err)
	}
	msg, err := nc.Request("pricing.lookup", payload, 2*time.Second)
	if err != nil {
		return PriceResponse{}, fmt.Errorf("price request: %w", err)
	}
	var resp PriceResponse
	if err := json.Unmarshal(msg.Data, &resp); err != nil {
		return PriceResponse{}, fmt.Errorf("price response unmarshal: %w", err)
	}
	return resp, nil
}

// ServePricing registers a responder for price lookups.
func ServePricing(nc *nats.Conn) (*nats.Subscription, error) {
	return nc.Subscribe("pricing.lookup", func(msg *nats.Msg) {
		var req PriceRequest
		if err := json.Unmarshal(msg.Data, &req); err != nil {
			data, _ := json.Marshal(PriceResponse{Error: "bad request"})
			msg.Respond(data)
			return
		}
		// In reality, look up the price from a database.
		resp := PriceResponse{Price: 4999}
		data, _ := json.Marshal(resp)
		msg.Respond(data)
	})
}
```

## JetStream: Persistence, Replay, and Acknowledgement
JetStream is NATS’ built-in persistence layer. It adds durable streams, consumer groups, replay, and at-least-once or exactly-once delivery guarantees. A stream captures messages matching one or more subjects and retains them according to your retention policy.
```go
package messaging

import (
	"context"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

// SetupStream creates (or updates) a JetStream stream for order events.
func SetupStream(ctx context.Context, nc *nats.Conn) (jetstream.JetStream, error) {
	js, err := jetstream.New(nc)
	if err != nil {
		return nil, fmt.Errorf("jetstream context: %w", err)
	}
	_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
		Name:      "ORDERS",
		Subjects:  []string{"orders.>"},
		MaxAge:    7 * 24 * time.Hour, // retain for 7 days
		Storage:   jetstream.FileStorage,
		Replicas:  1, // set to 3 for clustered deployments
		Retention: jetstream.LimitsPolicy,
		Discard:   jetstream.DiscardOld,
	})
	if err != nil {
		return nil, fmt.Errorf("create stream: %w", err)
	}
	return js, nil
}
```

## Push Consumer: Durable Subscription
A push-style consumer has messages delivered to your handler as they arrive, without explicit fetching. It is the closest analogue to a regular subscription, but with durability. (With the modern `jetstream` client this is `consumer.Consume`; under the hood the v2 API drives delivery with optimized pull requests, but the programming model is push-style.)
```go
package messaging

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/nats-io/nats.go/jetstream"
)

// StartPushConsumer creates a durable consumer that delivers order
// events to a callback.
func StartPushConsumer(ctx context.Context, js jetstream.JetStream) error {
	consumer, err := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
		Durable:       "order-processor",
		FilterSubject: "orders.created",
		AckPolicy:     jetstream.AckExplicitPolicy,
		MaxDeliver:    5,                // give up after 5 delivery attempts
		AckWait:       30 * time.Second, // redeliver if not acked within 30s
	})
	if err != nil {
		return fmt.Errorf("create consumer: %w", err)
	}
	_, err = consumer.Consume(func(msg jetstream.Msg) {
		slog.Info("processing order from JetStream", "subject", msg.Subject())
		// Process the message.
		if err := processOrder(msg.Data()); err != nil {
			slog.Error("order processing failed, nacking", "err", err)
			msg.Nak() // will be redelivered
			return
		}
		msg.Ack()
	})
	return err
}

func processOrder(data []byte) error { return nil } // placeholder
```

## Pull Consumer: Controlled Batch Processing
A pull consumer lets your application fetch messages on demand. This is the right model when you need to control throughput or process in batches.
```go
package messaging

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/nats-io/nats.go/jetstream"
)

// RunPullBatch fetches a batch of messages and processes them.
func RunPullBatch(ctx context.Context, js jetstream.JetStream) error {
	consumer, err := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
		Durable:   "order-batch-processor",
		AckPolicy: jetstream.AckExplicitPolicy,
	})
	if err != nil {
		return fmt.Errorf("create pull consumer: %w", err)
	}
	msgs, err := consumer.Fetch(50, jetstream.FetchMaxWait(2*time.Second))
	if err != nil {
		return fmt.Errorf("fetch: %w", err)
	}
	for msg := range msgs.Messages() {
		slog.Info("batch processing", "subject", msg.Subject())
		// process...
		msg.Ack()
	}
	if err := msgs.Error(); err != nil {
		return fmt.Errorf("fetch error: %w", err)
	}
	return nil
}
```

Pull consumers are significantly better for workloads with variable processing latency. With a push consumer, a slow handler lets messages pile up in the client's in-memory buffer; with a pull consumer, you control the fetch rate explicitly.
## Authentication: Username/Password and NKey
```go
package messaging

import "github.com/nats-io/nats.go"

// ConnectWithPassword connects using username/password (accounts-based auth).
func ConnectWithPassword(serverURL, user, password string) (*nats.Conn, error) {
	return nats.Connect(serverURL,
		nats.UserInfo(user, password),
		nats.MaxReconnects(10),
	)
}

// ConnectWithNKey connects using an NKey seed file.
// NKeys use Ed25519; no shared secret is transmitted over the wire.
func ConnectWithNKey(serverURL, nkeySeedPath string) (*nats.Conn, error) {
	opt, err := nats.NkeyOptionFromSeed(nkeySeedPath)
	if err != nil {
		return nil, err
	}
	return nats.Connect(serverURL, opt, nats.MaxReconnects(10))
}
```

NKeys are the preferred authentication method for production deployments. They use Ed25519 key pairs: the seed (private key) never leaves the client, and the server only needs the public key, so there is no shared secret to rotate or leak.
## Comparison: NATS vs Alternatives
| Feature | NATS Core | JetStream | Kafka | Redis Streams |
|---|---|---|---|---|
| Delivery guarantee | At-most-once | At-least-once / exactly-once | At-least-once | At-least-once |
| Persistence | None | File / Memory | Log (days to forever) | Memory / RDB |
| Max throughput | Multi-million msg/s | Millions msg/s | Millions msg/s | Millions msg/s |
| Latency (p50) | sub-ms | 1-5ms | 5-15ms | sub-ms |
| Operational complexity | Very low | Low | High | Low (if Redis already used) |
| Replay | No | Yes | Yes | Yes |
| Consumer groups | Queue groups | Yes (durable consumers) | Yes (consumer groups) | Yes |
| Best for | Internal RPC, control planes | Microservice events, IoT | Event sourcing, audit logs | Lightweight queues in Redis-heavy stacks |
## Common Mistakes
### Using NATS Core where you need delivery guarantees

Core NATS drops any message published while no subscriber is listening. If losing a message matters, publish through JetStream instead.
### Not handling reconnection for JetStream consumers
Consumers created with the new `jetstream` package (v2 API) handle reconnection automatically. But if you are using the older `js.Subscribe` API with `nats.Durable`, you must re-subscribe in the reconnect handler. The newer API is the right choice for any new code.

### Subject naming without hierarchy
Flat names like `events` or `orders` give you no wildcard flexibility and make it impossible to route by service or event type. Always use at least two levels: `<domain>.<event>`. Three levels (`<domain>.<aggregate>.<event>`) scale better for large systems.

### Synchronous publishing in hot paths
`nc.Publish` is asynchronous (fire and forget). `js.Publish` (JetStream) is synchronous by default: it waits for a server acknowledgement (publish ack). In hot paths, use `js.PublishAsync` with a done channel to batch publish acks instead of serialising them.

If you want to go deeper on any of this, I offer 1:1 coaching sessions for engineers working on AI integration, cloud architecture, and platform engineering. Book a session (50 EUR / 60 min) or reach out at manuel.fedele+website@gmail.com.