config

package
v1.0.0-alpha.52 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2026 License: MIT Imports: 21 Imported by: 0

README

Configuration Package README

Last Updated: 2025-09-12
Maintainer: SemStreams core Team

Purpose & Scope

What this component does: Manages application configuration for SemStreams with support for JSON file loading, environment variable overrides, NATS KV-based dynamic configuration, and thread-safe configuration updates.

Key responsibilities:

  • Load and parse JSON configuration files with layered overrides and defaults
  • Provide thread-safe concurrent access to configuration data via SafeConfig wrapper
  • Monitor NATS KV bucket for real-time configuration changes using Manager
  • Validate configuration data with extensible validation framework
  • Merge base configuration with dynamic KV overrides
  • Provide channel-based configuration updates to services
  • Support both typed (strongly structured) and flexible (map-based) configuration formats

NOT responsible for: Component lifecycle management, NATS connection establishment, business logic implementation, or configuration persistence (beyond JSON file writing).

Architecture Context

Integration Points
  • Consumes from: JSON configuration files, environment variables, NATS KV bucket "semstreams_config"
  • Provides to: All components requiring configuration (processors, managers, services, input handlers)
  • External dependencies: NATS JetStream for KV operations, file system for JSON file I/O
Data Flow
JSON Files + Env Vars → Loader → Config → SafeConfig → Services/Components
                                    ↓
NATS KV Changes → Manager → Channel Updates → Service/Component Updates
Configuration Structure

The configuration has been simplified to four main sections:

{
  "platform": {
    "org": "c360",                         // Organization namespace
    "id": "vessel-001",                    // Platform identifier
    "type": "vessel",                      // Platform type (vessel, shore, buoy, satellite)
    "region": "gulf_mexico",               // Geographic region
    "capabilities": ["radar", "ctd"],      // Platform capabilities
    "instance_id": "west-1",               // Instance identifier for federation
    "environment": "prod"                  // Environment (prod, dev, test)
  },
  "nats": {
    "urls": ["nats://localhost:4222"],     // NATS server URLs
    "max_reconnects": -1,                  // Unlimited reconnection attempts
    "reconnect_wait": "2s",                // Wait between reconnect attempts
    "username": "",                        // Optional username
    "password": "",                        // Optional password
    "token": "",                           // Optional auth token
    "tls": {                               // TLS configuration
      "enabled": false,
      "cert_file": "",
      "key_file": "",
      "ca_file": ""
    },
    "jetstream": {                         // JetStream configuration
      "enabled": true,
      "domain": "semstreams"
    }
  },
  "services": {                           // Service configurations (JSON-only format)
    "metrics": {                          // Each service gets complete JSON config
      "enabled": true,
      "port": 9090,
      "path": "/metrics"
    },
    "message_logger": {
      "enabled": true,
      "monitor_subjects": ["process.>", "input.>"],
      "max_entries": 10000,
      "output_to_stdout": false
    },
    "health": {
      "enabled": true,
      "port": 8080,
      "path": "/health"
    }
  },
  "components": {                         // Component instance configurations
    "udp-mavlink": {                      // Instance name as map key
      "type": "udp-input",
      "enabled": true,
      "config": {                        // Component-specific configuration
        "port": 14550,
        "bind": "0.0.0.0",
        "subject": "input.udp.mavlink"
      }
    },
    "graph-processor": {
      "type": "graph-processor",
      "enabled": true,
      "config": {
        "entity_bucket": "ENTITY_STATES",
        "cache_size": 10000
      }
    },
    "websocket-output": {
      "type": "websocket-output",
      "enabled": true,
      "config": {
        "port": 8081,
        "subjects": ["output.>"]
      }
    }
  }
}

Critical Behaviors (Testing Focus)

Happy Path - What Should Work
  1. JSON Configuration Loading: Parse configuration files with defaults and overrides

    • Input: Valid JSON file path with proper structure
    • Expected: Config struct populated with merged values (defaults + file + env)
    • Verification: Assert specific field values match expected merged result
  2. Thread-Safe Configuration Access: Concurrent read/write operations on SafeConfig

    • Input: Multiple goroutines calling Get() and Update() simultaneously
    • Expected: No race conditions, consistent data returned via deep copying
    • Verification: Use -race flag, verify no data corruption under concurrent load
  3. NATS KV Configuration Watching: Real-time configuration change detection

    • Input: Write configuration change to NATS KV bucket
    • Expected: Configuration updates received via channel within timeout
    • Verification: Channel receives Update with correct path and updated config
  4. Configuration Validation: Reject invalid configuration data

    • Input: Config with invalid platform.id, missing required fields, or malformed URLs
    • Expected: Specific validation errors returned, config rejected
    • Verification: Assert exact error messages match expected validation failures
  5. Environment Variable Override: Limited environment variable support for specific fields

    • Input: Set STREAMKIT_PLATFORM_ID="env_test" with different file value
    • Expected: Final config has environment variable value (if non-empty string)
    • Verification: Assert Platform.ID equals environment variable, not file value
    • Limitations: Cannot set booleans to false or integers to 0 (empty string check)
    • Supported Fields: Platform.ID, Platform.Type, NATS.URLs, NATS.Username, NATS.Password, NATS.Token
  6. Dynamic Configuration Merging: KV overrides merge with base configuration

    • Input: Base config with KV key "platform.identity" containing {"type": "shore"}
    • Expected: GetConfig() returns merged config with KV override applied
    • Verification: Assert Platform.Type equals KV value, other fields unchanged
    • Note: KV uses JSON-only format - entire objects, not individual fields
