Documentation
¶
Overview ¶
Package objectstore provides a NATS ObjectStore-based storage component for immutable message storage with time-bucketed keys and caching.
Package objectstore provides a NATS ObjectStore-based storage component for immutable message storage with time-bucketed keys and caching.
Package objectstore provides a NATS JetStream ObjectStore-based storage component for immutable message storage with time-bucketed keys and caching.
Overview ¶
The objectstore package implements the storage.Store interface using NATS JetStream's ObjectStore feature. It provides:
- Immutable storage with automatic versioning
- Time-bucketed key generation for efficient organization
- Optional caching (LRU, TTL, or Hybrid)
- Component wrapper for NATS-based integration
- Configurable metadata extraction
Core Components ¶
Store:
The Store type implements storage.Store using NATS ObjectStore:
- Put: Stores data as immutable objects (versioned)
- Get: Retrieves latest version with optional caching
- List: Returns all keys matching prefix (client-side filter)
- Delete: Marks latest version as deleted (preserves history)
Component:
The Component wrapper exposes Store via NATS ports:
- "api" port: Request/Response for Get/Put/List operations
- "write" port: Fire-and-forget async writes
- "events" port: Publishes storage events (stored, retrieved, deleted)
Architecture Decisions ¶
Immutable Storage with Versioning:
NATS ObjectStore is immutable - each Put creates a new version rather than overwriting. This design:
- Preserves complete history for audit/replay
- Enables time-travel queries
- Prevents accidental data loss
- Supports append-only semantics naturally
Trade-off: Cannot modify existing data in-place. Use Delete + Put to "update".
Time-Bucketed Keys:
The DefaultKeyGenerator creates hierarchical keys by timestamp:
- Format: "{prefix}/year/month/day/hour/{unique-id}"
- Example: "events/2024/10/08/14/abc123-def456"
This bucketing:
- Organizes data chronologically for range queries
- Distributes objects across hierarchy to avoid hot-spots
- Enables efficient cleanup by time bucket
- Mirrors common time-series access patterns
Alternative considered: Flat keys with embedded timestamps Rejected because: Harder to navigate, less efficient for time-range queries, doesn't leverage hierarchical storage systems.
Optional Caching Layer:
The Store can optionally use cache.Cache for retrieved objects:
- LRU: Fixed-size cache, evicts least recently used
- TTL: Time-based expiration for fresh data
- Hybrid: Combines LRU size limits with TTL expiration
Caching decisions:
- Read-heavy workloads: Use Hybrid cache (size + TTL)
- Write-heavy workloads: Disable cache or use small TTL
- Immutable data: Use LRU (data never changes)
Client-Side List Filtering:
The List() operation fetches all objects from NATS ObjectStore, then filters client-side by prefix. This is required because NATS ObjectStore doesn't support server-side prefix filtering (as of current version).
Performance implications:
- O(n) where n = total objects in bucket
- May be slow with thousands of objects
- Consider pagination or external indexing for large datasets
Alternative considered: Maintain separate index in KV store Rejected because: Adds complexity, consistency challenges, NATS may add native filtering in future.
Request/Response API Port:
The Component exposes a Request/Response API via NATS:
- Client sends Request JSON to "api" subject
- Component processes and replies with Response JSON
- Enables remote storage access without direct Go API
This enables web clients, other services, or non-Go systems to use storage without linking against the Go library.
Usage Examples ¶
Direct Store Usage:
// Create store with configuration
config := objectstore.DefaultConfig()
config.BucketName = "video-storage"
config.EnableCache = true
config.CacheStrategy = "hybrid"
config.CacheTTL = 5 * time.Minute
store, err := objectstore.NewStoreWithConfig(ctx, natsClient, config)
if err != nil {
log.Fatal(err)
}
defer store.Close()
// Store video file
videoData, _ := os.ReadFile("sensor-123.mp4")
key := "video/sensor-123/2024-10-08T14:30:00Z"
err = store.Put(ctx, key, videoData)
// Retrieve with caching
data, err := store.Get(ctx, key) // First call: fetches from NATS
data, err = store.Get(ctx, key) // Second call: served from cache
// List all videos for sensor
keys, err := store.List(ctx, "video/sensor-123/")
Component Usage (NATS Integration):
// Configure component
config := objectstore.Config{
InstanceName: "video-storage",
Enabled: true,
BucketName: "video-storage",
EnableCache: true,
}
// Create component with dependencies
deps := component.Dependencies{
NATSClient: natsClient,
MetricsRegistry: metricsRegistry,
}
comp, err := objectstore.NewComponent(config, deps)
if err != nil {
log.Fatal(err)
}
// Start component
err = comp.Start(ctx)
defer comp.Stop(5 * time.Second)
// Clients can now use NATS Request/Response:
// nats.Request("storage.api", `{"action":"get","key":"video/..."}`)
Custom Key Generation:
// Implement storage.KeyGenerator
type EntityKeyGenerator struct {
prefix string
}
func (g *EntityKeyGenerator) GenerateKey(msg any) string {
entity, ok := msg.(Identifiable)
if !ok {
return g.prefix + "/" + uuid.New().String()
}
timestamp := time.Now().Format(time.RFC3339)
return fmt.Sprintf("%s/%s/%s", g.prefix, entity.GetID(), timestamp)
}
// Use with Component
store.SetKeyGenerator(&EntityKeyGenerator{prefix: "entities"})
Performance Characteristics ¶
Put Operation:
- Latency: ~10-50ms (depends on NATS RTT)
- Throughput: Limited by NATS ObjectStore write rate
- Memory: O(message_size) during write
Get Operation:
- Cache hit: ~100μs (in-memory cache lookup)
- Cache miss: ~10-50ms (NATS ObjectStore read)
- Memory: O(message_size) + cache overhead
List Operation:
- Latency: O(n) where n = total objects in bucket
- Network: Fetches metadata for all objects
- Memory: O(num_objects) for key list
- Performance degrades with >1000 objects - consider pagination
Caching Impact:
- Read latency: 100x faster (50ms → 100μs)
- Memory: O(cache_size * avg_message_size)
- Write latency: Unaffected (write-through caching)
Caching Strategy Guidelines ¶
Choose cache strategy based on workload:
LRU (Least Recently Used):
- Best for: Immutable data, fixed working set
- Example: Video archive, historical events
- Config: CacheStrategy="lru", MaxCacheSize=1000
TTL (Time-To-Live):
- Best for: Fresh data requirements, cache invalidation
- Example: Sensor readings, real-time events
- Config: CacheStrategy="ttl", CacheTTL=5*time.Minute
Hybrid (LRU + TTL):
- Best for: Read-heavy with freshness requirements
- Example: Entity states, recent events
- Config: CacheStrategy="hybrid", MaxCacheSize=1000, CacheTTL=5*time.Minute
No Cache:
- Best for: Write-heavy, large messages, unique reads
- Example: Video ingestion, one-time analytics
- Config: EnableCache=false
Memory usage formula:
- LRU: memory = MaxCacheSize * avg_message_size
- TTL: memory = write_rate * CacheTTL * avg_message_size
- Hybrid: memory = min(LRU_calc, TTL_calc)
NATS ObjectStore Semantics ¶
The NATS JetStream ObjectStore provides:
Immutability:
- Each Put creates a new version
- Previous versions preserved (configurable retention)
- Get retrieves latest version by default
Versioning:
- Automatic version tracking
- Can retrieve specific versions (not currently exposed)
- Supports rollback and audit trails
Metadata:
- HTTP-style headers (map[string][]string)
- Automatically tracked: size, chunks, digest
- Custom metadata via MetadataExtractor
Storage Model:
- Large objects chunked automatically
- Efficient for video, images, binary data
- Not optimized for tiny messages (<1KB)
Component Port Configuration ¶
The Component exposes three NATS ports:
API Port (Request/Response):
- Subject: "{namespace}.{instance}.api" (default: "storage.{instance}.api")
- Pattern: Request/Response
- Payload: JSON Request → JSON Response
- Operations: Get, Put, List
- Timeout: 2 seconds
Write Port (Fire-and-Forget):
- Subject: "{namespace}.{instance}.write" (default: "storage.{instance}.write")
- Pattern: Fire-and-forget
- Payload: Raw binary data
- Key: Generated via KeyGenerator
- No response (async)
Events Port (Publish-Only):
- Subject: "{namespace}.{instance}.events" (default: "storage.{instance}.events")
- Pattern: Publish
- Payload: JSON Event (action, key, success, error)
- Published after: Put, Get, Delete operations
Configuration Options ¶
See Config struct for all options:
type Config struct {
InstanceName string // Component instance name
Enabled bool // Enable/disable component
BucketName string // NATS ObjectStore bucket
EnableCache bool // Enable caching
CacheStrategy string // "lru", "ttl", "hybrid"
MaxCacheSize int // LRU/Hybrid: max entries
CacheTTL time.Duration // TTL/Hybrid: expiration
}
Thread Safety ¶
All operations are safe for concurrent use:
- Store methods: Thread-safe via NATS ObjectStore and cache concurrency
- Component handlers: Each NATS message processed in separate goroutine
- Metrics: Atomic counters (atomic.AddUint64)
- Cache: Thread-safe by cache implementation contract
No explicit locks required in application code.
Error Handling ¶
Store operations return errors for:
- Network failures: NATS connection lost, timeouts
- Not found: Key doesn't exist (Get, Delete)
- Invalid input: Empty keys, nil data
- Resource limits: Bucket quota exceeded
Component operations:
- Invalid requests: Malformed JSON, unknown actions
- Store errors: Wrapped with operation context
- Timeout errors: Operation exceeded deadline
Testing ¶
Test with real NATS using testcontainers:
func TestStore_Integration(t *testing.T) {
natsClient := getSharedNATSClient(t)
store, err := objectstore.NewStore(ctx, natsClient, "test-bucket")
require.NoError(t, err)
defer store.Close()
// Test with real NATS ObjectStore
data := []byte("test data")
err = store.Put(ctx, "test-key", data)
require.NoError(t, err)
retrieved, err := store.Get(ctx, "test-key")
require.NoError(t, err)
assert.Equal(t, data, retrieved)
}
Run with race detector:
go test -race ./storage/objectstore
Known Limitations ¶
- List() performance: O(n) with all objects - may be slow with >1000 objects
- No batch operations: Must Put/Get one at a time
- No version retrieval: Can only access latest version
- No server-side filtering: List() filters client-side
- Cache invalidation: No automatic invalidation on external updates
See Also ¶
- storage: Core storage interfaces
- cache: Caching implementations
- component: Component interface and lifecycle
- natsclient: NATS client wrapper
Package objectstore provides simple ObjectStore wrapper for immutable message storage. It uses NATS ObjectStore (NOT KeyValue) to store parsed messages as JSON blobs with time-bucketed slash-based keys for efficient organization and retrieval.
Index ¶
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type BinaryRef
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) IsStarted() bool
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(_ time.Duration) error
- type Config
- type DefaultKeyGenerator
- type Event
- type Request
- type Response
- type Store
- func (s *Store) Close() error
- func (s *Store) Delete(ctx context.Context, key string) error
- func (s *Store) FetchBinary(ctx context.Context, ref BinaryRef) ([]byte, error)
- func (s *Store) FetchContent(ctx context.Context, ref *message.StorageReference) (*StoredContent, error)
- func (s *Store) Get(ctx context.Context, key string) ([]byte, error)
- func (s *Store) GetMetadata(ctx context.Context, key string) (map[string][]string, error)
- func (s *Store) List(ctx context.Context, prefix string) ([]string, error)
- func (s *Store) Put(ctx context.Context, key string, data []byte) error
- func (s *Store) Store(ctx context.Context, msg any) (string, error)
- func (s *Store) StoreContent(ctx context.Context, cs message.ContentStorable) (*message.StorageReference, error)
- type StoredContent
- func (s *StoredContent) GetBinaryRefByRole(role string) *BinaryRef
- func (s *StoredContent) GetFieldByRole(role string) string
- func (s *StoredContent) HasBinaryContent() bool
- func (s *StoredContent) HasRole(role string) bool
- func (s *StoredContent) MarshalJSON() ([]byte, error)
- func (s *StoredContent) UnmarshalJSON(data []byte) error
- type StoredMessage
- func (s *StoredMessage) EntityID() string
- func (s *StoredMessage) MarshalJSON() ([]byte, error)
- func (s *StoredMessage) MessageType() string
- func (s *StoredMessage) Schema() message.Type
- func (s *StoredMessage) StorageRef() *message.StorageReference
- func (s *StoredMessage) StoredAt() time.Time
- func (s *StoredMessage) Triples() []message.Triple
- func (s *StoredMessage) UnmarshalJSON(data []byte) error
- func (s *StoredMessage) Validate() error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new ObjectStore component factory
func Register ¶
Register registers the ObjectStore storage component with the given registry.
The ObjectStore component provides:
- NATS JetStream ObjectStore-based immutable storage
- Time-bucketed key organization
- LRU/TTL/Hybrid caching support
- Pluggable key generation (composition-friendly)
- Pluggable metadata extraction (composition-friendly)
- Request/Response API for synchronous operations
- Fire-and-forget write operations
- Storage event publishing
Composition-Friendly Design: The ObjectStore is designed to be easily extended by SemStreams or other semantic layers. The key generation and metadata extraction are pluggable, allowing semantic behavior to be layered on top of the core infrastructure.
Types ¶
type BinaryRef ¶
type BinaryRef struct {
// ContentType is the MIME type of the binary content.
// Examples: "image/jpeg", "video/mp4", "application/pdf"
ContentType string `json:"content_type"`
// Size is the size of the binary content in bytes.
Size int64 `json:"size"`
// Key is the ObjectStore key where the binary data is stored.
Key string `json:"key"`
}
BinaryRef is a reference to binary content stored separately in ObjectStore. Binary data is stored as raw bytes (no base64) under its own key, and the JSON metadata envelope references it via this struct.
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component wraps ObjectStore as a component with NATS ports
Composition-Friendly Design:
- Generic NATS port handling (no semantic requirements)
- Publishes simple storage events (not semantic messages)
- Allows SemStreams to wrap/extend for semantic behavior
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema for this component References the package-level objectstoreSchema variable for efficient retrieval
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns current health status
func (*Component) Initialize ¶
Initialize sets up the component (no I/O operations)
func (*Component) InputPorts ¶
InputPorts returns the input ports for this component
func (*Component) OutputPorts ¶
OutputPorts returns the output ports for this component
type Config ¶
type Config struct {
// Ports defines input/output port configuration
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration for inputs and outputs,category:basic"`
// BucketName is the NATS JetStream ObjectStore bucket name
BucketName string `json:"bucket_name" schema:"type:string,description:NATS ObjectStore bucket name,default:MESSAGES,category:basic"`
// DataCache configures the in-memory cache for retrieved objects
DataCache cache.Config `json:"data_cache" schema:"type:object,description:Cache configuration for stored objects,category:performance"`
// KeyGenerator optionally provides custom key generation strategy.
// If nil, the default time-based key generator is used.
// This allows SemStreams to provide entity-based keys while keeping
// StreamKit generic.
KeyGenerator storage.KeyGenerator `json:"-" schema:"-"`
// MetadataExtractor optionally provides custom metadata extraction.
// If nil, no metadata is stored with objects.
// This allows SemStreams to add semantic metadata (entity IDs, triples)
// while keeping StreamKit generic.
MetadataExtractor storage.MetadataExtractor `json:"-" schema:"-"`
}
Config holds configuration for ObjectStore storage component.
Design Philosophy: Composition-Friendly - No hardcoded interface requirements (SemStreams can layer semantic interfaces) - Pluggable key generation (via storage.KeyGenerator interface) - Pluggable metadata extraction (via storage.MetadataExtractor interface) - Flexible port configuration
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns the default configuration for ObjectStore. Creates a simple key-value store with:
- Generic input/output ports (no interface requirements)
- Time-based key generation
- No metadata extraction
- Default caching settings
type DefaultKeyGenerator ¶
type DefaultKeyGenerator struct{}
DefaultKeyGenerator provides time-based key generation. Keys are formatted as: type/year/month/day/hour/identifier_timestamp
The generator uses behavioral interfaces for optional enhancement:
- message.Typeable: Provides type information for key prefix
- message.Identifiable: Provides identifier for key uniqueness
If interfaces aren't implemented, sensible defaults are used.
func (*DefaultKeyGenerator) GenerateKey ¶
func (g *DefaultKeyGenerator) GenerateKey(msg any) string
GenerateKey creates a time-bucketed slash-based key. Format: type/year/month/day/hour/identifier_timestamp Example: sensor.temperature.v1/2024/01/19/14/device-123_1705677443
type Event ¶
type Event struct {
Type string `json:"type"` // "stored", "retrieved"
Key string `json:"key"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]any `json:"metadata,omitempty"`
}
Event represents a simple storage event published by ObjectStore core design: Just indicates what happened, no semantic payload
type Request ¶
type Request struct {
Action string `json:"action"` // "get", "store", "list"
Key string `json:"key,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
Prefix string `json:"prefix,omitempty"` // For list operation
}
Request represents a request to the ObjectStore API
type Response ¶
type Response struct {
Success bool `json:"success"`
Key string `json:"key,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
Keys []string `json:"keys,omitempty"` // For list operation
Error string `json:"error,omitempty"`
}
Response represents a response from the ObjectStore API
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store provides simple ObjectStore wrapper for message storage. Messages are stored as immutable JSON blobs with time-bucketed slash-based keys.
Composition-Friendly Design:
- Pluggable KeyGenerator (inject custom key generation)
- Pluggable MetadataExtractor (inject custom metadata extraction)
- No hardcoded semantic requirements
func NewStoreWithConfig ¶
NewStoreWithConfig creates a new ObjectStore with cache configuration. Uses NATS ObjectStore (NOT KeyValue) for immutable message storage.
func NewStoreWithConfigAndMetrics ¶
func NewStoreWithConfigAndMetrics( ctx context.Context, client *natsclient.Client, cfg Config, metricsRegistry *metric.MetricsRegistry, ) (*Store, error)
NewStoreWithConfigAndMetrics creates a new ObjectStore with cache configuration and optional metrics. Uses NATS ObjectStore (NOT KeyValue) for immutable message storage.
func (*Store) Close ¶
Close properly shuts down the ObjectStore and releases resources. This includes closing the cache if it's enabled.
func (*Store) Delete ¶
Delete removes a message (optional, messages are typically immutable). Also removes the message from cache if cached.
func (*Store) FetchBinary ¶
FetchBinary retrieves binary content by its reference. Returns the raw binary data (no JSON, no base64 - direct bytes).
func (*Store) FetchContent ¶
func (s *Store) FetchContent(ctx context.Context, ref *message.StorageReference) (*StoredContent, error)
FetchContent retrieves stored content by StorageReference. Returns the StoredContent with fields and content mapping.
func (*Store) Get ¶
Get retrieves a message by key. Checks cache first, then NATS ObjectStore if cache miss.
func (*Store) GetMetadata ¶
GetMetadata returns the metadata associated with a stored object. Returns empty map if the object has no metadata.
func (*Store) Put ¶
Put stores raw binary data at the specified key (implements storage.Store interface). This is the low-level storage operation - callers provide the key and raw bytes. For message-specific storage with automatic key generation, use Store() instead.
func (*Store) Store ¶
Store saves a message and returns the generated key. Messages are stored as JSON bytes with keys generated by the configured KeyGenerator. If a MetadataExtractor is configured, metadata is extracted and stored with the object.
Special handling for raw byte inputs:
- []byte: stored directly without re-marshaling (avoids base64 encoding)
- json.RawMessage: stored directly as raw JSON bytes
- other types: marshaled to JSON via json.Marshal
func (*Store) StoreContent ¶
func (s *Store) StoreContent(ctx context.Context, cs message.ContentStorable) (*message.StorageReference, error)
StoreContent stores content from a ContentStorable and returns a StorageReference. This is the high-level method for storing document content separately from triples.
Flow:
- Extract RawContent() and ContentFields() from ContentStorable
- Create StoredContent envelope with timestamp
- Store as JSON in ObjectStore
- Return StorageReference for embedding in messages
type StoredContent ¶
type StoredContent struct {
// EntityID is the federated entity ID this content belongs to
EntityID string `json:"entity_id"`
// Fields contains the raw text content by field name.
// Field names here match the values in ContentFields.
// Example: {"title": "Safety Manual", "body": "Full text...", "description": "Brief summary"}
Fields map[string]string `json:"fields,omitempty"`
// BinaryRefs contains references to binary content stored separately.
// The actual binary data is stored under BinaryRef.Key.
// Example: {"video": {ContentType: "video/mp4", Size: 5242880, Key: "binary/..."}}
BinaryRefs map[string]BinaryRef `json:"binary_refs,omitempty"`
// ContentFields maps semantic roles to field names in Fields or BinaryRefs.
// This enables consumers to find content without hardcoding field names.
// Standard roles: "body", "abstract", "title", "media", "thumbnail"
// Example: {"body": "body", "abstract": "description", "title": "title", "media": "video"}
ContentFields map[string]string `json:"content_fields"`
// StoredAt is when the content was stored
StoredAt time.Time `json:"stored_at"`
}
StoredContent is the envelope stored in ObjectStore for ContentStorable payloads. It contains the raw content fields along with the semantic mapping that describes how to interpret the content structure.
This type is stored in ObjectStore and retrieved by consumers (like embedding workers) who need access to the raw content. The ContentFields map provides a semantic role mapping that tells consumers how to find specific content types (body, abstract, title) without hardcoding field names.
For BinaryStorable payloads, binary content is stored separately (as raw bytes) and referenced via BinaryRefs. This avoids base64 bloat in the JSON envelope.
Storage flow:
- Processor receives raw document
- Processor creates ContentStorable with RawContent() and ContentFields()
- ObjectStore.StoreContent() serializes this StoredContent envelope
- Consumers fetch via StorageRef and use ContentFields to find text
Example stored JSON (with binary refs):
{
"entity_id": "acme.logistics.content.video.training.vid-001",
"fields": {
"title": "Safety Training Video",
"description": "Comprehensive safety training"
},
"binary_refs": {
"video": {
"content_type": "video/mp4",
"size": 5242880,
"key": "binary/2025/01/15/vid-001_video_1234567890"
}
},
"content_fields": {
"title": "title",
"abstract": "description",
"media": "video"
},
"stored_at": "2025-01-15T10:30:00Z"
}
func NewStoredContent ¶
func NewStoredContent(entityID string, fields, contentFields map[string]string) *StoredContent
NewStoredContent creates a StoredContent envelope from entity ID, raw content, and field mapping.
func NewStoredContentWithBinary ¶
func NewStoredContentWithBinary(entityID string, fields, contentFields map[string]string, binaryRefs map[string]BinaryRef) *StoredContent
NewStoredContentWithBinary creates a StoredContent envelope with both text and binary content.
func (*StoredContent) GetBinaryRefByRole ¶
func (s *StoredContent) GetBinaryRefByRole(role string) *BinaryRef
GetBinaryRefByRole returns the binary reference for a semantic role. Returns nil if the role is not mapped to a binary field.
func (*StoredContent) GetFieldByRole ¶
func (s *StoredContent) GetFieldByRole(role string) string
GetFieldByRole returns the content for a semantic role (e.g., "body", "abstract"). Returns empty string if the role is not mapped or the field is empty.
func (*StoredContent) HasBinaryContent ¶
func (s *StoredContent) HasBinaryContent() bool
HasBinaryContent returns true if this StoredContent has any binary references.
func (*StoredContent) HasRole ¶
func (s *StoredContent) HasRole(role string) bool
HasRole returns true if the semantic role is mapped to a non-empty field or binary ref.
func (*StoredContent) MarshalJSON ¶
func (s *StoredContent) MarshalJSON() ([]byte, error)
MarshalJSON provides JSON serialization for StoredContent.
func (*StoredContent) UnmarshalJSON ¶
func (s *StoredContent) UnmarshalJSON(data []byte) error
UnmarshalJSON provides JSON deserialization for StoredContent.
type StoredMessage ¶
type StoredMessage struct {
// contains filtered or unexported fields
}
StoredMessage implements message.Storable interface for messages that have been persisted to ObjectStore. It combines the semantic data (Graphable) with a storage reference pointing to the full message.
This enables the "store once, reference everywhere" pattern where:
- ObjectStore stores the full raw message
- Graph processor receives semantic data plus storage reference
- Components can retrieve full message via ObjectStore API if needed
func NewStoredMessage ¶
func NewStoredMessage(graphable graph.Graphable, storageRef *message.StorageReference, messageType string) *StoredMessage
NewStoredMessage creates a StoredMessage from a Graphable and storage metadata
func (*StoredMessage) EntityID ¶
func (s *StoredMessage) EntityID() string
EntityID implements graph.Graphable interface
func (*StoredMessage) MarshalJSON ¶
func (s *StoredMessage) MarshalJSON() ([]byte, error)
MarshalJSON provides JSON serialization for StoredMessage
func (*StoredMessage) MessageType ¶
func (s *StoredMessage) MessageType() string
MessageType returns the original message type
func (*StoredMessage) Schema ¶
func (s *StoredMessage) Schema() message.Type
Schema implements message.Payload interface
func (*StoredMessage) StorageRef ¶
func (s *StoredMessage) StorageRef() *message.StorageReference
StorageRef implements message.Storable interface
func (*StoredMessage) StoredAt ¶
func (s *StoredMessage) StoredAt() time.Time
StoredAt returns when the message was stored
func (*StoredMessage) Triples ¶
func (s *StoredMessage) Triples() []message.Triple
Triples implements graph.Graphable interface
func (*StoredMessage) UnmarshalJSON ¶
func (s *StoredMessage) UnmarshalJSON(data []byte) error
UnmarshalJSON provides JSON deserialization for StoredMessage
func (*StoredMessage) Validate ¶
func (s *StoredMessage) Validate() error
Validate implements message.Payload interface