storage

package
v1.0.0-alpha.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2026 License: MIT Imports: 1 Imported by: 0

Documentation

Overview

Package storage provides pluggable backend interfaces for storage operations.

Overview

The storage package defines the core Store interface and related abstractions for persisting binary data with hierarchical key-value semantics. It provides a clean, implementation-agnostic API that supports multiple storage backends:

  • NATS JetStream ObjectStore (immutable, versioned)
  • AWS S3 or MinIO (mutable, object storage) - future
  • PostgreSQL (mutable, relational) - future

Core Concepts

Store Interface:

The Store interface uses a simple key-value pattern where:

  • Keys are strings (hierarchical paths supported via "/" separators)
  • Values are binary data ([]byte) - supports any format
  • Operations are context-aware for cancellation and timeouts

This simplicity enables a wide range of use cases: JSON messages, video files, images, or any binary data can be stored with the same interface.

Pluggable Abstractions:

The package defines two pluggable interfaces to enable framework composition:

  • KeyGenerator: Generates storage keys from messages
  • MetadataExtractor: Extracts metadata headers from messages

These abstractions allow semantic layers (like SemStreams) to provide custom key generation and metadata extraction strategies without modifying core storage.

Architecture Decisions

Simple Key-Value Model:

The Store interface intentionally uses a simple key-value model rather than richer abstractions like queries, indexes, or transactions. This decision:

  • Keeps implementations simple and focused
  • Allows diverse backends (object stores, databases, filesystems)
  • Pushes complex logic to higher layers where it belongs
  • Enables easy testing with mock implementations

Alternative considered: Query-based interface with filtering Rejected because: Too complex, limits backend options, semantic queries belong in semantic layers not generic storage.

Hierarchical Keys via "/" Convention:

Keys support hierarchical organization using "/" separators:

  • "video/sensor-123/2024-10-08.mp4"
  • "events/robotics/entity-456/2024-10-08T10:30:00Z"

This convention (not enforced by interface):

  • Works naturally with object stores (S3, NATS ObjectStore)
  • Enables prefix-based listing and filtering
  • Mirrors filesystem-like organization users expect

No Forced Immutability:

The Store interface allows implementations to choose mutability semantics:

  • Immutable stores (NATS ObjectStore): Put() may append version/timestamp
  • Mutable stores (S3, SQL): Put() overwrites existing values

This flexibility enables diverse backends while documenting behavior per implementation.

Context Everywhere:

All Store operations accept context.Context as the first parameter. This enables:

  • Cancellation of long-running operations
  • Timeout enforcement per operation
  • Request-scoped tracing and logging
  • Graceful shutdown of in-flight requests

Usage Examples

Basic Store Usage:

// Create NATS ObjectStore backend
store, err := objectstore.NewStoreWithConfig(ctx, natsClient, config)
if err != nil {
    log.Fatal(err)
}
defer store.Close()

// Store JSON message
msgJSON := []byte(`{"sensor": "temp-123", "value": 22.5}`)
key := "events/sensors/temp-123/2024-10-08T10:30:00Z"
err = store.Put(ctx, key, msgJSON)

// Retrieve message
data, err := store.Get(ctx, key)

// List all sensor events
keys, err := store.List(ctx, "events/sensors/")

// Delete old data
err = store.Delete(ctx, key)

Implementing Custom KeyGenerator:

type TimestampKeyGenerator struct {
    prefix string
}

func (g *TimestampKeyGenerator) GenerateKey(msg any) string {
    // Extract timestamp from message (handle type assertion)
    timestamp := time.Now().Format(time.RFC3339)
    return fmt.Sprintf("%s/%s", g.prefix, timestamp)
}

// Use with objectstore Component
component := objectstore.NewComponent(config, deps)
component.SetKeyGenerator(&TimestampKeyGenerator{prefix: "events"})

Implementing Custom MetadataExtractor:

type EntityMetadataExtractor struct{}

func (e *EntityMetadataExtractor) ExtractMetadata(msg any) map[string][]string {
    // Extract entity ID from message
    if entity, ok := msg.(Identifiable); ok {
        return map[string][]string{
            "entity-id":   {entity.GetID()},
            "entity-type": {entity.GetType()},
        }
    }
    return nil
}

Performance Characteristics

The performance of Store operations depends entirely on the backend implementation:

NATS ObjectStore (storage/objectstore):

  • Put: O(1) - direct write to JetStream
  • Get: O(1) - direct read with optional caching
  • List: O(n) - fetches all objects, filters client-side
  • Delete: O(1) - marks latest version as deleted

Future S3/MinIO Backend:

  • Put: O(1) - direct S3 upload
  • Get: O(1) - direct S3 download with caching
  • List: O(n) - S3 ListObjects with pagination
  • Delete: O(1) - S3 DeleteObject

Future PostgreSQL Backend:

  • Put: O(1) - INSERT or UPDATE
  • Get: O(log n) - indexed key lookup
  • List: O(n) - prefix query with index scan
  • Delete: O(log n) - indexed DELETE

Memory:

Store implementations should have bounded memory usage:

  • Get operations: O(message_size) for returned data
  • List operations: O(num_matching_keys) for key list
  • Put operations: O(message_size) during write

Caching (when enabled) adds:

  • O(cache_size * avg_message_size) persistent memory

Thread Safety

All Store implementations MUST be safe for concurrent use from multiple goroutines. This is a contract requirement of the Store interface.

Example implementations demonstrate thread safety:

  • objectstore.Store: NATS ObjectStore is thread-safe, cache is thread-safe

Integration with Components