Error Conditions - What Should Fail Gracefully
  1. Invalid JSON Configuration: Malformed JSON files handled gracefully

    • Trigger: Load file with syntax errors, missing braces, invalid JSON
    • Expected: Specific parsing error with file path and line information
    • Recovery: System continues with defaults, logs error for debugging
  2. Missing Required Configuration: Required fields validation failures

    • Trigger: Config with empty platform.id or missing platform.org
    • Expected: ValidationError with specific field names and requirements
    • Recovery: Configuration loading fails early before component initialization
  3. NATS KV Connection Loss: Handle KV bucket unavailability

    • Trigger: NATS connection lost or KV bucket deleted during operation
    • Expected: Manager continues with current config, channel updates pause
    • Recovery: Reconnects when NATS available, resumes KV watching automatically
  4. Configuration Update Channel Blocking: Handle slow consumers

    • Trigger: Service doesn't read from config update channel quickly
    • Expected: Non-blocking sends, slow consumers miss updates (buffered channel)
    • Recovery: Manager continues monitoring, consumers get latest state on next read
  5. Invalid Environment Variable Values: Environment override validation failures

    • Trigger: Set STREAMKIT_NATS_URLS="invalid-url-format"
    • Expected: Validation error during config loading with specific field
    • Recovery: Use file or default value, log validation failure
  6. Concurrent Configuration Updates: Race condition prevention during updates

    • Trigger: Multiple threads calling Update() with different configs simultaneously
    • Expected: All updates serialized, final state consistent (last writer wins)
    • Recovery: No data corruption, all readers get valid configuration
Edge Cases - Boundary Conditions
  • Large Configuration Files: Handle 10MB+ JSON files within memory limits
  • High-Frequency KV Changes: Process rapid-fire configuration updates without dropping changes
  • Deep Configuration Nesting: Support 10+ levels of nested configuration objects
  • Unicode Configuration Values: Properly handle UTF-8 characters in config values
  • Long-Running Watchers: Manager operates reliably for days

Usage Patterns

Typical Usage (How Other Code Uses This)
// Load configuration with file layering (map-based merge)
loader := NewLoader()
loader.AddLayer("configs/base.json")      // Base configuration
loader.AddLayer("configs/production.json") // Override layer (CAN set zero values)
loader.EnableValidation(true)

// Map-based merging preserves JSON semantics:
// - Fields with zero values in overlay WILL override non-zero base values
// - Fields not present in overlay keep their base values
config, err := loader.Load()
if err != nil {
    log.Fatalf("Failed to load config: %v", err)
}

// Create Manager for centralized configuration management
configManager, err := NewConfigManager(config, natsClient)
if err != nil {
    log.Fatalf("Failed to create config manager: %v", err)
}

// Start Manager to watch for KV changes
if err := configManager.Start(ctx); err != nil {
    log.Fatalf("Failed to start config manager: %v", err)
}
defer configManager.Stop()

// Subscribe to configuration changes using pattern-based subscriptions
configChan := make(chan Update, 10)
configManager.Subscribe(ctx, "services.*", configChan)

// Process configuration updates in a goroutine
go func() {
    for update := range configChan {
        log.Printf("Config changed: %s", update.Path)
        cfg := update.Config.Get()
        // Apply new configuration to relevant services
        // Services receive complete JSON configs, not individual fields
    }
}()

// Use configuration safely in components
safeConfig := configManager.GetConfig()
cfg := safeConfig.Get()
platformID := cfg.Platform.ID
natsUrls := cfg.NATS.URLs

// Access service configurations (JSON format)
if metricsConfig, exists := cfg.Services["metrics"]; exists {
    // Parse service-specific configuration
    var metrics MetricsConfig
    json.Unmarshal(metricsConfig.Config, &metrics)
}
Common Integration Patterns
  • Component Initialization: Components receive ComponentConfig from the components map
  • Service Initialization: Services receive ServiceConfig JSON from the services map
  • Dynamic Reconfiguration: Services subscribe to Manager channels for real-time updates
  • Pattern-Based Subscriptions: Use wildcards like "services." or "components."
  • JSON-Only Updates: All configuration updates replace entire JSON objects, not individual fields

Testing Strategy

Test Categories
  1. Unit Tests: Individual functions (validation, parsing, merging) with isolated inputs
  2. Integration Tests: Full config loading with real NATS server and file system operations
  3. Concurrency Tests: Thread safety verification with race detection enabled
  4. Error Tests: Failure mode handling and recovery behavior validation
