logging

package
Version: v1.0.0-alpha.63
Warning

This package is not in the latest version of its module.

Published: Mar 19, 2026 License: MIT Imports: 6 Imported by: 0

README

Logging Package

Structured logging handlers with multi-destination support, NATS publishing, and graceful fallback behavior.

Features

  • Multi-Handler Composition: Dispatch logs to multiple destinations simultaneously
  • NATS Publishing: Publish logs to JetStream for real-time streaming
  • Source Filtering: Exclude specific sources from NATS publishing (prefix matching)
  • Graceful Degradation: NATS failures never block stdout logging
  • Async Publishing: Non-blocking NATS publish to avoid logging latency
  • Thread-Safe: Safe for concurrent use across goroutines

Installation

import "github.com/c360studio/semstreams/pkg/logging"

Quick Start

Basic Multi-Handler Setup
import (
    "log/slog"
    "os"
    
    "github.com/c360studio/semstreams/pkg/logging"
)

// Create stdout handler
stdoutHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
    Level: slog.LevelInfo,
})

// Create NATS handler
natsHandler := logging.NewNATSLogHandler(natsClient, logging.NATSLogHandlerConfig{
    MinLevel:       slog.LevelInfo,
    ExcludeSources: nil,
})

// Compose handlers - logs go to both stdout and NATS
multiHandler := logging.NewMultiHandler(stdoutHandler, natsHandler)
logger := slog.New(multiHandler)
slog.SetDefault(logger)

With Source Filtering
// Exclude WebSocket worker logs from NATS to prevent feedback loops
natsHandler := logging.NewNATSLogHandler(natsClient, logging.NATSLogHandlerConfig{
    MinLevel:       slog.LevelDebug,
    ExcludeSources: []string{"flow-service.websocket"},
})

// This goes to stdout only (source matches exclude prefix):
logger.With("source", "flow-service.websocket.health").Info("Health check")

// This goes to both stdout and NATS:
logger.With("source", "flow-service").Info("Flow started")

Handlers

MultiHandler

Composes multiple slog.Handler instances, dispatching log records to all of them.

multi := logging.NewMultiHandler(handler1, handler2, handler3)
logger := slog.New(multi)

// Logs dispatched to all three handlers
logger.Info("Hello world")

Key Behaviors:

  • If one handler fails, others continue processing
  • Enabled() returns true if ANY handler is enabled for the level
  • WithAttrs() and WithGroup() create new instances (immutable)
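
The behaviors listed above can be sketched with a small fan-out handler. This is a minimal illustration built only on the standard library, not the package's implementation: the `fanout` and `failing` types and the `demo` function are hypothetical names, and checking each child's `Enabled` before dispatch is an assumption.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log/slog"
)

// fanout is a hypothetical stand-in for MultiHandler, not the package's code.
type fanout struct{ handlers []slog.Handler }

// Enabled reports true if any child handler accepts the level.
func (f *fanout) Enabled(ctx context.Context, level slog.Level) bool {
	for _, h := range f.handlers {
		if h.Enabled(ctx, level) {
			return true
		}
	}
	return false
}

// Handle offers the record to every enabled child and ignores child errors,
// so one failing destination cannot stop the others.
func (f *fanout) Handle(ctx context.Context, r slog.Record) error {
	for _, h := range f.handlers {
		if h.Enabled(ctx, r.Level) {
			_ = h.Handle(ctx, r.Clone())
		}
	}
	return nil
}

// WithAttrs builds a new fanout; the receiver is left unchanged.
func (f *fanout) WithAttrs(attrs []slog.Attr) slog.Handler {
	next := make([]slog.Handler, len(f.handlers))
	for i, h := range f.handlers {
		next[i] = h.WithAttrs(attrs)
	}
	return &fanout{handlers: next}
}

// WithGroup builds a new fanout; the receiver is left unchanged.
func (f *fanout) WithGroup(name string) slog.Handler {
	next := make([]slog.Handler, len(f.handlers))
	for i, h := range f.handlers {
		next[i] = h.WithGroup(name)
	}
	return &fanout{handlers: next}
}

// failing wraps a handler and always returns an error from Handle.
type failing struct{ slog.Handler }

func (failing) Handle(context.Context, slog.Record) error { return errors.New("boom") }

// demo logs once through a failing handler placed before a healthy one and
// returns what the healthy handler wrote.
func demo() string {
	var buf bytes.Buffer
	ok := slog.NewTextHandler(&buf, nil)
	bad := failing{Handler: slog.NewTextHandler(&bytes.Buffer{}, nil)}
	slog.New(&fanout{handlers: []slog.Handler{bad, ok}}).Info("Hello world")
	return buf.String()
}

func main() {
	fmt.Print(demo())
}
```

Placing the failing handler first shows that a later handler still receives the record.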

NATSLogHandler

Publishes log records to NATS subjects in the format logs.{source}.{level}.

natsHandler := logging.NewNATSLogHandler(natsClient, logging.NATSLogHandlerConfig{
    MinLevel:       slog.LevelInfo,
    ExcludeSources: []string{"noisy-component", "debug-only"},
})

Configuration:

Field            Type         Description
MinLevel         slog.Level   Minimum log level to publish to NATS
ExcludeSources   []string     Source prefixes to exclude from NATS publishing

Source Extraction

NATSLogHandler extracts the source identifier from log attributes with priority:

  1. source attribute (explicit)
  2. component attribute (component-tagged logs)
  3. service attribute (service-tagged logs)
  4. "system" (default fallback)

// Explicit source
logger.With("source", "my-service.worker").Info("Working")
// → logs.my-service.worker.INFO

// Component attribute
logger.With("component", "udp-input").Info("Packet received")
// → logs.udp-input.INFO

// No source attributes
slog.Info("System message")
// → logs.system.INFO
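
The priority order above might be implemented along these lines. This is a sketch under assumptions: `extractSource`, `sourceKeys`, and `demoSources` are hypothetical names, the "gateway" value is made up for illustration, and skipping empty values is a guess about how the handler treats them.

```go
package main

import (
	"fmt"
	"log/slog"
)

// sourceKeys lists the attribute keys in the documented priority order.
var sourceKeys = []string{"source", "component", "service"}

// extractSource is a hypothetical helper: it returns the value of the
// highest-priority key present in attrs, or "system" if none is set.
func extractSource(attrs []slog.Attr) string {
	for _, key := range sourceKeys {
		for _, a := range attrs {
			if a.Key == key {
				if s := a.Value.String(); s != "" {
					return s
				}
			}
		}
	}
	return "system"
}

// demoSources runs the helper on three attribute sets.
func demoSources() []string {
	return []string{
		// source wins even when component is listed first.
		extractSource([]slog.Attr{
			slog.String("component", "udp-input"),
			slog.String("source", "my-service.worker"),
		}),
		// component outranks service.
		extractSource([]slog.Attr{
			slog.String("service", "gateway"),
			slog.String("component", "udp-input"),
		}),
		// No source attributes: fall back to "system".
		extractSource(nil),
	}
}

func main() {
	for _, s := range demoSources() {
		fmt.Println(s)
	}
}
```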

Source Filtering

Exclude sources from NATS publishing using prefix matching:

ExcludeSources: []string{"flow-service.websocket"}

Matching Rules:

  • "flow-service.websocket" → excluded
  • "flow-service.websocket.health" → excluded (prefix match)
  • "flow-service" → NOT excluded (does not start with the prefix)
  • "flow-service.api" → NOT excluded (different prefix)

Use Case: Prevent feedback loops where WebSocket worker debug logs are sent over the same WebSocket connection.
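
The matching rules can be reproduced with a plain string-prefix check. This is a sketch, assuming the handler uses something equivalent to `strings.HasPrefix`; `isExcluded` is a hypothetical name, and whether the real check respects dot-separated token boundaries is not stated in this document.

```go
package main

import (
	"fmt"
	"strings"
)

// isExcluded reports whether source starts with any configured prefix.
// A plain string prefix check is an assumption about the real handler.
func isExcluded(source string, prefixes []string) bool {
	for _, p := range prefixes {
		if strings.HasPrefix(source, p) {
			return true
		}
	}
	return false
}

func main() {
	exclude := []string{"flow-service.websocket"}
	sources := []string{
		"flow-service.websocket",
		"flow-service.websocket.health",
		"flow-service",
		"flow-service.api",
	}
	for _, s := range sources {
		fmt.Printf("%-32s excluded=%v\n", s, isExcluded(s, exclude))
	}
}
```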

NATS Subject Pattern

Logs are published to subjects following this pattern:

logs.{source}.{level}
  └── logs.udp-input.INFO
  └── logs.graph-processor.ERROR
  └── logs.system.WARN
  └── logs.flow-service.DEBUG
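
As a rough sketch of how such subjects could be assembled, assuming the logs.{source}.{level} layout shown above and the upper-case names returned by `slog.Level.String()`. The helper names `subjectFor` and `demoSubjects` are hypothetical.

```go
package main

import (
	"fmt"
	"log/slog"
)

// subjectFor is a hypothetical helper that joins the pieces of the pattern.
func subjectFor(source string, level slog.Level) string {
	return "logs." + source + "." + level.String()
}

// demoSubjects builds subjects for a few source and level combinations.
func demoSubjects() []string {
	return []string{
		subjectFor("udp-input", slog.LevelInfo),
		subjectFor("graph-processor", slog.LevelError),
		subjectFor("system", slog.LevelWarn),
		subjectFor("my-service.worker", slog.LevelDebug),
	}
}

func main() {
	for _, s := range demoSubjects() {
		fmt.Println(s)
	}
}
```

A dotted source such as my-service.worker adds an extra subject token, so a subscription like logs.*.DEBUG would not match logs.my-service.worker.DEBUG.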

JetStream Stream Configuration

Configure a LOGS stream with TTL and size limits:

StreamConfig{
    Name:     "LOGS",
    Subjects: []string{"logs.>"},
    MaxAge:   1 * time.Hour,        // TTL: expire after 1 hour
    MaxBytes: 100 * 1024 * 1024,    // 100MB max storage
    Discard:  DiscardOld,           // Drop oldest when full
}

Log Entry Format

Logs published to NATS are JSON-encoded:

{
    "timestamp": "2024-01-15T10:30:00.123456789Z",
    "level": "INFO",
    "source": "udp-input",
    "message": "Packet received",
    "fields": {
        "bytes": 1024,
        "remote_addr": "192.168.1.1:5000"
    }
}
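
A consumer or test might model this document with a struct like the one below. This is a sketch: the `logEntry` type, the `encode` helper, and the `sample` function are hypothetical and only mirror the JSON keys shown above; the package's internal type may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log/slog"
	"time"
)

// logEntry mirrors the JSON keys of the documented log entry.
type logEntry struct {
	Timestamp time.Time      `json:"timestamp"`
	Level     string         `json:"level"`
	Source    string         `json:"source"`
	Message   string         `json:"message"`
	Fields    map[string]any `json:"fields,omitempty"`
}

// encode flattens a record's attributes into Fields and marshals the entry.
func encode(r slog.Record, source string) ([]byte, error) {
	fields := make(map[string]any, r.NumAttrs())
	r.Attrs(func(a slog.Attr) bool {
		fields[a.Key] = a.Value.Any()
		return true
	})
	return json.Marshal(logEntry{
		Timestamp: r.Time.UTC(),
		Level:     r.Level.String(),
		Source:    source,
		Message:   r.Message,
		Fields:    fields,
	})
}

// sample rebuilds the example entry from this section.
func sample() string {
	ts := time.Date(2024, 1, 15, 10, 30, 0, 123456789, time.UTC)
	r := slog.NewRecord(ts, slog.LevelInfo, "Packet received", 0)
	r.AddAttrs(
		slog.Int("bytes", 1024),
		slog.String("remote_addr", "192.168.1.1:5000"),
	)
	data, err := encode(r, "udp-input")
	if err != nil {
		return ""
	}
	return string(data)
}

func main() {
	fmt.Println(sample())
}
```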

Architecture

Data Flow
┌─────────────────────────────────────────────────────────────────┐
│                        slog.Logger                               │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                       MultiHandler                               │
│  ┌──────────────────────┐    ┌──────────────────────────────┐   │
│  │   TextHandler        │    │      NATSLogHandler          │   │
│  │   (stdout)           │    │                              │   │
│  │                      │    │  ┌────────────────────────┐  │   │
│  │                      │    │  │ Check MinLevel         │  │   │
│  │                      │    │  │ Extract Source         │  │   │
│  │                      │    │  │ Check ExcludeSources   │  │   │
│  │                      │    │  │ Async Publish to NATS  │  │   │
│  │                      │    │  └────────────────────────┘  │   │
│  └──────────────────────┘    └──────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
         │                                    │
         ▼                                    ▼
    ┌─────────┐                    ┌──────────────────┐
    │ stdout  │                    │  NATS JetStream  │
    └─────────┘                    │  (LOGS stream)   │
                                   └──────────────────┘
                                            │
                                            ▼
                                   ┌──────────────────┐
                                   │  WebSocket       │
                                   │  Status Stream   │
                                   └──────────────────┘

Graceful Degradation

The architecture ensures logging never fails due to NATS issues:

Scenario            Behavior
NATS connected      Logs go to stdout AND NATS
NATS disconnected   Logs go to stdout only (NATS errors silently dropped)
NATS publish slow   Async publish - stdout not blocked
Handler error       Other handlers continue (MultiHandler ignores errors)
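
The "publish slow" and "disconnected" rows rely on publishing from a separate goroutine and dropping the error. The sketch below illustrates that idea under assumptions: `publishAsync`, `stalled`, and `demo` are hypothetical names, the `publisher` interface copies the `PublishToStream` method documented for NATSPublisher, and the `sync.WaitGroup` exists only so the demo can wait for the goroutine.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// publisher copies the PublishToStream method documented for NATSPublisher.
type publisher interface {
	PublishToStream(ctx context.Context, subject string, data []byte) error
}

// publishAsync hands the publish to a goroutine and returns immediately.
// The error is dropped on purpose so logging never fails because of NATS.
func publishAsync(p publisher, wg *sync.WaitGroup, subject string, data []byte) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		_ = p.PublishToStream(context.Background(), subject, data)
	}()
}

// stalled simulates a NATS outage: each publish blocks until release is
// closed and then fails.
type stalled struct {
	release chan struct{}
	calls   chan string
}

func (s *stalled) PublishToStream(_ context.Context, subject string, _ []byte) error {
	<-s.release
	s.calls <- subject
	return errors.New("nats: connection closed")
}

// demo reports whether publishAsync returned while the publisher was still
// blocked and whether the publish was attempted once it was released.
func demo() bool {
	s := &stalled{release: make(chan struct{}), calls: make(chan string, 1)}
	var wg sync.WaitGroup
	publishAsync(s, &wg, "logs.system.INFO", []byte(`{}`))
	// The publisher cannot have finished yet: release is still open.
	returnedEarly := len(s.calls) == 0
	close(s.release)
	wg.Wait()
	return returnedEarly && <-s.calls == "logs.system.INFO"
}

func main() {
	fmt.Println("caller was not blocked:", demo())
}
```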

Performance

Benchmarks (M3 MacBook Pro)
Operation                             Time
MultiHandler dispatch (per handler)   ~50ns
NATSLogHandler (async setup)          ~100ns
Combined (stdout + NATS)              ~150ns

At 10,000 logs/second: ~1.5ms total overhead per second.
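
The overhead figure is the per-call cost multiplied by the log rate. A small sketch of that arithmetic, with `overheadPerSecond` as a hypothetical helper name:

```go
package main

import (
	"fmt"
	"time"
)

// overheadPerSecond multiplies the per-call cost by the log rate.
func overheadPerSecond(perCall time.Duration, logsPerSecond int) time.Duration {
	return perCall * time.Duration(logsPerSecond)
}

func main() {
	// 150ns per call at 10,000 calls per second.
	total := overheadPerSecond(150*time.Nanosecond, 10_000)
	fmt.Println(total) // 1.5ms
}
```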

Memory
  • MultiHandler: O(n) where n is number of handlers
  • NATSLogHandler: O(1) for handler state
  • Per log: O(m) where m is number of attributes
  • Async goroutines: Short-lived, minimal impact

Integration with WebSocket Status Stream

This package is designed to work with the service package's WebSocket status stream:

  1. Application logs → Published to NATS via NATSLogHandler
  2. LOGS JetStream stream → Stores logs with TTL (1hr default)
  3. WebSocket clients → Subscribe to logs.> subjects
  4. Real-time streaming → No slog interception timing issues

The ExcludeSources config prevents feedback loops where WebSocket worker logs would be sent over the WebSocket, generating more logs.

Testing

# Run tests with race detector
go test -race ./pkg/logging

# Run benchmarks
go test -bench=. ./pkg/logging

Example: Complete Application Setup

package main

import (
    "context"
    "log/slog"
    "os"

    "github.com/c360studio/semstreams/config"
    "github.com/c360studio/semstreams/natsclient"
    "github.com/c360studio/semstreams/pkg/logging"
)

func main() {
    // Connect to NATS
    natsClient, err := natsclient.NewClient("nats://localhost:4222")
    if err != nil {
        slog.Error("Failed to connect to NATS", "error", err)
        os.Exit(1)
    }
    defer natsClient.Close(context.Background())

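    // Ensure LOGS stream exists.
    // cfg is the application's loaded configuration; loading it is omitted here.
    // Note: os.Exit skips deferred calls, so natsClient.Close does not run on this path.
    streamsManager := config.NewStreamsManager(natsClient, slog.Default())
    if err := streamsManager.EnsureStreams(context.Background(), cfg); err != nil {
        slog.Error("Failed to ensure streams", "error", err)
        os.Exit(1)
    }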

    // Setup multi-destination logging
    level := slog.LevelInfo
    
    stdoutHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
        Level: level,
    })
    
    natsHandler := logging.NewNATSLogHandler(natsClient, logging.NATSLogHandlerConfig{
        MinLevel:       level,
        ExcludeSources: []string{"flow-service.websocket"},
    })
    
    multiHandler := logging.NewMultiHandler(stdoutHandler, natsHandler)
    logger := slog.New(multiHandler)
    slog.SetDefault(logger)

    // Application logs now go to both stdout and NATS
    slog.Info("Application started", "version", "1.0.0")

    // Component-tagged logs
    componentLogger := slog.With("component", "processor")
    componentLogger.Info("Processing started", "batch_size", 100)

    // Source-tagged logs for fine-grained filtering
    workerLogger := slog.With("source", "flow-service.websocket")
    workerLogger.Debug("WebSocket worker tick") // Goes to stdout only
}

See Also

  • service: WebSocket status stream that consumes NATS logs
  • config: LOGS stream configuration with TTL and size limits
  • natsclient: NATS client used for publishing

License

See LICENSE file in repository root.

Documentation

Overview

Package logging provides slog handlers for structured logging with multi-destination support, NATS publishing, and graceful fallback behavior.

The logging package implements slog.Handler interfaces that enable logs to be written to multiple destinations simultaneously (stdout, NATS JetStream, etc.). This supports the out-of-band logging pattern where logs are always available via NATS for real-time streaming, even when WebSocket connections are not established.

Quick Start

Basic multi-handler setup:

// Create stdout handler
stdoutHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
    Level: slog.LevelInfo,
})

// Create NATS handler for publishing to logs.> subjects
natsHandler := logging.NewNATSLogHandler(natsClient, logging.NATSLogHandlerConfig{
    MinLevel:       slog.LevelInfo,
    ExcludeSources: []string{"flow-service.websocket"},
})

// Compose handlers
multiHandler := logging.NewMultiHandler(stdoutHandler, natsHandler)
logger := slog.New(multiHandler)
slog.SetDefault(logger)

Handlers

MultiHandler:

Composes multiple slog.Handler instances, dispatching log records to all of them. If one handler fails, others continue processing (graceful degradation).

multi := logging.NewMultiHandler(handler1, handler2, handler3)

NATSLogHandler:

Publishes log records to NATS subjects in the format logs.{source}.{level}. Publishing is asynchronous to avoid blocking the logging chain.

natsHandler := logging.NewNATSLogHandler(natsClient, logging.NATSLogHandlerConfig{
    MinLevel:       slog.LevelDebug,
    ExcludeSources: []string{"noisy-component"},
})

Source Extraction

NATSLogHandler extracts the source identifier from log attributes with the following priority: source > component > service > "system".

// Log with explicit source
logger.With("source", "my-component").Info("Processing started")
// Published to: logs.my-component.INFO

// Log with component attribute
logger.With("component", "udp-input").Info("Packet received")
// Published to: logs.udp-input.INFO

// Log without source attributes
slog.Info("System message")
// Published to: logs.system.INFO

Source Filtering

NATSLogHandler supports excluding sources from NATS publishing using prefix matching. This is useful for preventing log feedback loops (e.g., WebSocket worker logs being sent over WebSocket).

natsHandler := logging.NewNATSLogHandler(natsClient, logging.NATSLogHandlerConfig{
    MinLevel:       slog.LevelDebug,
    ExcludeSources: []string{"flow-service.websocket"},
})

// This log goes to stdout only, not NATS:
logger.With("source", "flow-service.websocket.health").Info("Health check")

// This log goes to both stdout and NATS:
logger.With("source", "flow-service").Info("Flow started")

The prefix matching rule: excluding "flow-service.websocket" also excludes "flow-service.websocket.health", but NOT "flow-service" itself.

NATS Subject Pattern

Logs are published to NATS subjects following this pattern:

logs.{source}.{level}
  └── logs.udp-input.INFO
  └── logs.graph-processor.ERROR
  └── logs.system.WARN

A JetStream stream named "LOGS" should be configured with appropriate TTL and size limits to prevent storage issues:

Stream: LOGS
Subjects: logs.>
MaxAge: 1h (TTL)
MaxBytes: 100MB
Discard: DiscardOld

Log Entry Format

Logs published to NATS are JSON-encoded:

{
    "timestamp": "2024-01-15T10:30:00.123456789Z",
    "level": "INFO",
    "source": "udp-input",
    "message": "Packet received",
    "fields": {
        "bytes": 1024,
        "remote_addr": "192.168.1.1:5000"
    }
}

Graceful Degradation

The logging architecture is designed to never block or fail due to NATS issues:

  • MultiHandler ignores errors from individual handlers
  • NATSLogHandler publishes asynchronously (non-blocking)
  • NATS publish errors are silently dropped
  • Stdout logging always works regardless of NATS state

This ensures that logging continues to function even when NATS is temporarily unavailable or experiencing issues.

Thread Safety

All handlers are safe for concurrent use:

  • MultiHandler dispatches to handlers sequentially but is safe for concurrent calls
  • NATSLogHandler uses atomic operations and goroutines for async publishing
  • WithAttrs and WithGroup create new handler instances (immutable pattern)
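
The immutable pattern in the last bullet usually comes down to copying attribute slices instead of appending in place. The sketch below shows that idea only; `attrSet`, `with`, and `demo` are hypothetical names and not part of the package.

```go
package main

import (
	"fmt"
	"log/slog"
	"sync"
)

// attrSet is a hypothetical holder for a handler's accumulated attributes.
type attrSet struct{ attrs []slog.Attr }

// with returns a new attrSet. The receiver's slice is copied rather than
// appended to in place, so derived values never share a backing array.
func (a *attrSet) with(extra ...slog.Attr) *attrSet {
	merged := make([]slog.Attr, 0, len(a.attrs)+len(extra))
	merged = append(merged, a.attrs...)
	merged = append(merged, extra...)
	return &attrSet{attrs: merged}
}

// demo derives several children from one parent concurrently and returns
// the attribute counts of the parent and of each child.
func demo() (int, []int) {
	parent := (&attrSet{}).with(slog.String("service", "flow-service"))
	children := make([]*attrSet, 8)

	var wg sync.WaitGroup
	for i := range children {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Only reads parent; writes go to a fresh slice.
			children[i] = parent.with(slog.Int("worker", i))
		}(i)
	}
	wg.Wait()

	lens := make([]int, len(children))
	for i, c := range children {
		lens[i] = len(c.attrs)
	}
	return len(parent.attrs), lens
}

func main() {
	parentLen, childLens := demo()
	fmt.Println(parentLen, childLens)
}
```

Because the parent is only ever read, the concurrent derivations need no lock.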

Integration with WebSocket Status Stream

This package is designed to work with the WebSocket status stream feature:

  1. Application logs are published to NATS via NATSLogHandler
  2. The LOGS JetStream stream stores logs with TTL
  3. WebSocket clients subscribe to logs.> subjects
  4. Real-time log streaming without slog interception timing issues

The exclude_sources configuration allows filtering out WebSocket worker logs to prevent feedback loops where log messages trigger more log messages.

Performance

Benchmarks on M3 MacBook Pro:

  • MultiHandler dispatch: ~50ns overhead per additional handler
  • NATSLogHandler: ~100ns for async publish setup (publish itself is async)
  • Combined (stdout + NATS): ~150ns per log call

At 10,000 logs/second, this adds ~1.5ms total overhead per second, which is negligible for most applications.

Memory:

  • MultiHandler: O(n) where n is number of handlers
  • NATSLogHandler: O(1) for handler, O(m) per log where m is attributes
  • Async publish goroutines: Short-lived, minimal memory impact

Testing

Both handlers have comprehensive test coverage:

go test -race ./pkg/logging

Tests verify:

  • Handler composition and dispatch
  • Source extraction priority
  • Prefix-based source filtering
  • Async publishing behavior
  • WithAttrs and WithGroup immutability

Example: Complete Setup

package main

import (
    "log/slog"
    "os"

    "github.com/c360studio/semstreams/natsclient"
    "github.com/c360studio/semstreams/pkg/logging"
)

func setupLogger(natsClient *natsclient.Client, level slog.Level) *slog.Logger {
    // Create stdout handler
    stdoutHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
        Level: level,
    })

    // Create NATS handler with source filtering
    natsHandler := logging.NewNATSLogHandler(natsClient, logging.NATSLogHandlerConfig{
        MinLevel:       level,
        ExcludeSources: []string{"flow-service.websocket"},
    })

    // Compose handlers
    multiHandler := logging.NewMultiHandler(stdoutHandler, natsHandler)

    return slog.New(multiHandler)
}

func main() {
    natsClient, err := natsclient.NewClient("nats://localhost:4222")
    if err != nil {
        slog.Error("Failed to connect to NATS", "error", err)
        os.Exit(1)
    }
    defer natsClient.Close()

    logger := setupLogger(natsClient, slog.LevelInfo)
    slog.SetDefault(logger)

    // Logs go to both stdout and NATS
    slog.Info("Application started", "version", "1.0.0")

    // Component-tagged logs
    componentLogger := slog.With("component", "processor")
    componentLogger.Info("Processing started")
}

See Also

  • service package: WebSocket status stream that consumes NATS logs
  • config package: LOGS stream configuration with TTL and size limits
  • natsclient package: NATS client used for publishing

Package logging provides slog handlers for structured logging across the application.

Types

type MultiHandler

type MultiHandler struct {
	// contains filtered or unexported fields
}

MultiHandler composes multiple slog.Handler instances, dispatching log records to all of them. This allows logs to be written to multiple destinations (e.g., stdout and NATS) simultaneously.

func NewMultiHandler

func NewMultiHandler(handlers ...slog.Handler) *MultiHandler

NewMultiHandler creates a new MultiHandler that dispatches to all provided handlers.

func (*MultiHandler) Enabled

func (m *MultiHandler) Enabled(ctx context.Context, level slog.Level) bool

Enabled reports whether any handler handles records at the given level.

func (*MultiHandler) Handle

func (m *MultiHandler) Handle(ctx context.Context, r slog.Record) error

Handle dispatches the record to all handlers. If a handler fails, dispatch continues with the next handler, so one failing destination does not break the logging chain.

func (*MultiHandler) WithAttrs

func (m *MultiHandler) WithAttrs(attrs []slog.Attr) slog.Handler

WithAttrs returns a new MultiHandler with the given attributes added to all handlers.

func (*MultiHandler) WithGroup

func (m *MultiHandler) WithGroup(name string) slog.Handler

WithGroup returns a new MultiHandler with the given group added to all handlers.

type NATSLogHandler

type NATSLogHandler struct {
	// contains filtered or unexported fields
}

NATSLogHandler is an slog.Handler that publishes log records to NATS JetStream. Logs are published to subjects in the format logs.{level}.{source}, which enables NATS wildcard filtering:

  • logs.WARN.> (all WARN logs from any source; other levels need their own subscription)
  • logs.*.graph-processor (one component, all levels)
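
NATS wildcards operate on dot-separated tokens: `*` matches exactly one token and `>` matches one or more trailing tokens. Level tokens are compared literally, so a pattern for one level does not cover higher levels. The sketch below is a local re-implementation of those rules for illustration, not NATS server code; `matchSubject` is a hypothetical name and the subjects assume the logs.{level}.{source} layout described here.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject applies NATS wildcard rules to dot-separated tokens:
// "*" matches exactly one token, ">" matches one or more trailing tokens.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be last and needs at least one token to consume.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(matchSubject("logs.WARN.>", "logs.WARN.graph-processor"))
	fmt.Println(matchSubject("logs.WARN.>", "logs.ERROR.graph-processor"))
	fmt.Println(matchSubject("logs.*.graph-processor", "logs.ERROR.graph-processor"))
	fmt.Println(matchSubject("logs.*.graph-processor", "logs.ERROR.udp-input"))
}
```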

The handler requires a non-nil publisher at construction time. Create the handler AFTER NATS is connected and streams are created.

func NewNATSLogHandler

func NewNATSLogHandler(publisher NATSPublisher, cfg NATSLogHandlerConfig) *NATSLogHandler

NewNATSLogHandler creates a new NATSLogHandler. The publisher must be non-nil - create this handler AFTER NATS is connected.

func (*NATSLogHandler) Enabled

func (h *NATSLogHandler) Enabled(_ context.Context, level slog.Level) bool

Enabled reports whether the handler handles records at the given level.

func (*NATSLogHandler) Handle

func (h *NATSLogHandler) Handle(_ context.Context, r slog.Record) error

Handle publishes the log record to NATS.

func (*NATSLogHandler) WithAttrs

func (h *NATSLogHandler) WithAttrs(attrs []slog.Attr) slog.Handler

WithAttrs returns a new handler with the given attributes added.

func (*NATSLogHandler) WithGroup

func (h *NATSLogHandler) WithGroup(name string) slog.Handler

WithGroup returns a new handler with the given group added.

type NATSLogHandlerConfig

type NATSLogHandlerConfig struct {
	MinLevel       slog.Level
	ExcludeSources []string
}

NATSLogHandlerConfig holds configuration for NATSLogHandler.

type NATSPublisher

type NATSPublisher interface {
	PublishToStream(ctx context.Context, subject string, data []byte) error
}

NATSPublisher defines the interface needed for publishing to NATS JetStream. It uses PublishToStream for durability: logs are persisted to the JetStream LOGS stream. Defining the dependency as an interface also allows for easier testing with mocks.
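
A test double for this interface can be very small. The sketch below assumes only the method signature shown above; `natsPublisher` repeats it locally so the example compiles without importing the package, and `recordingPublisher` and `demo` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// natsPublisher repeats the documented NATSPublisher method set so the
// sketch compiles on its own.
type natsPublisher interface {
	PublishToStream(ctx context.Context, subject string, data []byte) error
}

// recordingPublisher is a hypothetical test double that stores each subject.
type recordingPublisher struct {
	mu       sync.Mutex
	subjects []string
	err      error // returned from every call; nil simulates a healthy stream
}

func (p *recordingPublisher) PublishToStream(_ context.Context, subject string, _ []byte) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.subjects = append(p.subjects, subject)
	return p.err
}

// Subjects returns a copy of the subjects seen so far.
func (p *recordingPublisher) Subjects() []string {
	p.mu.Lock()
	defer p.mu.Unlock()
	return append([]string(nil), p.subjects...)
}

// Compile-time check that the mock satisfies the interface.
var _ natsPublisher = (*recordingPublisher)(nil)

// demo publishes once through the interface and returns what was recorded.
func demo() []string {
	mock := &recordingPublisher{}
	var pub natsPublisher = mock
	_ = pub.PublishToStream(context.Background(), "logs.udp-input.INFO", []byte(`{}`))
	return mock.Subjects()
}

func main() {
	fmt.Println(demo())
}
```

Since the handler publishes asynchronously, a test that passes such a mock to NewNATSLogHandler would likely need to wait or poll before asserting on the recorded subjects.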
