datamanager

package
v1.0.0-alpha.42
Warning

This package is not in the latest version of its module.

Published: Mar 13, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package datamanager consolidates entity and edge operations into a unified data management service.


Package datamanager provides unified entity and triple data management for the knowledge graph.


DataManager is the single writer to the ENTITY_STATES KV bucket, providing atomic entity persistence, triple management, and L1/L2 cache hierarchies. It consolidates what were previously separate EntityStore and EdgeManager components into a unified service following the Interface Segregation Principle.

The package provides 6 focused interfaces for different use cases: EntityReader, EntityWriter, EntityManager, TripleManager, DataConsistency, and DataLifecycle.

Architecture

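DataManager uses a write-behind buffer pattern for high throughput. Writes are queued in the buffer and persisted to NATS KV by the batch workers; reads check the L1 cache, then the L2 cache, and fall back to NATS KV on a miss: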

Incoming Writes → Buffer (10K capacity) → Batch Workers (5) → NATS KV
                     ↓
               L1 Cache (LRU, 1K)
                     ↓
               L2 Cache (TTL, 10K)
                     ↓
                 NATS KV

Usage

Basic entity operations:

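// Create the manager from its dependencies; kvBucket is the
// jetstream.KeyValue handle for the ENTITY_STATES bucket
mgr, err := datamanager.NewDataManager(datamanager.Dependencies{
    KVBucket:        kvBucket,
    MetricsRegistry: registry,
    Logger:          logger,
    Config:          datamanager.DefaultConfig(),
})
if err != nil {
    return err
}

// Start the manager (Run blocks until ctx is cancelled) and wait for onReady
ready := make(chan struct{})
runErr := make(chan error, 1)
go func() { runErr <- mgr.Run(ctx, func() { close(ready) }) }()
select {
case <-ready:
    logger.Info("DataManager ready")
case err := <-runErr:
    return err
}

// Create entity with triples atomically
entity := &gtypes.EntityState{ID: "c360.platform1.robotics.gcs1.drone.1"}
triples := []message.Triple{
    {Subject: entity.ID, Predicate: "type", Object: "Drone"},
}
created, err := mgr.CreateEntityWithTriples(ctx, entity, triples)
if err != nil {
    return err
}

// Read entity (uses L1/L2 cache)
fetched, err := mgr.GetEntity(ctx, created.ID)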

Configuration

Key configuration options:

Buffer:
  Capacity:        10000        # Write buffer size
  BatchingEnabled: true         # Enable batch writes
  FlushInterval:   50ms         # Batch flush frequency
  MaxBatchSize:    100          # Max writes per batch
  OverflowPolicy:  drop_oldest  # Buffer overflow handling

Cache:
  L1Hot:
    Type: lru                 # LRU eviction for hot data
    Size: 1000                # Hot cache capacity
  L2Warm:
    Type: ttl                 # TTL eviction for warm data
    Size: 10000               # Warm cache capacity
    TTL:  5m                  # Time-to-live

Workers:      5               # Concurrent write workers
WriteTimeout: 5s              # KV write timeout
ReadTimeout:  2s              # KV read timeout
MaxRetries:   10              # CAS retry attempts

Entity ID Format

Entity IDs must follow the 6-part hierarchical format:

org.platform.domain.system.type.instance

Example: c360.platform1.robotics.gcs1.drone.1
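A format check along these lines could reject malformed IDs before they reach the KV bucket. validateEntityID is a hypothetical helper that enforces only the six non-empty, dot-separated segments described above; the package's actual validation rules are not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// entityIDParts is the number of dot-separated segments in an entity ID:
// org.platform.domain.system.type.instance
const entityIDParts = 6

// validateEntityID reports whether id has exactly six non-empty segments.
// Hypothetical helper: the package's own validation may apply further rules.
func validateEntityID(id string) error {
	parts := strings.Split(id, ".")
	if len(parts) != entityIDParts {
		return fmt.Errorf("entity ID %q has %d parts, want %d", id, len(parts), entityIDParts)
	}
	for i, p := range parts {
		if p == "" {
			return fmt.Errorf("entity ID %q has an empty segment at position %d", id, i)
		}
	}
	return nil
}

func main() {
	for _, id := range []string{
		"c360.platform1.robotics.gcs1.drone.1", // valid
		"c360.platform1.robotics.drone.1",      // only five parts
		"c360..robotics.gcs1.drone.1",          // empty platform segment
	} {
		fmt.Println(id, validateEntityID(id))
	}
}
```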

Thread Safety

All Manager methods are safe for concurrent use. Under the drop_oldest (default) and drop_newest overflow policies the write buffer accepts submissions without blocking; the block policy instead waits for space. Cache access is protected by the underlying cache implementations.

Metrics

DataManager exports Prometheus metrics under the semstreams_datamanager namespace:

  • writes_total: Total KV write operations by status and operation
  • write_latency_seconds: KV write latency distribution
  • queue_depth: Current write queue depth
  • batch_size: Size of write batches
  • dropped_writes: Writes dropped due to queue overflow
  • cache_hits: Cache hits by level (l1, l2)
  • cache_misses: Cache misses requiring KV fetch

See Also

Related packages:

Package datamanager consolidates entity and triple operations into a unified data management service. This package is the result of Phase 1 consolidation, providing atomic entity+triple operations and simplified transaction management.

The DataManager is the single writer to the ENTITY_STATES KV bucket; it handles all entity persistence and triple management and maintains the L1/L2 cache hierarchy.

The package follows the Interface Segregation Principle with 6 focused interfaces:

  • EntityReader: Read-only entity access with caching
  • EntityWriter: Basic entity mutation operations
  • EntityManager: Complete entity lifecycle management
  • TripleManager: Semantic triple operations (replaces EdgeManager)
  • DataConsistency: Graph integrity checking and maintenance
  • DataLifecycle: Component lifecycle and observability

The implementation uses semstreams framework components rather than reimplementing them.


Index

Constants

This section is empty.

Variables

var (
	// DataManager-specific errors (ones not in shared types/graph package)
	ErrInvalidOperation = fmt.Errorf("invalid operation")
	ErrBufferOverflow   = fmt.Errorf("write buffer overflow")
	ErrBatchChannelFull = fmt.Errorf("batch channel full")
)

Local error values specific to the data manager. Common cases use the shared sentinel errors from the types/graph package.

Functions

func IsBufferError

func IsBufferError(err error) bool

IsBufferError checks if an error is related to buffer operations

func IsEntityError

func IsEntityError(err error) bool

IsEntityError checks if an error is related to entity operations

func IsLifecycleError

func IsLifecycleError(err error) bool

IsLifecycleError checks if an error is related to service lifecycle

func IsRetryableError

func IsRetryableError(err error) bool

IsRetryableError determines whether an error should be retried. It uses error type checking rather than string matching.

Types

type BatchWriteResult

type BatchWriteResult struct {
	Results   []WriteResult // Individual write results
	Succeeded int           // Number of successful writes
	Failed    int           // Number of failed writes
	Duration  time.Duration // Total operation duration
}

BatchWriteResult represents the result of a batch write operation

type BucketConfig

type BucketConfig struct {
	Name     string        `default:"ENTITY_STATES"`
	TTL      time.Duration `default:"0s"` // 0 = no expiry
	History  int           `default:"5"`
	Replicas int           `default:"3"`
}

BucketConfig configures the NATS KV bucket settings

type BufferConfig

type BufferConfig struct {
	// Buffer capacity
	Capacity int `json:"capacity,omitempty" default:"10000"`

	// Batching settings - groups writes for efficient processing
	BatchingEnabled bool          `json:"batching_enabled" default:"true"`
	FlushInterval   time.Duration `json:"flush_interval,omitempty" default:"50ms"` // How often to flush the buffer
	MaxBatchSize    int           `json:"max_batch_size,omitempty" default:"100"`  // Max writes per batch
	MaxBatchAge     time.Duration `json:"max_batch_age,omitempty" default:"100ms"` // Max age before forced flush

	// Overflow policy when buffer is full
	// "drop_oldest" (default) - drop oldest items to make room (NATS MaxAckPending provides backpressure)
	// "drop_newest" - drop incoming items when full
	// "block" - wait for space (WARNING: can cause IndexManager deadlock, use only with careful tuning)
	OverflowPolicy string `json:"overflow_policy,omitempty" default:"drop_oldest"` // drop_oldest, drop_newest, block
}

BufferConfig configures the write buffer behavior

type CacheConfig

type CacheConfig struct {
	// L1 Cache (Hot - LRU)
	L1Hot L1CacheConfig

	// L2 Cache (Warm - TTL)
	L2Warm L2CacheConfig

	// Metrics configuration
	Metrics   bool   `default:"true"`
	Component string `default:"data_manager"`
}

CacheConfig configures the L1/L2 cache hierarchy

type CacheStats

type CacheStats struct {
	// L1 Stats
	L1Hits      int64   `json:"l1_hits"`
	L1Misses    int64   `json:"l1_misses"`
	L1Size      int     `json:"l1_size"`
	L1HitRatio  float64 `json:"l1_hit_ratio"`
	L1Evictions int64   `json:"l1_evictions"`

	// L2 Stats
	L2Hits      int64   `json:"l2_hits"`
	L2Misses    int64   `json:"l2_misses"`
	L2Size      int     `json:"l2_size"`
	L2HitRatio  float64 `json:"l2_hit_ratio"`
	L2Evictions int64   `json:"l2_evictions"`

	// Overall Stats
	TotalHits       int64   `json:"total_hits"`
	TotalMisses     int64   `json:"total_misses"`
	OverallHitRatio float64 `json:"overall_hit_ratio"`
	KVFetches       int64   `json:"kv_fetches"`

	// Invalidation Stats
	InvalidationsTotal   int64 `json:"invalidations_total"`
	InvalidationsBatched int64 `json:"invalidations_batched"`
}

CacheStats holds cache statistics

type Config

type Config struct {
	// Buffer configuration (from EntityStore)
	BufferConfig BufferConfig `json:"buffer,omitempty"`

	// KV bucket configuration
	BucketConfig BucketConfig `json:"bucket,omitempty"`

	// Cache configuration (from EntityStore)
	Cache CacheConfig

	// Edge configuration (from EdgeManager)
	Edge EdgeConfig

	// Worker pool configuration
	Workers int `default:"5"`

	// Operation timeouts
	WriteTimeout time.Duration `default:"5s"`
	ReadTimeout  time.Duration `default:"2s"`

	// Retry configuration
	MaxRetries int           `default:"10"`
	RetryDelay time.Duration `default:"100ms"`
}

Config configures the DataManager service

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default configuration for DataManager

type DataConsistency

type DataConsistency interface {
	CleanupIncomingReferences(ctx context.Context, deletedEntityID string, outgoingTriples []message.Triple) error
	CheckOutgoingTriplesConsistency(ctx context.Context, entityID string, entity *gtypes.EntityState, status *EntityIndexStatus)
	HasRelationshipToEntity(entity *gtypes.EntityState, targetEntityID string, predicate string) bool
}

DataConsistency provides graph integrity checking and maintenance. Used by components that need to verify or repair graph consistency.

type DataLifecycle

type DataLifecycle interface {
	// Run starts the DataManager and blocks until context is cancelled.
	// If onReady is provided, it is called once initialization completes successfully.
	Run(ctx context.Context, onReady func()) error
	FlushPendingWrites(ctx context.Context) error
	GetPendingWriteCount() int
	GetCacheStats() CacheStats
}

DataLifecycle manages component lifecycle and observability. Used by the main processor to start/stop the manager and monitor health.

type Dependencies

type Dependencies struct {
	KVBucket        jetstream.KeyValue      // NATS KV bucket for persistence
	MetricsRegistry *metric.MetricsRegistry // Framework metrics registry
	Logger          *slog.Logger            // Structured logging
	Config          Config                  // Configuration
}

Dependencies defines all dependencies needed by DataManager

type EdgeConfig

type EdgeConfig struct {
	// Edge validation settings
	ValidateEdgeTargets bool `default:"true"`
	AtomicOperations    bool `default:"true"`
}

EdgeConfig configures edge-specific settings

type EntityCreatedCallback

type EntityCreatedCallback func(entityID string)

EntityCreatedCallback is invoked when a new entity is created in the data store. The callback receives the entity ID and should be non-blocking to avoid impacting write performance. Used by the graph processor for adaptive clustering.

type EntityIndexStatus

type EntityIndexStatus struct {
	OutgoingEdgesConsistent bool     `json:"outgoing_edges_consistent"`
	InconsistentEdges       []string `json:"inconsistent_edges,omitempty"`
}

EntityIndexStatus represents index consistency status

type EntityManager

type EntityManager interface {
	EntityReader
	EntityWriter

	CreateEntityWithTriples(ctx context.Context, entity *gtypes.EntityState, triples []message.Triple) (*gtypes.EntityState, error)
	UpdateEntityWithTriples(ctx context.Context, entity *gtypes.EntityState, addTriples []message.Triple, removePredicates []string) (*gtypes.EntityState, error)
	BatchWrite(ctx context.Context, writes []EntityWrite) error
	List(ctx context.Context, pattern string) ([]string, error)
	// ListWithPrefix returns entity IDs that have the given prefix.
	// Used for hierarchical entity queries like finding siblings in PathRAG.
	ListWithPrefix(ctx context.Context, prefix string) ([]string, error)
}

EntityManager provides complete entity lifecycle management. Combines reader, writer, and advanced operations like atomic entity+triple writes.

type EntityReader

type EntityReader interface {
	GetEntity(ctx context.Context, id string) (*gtypes.EntityState, error)
	ExistsEntity(ctx context.Context, id string) (bool, error)
	BatchGet(ctx context.Context, ids []string) ([]*gtypes.EntityState, error)
	// ListWithPrefix returns entity IDs that have the given prefix.
	// Used for hierarchical entity queries like finding siblings in PathRAG.
	ListWithPrefix(ctx context.Context, prefix string) ([]string, error)
}

EntityReader provides read-only entity access with caching. Used by components that only need to query entities (e.g., QueryManager).

type EntityWrite

type EntityWrite struct {
	Operation Operation           // create|update|delete
	Entity    *gtypes.EntityState // Entity data (nil for delete)
	Triples   []message.Triple    // Triples to add (for create/update)
	Callback  func(error)         // Optional completion callback
	RequestID string              // Optional request ID for tracing
	Timestamp time.Time           // When request was created
	Strategy  WriteStrategy       // CAS vs Put for updates (default: CAS)
}

EntityWrite represents a buffered write operation

type EntityWriter

type EntityWriter interface {
	CreateEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
	UpdateEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
	// UpsertEntity atomically creates or updates an entity using Put semantics.
	// This is the preferred method for streaming data where idempotency is required.
	// It avoids TOCTOU races that occur with separate GetEntity → Create/Update patterns.
	UpsertEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)
	DeleteEntity(ctx context.Context, id string) error
}

EntityWriter provides basic entity mutation operations. Used by components that need simple CRUD operations without edge management.

type L1CacheConfig

type L1CacheConfig struct {
	Type      string `default:"lru"`
	Size      int    `default:"1000"`
	Metrics   bool   `default:"true"`
	Component string `default:"entity_l1"`
}

L1CacheConfig configures the L1 (hot) LRU cache
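For reference, an LRU cache of the kind the L1 tier describes can be built from container/list and a map. This standalone sketch is not thread-safe and is not the framework cache the package uses; lruCache and its string values are illustrative only.

```go
package main

import (
	"container/list"
	"fmt"
)

// lruCache is a minimal, non-thread-safe LRU cache keyed by entity ID.
// capacity must be > 0.
type lruCache struct {
	capacity int
	order    *list.List               // front = most recently used
	items    map[string]*list.Element // key -> element in order
}

type entry struct {
	key   string
	value string
}

func newLRU(capacity int) *lruCache {
	return &lruCache{capacity: capacity, order: list.New(), items: make(map[string]*list.Element)}
}

// Get returns the cached value and marks it as most recently used.
func (c *lruCache) Get(key string) (string, bool) {
	el, ok := c.items[key]
	if !ok {
		return "", false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).value, true
}

// Put inserts or refreshes key, evicting the least recently used entry
// when the cache is full.
func (c *lruCache) Put(key, value string) {
	if el, ok := c.items[key]; ok {
		el.Value.(*entry).value = value
		c.order.MoveToFront(el)
		return
	}
	if c.order.Len() >= c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
	c.items[key] = c.order.PushFront(&entry{key: key, value: value})
}

func main() {
	c := newLRU(2)
	c.Put("drone.1", "v1")
	c.Put("drone.2", "v1")
	c.Get("drone.1")       // drone.1 is now most recently used
	c.Put("drone.3", "v1") // evicts drone.2, the least recently used
	_, ok := c.Get("drone.2")
	fmt.Println("drone.2 cached:", ok)
}
```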

type L2CacheConfig

type L2CacheConfig struct {
	Type            string        `default:"ttl"`
	Size            int           `default:"10000"`
	TTL             time.Duration `default:"5m"`
	CleanupInterval time.Duration `default:"1m"`
	Metrics         bool          `default:"true"`
	Component       string        `default:"entity_l2"`
}

L2CacheConfig configures the L2 (warm) TTL cache

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager is the consolidated data management service using framework components.

func NewDataManager

func NewDataManager(deps Dependencies) (*Manager, error)

NewDataManager creates a new data manager using framework components

func (*Manager) AddTriple

func (m *Manager) AddTriple(ctx context.Context, triple message.Triple) error

AddTriple adds a triple to an entity

func (*Manager) BatchGet

func (m *Manager) BatchGet(ctx context.Context, ids []string) ([]*gtypes.EntityState, error)

BatchGet retrieves multiple entities

func (*Manager) BatchWrite

func (m *Manager) BatchWrite(ctx context.Context, writes []EntityWrite) error

BatchWrite performs batch write operations

func (*Manager) CheckOutgoingTriplesConsistency

func (m *Manager) CheckOutgoingTriplesConsistency(
	ctx context.Context,
	_ string,
	entity *gtypes.EntityState,
	status *EntityIndexStatus,
)

CheckOutgoingTriplesConsistency checks relationship triple consistency

func (*Manager) CleanupIncomingReferences

func (m *Manager) CleanupIncomingReferences(
	ctx context.Context,
	deletedEntityID string,
	outgoingTriples []message.Triple,
) error

CleanupIncomingReferences is a no-op; actual cleanup is handled by IndexManager. IndexManager watches for entity delete events and uses CleanupOrphanedIncomingReferences to remove references from INCOMING_INDEX before deleting from OUTGOING_INDEX. This method exists only for interface compatibility.

func (*Manager) CreateEntity

func (m *Manager) CreateEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)

CreateEntity creates a new entity in the store

func (*Manager) CreateEntityWithTriples

func (m *Manager) CreateEntityWithTriples(
	ctx context.Context,
	entity *gtypes.EntityState,
	triples []message.Triple,
) (*gtypes.EntityState, error)

CreateEntityWithTriples creates an entity with triples atomically

func (*Manager) CreateRelationship

func (m *Manager) CreateRelationship(
	ctx context.Context,
	fromEntityID, toEntityID string,
	predicate string,
	metadata map[string]any,
) error

CreateRelationship creates a relationship between two entities using a triple

func (*Manager) DeleteEntity

func (m *Manager) DeleteEntity(ctx context.Context, id string) error

DeleteEntity deletes an entity from the store

func (*Manager) DeleteRelationship

func (m *Manager) DeleteRelationship(ctx context.Context, fromEntityID, toEntityID string, predicate string) error

DeleteRelationship deletes a relationship between entities

func (*Manager) ExistsEntity

func (m *Manager) ExistsEntity(ctx context.Context, id string) (bool, error)

ExistsEntity checks if an entity exists

func (*Manager) FlushPendingWrites

func (m *Manager) FlushPendingWrites(ctx context.Context) error

FlushPendingWrites forces all buffered writes to be processed immediately. This is primarily for testing and graceful shutdown scenarios.

func (*Manager) GetCacheStats

func (m *Manager) GetCacheStats() CacheStats

GetCacheStats returns cache statistics

func (*Manager) GetEntity

func (m *Manager) GetEntity(ctx context.Context, id string) (*gtypes.EntityState, error)

GetEntity retrieves an entity from the store

func (*Manager) GetPendingWriteCount

func (m *Manager) GetPendingWriteCount() int

GetPendingWriteCount returns the number of writes currently in the buffer. This is useful for monitoring and testing async operations.

func (*Manager) HasRelationshipToEntity

func (m *Manager) HasRelationshipToEntity(entity *gtypes.EntityState, targetEntityID string, predicate string) bool

HasRelationshipToEntity checks if an entity has a relationship triple to a target

func (*Manager) List

func (m *Manager) List(ctx context.Context, pattern string) ([]string, error)

List returns entity IDs matching a pattern. When pattern is non-empty it is forwarded to the NATS server as a subject filter (e.g. "foo.>"); an empty pattern returns all keys.
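NATS subject filters match token by token on "." boundaries: "*" matches exactly one token and ">" matches one or more trailing tokens. The local matchSubject function below mimics those rules to help predict what a List pattern returns; the server does the real filtering. One consequence is that a bare prefix such as "c360.platform1" used as a pattern matches only that exact key, which is the case ListWithPrefix covers.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether key matches a NATS-style subject filter:
// "*" matches exactly one token and ">" (last token only) matches one or
// more remaining tokens. Local illustration of the server-side semantics.
func matchSubject(pattern, key string) bool {
	pt := strings.Split(pattern, ".")
	kt := strings.Split(key, ".")
	for i, p := range pt {
		if p == ">" {
			return i == len(pt)-1 && len(kt) > i
		}
		if i >= len(kt) {
			return false
		}
		if p != "*" && p != kt[i] {
			return false
		}
	}
	return len(pt) == len(kt)
}

func main() {
	keys := []string{
		"c360.platform1.robotics.gcs1.drone.1",
		"c360.platform1.robotics.gcs1.drone.2",
		"c360.platform1.robotics.gcs2.rover.1",
	}
	for _, pattern := range []string{"c360.platform1.robotics.>", "c360.*.robotics.*.drone.*"} {
		for _, k := range keys {
			if matchSubject(pattern, k) {
				fmt.Println(pattern, "->", k)
			}
		}
	}
}
```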

func (*Manager) ListWithPrefix

func (m *Manager) ListWithPrefix(ctx context.Context, prefix string) ([]string, error)

ListWithPrefix returns entity IDs that have the given prefix. This is used for hierarchical entity queries, such as finding all siblings (entities with the same type-level prefix) for PathRAG traversal.

Example: prefix "c360.logistics.environmental.sensor.temperature" returns all temperature sensor entities like "c360.logistics.environmental.sensor.temperature.cold-storage-01"
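Finding siblings amounts to dropping the instance segment of an ID and listing keys under the remaining type-level prefix. The helpers below are hypothetical. withPrefix requires a "." after the prefix so that "...temperature" does not also match a type named "temperature2"; whether ListWithPrefix applies the same boundary rule is not stated here.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// typePrefix drops the instance segment from a six-part entity ID, giving
// the org.platform.domain.system.type prefix shared by its siblings.
func typePrefix(id string) (string, bool) {
	i := strings.LastIndex(id, ".")
	if i <= 0 {
		return "", false
	}
	return id[:i], true
}

// withPrefix returns the sorted keys that sit under prefix. It requires a
// "." boundary so "...temperature" does not also match "...temperature2".
func withPrefix(keys []string, prefix string) []string {
	var out []string
	for _, k := range keys {
		if strings.HasPrefix(k, prefix+".") {
			out = append(out, k)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	keys := []string{
		"c360.logistics.environmental.sensor.temperature.cold-storage-01",
		"c360.logistics.environmental.sensor.temperature.cold-storage-02",
		"c360.logistics.environmental.sensor.humidity.cold-storage-01",
	}
	prefix, _ := typePrefix(keys[0])
	fmt.Println(prefix)
	fmt.Println(withPrefix(keys, prefix))
}
```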

func (*Manager) RemoveTriple

func (m *Manager) RemoveTriple(ctx context.Context, subject, predicate string) error

RemoveTriple removes a triple from an entity by predicate

func (*Manager) Run

func (m *Manager) Run(ctx context.Context, onReady func()) error

Run starts the DataManager and blocks until the context is cancelled or a fatal error occurs. If onReady is provided, it is called once initialization completes successfully.

func (*Manager) SetEntityCreatedCallback

func (m *Manager) SetEntityCreatedCallback(callback EntityCreatedCallback)

SetEntityCreatedCallback sets a callback function that is invoked when a new entity is created. This is used by the graph processor to track entity changes for adaptive clustering. The callback is invoked synchronously, so it should be non-blocking.
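Because the callback runs synchronously on the write path, one way to keep it non-blocking is to hand the ID to a buffered channel and drop it when the channel is full. This is a sketch under that assumption; newNonBlockingNotifier and the buffer size are hypothetical, and a func(entityID string) built this way is what would be passed to SetEntityCreatedCallback.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// EntityCreatedCallback mirrors the documented callback type.
type EntityCreatedCallback func(entityID string)

// newNonBlockingNotifier returns a callback that never blocks the writer:
// IDs go into a buffered channel for a separate consumer and are dropped
// (and counted) when the channel is full.
func newNonBlockingNotifier(size int) (EntityCreatedCallback, <-chan string, *atomic.Int64) {
	ch := make(chan string, size)
	var dropped atomic.Int64
	cb := func(entityID string) {
		select {
		case ch <- entityID:
		default: // consumer is behind: drop instead of stalling the write
			dropped.Add(1)
		}
	}
	return cb, ch, &dropped
}

func main() {
	cb, events, dropped := newNonBlockingNotifier(2)
	// The writer invokes the callback synchronously; none of these calls block.
	for _, id := range []string{"drone.1", "drone.2", "drone.3"} {
		cb(id)
	}
	fmt.Println(len(events), "queued,", dropped.Load(), "dropped")
}
```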

func (*Manager) UpdateEntity

func (m *Manager) UpdateEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)

UpdateEntity updates an existing entity

func (*Manager) UpdateEntityWithTriples

func (m *Manager) UpdateEntityWithTriples(
	ctx context.Context,
	entity *gtypes.EntityState,
	addTriples []message.Triple,
	removePredicates []string,
) (*gtypes.EntityState, error)

UpdateEntityWithTriples updates an entity and modifies triples atomically
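The in-memory part of such an update can be pictured as a merge: drop every triple whose predicate is listed in removePredicates, then append addTriples. The ordering (removals before additions, so one call can replace a predicate's value) and the local triple type are assumptions; the sketch does not model the atomic KV write itself.

```go
package main

import "fmt"

// triple is a local stand-in for message.Triple.
type triple struct {
	Subject, Predicate string
	Object             any
}

// applyTripleChanges returns a new slice with every triple whose predicate is
// in removePredicates dropped, followed by addTriples. Applying removals first
// lets one call replace a predicate's value. This ordering is an assumption.
func applyTripleChanges(current, addTriples []triple, removePredicates []string) []triple {
	remove := make(map[string]struct{}, len(removePredicates))
	for _, p := range removePredicates {
		remove[p] = struct{}{}
	}
	out := make([]triple, 0, len(current)+len(addTriples))
	for _, t := range current {
		if _, drop := remove[t.Predicate]; !drop {
			out = append(out, t)
		}
	}
	return append(out, addTriples...)
}

func main() {
	id := "c360.platform1.robotics.gcs1.drone.1"
	current := []triple{
		{Subject: id, Predicate: "type", Object: "Drone"},
		{Subject: id, Predicate: "status", Object: "idle"},
	}
	updated := applyTripleChanges(current,
		[]triple{{Subject: id, Predicate: "status", Object: "flying"}},
		[]string{"status"})
	fmt.Println(updated)
}
```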

func (*Manager) UpsertEntity

func (m *Manager) UpsertEntity(ctx context.Context, entity *gtypes.EntityState) (*gtypes.EntityState, error)

UpsertEntity atomically creates or updates an entity using Put semantics. This is the preferred method for streaming data where idempotency is required. Unlike the GetEntity → Create/Update pattern, this avoids TOCTOU race conditions.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics holds Prometheus metrics for DataManager KV operations

type Operation

type Operation string

Operation represents the type of entity operation

const (
	// OperationCreate represents creating a new entity.
	OperationCreate Operation = "create"
	// OperationUpdate represents updating an existing entity.
	OperationUpdate Operation = "update"
	// OperationDelete represents deleting an entity.
	OperationDelete Operation = "delete"
)

func (Operation) IsValid

func (o Operation) IsValid() bool

IsValid checks if the operation is valid

func (Operation) String

func (o Operation) String() string

String returns the string representation of the operation
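Given the three constants, a minimal IsValid and String pair could look like the following. It is a plausible sketch consistent with the documented constants, not the package source.

```go
package main

import "fmt"

// Operation mirrors the documented type and constants.
type Operation string

const (
	OperationCreate Operation = "create"
	OperationUpdate Operation = "update"
	OperationDelete Operation = "delete"
)

// IsValid accepts only the three documented operations.
func (o Operation) IsValid() bool {
	switch o {
	case OperationCreate, OperationUpdate, OperationDelete:
		return true
	}
	return false
}

// String returns the underlying string value.
func (o Operation) String() string { return string(o) }

func main() {
	for _, op := range []Operation{OperationCreate, "upsert"} {
		fmt.Println(op, op.IsValid())
	}
}
```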

type TripleManager

type TripleManager interface {
	AddTriple(ctx context.Context, triple message.Triple) error
	RemoveTriple(ctx context.Context, subject, predicate string) error
	CreateRelationship(ctx context.Context, fromEntityID, toEntityID string, predicate string, metadata map[string]any) error
	DeleteRelationship(ctx context.Context, fromEntityID, toEntityID string, predicate string) error
}

TripleManager provides semantic triple operations. Used by components that manage entity relationships and properties as triples.

type WriteResult

type WriteResult struct {
	EntityID string              // ID of the entity written
	Version  int64               // Final version after write
	Created  bool                // Whether entity was created (vs updated)
	Entity   *gtypes.EntityState // Final entity state
	Error    error               // Error if operation failed
}

WriteResult represents the result of a write operation

type WriteStrategy

type WriteStrategy int

WriteStrategy determines how concurrent writes are handled

const (
	// WriteStrategyCAS uses Compare-And-Swap with version checking and retries.
	// Best for synchronous mutation requests where caller can handle conflicts.
	WriteStrategyCAS WriteStrategy = iota

	// WriteStrategyPut uses last-write-wins without version checking.
	// Best for async streaming data where CAS would cause race conditions.
	WriteStrategyPut
)