Test Quality Standards
  • ✅ Tests MUST create real configuration objects and verify actual behavior
  • ✅ Tests MUST verify thread safety using go test -race without warnings
  • ✅ Tests MUST test with real NATS KV buckets (use testcontainers for isolation)
  • ✅ Tests MUST verify configuration changes trigger handler callbacks
  • ✅ Tests MUST validate specific error messages and types for failure cases
  • ❌ NO tests that only verify struct field assignment without behavior validation
  • ❌ NO tests that mock the core configuration loading logic
  • ❌ NO tests that skip validation of concurrent access patterns
Mock vs Real Dependencies
  • Use real dependencies for: File system operations, JSON parsing, NATS KV buckets, configuration validation
  • Use mocks for: Component-specific configuration consumers that would create circular dependencies
  • Testcontainers for: NATS server instances to ensure isolated testing environment

Implementation Notes

Configuration Merge Implementation

The loader uses map-based merging that correctly handles zero values:

// Files are loaded as map[string]any, not structs
rawConfig, err := loadRawJSON("production.json")
// rawConfig only contains fields present in the JSON file

// Deep merge preserves JSON semantics
func deepMergeMaps(base, override map[string]any) map[string]any {
    // Zero values in JSON (false, 0, "") are NOT nil in maps
    // They WILL override non-zero base values
}

Key Properties:

  • ✅ CAN override true with false
  • ✅ CAN override 9090 with 0
  • ✅ CAN override ["a","b"] with []
  • ✅ Fields not in overlay file are preserved from base

Note: This is different from struct-based merging, which would have zero-value ambiguity.

Thread Safety
  • Concurrency model: SafeConfig uses RWMutex for protecting configuration access and updates
  • Deep Copying: Get() returns a deep copy using JSON marshal/unmarshal to prevent mutations
  • Shared state: Configuration data protected by mutex, Manager sends updates via channels
  • Critical sections: Configuration updates (Update method) and KV override application require write locks
Performance Considerations
  • Expected throughput: Handle 100+ configuration changes per second through KV watching
  • Memory usage: Configuration structures typically under 1MB, deep copying for safety
  • Bottlenecks: JSON marshaling/unmarshaling for deep copying, NATS KV round-trip latency
Error Handling Philosophy
  • Error propagation: Validation errors bubble up with specific field context and helpful messages
  • Retry strategy: Manager automatically reconnects to NATS, updates resume when available
  • Circuit breaking: Invalid configurations rejected early, system continues with known-good config

Troubleshooting

Common Issues
  1. Manager Not Receiving Updates: KV changes not detected

    • Cause: Channel subscription after KV changes, or NATS connection loss
    • Solution: Subscribe to patterns before changes, check NATS connection health
  2. Configuration Validation Failures: Valid configuration rejected by validator

    • Cause: Strict validation rules, missing required fields, or format mismatches
    • Solution: Review ValidationError details, check field requirements and format constraints
  3. Thread Safety Race Conditions: Data corruption under concurrent access

    • Cause: Direct Config access bypassing SafeConfig wrapper
    • Solution: Always use SafeConfig.Get() for reading, Update() for writing configuration
  4. Environment Variables Not Applied: Environment overrides ignored

    • Cause: Incorrect variable naming, env vars set after loader creation
    • Solution: Use STREAMKIT_ prefix, verify environment variables before loader.Load()
  5. Channel Buffer Overflow: Missing configuration updates

    • Cause: Slow consumer not reading from channel, buffer fills up
    • Solution: Process updates promptly, increase buffer size, or use select with default
Debug Information
  • Logs to check: Configuration loading progress, validation failures, KV watcher status
  • Metrics to monitor: Number of subscribers, configuration change frequency, channel buffer usage
  • Health checks: NATS connection state, KV bucket accessibility, channel status

Development Workflow

Before Making Changes
  1. Read this README to understand component purpose and integration points
  2. Check validation system to understand current constraints and extension points
  3. Identify which behaviors need testing (configuration loading, validation, KV watching)
  4. Update tests BEFORE changing code (TDD approach for configuration behavior)
After Making Changes
  1. Verify all existing tests still pass with go test -race enabled
  2. Add tests for new configuration fields or validation rules
  3. Update this README if responsibilities or integration points changed
  4. Check integration points still work (component initialization, dynamic reconfiguration)
  5. Update configuration examples if new fields or formats added

Documentation

Overview

Package config provides configuration management for StreamKit applications.

This package handles loading, validation, and dynamic updates of application configuration from JSON files, environment variables, and NATS KV store.

Core Components

Config: Main configuration structure containing platform settings, NATS connection details, service configurations, and component definitions.

SafeConfig: Thread-safe wrapper using RWMutex and deep cloning to prevent concurrent access issues and accidental mutations.

Manager: Manages the complete lifecycle of configuration including initialization, NATS KV watching, change notifications via channels, and graceful shutdown with timeout handling.

Loader: Loads configuration with layer merging (base + overrides) and environment variable substitution for flexible deployment scenarios.

Basic Usage

Loading configuration from files with layer merging:

loader := config.NewLoader()
loader.AddLayer("config/base.json")
loader.AddLayer("config/production.json") // Overrides base
loader.EnableValidation(true)

cfg, err := loader.Load()
if err != nil {
	log.Fatal(err)
}

