Kafka, NATS, Redis Streams e RabbitMQ si descrivono tutti come “sistemi di messaggistica.” Non sono intercambiabili. Kafka e’ ottimizzato per lo storage di event log ad alto throughput con replay a lungo termine. Redis Streams e’ una buona scelta quando hai gia’ Redis e hai bisogno di un consumer group leggero. RabbitMQ e’ la scelta giusta per routing complesso e semantiche AMQP.
NATS occupa una nicchia diversa: e’ estremamente leggero (un unico binario sotto i 20MB), si avvia in millisecondi, offre latenza round-trip sub-millisecondo e gestisce il bilanciamento del carico tra i consumer automaticamente. Per control plane, mesh di microservizi, telemetria IoT e comunicazione RPC interna, NATS e’ spesso la scelta piu’ pulita. Per event sourcing duraturo o consegna exactly-once garantita, JetStream estende il core con persistenza e acknowledgement.
Panoramica dell’Architettura#
flowchart TD
Publisher[Publisher] -->|Publish| Server[NATS Server]
Server -->|Fan-out| Sub1[Subscriber A]
Server -->|Fan-out| Sub2[Subscriber B]
Server -->|Load balanced| QG[Queue Group\nworkers.orders]
QG --> W1[Worker 1]
QG --> W2[Worker 2]
Server -->|Persisted| JS[(JetStream\nStream)]
JS -->|Push / Pull| Consumer[Durable Consumer]
Setup della Connessione con Opzioni di Produzione#
Installa il client:
go get github.com/nats-io/nats.goUna connessione di produzione ha bisogno di TLS, una reconnect policy e un error handler. Affidarsi al semplice nats.Connect("nats://localhost:4222") va bene per lo sviluppo locale e basta.
package messaging
import (
"crypto/tls"
"log/slog"
"time"
"github.com/nats-io/nats.go"
)
func NewConn(serverURL string, opts ...nats.Option) (*nats.Conn, error) {
defaults := []nats.Option{
// Riconnetti fino a 10 volte con intervalli di 2 secondi.
nats.MaxReconnects(10),
nats.ReconnectWait(2 * time.Second),
// Log degli eventi del ciclo di vita della connessione.
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
slog.Warn("nats disconnesso", "err", err)
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
slog.Info("nats riconnesso", "url", nc.ConnectedUrl())
}),
nats.ClosedHandler(func(nc *nats.Conn) {
slog.Error("connessione nats chiusa definitivamente")
}),
// TLS per la produzione; rimuovere per lo sviluppo locale.
nats.Secure(&tls.Config{MinVersion: tls.VersionTLS12}),
}
merged := append(defaults, opts...)
return nats.Connect(serverURL, merged...)
}Il NATS core e’ un sistema fire-and-forget. Se nessun subscriber e’ in ascolto quando viene pubblicato un messaggio, il messaggio e’ perso. Per qualsiasi caso d’uso che richiede durabilita’, usa JetStream. Le subscription core sopravvivono alle riconnessioni automaticamente; i JetStream consumer non necessitano di ri-subscription perche’ sono stato server-side.
Core Pub/Sub: Gerarchie di Subject e Wildcard#
I subject NATS sono stringhe separate da punti. Due wildcard permettono di costruire routing flessibile:
*corrisponde esattamente a un token:service.*.eventscorrisponde aservice.orders.eventseservice.payments.eventsma non aservice.orders.internal.events.>corrisponde a uno o piu’ token:logs.>corrisponde a tutto sottologs..
package messaging
import (
"log/slog"
"github.com/nats-io/nats.go"
)
// SubscribeEvents si iscrive a tutti i subject di eventi dei servizi.
func SubscribeEvents(nc *nats.Conn) (*nats.Subscription, error) {
return nc.Subscribe("service.*.events", func(msg *nats.Msg) {
slog.Info("evento ricevuto",
"subject", msg.Subject,
"data", string(msg.Data),
)
})
}
// SubscribeAllLogs si iscrive all'intera gerarchia dei log.
func SubscribeAllLogs(nc *nats.Conn) (*nats.Subscription, error) {
return nc.Subscribe("logs.>", func(msg *nats.Msg) {
slog.Info("voce di log", "subject", msg.Subject, "data", string(msg.Data))
})
}La convenzione di naming dei subject e’ importante. Usa sostantivi minuscoli separati da punti: orders.created, payments.processed, inventory.updated. Evita nomi generici come events o messages al primo livello – rendono le wildcard subscription ambigue e il debugging piu’ difficile.
Queue Subscription: Bilanciamento del Carico#
Una queue subscription (chiamata anche queue group) e’ il pattern di produzione primario per distribuire il lavoro tra piu’ istanze dello stesso servizio. Ogni messaggio pubblicato nel subject viene consegnato esattamente a un membro del gruppo.
package messaging
import (
"encoding/json"
"fmt"
"log/slog"
"github.com/nats-io/nats.go"
)
type OrderEvent struct {
OrderID string `json:"order_id"`
Amount int64 `json:"amount"`
}
// StartOrderWorker si iscrive al queue group "workers.orders".
// Piu' istanze di questa funzione su processi diversi
// condivideranno automaticamente il carico dei messaggi.
func StartOrderWorker(nc *nats.Conn) (*nats.Subscription, error) {
return nc.QueueSubscribe("orders.created", "workers.orders", func(msg *nats.Msg) {
var event OrderEvent
if err := json.Unmarshal(msg.Data, &event); err != nil {
slog.Error("impossibile deserializzare l'evento ordine", "err", err)
return
}
slog.Info("elaborazione ordine", "order_id", event.OrderID, "amount", event.Amount)
// elabora...
})
}
// Publish invia un evento ordine.
func PublishOrder(nc *nats.Conn, event OrderEvent) error {
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshal order event: %w", err)
}
return nc.Publish("orders.created", data)
}Request/Reply: RPC Sincrono su NATS#
NATS ha un pattern request/reply integrato. Il publisher invia un messaggio con un reply subject incorporato; qualsiasi subscriber che risponde a quel reply subject completa la chiamata. nc.Request gestisce questo in modo trasparente con un timeout.
package messaging
import (
"encoding/json"
"fmt"
"time"
"github.com/nats-io/nats.go"
)
type PriceRequest struct {
ProductID string `json:"product_id"`
}
type PriceResponse struct {
Price int64 `json:"price"`
Error string `json:"error,omitempty"`
}
// RequestPrice invia una richiesta sincrona di ricerca prezzi via NATS request/reply.
func RequestPrice(nc *nats.Conn, productID string) (PriceResponse, error) {
payload, _ := json.Marshal(PriceRequest{ProductID: productID})
msg, err := nc.Request("pricing.lookup", payload, 2*time.Second)
if err != nil {
return PriceResponse{}, fmt.Errorf("price request: %w", err)
}
var resp PriceResponse
if err := json.Unmarshal(msg.Data, &resp); err != nil {
return PriceResponse{}, fmt.Errorf("price response unmarshal: %w", err)
}
return resp, nil
}
// ServePricing registra un responder per le ricerche di prezzi.
func ServePricing(nc *nats.Conn) (*nats.Subscription, error) {
return nc.Subscribe("pricing.lookup", func(msg *nats.Msg) {
var req PriceRequest
json.Unmarshal(msg.Data, &req)
// In realta', cercare il prezzo in un database.
resp := PriceResponse{Price: 4999}
data, _ := json.Marshal(resp)
msg.Respond(data)
})
}JetStream: Persistenza, Replay e Acknowledgement#
JetStream e’ il layer di persistenza integrato di NATS. Aggiunge stream durabili, consumer group, replay e garanzie di consegna at-least-once o exactly-once. Uno stream cattura i messaggi corrispondenti a uno o piu’ subject e li conserva in base alla policy di retention.
package messaging
import (
"context"
"fmt"
"time"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// SetupStream crea (o aggiorna) un JetStream stream per gli eventi degli ordini.
func SetupStream(ctx context.Context, nc *nats.Conn) (jetstream.JetStream, error) {
js, err := jetstream.New(nc)
if err != nil {
return nil, fmt.Errorf("jetstream context: %w", err)
}
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "ORDERS",
Subjects: []string{"orders.>"},
MaxAge: 7 * 24 * time.Hour, // conserva per 7 giorni
Storage: jetstream.FileStorage,
Replicas: 1, // imposta a 3 per deployment in cluster
Retention: jetstream.LimitsPolicy,
Discard: jetstream.DiscardOld,
})
if err != nil {
return nil, fmt.Errorf("create stream: %w", err)
}
return js, nil
}Push Consumer: Subscription Durabile#
Un push consumer riceve i messaggi consegnati automaticamente dal server. E’ l’analogo piu’ vicino a una subscription regolare ma con durabilita'.
package messaging
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/nats-io/nats.go/jetstream"
)
// StartPushConsumer crea un push consumer duraturo per l'elaborazione degli ordini.
func StartPushConsumer(ctx context.Context, js jetstream.JetStream) error {
consumer, err := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
Durable: "order-processor",
FilterSubject: "orders.created",
AckPolicy: jetstream.AckExplicitPolicy,
MaxDeliver: 5,
AckWait: 30 * time.Second,
})
if err != nil {
return fmt.Errorf("create consumer: %w", err)
}
_, err = consumer.Consume(func(msg jetstream.Msg) {
slog.Info("elaborazione ordine da JetStream", "subject", msg.Subject())
if err := processOrder(msg.Data()); err != nil {
slog.Error("elaborazione ordine fallita, nacking", "err", err)
msg.Nak()
return
}
msg.Ack()
})
return err
}
func processOrder(data []byte) error { return nil }Pull Consumer: Elaborazione a Batch Controllata#
Un pull consumer permette alla tua applicazione di recuperare messaggi su richiesta. Questo e’ il modello giusto quando e’ necessario controllare il throughput o elaborare in batch.
package messaging
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/nats-io/nats.go/jetstream"
)
// RunPullBatch recupera un batch di messaggi e li elabora.
func RunPullBatch(ctx context.Context, js jetstream.JetStream) error {
consumer, err := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
Durable: "order-batch-processor",
AckPolicy: jetstream.AckExplicitPolicy,
})
if err != nil {
return fmt.Errorf("create pull consumer: %w", err)
}
msgs, err := consumer.Fetch(50, jetstream.FetchMaxWait(2*time.Second))
if err != nil {
return fmt.Errorf("fetch: %w", err)
}
for msg := range msgs.Messages() {
slog.Info("elaborazione batch", "subject", msg.Subject())
msg.Ack()
}
if err := msgs.Error(); err != nil {
return fmt.Errorf("fetch error: %w", err)
}
return nil
}I pull consumer sono significativamente migliori per workload con latenza di elaborazione variabile. Con un push consumer, se il tuo handler e’ lento, i messaggi si accumulano nel buffer del channel in-memory. Con i pull consumer, controlli esplicitamente il rate di fetch.
Autenticazione: Username/Password e NKey#
package messaging
import "github.com/nats-io/nats.go"
// ConnectWithPassword si connette usando username/password.
func ConnectWithPassword(serverURL, user, password string) (*nats.Conn, error) {
return nats.Connect(serverURL,
nats.UserInfo(user, password),
nats.MaxReconnects(10),
)
}
// ConnectWithNKey si connette usando un file seed NKey.
// Gli NKey usano Ed25519 -- nessun segreto condiviso viene trasmesso sul wire.
func ConnectWithNKey(serverURL, nkeySeedPath string) (*nats.Conn, error) {
opt, err := nats.NkeyOptionFromSeed(nkeySeedPath)
if err != nil {
return nil, err
}
return nats.Connect(serverURL, opt, nats.MaxReconnects(10))
}Gli NKey sono il metodo di autenticazione preferito per i deployment in produzione. Usano coppie di chiavi Ed25519. Il seed (chiave privata) non lascia mai il client; il server ha bisogno solo della chiave pubblica. Non c’e’ nessun segreto condiviso da ruotare o far trapelare.
Confronto: NATS vs Alternative#
| Feature | NATS Core | JetStream | Kafka | Redis Streams |
|---|---|---|---|---|
| Garanzia di consegna | At-most-once | At-least-once / exactly-once | At-least-once | At-least-once |
| Persistenza | Nessuna | File / Memory | Log (giorni o illimitato) | Memory / RDB |
| Max throughput | Multi-milioni msg/s | Milioni msg/s | Milioni msg/s | Milioni msg/s |
| Latenza (p50) | sub-ms | 1-5ms | 5-15ms | sub-ms |
| Complessita’ operativa | Molto bassa | Bassa | Alta | Bassa |
| Replay | No | Si | Si | Si |
| Consumer group | Queue group | Si (durable consumer) | Si | Si |
| Ideale per | RPC interno, control plane | Event microservizi, IoT | Event sourcing, audit log | Code leggere su stack Redis |
Errori Comuni#
Usare NATS Core dove servono garanzie di consegna
Non gestire la riconnessione per i consumer JetStream
jetstream (API v2) gestiscono la riconnessione automaticamente. Ma se usi la vecchia API js.Subscribe con nats.Durable, devi ri-iscriverti nel reconnect handler. La nuova API e’ la scelta giusta per qualsiasi codice nuovo.Subject naming senza gerarchia
events o orders non danno flessibilita’ con le wildcard e rendono impossibile instradare per servizio o tipo di evento. Usa sempre almeno due livelli: <dominio>.<evento>. Tre livelli scalano meglio per sistemi di grandi dimensioni.Pubblicazione sincrona in hot path
nc.Publish e’ asincrono (fire and forget). js.Publish (JetStream) e’ sincrono per default – attende un acknowledgement dal server. In hot path, usa js.PublishAsync con un done channel per raggruppare i publish ack invece di serializzarli.Se vuoi approfondire uno di questi argomenti, offro sessioni di coaching 1:1 per ingegneri che lavorano su integrazione AI, architettura cloud e platform engineering. Prenota una sessione (50 EUR / 60 min) o scrivimi a manuel.fedele+website@gmail.com.