Documentation
¶
Overview ¶
Package storage provides pluggable backend interfaces for storage operations.
Overview ¶
The storage package defines the core Store interface and related abstractions for persisting binary data with hierarchical key-value semantics. It provides a clean, implementation-agnostic API that supports multiple storage backends:
- NATS JetStream ObjectStore (immutable, versioned)
- AWS S3 or MinIO (mutable, object storage) - future
- PostgreSQL (mutable, relational) - future
Core Concepts ¶
Store Interface:
The Store interface uses a simple key-value pattern where:
- Keys are strings (hierarchical paths supported via "/" separators)
- Values are binary data ([]byte) - supports any format
- Operations are context-aware for cancellation and timeouts
This simplicity enables a wide range of use cases: JSON messages, video files, images, or any binary data can be stored with the same interface.
Pluggable Abstractions:
The package defines two pluggable interfaces to enable framework composition:
- KeyGenerator: Generates storage keys from messages
- MetadataExtractor: Extracts metadata headers from messages
These abstractions allow semantic layers (like SemStreams) to provide custom key generation and metadata extraction strategies without modifying core storage.
Architecture Decisions ¶
Simple Key-Value Model:
The Store interface intentionally uses a simple key-value model rather than richer abstractions like queries, indexes, or transactions. This decision:
- Keeps implementations simple and focused
- Allows diverse backends (object stores, databases, filesystems)
- Pushes complex logic to higher layers where it belongs
- Enables easy testing with mock implementations
Alternative considered: Query-based interface with filtering Rejected because: Too complex, limits backend options, semantic queries belong in semantic layers not generic storage.
Hierarchical Keys via "/" Convention:
Keys support hierarchical organization using "/" separators:
- "video/sensor-123/2024-10-08.mp4"
- "events/robotics/entity-456/2024-10-08T10:30:00Z"
This convention (not enforced by interface):
- Works naturally with object stores (S3, NATS ObjectStore)
- Enables prefix-based listing and filtering
- Mirrors filesystem-like organization users expect
No Forced Immutability:
The Store interface allows implementations to choose mutability semantics:
- Immutable stores (NATS ObjectStore): Put() may append version/timestamp
- Mutable stores (S3, SQL): Put() overwrites existing values
This flexibility enables diverse backends while documenting behavior per implementation.
Context Everywhere:
All Store operations accept context.Context as the first parameter. This enables:
- Cancellation of long-running operations
- Timeout enforcement per operation
- Request-scoped tracing and logging
- Graceful shutdown of in-flight requests
Usage Examples ¶
Basic Store Usage:
// Create NATS ObjectStore backend
store, err := objectstore.NewStoreWithConfig(ctx, natsClient, config)
if err != nil {
log.Fatal(err)
}
defer store.Close()
// Store JSON message
msgJSON := []byte(`{"sensor": "temp-123", "value": 22.5}`)
key := "events/sensors/temp-123/2024-10-08T10:30:00Z"
err = store.Put(ctx, key, msgJSON)
// Retrieve message
data, err := store.Get(ctx, key)
// List all sensor events
keys, err := store.List(ctx, "events/sensors/")
// Delete old data
err = store.Delete(ctx, key)
Implementing Custom KeyGenerator:
type TimestampKeyGenerator struct {
prefix string
}
func (g *TimestampKeyGenerator) GenerateKey(msg any) string {
// Extract timestamp from message (handle type assertion)
timestamp := time.Now().Format(time.RFC3339)
return fmt.Sprintf("%s/%s", g.prefix, timestamp)
}
// Use with objectstore Component
component := objectstore.NewComponent(config, deps)
component.SetKeyGenerator(&TimestampKeyGenerator{prefix: "events"})
Implementing Custom MetadataExtractor:
type EntityMetadataExtractor struct{}
func (e *EntityMetadataExtractor) ExtractMetadata(msg any) map[string][]string {
// Extract entity ID from message
if entity, ok := msg.(Identifiable); ok {
return map[string][]string{
"entity-id": {entity.GetID()},
"entity-type": {entity.GetType()},
}
}
return nil
}
Performance Characteristics ¶
The performance of Store operations depends entirely on the backend implementation:
NATS ObjectStore (storage/objectstore):
- Put: O(1) - direct write to JetStream
- Get: O(1) - direct read with optional caching
- List: O(n) - fetches all objects, filters client-side
- Delete: O(1) - marks latest version as deleted
Future S3/MinIO Backend:
- Put: O(1) - direct S3 upload
- Get: O(1) - direct S3 download with caching
- List: O(n) - S3 ListObjects with pagination
- Delete: O(1) - S3 DeleteObject
Future PostgreSQL Backend:
- Put: O(1) - INSERT or UPDATE
- Get: O(log n) - indexed key lookup
- List: O(n) - prefix query with index scan
- Delete: O(log n) - indexed DELETE
Memory:
Store implementations should have bounded memory usage:
- Get operations: O(message_size) for returned data
- List operations: O(num_matching_keys) for key list
- Put operations: O(message_size) during write
Caching (when enabled) adds:
- O(cache_size * avg_message_size) persistent memory
Thread Safety ¶
All Store implementations MUST be safe for concurrent use from multiple goroutines. This is a contract requirement of the Store interface.
Example implementations demonstrate thread safety:
- objectstore.Store: NATS ObjectStore is thread-safe, cache is thread-safe
Integration with Components ¶
The storage/objectstore package provides a Component wrapper that:
- Exposes Store operations via NATS ports (Request/Response pattern)
- Publishes storage events to NATS for observability
- Integrates with component discovery and lifecycle
- Supports configurable key generation and metadata extraction
This enables storage to participate in NATS-based flows without requiring direct Go API access.
Error Handling ¶
Store implementations should return errors classified by the framework's error package:
- errs.WrapInvalid: Invalid keys, malformed input
- errs.WrapTransient: Network timeouts, temporary failures
- errs.WrapFatal: Programming errors, nil pointers
Callers can distinguish error types for appropriate retry/recovery strategies.
Testing ¶
The storage package emphasizes testing with real backends:
- Use testcontainers for NATS JetStream
- Avoid mocks - test actual storage behavior
- Test with race detector enabled
- Test context cancellation and timeout behavior
Example test pattern:
func TestStore_PutGet(t *testing.T) {
natsClient := getSharedNATSClient(t) // Real NATS
store, err := objectstore.NewStore(ctx, natsClient, "test-bucket")
require.NoError(t, err)
defer store.Close()
// Test actual storage operations
data := []byte("test data")
err = store.Put(ctx, "test-key", data)
require.NoError(t, err)
retrieved, err := store.Get(ctx, "test-key")
require.NoError(t, err)
assert.Equal(t, data, retrieved)
}
See Also ¶
- storage/objectstore: NATS ObjectStore implementation
- component: Component interface and lifecycle
- message: Message types for semantic storage
Package storage provides pluggable backend interfaces for storage operations.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KeyGenerator ¶
type KeyGenerator interface {
// GenerateKey creates a storage key for the given message.
// The msg parameter can be any type - the generator should handle
// type assertions gracefully and provide sensible defaults.
GenerateKey(msg any) string
}
KeyGenerator generates storage keys for messages. This interface allows for pluggable key generation strategies, enabling SemStreams and other layers to provide custom key formats while keeping the core storage implementation generic.
Example implementations:
- TimeBasedKeyGenerator: Uses timestamp-based keys
- EntityBasedKeyGenerator: Uses entity IDs from Identifiable payloads
- CompositeKeyGenerator: Combines multiple strategies
type MetadataExtractor ¶
type MetadataExtractor interface {
// ExtractMetadata returns metadata headers for the given message.
// Returns nil if the message has no extractable metadata.
// The msg parameter can be any type - the extractor should handle
// type assertions gracefully.
ExtractMetadata(msg any) map[string][]string
}
MetadataExtractor extracts metadata from messages for storage. This interface enables pluggable metadata extraction strategies, allowing SemStreams to add semantic metadata while keeping the core storage implementation generic.
Metadata is stored as HTTP-style headers (map[string][]string) to support NATS JetStream ObjectStore and other header-based systems.
type Store ¶
type Store interface {
// Put stores binary data at the specified key.
// If the key already exists, behavior is implementation-specific:
// - Immutable stores (NATS ObjectStore) may append a version/timestamp
// - Mutable stores (S3, SQL) will overwrite the existing value
//
// The data parameter accepts any binary format:
// - JSON-encoded messages
// - Video files (MP4, etc.)
// - Images (JPEG, PNG, etc.)
// - Any []byte data
Put(ctx context.Context, key string, data []byte) error
// Get retrieves binary data for the specified key.
// Returns an error if the key does not exist.
//
// The returned []byte should be interpreted by the caller based on
// their knowledge of what was stored (JSON, video, etc.).
Get(ctx context.Context, key string) ([]byte, error)
// List returns all keys matching the specified prefix.
// The prefix parameter supports hierarchical key patterns:
// - "" (empty) lists all keys
// - "video/" lists all keys starting with "video/"
// - "video/sensor-123/" lists keys for a specific sensor
//
// Keys are returned in lexicographic order.
// Returns an empty slice if no keys match the prefix.
List(ctx context.Context, prefix string) ([]string, error)
// Delete removes the data at the specified key.
// Returns nil if the key doesn't exist (idempotent operation).
//
// For immutable stores that maintain versions, this may only mark
// the latest version as deleted rather than removing historical versions.
Delete(ctx context.Context, key string) error
}
Store is the pluggable backend interface for storage operations.
Each storage component instance creates its own Store implementation with its own configuration (bucket name, connection string, etc.). Multiple Store instances can run concurrently, each backing a different storage component.
The Store interface uses a simple key-value pattern where:
- Keys are strings (hierarchical paths supported via "/" separators)
- Values are binary data ([]byte) - supports JSON, videos, images, any binary format
- Operations are context-aware for cancellation and timeouts
Example implementations:
- objectstore.Store: NATS JetStream ObjectStore backend
- s3store.Store: AWS S3 or MinIO backend (future)
- sqlstore.Store: PostgreSQL with (key, data) table (future)
Thread Safety: All Store implementations must be safe for concurrent use from multiple goroutines.
Example Usage:
// Create a NATS ObjectStore backend
store, err := objectstore.NewStoreWithConfig(ctx, natsClient, config)
// Store video file
videoData := []byte{...}
err = store.Put(ctx, "video/sensor-123/2024-10-08.mp4", videoData)
// Retrieve video
data, err := store.Get(ctx, "video/sensor-123/2024-10-08.mp4")
// List all videos for a sensor
keys, err := store.List(ctx, "video/sensor-123/")
Directories
¶
| Path | Synopsis |
|---|---|
|
Package objectstore provides a NATS ObjectStore-based storage component for immutable message storage with time-bucketed keys and caching.
|
Package objectstore provides a NATS ObjectStore-based storage component for immutable message storage with time-bucketed keys and caching. |