Dynamic Configuration

Using Manager for runtime updates via NATS KV:

cm, err := config.NewConfigManager(cfg, natsClient, logger)
if err != nil {
	log.Fatal(err)
}

// Start watching for config changes
if err := cm.Start(ctx); err != nil {
	log.Fatal(err)
}
defer cm.Stop(5 * time.Second)

// Subscribe to specific config changes
updates := cm.OnChange("services.*")
for update := range updates {
	log.Printf("Service config changed: %s", update.Key)
}

Thread-Safe Access

SafeConfig ensures thread-safe access to configuration:

safeConfig := cm.GetConfig()

// Read config (deep copy returned, safe to use)
cfg := safeConfig.Get()

// Update config atomically
safeConfig.Update(func(cfg *Config) {
	cfg.Components["my-component"].Enabled = true
})

// Push updates to NATS KV
cm.PushToKV(ctx)

Environment Variable Overrides

Configuration values can be overridden using environment variables:

# Override platform ID
export STREAMKIT_PLATFORM_ID="prod-cluster-01"

# Override NATS URLs (comma-separated)
export STREAMKIT_NATS_URLS="nats://server1:4222,nats://server2:4222"

Layer Merging

Configuration layers are merged with last-wins semantics:

base.json:
  {"platform": {"id": "dev", "log_level": "debug"}}

production.json:
  {"platform": {"id": "prod"}}

Result:
  {"platform": {"id": "prod", "log_level": "debug"}}

Security

The package includes security validation:

  • File size limits (10MB max) to prevent memory exhaustion
  • JSON depth validation (100 levels max) to prevent DoS attacks
  • Path validation to prevent directory traversal
  • Regular file checks (no symlinks or device files)

Configuration Structure

The main Config struct contains:

type Config struct {
    Platform   PlatformConfig           // Platform metadata
    NATS       NATSConfig              // Message bus connection
    Services   map[string]any  // Service configurations
    Components map[string]ComponentConfig // Component definitions
}

See the README.md file for detailed examples and configuration patterns.

Package config provides configuration management for SemStreams.

Example (ComponentAccess)

Example_componentAccess demonstrates type-safe component configuration access.

package main

import (
	"fmt"
)

func main() {
	// Assume we have a loaded configuration
	// cfg := loadConfig()

	// Get component configuration with type checking
	// comp, exists := cfg.Components["udp-input"]
	// if !exists {
	//     log.Fatal("Component not found")
	// }

	// Access component properties
	// componentType := comp.Type
	// enabled := comp.Enabled
	// config := comp.Config

	// Type-safe access to nested config using helpers
	// bindAddr := cfg.GetString("components.udp-input.config.bind_address")
	// port := cfg.GetInt("components.udp-input.config.port")

	fmt.Println("Type-safe component access")
}
Output:

Type-safe component access

Index

Examples

Constants

View Source
const (
	StorageModeMemory = "memory" // In-memory only (original implementation)
	StorageModeKV     = "kv"     // NATS KV only (no local cache)
	StorageModeHybrid = "hybrid" // KV + local cache (recommended for production)
)

Storage mode constants

Variables

This section is empty.

Functions

func CompareVersions

func CompareVersions(v1, v2 string) (int, error)

CompareVersions compares two semver version strings Returns:

-1 if v1 < v2
 0 if v1 == v2
 1 if v1 > v2
error if either version is invalid

func DeriveStreamName

func DeriveStreamName(subject string) string

DeriveStreamName extracts stream name from subject convention. Convention: subject "component.action.type" → stream "COMPONENT" Examples:

"objectstore.stored.entity" → "OBJECTSTORE"
"sensor.processed.entity"   → "SENSOR"
"rule.triggered.alert"      → "RULE"

func DeriveStreamSubjects

func DeriveStreamSubjects(subject string) []string

DeriveStreamSubjects creates wildcard pattern for stream capture. Convention: subject "component.action.type" → ["component.>"] Examples:

"objectstore.stored.entity" → ["objectstore.>"]
"sensor.processed.entity"   → ["sensor.>"]

func ExpandEnvWithDefaults

func ExpandEnvWithDefaults(s string) string

ExpandEnvWithDefaults expands environment variables in a string, supporting ${VAR:-default} syntax for default values.

Patterns:

  • ${VAR} - expands to value of VAR, or empty if unset
  • ${VAR:-default} - expands to value of VAR, or "default" if unset
  • $VAR - expands to value of VAR, or empty if unset

func GetBool

func GetBool(cfg map[string]any, key string, defaultVal bool) bool

GetBool safely extracts a boolean value from a config map

func GetComponentConfig

func GetComponentConfig(cfg map[string]any, name string) (map[string]any, error)

GetComponentConfig safely extracts a component configuration section

func GetFloat64

func GetFloat64(cfg map[string]any, key string, defaultVal float64) float64

GetFloat64 safely extracts a float64 value from a config map

func GetInt

func GetInt(cfg map[string]any, key string, defaultVal int) int

GetInt safely extracts an integer value from a config map

func GetNestedBool

func GetNestedBool(cfg map[string]any, keys []string, defaultVal bool) bool

