Overview ¶
Package datamanager provides unified entity and triple data management for the knowledge graph.
DataManager is the single writer to the ENTITY_STATES KV bucket. It provides atomic entity persistence, triple management, and an L1/L2 cache hierarchy. It consolidates the previously separate EntityStore and EdgeManager components into one service whose functionality is split across interfaces following the Interface Segregation Principle.
The package provides 6 focused interfaces for different use cases:
- EntityReader: Read-only entity access with caching
- EntityWriter: Basic entity mutation operations
- EntityManager: Complete entity lifecycle management
- TripleManager: Semantic triple operations for relationships
- DataConsistency: Graph integrity checking and maintenance
- DataLifecycle: Component lifecycle and observability
Architecture ¶
DataManager uses a write-behind buffer pattern for high throughput:
    Incoming Writes → Buffer (10K capacity) → Batch Workers (5) → NATS KV

Reads check each cache tier in turn and fetch from NATS KV only on a miss in both:

    GetEntity
        ↓
    L1 Cache (LRU, 1K)
        ↓
    L2 Cache (TTL, 10K)
        ↓
    NATS KV
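The read path can be pictured as a read-through lookup: check L1, then L2, and only on a double miss fetch from KV and populate both tiers. This is a simplified sketch under stated assumptions: plain maps stand in for the LRU and TTL caches (no eviction or expiry), `fetch` is a hypothetical stand-in for the KV read, and promoting L2 hits into L1 is an assumed policy rather than documented behavior.

```go
package main

import "fmt"

// tieredCache is a simplified model of the L1/L2 hierarchy. Real tiers
// would evict (LRU) and expire (TTL); plain maps are used for clarity.
type tieredCache struct {
	l1, l2    map[string]string
	fetch     func(id string) (string, bool) // stand-in for the KV read
	kvFetches int
}

func newTieredCache(fetch func(string) (string, bool)) *tieredCache {
	return &tieredCache{l1: map[string]string{}, l2: map[string]string{}, fetch: fetch}
}

// get checks L1, then L2 (promoting hits to L1), then falls back to KV.
func (c *tieredCache) get(id string) (string, bool) {
	if v, ok := c.l1[id]; ok {
		return v, true
	}
	if v, ok := c.l2[id]; ok {
		c.l1[id] = v // assumed: promote warm entry to the hot tier
		return v, true
	}
	c.kvFetches++
	v, ok := c.fetch(id)
	if ok {
		c.l1[id] = v
		c.l2[id] = v
	}
	return v, ok
}

func main() {
	kv := map[string]string{"drone.1": "Drone"}
	c := newTieredCache(func(id string) (string, bool) { v, ok := kv[id]; return v, ok })
	c.get("drone.1") // miss in both tiers: one KV fetch
	c.get("drone.1") // served from L1
	fmt.Println(c.kvFetches)
}
```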
Usage ¶
Basic entity operations:
// kv is a jetstream.KeyValue for the ENTITY_STATES bucket
mgr, err := datamanager.NewDataManager(datamanager.Dependencies{
	KVBucket:        kv,
	MetricsRegistry: registry,
	Logger:          logger,
	Config:          datamanager.DefaultConfig(),
})
if err != nil {
	return err
}
// Run blocks until ctx is cancelled; wait for onReady before writing
ready := make(chan struct{})
runErr := make(chan error, 1)
go func() { runErr <- mgr.Run(ctx, func() { close(ready) }) }()
select {
case <-ready:
case err := <-runErr:
	return err
}
// Create entity with triples atomically
entity := &graph.EntityState{ID: "c360.platform1.robotics.gcs1.drone.1"}
triples := []message.Triple{
	{Subject: entity.ID, Predicate: "type", Object: "Drone"},
}
created, err := mgr.CreateEntityWithTriples(ctx, entity, triples)
if err != nil {
	return err
}
// Read entity (served from the L1/L2 cache when possible)
fetched, err := mgr.GetEntity(ctx, created.ID)
Configuration ¶
Key configuration options:
Buffer:
  Capacity: 10000               # Write buffer size
  BatchingEnabled: true         # Enable batch writes
  FlushInterval: 50ms           # Batch flush frequency
  MaxBatchSize: 100             # Max writes per batch
  OverflowPolicy: drop_oldest   # Buffer overflow handling
Cache:
  L1Hot:
    Type: lru                   # LRU eviction for hot data
    Size: 1000                  # Hot cache capacity
  L2Warm:
    Type: ttl                   # TTL eviction for warm data
    Size: 10000                 # Warm cache capacity
    TTL: 5m                     # Time-to-live
Workers: 5                      # Concurrent write workers
WriteTimeout: 5s                # KV write timeout
ReadTimeout: 2s                 # KV read timeout
MaxRetries: 10                  # CAS retry attempts
Entity ID Format ¶
Entity IDs must follow the 6-part hierarchical format:
org.platform.domain.system.type.instance
Example: c360.platform1.robotics.gcs1.drone.1
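A minimal check for the 6-part format might look like the sketch below. It assumes only what is stated above (exactly six dot-separated, non-empty parts); `validEntityID` is a hypothetical helper, and the package may apply stricter rules.

```go
package main

import (
	"fmt"
	"strings"
)

// validEntityID reports whether id has exactly six non-empty,
// dot-separated parts: org.platform.domain.system.type.instance.
func validEntityID(id string) bool {
	parts := strings.Split(id, ".")
	if len(parts) != 6 {
		return false
	}
	for _, p := range parts {
		if p == "" {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(validEntityID("c360.platform1.robotics.gcs1.drone.1")) // true
	fmt.Println(validEntityID("c360.platform1.drone"))                 // false
}
```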
Thread Safety ¶
All Manager methods are safe for concurrent use. The write buffer uses non-blocking submission with configurable overflow policies. Cache access is protected by the underlying cache implementations.
Metrics ¶
DataManager exports Prometheus metrics under the semstreams_datamanager namespace:
- writes_total: Total KV write operations by status and operation
- write_latency_seconds: KV write latency distribution
- queue_depth: Current write queue depth
- batch_size: Size of write batches
- dropped_writes: Writes dropped due to queue overflow
- cache_hits: Cache hits by level (l1, l2)
- cache_misses: Cache misses requiring KV fetch
See Also ¶
Related packages:
- github.com/c360studio/semstreams/graph: Core graph types and EntityState
- github.com/c360studio/semstreams/message: Triple and message types
- github.com/c360studio/semstreams/graph/query: Query execution using EntityReader
This package is the result of the Phase 1 consolidation, which introduced atomic entity+triple operations and simplified transaction management. It builds on the semstreams framework components rather than reimplementing them.
Index ¶
- Variables
- func IsBufferError(err error) bool
- func IsEntityError(err error) bool
- func IsLifecycleError(err error) bool
- func IsRetryableError(err error) bool
- type BatchWriteResult
- type BucketConfig
- type BufferConfig
- type CacheConfig
- type CacheStats
- type Config
- func DefaultConfig() Config
- type DataConsistency
- type DataLifecycle
- type Dependencies
- type EdgeConfig
- type EntityCreatedCallback
- type EntityIndexStatus
- type EntityManager
- type EntityReader
- type EntityWrite
- type EntityWriter
- type L1CacheConfig
- type L2CacheConfig
- type Manager
- func NewDataManager(deps Dependencies) (*Manager, error)
- func (m *Manager) AddTriple(ctx context.Context, triple message.Triple) error
- func (m *Manager) BatchGet(ctx context.Context, ids []string) ([]*gtypes.EntityState, error)
- func (m *Manager) BatchWrite(ctx context.Context, writes []EntityWrite) error
- func (m *Manager) CheckOutgoingTriplesConsistency(ctx context.Context, _ string, entity *gtypes.EntityState, ...)
- func (m *Manager) CleanupIncomingReferences(ctx context.Context, deletedEntityID string, outgoingTriples []message.Triple) error
- func (m *Manager) CreateEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
- func (m *Manager) CreateEntityWithTriples(ctx context.Context, entity *gtypes.EntityState, triples []message.Triple) (*gtypes.EntityState, error)
- func (m *Manager) CreateRelationship(ctx context.Context, fromEntityID, toEntityID string, predicate string, ...) error
- func (m *Manager) DeleteEntity(ctx context.Context, id string) error
- func (m *Manager) DeleteRelationship(ctx context.Context, fromEntityID, toEntityID string, predicate string) error
- func (m *Manager) ExistsEntity(ctx context.Context, id string) (bool, error)
- func (m *Manager) FlushPendingWrites(ctx context.Context) error
- func (m *Manager) GetCacheStats() CacheStats
- func (m *Manager) GetEntity(ctx context.Context, id string) (*gtypes.EntityState, error)
- func (m *Manager) GetPendingWriteCount() int
- func (m *Manager) HasRelationshipToEntity(entity *gtypes.EntityState, targetEntityID string, predicate string) bool
- func (m *Manager) List(ctx context.Context, pattern string) ([]string, error)
- func (m *Manager) ListWithPrefix(ctx context.Context, prefix string) ([]string, error)
- func (m *Manager) RemoveTriple(ctx context.Context, subject, predicate string) error
- func (m *Manager) Run(ctx context.Context, onReady func()) error
- func (m *Manager) SetEntityCreatedCallback(callback EntityCreatedCallback)
- func (m *Manager) UpdateEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
- func (m *Manager) UpdateEntityWithTriples(ctx context.Context, entity *gtypes.EntityState, addTriples []message.Triple, ...) (*gtypes.EntityState, error)
- func (m *Manager) UpsertEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
- type Metrics
- type Operation
- type TripleManager
- type WriteResult
- type WriteStrategy
Constants ¶
This section is empty.
Variables ¶
var (
	// DataManager-specific errors (ones not in shared types/graph package)
	ErrInvalidOperation = fmt.Errorf("invalid operation")
	ErrBufferOverflow   = fmt.Errorf("write buffer overflow")
	ErrBatchChannelFull = fmt.Errorf("batch channel full")
)
Local error values specific to the data manager. Common cases use the shared sentinel errors from the types/graph package.
Functions ¶
func IsBufferError ¶
func IsBufferError(err error) bool
IsBufferError checks if an error is related to buffer operations
func IsEntityError ¶
func IsEntityError(err error) bool
IsEntityError checks if an error is related to entity operations
func IsLifecycleError ¶
func IsLifecycleError(err error) bool
IsLifecycleError checks if an error is related to service lifecycle
func IsRetryableError ¶
func IsRetryableError(err error) bool
IsRetryableError reports whether an error should be retried. It uses error type checking rather than string matching.
Types ¶
type BatchWriteResult ¶
type BatchWriteResult struct {
Results []WriteResult // Individual write results
Succeeded int // Number of successful writes
Failed int // Number of failed writes
Duration time.Duration // Total operation duration
}
BatchWriteResult represents the result of a batch write operation
type BucketConfig ¶
type BucketConfig struct {
Name string `default:"ENTITY_STATES"`
TTL time.Duration `default:"0s"` // 0 = no expiry
History int `default:"5"`
Replicas int `default:"3"`
}
BucketConfig configures the NATS KV bucket settings
type BufferConfig ¶
type BufferConfig struct {
// Buffer capacity
Capacity int `json:"capacity,omitempty" default:"10000"`
// Batching settings - groups writes for efficient processing
BatchingEnabled bool `json:"batching_enabled" default:"true"`
FlushInterval time.Duration `json:"flush_interval,omitempty" default:"50ms"` // How often to flush the buffer
MaxBatchSize int `json:"max_batch_size,omitempty" default:"100"` // Max writes per batch
MaxBatchAge time.Duration `json:"max_batch_age,omitempty" default:"100ms"` // Max age before forced flush
// Overflow policy when buffer is full
// "drop_oldest" (default) - drop oldest items to make room (NATS MaxAckPending provides backpressure)
// "drop_newest" - drop incoming items when full
// "block" - wait for space (WARNING: can cause IndexManager deadlock, use only with careful tuning)
OverflowPolicy string `json:"overflow_policy,omitempty" default:"drop_oldest"` // drop_oldest, drop_newest, block
}
BufferConfig configures the write buffer behavior
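The two dropping policies can be modeled with a bounded FIFO. This is a simplified, single-goroutine sketch (no locking and no "block" policy) using a hypothetical `boundedBuffer` type; it only illustrates which write is discarded under each policy.

```go
package main

import "fmt"

// boundedBuffer is a hypothetical fixed-capacity FIFO with an overflow policy.
type boundedBuffer struct {
	items    []string
	capacity int
	policy   string // "drop_oldest" or "drop_newest"
	dropped  int
}

// push adds item, applying the overflow policy when the buffer is full.
// It reports whether item was accepted.
func (b *boundedBuffer) push(item string) bool {
	if len(b.items) < b.capacity {
		b.items = append(b.items, item)
		return true
	}
	b.dropped++
	if b.policy == "drop_newest" {
		return false // reject the incoming item
	}
	// drop_oldest: discard the head to make room for the new item
	b.items = append(b.items[1:], item)
	return true
}

func main() {
	oldest := &boundedBuffer{capacity: 2, policy: "drop_oldest"}
	newest := &boundedBuffer{capacity: 2, policy: "drop_newest"}
	for _, w := range []string{"w1", "w2", "w3"} {
		oldest.push(w)
		newest.push(w)
	}
	fmt.Println(oldest.items, newest.items) // [w2 w3] [w1 w2]
}
```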
type CacheConfig ¶
type CacheConfig struct {
// L1 Cache (Hot - LRU)
L1Hot L1CacheConfig
// L2 Cache (Warm - TTL)
L2Warm L2CacheConfig
// Metrics configuration
Metrics bool `default:"true"`
Component string `default:"data_manager"`
}
CacheConfig configures the L1/L2 cache hierarchy
type CacheStats ¶
type CacheStats struct {
// L1 Stats
L1Hits int64 `json:"l1_hits"`
L1Misses int64 `json:"l1_misses"`
L1Size int `json:"l1_size"`
L1HitRatio float64 `json:"l1_hit_ratio"`
L1Evictions int64 `json:"l1_evictions"`
// L2 Stats
L2Hits int64 `json:"l2_hits"`
L2Misses int64 `json:"l2_misses"`
L2Size int `json:"l2_size"`
L2HitRatio float64 `json:"l2_hit_ratio"`
L2Evictions int64 `json:"l2_evictions"`
// Overall Stats
TotalHits int64 `json:"total_hits"`
TotalMisses int64 `json:"total_misses"`
OverallHitRatio float64 `json:"overall_hit_ratio"`
KVFetches int64 `json:"kv_fetches"`
// Invalidation Stats
InvalidationsTotal int64 `json:"invalidations_total"`
InvalidationsBatched int64 `json:"invalidations_batched"`
}
CacheStats holds cache statistics
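The ratio fields in CacheStats are presumably hits divided by lookups at each level. The sketch below shows that arithmetic with a hypothetical `hitRatio` helper that guards against division by zero; the package's exact definitions (for example, whether L2 lookups count only L1 misses) are an assumption here.

```go
package main

import "fmt"

// hitRatio returns hits / (hits + misses), or 0 when there were no lookups.
func hitRatio(hits, misses int64) float64 {
	total := hits + misses
	if total == 0 {
		return 0
	}
	return float64(hits) / float64(total)
}

func main() {
	// 900 L1 hits and 100 L1 misses give a 0.9 L1 hit ratio.
	fmt.Println(hitRatio(900, 100)) // 0.9
	fmt.Println(hitRatio(0, 0))     // 0
}
```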
type Config ¶
type Config struct {
// Buffer configuration (from EntityStore)
BufferConfig BufferConfig `json:"buffer,omitempty"`
// KV bucket configuration
BucketConfig BucketConfig `json:"bucket,omitempty"`
// Cache configuration (from EntityStore)
Cache CacheConfig
// Edge configuration (from EdgeManager)
Edge EdgeConfig
// Worker pool configuration
Workers int `default:"5"`
// Operation timeouts
WriteTimeout time.Duration `default:"5s"`
ReadTimeout time.Duration `default:"2s"`
// Retry configuration
MaxRetries int `default:"10"`
RetryDelay time.Duration `default:"100ms"`
}
Config configures the DataManager service
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns default configuration for DataManager
type DataConsistency ¶
type DataConsistency interface {
CleanupIncomingReferences(ctx context.Context, deletedEntityID string, outgoingTriples []message.Triple) error
CheckOutgoingTriplesConsistency(ctx context.Context, entityID string, entity *gtypes.EntityState, status *EntityIndexStatus)
HasRelationshipToEntity(entity *gtypes.EntityState, targetEntityID string, predicate string) bool
}
DataConsistency provides graph integrity checking and maintenance. Used by components that need to verify or repair graph consistency.
type DataLifecycle ¶
type DataLifecycle interface {
// Run starts the DataManager and blocks until context is cancelled.
// If onReady is provided, it is called once initialization completes successfully.
Run(ctx context.Context, onReady func()) error
FlushPendingWrites(ctx context.Context) error
GetPendingWriteCount() int
GetCacheStats() CacheStats
}
DataLifecycle manages component lifecycle and observability. Used by the main processor to start/stop the manager and monitor health.
type Dependencies ¶
type Dependencies struct {
KVBucket jetstream.KeyValue // NATS KV bucket for persistence
MetricsRegistry *metric.MetricsRegistry // Framework metrics registry
Logger *slog.Logger // Structured logging
Config Config // Configuration
}
Dependencies defines all dependencies needed by DataManager
type EdgeConfig ¶
type EdgeConfig struct {
// Edge validation settings
ValidateEdgeTargets bool `default:"true"`
AtomicOperations bool `default:"true"`
}
EdgeConfig configures edge-specific settings
type EntityCreatedCallback ¶
type EntityCreatedCallback func(entityID string)
EntityCreatedCallback is invoked when a new entity is created in the data store. The callback receives the entity ID and should be non-blocking to avoid impacting write performance. Used by the graph processor for adaptive clustering.
type EntityIndexStatus ¶
type EntityIndexStatus struct {
OutgoingEdgesConsistent bool `json:"outgoing_edges_consistent"`
InconsistentEdges []string `json:"inconsistent_edges,omitempty"`
}
EntityIndexStatus represents index consistency status
type EntityManager ¶
type EntityManager interface {
EntityReader
EntityWriter
CreateEntityWithTriples(ctx context.Context, entity *gtypes.EntityState, triples []message.Triple) (*gtypes.EntityState, error)
UpdateEntityWithTriples(ctx context.Context, entity *gtypes.EntityState, addTriples []message.Triple, removePredicates []string) (*gtypes.EntityState, error)
BatchWrite(ctx context.Context, writes []EntityWrite) error
List(ctx context.Context, pattern string) ([]string, error)
// ListWithPrefix returns entity IDs that have the given prefix.
// Used for hierarchical entity queries like finding siblings in PathRAG.
ListWithPrefix(ctx context.Context, prefix string) ([]string, error)
}
EntityManager provides complete entity lifecycle management. Combines reader, writer, and advanced operations like atomic entity+triple writes.
type EntityReader ¶
type EntityReader interface {
GetEntity(ctx context.Context, id string) (*gtypes.EntityState, error)
ExistsEntity(ctx context.Context, id string) (bool, error)
BatchGet(ctx context.Context, ids []string) ([]*gtypes.EntityState, error)
// ListWithPrefix returns entity IDs that have the given prefix.
// Used for hierarchical entity queries like finding siblings in PathRAG.
ListWithPrefix(ctx context.Context, prefix string) ([]string, error)
}
EntityReader provides read-only entity access with caching. Used by components that only need to query entities (e.g., QueryManager).
type EntityWrite ¶
type EntityWrite struct {
Operation Operation // create|update|delete
Entity *gtypes.EntityState // Entity data (nil for delete)
Triples []message.Triple // Triples to add (for create/update)
Callback func(error) // Optional completion callback
RequestID string // Optional request ID for tracing
Timestamp time.Time // When request was created
Strategy WriteStrategy // CAS vs Put for updates (default: CAS)
}
EntityWrite represents a buffered write operation
type EntityWriter ¶
type EntityWriter interface {
CreateEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
UpdateEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
// UpsertEntity atomically creates or updates an entity using Put semantics.
// This is the preferred method for streaming data where idempotency is required.
// It avoids TOCTOU races that occur with separate GetEntity → Create/Update patterns.
UpsertEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
DeleteEntity(ctx context.Context, id string) error
}
EntityWriter provides basic entity mutation operations. Used by components that need simple CRUD operations without edge management.
type L1CacheConfig ¶
type L1CacheConfig struct {
Type string `default:"lru"`
Size int `default:"1000"`
Metrics bool `default:"true"`
Component string `default:"entity_l1"`
}
L1CacheConfig configures the L1 (hot) LRU cache
type L2CacheConfig ¶
type L2CacheConfig struct {
Type string `default:"ttl"`
Size int `default:"10000"`
TTL time.Duration `default:"5m"`
CleanupInterval time.Duration `default:"1m"`
Metrics bool `default:"true"`
Component string `default:"entity_l2"`
}
L2CacheConfig configures the L2 (warm) TTL cache
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager is the consolidated data management service using framework components.
func NewDataManager ¶
func NewDataManager(deps Dependencies) (*Manager, error)
NewDataManager creates a new data manager using framework components
func (*Manager) BatchWrite ¶
func (m *Manager) BatchWrite(ctx context.Context, writes []EntityWrite) error
BatchWrite performs batch write operations
func (*Manager) CheckOutgoingTriplesConsistency ¶
func (m *Manager) CheckOutgoingTriplesConsistency(
	ctx context.Context, _ string, entity *gtypes.EntityState, status *EntityIndexStatus,
)
CheckOutgoingTriplesConsistency checks relationship triple consistency
func (*Manager) CleanupIncomingReferences ¶
func (m *Manager) CleanupIncomingReferences(
	ctx context.Context, deletedEntityID string, outgoingTriples []message.Triple,
) error
CleanupIncomingReferences is a no-op; the actual cleanup is handled by IndexManager. The IndexManager watches for entity delete events and uses CleanupOrphanedIncomingReferences to remove references from INCOMING_INDEX before deleting from OUTGOING_INDEX. This method exists for interface compatibility.
func (*Manager) CreateEntity ¶
func (m *Manager) CreateEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
CreateEntity creates a new entity in the store
func (*Manager) CreateEntityWithTriples ¶
func (m *Manager) CreateEntityWithTriples(
	ctx context.Context, entity *gtypes.EntityState, triples []message.Triple,
) (*gtypes.EntityState, error)
CreateEntityWithTriples creates an entity with triples atomically
func (*Manager) CreateRelationship ¶
func (m *Manager) CreateRelationship(
	ctx context.Context, fromEntityID, toEntityID string, predicate string, metadata map[string]any,
) error
CreateRelationship creates a relationship between two entities using a triple
func (*Manager) DeleteEntity ¶
func (m *Manager) DeleteEntity(ctx context.Context, id string) error
DeleteEntity deletes an entity from the store
func (*Manager) DeleteRelationship ¶
func (m *Manager) DeleteRelationship(ctx context.Context, fromEntityID, toEntityID string, predicate string) error
DeleteRelationship deletes a relationship between entities
func (*Manager) ExistsEntity ¶
func (m *Manager) ExistsEntity(ctx context.Context, id string) (bool, error)
ExistsEntity checks if an entity exists
func (*Manager) FlushPendingWrites ¶
func (m *Manager) FlushPendingWrites(ctx context.Context) error
FlushPendingWrites forces all buffered writes to be processed immediately. This is primarily for testing and graceful shutdown scenarios.
func (*Manager) GetCacheStats ¶
func (m *Manager) GetCacheStats() CacheStats
GetCacheStats returns cache statistics
func (*Manager) GetPendingWriteCount ¶
func (m *Manager) GetPendingWriteCount() int
GetPendingWriteCount returns the number of writes currently in the buffer. This is useful for monitoring and testing async operations.
func (*Manager) HasRelationshipToEntity ¶
func (m *Manager) HasRelationshipToEntity(entity *gtypes.EntityState, targetEntityID string, predicate string) bool
HasRelationshipToEntity checks if an entity has a relationship triple to a target
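Conceptually, a relationship check scans the entity's triples for one whose predicate matches and whose object is the target entity ID. The sketch below assumes a hypothetical `triple` type with an untyped Object and a plain slice of triples; the real gtypes.EntityState and message.Triple may store objects differently, and the `docked_at` predicate is invented for the example.

```go
package main

import "fmt"

// triple is a hypothetical simplification of message.Triple.
type triple struct {
	Subject, Predicate string
	Object             any
}

// hasRelationship reports whether any triple links to targetID via predicate.
// Only string objects are treated as entity references.
func hasRelationship(triples []triple, targetID, predicate string) bool {
	for _, t := range triples {
		if t.Predicate != predicate {
			continue
		}
		if obj, ok := t.Object.(string); ok && obj == targetID {
			return true
		}
	}
	return false
}

func main() {
	drone := "c360.platform1.robotics.gcs1.drone.1"
	dock := "c360.platform1.robotics.gcs1.dock.1"
	ts := []triple{
		{Subject: drone, Predicate: "type", Object: "Drone"},
		{Subject: drone, Predicate: "docked_at", Object: dock}, // hypothetical predicate
	}
	fmt.Println(hasRelationship(ts, dock, "docked_at")) // true
}
```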
func (*Manager) List ¶
func (m *Manager) List(ctx context.Context, pattern string) ([]string, error)
List returns entity IDs matching a pattern. When pattern is non-empty it is forwarded to the NATS server as a subject filter (e.g. "foo.>"); an empty pattern returns all keys.
func (*Manager) ListWithPrefix ¶
func (m *Manager) ListWithPrefix(ctx context.Context, prefix string) ([]string, error)
ListWithPrefix returns entity IDs that have the given prefix. This is used for hierarchical entity queries, such as finding all siblings (entities with the same type-level prefix) for PathRAG traversal.
Example: prefix "c360.logistics.environmental.sensor.temperature" returns all temperature sensor entities like "c360.logistics.environmental.sensor.temperature.cold-storage-01"
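Given the example above, prefix listing can be thought of as selecting keys that continue the prefix at a dot boundary. The sketch below filters an in-memory key slice with a hypothetical `withPrefix` helper; requiring the trailing dot is an assumption (it keeps a "temperature" prefix from matching a "temperature2" type). Against NATS KV, a comparable server-side filter would be the subject pattern prefix + ".>", since ">" matches one or more trailing tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// withPrefix returns keys that extend prefix by at least one more
// dot-separated token (assumption: matching is on token boundaries).
func withPrefix(keys []string, prefix string) []string {
	var out []string
	for _, k := range keys {
		if strings.HasPrefix(k, prefix+".") {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	keys := []string{
		"c360.logistics.environmental.sensor.temperature.cold-storage-01",
		"c360.logistics.environmental.sensor.temperature.cold-storage-02",
		"c360.logistics.environmental.sensor.humidity.dock-01",
	}
	fmt.Println(withPrefix(keys, "c360.logistics.environmental.sensor.temperature"))
}
```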
func (*Manager) RemoveTriple ¶
func (m *Manager) RemoveTriple(ctx context.Context, subject, predicate string) error
RemoveTriple removes a triple from an entity by predicate
func (*Manager) Run ¶
func (m *Manager) Run(ctx context.Context, onReady func()) error
Run starts the DataManager and blocks until the context is cancelled or a fatal error occurs. If onReady is provided, it is called once initialization completes successfully.
func (*Manager) SetEntityCreatedCallback ¶
func (m *Manager) SetEntityCreatedCallback(callback EntityCreatedCallback)
SetEntityCreatedCallback sets a callback function that is invoked when a new entity is created. This is used by the graph processor to track entity changes for adaptive clustering. The callback is invoked synchronously, so it should be non-blocking.
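Because the callback runs synchronously on the write path, a consumer that does real work can hand IDs off without blocking. One common pattern, sketched below under assumptions, is a buffered channel with a `select`/`default` send that drops IDs when the consumer falls behind; `notifier` is a hypothetical type, and whether dropping IDs is acceptable depends on the consumer.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// notifier is a hypothetical adapter between a synchronous callback and
// a slower consumer goroutine that reads from ids.
type notifier struct {
	ids     chan string
	dropped atomic.Int64 // safe if the callback fires from several workers
}

func newNotifier(size int) *notifier {
	return &notifier{ids: make(chan string, size)}
}

// onEntityCreated matches the shape of EntityCreatedCallback. It never
// blocks: when the channel is full the ID is counted as dropped.
func (n *notifier) onEntityCreated(entityID string) {
	select {
	case n.ids <- entityID:
	default:
		n.dropped.Add(1)
	}
}

func main() {
	n := newNotifier(2)
	for _, id := range []string{"e.1", "e.2", "e.3"} {
		n.onEntityCreated(id)
	}
	fmt.Println(len(n.ids), n.dropped.Load()) // 2 1
}
```

With a real manager, the method value would be registered with mgr.SetEntityCreatedCallback(n.onEntityCreated), and a separate goroutine would range over n.ids.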
func (*Manager) UpdateEntity ¶
func (m *Manager) UpdateEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
UpdateEntity updates an existing entity
func (*Manager) UpdateEntityWithTriples ¶
func (m *Manager) UpdateEntityWithTriples(
	ctx context.Context, entity *gtypes.EntityState, addTriples []message.Triple, removePredicates []string,
) (*gtypes.EntityState, error)
UpdateEntityWithTriples updates an entity and modifies triples atomically
func (*Manager) UpsertEntity ¶
func (m *Manager) UpsertEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
UpsertEntity atomically creates or updates an entity using Put semantics. This is the preferred method for streaming data where idempotency is required. Unlike the GetEntity → Create/Update pattern, this avoids TOCTOU race conditions.
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics holds Prometheus metrics for DataManager KV operations
type Operation ¶
type Operation string
Operation represents the type of entity operation
type TripleManager ¶
type TripleManager interface {
AddTriple(ctx context.Context, triple message.Triple) error
RemoveTriple(ctx context.Context, subject, predicate string) error
CreateRelationship(ctx context.Context, fromEntityID, toEntityID string, predicate string, metadata map[string]any) error
DeleteRelationship(ctx context.Context, fromEntityID, toEntityID string, predicate string) error
}
TripleManager provides semantic triple operations. Used by components that manage entity relationships and properties as triples.
type WriteResult ¶
type WriteResult struct {
EntityID string // ID of the entity written
Version int64 // Final version after write
Created bool // Whether entity was created (vs updated)
Entity *gtypes.EntityState // Final entity state
Error error // Error if operation failed
}
WriteResult represents the result of a write operation
type WriteStrategy ¶
type WriteStrategy int
WriteStrategy determines how concurrent writes are handled
const (
	// WriteStrategyCAS uses Compare-And-Swap with version checking and retries.
	// Best for synchronous mutation requests where caller can handle conflicts.
	WriteStrategyCAS WriteStrategy = iota
	// WriteStrategyPut uses last-write-wins without version checking.
	// Best for async streaming data where CAS would cause race conditions.
	WriteStrategyPut
)