Documentation ¶
Overview ¶
Package natsclient provides a client for managing NATS connections with circuit breaker pattern.
Package natsclient provides a robust NATS client with circuit breaker protection, automatic reconnection, and comprehensive JetStream/KV support for distributed edge systems.
The natsclient package wraps the standard NATS Go client with additional reliability features including circuit breaker pattern for failure protection, exponential backoff for reconnection, and proper context propagation throughout all operations. It serves as the foundation for all NATS communication in the StreamKit framework.
Core Features ¶
Circuit Breaker Pattern: Prevents cascading failures by failing fast after a threshold of consecutive failures (default: 5). The circuit opens to prevent further attempts, then gradually tests the connection with exponential backoff.
Connection Lifecycle Management: Handles connection states automatically through the lifecycle: Disconnected → Connecting → Connected → Reconnecting → Connected. The client manages all transitions with configurable callbacks for state changes.
JetStream Support: Full support for JetStream streams, consumers, and Key-Value stores with proper error handling and circuit breaker integration.
KVStore Abstraction: High-level abstraction over NATS KV providing automatic CAS (Compare-And-Swap) retry logic, JSON helpers, and consistent error handling for configuration management scenarios.
Basic Usage ¶
Creating and connecting to NATS:
client, err := natsclient.NewClient("nats://localhost:4222")
if err != nil {
return err
}
ctx := context.Background()
err = client.Connect(ctx)
if err != nil {
return err
}
defer client.Close(ctx)
// Publish a message
err = client.Publish(ctx, "subject.name", []byte("message data"))
// Subscribe to messages (receives full *nats.Msg for access to Subject, Data, Headers)
// Subscribe returns a *Subscription handle that can be used to unsubscribe
sub, err := client.Subscribe(ctx, "subject.*", func(msgCtx context.Context, msg *nats.Msg) {
// Handle message with context (30s timeout per message)
// For wildcard subscriptions, msg.Subject contains the actual subject
fmt.Printf("Received on %s: %s\n", msg.Subject, string(msg.Data))
})
Advanced Configuration ¶
Creating client with options:
client, err := natsclient.NewClient("nats://localhost:4222",
natsclient.WithMaxReconnects(-1), // Infinite reconnects
natsclient.WithReconnectWait(2*time.Second),
natsclient.WithCircuitBreakerThreshold(10),
natsclient.WithDisconnectCallback(func(err error) {
log.Printf("Disconnected: %v", err)
}),
natsclient.WithReconnectCallback(func() {
log.Println("Reconnected successfully")
}),
)
JetStream Operations ¶
Working with JetStream streams and consumers:
// Create a stream
stream, err := client.CreateStream(ctx, jetstream.StreamConfig{
Name: "EVENTS",
Subjects: []string{"events.>"},
})
// Publish to stream
err = client.PublishToStream(ctx, "events.user.created", []byte(`{"user_id": "123"}`))
// Consume from stream (receives full jetstream.Msg for access to Subject, Data, Headers)
// Handler is responsible for calling msg.Ack() after processing
err = client.ConsumeStream(ctx, "EVENTS", "events.>", func(msg jetstream.Msg) {
// Process event - msg.Subject() contains actual subject for wildcard filters
processEvent(msg.Subject(), msg.Data())
msg.Ack()
})
Key-Value Store ¶
Using KVStore for configuration management with atomic updates:
// Create or get KV bucket
bucket, err := client.CreateKeyValueBucket(ctx, jetstream.KeyValueConfig{
Bucket: "config",
History: 5,
Replicas: 3,
})
// Create KVStore wrapper
kvStore := client.NewKVStore(bucket)
// Atomic JSON update with automatic CAS retry
err = kvStore.UpdateJSON(ctx, "service.config", func(config map[string]any) error {
// This function may be called multiple times on conflict
config["enabled"] = true
config["workers"] = 10
return nil
})
// Get JSON value
var config map[string]any
err = kvStore.GetJSON(ctx, "service.config", &config)
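The CAS retry behaviour described above can be illustrated without a NATS server. The following is a minimal sketch using a toy in-memory store; `memKV`, `updateWithRetry`, and `errRevisionMismatch` are hypothetical names invented for this illustration and are not the package's implementation. It shows why the update function may run more than once: each conflict forces a fresh read before the next write attempt.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errRevisionMismatch is a hypothetical stand-in for a CAS conflict error.
var errRevisionMismatch = errors.New("revision mismatch")

// memKV is a toy in-memory store that tracks one revision per key.
type memKV struct {
	mu   sync.Mutex
	data map[string]int
	rev  map[string]uint64
}

func newMemKV() *memKV {
	return &memKV{data: map[string]int{}, rev: map[string]uint64{}}
}

// get returns the current value and revision (both zero if the key is absent).
func (s *memKV) get(key string) (int, uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.data[key], s.rev[key]
}

// update writes value only if the caller saw the latest revision.
func (s *memKV) update(key string, value int, seen uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.rev[key] != seen {
		return errRevisionMismatch
	}
	s.data[key] = value
	s.rev[key] = seen + 1
	return nil
}

// updateWithRetry re-reads the value and re-applies fn until the write wins
// or maxRetries conflicts have occurred.
func updateWithRetry(s *memKV, key string, maxRetries int, fn func(int) int) error {
	for attempt := 0; attempt <= maxRetries; attempt++ {
		cur, rev := s.get(key)
		err := s.update(key, fn(cur), rev)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errRevisionMismatch) {
			return err
		}
	}
	return fmt.Errorf("max retries exceeded: %w", errRevisionMismatch)
}

func main() {
	s := newMemKV()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// fn may run several times if other goroutines win the race.
			_ = updateWithRetry(s, "workers", 100, func(v int) int { return v + 1 })
		}()
	}
	wg.Wait()
	v, rev := s.get("workers")
	fmt.Println("value:", v, "revision:", rev)
}
```

Because the update function can be re-run, it should be free of side effects other than computing the new value.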
Circuit Breaker Pattern ¶
The circuit breaker protects against cascading failures:
// Circuit states:
// - Closed: Normal operation, requests pass through
// - Open: Failures exceeded threshold, failing fast
// - Half-Open: Testing if system recovered
err := client.Connect(ctx)
if errors.Is(err, natsclient.ErrCircuitOpen) {
// Circuit is open, wait for it to test recovery
log.Println("Circuit breaker is open, backing off...")
time.Sleep(client.Backoff())
// Retry later
}
Circuit breaker configuration:
client, err := natsclient.NewClient(url,
natsclient.WithCircuitBreakerThreshold(5), // Open after 5 failures
natsclient.WithMaxBackoff(time.Minute), // Max backoff duration
)
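To make the three states concrete, here is a minimal, single-goroutine sketch of a circuit breaker that opens after a threshold of consecutive failures and allows one probe after a cooldown. The `breaker` type, its fields, and the fixed cooldown are assumptions made for illustration; the package's own breaker uses exponential backoff and is safe for concurrent use, which this sketch is not.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type state int

const (
	closed state = iota
	open
	halfOpen
)

var (
	errOpen = errors.New("circuit open")
	errDial = errors.New("dial failed") // simulated failure for the demo
)

// breaker opens after threshold consecutive failures and allows one
// trial call once cooldown has elapsed.
type breaker struct {
	threshold int
	cooldown  time.Duration
	failures  int
	state     state
	openedAt  time.Time
}

// call runs op unless the circuit is open and still cooling down.
func (b *breaker) call(now time.Time, op func() error) error {
	if b.state == open {
		if now.Sub(b.openedAt) < b.cooldown {
			return errOpen // fail fast without touching the network
		}
		b.state = halfOpen // cooldown elapsed: allow a single probe
	}
	if err := op(); err != nil {
		b.failures++
		if b.state == halfOpen || b.failures >= b.threshold {
			b.state = open
			b.openedAt = now
		}
		return err
	}
	// Success closes the circuit and resets the failure count.
	b.failures = 0
	b.state = closed
	return nil
}

func main() {
	b := &breaker{threshold: 5, cooldown: time.Second}
	now := time.Now()
	for i := 0; i < 6; i++ {
		fmt.Println(i, b.call(now, func() error { return errDial }))
	}
	// After the cooldown, a successful probe closes the circuit again.
	fmt.Println(b.call(now.Add(2*time.Second), func() error { return nil }))
}
```

Passing the current time in as a parameter keeps the state machine deterministic and easy to test.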
Connection Status and Health ¶
Monitoring connection health:
// Check current status
status := client.Status()
switch status {
case natsclient.StatusConnected:
// Healthy and ready
case natsclient.StatusReconnecting:
// Temporarily disconnected, reconnecting
case natsclient.StatusCircuitOpen:
// Circuit breaker is open
case natsclient.StatusDisconnected:
// Not connected
}
// Get detailed status
statusInfo := client.GetStatus()
log.Printf("Status: %v, Failures: %d, RTT: %v",
statusInfo.Status,
statusInfo.FailureCount,
statusInfo.RTT)
// Wait for connection
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := client.WaitForConnection(ctx)
Health monitoring with callbacks:
client, err := natsclient.NewClient(url,
natsclient.WithHealthInterval(10*time.Second),
natsclient.WithHealthChangeCallback(func(healthy bool) {
if healthy {
log.Println("Connection restored")
} else {
log.Println("Connection lost")
}
}),
)
Error Handling ¶
The package defines specific error types for different failure scenarios:
var (
ErrCircuitOpen = errors.New("circuit breaker is open")
ErrNotConnected = errors.New("not connected to NATS")
ErrConnectionTimeout = errors.New("connection timeout")
)
Error detection patterns:
err := client.Publish(ctx, "subject", data)
if err != nil {
// Check for circuit breaker
if errors.Is(err, natsclient.ErrCircuitOpen) {
// Back off and retry later
return
}
// Check for connection issues
if errors.Is(err, natsclient.ErrNotConnected) {
// Trigger reconnection
return
}
// Other error
log.Printf("Publish failed: %v", err)
}
KV-specific error handling:
err := kvStore.UpdateJSON(ctx, key, updateFn)
if err != nil {
// Check for key not found
if natsclient.IsKVNotFoundError(err) {
// Key doesn't exist, create it
}
// Check for conflict (CAS failed after retries)
if natsclient.IsKVConflictError(err) {
// Too many concurrent updates
}
}
Connection Options ¶
Available configuration options:
WithMaxReconnects(max int) // Maximum reconnection attempts (-1 = infinite)
WithReconnectWait(d time.Duration) // Wait between reconnection attempts
WithTimeout(d time.Duration) // Connection timeout
WithDrainTimeout(d time.Duration) // Timeout for graceful shutdown
WithPingInterval(d time.Duration) // Health check interval
WithCircuitBreakerThreshold(threshold int32) // Failures before circuit opens
WithMaxBackoff(d time.Duration) // Maximum backoff duration
WithLogger(logger Logger) // Custom logger for debug output
WithHealthInterval(d time.Duration) // Interval for health monitoring
WithName(name string) // Client identification
Authentication and Security ¶
Username/password authentication:
client, err := natsclient.NewClient(url,
natsclient.WithCredentials("username", "password"),
)
Token authentication:
client, err := natsclient.NewClient(url,
natsclient.WithToken("auth-token"),
)
TLS configuration:
client, err := natsclient.NewClient(url,
natsclient.WithTLS("client.crt", "client.key", "ca.crt"), // certFile, keyFile, caFile
)
Note: Credentials are cleared from memory when the client is closed.
Testing ¶
The package provides test utilities for integration testing:
func TestMyService(t *testing.T) {
// Create test client with real NATS via testcontainers
testClient := natsclient.NewTestClient(t,
natsclient.WithJetStream(),
natsclient.WithKV(),
)
defer testClient.Terminate()
client := testClient.Client
ctx := context.Background()
// Test with real NATS server
err := client.Publish(ctx, "test.subject", []byte("test data"))
assert.NoError(t, err)
}
Testing patterns:
- Uses real NATS server via testcontainers (no mocks)
- Tests actual behavior including connection lifecycle
- Thread-safe testing with proper synchronization
- Comprehensive circuit breaker scenario testing
Thread Safety ¶
The Client type is thread-safe and can be used concurrently from multiple goroutines:
- All public methods are safe for concurrent use
- Connection state is managed with atomic operations and mutexes
- Subscriptions and consumers can be created from any goroutine
- Close() takes effect only on the first call (subsequent calls are no-ops)
Performance Considerations ¶
Concurrency: Thread-safe for concurrent use from multiple goroutines. No artificial concurrency limits - scales with available system resources.
Memory: Memory usage scales with number of active subscriptions and consumers. Each subscription maintains its own message buffer. Health monitoring adds minimal overhead (one goroutine with configurable interval).
Throughput: Limited primarily by network latency and NATS server performance. Circuit breaker adds negligible overhead in normal operation and fails fast when open.
Connection Lifecycle: Reconnection uses exponential backoff to avoid overwhelming the server during failures. Maximum backoff is configurable (default: 1 minute).
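As a rough illustration of capped exponential backoff, the sketch below doubles a base delay per attempt and clamps it at a maximum. The function name `backoff` and the 1-second base are assumptions for this example; only the 1-minute default cap comes from the text above, and the package's exact schedule (including any jitter) is not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns base * 2^attempt, capped at max. attempt starts at 0.
func backoff(base, max time.Duration, attempt int) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		if d >= max/2 {
			return max // doubling again would reach or exceed the cap
		}
		d *= 2
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	for attempt := 0; attempt < 8; attempt++ {
		fmt.Println(attempt, backoff(time.Second, time.Minute, attempt))
	}
}
```

Checking against `max/2` before doubling avoids integer overflow for large attempt counts.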
Distributed Tracing ¶
The package provides W3C-compliant trace context propagation for distributed tracing. All publish and request methods automatically generate trace context if none exists, ensuring complete observability across the message flow.
Trace headers are injected into all outbound NATS messages:
- traceparent: W3C Trace Context header (00-{trace_id}-{span_id}-{flags})
- X-Trace-ID: Simplified trace ID header
- X-Span-ID: Current span identifier
- X-Parent-Span-ID: Parent span for nested operations
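For reference, the traceparent layout listed above can be produced and parsed with the standard library alone. This is a minimal sketch; `newTraceparent` and `parseTraceparent` are hypothetical helper names, not functions of this package, and the flags value "01" (sampled) is an assumption.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"strings"
)

// newTraceparent builds a value of the form 00-{trace_id}-{span_id}-{flags}
// with a random 16-byte trace ID and a random 8-byte span ID.
func newTraceparent() (string, error) {
	var traceID [16]byte
	var spanID [8]byte
	if _, err := rand.Read(traceID[:]); err != nil {
		return "", err
	}
	if _, err := rand.Read(spanID[:]); err != nil {
		return "", err
	}
	return fmt.Sprintf("00-%s-%s-01",
		hex.EncodeToString(traceID[:]), hex.EncodeToString(spanID[:])), nil
}

// parseTraceparent splits a traceparent value and checks field lengths.
// It does not validate that the IDs are hex or non-zero.
func parseTraceparent(v string) (traceID, spanID string, ok bool) {
	parts := strings.Split(v, "-")
	if len(parts) != 4 || len(parts[0]) != 2 || len(parts[1]) != 32 ||
		len(parts[2]) != 16 || len(parts[3]) != 2 {
		return "", "", false
	}
	return parts[1], parts[2], true
}

func main() {
	tp, err := newTraceparent()
	if err != nil {
		panic(err)
	}
	traceID, spanID, ok := parseTraceparent(tp)
	fmt.Println(tp, traceID, spanID, ok)
}
```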
Using trace context:
// Traces are auto-generated if not present
err := client.Publish(ctx, "subject", data)
// Or provide explicit trace context
tc := natsclient.NewTraceContext()
ctx = natsclient.ContextWithTrace(ctx, tc)
err = client.Publish(ctx, "subject", data)
// Extract trace from received message
incoming := natsclient.ExtractTrace(msg)
if incoming != nil {
log.Printf("Trace ID: %s, Span ID: %s", incoming.TraceID, incoming.SpanID)
}
Creating child spans for nested operations:
parentTC, _ := natsclient.TraceContextFromContext(ctx)
childTC := parentTC.NewSpan()
childCtx := natsclient.ContextWithTrace(ctx, childTC)
// Request returns the response payload and an error
resp, err := client.Request(childCtx, "service.action", data, timeout)
Architecture Integration ¶
The natsclient package integrates with StreamKit components:
- service: Services use natsclient for pub/sub communication
- config: Manager uses KV store for runtime configuration
- component: Components receive natsclient for messaging
- engine: Flow engine coordinates component communication via NATS
Data flow:
Application → Client → Circuit Breaker → NATS Connection → NATS Server
Design Decisions ¶
Circuit Breaker over Simple Retry: Chose circuit breaker pattern to prevent cascade failures in distributed systems. After threshold failures, the circuit opens to fail fast rather than continuously retry, giving the system time to recover.
Context-First API: Every I/O operation requires context.Context as its first parameter for proper cancellation and timeout support, which is essential for production systems.
KVStore Abstraction: Created high-level KV abstraction with built-in CAS retry logic to eliminate code duplication across services. Centralizes revision conflict handling and retry logic.
Testcontainers over Mocks: Integration tests use real NATS server via testcontainers to catch actual integration issues. Mock-based testing can miss edge cases in the NATS protocol implementation.
Examples ¶
Resilient publisher with automatic reconnection:
package main
import (
"context"
"errors"
"log"
"time"
"github.com/c360studio/semstreams/natsclient"
)
func main() {
client, err := natsclient.NewClient("nats://localhost:4222",
natsclient.WithMaxReconnects(-1),
natsclient.WithLogger(log.Default()),
)
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
if err := client.Connect(ctx); err != nil {
log.Fatal(err)
}
defer client.Close(ctx)
// Publish with automatic reconnection handling
for {
err := client.Publish(ctx, "telemetry.data", []byte("sensor reading"))
if err != nil {
if errors.Is(err, natsclient.ErrCircuitOpen) {
log.Println("Circuit open, waiting...")
time.Sleep(5 * time.Second)
continue
}
log.Printf("Publish error: %v", err)
}
time.Sleep(time.Second)
}
}
Configuration management with atomic updates:
// Manage service configuration with optimistic locking
bucket, _ := client.CreateKeyValueBucket(ctx, jetstream.KeyValueConfig{
Bucket: "config",
History: 5,
Replicas: 3,
})
kvStore := client.NewKVStore(bucket)
// Atomic configuration update with automatic retry
err = kvStore.UpdateJSON(ctx, "services.processor", func(config map[string]any) error {
// This function may be called multiple times on conflict
config["workers"] = 10
config["timeout"] = "30s"
return nil
})
For more examples and detailed usage, see the README.md in this directory.
Package natsclient provides request/reply pattern support for NATS.
Package natsclient provides JetStream stream management utilities.
Package natsclient provides testcontainers-based NATS infrastructure for testing.
Package natsclient provides typed subject patterns for compile-time type safety.
Index ¶
- Constants
- Variables
- func ConsumeWithHeartbeat(ctx context.Context, msg jetstream.Msg, heartbeatInterval time.Duration, ...) error
- func ContextWithTrace(ctx context.Context, tc *TraceContext) context.Context
- func DetachContextWithTrace(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc)
- func FilteredKeys(ctx context.Context, kv jetstream.KeyValue, pattern string) ([]string, error)
- func InjectTrace(ctx context.Context, msg *nats.Msg)
- func IsKVConflictError(err error) bool
- func IsKVNotFoundError(err error) bool
- type Client
- func (m *Client) Backoff() time.Duration
- func (m *Client) Close(ctx context.Context) error
- func (m *Client) Connect(ctx context.Context) error
- func (m *Client) ConnectionOptions() []nats.Option
- func (m *Client) ConsumeStream(ctx context.Context, streamName, subject string, handler func(jetstream.Msg)) error
- func (c *Client) ConsumeStreamWithConfig(ctx context.Context, cfg StreamConsumerConfig, ...) error
- func (m *Client) CreateKeyValueBucket(ctx context.Context, cfg jetstream.KeyValueConfig) (jetstream.KeyValue, error)
- func (m *Client) CreateStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error)
- func (m *Client) DeleteKeyValueBucket(ctx context.Context, name string) error
- func (c *Client) EnsureStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error)
- func (m *Client) Failures() int32
- func (m *Client) GetConnection() *nats.Conn
- func (m *Client) GetKeyValueBucket(ctx context.Context, name string) (jetstream.KeyValue, error)
- func (m *Client) GetStatus() *Status
- func (m *Client) GetStream(ctx context.Context, name string) (jetstream.Stream, error)
- func (m *Client) IsHealthy() bool
- func (m *Client) JetStream() (jetstream.JetStream, error)
- func (m *Client) ListKeyValueBuckets(ctx context.Context) ([]string, error)
- func (m *Client) MaxReconnects() int
- func (c *Client) NewKVStore(bucket jetstream.KeyValue, opts ...func(*KVOptions)) *KVStore
- func (m *Client) OnHealthChange(fn func(bool))
- func (m *Client) PingInterval() time.Duration
- func (m *Client) Publish(ctx context.Context, subject string, data []byte) error
- func (m *Client) PublishToStream(ctx context.Context, subject string, data []byte) error
- func (c *Client) PublishToStreamWithAck(ctx context.Context, subject string, data []byte) (*jetstream.PubAck, error)
- func (m *Client) RTT() (time.Duration, error)
- func (m *Client) ReconnectWait() time.Duration
- func (c *Client) Reply(ctx context.Context, replyTo string, data []byte) error
- func (c *Client) ReplyWithHeaders(ctx context.Context, replyTo string, data []byte, headers map[string]string) error
- func (c *Client) Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error)
- func (c *Client) RequestWithHeaders(ctx context.Context, subject string, data []byte, headers map[string]string, ...) (*nats.Msg, error)
- func (c *Client) RequestWithRetry(ctx context.Context, subject string, data []byte, timeout time.Duration, ...) ([]byte, error)
- func (m *Client) SetConnection(conn *nats.Conn)
- func (m *Client) Status() ConnectionStatus
- func (c *Client) StopAllConsumers()
- func (c *Client) StopAndDeleteConsumer(ctx context.Context, streamName, consumerName string) error
- func (c *Client) StopConsumer(streamName, consumerName string)
- func (m *Client) Subscribe(ctx context.Context, subject string, handler func(context.Context, *nats.Msg)) (*Subscription, error)
- func (c *Client) SubscribeForRequests(ctx context.Context, subject string, ...) (*Subscription, error)
- func (m *Client) URLs() string
- func (m *Client) WaitForConnection(ctx context.Context) error
- func (m *Client) WithHealthCheck(interval time.Duration)
- type ClientOption
- func WithCircuitBreakerThreshold(threshold int32) ClientOption
- func WithCompression(enabled bool) ClientOption
- func WithConnectionLostCallback(fn func(error)) ClientOption
- func WithCredentials(username, password string) ClientOption
- func WithDisconnectCallback(fn func(error)) ClientOption
- func WithDrainTimeout(d time.Duration) ClientOption
- func WithHealthChangeCallback(fn func(healthy bool)) ClientOption
- func WithHealthInterval(d time.Duration) ClientOption
- func WithLogger(logger Logger) ClientOption
- func WithMaxBackoff(d time.Duration) ClientOption
- func WithMaxReconnects(max int) ClientOption
- func WithMetrics(registry *metric.MetricsRegistry) ClientOption
- func WithName(name string) ClientOption
- func WithPingInterval(d time.Duration) ClientOption
- func WithReconnectCallback(fn func()) ClientOption
- func WithReconnectWait(d time.Duration) ClientOption
- func WithTLS(certFile, keyFile, caFile string) ClientOption
- func WithTimeout(d time.Duration) ClientOption
- func WithToken(token string) ClientOption
- type Codec
- type ConnectionStatus
- type JSONCodec
- type KVEntry
- type KVOptions
- type KVStore
- func (kv *KVStore) Create(ctx context.Context, key string, value []byte) (uint64, error)
- func (kv *KVStore) Delete(ctx context.Context, key string) error
- func (kv *KVStore) Get(ctx context.Context, key string) (*KVEntry, error)
- func (kv *KVStore) Keys(ctx context.Context) ([]string, error)
- func (kv *KVStore) KeysByPrefix(ctx context.Context, prefix string) ([]string, error)
- func (kv *KVStore) Put(ctx context.Context, key string, value []byte) (uint64, error)
- func (kv *KVStore) Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error)
- func (kv *KVStore) UpdateJSON(ctx context.Context, key string, updateFn func(current map[string]any) error) error
- func (kv *KVStore) UpdateWithRetry(ctx context.Context, key string, updateFn func(current []byte) ([]byte, error)) error
- func (kv *KVStore) Watch(ctx context.Context, pattern string) (jetstream.KeyWatcher, error)
- type Logger
- type RetryConfig
- type Status
- type StreamAutoCreateConfig
- type StreamConsumerConfig
- type Subject
- func (s Subject[T]) Publish(ctx context.Context, client *Client, payload T) error
- func (s Subject[T]) PublishToStream(ctx context.Context, client *Client, payload T) error
- func (s Subject[T]) Subscribe(ctx context.Context, client *Client, handler func(context.Context, T) error) (*Subscription, error)
- func (s Subject[T]) SubscribeWithMsg(ctx context.Context, client *Client, ...) (*Subscription, error)
- type Subscription
- type TemporalResolver
- func (tr *TemporalResolver) Close() error
- func (tr *TemporalResolver) GetAtTimestamp(ctx context.Context, key string, targetTime time.Time) (jetstream.KeyValueEntry, error)
- func (tr *TemporalResolver) GetInTimeRange(ctx context.Context, key string, startTime, endTime time.Time) ([]jetstream.KeyValueEntry, error)
- func (tr *TemporalResolver) GetRangeAtTimestamp(ctx context.Context, keys []string, targetTime time.Time) (map[string]jetstream.KeyValueEntry, error)
- func (tr *TemporalResolver) GetRangeInTimeRange(ctx context.Context, keys []string, startTime, endTime time.Time) (map[string][]jetstream.KeyValueEntry, error)
- func (tr *TemporalResolver) GetStats() *cache.Statistics
- type TestClient
- func (tc *TestClient) CreateKVBucket(ctx context.Context, name string) (jetstream.KeyValue, error)
- func (tc *TestClient) CreateStream(ctx context.Context, name string, subjects []string) (jetstream.Stream, error)
- func (tc *TestClient) GetKVBucket(ctx context.Context, name string) (jetstream.KeyValue, error)
- func (tc *TestClient) GetNativeConnection() *gonats.Conn
- func (tc *TestClient) GetStream(ctx context.Context, name string) (jetstream.Stream, error)
- func (tc *TestClient) IsReady() bool
- func (tc *TestClient) PrefixedBucketName(name string) string
- func (tc *TestClient) Terminate() error
- type TestOption
- func WithBucketPrefix(prefix string) TestOption
- func WithE2EDefaults() TestOption
- func WithFastStartup() TestOption
- func WithFileStorage() TestOption
- func WithIntegrationDefaults() TestOption
- func WithJetStream() TestOption
- func WithKV() TestOption
- func WithKVBuckets(buckets ...string) TestOption
- func WithMinimalFeatures() TestOption
- func WithNATSVersion(version string) TestOption
- func WithProductionLike() TestOption
- func WithStartTimeout(timeout time.Duration) TestOption
- func WithStreams(streams ...TestStreamConfig) TestOption
- func WithTestTimeout(timeout time.Duration) TestOption
- type TestStreamConfig
- type TraceContext
Constants ¶
const (
// TraceparentHeader is the W3C standard trace context header
// Format: 00-{trace_id}-{span_id}-{flags}
// trace_id: 32 hex chars (16 bytes), span_id: 16 hex chars (8 bytes)
TraceparentHeader = "traceparent"
// TraceIDHeader is a simplified header for internal use
TraceIDHeader = "X-Trace-ID"
// SpanIDHeader is a simplified header for internal use
SpanIDHeader = "X-Span-ID"
// ParentSpanHeader is a simplified header for internal use
ParentSpanHeader = "X-Parent-Span-ID"
)
W3C Trace Context headers
const DefaultRequestTimeout = 5 * time.Second
DefaultRequestTimeout is the default timeout for request/reply operations.
Variables ¶
var (
ErrNotConnected = stderrors.New("not connected to NATS")
ErrCircuitOpen = stderrors.New("circuit breaker is open")
ErrConnectionTimeout = stderrors.New("connection timeout")
)
Error messages
var (
ErrKVKeyNotFound = errors.New("kv: key not found")
ErrKVKeyExists = errors.New("kv: key already exists")
ErrKVRevisionMismatch = errors.New("kv: revision mismatch (concurrent update)")
ErrKVMaxRetriesExceeded = errors.New("kv: max retries exceeded")
)
Well-known errors matching Graph processor patterns
Functions ¶
func ConsumeWithHeartbeat ¶
func ConsumeWithHeartbeat(ctx context.Context, msg jetstream.Msg, heartbeatInterval time.Duration, work func(context.Context) error) error
ConsumeWithHeartbeat runs work in a goroutine while periodically calling msg.InProgress() to reset the AckWait clock. This allows short AckWait values for failure detection while supporting arbitrarily long processing.
Ack/Nak ownership: this function calls Ack, NakWithDelay, or Nak on the message. The caller must NOT call these methods when using this helper.
- On work success: msg.Ack()
- On work error: msg.NakWithDelay(30s) to allow breathing room before retry
- On context cancellation: msg.NakWithDelay(5s) for graceful shutdown
- On InProgress failure: returns error (message will be redelivered by server)
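The heartbeat mechanism can be sketched independently of JetStream. In the example below, `runWithHeartbeat` and the `beat` callback are hypothetical stand-ins (the callback plays the role of msg.InProgress()), and the Ack/Nak handling listed above is deliberately left out; it illustrates only the goroutine-plus-ticker structure.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runWithHeartbeat runs work in a goroutine and calls beat every interval
// until work returns, ctx is cancelled, or beat fails.
func runWithHeartbeat(ctx context.Context, interval time.Duration,
	beat func() error, work func(context.Context) error) error {
	done := make(chan error, 1) // buffered so the worker never blocks on send
	go func() { done <- work(ctx) }()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case err := <-done:
			return err
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := beat(); err != nil {
				return fmt.Errorf("heartbeat failed: %w", err)
			}
		}
	}
}

// demo simulates 55ms of work with a 10ms heartbeat interval.
func demo() (beats int, err error) {
	err = runWithHeartbeat(context.Background(), 10*time.Millisecond,
		func() error { beats++; return nil },
		func(ctx context.Context) error {
			time.Sleep(55 * time.Millisecond)
			return nil
		})
	return beats, err
}

func main() {
	beats, err := demo()
	fmt.Println("heartbeats sent:", beats, "err:", err)
}
```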
func ContextWithTrace ¶
func ContextWithTrace(ctx context.Context, tc *TraceContext) context.Context
ContextWithTrace returns a context with trace information
func DetachContextWithTrace ¶
func DetachContextWithTrace(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc)
DetachContextWithTrace creates a new context that preserves trace context but resets deadline/cancellation. This is useful for publishing error responses when the original context has expired but trace continuity is still needed.
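One way to picture the detach step is a context wrapper that forwards values but hides the parent's deadline and cancellation. The sketch below is an assumption-laden illustration: `detached`, `detachWithTimeout`, and `traceKey` are invented names, and it preserves all context values rather than only trace information. On Go 1.21 and later, context.WithoutCancel provides the same detaching behaviour.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// traceKey is a hypothetical context key for this illustration.
type traceKey struct{}

// detached keeps the parent's values but ignores its deadline and cancellation.
type detached struct{ parent context.Context }

func (detached) Deadline() (time.Time, bool) { return time.Time{}, false }
func (detached) Done() <-chan struct{}       { return nil }
func (detached) Err() error                  { return nil }
func (d detached) Value(key any) any         { return d.parent.Value(key) }

// detachWithTimeout returns a context that carries parent's values and
// has a fresh timeout of its own.
func detachWithTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	return context.WithTimeout(detached{parent}, timeout)
}

// demo cancels the parent first, then shows the detached context is still usable.
func demo() (traceID string, parentErr, detachedErr error) {
	base := context.WithValue(context.Background(), traceKey{}, "trace-123")
	parent, cancel := context.WithCancel(base)
	cancel() // the original request context has already ended

	ctx, stop := detachWithTimeout(parent, time.Second)
	defer stop()
	id, _ := ctx.Value(traceKey{}).(string)
	return id, parent.Err(), ctx.Err()
}

func main() {
	id, parentErr, detachedErr := demo()
	fmt.Println(id, parentErr, detachedErr)
}
```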
func FilteredKeys ¶
FilteredKeys returns keys matching a NATS wildcard pattern from a raw jetstream.KeyValue bucket. Use this for components that hold jetstream.KeyValue directly instead of *KVStore. The pattern should be a valid NATS subject filter (e.g., "0.>" for level-0 community keys). Returns nil, nil when no keys match.
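NATS subject filters match token by token: "*" matches exactly one token and ">" matches one or more trailing tokens. The following sketch shows that matching rule applied to a list of keys; `matchSubject` and `filterKeys` are hypothetical helpers written for this illustration and say nothing about how FilteredKeys is implemented internally.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches a NATS-style pattern where
// "*" matches exactly one token and ">" matches one or more trailing tokens.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" is only valid as the last token and needs at least one token to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

// filterKeys returns the keys matching pattern, or nil when none match.
func filterKeys(keys []string, pattern string) []string {
	var out []string
	for _, k := range keys {
		if matchSubject(pattern, k) {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	keys := []string{"0.alpha", "0.beta.x", "1.alpha"}
	fmt.Println(filterKeys(keys, "0.>")) // prints [0.alpha 0.beta.x]
}
```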
func InjectTrace ¶
InjectTrace adds trace headers to a NATS message from context
func IsKVConflictError ¶
IsKVConflictError checks if error indicates a conflict (key exists or wrong revision)
func IsKVNotFoundError ¶
IsKVNotFoundError checks if error indicates key not found
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client manages NATS connections with circuit breaker pattern
func NewClient ¶
func NewClient(urls string, opts ...ClientOption) (*Client, error)
NewClient creates a new NATS client with optional configuration. The urls parameter accepts comma-separated NATS server URLs for clustering support (e.g., "nats://server1:4222,nats://server2:4222").
func (*Client) ConnectionOptions ¶
ConnectionOptions returns the NATS connection options
func (*Client) ConsumeStream ¶
func (m *Client) ConsumeStream(ctx context.Context, streamName, subject string, handler func(jetstream.Msg)) error
ConsumeStream creates a consumer for a stream. Handler receives the full jetstream.Msg to access Subject, Data, Headers, etc. This is essential for wildcard subscriptions where the actual subject differs from the pattern. The handler is responsible for calling msg.Ack() after processing.
func (*Client) ConsumeStreamWithConfig ¶
func (c *Client) ConsumeStreamWithConfig(ctx context.Context, cfg StreamConsumerConfig, handler func(ctx context.Context, msg jetstream.Msg)) error
ConsumeStreamWithConfig creates a JetStream consumer with full configuration. The handler receives the raw jetstream.Msg which includes Ack(), Nak(), and Term() methods. Handler MUST call one of these methods to acknowledge the message.
func (*Client) CreateKeyValueBucket ¶
func (m *Client) CreateKeyValueBucket(ctx context.Context, cfg jetstream.KeyValueConfig) (jetstream.KeyValue, error)
CreateKeyValueBucket creates or gets a KV bucket with configuration
func (*Client) CreateStream ¶
func (m *Client) CreateStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error)
CreateStream creates a JetStream stream
func (*Client) DeleteKeyValueBucket ¶
DeleteKeyValueBucket deletes a KV bucket
func (*Client) EnsureStream ¶
func (c *Client) EnsureStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error)
EnsureStream creates a stream if it doesn't exist, or returns the existing one.
func (*Client) GetConnection ¶
GetConnection returns the current NATS connection
func (*Client) GetKeyValueBucket ¶
GetKeyValueBucket gets an existing KV bucket
func (*Client) ListKeyValueBuckets ¶
ListKeyValueBuckets lists all KV buckets
func (*Client) MaxReconnects ¶
MaxReconnects returns the maximum number of reconnection attempts
func (*Client) NewKVStore ¶
NewKVStore creates a new KV store with the given bucket
func (*Client) OnHealthChange ¶
OnHealthChange sets a callback for health status changes
func (*Client) PingInterval ¶
PingInterval returns the interval for health checks
func (*Client) PublishToStream ¶
PublishToStream publishes to a JetStream stream with automatic trace context propagation. If no trace context exists in ctx, one is auto-generated for distributed tracing.
func (*Client) PublishToStreamWithAck ¶
func (c *Client) PublishToStreamWithAck(ctx context.Context, subject string, data []byte) (*jetstream.PubAck, error)
PublishToStreamWithAck publishes a message to a JetStream subject with acknowledgment. If AutoCreate is true and the stream doesn't exist, it will be created. Trace context is auto-generated if not present, and propagated via NATS message headers.
func (*Client) ReconnectWait ¶
ReconnectWait returns the wait duration between reconnection attempts
func (*Client) Reply ¶
Reply sends a reply to a request message. This is typically used by service handlers to respond to requests.
func (*Client) ReplyWithHeaders ¶
func (c *Client) ReplyWithHeaders(ctx context.Context, replyTo string, data []byte, headers map[string]string) error
ReplyWithHeaders sends a reply with custom headers.
func (*Client) Request ¶
func (c *Client) Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error)
Request performs a synchronous request/reply operation. It publishes a message to the subject and waits for a response. The timeout parameter controls how long to wait for a response. If timeout is 0, DefaultRequestTimeout is used.
func (*Client) RequestWithHeaders ¶
func (c *Client) RequestWithHeaders(ctx context.Context, subject string, data []byte, headers map[string]string, timeout time.Duration) (*nats.Msg, error)
RequestWithHeaders performs a request/reply operation with custom headers. Headers are passed as a map and converted to NATS message headers. Returns the full NATS message to allow access to response headers.
func (*Client) RequestWithRetry ¶
func (c *Client) RequestWithRetry(ctx context.Context, subject string, data []byte, timeout time.Duration, retry RetryConfig) ([]byte, error)
RequestWithRetry performs a request with configurable retry on failure. This is useful for handling transient "no responders" errors in NATS where the subscriber may not be ready when the request arrives.
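The retry idea can be sketched as a loop that re-issues the request only for the transient "no responders" case. Everything named here is an assumption: the fields of RetryConfig are not shown in this documentation, so the example uses a hypothetical `retryPolicy` type, a stand-in `errNoResponders` error, and a fixed delay between attempts.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryPolicy is a hypothetical stand-in for the package's RetryConfig.
type retryPolicy struct {
	maxAttempts int
	delay       time.Duration
}

// errNoResponders simulates the transient error worth retrying.
var errNoResponders = errors.New("no responders")

// requestWithRetry calls do until it succeeds, returns a non-retryable
// error, or maxAttempts is reached.
func requestWithRetry(ctx context.Context, p retryPolicy,
	do func(context.Context) ([]byte, error)) ([]byte, error) {
	if p.maxAttempts < 1 {
		p.maxAttempts = 1
	}
	var lastErr error
	for attempt := 1; attempt <= p.maxAttempts; attempt++ {
		resp, err := do(ctx)
		if err == nil {
			return resp, nil
		}
		lastErr = err
		if !errors.Is(err, errNoResponders) || attempt == p.maxAttempts {
			break
		}
		select {
		case <-time.After(p.delay): // wait before the next attempt
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	return nil, lastErr
}

// demo simulates a responder that becomes ready on the third attempt.
func demo() (string, int, error) {
	calls := 0
	resp, err := requestWithRetry(context.Background(),
		retryPolicy{maxAttempts: 5, delay: 5 * time.Millisecond},
		func(context.Context) ([]byte, error) {
			calls++
			if calls < 3 {
				return nil, errNoResponders
			}
			return []byte("pong"), nil
		})
	return string(resp), calls, err
}

func main() {
	resp, calls, err := demo()
	fmt.Println(resp, calls, err)
}
```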
func (*Client) SetConnection ¶
SetConnection sets the NATS connection (for testing)
func (*Client) Status ¶
func (m *Client) Status() ConnectionStatus
Status returns the current connection status
func (*Client) StopAllConsumers ¶
func (c *Client) StopAllConsumers()
StopAllConsumers stops all active consumers.
func (*Client) StopAndDeleteConsumer ¶
StopAndDeleteConsumer stops a specific consumer and deletes the durable consumer from the server. WARNING: This permanently removes the consumer's position. Use only for test cleanup or when you intentionally want to reset consumer state.
func (*Client) StopConsumer ¶
StopConsumer stops a specific consumer by stream and consumer name. This stops the local consume context but does not delete the durable consumer from the server.
func (*Client) Subscribe ¶
func (m *Client) Subscribe(ctx context.Context, subject string, handler func(context.Context, *nats.Msg)) (*Subscription, error)
Subscribe subscribes to a NATS subject with context propagation. Each message handler receives the full *nats.Msg to access Subject, Data, Headers, etc. This is essential for wildcard subscriptions where the actual subject differs from the pattern. The context is derived from the parent context with a 30-second timeout for message processing. Returns a Subscription handle that can be used to unsubscribe.
func (*Client) SubscribeForRequests ¶
func (c *Client) SubscribeForRequests(ctx context.Context, subject string, handler func(ctx context.Context, data []byte) ([]byte, error)) (*Subscription, error)
SubscribeForRequests subscribes to a subject and handles request/reply patterns. The handler receives a context and the message data, and should return the response data or an error. Returns the Subscription so the caller can unsubscribe when done. This is a convenience method for implementing request/reply services.
func (*Client) WaitForConnection ¶
WaitForConnection waits for the connection to be established
func (*Client) WithHealthCheck ¶
WithHealthCheck enables health monitoring with a specified interval
type ClientOption ¶
ClientOption is a functional option for configuring the Client
func WithCircuitBreakerThreshold ¶
func WithCircuitBreakerThreshold(threshold int32) ClientOption
WithCircuitBreakerThreshold sets the number of failures before opening circuit
func WithCompression ¶
func WithCompression(enabled bool) ClientOption
WithCompression enables message compression
func WithConnectionLostCallback ¶
func WithConnectionLostCallback(fn func(error)) ClientOption
WithConnectionLostCallback sets a callback for when connection is completely lost
func WithCredentials ¶
func WithCredentials(username, password string) ClientOption
WithCredentials sets username and password for authentication
func WithDisconnectCallback ¶
func WithDisconnectCallback(fn func(error)) ClientOption
WithDisconnectCallback sets a callback for disconnection events. This is in addition to NATS's built-in disconnect handler.
func WithDrainTimeout ¶
func WithDrainTimeout(d time.Duration) ClientOption
WithDrainTimeout sets the timeout for draining on disconnect
func WithHealthChangeCallback ¶
func WithHealthChangeCallback(fn func(healthy bool)) ClientOption
WithHealthChangeCallback sets a callback for health status changes
func WithHealthInterval ¶
func WithHealthInterval(d time.Duration) ClientOption
WithHealthInterval sets the interval for health monitoring
func WithLogger ¶
func WithLogger(logger Logger) ClientOption
WithLogger sets a custom logger for the client
func WithMaxBackoff ¶
func WithMaxBackoff(d time.Duration) ClientOption
WithMaxBackoff sets the maximum backoff duration for circuit breaker
func WithMaxReconnects ¶
func WithMaxReconnects(max int) ClientOption
WithMaxReconnects sets the maximum number of reconnection attempts (-1 for infinite)
func WithMetrics ¶
func WithMetrics(registry *metric.MetricsRegistry) ClientOption
WithMetrics enables JetStream metrics collection using the provided registry. Metrics will track streams and consumers created through this client.
func WithName ¶
func WithName(name string) ClientOption
WithName sets the client name for identification
func WithPingInterval ¶
func WithPingInterval(d time.Duration) ClientOption
WithPingInterval sets the ping interval for connection health checks
func WithReconnectCallback ¶
func WithReconnectCallback(fn func()) ClientOption
WithReconnectCallback sets a callback for reconnection events. This is in addition to NATS's built-in reconnect handler.
func WithReconnectWait ¶
func WithReconnectWait(d time.Duration) ClientOption
WithReconnectWait sets the wait time between reconnection attempts
func WithTLS ¶
func WithTLS(certFile, keyFile, caFile string) ClientOption
WithTLS enables TLS with optional certificate paths
func WithTimeout ¶
func WithTimeout(d time.Duration) ClientOption
WithTimeout sets the connection timeout
func WithToken ¶
func WithToken(token string) ClientOption
WithToken sets a token for authentication
type Codec ¶
Codec defines the serialization interface for typed subjects. Implementations handle marshaling and unmarshaling of typed payloads.
type ConnectionStatus ¶
type ConnectionStatus int
ConnectionStatus represents the state of the NATS connection
const (
StatusDisconnected ConnectionStatus = iota
StatusConnecting
StatusConnected
StatusReconnecting
StatusCircuitOpen
)
Possible connection statuses
func (ConnectionStatus) String ¶
func (s ConnectionStatus) String() string
String returns the string representation of ConnectionStatus
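A Stringer over an iota-based enum is typically a switch with a fallback for unknown values. The sketch below mirrors the five statuses listed above, but the exact strings the package returns are not documented here, so the ones used are assumptions.

```go
package main

import "fmt"

// connectionStatus mirrors the documented enum; names are lowercased to make
// clear this is an illustration, not the package's type.
type connectionStatus int

const (
	statusDisconnected connectionStatus = iota
	statusConnecting
	statusConnected
	statusReconnecting
	statusCircuitOpen
)

// String returns a human-readable label. The label text is assumed.
func (s connectionStatus) String() string {
	switch s {
	case statusDisconnected:
		return "disconnected"
	case statusConnecting:
		return "connecting"
	case statusConnected:
		return "connected"
	case statusReconnecting:
		return "reconnecting"
	case statusCircuitOpen:
		return "circuit_open"
	default:
		return fmt.Sprintf("unknown(%d)", int(s))
	}
}

func main() {
	// fmt uses the String method automatically for %v and Println.
	fmt.Println(statusConnected, statusCircuitOpen, connectionStatus(42))
}
```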
type JSONCodec ¶
type JSONCodec[T any] struct{}
JSONCodec provides JSON serialization for typed subjects. This is the default codec for most use cases.
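The method set of Codec is not shown in this listing, so the following sketch assumes a `Marshal`/`Unmarshal` pair to illustrate how a zero-size generic JSON codec could satisfy such an interface. All names here are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// codec is a hypothetical serialization interface; the method names are an
// assumption, not the package's documented API.
type codec[T any] interface {
	Marshal(v T) ([]byte, error)
	Unmarshal(data []byte) (T, error)
}

// jsonCodec carries no state, so the zero value is ready to use.
type jsonCodec[T any] struct{}

func (jsonCodec[T]) Marshal(v T) ([]byte, error) { return json.Marshal(v) }

func (jsonCodec[T]) Unmarshal(data []byte) (T, error) {
	var v T
	err := json.Unmarshal(data, &v)
	return v, err
}

// workflowStarted is an example payload type.
type workflowStarted struct {
	ID string `json:"id"`
}

// roundTrip encodes and decodes a value through any codec for T.
func roundTrip[T any](c codec[T], v T) (T, error) {
	data, err := c.Marshal(v)
	if err != nil {
		var zero T
		return zero, err
	}
	return c.Unmarshal(data)
}

func main() {
	out, err := roundTrip[workflowStarted](jsonCodec[workflowStarted]{}, workflowStarted{ID: "wf-1"})
	fmt.Println(out.ID, err)
}
```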
type KVOptions ¶
type KVOptions struct {
MaxRetries int // Maximum CAS retry attempts
RetryDelay time.Duration // Initial delay between retries
Timeout time.Duration // Operation timeout
MaxValueSize int // Maximum size for values (default: 1MB)
UseExponentialBackoff bool // Enable exponential backoff with jitter
MaxRetryDelay time.Duration // Maximum delay between retries
}
KVOptions configures KV operations behavior
func DefaultKVOptions ¶
func DefaultKVOptions() KVOptions
DefaultKVOptions returns sensible defaults matching those of the Graph processor
type KVStore ¶
type KVStore struct {
// contains filtered or unexported fields
}
KVStore provides high-level KV operations with built-in CAS support
func (*KVStore) KeysByPrefix ¶
KeysByPrefix returns all keys matching the given prefix. Uses JetStream KV's native key filtering for efficient server-side filtering. The prefix is automatically converted to a NATS wildcard pattern (prefix + ">").
func (*KVStore) Update ¶
func (kv *KVStore) Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error)
Update performs CAS update with explicit revision
func (*KVStore) UpdateJSON ¶
func (kv *KVStore) UpdateJSON(ctx context.Context, key string, updateFn func(current map[string]any) error) error
UpdateJSON performs CAS update on JSON data with automatic retry
type Logger ¶
type Logger interface {
Printf(format string, v ...any)
Errorf(format string, v ...any)
Debugf(format string, v ...any)
}
Logger interface for injecting custom loggers
type RetryConfig ¶
type RetryConfig struct {
MaxRetries int // Number of retry attempts (default: 3)
InitialBackoff time.Duration // First retry delay (default: 100ms)
MaxBackoff time.Duration // Cap on backoff growth (default: 2s)
BackoffMultiplier float64 // Exponential growth factor (default: 2.0)
}
RetryConfig configures retry behavior for requests.
func DefaultRetryConfig ¶
func DefaultRetryConfig() RetryConfig
DefaultRetryConfig returns sensible defaults for retry configuration.
type Status ¶
type Status struct {
Status ConnectionStatus
FailureCount int32
LastFailureTime time.Time
Reconnects int32
RTT time.Duration
}
Status holds runtime status information for the NATS manager
type StreamAutoCreateConfig ¶
type StreamAutoCreateConfig struct {
// Subjects for the stream. If empty, derived from FilterSubject.
Subjects []string
// Storage type: "file" (default) or "memory"
Storage string
// Retention policy: "limits" (default), "interest", "work_queue"
Retention string
// MaxAge is the maximum age of messages (default 7 days).
MaxAge time.Duration
// MaxBytes is the maximum total size (0 = unlimited).
MaxBytes int64
// MaxMsgs is the maximum number of messages (0 = unlimited).
MaxMsgs int64
// Replicas is the number of replicas (default 1).
Replicas int
}
StreamAutoCreateConfig configures automatic stream creation.
func DefaultStreamConfig ¶
func DefaultStreamConfig() *StreamAutoCreateConfig
DefaultStreamConfig returns default auto-create configuration.
type StreamConsumerConfig ¶
type StreamConsumerConfig struct {
// StreamName is the name of the stream to consume from (required).
StreamName string
// ConsumerName is the durable consumer name. If empty, creates an ephemeral consumer.
ConsumerName string
// FilterSubject filters messages within the stream. If empty, receives all messages.
FilterSubject string
// DeliverPolicy determines where to start delivering messages.
// Options: "all" (default), "last", "new", "by_start_time"
DeliverPolicy string
// AckPolicy determines how messages are acknowledged.
// Options: "explicit" (default), "none", "all"
AckPolicy string
// MaxDeliver is the maximum number of delivery attempts (0 = unlimited).
MaxDeliver int
// AckWait is how long to wait for an ack before redelivery.
// Default is 30 seconds.
AckWait time.Duration
// MaxAckPending limits the number of outstanding (unacknowledged) messages
// that can be delivered to a consumer. This provides backpressure to prevent
// overwhelming the consumer. 0 means unlimited (default NATS behavior).
MaxAckPending int
// AutoCreate enables automatic stream creation if it doesn't exist.
AutoCreate bool
// AutoCreateConfig is used when auto-creating a stream.
// If nil, defaults are used based on FilterSubject.
AutoCreateConfig *StreamAutoCreateConfig
// BackOff overrides AckWait per retry attempt. Index 0 is the first retry
// wait, index 1 is the second, and so on. The last value is used for all
// subsequent retries. If empty, AckWait applies uniformly.
BackOff []time.Duration
// MessageTimeout is the context timeout for processing each message.
// This timeout is passed to the handler and should accommodate the full
// processing time including any downstream calls (e.g., LLM requests).
// Default is 30 seconds if not specified.
MessageTimeout time.Duration
}
StreamConsumerConfig configures a JetStream consumer.
type Subject ¶
type Subject[T any] struct {
// Pattern is the NATS subject pattern (may include wildcards for subscribe)
Pattern string
// Codec handles serialization/deserialization
Codec Codec[T]
}
Subject represents a typed NATS subject with compile-time type safety. It binds a subject pattern to a specific payload type and codec.
Example:
var WorkflowStarted = Subject[WorkflowStartedEvent]{
Pattern: "workflow.events.started",
Codec: JSONCodec[WorkflowStartedEvent]{},
}
// Type-safe publish
err := WorkflowStarted.Publish(ctx, client, event)
// Type-safe subscribe
sub, err := WorkflowStarted.Subscribe(ctx, client, func(ctx context.Context, event WorkflowStartedEvent) error {
// event is already typed - no assertions needed
return nil
})
func NewSubject ¶
NewSubject creates a typed subject with a JSON codec (most common case).
func NewSubjectWithCodec ¶
NewSubjectWithCodec creates a typed subject with a custom codec.
func (Subject[T]) Publish ¶
Publish sends a typed payload to the subject. The payload is serialized using the subject's codec before publishing.
func (Subject[T]) PublishToStream ¶
PublishToStream sends a typed payload to a JetStream subject. The payload is serialized using the subject's codec before publishing.
func (Subject[T]) Subscribe ¶
func (s Subject[T]) Subscribe(ctx context.Context, client *Client, handler func(context.Context, T) error) (*Subscription, error)
Subscribe creates a subscription to the subject with type-safe message handling. The handler receives deserialized payloads directly.
func (Subject[T]) SubscribeWithMsg ¶
func (s Subject[T]) SubscribeWithMsg(ctx context.Context, client *Client, handler func(context.Context, *nats.Msg, T) error) (*Subscription, error)
SubscribeWithMsg creates a subscription that provides both the typed payload and raw message. Use this when you need access to message metadata (subject, headers, etc.).
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription wraps a NATS subscription for lifecycle management
func (*Subscription) Unsubscribe ¶
func (s *Subscription) Unsubscribe() error
Unsubscribe unsubscribes from the subject
type TemporalResolver ¶
type TemporalResolver struct {
// contains filtered or unexported fields
}
TemporalResolver provides efficient timestamp-based KV queries with caching
func NewTemporalResolver ¶
NewTemporalResolver creates a resolver for timestamp-based queries. The context is used for the cache background cleanup goroutine lifecycle.
func NewTemporalResolverWithCache ¶
func NewTemporalResolverWithCache(ctx context.Context, bucket jetstream.KeyValue, cacheTTL time.Duration) (*TemporalResolver, error)
NewTemporalResolverWithCache creates a resolver with a custom cache TTL. The context is used for the cache background cleanup goroutine lifecycle.
func (*TemporalResolver) Close ¶
func (tr *TemporalResolver) Close() error
Close shuts down the temporal resolver and its cache
func (*TemporalResolver) GetAtTimestamp ¶
func (tr *TemporalResolver) GetAtTimestamp(ctx context.Context, key string, targetTime time.Time) (jetstream.KeyValueEntry, error)
GetAtTimestamp finds the entity state that was current at the given timestamp. Uses binary search for O(log n) performance with caching.
func (*TemporalResolver) GetInTimeRange ¶
func (tr *TemporalResolver) GetInTimeRange(ctx context.Context, key string, startTime, endTime time.Time) ([]jetstream.KeyValueEntry, error)
GetInTimeRange finds all entity states within a time range. Returns states ordered by timestamp.
func (*TemporalResolver) GetRangeAtTimestamp ¶
func (tr *TemporalResolver) GetRangeAtTimestamp(ctx context.Context, keys []string, targetTime time.Time) (map[string]jetstream.KeyValueEntry, error)
GetRangeAtTimestamp finds multiple entities at a specific timestamp. Useful for reconstructing entire system state at time T.
func (*TemporalResolver) GetRangeInTimeRange ¶
func (tr *TemporalResolver) GetRangeInTimeRange(ctx context.Context, keys []string, startTime, endTime time.Time) (map[string][]jetstream.KeyValueEntry, error)
GetRangeInTimeRange finds multiple entities within a time range. Returns a map of key -> entries within the range.
func (*TemporalResolver) GetStats ¶
func (tr *TemporalResolver) GetStats() *cache.Statistics
GetStats returns cache statistics for monitoring
type TestClient ¶
type TestClient struct {
Client *Client // Drop-in replacement for existing natsclient.Client
URL string
BucketPrefix string // Prefix applied to all KV bucket names for test isolation
// contains filtered or unexported fields
}
TestClient provides testcontainers-based NATS for testing
func NewSharedTestClient ¶
func NewSharedTestClient(opts ...TestOption) (*TestClient, error)
NewSharedTestClient creates a new NATS test container for use in TestMain. Unlike NewTestClient, this doesn't require testing.T and returns errors.
func NewTestClient ¶
func NewTestClient(t testing.TB, opts ...TestOption) *TestClient
NewTestClient creates a new NATS test container. Accepts testing.TB so it works with both *testing.T and *testing.B.
func (*TestClient) CreateKVBucket ¶
CreateKVBucket is a helper for creating KV buckets during tests. The bucket prefix is automatically applied if configured.
func (*TestClient) CreateStream ¶
func (tc *TestClient) CreateStream(ctx context.Context, name string, subjects []string) (jetstream.Stream, error)
CreateStream is a helper for creating JetStream streams during tests
func (*TestClient) GetKVBucket ¶
GetKVBucket is a helper for getting existing KV buckets during tests. The bucket prefix is automatically applied if configured.
func (*TestClient) GetNativeConnection ¶
func (tc *TestClient) GetNativeConnection() *gonats.Conn
GetNativeConnection returns the underlying NATS connection for direct access
func (*TestClient) GetStream ¶
GetStream is a helper for getting existing JetStream streams during tests
func (*TestClient) IsReady ¶
func (tc *TestClient) IsReady() bool
IsReady checks if the NATS connection is ready for use
func (*TestClient) PrefixedBucketName ¶
func (tc *TestClient) PrefixedBucketName(name string) string
PrefixedBucketName returns the full bucket name with prefix applied. Use this when you need to pass bucket names to components that create their own buckets.
func (*TestClient) Terminate ¶
func (tc *TestClient) Terminate() error
Terminate manually terminates the container and client (usually handled by t.Cleanup)
type TestOption ¶
type TestOption func(*testConfig)
TestOption is a functional option for configuring the test client
func WithBucketPrefix ¶
func WithBucketPrefix(prefix string) TestOption
WithBucketPrefix sets a prefix for all KV bucket names to enable test isolation. When tests run in parallel, each test can use a unique prefix (e.g., test name) to avoid bucket name collisions.
func WithE2EDefaults ¶
func WithE2EDefaults() TestOption
WithE2EDefaults configures NATS with settings good for end-to-end tests
func WithFastStartup ¶
func WithFastStartup() TestOption
WithFastStartup configures NATS for fastest possible startup (good for unit tests)
func WithFileStorage ¶
func WithFileStorage() TestOption
WithFileStorage enables file-backed JetStream storage instead of the default memory-only store. Use this when tests create many KV buckets or write large volumes of data that would exceed the 256MB default memory limit.
func WithIntegrationDefaults ¶
func WithIntegrationDefaults() TestOption
WithIntegrationDefaults configures NATS with settings good for integration tests
func WithJetStream ¶
func WithJetStream() TestOption
WithJetStream enables JetStream for tests that need it
func WithKVBuckets ¶
func WithKVBuckets(buckets ...string) TestOption
WithKVBuckets pre-creates specific KV buckets
func WithMinimalFeatures ¶
func WithMinimalFeatures() TestOption
WithMinimalFeatures configures NATS with only basic pub/sub (fastest startup)
func WithNATSVersion ¶
func WithNATSVersion(version string) TestOption
WithNATSVersion specifies a specific NATS server version to use
func WithProductionLike ¶
func WithProductionLike() TestOption
WithProductionLike configures NATS with settings that mimic production
func WithStartTimeout ¶
func WithStartTimeout(timeout time.Duration) TestOption
WithStartTimeout sets the container startup timeout
func WithStreams ¶
func WithStreams(streams ...TestStreamConfig) TestOption
WithStreams pre-creates JetStream streams for testing
func WithTestTimeout ¶
func WithTestTimeout(timeout time.Duration) TestOption
WithTestTimeout sets the connection timeout for test client
type TestStreamConfig ¶
TestStreamConfig defines a stream to pre-create for testing
type TraceContext ¶
type TraceContext struct {
TraceID string // 32 hex chars (16 bytes)
SpanID string // 16 hex chars (8 bytes)
ParentSpanID string // 16 hex chars, empty for root span
Sampled bool
}
TraceContext holds trace information propagated through the system
func ExtractTrace ¶
func ExtractTrace(msg *nats.Msg) *TraceContext
ExtractTrace reads trace headers from a NATS message
func ExtractTraceFromJetStream ¶
func ExtractTraceFromJetStream(headers nats.Header) *TraceContext
ExtractTraceFromJetStream reads trace headers from a JetStream message
func NewTraceContext ¶
func NewTraceContext() *TraceContext
NewTraceContext creates a new trace context with generated IDs
func ParseTraceparent ¶
func ParseTraceparent(header string) (*TraceContext, error)
ParseTraceparent parses a W3C traceparent header. Format: {version}-{trace_id}-{span_id}-{flags}. Example: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
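The header format above can be validated and split with the standard library. This is a minimal sketch rather than the package's parser: `parseTraceparent` and `traceContext` are lowercase stand-ins, the error messages are invented, and the check is looser than the W3C specification (it accepts upper-case hex and only the four-field layout).

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// traceContext holds the fields recovered from a traceparent header.
type traceContext struct {
	TraceID string
	SpanID  string
	Sampled bool
}

// parseTraceparent splits {version}-{trace_id}-{span_id}-{flags} and checks
// field lengths, hex encoding, and the all-zero IDs that the spec forbids.
func parseTraceparent(header string) (*traceContext, error) {
	parts := strings.Split(header, "-")
	if len(parts) != 4 {
		return nil, errors.New("traceparent: expected 4 dash-separated fields")
	}
	wantLen := []int{2, 32, 16, 2}
	for i, p := range parts {
		if len(p) != wantLen[i] {
			return nil, fmt.Errorf("traceparent: field %d has length %d, want %d", i, len(p), wantLen[i])
		}
		if _, err := hex.DecodeString(p); err != nil {
			return nil, fmt.Errorf("traceparent: field %d is not hex: %w", i, err)
		}
	}
	if parts[1] == strings.Repeat("0", 32) || parts[2] == strings.Repeat("0", 16) {
		return nil, errors.New("traceparent: all-zero trace or span ID")
	}
	flags, _ := hex.DecodeString(parts[3]) // validated in the loop above
	return &traceContext{
		TraceID: parts[1],
		SpanID:  parts[2],
		Sampled: flags[0]&0x01 == 0x01, // lowest flag bit marks a sampled trace
	}, nil
}

func main() {
	tc, err := parseTraceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(tc.TraceID, tc.SpanID, tc.Sampled)
}
```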
func TraceContextFromContext ¶
func TraceContextFromContext(ctx context.Context) (*TraceContext, bool)
TraceContextFromContext extracts trace context from context
func (*TraceContext) FormatTraceparent ¶
func (tc *TraceContext) FormatTraceparent() string
FormatTraceparent formats trace context as W3C traceparent
func (*TraceContext) NewSpan ¶
func (tc *TraceContext) NewSpan() *TraceContext
NewSpan creates a child span from existing trace context
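The relationship between a parent trace context, a child span, and the formatted header can be sketched as follows. The lowercase type and functions are hypothetical stand-ins for TraceContext, NewTraceContext, NewSpan, and FormatTraceparent; the package's ID generation and sampling defaults may differ.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// traceContext mirrors the documented TraceContext fields.
type traceContext struct {
	TraceID      string // 32 hex chars (16 bytes)
	SpanID       string // 16 hex chars (8 bytes)
	ParentSpanID string // empty for a root span
	Sampled      bool
}

// randomHex returns n random bytes encoded as 2n hex characters.
func randomHex(n int) string {
	b := make([]byte, n)
	if _, err := rand.Read(b); err != nil {
		panic(err) // crypto/rand failing is not recoverable here
	}
	return hex.EncodeToString(b)
}

// newTrace starts a root span. Sampling every trace is an assumption.
func newTrace() *traceContext {
	return &traceContext{TraceID: randomHex(16), SpanID: randomHex(8), Sampled: true}
}

// newSpan keeps the trace ID, generates a fresh span ID, and records the
// caller's span as the parent.
func (tc *traceContext) newSpan() *traceContext {
	return &traceContext{
		TraceID:      tc.TraceID,
		SpanID:       randomHex(8),
		ParentSpanID: tc.SpanID,
		Sampled:      tc.Sampled,
	}
}

// formatTraceparent renders version 00 of the W3C traceparent header.
func (tc *traceContext) formatTraceparent() string {
	flags := "00"
	if tc.Sampled {
		flags = "01"
	}
	return fmt.Sprintf("00-%s-%s-%s", tc.TraceID, tc.SpanID, flags)
}

func main() {
	root := newTrace()
	child := root.newSpan()
	fmt.Println(root.formatTraceparent())
	fmt.Println(child.formatTraceparent(), "parent:", child.ParentSpanID)
}
```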