GetNestedBool safely extracts a nested boolean value from a config map

func GetNestedInt

func GetNestedInt(cfg map[string]any, keys []string, defaultVal int) int

GetNestedInt safely extracts a nested integer value from a config map

func GetNestedString

func GetNestedString(cfg map[string]any, keys []string, defaultVal string) string

GetNestedString safely extracts a nested string value from a config map

func GetString

func GetString(cfg map[string]any, key string, defaultVal string) string

GetString safely extracts a string value from a config map

func GetStringSlice

func GetStringSlice(cfg map[string]any, key string, defaultVal []string) []string

GetStringSlice safely extracts a string slice from a config map

func HasKey

func HasKey(cfg map[string]any, key string) bool

HasKey checks if a key exists in the config map

func HasNestedKey

func HasNestedKey(cfg map[string]any, keys []string) bool

HasNestedKey checks if a nested key path exists in the config map

Types

type BucketConfig

type BucketConfig struct {
	Name     string        `json:"name,omitempty"`      // Override default name if needed
	TTL      time.Duration `json:"ttl"`                 // 0 = no expiration
	History  int           `json:"history"`             // Number of versions to keep
	MaxBytes int64         `json:"max_bytes,omitempty"` // Size limit (0 = unlimited)
	Replicas int           `json:"replicas,omitempty"`  // Replication factor
}

BucketConfig defines configuration for a single KV bucket

type ComponentConfigs

type ComponentConfigs map[string]types.ComponentConfig

ComponentConfigs holds component instance configurations. The map key is the instance name (e.g., "udp-sensor-main"). Components are only created if both: 1. Their factory has been registered via init() 2. They have an entry in this config map with enabled=true

type ComponentRegistry

type ComponentRegistry interface {
	GetComponentSchema(componentType string) (component.ConfigSchema, error)
}

ComponentRegistry defines the interface needed for schema validation This allows dependency injection and testing

type Config

type Config struct {
	Version       string               `json:"version"` // Semantic version (e.g., "1.0.0") for KV sync control
	Platform      PlatformConfig       `json:"platform"`
	Security      security.Config      `json:"security,omitempty"` // Platform-wide security configuration
	NATS          NATSConfig           `json:"nats"`
	Services      types.ServiceConfigs `json:"services"`                 // Map of service configs
	Components    ComponentConfigs     `json:"components"`               // Map of component instance configs
	Streams       StreamConfigs        `json:"streams,omitempty"`        // Optional explicit JetStream stream definitions
	ModelRegistry *model.Registry      `json:"model_registry,omitempty"` // Unified model endpoint registry
}

Config represents the complete application configuration Simplified to 6 fields: Version (semver), Platform (identity), Security (TLS), NATS (connection), Services, Components

func (*Config) Clone

func (c *Config) Clone() *Config

Clone creates a deep copy of the configuration

func (*Config) GetOrg

func (c *Config) GetOrg() string

GetOrg returns the organization from platform config

func (*Config) GetPlatform

func (c *Config) GetPlatform() string

GetPlatform returns the platform identifier (prefer instance_id over id)

func (*Config) SaveToFile

func (c *Config) SaveToFile(path string) error

SaveToFile saves the configuration to a JSON file

func (*Config) String

func (c *Config) String() string

String returns a JSON representation of the config

func (*Config) UnmarshalJSON

func (c *Config) UnmarshalJSON(data []byte) error

UnmarshalJSON implements custom JSON unmarshaling for Config

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the config is valid

type CoreServicesConfig

type CoreServicesConfig struct {
	MessageLogger bool `json:"message_logger"` // Debug tool
	Discovery     bool `json:"discovery"`      // Component discovery
}

CoreServicesConfig defines which core services to enable

type JetStreamConfig

type JetStreamConfig struct {
	Enabled           bool   `json:"enabled"`
	Domain            string `json:"domain,omitempty"`
	MaxMemory         int64  `json:"max_memory,omitempty"`
	MaxFileStore      int64  `json:"max_file_store,omitempty"`
	RetentionPolicy   string `json:"retention_policy,omitempty"`
	ReplicationFactor int    `json:"replication_factor,omitempty"`
}

JetStreamConfig for JetStream settings

type Loader

type Loader struct {
	// contains filtered or unexported fields
}

Loader handles configuration loading with layers and overrides

func NewLoader

func NewLoader() *Loader

NewLoader creates a new configuration loader

func (*Loader) AddLayer

func (l *Loader) AddLayer(path string)

AddLayer adds a configuration file layer

func (*Loader) EnableValidation

func (l *Loader) EnableValidation(enable bool)

EnableValidation enables or disables configuration validation

func (*Loader) Load

func (l *Loader) Load() (*Config, error)

Load loads and merges all configuration layers

Example

ExampleLoader_Load demonstrates loading configuration from multiple layers with environment variable overrides and validation.

package main

import (
	"fmt"
	"log"

	"github.com/c360studio/semstreams/config"
)