The storage/objectstore package provides a Component wrapper that:

  • Exposes Store operations via NATS ports (Request/Response pattern)
  • Publishes storage events to NATS for observability
  • Integrates with component discovery and lifecycle
  • Supports configurable key generation and metadata extraction

This enables storage to participate in NATS-based flows without requiring direct Go API access.

Error Handling

Store implementations should return errors classified by the framework's error package:

  • errs.WrapInvalid: Invalid keys, malformed input
  • errs.WrapTransient: Network timeouts, temporary failures
  • errs.WrapFatal: Programming errors, nil pointers

Callers can distinguish error types for appropriate retry/recovery strategies.

Testing

The storage package emphasizes testing with real backends:

  • Use testcontainers for NATS JetStream
  • Avoid mocks - test actual storage behavior
  • Test with race detector enabled
  • Test context cancellation and timeout behavior

Example test pattern:

func TestStore_PutGet(t *testing.T) {
    natsClient := getSharedNATSClient(t) // Real NATS
    store, err := objectstore.NewStore(ctx, natsClient, "test-bucket")
    require.NoError(t, err)
    defer store.Close()

    // Test actual storage operations
    data := []byte("test data")
    err = store.Put(ctx, "test-key", data)
    require.NoError(t, err)

    retrieved, err := store.Get(ctx, "test-key")
    require.NoError(t, err)
    assert.Equal(t, data, retrieved)
}

See Also

  • storage/objectstore: NATS ObjectStore implementation
  • component: Component interface and lifecycle
  • message: Message types for semantic storage

Package storage provides pluggable backend interfaces for storage operations.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KeyGenerator

type KeyGenerator interface {
	// GenerateKey creates a storage key for the given message.
	// The msg parameter can be any type - the generator should handle
	// type assertions gracefully and provide sensible defaults.
	GenerateKey(msg any) string
}

KeyGenerator generates storage keys for messages. This interface allows for pluggable key generation strategies, enabling SemStreams and other layers to provide custom key formats while keeping the core storage implementation generic.

Example implementations:

  • TimeBasedKeyGenerator: Uses timestamp-based keys
  • EntityBasedKeyGenerator: Uses entity IDs from Identifiable payloads
  • CompositeKeyGenerator: Combines multiple strategies

type MetadataExtractor

type MetadataExtractor interface {
	// ExtractMetadata returns metadata headers for the given message.
	// Returns nil if the message has no extractable metadata.
	// The msg parameter can be any type - the extractor should handle
	// type assertions gracefully.
	ExtractMetadata(msg any) map[string][]string
}

MetadataExtractor extracts metadata from messages for storage. This interface enables pluggable metadata extraction strategies, allowing SemStreams to add semantic metadata while keeping the core storage implementation generic.

Metadata is stored as HTTP-style headers (map[string][]string) to support NATS JetStream ObjectStore and other header-based systems.

type Store

type Store interface {
	// Put stores binary data at the specified key.
	// If the key already exists, behavior is implementation-specific:
	//   - Immutable stores (NATS ObjectStore) may append a version/timestamp
	//   - Mutable stores (S3, SQL) will overwrite the existing value
	//
	// The data parameter accepts any binary format:
	//   - JSON-encoded messages
	//   - Video files (MP4, etc.)
	//   - Images (JPEG, PNG, etc.)
	//   - Any []byte data
	Put(ctx context.Context, key string, data []byte) error

	// Get retrieves binary data for the specified key.
	// Returns an error if the key does not exist.
	//
	// The returned []byte should be interpreted by the caller based on
	// their knowledge of what was stored (JSON, video, etc.).
	Get(ctx context.Context, key string) ([]byte, error)

	// List returns all keys matching the specified prefix.
	// The prefix parameter supports hierarchical key patterns:
	//   - "" (empty) lists all keys
	//   - "video/" lists all keys starting with "video/"
	//   - "video/sensor-123/" lists keys for a specific sensor
	//
	// Keys are returned in lexicographic order.
	// Returns an empty slice if no keys match the prefix.
	List(ctx context.Context, prefix string) ([]string, error)

	// Delete removes the data at the specified key.
	// Returns nil if the key doesn't exist (idempotent operation).
	//
	// For immutable stores that maintain versions, this may only mark
	// the latest version as deleted rather than removing historical versions.
	Delete(ctx context.Context, key string) error
}

Store is the pluggable backend interface for storage operations.

Each storage component instance creates its own Store implementation with its own configuration (bucket name, connection string, etc.). Multiple Store instances can run concurrently, each backing a different storage component.

The Store interface uses a simple key-value pattern where:

  • Keys are strings (hierarchical paths supported via "/" separators)
  • Values are binary data ([]byte) - supports JSON, videos, images, any binary format
  • Operations are context-aware for cancellation and timeouts

Example implementations:

  • objectstore.Store: NATS JetStream ObjectStore backend
  • s3store.Store: AWS S3 or MinIO backend (future)
  • sqlstore.Store: PostgreSQL with (key, data) table (future)

Thread Safety: All Store implementations must be safe for concurrent use from multiple goroutines.

Example Usage:

// Create a NATS ObjectStore backend
store, err := objectstore.NewStoreWithConfig(ctx, natsClient, config)

// Store video file
videoData := []byte{...}
err = store.Put(ctx, "video/sensor-123/2024-10-08.mp4", videoData)

// Retrieve video
data, err := store.Get(ctx, "video/sensor-123/2024-10-08.mp4")

// List all videos for a sensor
keys, err := store.List(ctx, "video/sensor-123/")

Directories

Path Synopsis
Package objectstore provides a NATS ObjectStore-based storage component for immutable message storage with time-bucketed keys and caching.
Package objectstore provides a NATS ObjectStore-based storage component for immutable message storage with time-bucketed keys and caching.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL