Salta al contenuto principale

NATS Messaging in Go: Core, JetStream e Pattern di Produzione

·9 minuti
Indice dei contenuti
NATS e’ un sistema di messaggistica in un unico binario con latenza sub-millisecondo, costruito per i workload cloud-native. Non e’ Kafka, e non e’ Redis Streams. Sapere quando sceglierlo e cosa non sa fare e’ importante quanto sapere come usarlo. Questo post copre core pub/sub, queue group, request/reply, JetStream, pull consumer e autenticazione.

Kafka, NATS, Redis Streams e RabbitMQ si descrivono tutti come “sistemi di messaggistica.” Non sono intercambiabili. Kafka e’ ottimizzato per lo storage di event log ad alto throughput con replay a lungo termine. Redis Streams e’ una buona scelta quando hai gia’ Redis e hai bisogno di un consumer group leggero. RabbitMQ e’ la scelta giusta per routing complesso e semantiche AMQP.

NATS occupa una nicchia diversa: e’ estremamente leggero (un unico binario sotto i 20MB), si avvia in millisecondi, offre latenza round-trip sub-millisecondo e gestisce il bilanciamento del carico tra i consumer automaticamente. Per control plane, mesh di microservizi, telemetria IoT e comunicazione RPC interna, NATS e’ spesso la scelta piu’ pulita. Per event sourcing duraturo o consegna exactly-once garantita, JetStream estende il core con persistenza e acknowledgement.

Panoramica dell’Architettura
#

flowchart TD
    Publisher[Publisher] -->|Publish| Server[NATS Server]
    Server -->|Fan-out| Sub1[Subscriber A]
    Server -->|Fan-out| Sub2[Subscriber B]
    Server -->|Load balanced| QG[Queue Group\nworkers.orders]
    QG --> W1[Worker 1]
    QG --> W2[Worker 2]
    Server -->|Persisted| JS[(JetStream\nStream)]
    JS -->|Push / Pull| Consumer[Durable Consumer]

Setup della Connessione con Opzioni di Produzione
#

Installa il client:

go get github.com/nats-io/nats.go

Una connessione di produzione ha bisogno di TLS, una reconnect policy e un error handler. Affidarsi al semplice nats.Connect("nats://localhost:4222") va bene per lo sviluppo locale e basta.

internal/messaging/client.go
package messaging

import (
    "crypto/tls"
    "log/slog"
    "time"

    "github.com/nats-io/nats.go"
)

func NewConn(serverURL string, opts ...nats.Option) (*nats.Conn, error) {
    defaults := []nats.Option{
        // Riconnetti fino a 10 volte con intervalli di 2 secondi.
        nats.MaxReconnects(10),
        nats.ReconnectWait(2 * time.Second),

        // Log degli eventi del ciclo di vita della connessione.
        nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
            slog.Warn("nats disconnesso", "err", err)
        }),
        nats.ReconnectHandler(func(nc *nats.Conn) {
            slog.Info("nats riconnesso", "url", nc.ConnectedUrl())
        }),
        nats.ClosedHandler(func(nc *nats.Conn) {
            slog.Error("connessione nats chiusa definitivamente")
        }),

        // TLS per la produzione; rimuovere per lo sviluppo locale.
        nats.Secure(&tls.Config{MinVersion: tls.VersionTLS12}),
    }

    merged := append(defaults, opts...)
    return nats.Connect(serverURL, merged...)
}
Importante

Il NATS core e’ un sistema fire-and-forget. Se nessun subscriber e’ in ascolto quando viene pubblicato un messaggio, il messaggio e’ perso. Per qualsiasi caso d’uso che richiede durabilita’, usa JetStream. Le subscription core sopravvivono alle riconnessioni automaticamente; i JetStream consumer non necessitano di ri-subscription perche’ sono stato server-side.

Core Pub/Sub: Gerarchie di Subject e Wildcard
#

I subject NATS sono stringhe separate da punti. Due wildcard permettono di costruire routing flessibile:

  • * corrisponde esattamente a un token: service.*.events corrisponde a service.orders.events e service.payments.events ma non a service.orders.internal.events.
  • > corrisponde a uno o piu’ token: logs.> corrisponde a tutto sotto logs..