func main() {
	loader := config.NewLoader()

	// Add base configuration layer
	loader.AddLayer("testdata/base.json")

	// Add environment-specific overrides
	loader.AddLayer("testdata/production.json")

	// Enable validation to catch errors early
	loader.EnableValidation(true)

	// Load merged configuration
	cfg, err := loader.Load()
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(cfg.Platform.ID)
}
Output:

test-platform
Example (EnvironmentOverrides)

ExampleLoader_Load_environmentOverrides demonstrates using environment variables to override configuration values at runtime.

package main

import (
	"fmt"
	"log"

	"github.com/c360studio/semstreams/config"
)

func main() {
	// Set environment variables (in real usage, these would be set externally)
	// export STREAMKIT_PLATFORM_ID="prod-cluster-01"
	// export STREAMKIT_NATS_URLS="nats://server1:4222,nats://server2:4222"

	loader := config.NewLoader()
	loader.AddLayer("testdata/base.json")

	cfg, err := loader.Load()
	if err != nil {
		log.Fatal(err)
	}

	// Platform ID and NATS URLs can be overridden via environment
	fmt.Printf("Platform: %s\n", cfg.Platform.ID)
	fmt.Printf("NATS URLs: %v\n", cfg.NATS.URLs)
}

func (*Loader) LoadFile

func (l *Loader) LoadFile(path string) (*Config, error)

LoadFile loads configuration from a single file

func (*Loader) LoadFromBytes

func (l *Loader) LoadFromBytes(data []byte) (*Config, error)

LoadFromBytes loads configuration from JSON bytes. This is useful when you need to pre-process the configuration (e.g., environment variable expansion) before loading.

The data is validated and merged with defaults, just like LoadFile.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager provides centralized configuration management with channel-based updates

Example

ExampleManager demonstrates the complete lifecycle of dynamic configuration management with NATS KV watching.

package main

import (
	"fmt"
)

func main() {
	// This example shows the complete pattern, but cannot run without NATS
	// In real usage:

	// 1. Load initial configuration
	// loader := config.NewLoader()
	// loader.AddLayer("config/base.json")
	// cfg, err := loader.Load()

	// 2. Create Manager with NATS client
	// cm, err := config.NewConfigManager(cfg, natsClient, logger)
	// if err != nil {
	//     log.Fatal(err)
	// }

	// 3. Start watching for changes
	// ctx := context.Background()
	// if err := cm.Start(ctx); err != nil {
	//     log.Fatal(err)
	// }
	// defer cm.Stop(5 * time.Second)

	// 4. Subscribe to configuration changes
	// updates := cm.OnChange("components.*")
	// go func() {
	//     for update := range updates {
	//         log.Printf("Component config changed: %s = %v",
	//             update.Key, update.Value)
	//     }
	// }()

	// 5. Push local changes to NATS KV
	// safeConfig := cm.GetConfig()
	// safeConfig.Update(func(cfg *config.Config) {
	//     cfg.Components["new-component"] = config.ComponentConfig{
	//         Type:    "processor/json_map",
	//         Enabled: true,
	//     }
	// })
	// cm.PushToKV(ctx)

	fmt.Println("Dynamic configuration management")
}
Output:

Dynamic configuration management

func NewConfigManager

func NewConfigManager(cfg *Config, natsClient *natsclient.Client, logger *slog.Logger) (*Manager, error)

NewConfigManager creates a new configuration manager

func (*Manager) DeleteComponentFromKV

func (cm *Manager) DeleteComponentFromKV(ctx context.Context, name string) error

DeleteComponentFromKV deletes a component's configuration from NATS KV. This should be called when a component is removed (e.g., during undeploy). PushToKV only puts keys that exist in memory - it doesn't delete removed keys.

func (*Manager) GetConfig

func (cm *Manager) GetConfig() *SafeConfig

GetConfig returns the current configuration

func (*Manager) OnChange

func (cm *Manager) OnChange(pattern string) <-chan Update

OnChange subscribes to configuration changes matching the pattern Returns a channel that receives updates when configuration changes Pattern examples:

  • "services.metrics" - exact match
  • "services.*" - all services
  • "components.*" - all components
  • "components.udp-*" - components starting with udp-
Example

ExampleManager_OnChange demonstrates subscribing to specific configuration change patterns.

package main

import (
	"fmt"
)

func main() {
	// Assume we have a running Manager
	// cm := getConfigManager()

	// Subscribe to all service configuration changes
	// serviceUpdates := cm.OnChange("services.*")

	// Subscribe to specific component changes
	// componentUpdates := cm.OnChange("components.my-component")

	// Subscribe to platform configuration
	// platformUpdates := cm.OnChange("platform")

	// Process updates
	// go func() {
	//     for update := range serviceUpdates {
	//         log.Printf("Service updated: %s", update.Key)
	//         // React to configuration change
	//         handleServiceUpdate(update)
	//     }
	// }()

	fmt.Println("Subscribed to configuration changes")
}
Output:

Subscribed to configuration changes

func (*Manager) PushToKV

func (cm *Manager) PushToKV(ctx context.Context) error

PushToKV pushes the current configuration to NATS KV This is useful for initial setup or config synchronization

Example

ExampleManager_PushToKV demonstrates pushing local configuration changes to NATS KV for distribution to other instances.

