natsclient

Version: v1.0.0-alpha.54
Published: Mar 18, 2026 License: MIT Imports: 22 Imported by: 0

README

natsclient

NATS client with circuit breaker protection, automatic reconnection, and comprehensive JetStream/KV support for distributed edge systems.

Overview

The natsclient package provides a NATS client designed for resilient operation in edge computing environments. It wraps the standard NATS Go client with additional reliability features: a circuit breaker for failure protection, exponential backoff for reconnection, and context propagation throughout all operations.

This package is the foundation for all NATS communication in the SemStreams framework, providing both core pub/sub functionality and advanced features like JetStream streams, consumers, and Key-Value stores. The client handles connection lifecycle management automatically, allowing applications to focus on business logic rather than connection state management.

Installation

import "github.com/c360studio/semstreams/natsclient"

Core Concepts

Circuit Breaker Pattern

The client implements a circuit breaker that prevents cascading failures by failing fast after a threshold of consecutive failures (default: 5). The circuit opens to prevent further attempts, then gradually tests the connection with exponential backoff.
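The fail-fast behavior can be sketched with a small counter-based breaker. This is a minimal illustration under stated assumptions, not the package's implementation: the `breaker` type, `errDial`, and `errCircuitOpen` are hypothetical stand-ins, the sketch is not goroutine-safe, and it leaves out the half-open probing and backoff described above.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinels; errCircuitOpen stands in for the package's ErrCircuitOpen.
var (
	errCircuitOpen = errors.New("circuit breaker is open")
	errDial        = errors.New("dial failed")
)

// breaker opens after `threshold` consecutive failures.
type breaker struct {
	threshold int
	failures  int
}

// call runs op unless the circuit is open. A success resets the
// consecutive-failure count; a failure increments it.
func (b *breaker) call(op func() error) error {
	if b.failures >= b.threshold {
		return errCircuitOpen // fail fast without invoking op
	}
	if err := op(); err != nil {
		b.failures++
		return err
	}
	b.failures = 0
	return nil
}

// reset models a successful recovery probe closing the circuit again.
func (b *breaker) reset() { b.failures = 0 }

func main() {
	b := &breaker{threshold: 5}
	failing := func() error { return errDial }
	for i := 0; i < 6; i++ {
		// The first five calls report the dial error; the sixth fails fast.
		fmt.Println(i, b.call(failing))
	}
}
```

Once the threshold is reached, `op` is no longer invoked at all, which is what keeps a struggling server from being hammered by retries.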

Connection Lifecycle

Connection states transition through: Disconnected → Connecting → Connected → Reconnecting (on failure) → Connected. The client handles all transitions automatically with configurable callbacks for state changes.

KVStore Abstraction

A high-level abstraction over NATS KV providing automatic CAS (Compare-And-Swap) retry logic, JSON helpers, and consistent error handling for configuration management scenarios.
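The CAS retry idea can be shown without a NATS server. The sketch below uses a hypothetical in-memory store (`memKV`) with a revision counter; `updateWithRetry`, `errRevisionMismatch`, and `errMaxRetries` are illustrative names, not the package's API.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errRevisionMismatch = errors.New("revision mismatch")
	errMaxRetries       = errors.New("max retries exceeded")
)

// memKV is a hypothetical single-key store with a revision counter.
type memKV struct {
	value    int
	revision uint64
}

func (s *memKV) get() (int, uint64) { return s.value, s.revision }

// update writes only if the caller saw the latest revision (compare-and-swap).
func (s *memKV) update(v int, lastRev uint64) error {
	if lastRev != s.revision {
		return errRevisionMismatch
	}
	s.value = v
	s.revision++
	return nil
}

// updateWithRetry re-reads the value and re-applies fn whenever the write
// loses a race, up to maxRetries additional attempts.
func updateWithRetry(s *memKV, maxRetries int, fn func(int) int) error {
	for attempt := 0; attempt <= maxRetries; attempt++ {
		cur, rev := s.get()
		err := s.update(fn(cur), rev)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errRevisionMismatch) {
			return err // only conflicts are retried
		}
	}
	return errMaxRetries
}

func main() {
	s := &memKV{}
	err := updateWithRetry(s, 3, func(v int) int { return v + 1 })
	fmt.Println(s.value, s.revision, err)
}
```

Because `fn` may run more than once, it should be a pure transformation of the current value, which is the same caveat the UpdateJSON examples note in their comments.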

Distributed Tracing

All publish and request methods automatically propagate W3C-compliant trace context. If no trace exists in the context, one is auto-generated, ensuring every message can be correlated across services.

Headers injected:

  • traceparent - W3C Trace Context format (00-{trace_id}-{span_id}-{flags})
  • X-Trace-ID, X-Span-ID, X-Parent-Span-ID - Simplified headers for compatibility
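To make the traceparent layout concrete, here is a small sketch that builds and splits a version-00 value using only the standard library. The helper names (`randomHex`, `formatTraceparent`, `parseTraceparent`) are hypothetical and not part of the package; the parser checks field counts and lengths only, not that every character is hexadecimal.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"strings"
)

// randomHex returns n random bytes encoded as 2n lowercase hex characters.
func randomHex(n int) (string, error) {
	buf := make([]byte, n)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

// formatTraceparent builds a version-00 traceparent value.
func formatTraceparent(traceID, spanID string, sampled bool) string {
	flags := "00"
	if sampled {
		flags = "01"
	}
	return fmt.Sprintf("00-%s-%s-%s", traceID, spanID, flags)
}

// parseTraceparent splits a traceparent value and checks the field lengths:
// 32 hex chars for the trace ID, 16 for the span ID, 2 for the flags.
func parseTraceparent(h string) (traceID, spanID, flags string, ok bool) {
	parts := strings.Split(h, "-")
	if len(parts) != 4 || parts[0] != "00" ||
		len(parts[1]) != 32 || len(parts[2]) != 16 || len(parts[3]) != 2 {
		return "", "", "", false
	}
	return parts[1], parts[2], parts[3], true
}

func main() {
	traceID, _ := randomHex(16) // 16 bytes -> 32 hex chars
	spanID, _ := randomHex(8)   // 8 bytes -> 16 hex chars
	h := formatTraceparent(traceID, spanID, true)
	fmt.Println(h)
	fmt.Println(parseTraceparent(h))
}
```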

Usage

Basic Example
// Create and connect to NATS
client, err := natsclient.NewClient("nats://localhost:4222")
if err != nil {
    return err
}

ctx := context.Background()
err = client.Connect(ctx)
if err != nil {
    return err
}
defer client.Close(ctx)

// Publish a message
err = client.Publish(ctx, "subject.name", []byte("message data"))

// Subscribe to messages (handler receives a per-message context)
err = client.Subscribe(ctx, "subject.*", func(msgCtx context.Context, msg *nats.Msg) {
    // Handle message
    fmt.Printf("Received: %s\n", string(msg.Data))
})
Advanced Usage
// Create client with options
client, err := natsclient.NewClient("nats://localhost:4222",
    natsclient.WithMaxReconnects(-1),  // Infinite reconnects
    natsclient.WithReconnectWait(2*time.Second),
    natsclient.WithCircuitBreakerThreshold(10),
    natsclient.WithDisconnectCallback(func(err error) {
        log.Printf("Disconnected: %v", err)
    }),
    natsclient.WithReconnectCallback(func() {
        log.Println("Reconnected successfully")
    }),
)

// JetStream operations
js, err := client.JetStream()
stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
    Name:     "EVENTS",
    Subjects: []string{"events.>"},
})

// Key-Value store with CAS
bucket, err := client.CreateKeyValueBucket(ctx, jetstream.KeyValueConfig{
    Bucket: "config",
})

kvStore := client.NewKVStore(bucket)
err = kvStore.UpdateJSON(ctx, "service.config", func(config map[string]any) error {
    config["enabled"] = true
    return nil
})
Tracing
// Traces are auto-generated - no action needed for basic tracing
err := client.Publish(ctx, "events.user", data)

// Explicit trace context
tc := natsclient.NewTraceContext()
ctx = natsclient.ContextWithTrace(ctx, tc)
response, err := client.Request(ctx, "service.action", data, 5*time.Second) // Request returns ([]byte, error)

// Extract trace from incoming message
tc = natsclient.ExtractTrace(msg)
if tc != nil {
    // Create child span for downstream calls
    childCtx := natsclient.ContextWithTrace(ctx, tc.NewSpan())
    err = client.Publish(childCtx, "downstream.subject", response)
}

API Reference

Types
Client

Main client type providing NATS connectivity with circuit breaker protection.

type Client struct {
    // Internal fields not exposed
}
KVStore

High-level KV operations with built-in CAS support.

type KVStore struct {
    // Internal fields not exposed
}
KVOptions

Configuration for KV operations behavior.

type KVOptions struct {
    MaxRetries int           // Maximum CAS retry attempts (default: 3)
    RetryDelay time.Duration // Delay between retries (default: 10ms)
    Timeout    time.Duration // Operation timeout (default: 5s)
}
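NewKVStore accepts options of type func(*KVOptions). The sketch below shows how such functional options are typically applied on top of defaults. It uses a local mirror type (`kvOptions`) so it runs on its own; `withMaxRetries` and `applyOptions` are hypothetical helpers, not functions the package is known to export.

```go
package main

import (
	"fmt"
	"time"
)

// kvOptions mirrors the documented KVOptions fields for illustration only.
type kvOptions struct {
	MaxRetries int
	RetryDelay time.Duration
	Timeout    time.Duration
}

// defaultKVOptions returns the defaults listed in the struct comments above.
func defaultKVOptions() kvOptions {
	return kvOptions{
		MaxRetries: 3,
		RetryDelay: 10 * time.Millisecond,
		Timeout:    5 * time.Second,
	}
}

// withMaxRetries is a hypothetical option constructor.
func withMaxRetries(n int) func(*kvOptions) {
	return func(o *kvOptions) { o.MaxRetries = n }
}

// applyOptions starts from the defaults and applies each override in order.
func applyOptions(opts ...func(*kvOptions)) kvOptions {
	o := defaultKVOptions()
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(
		withMaxRetries(10),
		func(o *kvOptions) { o.Timeout = time.Second }, // inline override
	)
	fmt.Printf("%+v\n", o)
}
```

With the real package, the equivalent call would presumably pass a closure such as func(o *natsclient.KVOptions) { o.MaxRetries = 10 } as the second argument to NewKVStore.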
Functions
NewClient(urls string, opts ...ClientOption) (*Client, error)

Creates a new NATS client with the specified server URL(s) and options. The urls parameter accepts comma-separated server URLs.

(c *Client) Connect(ctx context.Context) error

Establishes connection to NATS server with circuit breaker protection.

(c *Client) Publish(ctx context.Context, subject string, data []byte) error

Publishes a message to the specified subject.

(c *Client) NewKVStore(bucket jetstream.KeyValue, opts ...func(*KVOptions)) *KVStore

Creates a KVStore instance for high-level KV operations.

(kv *KVStore) UpdateJSON(ctx context.Context, key string, updateFn func(map[string]any) error) error

Performs CAS update on JSON data with automatic retry on conflicts.

Interfaces
Logger
type Logger interface {
    Printf(format string, v ...any)
}

Optional logger interface for debug output.

Architecture

Design Decisions

Circuit Breaker over Simple Retry: Chose circuit breaker pattern to prevent cascade failures in distributed systems. After threshold failures, the circuit opens to fail fast rather than continuously retry, giving the system time to recover.

Context-First API: Every I/O operation requires context.Context as first parameter for proper cancellation and timeout support, essential for production systems.

KVStore Abstraction: Created high-level KV abstraction with built-in CAS retry logic to eliminate code duplication across services. Centralizes revision conflict handling and retry logic.

Integration Points
  • Dependencies: NATS server (2.x compatible)
  • Used By: All SemStreams services and components requiring messaging
  • Data Flow: Application → Client → Circuit Breaker → NATS Connection → Server

Configuration

Connection Options
WithMaxReconnects(-1)              // Infinite reconnects (default: 5)
WithReconnectWait(2*time.Second)   // Wait between reconnects (default: 2s)
WithTimeout(10*time.Second)        // Connection timeout (default: 5s)
WithCircuitBreakerThreshold(5)     // Failures before circuit opens (default: 5)
WithLogger(logger)                  // Enable debug logging
Callbacks
WithDisconnectCallback(func(err error) {
    // Handle disconnection event
})

WithReconnectCallback(func() {
    // Handle successful reconnection
})

WithClosedCallback(func() {
    // Handle connection closed
})

Error Handling

Error Types
var (
    ErrCircuitOpen        = errors.New("circuit breaker is open")
    ErrNotConnected       = errors.New("not connected to NATS")
    ErrKVKeyNotFound      = errors.New("kv: key not found")
    ErrKVRevisionMismatch = errors.New("kv: revision mismatch (concurrent update)")
    ErrKVMaxRetriesExceeded = errors.New("kv: max retries exceeded")
)
Error Detection
// Check specific errors
if errors.Is(err, natsclient.ErrCircuitOpen) {
    // Wait for circuit to close
}

// KV conflict detection
if errors.Is(err, natsclient.ErrKVRevisionMismatch) {
    // Handle concurrent update
}

// Helper functions for NATS errors
if natsclient.IsKVNotFoundError(err) {
    // Key doesn't exist
}
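The reason to prefer errors.Is over == is that sentinel errors are often wrapped with extra context on the way up the call stack. A minimal standalone sketch, where `errCircuitOpen` stands in for natsclient.ErrCircuitOpen and `publish` is a hypothetical wrapper:

```go
package main

import (
	"errors"
	"fmt"
)

// errCircuitOpen stands in for natsclient.ErrCircuitOpen.
var errCircuitOpen = errors.New("circuit breaker is open")

// publish is a hypothetical caller-side wrapper that adds context with %w.
func publish(subject string) error {
	return fmt.Errorf("publish %q: %w", subject, errCircuitOpen)
}

// isRetryable reports whether the sentinel appears anywhere in err's chain.
func isRetryable(err error) bool {
	return errors.Is(err, errCircuitOpen)
}

func main() {
	err := publish("telemetry.data")
	fmt.Println(err)
	// Direct comparison misses the wrapped sentinel; errors.Is finds it.
	fmt.Println(err == errCircuitOpen, isRetryable(err)) // false true
}
```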

Testing

Test Utilities

The package provides comprehensive test utilities:

// Create test client with real NATS via testcontainers
testClient := natsclient.NewTestClient(t, 
    natsclient.WithJetStream(),
    natsclient.WithKV(),
)
defer testClient.Close()

client := testClient.Client
Testing Patterns
  • Uses real NATS server via testcontainers (no mocks)
  • Tests actual behavior including connection lifecycle
  • Thread-safe testing with proper synchronization
  • Comprehensive circuit breaker scenario testing

Performance Considerations

  • Concurrency: Thread-safe for concurrent use from multiple goroutines
  • Memory: Scales with number of active subscriptions and consumers
  • Throughput: Limited by network latency and NATS server performance
  • Circuit Breaker: Adds minimal overhead, fails fast when open

Examples

Resilient Publisher
package main

import (
    "context"
    "errors"
    "log"
    "time"

    "github.com/c360studio/semstreams/natsclient"
)

func main() {
    client, err := natsclient.NewClient("nats://localhost:4222",
        natsclient.WithMaxReconnects(-1),
        natsclient.WithLogger(log.Default()),
    )
    if err != nil {
        log.Fatal(err)
    }
    
    ctx := context.Background()
    if err := client.Connect(ctx); err != nil {
        log.Fatal(err)
    }
    defer client.Close(ctx)
    
    // Publish with automatic reconnection handling
    for {
        err := client.Publish(ctx, "telemetry.data", []byte("sensor reading"))
        if err != nil {
            if errors.Is(err, natsclient.ErrCircuitOpen) {
                log.Println("Circuit open, waiting...")
                time.Sleep(5 * time.Second)
                continue
            }
            log.Printf("Publish error: %v", err)
        }
        time.Sleep(time.Second)
    }
}
Configuration Management with KV
// Manage service configuration with optimistic locking
bucket, err := client.CreateKeyValueBucket(ctx, jetstream.KeyValueConfig{
    Bucket:   "config",
    History:  5,
    Replicas: 3,
})
if err != nil {
    return err
}

kvStore := client.NewKVStore(bucket)

// Atomic configuration update with retry
err = kvStore.UpdateJSON(ctx, "services.processor", func(config map[string]any) error {
    // This function may be called multiple times on conflict
    config["workers"] = 10
    config["timeout"] = "30s"
    return nil
})
Related Packages

  • service: Uses natsclient for service communication
  • component: Components receive natsclient for messaging
  • config: Manager uses KV store for runtime configuration

License

MIT

Documentation

Overview

Package natsclient provides a client for managing NATS connections with circuit breaker pattern.

Package natsclient provides a robust NATS client with circuit breaker protection, automatic reconnection, and comprehensive JetStream/KV support for distributed edge systems.

The natsclient package wraps the standard NATS Go client with additional reliability features including circuit breaker pattern for failure protection, exponential backoff for reconnection, and proper context propagation throughout all operations. It serves as the foundation for all NATS communication in the StreamKit framework.

Core Features

Circuit Breaker Pattern: Prevents cascading failures by failing fast after a threshold of consecutive failures (default: 5). The circuit opens to prevent further attempts, then gradually tests the connection with exponential backoff.

Connection Lifecycle Management: Handles connection states automatically through the lifecycle: Disconnected → Connecting → Connected → Reconnecting → Connected. The client manages all transitions with configurable callbacks for state changes.

JetStream Support: Full support for JetStream streams, consumers, and Key-Value stores with proper error handling and circuit breaker integration.

KVStore Abstraction: High-level abstraction over NATS KV providing automatic CAS (Compare-And-Swap) retry logic, JSON helpers, and consistent error handling for configuration management scenarios.

Basic Usage

Creating and connecting to NATS:

client, err := natsclient.NewClient("nats://localhost:4222")
if err != nil {
    return err
}

ctx := context.Background()
err = client.Connect(ctx)
if err != nil {
    return err
}
defer client.Close(ctx)

// Publish a message
err = client.Publish(ctx, "subject.name", []byte("message data"))

// Subscribe to messages (receives full *nats.Msg for access to Subject, Data, Headers)
err = client.Subscribe(ctx, "subject.*", func(msgCtx context.Context, msg *nats.Msg) {
    // Handle message with context (30s timeout per message)
    // For wildcard subscriptions, msg.Subject contains the actual subject
    fmt.Printf("Received on %s: %s\n", msg.Subject, string(msg.Data))
})

Advanced Configuration

Creating client with options:

client, err := natsclient.NewClient("nats://localhost:4222",
    natsclient.WithMaxReconnects(-1),  // Infinite reconnects
    natsclient.WithReconnectWait(2*time.Second),
    natsclient.WithCircuitBreakerThreshold(10),
    natsclient.WithDisconnectCallback(func(err error) {
        log.Printf("Disconnected: %v", err)
    }),
    natsclient.WithReconnectCallback(func() {
        log.Println("Reconnected successfully")
    }),
)

JetStream Operations

Working with JetStream streams and consumers:

// Create a stream
stream, err := client.CreateStream(ctx, jetstream.StreamConfig{
    Name:     "EVENTS",
    Subjects: []string{"events.>"},
})

// Publish to stream
err = client.PublishToStream(ctx, "events.user.created", []byte(`{"user_id": "123"}`))

// Consume from stream (receives full jetstream.Msg for access to Subject, Data, Headers)
// Handler is responsible for calling msg.Ack() after processing
err = client.ConsumeStream(ctx, "EVENTS", "events.>", func(msg jetstream.Msg) {
    // Process event - msg.Subject() contains actual subject for wildcard filters
    processEvent(msg.Subject(), msg.Data())
    msg.Ack()
})

Key-Value Store

Using KVStore for configuration management with atomic updates:

// Create or get KV bucket
bucket, err := client.CreateKeyValueBucket(ctx, jetstream.KeyValueConfig{
    Bucket:   "config",
    History:  5,
    Replicas: 3,
})

// Create KVStore wrapper
kvStore := client.NewKVStore(bucket)

// Atomic JSON update with automatic CAS retry
err = kvStore.UpdateJSON(ctx, "service.config", func(config map[string]any) error {
    // This function may be called multiple times on conflict
    config["enabled"] = true
    config["workers"] = 10
    return nil
})

// Get JSON value
var config map[string]any
err = kvStore.GetJSON(ctx, "service.config", &config)

Circuit Breaker Pattern

The circuit breaker protects against cascading failures:

// Circuit states:
// - Closed: Normal operation, requests pass through
// - Open: Failures exceeded threshold, failing fast
// - Half-Open: Testing if system recovered

err := client.Connect(ctx)
if errors.Is(err, natsclient.ErrCircuitOpen) {
    // Circuit is open, wait for it to test recovery
    log.Println("Circuit breaker is open, backing off...")
    time.Sleep(client.Backoff())
    // Retry later
}

Circuit breaker configuration:

client, err := natsclient.NewClient(url,
    natsclient.WithCircuitBreakerThreshold(5),  // Open after 5 failures
    natsclient.WithMaxBackoff(time.Minute),     // Max backoff duration
)

Connection Status and Health

Monitoring connection health:

// Check current status
status := client.Status()
switch status {
case natsclient.StatusConnected:
    // Healthy and ready
case natsclient.StatusReconnecting:
    // Temporarily disconnected, reconnecting
case natsclient.StatusCircuitOpen:
    // Circuit breaker is open
case natsclient.StatusDisconnected:
    // Not connected
}

// Get detailed status
statusInfo := client.GetStatus()
log.Printf("Status: %v, Failures: %d, RTT: %v",
    statusInfo.Status,
    statusInfo.FailureCount,
    statusInfo.RTT)

// Wait for connection
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := client.WaitForConnection(ctx)

Health monitoring with callbacks:

client, err := natsclient.NewClient(url,
    natsclient.WithHealthCheck(10*time.Second),
    natsclient.WithHealthChangeCallback(func(healthy bool) {
        if healthy {
            log.Println("Connection restored")
        } else {
            log.Println("Connection lost")
        }
    }),
)

Error Handling

The package defines specific error types for different failure scenarios:

var (
    ErrCircuitOpen        = errors.New("circuit breaker is open")
    ErrNotConnected       = errors.New("not connected to NATS")
    ErrConnectionTimeout  = errors.New("connection timeout")
)

Error detection patterns:

err := client.Publish(ctx, "subject", data)
if err != nil {
    // Check for circuit breaker
    if errors.Is(err, natsclient.ErrCircuitOpen) {
        // Back off and retry later
        return
    }

    // Check for connection issues
    if errors.Is(err, natsclient.ErrNotConnected) {
        // Trigger reconnection
        return
    }

    // Other error
    log.Printf("Publish failed: %v", err)
}

KV-specific error handling:

err := kvStore.UpdateJSON(ctx, key, updateFn)
if err != nil {
    // Check for key not found
    if natsclient.IsKVNotFoundError(err) {
        // Key doesn't exist, create it
    }

    // Check for conflict (CAS failed after retries)
    if natsclient.IsKVConflictError(err) {
        // Too many concurrent updates
    }
}

Connection Options

Available configuration options:

WithMaxReconnects(n int)              // Maximum reconnection attempts (-1 = infinite)
WithReconnectWait(d time.Duration)    // Wait between reconnection attempts
WithTimeout(d time.Duration)          // Connection timeout
WithDrainTimeout(d time.Duration)     // Timeout for graceful shutdown
WithPingInterval(d time.Duration)     // Health check interval
WithCircuitBreakerThreshold(n int)    // Failures before circuit opens
WithMaxBackoff(d time.Duration)       // Maximum backoff duration
WithLogger(logger Logger)             // Custom logger for debug output
WithHealthCheck(d time.Duration)      // Enable health monitoring
WithClientName(name string)           // Client identification

Authentication and Security

Username/password authentication:

client, err := natsclient.NewClient(url,
    natsclient.WithCredentials("username", "password"),
)

Token authentication:

client, err := natsclient.NewClient(url,
    natsclient.WithToken("auth-token"),
)

TLS configuration:

client, err := natsclient.NewClient(url,
    natsclient.WithTLS(true),
    natsclient.WithTLSCerts("client.crt", "client.key"),
    natsclient.WithTLSCA("ca.crt"),
)

Note: Credentials are cleared from memory when the client is closed.

Testing

The package provides test utilities for integration testing:

func TestMyService(t *testing.T) {
    // Create test client with real NATS via testcontainers
    testClient := natsclient.NewTestClient(t,
        natsclient.WithJetStream(),
        natsclient.WithKV(),
    )
    defer testClient.Close()

    client := testClient.Client
    ctx := context.Background()

    // Test with real NATS server
    err := client.Publish(ctx, "test.subject", []byte("test data"))
    assert.NoError(t, err)
}

Testing patterns:

  • Uses real NATS server via testcontainers (no mocks)
  • Tests actual behavior including connection lifecycle
  • Thread-safe testing with proper synchronization
  • Comprehensive circuit breaker scenario testing

Thread Safety

The Client type is thread-safe and can be used concurrently from multiple goroutines:

  • All public methods are safe for concurrent use
  • Connection state is managed with atomic operations and mutexes
  • Subscriptions and consumers can be created from any goroutine
  • Close is idempotent: the first call closes the connection and subsequent calls are no-ops
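One common way to get close-once semantics under concurrency is sync.Once. The sketch below is an assumption about how such behavior can be built, not a description of the Client's internals; the `conn` type is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical resource whose teardown must run exactly once.
type conn struct {
	once   sync.Once
	closes int // number of times the teardown actually ran
}

// Close releases the resource on the first call; later calls do nothing.
func (c *conn) Close() {
	c.once.Do(func() { c.closes++ })
}

func main() {
	c := &conn{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Close() // safe to call from any goroutine
		}()
	}
	wg.Wait()
	fmt.Println(c.closes) // prints 1
}
```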

Performance Considerations

Concurrency: Thread-safe for concurrent use from multiple goroutines. No artificial concurrency limits - scales with available system resources.

Memory: Memory usage scales with number of active subscriptions and consumers. Each subscription maintains its own message buffer. Health monitoring adds minimal overhead (one goroutine with configurable interval).

Throughput: Limited primarily by network latency and NATS server performance. Circuit breaker adds negligible overhead in normal operation and fails fast when open.

Connection Lifecycle: Reconnection uses exponential backoff to avoid overwhelming the server during failures. Maximum backoff is configurable (default: 1 minute).
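A capped exponential backoff can be computed as follows. This is a sketch of the general technique; the doubling factor and the `backoff` function are assumptions, since the source does not state the exact growth formula the client uses.

```go
package main

import (
	"fmt"
	"time"
)

// backoff doubles base once per attempt (attempt starts at 0) and never
// returns more than limit.
func backoff(base, limit time.Duration, attempt int) time.Duration {
	d := base
	// Stop doubling once the cap is reached so large attempt counts
	// cannot overflow the duration.
	for i := 0; i < attempt && d < limit; i++ {
		d *= 2
	}
	if d > limit {
		return limit
	}
	return d
}

func main() {
	// 2s base and a 1 minute cap, matching the documented defaults.
	for attempt := 0; attempt < 7; attempt++ {
		fmt.Println(attempt, backoff(2*time.Second, time.Minute, attempt))
	}
}
```

Production implementations often add random jitter on top of this so that many clients do not reconnect in lockstep; that is omitted here for brevity.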

Distributed Tracing

The package provides W3C-compliant trace context propagation for distributed tracing. All publish and request methods automatically generate trace context if none exists, ensuring complete observability across the message flow.

Trace headers are injected into all outbound NATS messages:

  • traceparent: W3C Trace Context header (00-{trace_id}-{span_id}-{flags})
  • X-Trace-ID: Simplified trace ID header
  • X-Span-ID: Current span identifier
  • X-Parent-Span-ID: Parent span for nested operations

Using trace context:

// Traces are auto-generated if not present
err := client.Publish(ctx, "subject", data)

// Or provide explicit trace context
tc := natsclient.NewTraceContext()
ctx = natsclient.ContextWithTrace(ctx, tc)
err = client.Publish(ctx, "subject", data)

// Extract trace from received message
tc = natsclient.ExtractTrace(msg)
if tc != nil {
    log.Printf("Trace ID: %s, Span ID: %s", tc.TraceID, tc.SpanID)
}

Creating child spans for nested operations:

parentTC, _ := natsclient.TraceContextFromContext(ctx)
childTC := parentTC.NewSpan()
childCtx := natsclient.ContextWithTrace(ctx, childTC)
resp, err := client.Request(childCtx, "service.action", data, timeout) // Request returns ([]byte, error)

Architecture Integration

The natsclient package integrates with StreamKit components:

  • service: Services use natsclient for pub/sub communication
  • config: Manager uses KV store for runtime configuration
  • component: Components receive natsclient for messaging
  • engine: Flow engine coordinates component communication via NATS

Data flow:

Application → Client → Circuit Breaker → NATS Connection → NATS Server

Design Decisions

Circuit Breaker over Simple Retry: Chose circuit breaker pattern to prevent cascade failures in distributed systems. After threshold failures, the circuit opens to fail fast rather than continuously retry, giving the system time to recover.

Context-First API: Every I/O operation requires context.Context as first parameter for proper cancellation and timeout support, essential for production systems.

KVStore Abstraction: Created high-level KV abstraction with built-in CAS retry logic to eliminate code duplication across services. Centralizes revision conflict handling and retry logic.

Testcontainers over Mocks: Integration tests use real NATS server via testcontainers to catch actual integration issues. Mock-based testing can miss edge cases in the NATS protocol implementation.

Examples

Resilient publisher with automatic reconnection:

package main

import (
    "context"
    "errors"
    "log"
    "time"

    "github.com/c360studio/semstreams/natsclient"
)

func main() {
    client, err := natsclient.NewClient("nats://localhost:4222",
        natsclient.WithMaxReconnects(-1),
        natsclient.WithLogger(log.Default()),
    )
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()
    if err := client.Connect(ctx); err != nil {
        log.Fatal(err)
    }
    defer client.Close(ctx)

    // Publish with automatic reconnection handling
    for {
        err := client.Publish(ctx, "telemetry.data", []byte("sensor reading"))
        if err != nil {
            if errors.Is(err, natsclient.ErrCircuitOpen) {
                log.Println("Circuit open, waiting...")
                time.Sleep(5 * time.Second)
                continue
            }
            log.Printf("Publish error: %v", err)
        }
        time.Sleep(time.Second)
    }
}

Configuration management with atomic updates:

// Manage service configuration with optimistic locking
bucket, err := client.CreateKeyValueBucket(ctx, jetstream.KeyValueConfig{
    Bucket:   "config",
    History:  5,
    Replicas: 3,
})
if err != nil {
    return err
}

kvStore := client.NewKVStore(bucket)

// Atomic configuration update with automatic retry
err = kvStore.UpdateJSON(ctx, "services.processor", func(config map[string]any) error {
    // This function may be called multiple times on conflict
    config["workers"] = 10
    config["timeout"] = "30s"
    return nil
})

For more examples and detailed usage, see the README.md in this directory.

Package natsclient provides request/reply pattern support for NATS.

Package natsclient provides JetStream stream management utilities.

Package natsclient provides testcontainers-based NATS infrastructure for testing.

Package natsclient provides typed subject patterns for compile-time type safety.

Index

Constants

const (
	// TraceparentHeader is the W3C standard trace context header
	// Format: 00-{trace_id}-{span_id}-{flags}
	// trace_id: 32 hex chars (16 bytes), span_id: 16 hex chars (8 bytes)
	TraceparentHeader = "traceparent"

	// TraceIDHeader is a simplified header for internal use
	TraceIDHeader = "X-Trace-ID"
	// SpanIDHeader is a simplified header for internal use
	SpanIDHeader = "X-Span-ID"
	// ParentSpanHeader is a simplified header for internal use
	ParentSpanHeader = "X-Parent-Span-ID"
)

W3C Trace Context headers

const DefaultRequestTimeout = 5 * time.Second

DefaultRequestTimeout is the default timeout for request/reply operations.

Variables

var (
	ErrNotConnected      = stderrors.New("not connected to NATS")
	ErrCircuitOpen       = stderrors.New("circuit breaker is open")
	ErrConnectionTimeout = stderrors.New("connection timeout")
)

Error messages

var (
	ErrKVKeyNotFound        = errors.New("kv: key not found")
	ErrKVKeyExists          = errors.New("kv: key already exists")
	ErrKVRevisionMismatch   = errors.New("kv: revision mismatch (concurrent update)")
	ErrKVMaxRetriesExceeded = errors.New("kv: max retries exceeded")
)

Well-known errors matching Graph processor patterns

Functions

func ConsumeWithHeartbeat

func ConsumeWithHeartbeat(
	ctx context.Context,
	msg jetstream.Msg,
	heartbeatInterval time.Duration,
	work func(context.Context) error,
) error

ConsumeWithHeartbeat runs work in a goroutine while periodically calling msg.InProgress() to reset the AckWait clock. This allows short AckWait values for failure detection while supporting arbitrarily long processing.

Ack/Nak ownership: this function calls Ack, NakWithDelay, or Nak on the message. The caller must NOT call these methods when using this helper.

  • On work success: msg.Ack()
  • On work error: msg.NakWithDelay(30s), to allow breathing room before retry
  • On context cancellation: msg.NakWithDelay(5s), for graceful shutdown
  • On InProgress failure: returns an error (the message will be redelivered by the server)
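The heartbeat loop behind this helper can be approximated as below. This is a simplified sketch, not the function's source: `progresser`, `countingMsg`, `runWithHeartbeat`, and `demo` are hypothetical names, the fake message only counts InProgress calls, and the Ack/Nak handling is omitted.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// progresser is the minimal slice of a message this sketch needs.
type progresser interface{ InProgress() error }

// countingMsg counts heartbeats instead of talking to a server.
type countingMsg struct{ beats atomic.Int32 }

func (m *countingMsg) InProgress() error { m.beats.Add(1); return nil }

// runWithHeartbeat runs work in a goroutine and calls InProgress on every
// tick until work returns, the context ends, or a heartbeat fails.
func runWithHeartbeat(ctx context.Context, msg progresser, interval time.Duration, work func(context.Context) error) error {
	done := make(chan error, 1) // buffered so the worker never blocks on send
	go func() { done <- work(ctx) }()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case err := <-done:
			return err
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := msg.InProgress(); err != nil {
				return err
			}
		}
	}
}

// demo simulates 100ms of work with a 10ms heartbeat interval.
func demo() (int32, error) {
	msg := &countingMsg{}
	err := runWithHeartbeat(context.Background(), msg, 10*time.Millisecond, func(context.Context) error {
		time.Sleep(100 * time.Millisecond)
		return nil
	})
	return msg.beats.Load(), err
}

func main() {
	beats, err := demo()
	fmt.Println("heartbeats:", beats, "err:", err)
}
```

The point of the pattern is that AckWait can stay short for fast failure detection, because each heartbeat resets the server-side clock while the work is still running.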

func ContextWithTrace

func ContextWithTrace(ctx context.Context, tc *TraceContext) context.Context

ContextWithTrace returns a context with trace information

func DetachContextWithTrace

func DetachContextWithTrace(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc)

DetachContextWithTrace creates a new context that preserves trace context but resets deadline/cancellation. This is useful for publishing error responses when the original context has expired but trace continuity is still needed.
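The same idea can be expressed with the standard library alone. The sketch below assumes Go 1.21 or later for context.WithoutCancel, and `detach`, `traceKey`, and `demo` are hypothetical names; how the package itself stores and copies trace data is not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// traceKey is a hypothetical context key for a trace ID.
type traceKey struct{}

// detach keeps the parent's values but drops its deadline and cancellation,
// then applies a fresh timeout.
func detach(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	return context.WithTimeout(context.WithoutCancel(parent), timeout)
}

// demo cancels a parent that carries a trace ID and reports what the
// detached context still sees.
func demo() (traceID string, parentErr, detachedErr error) {
	parent, cancel := context.WithCancel(
		context.WithValue(context.Background(), traceKey{}, "trace-123"))
	cancel() // the parent is now expired

	ctx, stop := detach(parent, time.Minute)
	defer stop()
	id, _ := ctx.Value(traceKey{}).(string)
	return id, parent.Err(), ctx.Err()
}

func main() {
	fmt.Println(demo()) // prints: trace-123 context canceled <nil>
}
```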

func FilteredKeys

func FilteredKeys(ctx context.Context, kv jetstream.KeyValue, pattern string) ([]string, error)

FilteredKeys returns keys matching a NATS wildcard pattern from a raw jetstream.KeyValue bucket. Use this for components that hold jetstream.KeyValue directly instead of *KVStore. The pattern should be a valid NATS subject filter (e.g., "0.>" for level-0 community keys). Returns nil, nil when no keys match.
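For readers unfamiliar with NATS subject filters, the wildcard semantics can be sketched over a plain slice of keys: "*" matches exactly one token and ">" matches one or more trailing tokens. `matchSubject` and `filterKeys` are hypothetical helpers for illustration; the real function works against a jetstream.KeyValue bucket rather than an in-memory list.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches a NATS-style filter.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least
			// one subject token left to consume.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(p) == len(s)
}

// filterKeys returns the keys matching pattern, or nil when none match.
func filterKeys(keys []string, pattern string) []string {
	var out []string
	for _, k := range keys {
		if matchSubject(pattern, k) {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	keys := []string{"0.a", "0.a.b", "1.a", "0"}
	fmt.Println(filterKeys(keys, "0.>")) // prints [0.a 0.a.b]
	fmt.Println(filterKeys(keys, "*.a")) // prints [0.a 1.a]
}
```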

func InjectTrace

func InjectTrace(ctx context.Context, msg *nats.Msg)

InjectTrace adds trace headers to a NATS message from context

func IsKVConflictError

func IsKVConflictError(err error) bool

IsKVConflictError checks if error indicates a conflict (key exists or wrong revision)

func IsKVNotFoundError

func IsKVNotFoundError(err error) bool

IsKVNotFoundError checks if error indicates key not found

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client manages NATS connections with circuit breaker pattern

func NewClient

func NewClient(urls string, opts ...ClientOption) (*Client, error)

NewClient creates a new NATS client with optional configuration. The urls parameter accepts comma-separated NATS server URLs for clustering support (e.g., "nats://server1:4222,nats://server2:4222").

func (*Client) Backoff

func (m *Client) Backoff() time.Duration

Backoff returns the current backoff duration

func (*Client) Close

func (m *Client) Close(ctx context.Context) error

Close closes the NATS connection

func (*Client) Connect

func (m *Client) Connect(ctx context.Context) error

Connect establishes connection to NATS server

func (*Client) ConnectionOptions

func (m *Client) ConnectionOptions() []nats.Option

ConnectionOptions returns the NATS connection options

func (*Client) ConsumeStream

func (m *Client) ConsumeStream(ctx context.Context, streamName, subject string, handler func(jetstream.Msg)) error

ConsumeStream creates a consumer for a stream. Handler receives the full jetstream.Msg to access Subject, Data, Headers, etc. This is essential for wildcard subscriptions where the actual subject differs from the pattern. The handler is responsible for calling msg.Ack() after processing.

func (*Client) ConsumeStreamWithConfig

func (c *Client) ConsumeStreamWithConfig(
	ctx context.Context,
	cfg StreamConsumerConfig,
	handler func(ctx context.Context, msg jetstream.Msg),
) error

ConsumeStreamWithConfig creates a JetStream consumer with full configuration. The handler receives the raw jetstream.Msg which includes Ack(), Nak(), and Term() methods. Handler MUST call one of these methods to acknowledge the message.

func (*Client) CreateKeyValueBucket

func (m *Client) CreateKeyValueBucket(ctx context.Context, cfg jetstream.KeyValueConfig) (jetstream.KeyValue, error)

CreateKeyValueBucket creates or gets a KV bucket with configuration

func (*Client) CreateStream

func (m *Client) CreateStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error)

CreateStream creates a JetStream stream

func (*Client) DeleteKeyValueBucket

func (m *Client) DeleteKeyValueBucket(ctx context.Context, name string) error

DeleteKeyValueBucket deletes a KV bucket

func (*Client) EnsureStream

func (c *Client) EnsureStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error)

EnsureStream creates a stream if it doesn't exist, or returns the existing one.

func (*Client) Failures

func (m *Client) Failures() int32

Failures returns the current failure count

func (*Client) GetConnection

func (m *Client) GetConnection() *nats.Conn

GetConnection returns the current NATS connection

func (*Client) GetKeyValueBucket

func (m *Client) GetKeyValueBucket(ctx context.Context, name string) (jetstream.KeyValue, error)

GetKeyValueBucket gets an existing KV bucket

func (*Client) GetStatus

func (m *Client) GetStatus() *Status

GetStatus returns current status information

func (*Client) GetStream

func (m *Client) GetStream(ctx context.Context, name string) (jetstream.Stream, error)

GetStream gets an existing JetStream stream

func (*Client) IsHealthy

func (m *Client) IsHealthy() bool

IsHealthy returns true if the connection is healthy

func (*Client) JetStream

func (m *Client) JetStream() (jetstream.JetStream, error)

JetStream returns the JetStream context

func (*Client) ListKeyValueBuckets

func (m *Client) ListKeyValueBuckets(ctx context.Context) ([]string, error)

ListKeyValueBuckets lists all KV buckets

func (*Client) MaxReconnects

func (m *Client) MaxReconnects() int

MaxReconnects returns the maximum number of reconnection attempts

func (*Client) NewKVStore

func (c *Client) NewKVStore(bucket jetstream.KeyValue, opts ...func(*KVOptions)) *KVStore

NewKVStore creates a new KV store with the given bucket

func (*Client) OnHealthChange

func (m *Client) OnHealthChange(fn func(bool))

OnHealthChange sets a callback for health status changes

func (*Client) PingInterval

func (m *Client) PingInterval() time.Duration

PingInterval returns the interval for health checks

func (*Client) Publish

func (m *Client) Publish(ctx context.Context, subject string, data []byte) error

Publish publishes a message to a NATS subject

func (*Client) PublishToStream

func (m *Client) PublishToStream(ctx context.Context, subject string, data []byte) error

PublishToStream publishes to a JetStream stream with automatic trace context propagation. If no trace context exists in ctx, one is auto-generated for distributed tracing.

func (*Client) PublishToStreamWithAck

func (c *Client) PublishToStreamWithAck(
	ctx context.Context,
	subject string,
	data []byte,
) (*jetstream.PubAck, error)

PublishToStreamWithAck publishes a message to a JetStream subject with acknowledgment. If AutoCreate is true and the stream doesn't exist, it will be created. Trace context is auto-generated if not present, and propagated via NATS message headers.

func (*Client) RTT

func (m *Client) RTT() (time.Duration, error)

RTT returns the round-trip time to the NATS server

func (*Client) ReconnectWait

func (m *Client) ReconnectWait() time.Duration

ReconnectWait returns the wait duration between reconnection attempts

func (*Client) Reply

func (c *Client) Reply(ctx context.Context, replyTo string, data []byte) error

Reply sends a reply to a request message. This is typically used by service handlers to respond to requests.

func (*Client) ReplyWithHeaders

func (c *Client) ReplyWithHeaders(ctx context.Context, replyTo string, data []byte, headers map[string]string) error

ReplyWithHeaders sends a reply with custom headers.

func (*Client) Request

func (c *Client) Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error)

Request performs a synchronous request/reply operation. It publishes a message to the subject and waits for a response. The timeout parameter controls how long to wait for a response. If timeout is 0, DefaultRequestTimeout is used.

func (*Client) RequestWithHeaders

func (c *Client) RequestWithHeaders(
	ctx context.Context,
	subject string,
	data []byte,
	headers map[string]string,
	timeout time.Duration,
) (*nats.Msg, error)

RequestWithHeaders performs a request/reply operation with custom headers. Headers are passed as a map and converted to NATS message headers. Returns the full NATS message to allow access to response headers.

func (*Client) RequestWithRetry

func (c *Client) RequestWithRetry(
	ctx context.Context,
	subject string,
	data []byte,
	timeout time.Duration,
	retry RetryConfig,
) ([]byte, error)

RequestWithRetry performs a request with configurable retry on failure. This is useful for handling transient "no responders" errors in NATS where the subscriber may not be ready when the request arrives.

func (*Client) SetConnection

func (m *Client) SetConnection(conn *nats.Conn)

SetConnection sets the NATS connection (for testing)

func (*Client) Status

func (m *Client) Status() ConnectionStatus

Status returns the current connection status

func (*Client) StopAllConsumers

func (c *Client) StopAllConsumers()

StopAllConsumers stops all active consumers.

func (*Client) StopAndDeleteConsumer

func (c *Client) StopAndDeleteConsumer(ctx context.Context, streamName, consumerName string) error

StopAndDeleteConsumer stops a specific consumer and deletes the durable consumer from the server. WARNING: This permanently removes the consumer's position. Use only for test cleanup or when you intentionally want to reset consumer state.

func (*Client) StopConsumer

func (c *Client) StopConsumer(streamName, consumerName string)

StopConsumer stops a specific consumer by stream and consumer name. This stops the local consume context but does not delete the durable consumer from the server.

func (*Client) Subscribe

func (m *Client) Subscribe(ctx context.Context, subject string, handler func(context.Context, *nats.Msg)) (*Subscription, error)

Subscribe subscribes to a NATS subject with context propagation. Each message handler receives the full *nats.Msg to access Subject, Data, Headers, etc. This is essential for wildcard subscriptions where the actual subject differs from the pattern. The context is derived from the parent context with a 30-second timeout for message processing. Returns a Subscription handle that can be used to unsubscribe.

func (*Client) SubscribeForRequests

func (c *Client) SubscribeForRequests(
	ctx context.Context,
	subject string,
	handler func(ctx context.Context, data []byte) ([]byte, error),
) (*Subscription, error)

SubscribeForRequests subscribes to a subject and handles request/reply patterns. The handler receives the message data and should return the response data or an error. Returns the Subscription so the caller can unsubscribe when done. This is a convenience method for implementing request/reply services.

func (*Client) URLs

func (m *Client) URLs() string

URLs returns the NATS server URLs (comma-separated for clustering)

func (*Client) WaitForConnection

func (m *Client) WaitForConnection(ctx context.Context) error

WaitForConnection waits for the connection to be established

func (*Client) WithHealthCheck

func (m *Client) WithHealthCheck(interval time.Duration)

WithHealthCheck enables health monitoring with a specified interval

type ClientOption

type ClientOption func(*Client) error

ClientOption is a functional option for configuring the Client
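The functional-option shape can be illustrated without the package itself. The following is a minimal sketch, assuming a hypothetical `settings` struct, hypothetical `withName`/`withTimeout` options, and invented validation rules and defaults; it is not the package's constructor, only the pattern that a `func(*Client) error` option type implies.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// settings is a hypothetical stand-in for the fields a client would hold.
type settings struct {
	name    string
	timeout time.Duration
}

// option mirrors the shape of ClientOption: it mutates the target and may fail.
type option func(*settings) error

func withName(name string) option {
	return func(s *settings) error {
		if name == "" {
			return errors.New("name must not be empty") // assumed validation rule
		}
		s.name = name
		return nil
	}
}

func withTimeout(d time.Duration) option {
	return func(s *settings) error {
		if d <= 0 {
			return errors.New("timeout must be positive") // assumed validation rule
		}
		s.timeout = d
		return nil
	}
}

// build applies options in order over illustrative defaults
// and stops at the first option that returns an error.
func build(opts ...option) (*settings, error) {
	s := &settings{name: "unnamed", timeout: 5 * time.Second}
	for _, opt := range opts {
		if err := opt(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}

func main() {
	s, err := build(withName("edge-gateway"), withTimeout(2*time.Second))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s.name, s.timeout)
}
```

Returning an error from each option lets invalid configuration surface at construction time rather than at first use.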

func WithCircuitBreakerThreshold

func WithCircuitBreakerThreshold(threshold int32) ClientOption

WithCircuitBreakerThreshold sets the number of failures before opening circuit
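To make the threshold concrete, here is a minimal sketch of a consecutive-failure counter that fails fast once the threshold is reached. The `breaker` type, `errCircuitOpen`, and `errBoom` are hypothetical names, and the sketch deliberately omits the backoff-based recovery that the package describes; it only shows how a threshold could gate calls.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var (
	errCircuitOpen = errors.New("circuit open")
	errBoom        = errors.New("boom") // stand-in for any operation failure
)

// breaker is a hypothetical stand-in for the client's failure tracking.
type breaker struct {
	threshold int32
	failures  atomic.Int32
}

// call runs op unless the consecutive-failure count has reached the threshold.
func (b *breaker) call(op func() error) error {
	if b.failures.Load() >= b.threshold {
		return errCircuitOpen // fail fast without touching the network
	}
	if err := op(); err != nil {
		b.failures.Add(1)
		return err
	}
	b.failures.Store(0) // a success resets the consecutive-failure count
	return nil
}

func main() {
	b := &breaker{threshold: 3}
	fail := func() error { return errBoom }
	for i := 0; i < 4; i++ {
		fmt.Println(b.call(fail))
	}
}
```

With a threshold of 3, the first three calls return the underlying error and the fourth is rejected without running the operation.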

func WithCompression

func WithCompression(enabled bool) ClientOption

WithCompression enables message compression

func WithConnectionLostCallback

func WithConnectionLostCallback(fn func(error)) ClientOption

WithConnectionLostCallback sets a callback for when connection is completely lost

func WithCredentials

func WithCredentials(username, password string) ClientOption

WithCredentials sets username and password for authentication

func WithDisconnectCallback

func WithDisconnectCallback(fn func(error)) ClientOption

WithDisconnectCallback sets a callback for disconnection events. This is in addition to NATS's built-in disconnect handler.

func WithDrainTimeout

func WithDrainTimeout(d time.Duration) ClientOption

WithDrainTimeout sets the timeout for draining on disconnect

func WithHealthChangeCallback

func WithHealthChangeCallback(fn func(healthy bool)) ClientOption

WithHealthChangeCallback sets a callback for health status changes

func WithHealthInterval

func WithHealthInterval(d time.Duration) ClientOption

WithHealthInterval sets the interval for health monitoring

func WithLogger

func WithLogger(logger Logger) ClientOption

WithLogger sets a custom logger for the client

func WithMaxBackoff

func WithMaxBackoff(d time.Duration) ClientOption

WithMaxBackoff sets the maximum backoff duration for circuit breaker

func WithMaxReconnects

func WithMaxReconnects(max int) ClientOption

WithMaxReconnects sets the maximum number of reconnection attempts (-1 for infinite)

func WithMetrics

func WithMetrics(registry *metric.MetricsRegistry) ClientOption

WithMetrics enables JetStream metrics collection using the provided registry. Metrics will track streams and consumers created through this client.

func WithName

func WithName(name string) ClientOption

WithName sets the client name for identification

func WithPingInterval

func WithPingInterval(d time.Duration) ClientOption

WithPingInterval sets the ping interval for connection health checks

func WithReconnectCallback

func WithReconnectCallback(fn func()) ClientOption

WithReconnectCallback sets a callback for reconnection events. This is in addition to NATS's built-in reconnect handler.

func WithReconnectWait

func WithReconnectWait(d time.Duration) ClientOption

WithReconnectWait sets the wait time between reconnection attempts

func WithTLS

func WithTLS(certFile, keyFile, caFile string) ClientOption

WithTLS enables TLS with optional certificate paths

func WithTimeout

func WithTimeout(d time.Duration) ClientOption

WithTimeout sets the connection timeout

func WithToken

func WithToken(token string) ClientOption

WithToken sets a token for authentication

type Codec

type Codec[T any] interface {
	Marshal(T) ([]byte, error)
	Unmarshal([]byte, *T) error
}

Codec defines the serialization interface for typed subjects. Implementations handle marshaling and unmarshaling of typed payloads.
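As an illustration of what an implementation might look like, the sketch below redeclares the `Codec` interface locally so it compiles on its own, then implements it with a hypothetical `intTextCodec` that encodes an `int` as decimal text. The `roundTrip` helper is also hypothetical and exists only to exercise the interface.

```go
package main

import (
	"fmt"
	"strconv"
)

// Codec mirrors the interface shown above; it is redeclared here
// only so that this sketch is self-contained.
type Codec[T any] interface {
	Marshal(T) ([]byte, error)
	Unmarshal([]byte, *T) error
}

// intTextCodec is a hypothetical codec that encodes an int as decimal text.
type intTextCodec struct{}

func (intTextCodec) Marshal(v int) ([]byte, error) {
	return []byte(strconv.Itoa(v)), nil
}

func (intTextCodec) Unmarshal(data []byte, v *int) error {
	n, err := strconv.Atoi(string(data))
	if err != nil {
		return err
	}
	*v = n
	return nil
}

// Compile-time check that intTextCodec satisfies Codec[int].
var _ Codec[int] = intTextCodec{}

// roundTrip encodes a value and decodes it again through any Codec.
func roundTrip[T any](c Codec[T], in T) (T, error) {
	var out T
	data, err := c.Marshal(in)
	if err != nil {
		return out, err
	}
	err = c.Unmarshal(data, &out)
	return out, err
}

func main() {
	got, err := roundTrip[int](intTextCodec{}, 42)
	fmt.Println(got, err)
}
```

A codec like this could be passed wherever a `Codec[T]` is accepted, for example to NewSubjectWithCodec.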

type ConnectionStatus

type ConnectionStatus int

ConnectionStatus represents the state of the NATS connection

const (
	StatusDisconnected ConnectionStatus = iota
	StatusConnecting
	StatusConnected
	StatusReconnecting
	StatusCircuitOpen
)

Possible connection statuses

func (ConnectionStatus) String

func (s ConnectionStatus) String() string

String returns the string representation of ConnectionStatus

type JSONCodec

type JSONCodec[T any] struct{}

JSONCodec provides JSON serialization for typed subjects. This is the default codec for most use cases.

func (JSONCodec[T]) Marshal

func (c JSONCodec[T]) Marshal(v T) ([]byte, error)

Marshal serializes a value to JSON bytes.

func (JSONCodec[T]) Unmarshal

func (c JSONCodec[T]) Unmarshal(data []byte, v *T) error

Unmarshal deserializes JSON bytes into a value.

type KVEntry

type KVEntry struct {
	Key      string
	Value    []byte
	Revision uint64
}

KVEntry wraps a KV entry with its revision for CAS operations

type KVOptions

type KVOptions struct {
	MaxRetries            int           // Maximum CAS retry attempts
	RetryDelay            time.Duration // Initial delay between retries
	Timeout               time.Duration // Operation timeout
	MaxValueSize          int           // Maximum size for values (default: 1MB)
	UseExponentialBackoff bool          // Enable exponential backoff with jitter
	MaxRetryDelay         time.Duration // Maximum delay between retries
}

KVOptions configures KV operations behavior

func DefaultKVOptions

func DefaultKVOptions() KVOptions

DefaultKVOptions returns sensible defaults matching the Graph processor

type KVStore

type KVStore struct {
	// contains filtered or unexported fields
}

KVStore provides high-level KV operations with built-in CAS support

func (*KVStore) Create

func (kv *KVStore) Create(ctx context.Context, key string, value []byte) (uint64, error)

Create writes the key only if it does not already exist and returns an error if it does

func (*KVStore) Delete

func (kv *KVStore) Delete(ctx context.Context, key string) error

Delete removes a key from the bucket

func (*KVStore) Get

func (kv *KVStore) Get(ctx context.Context, key string) (*KVEntry, error)

Get retrieves a value with its revision for CAS operations

func (*KVStore) Keys

func (kv *KVStore) Keys(ctx context.Context) ([]string, error)

Keys returns all keys in the bucket

func (*KVStore) KeysByPrefix

func (kv *KVStore) KeysByPrefix(ctx context.Context, prefix string) ([]string, error)

KeysByPrefix returns all keys matching the given prefix. Uses JetStream KV's native key filtering for efficient server-side filtering. The prefix is automatically converted to a NATS wildcard pattern (prefix + ">").
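The prefix-to-pattern conversion can be sketched as follows. In NATS subject matching, `>` acts as a wildcard only when it is a whole token, so this sketch assumes the prefix ends at a token boundary such as `config.`. The helper names `prefixPattern` and `matches` are hypothetical, and the matcher handles only literal tokens plus a trailing `>`; the real filtering happens on the server.

```go
package main

import (
	"fmt"
	"strings"
)

// prefixPattern builds the wildcard pattern described above: prefix + ">".
func prefixPattern(prefix string) string {
	return prefix + ">"
}

// matches reports whether key matches a dot-separated pattern made of
// literal tokens and an optional trailing ">" token.
func matches(pattern, key string) bool {
	p := strings.Split(pattern, ".")
	k := strings.Split(key, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and must cover at least one key token.
			return i == len(p)-1 && len(k) > i
		}
		if i >= len(k) || tok != k[i] {
			return false
		}
	}
	return len(p) == len(k)
}

func main() {
	pattern := prefixPattern("config.")
	for _, key := range []string{"config.db.host", "config.port", "config", "other.db"} {
		fmt.Println(key, matches(pattern, key))
	}
}
```

Note that the bare key `config` does not match `config.>`, because `>` requires at least one further token.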

func (*KVStore) Put

func (kv *KVStore) Put(ctx context.Context, key string, value []byte) (uint64, error)

Put creates or updates a key without revision check (last writer wins)

func (*KVStore) Update

func (kv *KVStore) Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error)

Update performs CAS update with explicit revision

func (*KVStore) UpdateJSON

func (kv *KVStore) UpdateJSON(ctx context.Context, key string,
	updateFn func(current map[string]any) error) error

UpdateJSON performs CAS update on JSON data with automatic retry

func (*KVStore) UpdateWithRetry

func (kv *KVStore) UpdateWithRetry(ctx context.Context, key string,
	updateFn func(current []byte) ([]byte, error)) error

UpdateWithRetry performs a CAS update with automatic retry on conflicts. If the key doesn't exist, it creates it.
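The read-modify-write loop behind a CAS update with retry can be sketched against an in-memory store. Everything here is hypothetical: `memKV` stands in for a KV bucket, revisions are simple per-key counters, and the loop retries immediately, without the delay or backoff that KVOptions can configure.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"sync"
)

var errConflict = errors.New("revision conflict")

type entry struct {
	value    []byte
	revision uint64
}

// memKV is a hypothetical in-memory stand-in for a KV bucket.
type memKV struct {
	mu   sync.Mutex
	data map[string]entry
}

func newMemKV() *memKV { return &memKV{data: make(map[string]entry)} }

// get returns the current entry; a missing key yields revision 0.
func (s *memKV) get(key string) entry {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.data[key]
}

// update writes value only if the stored revision still equals expected.
func (s *memKV) update(key string, value []byte, expected uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur := s.data[key]
	if cur.revision != expected {
		return errConflict
	}
	s.data[key] = entry{value: value, revision: cur.revision + 1}
	return nil
}

// updateWithRetry re-reads and re-applies fn whenever another writer wins the race.
func updateWithRetry(s *memKV, key string, maxRetries int, fn func([]byte) ([]byte, error)) error {
	for attempt := 0; attempt <= maxRetries; attempt++ {
		cur := s.get(key)
		next, err := fn(cur.value)
		if err != nil {
			return err
		}
		if err := s.update(key, next, cur.revision); err == nil {
			return nil
		}
	}
	return fmt.Errorf("giving up after %d retries: %w", maxRetries, errConflict)
}

// increment treats the value as a decimal counter; a missing key starts at 0.
func increment(current []byte) ([]byte, error) {
	n := 0
	if len(current) > 0 {
		var err error
		if n, err = strconv.Atoi(string(current)); err != nil {
			return nil, err
		}
	}
	return []byte(strconv.Itoa(n + 1)), nil
}

// runWorkers increments one key from several goroutines and returns the final value.
func runWorkers(workers int) string {
	s := newMemKV()
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Every conflict means another worker succeeded, so `workers` retries suffice.
			if err := updateWithRetry(s, "counter", workers, increment); err != nil {
				panic(err)
			}
		}()
	}
	wg.Wait()
	return string(s.get("counter").value)
}

func main() {
	fmt.Println(runWorkers(8))
}
```

Because the update function is re-run on every attempt, it should be free of side effects other than computing the new value.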

func (*KVStore) Watch

func (kv *KVStore) Watch(ctx context.Context, pattern string) (jetstream.KeyWatcher, error)

Watch creates a watcher for key changes. Note: Watch does not apply a timeout because it creates a long-lived watcher.

type Logger

type Logger interface {
	Printf(format string, v ...any)
	Errorf(format string, v ...any)
	Debugf(format string, v ...any)
}

Logger interface for injecting custom loggers

type RetryConfig

type RetryConfig struct {
	MaxRetries        int           // Number of retry attempts (default: 3)
	InitialBackoff    time.Duration // First retry delay (default: 100ms)
	MaxBackoff        time.Duration // Cap on backoff growth (default: 2s)
	BackoffMultiplier float64       // Exponential growth factor (default: 2.0)
}

RetryConfig configures retry behavior for requests.
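The documented defaults suggest a capped exponential schedule. The sketch below computes one under that assumption; `retryConfig`, `defaultRetry`, and `delayFor` are hypothetical local names, and the exact formula the package uses (including any jitter) is not confirmed by this page.

```go
package main

import (
	"fmt"
	"time"
)

// retryConfig mirrors the fields of RetryConfig for a self-contained sketch.
type retryConfig struct {
	MaxRetries        int
	InitialBackoff    time.Duration
	MaxBackoff        time.Duration
	BackoffMultiplier float64
}

// defaultRetry uses the default values documented in the field comments above.
func defaultRetry() retryConfig {
	return retryConfig{
		MaxRetries:        3,
		InitialBackoff:    100 * time.Millisecond,
		MaxBackoff:        2 * time.Second,
		BackoffMultiplier: 2.0,
	}
}

// delayFor returns the wait before the given retry (0-based),
// growing by BackoffMultiplier each time and capped at MaxBackoff.
func delayFor(cfg retryConfig, retry int) time.Duration {
	d := float64(cfg.InitialBackoff)
	for i := 0; i < retry; i++ {
		d *= cfg.BackoffMultiplier
		if d >= float64(cfg.MaxBackoff) {
			return cfg.MaxBackoff
		}
	}
	if d > float64(cfg.MaxBackoff) {
		return cfg.MaxBackoff
	}
	return time.Duration(d)
}

func main() {
	cfg := defaultRetry()
	for retry := 0; retry < cfg.MaxRetries; retry++ {
		fmt.Printf("retry %d: wait %v\n", retry+1, delayFor(cfg, retry))
	}
}
```

Under these assumptions the three default retries wait 100ms, 200ms, and 400ms, so the 2s cap only matters when MaxRetries is raised.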

func DefaultRetryConfig

func DefaultRetryConfig() RetryConfig

DefaultRetryConfig returns sensible defaults for retry configuration.

type Status

type Status struct {
	Status          ConnectionStatus
	FailureCount    int32
	LastFailureTime time.Time
	Reconnects      int32
	RTT             time.Duration
}

Status holds runtime status information for the NATS manager

type StreamAutoCreateConfig

type StreamAutoCreateConfig struct {
	// Subjects for the stream. If empty, derived from FilterSubject.
	Subjects []string

	// Storage type: "file" (default) or "memory"
	Storage string

	// Retention policy: "limits" (default), "interest", "work_queue"
	Retention string

	// MaxAge is the maximum age of messages (default 7 days).
	MaxAge time.Duration

	// MaxBytes is the maximum total size (0 = unlimited).
	MaxBytes int64

	// MaxMsgs is the maximum number of messages (0 = unlimited).
	MaxMsgs int64

	// Replicas is the number of replicas (default 1).
	Replicas int
}

StreamAutoCreateConfig configures automatic stream creation.

func DefaultStreamConfig

func DefaultStreamConfig() *StreamAutoCreateConfig

DefaultStreamConfig returns default auto-create configuration.

type StreamConsumerConfig

type StreamConsumerConfig struct {
	// StreamName is the name of the stream to consume from (required).
	StreamName string

	// ConsumerName is the durable consumer name. If empty, creates an ephemeral consumer.
	ConsumerName string

	// FilterSubject filters messages within the stream. If empty, receives all messages.
	FilterSubject string

	// DeliverPolicy determines where to start delivering messages.
	// Options: "all" (default), "last", "new", "by_start_time"
	DeliverPolicy string

	// AckPolicy determines how messages are acknowledged.
	// Options: "explicit" (default), "none", "all"
	AckPolicy string

	// MaxDeliver is the maximum number of delivery attempts (0 = unlimited).
	MaxDeliver int

	// AckWait is how long to wait for an ack before redelivery.
	// Default is 30 seconds.
	AckWait time.Duration

	// MaxAckPending limits the number of outstanding (unacknowledged) messages
	// that can be delivered to a consumer. This provides backpressure to prevent
	// overwhelming the consumer. 0 means unlimited (default NATS behavior).
	MaxAckPending int

	// AutoCreate enables automatic stream creation if it doesn't exist.
	AutoCreate bool

	// AutoCreateConfig is used when auto-creating a stream.
	// If nil, defaults are used based on FilterSubject.
	AutoCreateConfig *StreamAutoCreateConfig

	// BackOff overrides AckWait per retry attempt. Index 0 is the first retry
	// wait, index 1 is the second, and so on. The last value is used for all
	// subsequent retries. If empty, AckWait applies uniformly.
	BackOff []time.Duration

	// MessageTimeout is the context timeout for processing each message.
	// This timeout is passed to the handler and should accommodate the full
	// processing time including any downstream calls (e.g., LLM requests).
	// Default is 30 seconds if not specified.
	MessageTimeout time.Duration
}

StreamConsumerConfig configures a JetStream consumer.
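The interaction between BackOff and AckWait described in the field comments can be written out as a small lookup. This is a sketch of the documented rule only; `redeliveryWait` and `sampleBackOff` are hypothetical helpers, and the actual redelivery timing is enforced by the NATS server.

```go
package main

import (
	"fmt"
	"time"
)

// redeliveryWait returns the wait before the given retry (0-based):
// BackOff[retry] when present, the last BackOff value for later retries,
// and ackWait when BackOff is empty.
func redeliveryWait(backOff []time.Duration, ackWait time.Duration, retry int) time.Duration {
	if len(backOff) == 0 {
		return ackWait
	}
	if retry < 0 {
		retry = 0
	}
	if retry >= len(backOff) {
		return backOff[len(backOff)-1]
	}
	return backOff[retry]
}

// sampleBackOff is an illustrative schedule, not a recommended one.
func sampleBackOff() []time.Duration {
	return []time.Duration{time.Second, 5 * time.Second, 30 * time.Second}
}

func main() {
	ackWait := 30 * time.Second
	for retry := 0; retry < 5; retry++ {
		fmt.Printf("retry %d: %v\n", retry+1, redeliveryWait(sampleBackOff(), ackWait, retry))
	}
	fmt.Println("no BackOff:", redeliveryWait(nil, ackWait, 2))
}
```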

type Subject

type Subject[T any] struct {
	// Pattern is the NATS subject pattern (may include wildcards for subscribe)
	Pattern string

	// Codec handles serialization/deserialization
	Codec Codec[T]
}

Subject represents a typed NATS subject with compile-time type safety. It binds a subject pattern to a specific payload type and codec.

Example:

var WorkflowStarted = Subject[WorkflowStartedEvent]{
    Pattern: "workflow.events.started",
    Codec:   JSONCodec[WorkflowStartedEvent]{},
}

// Type-safe publish
err := WorkflowStarted.Publish(ctx, client, event)

// Type-safe subscribe
sub, err := WorkflowStarted.Subscribe(ctx, client, func(ctx context.Context, event WorkflowStartedEvent) error {
    // event is already typed - no assertions needed
    return nil
})

func NewSubject

func NewSubject[T any](pattern string) Subject[T]

NewSubject creates a typed subject with a JSON codec (most common case).

func NewSubjectWithCodec

func NewSubjectWithCodec[T any](pattern string, codec Codec[T]) Subject[T]

NewSubjectWithCodec creates a typed subject with a custom codec.

func (Subject[T]) Publish

func (s Subject[T]) Publish(ctx context.Context, client *Client, payload T) error

Publish sends a typed payload to the subject. The payload is serialized using the subject's codec before publishing.

func (Subject[T]) PublishToStream

func (s Subject[T]) PublishToStream(ctx context.Context, client *Client, payload T) error

PublishToStream sends a typed payload to a JetStream subject. The payload is serialized using the subject's codec before publishing.

func (Subject[T]) Subscribe

func (s Subject[T]) Subscribe(ctx context.Context, client *Client, handler func(context.Context, T) error) (*Subscription, error)

Subscribe creates a subscription to the subject with type-safe message handling. The handler receives deserialized payloads directly.

func (Subject[T]) SubscribeWithMsg

func (s Subject[T]) SubscribeWithMsg(ctx context.Context, client *Client, handler func(context.Context, *nats.Msg, T) error) (*Subscription, error)

SubscribeWithMsg creates a subscription that provides both the typed payload and raw message. Use this when you need access to message metadata (subject, headers, etc.).

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription wraps a NATS subscription for lifecycle management

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe() error

Unsubscribe unsubscribes from the subject

type TemporalResolver

type TemporalResolver struct {
	// contains filtered or unexported fields
}

TemporalResolver provides efficient timestamp-based KV queries with caching

func NewTemporalResolver

func NewTemporalResolver(ctx context.Context, bucket jetstream.KeyValue) (*TemporalResolver, error)

NewTemporalResolver creates a resolver for timestamp-based queries. The context controls the lifecycle of the cache's background cleanup goroutine.

func NewTemporalResolverWithCache

func NewTemporalResolverWithCache(
	ctx context.Context,
	bucket jetstream.KeyValue,
	cacheTTL time.Duration,
) (*TemporalResolver, error)

NewTemporalResolverWithCache creates a resolver with a custom cache TTL. The context controls the lifecycle of the cache's background cleanup goroutine.

func (*TemporalResolver) Close

func (tr *TemporalResolver) Close() error

Close shuts down the temporal resolver and its cache

func (*TemporalResolver) GetAtTimestamp

func (tr *TemporalResolver) GetAtTimestamp(
	ctx context.Context,
	key string,
	targetTime time.Time,
) (jetstream.KeyValueEntry, error)

GetAtTimestamp finds the entity state that was current at the given timestamp. It uses binary search for O(log n) performance, with caching.
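The binary search can be sketched over an in-memory history. This assumes "current at the given timestamp" means the latest revision created at or before the target, and that the history is sorted by creation time; the `revision` type and the helpers are hypothetical, and the sketch ignores the resolver's cache entirely.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// revision is a hypothetical stand-in for one historical KV entry.
type revision struct {
	Created time.Time
	Value   string
}

// stateAt returns the latest revision created at or before target.
// history must be sorted by Created in ascending order.
func stateAt(history []revision, target time.Time) (revision, bool) {
	// Find the first revision created strictly after target.
	i := sort.Search(len(history), func(i int) bool {
		return history[i].Created.After(target)
	})
	if i == 0 {
		return revision{}, false // nothing existed yet at target
	}
	return history[i-1], true
}

var base = time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)

// minute returns base plus n minutes, to keep the sample data readable.
func minute(n int) time.Time { return base.Add(time.Duration(n) * time.Minute) }

func sampleHistory() []revision {
	return []revision{
		{Created: minute(0), Value: "v1"},
		{Created: minute(10), Value: "v2"},
		{Created: minute(20), Value: "v3"},
	}
}

func main() {
	for _, m := range []int{-1, 0, 15, 99} {
		r, ok := stateAt(sampleHistory(), minute(m))
		fmt.Println(m, r.Value, ok)
	}
}
```

`sort.Search` performs O(log n) comparisons, which matches the complexity stated above.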

func (*TemporalResolver) GetInTimeRange

func (tr *TemporalResolver) GetInTimeRange(
	ctx context.Context,
	key string,
	startTime, endTime time.Time,
) ([]jetstream.KeyValueEntry, error)

GetInTimeRange finds all entity states within a time range. It returns states ordered by timestamp.

func (*TemporalResolver) GetRangeAtTimestamp

func (tr *TemporalResolver) GetRangeAtTimestamp(
	ctx context.Context,
	keys []string,
	targetTime time.Time,
) (map[string]jetstream.KeyValueEntry, error)

GetRangeAtTimestamp finds multiple entities at a specific timestamp. This is useful for reconstructing entire system state at time T.

func (*TemporalResolver) GetRangeInTimeRange

func (tr *TemporalResolver) GetRangeInTimeRange(
	ctx context.Context,
	keys []string,
	startTime, endTime time.Time,
) (map[string][]jetstream.KeyValueEntry, error)

GetRangeInTimeRange finds multiple entities within a time range. It returns a map of key -> entries within the range.

func (*TemporalResolver) GetStats

func (tr *TemporalResolver) GetStats() *cache.Statistics

GetStats returns cache statistics for monitoring

type TestClient

type TestClient struct {
	Client       *Client // Drop-in replacement for existing natsclient.Client
	URL          string
	BucketPrefix string // Prefix applied to all KV bucket names for test isolation
	// contains filtered or unexported fields
}

TestClient provides testcontainers-based NATS for testing

func NewSharedTestClient

func NewSharedTestClient(opts ...TestOption) (*TestClient, error)

NewSharedTestClient creates a new NATS test container for use in TestMain. Unlike NewTestClient, it doesn't require testing.T and returns errors instead.

func NewTestClient

func NewTestClient(t testing.TB, opts ...TestOption) *TestClient

NewTestClient creates a new NATS test container. It accepts testing.TB so it works with both *testing.T and *testing.B.

func (*TestClient) CreateKVBucket

func (tc *TestClient) CreateKVBucket(ctx context.Context, name string) (jetstream.KeyValue, error)

CreateKVBucket is a helper for creating KV buckets during tests. The bucket prefix is automatically applied if configured.

func (*TestClient) CreateStream

func (tc *TestClient) CreateStream(ctx context.Context, name string, subjects []string) (jetstream.Stream, error)

CreateStream is a helper for creating JetStream streams during tests

func (*TestClient) GetKVBucket

func (tc *TestClient) GetKVBucket(ctx context.Context, name string) (jetstream.KeyValue, error)

GetKVBucket is a helper for getting existing KV buckets during tests. The bucket prefix is automatically applied if configured.

func (*TestClient) GetNativeConnection

func (tc *TestClient) GetNativeConnection() *gonats.Conn

GetNativeConnection returns the underlying NATS connection for direct access

func (*TestClient) GetStream

func (tc *TestClient) GetStream(ctx context.Context, name string) (jetstream.Stream, error)

GetStream is a helper for getting existing JetStream streams during tests

func (*TestClient) IsReady

func (tc *TestClient) IsReady() bool

IsReady checks if the NATS connection is ready for use

func (*TestClient) PrefixedBucketName

func (tc *TestClient) PrefixedBucketName(name string) string

PrefixedBucketName returns the full bucket name with prefix applied. Use this when you need to pass bucket names to components that create their own buckets.

func (*TestClient) Terminate

func (tc *TestClient) Terminate() error

Terminate manually terminates the container and client (usually handled by t.Cleanup)

type TestOption

type TestOption func(*testConfig)

TestOption is a functional option for configuring the test client

func WithBucketPrefix

func WithBucketPrefix(prefix string) TestOption

WithBucketPrefix sets a prefix for all KV bucket names to enable test isolation. When tests run in parallel, each test can use a unique prefix (e.g., test name) to avoid bucket name collisions.

func WithE2EDefaults

func WithE2EDefaults() TestOption

WithE2EDefaults configures NATS with settings good for end-to-end tests

func WithFastStartup

func WithFastStartup() TestOption

WithFastStartup configures NATS for fastest possible startup (good for unit tests)

func WithFileStorage

func WithFileStorage() TestOption

WithFileStorage enables file-backed JetStream storage instead of the default memory-only store. Use this when tests create many KV buckets or write large volumes of data that would exceed the 256MB default memory limit.

func WithIntegrationDefaults

func WithIntegrationDefaults() TestOption

WithIntegrationDefaults configures NATS with settings good for integration tests

func WithJetStream

func WithJetStream() TestOption

WithJetStream enables JetStream for tests that need it

func WithKV

func WithKV() TestOption

WithKV enables KV store for tests that need it

func WithKVBuckets

func WithKVBuckets(buckets ...string) TestOption

WithKVBuckets pre-creates specific KV buckets

func WithMinimalFeatures

func WithMinimalFeatures() TestOption

WithMinimalFeatures configures NATS with only basic pub/sub (fastest startup)

func WithNATSVersion

func WithNATSVersion(version string) TestOption

WithNATSVersion specifies a specific NATS server version to use

func WithProductionLike

func WithProductionLike() TestOption

WithProductionLike configures NATS with settings that mimic production

func WithStartTimeout

func WithStartTimeout(timeout time.Duration) TestOption

WithStartTimeout sets the container startup timeout

func WithStreams

func WithStreams(streams ...TestStreamConfig) TestOption

WithStreams pre-creates JetStream streams for testing

func WithTestTimeout

func WithTestTimeout(timeout time.Duration) TestOption

WithTestTimeout sets the connection timeout for test client

type TestStreamConfig

type TestStreamConfig struct {
	Name     string
	Subjects []string
}

TestStreamConfig defines a stream to pre-create for testing

type TraceContext

type TraceContext struct {
	TraceID      string // 32 hex chars (16 bytes)
	SpanID       string // 16 hex chars (8 bytes)
	ParentSpanID string // 16 hex chars, empty for root span
	Sampled      bool
}

TraceContext holds trace information propagated through the system

func ExtractTrace

func ExtractTrace(msg *nats.Msg) *TraceContext

ExtractTrace reads trace headers from a NATS message

func ExtractTraceFromJetStream

func ExtractTraceFromJetStream(headers nats.Header) *TraceContext

ExtractTraceFromJetStream reads trace headers from a JetStream message

func NewTraceContext

func NewTraceContext() *TraceContext

NewTraceContext creates a new trace context with generated IDs

func ParseTraceparent

func ParseTraceparent(header string) (*TraceContext, error)

ParseTraceparent parses a W3C traceparent header.

Format: {version}-{trace_id}-{span_id}-{flags}

Example: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
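A minimal parser for this format might look like the sketch below. It is not the package's implementation: `traceContext` and `parseTraceparent` are local, hypothetical names, and the validation is deliberately lenient (for instance, it accepts uppercase hex and does not reject all-zero IDs, both of which the W3C specification treats as invalid).

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// traceContext is a reduced, hypothetical version of TraceContext.
type traceContext struct {
	TraceID string
	SpanID  string
	Sampled bool
}

// parseTraceparent splits {version}-{trace_id}-{span_id}-{flags}
// and checks field lengths and hex encoding.
func parseTraceparent(header string) (*traceContext, error) {
	parts := strings.Split(header, "-")
	if len(parts) != 4 {
		return nil, errors.New("traceparent: expected 4 dash-separated fields")
	}
	wantLen := []int{2, 32, 16, 2} // version, trace ID, span ID, flags
	for i, p := range parts {
		if len(p) != wantLen[i] {
			return nil, fmt.Errorf("traceparent: field %d has length %d, want %d", i, len(p), wantLen[i])
		}
		if _, err := hex.DecodeString(p); err != nil {
			return nil, fmt.Errorf("traceparent: field %d is not hex: %w", i, err)
		}
	}
	flags, _ := hex.DecodeString(parts[3]) // already validated above
	return &traceContext{
		TraceID: parts[1],
		SpanID:  parts[2],
		Sampled: flags[0]&0x01 == 0x01, // lowest flag bit is the sampled flag
	}, nil
}

func main() {
	tc, err := parseTraceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(tc.TraceID, tc.SpanID, tc.Sampled)
}
```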

func TraceContextFromContext

func TraceContextFromContext(ctx context.Context) (*TraceContext, bool)

TraceContextFromContext extracts trace context from context

func (*TraceContext) FormatTraceparent

func (tc *TraceContext) FormatTraceparent() string

FormatTraceparent formats trace context as W3C traceparent

func (*TraceContext) NewSpan

func (tc *TraceContext) NewSpan() *TraceContext

NewSpan creates a child span from existing trace context
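The relationship between a parent context, a child span, and the traceparent string can be sketched as follows. This assumes the child keeps the parent's TraceID, records the parent's SpanID as its ParentSpanID, and receives a fresh random SpanID; the lowercase names and the choice to mark the root as sampled are hypothetical and may differ from the package's behavior.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// traceContext mirrors the TraceContext fields for a self-contained sketch.
type traceContext struct {
	TraceID      string // 32 hex chars
	SpanID       string // 16 hex chars
	ParentSpanID string // empty for a root span
	Sampled      bool
}

// randomHex returns n random bytes encoded as 2n lowercase hex characters.
func randomHex(n int) string {
	b := make([]byte, n)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// newRoot creates a root context; sampling it by default is an assumption.
func newRoot() *traceContext {
	return &traceContext{TraceID: randomHex(16), SpanID: randomHex(8), Sampled: true}
}

// newSpan derives a child: same trace, new span ID, parent link set.
func (tc *traceContext) newSpan() *traceContext {
	return &traceContext{
		TraceID:      tc.TraceID,
		SpanID:       randomHex(8),
		ParentSpanID: tc.SpanID,
		Sampled:      tc.Sampled,
	}
}

// traceparent formats the context as version 00 of the W3C header.
func (tc *traceContext) traceparent() string {
	flags := 0
	if tc.Sampled {
		flags = 1
	}
	return fmt.Sprintf("00-%s-%s-%02x", tc.TraceID, tc.SpanID, flags)
}

func main() {
	root := newRoot()
	child := root.newSpan()
	fmt.Println(root.traceparent())
	fmt.Println(child.traceparent())
}
```

Both printed headers share the same trace ID, which is what allows messages to be correlated across services.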