internal/messaging/subscribe.go
package messaging

import (
    "log/slog"

    "github.com/nats-io/nats.go"
)

// SubscribeEvents si iscrive a tutti i subject di eventi dei servizi.
func SubscribeEvents(nc *nats.Conn) (*nats.Subscription, error) {
    return nc.Subscribe("service.*.events", func(msg *nats.Msg) {
        slog.Info("evento ricevuto",
            "subject", msg.Subject,
            "data", string(msg.Data),
        )
    })
}

// SubscribeAllLogs si iscrive all'intera gerarchia dei log.
func SubscribeAllLogs(nc *nats.Conn) (*nats.Subscription, error) {
    return nc.Subscribe("logs.>", func(msg *nats.Msg) {
        slog.Info("voce di log", "subject", msg.Subject, "data", string(msg.Data))
    })
}
Nota

La convenzione di naming dei subject e’ importante. Usa sostantivi minuscoli separati da punti: orders.created, payments.processed, inventory.updated. Evita nomi generici come events o messages al primo livello – rendono le wildcard subscription ambigue e il debugging piu’ difficile.

Queue Subscription: Bilanciamento del Carico
#

Una queue subscription (chiamata anche queue group) e’ il pattern di produzione primario per distribuire il lavoro tra piu’ istanze dello stesso servizio. Ogni messaggio pubblicato nel subject viene consegnato esattamente a un membro del gruppo.

internal/messaging/queue.go
package messaging

import (
    "encoding/json"
    "fmt"
    "log/slog"

    "github.com/nats-io/nats.go"
)

type OrderEvent struct {
    OrderID string `json:"order_id"`
    Amount  int64  `json:"amount"`
}

// StartOrderWorker si iscrive al queue group "workers.orders".
// Piu' istanze di questa funzione su processi diversi
// condivideranno automaticamente il carico dei messaggi.
func StartOrderWorker(nc *nats.Conn) (*nats.Subscription, error) {
    return nc.QueueSubscribe("orders.created", "workers.orders", func(msg *nats.Msg) {
        var event OrderEvent
        if err := json.Unmarshal(msg.Data, &event); err != nil {
            slog.Error("impossibile deserializzare l'evento ordine", "err", err)
            return
        }
        slog.Info("elaborazione ordine", "order_id", event.OrderID, "amount", event.Amount)
        // elabora...
    })
}

// Publish invia un evento ordine.
func PublishOrder(nc *nats.Conn, event OrderEvent) error {
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal order event: %w", err)
    }
    return nc.Publish("orders.created", data)
}

Request/Reply: RPC Sincrono su NATS
#

NATS ha un pattern request/reply integrato. Il publisher invia un messaggio con un reply subject incorporato; qualsiasi subscriber che risponde a quel reply subject completa la chiamata. nc.Request gestisce questo in modo trasparente con un timeout.

internal/messaging/rpc.go
package messaging

import (
    "encoding/json"
    "fmt"
    "time"

    "github.com/nats-io/nats.go"
)

type PriceRequest struct {
    ProductID string `json:"product_id"`
}

type PriceResponse struct {
    Price int64  `json:"price"`
    Error string `json:"error,omitempty"`
}

// RequestPrice invia una richiesta sincrona di ricerca prezzi via NATS request/reply.
func RequestPrice(nc *nats.Conn, productID string) (PriceResponse, error) {
    payload, _ := json.Marshal(PriceRequest{ProductID: productID})

    msg, err := nc.Request("pricing.lookup", payload, 2*time.Second)
    if err != nil {
        return PriceResponse{}, fmt.Errorf("price request: %w", err)
    }

    var resp PriceResponse
    if err := json.Unmarshal(msg.Data, &resp); err != nil {
        return PriceResponse{}, fmt.Errorf("price response unmarshal: %w", err)
    }
    return resp, nil
}

// ServePricing registra un responder per le ricerche di prezzi.
func ServePricing(nc *nats.Conn) (*nats.Subscription, error) {
    return nc.Subscribe("pricing.lookup", func(msg *nats.Msg) {
        var req PriceRequest
        json.Unmarshal(msg.Data, &req)

        // In realta', cercare il prezzo in un database.
        resp := PriceResponse{Price: 4999}
        data, _ := json.Marshal(resp)
        msg.Respond(data)
    })
}

JetStream: Persistenza, Replay e Acknowledgement
#

JetStream e’ il layer di persistenza integrato di NATS. Aggiunge stream durabili, consumer group, replay e garanzie di consegna at-least-once o exactly-once. Uno stream cattura i messaggi corrispondenti a uno o piu’ subject e li conserva in base alla policy di retention.

internal/messaging/jetstream.go
package messaging

import (
    "context"
    "fmt"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

// SetupStream crea (o aggiorna) un JetStream stream per gli eventi degli ordini.
func SetupStream(ctx context.Context, nc *nats.Conn) (jetstream.JetStream, error) {
    js, err := jetstream.New(nc)
    if err != nil {
        return nil, fmt.Errorf("jetstream context: %w", err)
    }

    _, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
        Name:        "ORDERS",
        Subjects:    []string{"orders.>"},
        MaxAge:      7 * 24 * time.Hour, // conserva per 7 giorni
        Storage:     jetstream.FileStorage,
        Replicas:    1, // imposta a 3 per deployment in cluster
        Retention:   jetstream.LimitsPolicy,
        Discard:     jetstream.DiscardOld,
    })
    if err != nil {
        return nil, fmt.Errorf("create stream: %w", err)
    }

    return js, nil
}

Push Consumer: Subscription Durabile
#

Un push consumer riceve i messaggi consegnati automaticamente dal server. E’ l’analogo piu’ vicino a una subscription regolare ma con durabilita'.

internal/messaging/push_consumer.go
package messaging

import (
    "context"
    "fmt"
    "log/slog"
    "time"

    "github.com/nats-io/nats.go/jetstream"
)

// StartPushConsumer crea un push consumer duraturo per l'elaborazione degli ordini.
func StartPushConsumer(ctx context.Context, js jetstream.JetStream) error {
    consumer, err := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
        Durable:       "order-processor",
        FilterSubject: "orders.created",
        AckPolicy:     jetstream.AckExplicitPolicy,
        MaxDeliver:    5,
        AckWait:       30 * time.Second,
    })
    if err != nil {
        return fmt.Errorf("create consumer: %w", err)
    }

    _, err = consumer.Consume(func(msg jetstream.Msg) {
        slog.Info("elaborazione ordine da JetStream", "subject", msg.Subject())

        if err := processOrder(msg.Data()); err != nil {
            slog.Error("elaborazione ordine fallita, nacking", "err", err)
            msg.Nak()
            return
        }
        msg.Ack()
    })
    return err
}

func processOrder(data []byte) error { return nil }

Pull Consumer: Elaborazione a Batch Controllata
#

Un pull consumer permette alla tua applicazione di recuperare messaggi su richiesta. Questo e’ il modello giusto quando e’ necessario controllare il throughput o elaborare in batch.

internal/messaging/pull_consumer.go
package messaging

import (
    "context"
    "fmt"
    "log/slog"
    "time"

    "github.com/nats-io/nats.go/jetstream"
)

// RunPullBatch recupera un batch di messaggi e li elabora.
func RunPullBatch(ctx context.Context, js jetstream.JetStream) error {
    consumer, err := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
        Durable:   "order-batch-processor",
        AckPolicy: jetstream.AckExplicitPolicy,
    })
    if err != nil {
        return fmt.Errorf("create pull consumer: %w", err)
    }

    msgs, err := consumer.Fetch(50, jetstream.FetchMaxWait(2*time.Second))
    if err != nil {
        return fmt.Errorf("fetch: %w", err)
    }

    for msg := range msgs.Messages() {
        slog.Info("elaborazione batch", "subject", msg.Subject())
        msg.Ack()
    }
    if err := msgs.Error(); err != nil {
        return fmt.Errorf("fetch error: %w", err)
    }
    return nil
}
Suggerimento

I pull consumer sono significativamente migliori per workload con latenza di elaborazione variabile. Con un push consumer, se il tuo handler e’ lento, i messaggi si accumulano nel buffer del channel in-memory. Con i pull consumer, controlli esplicitamente il rate di fetch.

Autenticazione: Username/Password e NKey
#

internal/messaging/auth.go
package messaging

import "github.com/nats-io/nats.go"

// ConnectWithPassword si connette usando username/password.
func ConnectWithPassword(serverURL, user, password string) (*nats.Conn, error) {
    return nats.Connect(serverURL,
        nats.UserInfo(user, password),
        nats.MaxReconnects(10),
    )
}

// ConnectWithNKey si connette usando un file seed NKey.
// Gli NKey usano Ed25519 -- nessun segreto condiviso viene trasmesso sul wire.
func ConnectWithNKey(serverURL, nkeySeedPath string) (*nats.Conn, error) {
    opt, err := nats.NkeyOptionFromSeed(nkeySeedPath)
    if err != nil {
        return nil, err
    }
    return nats.Connect(serverURL, opt, nats.MaxReconnects(10))
}
Nota

Gli NKey sono il metodo di autenticazione preferito per i deployment in produzione. Usano coppie di chiavi Ed25519. Il seed (chiave privata) non lascia mai il client; il server ha bisogno solo della chiave pubblica. Non c’e’ nessun segreto condiviso da ruotare o far trapelare.

Confronto: NATS vs Alternative
#

FeatureNATS CoreJetStreamKafkaRedis Streams
Garanzia di consegnaAt-most-onceAt-least-once / exactly-onceAt-least-onceAt-least-once
PersistenzaNessunaFile / MemoryLog (giorni o illimitato)Memory / RDB
Max throughputMulti-milioni msg/sMilioni msg/sMilioni msg/sMilioni msg/s
Latenza (p50)sub-ms1-5ms5-15mssub-ms
Complessita’ operativaMolto bassaBassaAltaBassa
ReplayNoSiSiSi
Consumer groupQueue groupSi (durable consumer)SiSi
Ideale perRPC interno, control planeEvent microservizi, IoTEvent sourcing, audit logCode leggere su stack Redis

Errori Comuni
#

Usare NATS Core dove servono garanzie di consegna
NATS Core non e’ una coda. Se un subscriber e’ offline quando viene pubblicato un messaggio, il messaggio e’ perso. I team che iniziano con il core pub/sub per casi d’uso “semplici” e poi scoprono di aver bisogno di garanzie di consegna affrontano un refactoring significativo. Decidi in anticipo: se il messaggio deve essere elaborato almeno una volta, inizia con JetStream.
Non gestire la riconnessione per i consumer JetStream
I JetStream push consumer che usano il nuovo package jetstream (API v2) gestiscono la riconnessione automaticamente. Ma se usi la vecchia API js.Subscribe con nats.Durable, devi ri-iscriverti nel reconnect handler. La nuova API e’ la scelta giusta per qualsiasi codice nuovo.
Subject naming senza gerarchia
Nomi di subject flat come events o orders non danno flessibilita’ con le wildcard e rendono impossibile instradare per servizio o tipo di evento. Usa sempre almeno due livelli: <dominio>.<evento>. Tre livelli scalano meglio per sistemi di grandi dimensioni.
Pubblicazione sincrona in hot path
nc.Publish e’ asincrono (fire and forget). js.Publish (JetStream) e’ sincrono per default – attende un acknowledgement dal server. In hot path, usa js.PublishAsync con un done channel per raggruppare i publish ack invece di serializzarli.

Se vuoi approfondire uno di questi argomenti, offro sessioni di coaching 1:1 per ingegneri che lavorano su integrazione AI, architettura cloud e platform engineering. Prenota una sessione (50 EUR / 60 min) o scrivimi a manuel.fedele+website@gmail.com.

Articoli correlati