package main

import (
	"fmt"
)

func main() {
	// This demonstrates the pattern for pushing config updates

	// Get the safe config wrapper
	// safeConfig := cm.GetConfig()

	// Make local changes
	// safeConfig.Update(func(cfg *config.Config) {
	//     cfg.Platform.LogLevel = "debug"
	//     cfg.Components["processor-1"].Enabled = false
	// })

	// Push changes to NATS KV
	// ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	// defer cancel()
	//
	// if err := cm.PushToKV(ctx); err != nil {
	//     log.Printf("Failed to push config: %v", err)
	// }

	// Other instances watching the KV will receive the updates

	fmt.Println("Configuration pushed to NATS KV")
}
Output:

Configuration pushed to NATS KV

func (*Manager) PutComponentToKV

func (cm *Manager) PutComponentToKV(ctx context.Context, name string, compConfig types.ComponentConfig) error

PutComponentToKV writes a single component's configuration to NATS KV. This is more efficient than PushToKV when only one component has changed, and avoids race conditions with KV watchers when multiple operations are in flight.

func (*Manager) Start

func (cm *Manager) Start(ctx context.Context) error

Start begins watching for configuration changes

func (*Manager) Stop

func (cm *Manager) Stop(timeout time.Duration) error

Stop stops watching for configuration changes

Example

ExampleManager_Stop demonstrates graceful shutdown of Manager.

package main

import (
	"fmt"
)

func main() {
	// Assume we have a running Manager
	// cm := getConfigManager()

	// Graceful shutdown with timeout
	// timeout := 5 * time.Second
	// if err := cm.Stop(timeout); err != nil {
	//     log.Printf("Manager shutdown error: %v", err)
	// }

	// Stop is idempotent - safe to call multiple times
	// cm.Stop(timeout) // No error

	fmt.Println("Manager stopped gracefully")
}
Output:

Manager stopped gracefully

func (*Manager) ValidateAndPersistComponentConfig

func (cm *Manager) ValidateAndPersistComponentConfig(
	ctx context.Context,
	registry ComponentRegistry,
	componentName, componentType string,
	configJSON json.RawMessage,
) error

ValidateAndPersistComponentConfig validates and persists component configuration to KV This method combines validation with persistence in a single operation Returns validation errors if validation fails, or a persistence error wrapped appropriately

func (*Manager) ValidateComponentConfig

func (cm *Manager) ValidateComponentConfig(
	ctx context.Context,
	registry ComponentRegistry,
	componentType string,
	configJSON json.RawMessage,
) []component.ValidationError

ValidateComponentConfig validates a component configuration from KV format This is a convenience method that handles JSON unmarshaling

func (*Manager) ValidateWithSchema

func (cm *Manager) ValidateWithSchema(
	ctx context.Context,
	registry ComponentRegistry,
	componentType string,
	config map[string]any,
) []component.ValidationError

ValidateWithSchema validates component configuration against its schema Returns validation errors if the config doesn't meet schema requirements This function should be called before persisting configuration to KV

type MinimalConfig

type MinimalConfig struct {
	Platform PlatformConfig     `json:"platform"`
	NATS     NATSConfig         `json:"nats"`
	Services CoreServicesConfig `json:"services"`
}

MinimalConfig represents the core application configuration This is a simplified version focusing only on essential services

Example

ExampleMinimalConfig demonstrates using the simplified MinimalConfig for basic StreamKit applications.

package main

import (
	"fmt"
)

func main() {
	// MinimalConfig provides a simplified configuration structure
	// for applications that don't need the full Config complexity

	// Load minimal configuration
	// cfg, err := config.LoadMinimalConfig("config/minimal.json")
	// if err != nil {
	//     log.Fatal(err)
	// }

	// Access core settings
	// platformID := cfg.Platform.ID
	// natsURLs := cfg.NATS.URLs
	// messageLoggerEnabled := cfg.Services.MessageLogger

	// MinimalConfig includes:
	// - Platform configuration (ID, environment, logging)
	// - NATS connection settings
	// - Core service toggles (message logger, discovery)

	fmt.Println("Minimal configuration for simple applications")
}
Output:

Minimal configuration for simple applications

func LoadMinimalConfig

func LoadMinimalConfig(path string) (*MinimalConfig, error)

LoadMinimalConfig loads configuration from a file

func (*MinimalConfig) ToJSON

func (c *MinimalConfig) ToJSON() (string, error)

ToJSON converts config to JSON string for debugging

func (*MinimalConfig) Validate

func (c *MinimalConfig) Validate() error

Validate checks if the minimal config is valid

type NATSConfig

type NATSConfig struct {
	URLs          []string        `json:"urls,omitempty"`
	MaxReconnects int             `json:"max_reconnects,omitempty"`
	ReconnectWait time.Duration   `json:"reconnect_wait,omitempty"`
	Username      string          `json:"username,omitempty"`
	Password      string          `json:"password,omitempty"`
	Token         string          `json:"token,omitempty"`
	TLS           NATSTLSConfig   `json:"tls,omitempty"`
	JetStream     JetStreamConfig `json:"jetstream,omitempty"`
}

