objectstore

package
v1.0.0-alpha.39
Warning

This package is not in the latest version of its module.

Published: Mar 13, 2026 License: MIT Imports: 21 Imported by: 0

README

ObjectStore

NATS JetStream ObjectStore-based storage component for immutable message storage with time-bucketed keys and caching.

Purpose

ObjectStore provides persistent, versioned storage for messages and binary data using NATS JetStream's ObjectStore feature. It implements the storage.Store interface with optional LRU/TTL/Hybrid caching for high-performance reads, time-bucketed key organization for efficient temporal queries, and component integration via NATS ports for request/response API access and event streaming.

Configuration

components:
  - type: objectstore
    config:
      ports:
        inputs:
          - name: write
            type: nats
            subject: storage.objectstore.write
            description: Fire-and-forget async writes
          - name: api
            type: nats-request
            subject: storage.objectstore.api
            description: Request/Response API (get, store, list)
        outputs:
          - name: events
            type: nats
            subject: storage.objectstore.events
            description: Storage events (stored, retrieved)
          - name: stored
            type: jetstream
            subject: storage.objectstore.stored
            interface: storage.stored.v1
            description: StoredMessage with StorageRef for downstream processing
      bucket_name: MESSAGES
      data_cache:
        enabled: true
        max_size: 1000
        ttl: 300  # 5 minutes in seconds
Configuration Options
| Field | Type | Default | Description |
|---|---|---|---|
| bucket_name | string | MESSAGES | NATS ObjectStore bucket name |
| data_cache.enabled | bool | true | Enable in-memory caching |
| data_cache.max_size | int | 1000 | Maximum cache entries (LRU/Hybrid) |
| data_cache.ttl | int | 300 | Cache TTL in seconds (TTL/Hybrid) |
Caching Strategies

Choose a caching strategy based on workload characteristics:

LRU (Least Recently Used)

  • Best for immutable data with fixed working set
  • Example: Video archive, historical events
  • Memory: max_size * avg_message_size

TTL (Time-To-Live)

  • Best for fresh data requirements
  • Example: Sensor readings, real-time events
  • Memory: write_rate * ttl * avg_message_size

Hybrid (LRU + TTL)

  • Best for read-heavy workloads with freshness requirements
  • Example: Entity states, recent events
  • Memory: min(lru_memory, ttl_memory)

No Cache

  • Best for write-heavy, large messages, or unique reads
  • Example: Video ingestion, one-time analytics
  • Set data_cache.enabled: false
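The memory rules of thumb above can be turned into a quick sizing helper. This is only a sketch: the function name `estimateCacheBytes`, the strategy strings, and the example workload numbers are illustrative assumptions, and the formulas ignore per-entry cache overhead.

```go
package main

import (
	"fmt"
	"math"
)

// estimateCacheBytes applies the rough memory formulas from the strategy list.
// writeRate is messages per second; ttlSeconds corresponds to data_cache.ttl.
// Any unrecognised strategy is treated as "no cache".
func estimateCacheBytes(strategy string, maxSize int, ttlSeconds, writeRate, avgMsgBytes float64) float64 {
	lruBytes := float64(maxSize) * avgMsgBytes    // max_size * avg_message_size
	ttlBytes := writeRate * ttlSeconds * avgMsgBytes // write_rate * ttl * avg_message_size
	switch strategy {
	case "lru":
		return lruBytes
	case "ttl":
		return ttlBytes
	case "hybrid":
		return math.Min(lruBytes, ttlBytes)
	default:
		return 0
	}
}

func main() {
	// Example workload: 1000 entries, 300 s TTL, 50 writes/s, 4 KiB messages.
	for _, s := range []string{"lru", "ttl", "hybrid", "none"} {
		fmt.Printf("%-6s %.0f bytes\n", s, estimateCacheBytes(s, 1000, 300, 50, 4096))
	}
}
```

With these numbers the TTL estimate (about 61 MB) is far above the LRU cap (about 4 MB), which is why Hybrid takes the minimum of the two.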

Input/Output Ports

Input Ports

write (NATS/JetStream)

  • Pattern: Fire-and-forget async writes
  • Subject: storage.objectstore.write
  • Payload: Raw message data (any format)
  • Behavior: Stores message and publishes storage event
  • Auto-detects ContentStorable payloads for enhanced storage

api (NATS Request/Response)

  • Pattern: Synchronous request/response
  • Subject: storage.objectstore.api
  • Payload: JSON Request with action (get, store, list)
  • Response: JSON Response with data or keys
  • Timeout: 2 seconds
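A client of the api port only needs to build a JSON Request and decode the JSON Response. The sketch below mirrors the Request and Response field tags documented under Types. The helper names `encodeGet` and `decodeResponse` are hypothetical. The actual send (for example nats.go's `nc.Request(subject, payload, 2*time.Second)`) is left out so the example runs without a NATS server.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Request mirrors the documented API request type.
type Request struct {
	Action string          `json:"action"` // "get", "store", "list"
	Key    string          `json:"key,omitempty"`
	Data   json.RawMessage `json:"data,omitempty"`
	Prefix string          `json:"prefix,omitempty"` // for list
}

// Response mirrors the documented API response type.
type Response struct {
	Success bool            `json:"success"`
	Key     string          `json:"key,omitempty"`
	Data    json.RawMessage `json:"data,omitempty"`
	Keys    []string        `json:"keys,omitempty"` // for list
	Error   string          `json:"error,omitempty"`
}

// encodeGet builds the payload for a "get" request.
func encodeGet(key string) ([]byte, error) {
	return json.Marshal(Request{Action: "get", Key: key})
}

// decodeResponse parses a reply and turns success=false into a Go error.
func decodeResponse(payload []byte) (Response, error) {
	var resp Response
	if err := json.Unmarshal(payload, &resp); err != nil {
		return Response{}, err
	}
	if !resp.Success {
		if resp.Error == "" {
			return resp, errors.New("objectstore api: request failed")
		}
		return resp, fmt.Errorf("objectstore api: %s", resp.Error)
	}
	return resp, nil
}

func main() {
	req, _ := encodeGet("events/2024/10/08/14/abc123-def456")
	fmt.Println(string(req))
	// prints: {"action":"get","key":"events/2024/10/08/14/abc123-def456"}

	resp, err := decodeResponse([]byte(`{"success":true,"key":"k","data":{"key":"value"}}`))
	fmt.Println(string(resp.Data), err)
}
```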
Output Ports

events (NATS)

  • Pattern: Publish
  • Subject: storage.objectstore.events
  • Payload: JSON Event (stored, retrieved)
  • Purpose: Monitoring and audit trail

stored (JetStream)

  • Pattern: Publish
  • Subject: storage.objectstore.stored
  • Interface: storage.stored.v1
  • Payload: StoredMessage with StorageReference
  • Purpose: Semantic processing of stored content

Storage Operations

Put Operation

Stores data as an immutable, versioned object:

{
  "action": "store",
  "data": {"key": "value"}
}

Response:

{
  "success": true,
  "key": "events/2024/10/08/14/abc123-def456"
}

Performance:

  • Latency: 10-50ms (NATS RTT dependent)
  • Memory: O(message_size)
  • Versioning: Automatic, preserves history

Get Operation

Retrieves latest version with optional caching:

{
  "action": "get",
  "key": "events/2024/10/08/14/abc123-def456"
}

Response:

{
  "success": true,
  "key": "events/2024/10/08/14/abc123-def456",
  "data": {"key": "value"}
}

Performance:

  • Cache hit: ~100μs (in-memory)
  • Cache miss: 10-50ms (NATS ObjectStore read)
  • Memory: O(message_size) + cache overhead

List Operation

Returns all keys that match a prefix:

{
  "action": "list",
  "prefix": "events/2024/10/08/"
}

Response:

{
  "success": true,
  "keys": [
    "events/2024/10/08/14/abc123-def456",
    "events/2024/10/08/14/def789-ghi012"
  ]
}

Performance:

  • Latency: O(n) where n = total objects in bucket
  • Network: Fetches metadata for all objects
  • Client-side filtering (NATS limitation)
  • Memory: O(num_objects)
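Client-side filtering amounts to a linear scan over every object name in the bucket. A minimal sketch, assuming the names have already been collected from the bucket's object listing; `filterByPrefix` is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// filterByPrefix keeps the names that start with prefix. Every name has to
// be examined, which is why List is O(n) in the number of objects.
// Sorting is added here only to make the output deterministic.
func filterByPrefix(names []string, prefix string) []string {
	out := make([]string, 0, len(names))
	for _, name := range names {
		if strings.HasPrefix(name, prefix) {
			out = append(out, name)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	all := []string{
		"events/2024/10/08/14/def789-ghi012",
		"events/2024/10/09/01/aaa111-bbb222",
		"events/2024/10/08/14/abc123-def456",
		"video/sensor-123/2024-10-08T14:30:00Z",
	}
	fmt.Println(filterByPrefix(all, "events/2024/10/08/"))
}
```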
Delete Operation

Marks latest version as deleted (preserves history):

err := store.Delete(ctx, "events/2024/10/08/14/abc123-def456")

Key Generation

Time-Bucketed Keys (Default)

Format: {prefix}/year/month/day/hour/{unique-id}

Example: events/2024/10/08/14/abc123-def456

Benefits:

  • Chronological organization for range queries
  • Distributes objects across hierarchy
  • Efficient cleanup by time bucket
  • Mirrors time-series access patterns
Custom Key Generation

Implement storage.KeyGenerator interface for entity-based keys:

type EntityKeyGenerator struct {
    prefix string
}

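func (g *EntityKeyGenerator) GenerateKey(msg any) string {
    // Use a comma-ok assertion so messages without an entity ID
    // fall back to a random ID instead of panicking.
    entity, ok := msg.(Identifiable)
    if !ok {
        return g.prefix + "/" + uuid.New().String()
    }
    timestamp := time.Now().Format(time.RFC3339)
    return fmt.Sprintf("%s/%s/%s", g.prefix, entity.GetID(), timestamp)
}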

Example Use Cases

Video Archive Storage

Store large binary files with LRU caching for frequently accessed content:

components:
  - type: objectstore
    config:
      bucket_name: video-archive
      data_cache:
        enabled: true
        max_size: 100
        ttl: 3600

Workflow:

  1. Receive video file on write port
  2. Store to ObjectStore with time-bucketed key
  3. Publish stored event to events port
  4. Cache popular videos for fast retrieval

Sensor Data Storage

Store real-time sensor readings with TTL caching for fresh data:

components:
  - type: objectstore
    config:
      bucket_name: sensor-data
      data_cache:
        enabled: true
        max_size: 5000
        ttl: 300

Workflow:

  1. Ingest sensor readings via write port
  2. Store with entity-based keys (sensor ID + timestamp)
  3. Query recent readings via api port with List operation
  4. Expire old cache entries after 5 minutes
Event Sourcing

Store immutable event history with hybrid caching:

components:
  - type: objectstore
    config:
      bucket_name: event-store
      data_cache:
        enabled: true
        max_size: 10000
        ttl: 600

Workflow:

  1. Append events via write port (fire-and-forget)
  2. Automatic versioning preserves complete history
  3. Replay events using List operation by time bucket
  4. Recent events cached for fast access

Content-Addressable Storage

Store ContentStorable payloads with automatic key generation and StorageRef emission:

components:
  - type: objectstore
    config:
      ports:
        outputs:
          - name: stored
            type: jetstream
            subject: storage.objectstore.stored
      bucket_name: content-store

Workflow:

  1. Send message implementing ContentStorable to write port
  2. ObjectStore detects the interface and calls StoreContent(ctx, ContentStorable)
  3. Generates key from EntityID() and ContentType()
  4. Wraps content in StoredContent envelope with metadata
  5. Emits StoredMessage with StorageReference to downstream processors

Architecture Patterns

Immutable Storage with Versioning

NATS ObjectStore creates new versions on each Put rather than overwriting:

Benefits:

  • Preserves complete history for audit/replay
  • Enables time-travel queries
  • Prevents accidental data loss
  • Supports append-only semantics

Trade-off: Cannot modify data in-place. Use Delete + Put to "update".

Client-Side List Filtering

The List operation fetches metadata for every object in the bucket from NATS, then filters by prefix client-side:

Limitations:

  • O(n) performance where n = total objects
  • May be slow with >1000 objects
  • Consider pagination or external indexing for large datasets

Rationale: NATS ObjectStore doesn't support server-side prefix filtering (current version).

Request/Response API Pattern

Component exposes storage operations via NATS Request/Response:

Benefits:

  • Remote access without direct Go API
  • Supports web clients and non-Go systems
  • Enables service-to-service storage queries

Trade-off: 2-second timeout per request.

Thread Safety

All operations are safe for concurrent use:

  • Store methods: Thread-safe via NATS ObjectStore and cache concurrency
  • Component handlers: Each NATS message processed in separate goroutine
  • Metrics: Atomic counters
  • Cache: Thread-safe by cache implementation contract

No explicit locks required in application code.

Performance Characteristics

| Operation | Latency | Throughput | Memory |
|---|---|---|---|
| Put | 10-50ms | NATS-limited | O(message_size) |
| Get (cache hit) | ~100μs | Very high | O(message_size) + cache |
| Get (cache miss) | 10-50ms | NATS-limited | O(message_size) + cache |
| List | O(n) objects | Network-limited | O(num_objects) |

Caching Impact:

  • Read latency: roughly 100x to 500x faster on a cache hit (10-50ms → ~100μs)
  • Memory overhead: O(cache_size * avg_message_size)
  • Write latency: Unaffected (write-through caching)

Known Limitations

  1. List performance: O(n) with all objects, may be slow with >1000 objects
  2. No batch operations: Must Put/Get one at a time
  3. No version retrieval: Can only access latest version
  4. No server-side filtering: List filters client-side
  5. Cache invalidation: No automatic invalidation on external updates

Error Handling

Store operations return errors for:

  • Network failures: NATS connection lost, timeouts
  • Not found: Key doesn't exist (Get, Delete)
  • Invalid input: Empty keys, nil data
  • Resource limits: Bucket quota exceeded

Component operations:

  • Invalid requests: Malformed JSON, unknown actions
  • Store errors: Wrapped with operation context
  • Timeout errors: Operation exceeded deadline

Testing

Integration tests use testcontainers for real NATS instances:

task test:integration  # Run integration tests with testcontainers
task test:race         # Run with race detector

Example test:

func TestStore_Integration(t *testing.T) {
    ctx := context.Background()
    natsClient := getSharedNATSClient(t) // shared testcontainers-backed client

    cfg := objectstore.DefaultConfig()
    cfg.BucketName = "test-bucket"
    store, err := objectstore.NewStoreWithConfig(ctx, natsClient, cfg)
    require.NoError(t, err)
    defer store.Close()

    data := []byte("test data")
    err = store.Put(ctx, "test-key", data)
    require.NoError(t, err)

    retrieved, err := store.Get(ctx, "test-key")
    require.NoError(t, err)
    assert.Equal(t, data, retrieved)
}

See Also

  • /storage - Core storage interfaces
  • /pkg/cache - Caching implementations (LRU, TTL, Hybrid)
  • /component - Component interface and lifecycle
  • /natsclient - NATS client wrapper

Documentation

Overview

Package objectstore provides a NATS ObjectStore-based storage component for immutable message storage with time-bucketed keys and caching.

The objectstore package implements the storage.Store interface using NATS JetStream's ObjectStore feature. It provides:

  • Immutable storage with automatic versioning
  • Time-bucketed key generation for efficient organization
  • Optional caching (LRU, TTL, or Hybrid)
  • Component wrapper for NATS-based integration
  • Configurable metadata extraction

Core Components

Store:

The Store type implements storage.Store using NATS ObjectStore:

  • Put: Stores data as immutable objects (versioned)
  • Get: Retrieves latest version with optional caching
  • List: Returns all keys matching prefix (client-side filter)
  • Delete: Marks latest version as deleted (preserves history)

Component:

The Component wrapper exposes Store via NATS ports:

  • "api" port: Request/Response for Get/Put/List operations
  • "write" port: Fire-and-forget async writes
  • "events" port: Publishes storage events (stored, retrieved, deleted)

Architecture Decisions

Immutable Storage with Versioning:

NATS ObjectStore is immutable - each Put creates a new version rather than overwriting. This design:

  • Preserves complete history for audit/replay
  • Enables time-travel queries
  • Prevents accidental data loss
  • Supports append-only semantics naturally

Trade-off: Cannot modify existing data in-place. Use Delete + Put to "update".

Time-Bucketed Keys:

The DefaultKeyGenerator creates hierarchical keys by timestamp:

  • Format: "{prefix}/year/month/day/hour/{unique-id}"
  • Example: "events/2024/10/08/14/abc123-def456"

This bucketing:

  • Organizes data chronologically for range queries
  • Distributes objects across hierarchy to avoid hot-spots
  • Enables efficient cleanup by time bucket
  • Mirrors common time-series access patterns

Alternative considered: flat keys with embedded timestamps. Rejected because they are harder to navigate, less efficient for time-range queries, and don't leverage hierarchical storage systems.

Optional Caching Layer:

The Store can optionally use cache.Cache for retrieved objects:

  • LRU: Fixed-size cache, evicts least recently used
  • TTL: Time-based expiration for fresh data
  • Hybrid: Combines LRU size limits with TTL expiration

Caching decisions:

  • Read-heavy workloads: Use Hybrid cache (size + TTL)
  • Write-heavy workloads: Disable cache or use small TTL
  • Immutable data: Use LRU (data never changes)

Client-Side List Filtering:

The List() operation fetches all objects from NATS ObjectStore, then filters client-side by prefix. This is required because NATS ObjectStore doesn't support server-side prefix filtering (as of current version).

Performance implications:

  • O(n) where n = total objects in bucket
  • May be slow with thousands of objects
  • Consider pagination or external indexing for large datasets

Alternative considered: maintaining a separate index in a KV store. Rejected because it adds complexity and consistency challenges, and NATS may add native filtering in the future.

Request/Response API Port:

The Component exposes a Request/Response API via NATS:

  • Client sends Request JSON to "api" subject
  • Component processes and replies with Response JSON
  • Enables remote storage access without direct Go API

This enables web clients, other services, or non-Go systems to use storage without linking against the Go library.

Usage Examples

Direct Store Usage:

// Create a store from the default configuration
config := objectstore.DefaultConfig()
config.BucketName = "video-storage"
// Cache settings (enabled, size, TTL) are carried in config.DataCache,
// a cache.Config; see the cache package for its fields.

store, err := objectstore.NewStoreWithConfig(ctx, natsClient, config)
if err != nil {
    log.Fatal(err)
}
defer store.Close()

// Store a video file under an explicit key
videoData, err := os.ReadFile("sensor-123.mp4")
if err != nil {
    log.Fatal(err)
}
key := "video/sensor-123/2024-10-08T14:30:00Z"
if err := store.Put(ctx, key, videoData); err != nil {
    log.Fatal(err)
}

// Retrieve with caching
data, err := store.Get(ctx, key) // First call: fetches from NATS
data, err = store.Get(ctx, key)  // Second call: served from cache (if enabled)

// List all videos for the sensor
keys, err := store.List(ctx, "video/sensor-123/")

Component Usage (NATS Integration):

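// Configure the component with JSON; keys follow Config's json tags
rawConfig := json.RawMessage(`{
    "bucket_name": "video-storage",
    "data_cache": {"enabled": true}
}`)

// Create component with dependencies
deps := component.Dependencies{
    NATSClient:      natsClient,
    MetricsRegistry: metricsRegistry,
}

discoverable, err := objectstore.NewComponent(rawConfig, deps)
if err != nil {
    log.Fatal(err)
}

// NewComponent returns a component.Discoverable; Start and Stop
// are methods on *objectstore.Component.
comp, ok := discoverable.(*objectstore.Component)
if !ok {
    log.Fatal("unexpected component type")
}

// Start component
if err := comp.Start(ctx); err != nil {
    log.Fatal(err)
}
defer comp.Stop(5 * time.Second)

// Clients can now use NATS Request/Response on the api port subject (apiSubject), e.g. with nats.go:
// nc.Request(apiSubject, []byte(`{"action":"get","key":"video/..."}`), 2*time.Second)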

Custom Key Generation:

// Implement storage.KeyGenerator
type EntityKeyGenerator struct {
    prefix string
}

func (g *EntityKeyGenerator) GenerateKey(msg any) string {
    entity, ok := msg.(Identifiable)
    if !ok {
        return g.prefix + "/" + uuid.New().String()
    }
    timestamp := time.Now().Format(time.RFC3339)
    return fmt.Sprintf("%s/%s/%s", g.prefix, entity.GetID(), timestamp)
}

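// Inject via Config (KeyGenerator is not serialized to JSON)
config := objectstore.DefaultConfig()
config.KeyGenerator = &EntityKeyGenerator{prefix: "entities"}
store, err := objectstore.NewStoreWithConfig(ctx, natsClient, config)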

Performance Characteristics

Put Operation:

  • Latency: ~10-50ms (depends on NATS RTT)
  • Throughput: Limited by NATS ObjectStore write rate
  • Memory: O(message_size) during write

Get Operation:

  • Cache hit: ~100μs (in-memory cache lookup)
  • Cache miss: ~10-50ms (NATS ObjectStore read)
  • Memory: O(message_size) + cache overhead

List Operation:

  • Latency: O(n) where n = total objects in bucket
  • Network: Fetches metadata for all objects
  • Memory: O(num_objects) for key list
  • Performance degrades with >1000 objects - consider pagination

Caching Impact:

  • Read latency: roughly 100x to 500x faster on a cache hit (10-50ms → ~100μs)
  • Memory: O(cache_size * avg_message_size)
  • Write latency: Unaffected (write-through caching)

Caching Strategy Guidelines

Choose cache strategy based on workload:

LRU (Least Recently Used):

  • Best for: Immutable data, fixed working set
  • Example: Video archive, historical events
  • Config: CacheStrategy="lru", MaxCacheSize=1000

TTL (Time-To-Live):

  • Best for: Fresh data requirements, cache invalidation
  • Example: Sensor readings, real-time events
  • Config: CacheStrategy="ttl", CacheTTL=5*time.Minute

Hybrid (LRU + TTL):

  • Best for: Read-heavy with freshness requirements
  • Example: Entity states, recent events
  • Config: CacheStrategy="hybrid", MaxCacheSize=1000, CacheTTL=5*time.Minute

No Cache:

  • Best for: Write-heavy, large messages, unique reads
  • Example: Video ingestion, one-time analytics
  • Config: EnableCache=false

Memory usage formula:

  • LRU: memory = MaxCacheSize * avg_message_size
  • TTL: memory = write_rate * CacheTTL * avg_message_size
  • Hybrid: memory = min(LRU_calc, TTL_calc)

NATS ObjectStore Semantics

The NATS JetStream ObjectStore provides:

Immutability:

  • Each Put creates a new version
  • Previous versions preserved (configurable retention)
  • Get retrieves latest version by default

Versioning:

  • Automatic version tracking
  • Can retrieve specific versions (not currently exposed)
  • Supports rollback and audit trails

Metadata:

  • HTTP-style headers (map[string][]string)
  • Automatically tracked: size, chunks, digest
  • Custom metadata via MetadataExtractor

Storage Model:

  • Large objects chunked automatically
  • Efficient for video, images, binary data
  • Not optimized for tiny messages (<1KB)

Component Port Configuration

The Component exposes three NATS ports:

API Port (Request/Response):

  • Subject: "{namespace}.{instance}.api" (default: "storage.{instance}.api")
  • Pattern: Request/Response
  • Payload: JSON Request → JSON Response
  • Operations: Get, Put, List
  • Timeout: 2 seconds

Write Port (Fire-and-Forget):

  • Subject: "{namespace}.{instance}.write" (default: "storage.{instance}.write")
  • Pattern: Fire-and-forget
  • Payload: Raw binary data
  • Key: Generated via KeyGenerator
  • No response (async)

Events Port (Publish-Only):

  • Subject: "{namespace}.{instance}.events" (default: "storage.{instance}.events")
  • Pattern: Publish
  • Payload: JSON Event (action, key, success, error)
  • Published after: Put, Get, Delete operations
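The three subjects follow one naming pattern. The sketch below expands it. `portSubjects` is a hypothetical helper, and treating an empty namespace as "storage" reflects the defaults listed above. In a deployed configuration the subjects are set explicitly under `ports`, so this only illustrates the convention.

```go
package main

import "fmt"

// portSubjects expands "{namespace}.{instance}.{port}" for the api, write,
// and events ports. An empty namespace falls back to "storage".
func portSubjects(namespace, instance string) map[string]string {
	if namespace == "" {
		namespace = "storage"
	}
	subjects := make(map[string]string, 3)
	for _, port := range []string{"api", "write", "events"} {
		subjects[port] = fmt.Sprintf("%s.%s.%s", namespace, instance, port)
	}
	return subjects
}

func main() {
	s := portSubjects("", "objectstore")
	fmt.Println(s["api"], s["write"], s["events"])
}
```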

Configuration Options

See the Config type for all options:

type Config struct {
    Ports             *component.PortConfig     // Input/output port configuration
    BucketName        string                    // NATS ObjectStore bucket (default: MESSAGES)
    DataCache         cache.Config              // In-memory cache settings
    KeyGenerator      storage.KeyGenerator      // Optional; nil = time-based keys
    MetadataExtractor storage.MetadataExtractor // Optional; nil = no metadata
}

Thread Safety

All operations are safe for concurrent use:

  • Store methods: Thread-safe via NATS ObjectStore and cache concurrency
  • Component handlers: Each NATS message processed in separate goroutine
  • Metrics: Atomic counters (atomic.AddUint64)
  • Cache: Thread-safe by cache implementation contract

No explicit locks required in application code.
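The metrics bullet refers to lock-free counters. Below is a minimal sketch of that pattern. The `handlerMetrics` type is hypothetical and stands in for the component's unexported metrics. Many goroutines, one per simulated message, increment a counter with `atomic.AddUint64`, and no mutex is involved.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// handlerMetrics is a hypothetical counter set shared by handler goroutines.
type handlerMetrics struct {
	stored    uint64 // messages stored
	retrieved uint64 // messages retrieved
}

// simulateHandlers runs n goroutines, each recording one stored message.
func simulateHandlers(n int) uint64 {
	var m handlerMetrics
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddUint64(&m.stored, 1) // safe without a lock
		}()
	}
	wg.Wait()
	return atomic.LoadUint64(&m.stored)
}

func main() {
	fmt.Println(simulateHandlers(100))
}
```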

Error Handling

Store operations return errors for:

  • Network failures: NATS connection lost, timeouts
  • Not found: Key doesn't exist (Get, Delete)
  • Invalid input: Empty keys, nil data
  • Resource limits: Bucket quota exceeded

Component operations:

  • Invalid requests: Malformed JSON, unknown actions
  • Store errors: Wrapped with operation context
  • Timeout errors: Operation exceeded deadline

Testing

Test with real NATS using testcontainers:

func TestStore_Integration(t *testing.T) {
    ctx := context.Background()
    natsClient := getSharedNATSClient(t)

    cfg := objectstore.DefaultConfig()
    cfg.BucketName = "test-bucket"
    store, err := objectstore.NewStoreWithConfig(ctx, natsClient, cfg)
    require.NoError(t, err)
    defer store.Close()

    // Test with real NATS ObjectStore
    data := []byte("test data")
    err = store.Put(ctx, "test-key", data)
    require.NoError(t, err)

    retrieved, err := store.Get(ctx, "test-key")
    require.NoError(t, err)
    assert.Equal(t, data, retrieved)
}

Run with race detector:

go test -race ./storage/objectstore

Known Limitations

  1. List() performance: O(n) with all objects - may be slow with >1000 objects
  2. No batch operations: Must Put/Get one at a time
  3. No version retrieval: Can only access latest version
  4. No server-side filtering: List() filters client-side
  5. Cache invalidation: No automatic invalidation on external updates

See Also

  • storage: Core storage interfaces
  • cache: Caching implementations
  • component: Component interface and lifecycle
  • natsclient: NATS client wrapper

Package objectstore provides a simple ObjectStore wrapper for immutable message storage. It uses NATS ObjectStore (NOT KeyValue) to store parsed messages as JSON blobs with time-bucketed, slash-based keys for efficient organization and retrieval.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new ObjectStore component factory

func Register

func Register(registry *component.Registry) error

Register registers the ObjectStore storage component with the given registry.

The ObjectStore component provides:

  • NATS JetStream ObjectStore-based immutable storage
  • Time-bucketed key organization
  • LRU/TTL/Hybrid caching support
  • Pluggable key generation (composition-friendly)
  • Pluggable metadata extraction (composition-friendly)
  • Request/Response API for synchronous operations
  • Fire-and-forget write operations
  • Storage event publishing

Composition-Friendly Design: The ObjectStore is designed to be easily extended by SemStreams or other semantic layers. The key generation and metadata extraction are pluggable, allowing semantic behavior to be layered on top of the core infrastructure.

Types

type BinaryRef

type BinaryRef struct {
	// ContentType is the MIME type of the binary content.
	// Examples: "image/jpeg", "video/mp4", "application/pdf"
	ContentType string `json:"content_type"`

	// Size is the size of the binary content in bytes.
	Size int64 `json:"size"`

	// Key is the ObjectStore key where the binary data is stored.
	Key string `json:"key"`
}

BinaryRef is a reference to binary content stored separately in ObjectStore. Binary data is stored as raw bytes (no base64) under its own key, and the JSON metadata envelope references it via this struct.

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component wraps ObjectStore as a component with NATS ports

Composition-Friendly Design:

  • Generic NATS port handling (no semantic requirements)
  • Publishes simple storage events (not semantic messages)
  • Allows SemStreams to wrap/extend for semantic behavior

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema for this component. It references the package-level objectstoreSchema variable for efficient retrieval.

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns current health status

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize sets up the component (no I/O operations)

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns the input ports for this component

func (*Component) IsStarted

func (c *Component) IsStarted() bool

IsStarted returns whether the component is running

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns the output ports for this component

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start initializes the ObjectStore and sets up NATS handlers

func (*Component) Stop

func (c *Component) Stop(_ time.Duration) error

Stop cleanly shuts down the component

type Config

type Config struct {
	// Ports defines input/output port configuration
	Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration for inputs and outputs,category:basic"`

	// BucketName is the NATS JetStream ObjectStore bucket name
	BucketName string `json:"bucket_name" schema:"type:string,description:NATS ObjectStore bucket name,default:MESSAGES,category:basic"`

	// DataCache configures the in-memory cache for retrieved objects
	DataCache cache.Config `json:"data_cache" schema:"type:object,description:Cache configuration for stored objects,category:performance"`

	// KeyGenerator optionally provides custom key generation strategy.
	// If nil, the default time-based key generator is used.
	// This allows SemStreams to provide entity-based keys while keeping
	// StreamKit generic.
	KeyGenerator storage.KeyGenerator `json:"-" schema:"-"`

	// MetadataExtractor optionally provides custom metadata extraction.
	// If nil, no metadata is stored with objects.
	// This allows SemStreams to add semantic metadata (entity IDs, triples)
	// while keeping StreamKit generic.
	MetadataExtractor storage.MetadataExtractor `json:"-" schema:"-"`
}

Config holds configuration for ObjectStore storage component.

Design Philosophy: Composition-Friendly

  • No hardcoded interface requirements (SemStreams can layer semantic interfaces)
  • Pluggable key generation (via storage.KeyGenerator interface)
  • Pluggable metadata extraction (via storage.MetadataExtractor interface)
  • Flexible port configuration

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration for ObjectStore. It creates a simple keyed object store with:

  • Generic input/output ports (no interface requirements)
  • Time-based key generation
  • No metadata extraction
  • Default caching settings

func (Config) Validate

func (c Config) Validate() error

Validate checks if the configuration is valid.

type DefaultKeyGenerator

type DefaultKeyGenerator struct{}

DefaultKeyGenerator provides time-based key generation. Keys are formatted as: type/year/month/day/hour/identifier_timestamp

The generator uses behavioral interfaces for optional enhancement:

  • message.Typeable: Provides type information for key prefix
  • message.Identifiable: Provides identifier for key uniqueness

If interfaces aren't implemented, sensible defaults are used.

func (*DefaultKeyGenerator) GenerateKey

func (g *DefaultKeyGenerator) GenerateKey(msg any) string

GenerateKey creates a time-bucketed, slash-based key.

Format: type/year/month/day/hour/identifier_timestamp

Example: sensor.temperature.v1/2024/01/19/14/device-123_1705677443

type Event

type Event struct {
	Type      string         `json:"type"` // "stored", "retrieved"
	Key       string         `json:"key"`
	Timestamp time.Time      `json:"timestamp"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

Event represents a simple storage event published by ObjectStore. Core design: it only indicates what happened and carries no semantic payload.

type Request

type Request struct {
	Action string          `json:"action"` // "get", "store", "list"
	Key    string          `json:"key,omitempty"`
	Data   json.RawMessage `json:"data,omitempty"`
	Prefix string          `json:"prefix,omitempty"` // For list operation
}

Request represents a request to the ObjectStore API

type Response

type Response struct {
	Success bool            `json:"success"`
	Key     string          `json:"key,omitempty"`
	Data    json.RawMessage `json:"data,omitempty"`
	Keys    []string        `json:"keys,omitempty"` // For list operation
	Error   string          `json:"error,omitempty"`
}

Response represents a response from the ObjectStore API

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store provides a simple ObjectStore wrapper for message storage. Messages are stored as immutable JSON blobs with time-bucketed, slash-based keys.

Composition-Friendly Design:

  • Pluggable KeyGenerator (inject custom key generation)
  • Pluggable MetadataExtractor (inject custom metadata extraction)
  • No hardcoded semantic requirements

func NewStoreWithConfig

func NewStoreWithConfig(ctx context.Context, client *natsclient.Client, cfg Config) (*Store, error)

NewStoreWithConfig creates a new ObjectStore with cache configuration. Uses NATS ObjectStore (NOT KeyValue) for immutable message storage.

func NewStoreWithConfigAndMetrics

func NewStoreWithConfigAndMetrics(
	ctx context.Context,
	client *natsclient.Client,
	cfg Config,
	metricsRegistry *metric.MetricsRegistry,
) (*Store, error)

NewStoreWithConfigAndMetrics creates a new ObjectStore with cache configuration and optional metrics. Uses NATS ObjectStore (NOT KeyValue) for immutable message storage.

func (*Store) Close

func (s *Store) Close() error

Close properly shuts down the ObjectStore and releases resources. This includes closing the cache if it's enabled.

func (*Store) Delete

func (s *Store) Delete(ctx context.Context, key string) error

Delete removes a message (optional, messages are typically immutable). Also removes the message from cache if cached.

func (*Store) FetchBinary

func (s *Store) FetchBinary(ctx context.Context, ref BinaryRef) ([]byte, error)

FetchBinary retrieves binary content by its reference. Returns the raw binary data (no JSON, no base64 - direct bytes).

func (*Store) FetchContent

func (s *Store) FetchContent(ctx context.Context, ref *message.StorageReference) (*StoredContent, error)

FetchContent retrieves stored content by StorageReference. Returns the StoredContent with fields and content mapping.

func (*Store) Get

func (s *Store) Get(ctx context.Context, key string) ([]byte, error)

Get retrieves a message by key. Checks cache first, then NATS ObjectStore if cache miss.
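The cache-first lookup amounts to a read-through pattern. The sketch below uses an in-memory backend in place of NATS ObjectStore and an unbounded map in place of the LRU/TTL/Hybrid cache. `backend`, `mapBackend`, and `cachedStore` are hypothetical names. `Invalidate` mirrors the cache removal that Delete performs.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// backend stands in for NATS ObjectStore reads (hypothetical).
type backend interface {
	Read(key string) ([]byte, error)
}

// mapBackend is an in-memory backend that counts reads (single-goroutine use).
type mapBackend struct {
	data  map[string][]byte
	reads int
}

func (b *mapBackend) Read(key string) ([]byte, error) {
	b.reads++
	v, ok := b.data[key]
	if !ok {
		return nil, errors.New("not found")
	}
	return v, nil
}

// cachedStore checks the cache first and falls back to the backend on a miss.
// The map is unbounded; a real cache would apply LRU and/or TTL eviction.
type cachedStore struct {
	mu    sync.RWMutex
	cache map[string][]byte
	src   backend
}

func (s *cachedStore) Get(key string) ([]byte, error) {
	s.mu.RLock()
	v, ok := s.cache[key]
	s.mu.RUnlock()
	if ok {
		return v, nil // cache hit
	}
	v, err := s.src.Read(key) // cache miss: read from the backing store
	if err != nil {
		return nil, err
	}
	s.mu.Lock()
	s.cache[key] = v
	s.mu.Unlock()
	return v, nil
}

// Invalidate drops a key from the cache, as Delete does for cached entries.
func (s *cachedStore) Invalidate(key string) {
	s.mu.Lock()
	delete(s.cache, key)
	s.mu.Unlock()
}

func main() {
	b := &mapBackend{data: map[string][]byte{"k": []byte("v")}}
	s := &cachedStore{cache: map[string][]byte{}, src: b}
	s.Get("k") // miss: reads the backend
	s.Get("k") // hit: served from cache
	fmt.Println("backend reads:", b.reads)
}
```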

func (*Store) GetMetadata

func (s *Store) GetMetadata(ctx context.Context, key string) (map[string][]string, error)

GetMetadata returns the metadata associated with a stored object. Returns empty map if the object has no metadata.

func (*Store) List

func (s *Store) List(ctx context.Context, prefix string) ([]string, error)

List returns keys matching a prefix (for debugging/inspection).

func (*Store) Put

func (s *Store) Put(ctx context.Context, key string, data []byte) error

Put stores raw binary data at the specified key (implements storage.Store interface). This is the low-level storage operation - callers provide the key and raw bytes. For message-specific storage with automatic key generation, use Store() instead.

func (*Store) Store

func (s *Store) Store(ctx context.Context, msg any) (string, error)

Store saves a message and returns the generated key. Messages are stored as JSON bytes with keys generated by the configured KeyGenerator. If a MetadataExtractor is configured, metadata is extracted and stored with the object.

Special handling for raw byte inputs:

  • []byte: stored directly without re-marshaling (avoids base64 encoding)
  • json.RawMessage: stored directly as raw JSON bytes
  • other types: marshaled to JSON via json.Marshal

func (*Store) StoreContent

StoreContent stores content from a ContentStorable and returns a StorageReference. This is the high-level method for storing document content separately from triples.

Flow:

  1. Extract RawContent() and ContentFields() from ContentStorable
  2. Create StoredContent envelope with timestamp
  3. Store as JSON in ObjectStore
  4. Return StorageReference for embedding in messages

type StoredContent

type StoredContent struct {
	// EntityID is the federated entity ID this content belongs to
	EntityID string `json:"entity_id"`

	// Fields contains the raw text content by field name.
	// Field names here match the values in ContentFields.
	// Example: {"title": "Safety Manual", "body": "Full text...", "description": "Brief summary"}
	Fields map[string]string `json:"fields,omitempty"`

	// BinaryRefs contains references to binary content stored separately.
	// The actual binary data is stored under BinaryRef.Key.
	// Example: {"video": {ContentType: "video/mp4", Size: 5242880, Key: "binary/..."}}
	BinaryRefs map[string]BinaryRef `json:"binary_refs,omitempty"`

	// ContentFields maps semantic roles to field names in Fields or BinaryRefs.
	// This enables consumers to find content without hardcoding field names.
	// Standard roles: "body", "abstract", "title", "media", "thumbnail"
	// Example: {"body": "body", "abstract": "description", "title": "title", "media": "video"}
	ContentFields map[string]string `json:"content_fields"`

	// StoredAt is when the content was stored
	StoredAt time.Time `json:"stored_at"`
}

StoredContent is the envelope stored in ObjectStore for ContentStorable payloads. It contains the raw content fields along with the semantic mapping that describes how to interpret the content structure.

This type is stored in ObjectStore and retrieved by consumers (like embedding workers) who need access to the raw content. The ContentFields map provides a semantic role mapping that tells consumers how to find specific content types (body, abstract, title) without hardcoding field names.

For BinaryStorable payloads, binary content is stored separately (as raw bytes) and referenced via BinaryRefs. This avoids base64 bloat in the JSON envelope.

Storage flow:

  1. Processor receives raw document
  2. Processor creates ContentStorable with RawContent() and ContentFields()
  3. ObjectStore.StoreContent() serializes this StoredContent envelope
  4. Consumers fetch via StorageRef and use ContentFields to find text

Example stored JSON (with binary refs):

{
    "entity_id": "acme.logistics.content.video.training.vid-001",
    "fields": {
        "title": "Safety Training Video",
        "description": "Comprehensive safety training"
    },
    "binary_refs": {
        "video": {
            "content_type": "video/mp4",
            "size": 5242880,
            "key": "binary/2025/01/15/vid-001_video_1234567890"
        }
    },
    "content_fields": {
        "title": "title",
        "abstract": "description",
        "media": "video"
    },
    "stored_at": "2025-01-15T10:30:00Z"
}

func NewStoredContent

func NewStoredContent(entityID string, fields, contentFields map[string]string) *StoredContent

NewStoredContent creates a StoredContent envelope from an entity ID, the raw text fields, and the semantic field mapping.

func NewStoredContentWithBinary

func NewStoredContentWithBinary(entityID string, fields, contentFields map[string]string, binaryRefs map[string]BinaryRef) *StoredContent

NewStoredContentWithBinary creates a StoredContent envelope with both text and binary content.

func (*StoredContent) GetBinaryRefByRole

func (s *StoredContent) GetBinaryRefByRole(role string) *BinaryRef

GetBinaryRefByRole returns the binary reference for a semantic role. Returns nil if the role is not mapped to a binary field.

func (*StoredContent) GetFieldByRole

func (s *StoredContent) GetFieldByRole(role string) string

GetFieldByRole returns the content for a semantic role (e.g., "body", "abstract"). Returns empty string if the role is not mapped or the field is empty.

func (*StoredContent) HasBinaryContent

func (s *StoredContent) HasBinaryContent() bool

HasBinaryContent returns true if this StoredContent has any binary references.

func (*StoredContent) HasRole

func (s *StoredContent) HasRole(role string) bool

HasRole returns true if the semantic role is mapped to a non-empty field or binary ref.
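The role lookups described above amount to a two-step indirection: role to field name via ContentFields, then field name to text via Fields or to a BinaryRef via BinaryRefs. The sketch below reimplements that documented behavior on hypothetical mirror types populated with the values from the example stored JSON earlier in this section; it is not the package's source. The BinaryRef resolved for the "media" role is what a consumer would pass to FetchBinary.

```go
package main

import "fmt"

// binaryRef mirrors the BinaryRef JSON shape shown in the example envelope.
type binaryRef struct {
	ContentType string `json:"content_type"`
	Size        int64  `json:"size"`
	Key         string `json:"key"`
}

// storedContent is a hypothetical mirror holding only the fields the lookups need.
type storedContent struct {
	Fields        map[string]string
	BinaryRefs    map[string]binaryRef
	ContentFields map[string]string
}

// fieldByRole: role -> field name -> text; "" if unmapped or empty.
func (s *storedContent) fieldByRole(role string) string {
	name, ok := s.ContentFields[role]
	if !ok {
		return ""
	}
	return s.Fields[name]
}

// binaryRefByRole: role -> field name -> binary reference; nil if none.
func (s *storedContent) binaryRefByRole(role string) *binaryRef {
	name, ok := s.ContentFields[role]
	if !ok {
		return nil
	}
	ref, ok := s.BinaryRefs[name]
	if !ok {
		return nil
	}
	return &ref
}

// hasRole reports whether a role maps to non-empty text or a binary reference.
func (s *storedContent) hasRole(role string) bool {
	return s.fieldByRole(role) != "" || s.binaryRefByRole(role) != nil
}

// exampleContent holds the values from the example stored JSON.
func exampleContent() *storedContent {
	return &storedContent{
		Fields: map[string]string{
			"title":       "Safety Training Video",
			"description": "Comprehensive safety training",
		},
		BinaryRefs: map[string]binaryRef{
			"video": {ContentType: "video/mp4", Size: 5242880, Key: "binary/2025/01/15/vid-001_video_1234567890"},
		},
		ContentFields: map[string]string{"title": "title", "abstract": "description", "media": "video"},
	}
}

func main() {
	sc := exampleContent()
	fmt.Println(sc.fieldByRole("abstract"))      // Comprehensive safety training
	fmt.Println(sc.binaryRefByRole("media").Key) // binary/2025/01/15/vid-001_video_1234567890
	fmt.Println(sc.hasRole("body"))              // false
}
```

Consumers ask for "abstract" or "media" and never need to know that the producer named those fields "description" and "video".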

func (*StoredContent) MarshalJSON

func (s *StoredContent) MarshalJSON() ([]byte, error)

MarshalJSON provides JSON serialization for StoredContent.

func (*StoredContent) UnmarshalJSON

func (s *StoredContent) UnmarshalJSON(data []byte) error

UnmarshalJSON provides JSON deserialization for StoredContent.

type StoredMessage

type StoredMessage struct {
	// contains filtered or unexported fields
}

StoredMessage implements the message.Storable interface for messages that have been persisted to ObjectStore. It combines the semantic data (Graphable) with a storage reference pointing to the full message.

This enables the "store once, reference everywhere" pattern where:

  • ObjectStore stores the full raw message
  • Graph processor receives semantic data plus storage reference
  • Components can retrieve full message via ObjectStore API if needed

func NewStoredMessage

func NewStoredMessage(graphable graph.Graphable, storageRef *message.StorageReference, messageType string) *StoredMessage

NewStoredMessage creates a StoredMessage from a Graphable, a storage reference, and the original message type.

func (*StoredMessage) EntityID

func (s *StoredMessage) EntityID() string

EntityID implements the graph.Graphable interface.

func (*StoredMessage) MarshalJSON

func (s *StoredMessage) MarshalJSON() ([]byte, error)

MarshalJSON provides JSON serialization for StoredMessage.

func (*StoredMessage) MessageType

func (s *StoredMessage) MessageType() string

MessageType returns the original message type.

func (*StoredMessage) Schema

func (s *StoredMessage) Schema() message.Type

Schema implements the message.Payload interface.

func (*StoredMessage) StorageRef

func (s *StoredMessage) StorageRef() *message.StorageReference

StorageRef implements the message.Storable interface.

func (*StoredMessage) StoredAt

func (s *StoredMessage) StoredAt() time.Time

StoredAt returns when the message was stored.

func (*StoredMessage) Triples

func (s *StoredMessage) Triples() []message.Triple

Triples implements the graph.Graphable interface.

func (*StoredMessage) UnmarshalJSON

func (s *StoredMessage) UnmarshalJSON(data []byte) error

UnmarshalJSON provides JSON deserialization for StoredMessage.

func (*StoredMessage) Validate

func (s *StoredMessage) Validate() error

Validate implements the message.Payload interface.