NATSConfig defines NATS connection settings

type NATSTLSConfig

type NATSTLSConfig struct {
	Enabled  bool   `json:"enabled"`
	CertFile string `json:"cert_file,omitempty"`
	KeyFile  string `json:"key_file,omitempty"`
	CAFile   string `json:"ca_file,omitempty"`
}

NATSTLSConfig for secure NATS connections

type PlatformConfig

type PlatformConfig struct {
	Org          string   `json:"org"`                    // Organization namespace (e.g., "c360", "noaa")
	ID           string   `json:"id"`                     // Platform identifier (e.g., "platform1")
	Type         string   `json:"type"`                   // vessel, shore, buoy, satellite
	Region       string   `json:"region,omitempty"`       // gulf_mexico, atlantic, pacific
	Capabilities []string `json:"capabilities,omitempty"` // radar, ctd, deployment, etc.

	// Federation support for multi-platform deployments
	InstanceID  string `json:"instance_id,omitempty"` // e.g., "west-1", "dev-local", "vessel-alpha"
	Environment string `json:"environment,omitempty"` // "prod", "dev", "test"
}

PlatformConfig defines platform identity and capabilities

type PortDefinition

type PortDefinition struct {
	Name    string `json:"name"`
	Subject string `json:"subject"`
	Type    string `json:"type"` // "nats", "jetstream", etc.
}

PortDefinition represents a single port definition.

type PortsConfig

type PortsConfig struct {
	Inputs  []PortDefinition `json:"inputs,omitempty"`
	Outputs []PortDefinition `json:"outputs,omitempty"`
}

PortsConfig represents the ports section of a component config.

type SafeConfig

type SafeConfig struct {
	// contains filtered or unexported fields
}

SafeConfig provides thread-safe access to configuration

func NewSafeConfig

func NewSafeConfig(cfg *Config) *SafeConfig

NewSafeConfig creates a new thread-safe config wrapper

func (*SafeConfig) Get

func (sc *SafeConfig) Get() *Config

Get returns a deep copy of the current configuration

Example

ExampleSafeConfig_Get demonstrates thread-safe configuration access. The Get method returns a deep copy, preventing accidental mutations.

package main

import (
	"fmt"
)

func main() {
	// Assume we have a Manager instance
	// safeConfig := configManager.GetConfig()

	// Get returns a deep copy - safe to use without locks
	// cfg := safeConfig.Get()

	// Read configuration values
	// platformID := cfg.Platform.ID
	// natsURLs := cfg.NATS.URLs

	// The returned config is a copy, so modifications don't affect
	// the shared state
	// cfg.Platform.ID = "modified" // Only affects this copy

	fmt.Println("Thread-safe configuration access")
}
Output:

Thread-safe configuration access

func (*SafeConfig) Update

func (sc *SafeConfig) Update(cfg *Config) error

Update atomically updates the configuration after validation

Example

ExampleSafeConfig_Update demonstrates atomic configuration updates.

package main

import (
	"fmt"
)

func main() {
	// Assume we have a Manager instance
	// safeConfig := configManager.GetConfig()

	// Update configuration atomically
	// safeConfig.Update(func(cfg *config.Config) {
	//     // Enable a component
	//     if comp, exists := cfg.Components["my-component"]; exists {
	//         comp.Enabled = true
	//         cfg.Components["my-component"] = comp
	//     }
	// })

	fmt.Println("Configuration updated atomically")
}
Output:

Configuration updated atomically

type StreamConfig

type StreamConfig struct {
	Subjects  []string `json:"subjects"`            // Subjects captured by this stream
	Storage   string   `json:"storage,omitempty"`   // "file" or "memory" (default: file)
	MaxAge    string   `json:"max_age,omitempty"`   // TTL for messages (e.g., "168h", "7d")
	MaxBytes  int64    `json:"max_bytes,omitempty"` // Max storage size in bytes (0 = unlimited)
	Retention string   `json:"retention,omitempty"` // "limits", "interest", "workqueue" (default: limits)
	Replicas  int      `json:"replicas,omitempty"`  // Replication factor (default: 1)
}

StreamConfig defines configuration for a JetStream stream.

type StreamConfigs

type StreamConfigs map[string]StreamConfig

StreamConfigs is a map of stream name to configuration.

type StreamsManager

type StreamsManager struct {
	// contains filtered or unexported fields
}

StreamsManager handles JetStream stream creation and management.

func NewStreamsManager

func NewStreamsManager(natsClient *natsclient.Client, logger *slog.Logger) *StreamsManager

NewStreamsManager creates a new StreamsManager.

func (*StreamsManager) EnsureStreams

func (sm *StreamsManager) EnsureStreams(ctx context.Context, cfg *Config) error

EnsureStreams creates all required JetStream streams based on: 1. System streams (LOGS for out-of-band logging) 2. Explicit streams defined in config.Streams (highest priority) 3. Streams derived from component JetStream output ports

type Update

type Update struct {
	Path   string      // Changed path (e.g., "services.metrics")
	Config *SafeConfig // Full latest configuration
}

Update represents a configuration change notification

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL