messagemanager

package
Version: v1.0.0-alpha.36
Published: Mar 12, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package messagemanager provides message processing and entity extraction for the knowledge graph.

The messagemanager package converts incoming messages into EntityState objects that can be stored in the knowledge graph. It handles multiple message types: Graphable interfaces, Storable interfaces with ObjectStore references, and legacy map-based messages.

Messages are processed through a unified pipeline that extracts entity IDs, triples (the single source of truth for properties and relationships), and optional storage references. Entity merging is performed atomically using upsert semantics to avoid race conditions.

Architecture

              Incoming Message ([]byte)
                       ↓
┌──────────────────────────────────────────────────────────────┐
│                   Manager.ProcessWork                        │
│  1. Parse BaseMessage envelope                               │
│  2. Extract payload type                                     │
│  3. Route to appropriate handler                             │
└──────────────────────────────────────────────────────────────┘
                       ↓
┌──────────────┬───────────────┬───────────────────────────────┐
│ Storable     │ Graphable     │ Map/Legacy                    │
│ (ObjectStore │ (EntityID +   │ (JSON map to                  │
│  reference)  │  Triples)     │  entity state)                │
└──────────────┴───────────────┴───────────────────────────────┘
                       ↓
┌──────────────────────────────────────────────────────────────┐
│              EntityState (Triples = source of truth)         │
│                          ↓                                   │
│              EntityManager.UpsertEntity                      │
└──────────────────────────────────────────────────────────────┘
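
The routing step in the diagram can be sketched as a Go type switch. This is a minimal illustration, not the package's actual implementation: Triple, StorageReference, and the route helper are simplified stand-ins introduced here, and the handler names are assumptions. The one point worth noting is ordering: Storable embeds Graphable, so it has to be tested first.

```go
package main

import "fmt"

// Triple and StorageReference are simplified stand-ins for the message
// package types; their fields are assumptions made for this sketch.
type Triple struct{ Subject, Predicate, Object string }

type StorageReference struct{ Key string }

type Graphable interface {
	EntityID() string
	Triples() []Triple
}

type Storable interface {
	Graphable
	StorageRef() *StorageReference
}

// route is a hypothetical helper that names the handler a payload would reach.
// Storable is tested before Graphable because every Storable is also Graphable.
func route(payload any) string {
	switch payload.(type) {
	case Storable:
		return "storable"
	case Graphable:
		return "graphable"
	case map[string]any:
		return "map"
	default:
		return "unsupported"
	}
}

type sensor struct{ id string }

func (s sensor) EntityID() string  { return s.id }
func (s sensor) Triples() []Triple { return nil }

type storedSensor struct{ sensor }

func (s storedSensor) StorageRef() *StorageReference {
	return &StorageReference{Key: s.id}
}

func main() {
	fmt.Println(route(sensor{id: "a"}))
	fmt.Println(route(storedSensor{sensor{id: "b"}}))
	fmt.Println(route(map[string]any{"name": "c"}))
	fmt.Println(route(42))
}
```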

Usage

Create and configure the message manager:

config := messagemanager.DefaultConfig()
deps := messagemanager.Dependencies{
    EntityManager:   entityManager,
    IndexManager:    indexManager,
    Logger:          logger,
    MetricsRegistry: registry,
}

manager := messagemanager.NewManager(config, deps, func(errMsg string) {
    logger.Error("Message processing error", "error", errMsg)
})

Process messages from a worker pool:

// ProcessWork handles raw message bytes from worker pool
err := manager.ProcessWork(ctx, messageData)

Process messages directly:

// ProcessMessage handles typed messages
entityStates, err := manager.ProcessMessage(ctx, graphableMsg)

Message Types

Storable messages:

Messages implementing the Storable interface have content stored in ObjectStore. The manager extracts the StorageReference and passes it to the EntityState:

type Storable interface {
    Graphable
    StorageRef() *message.StorageReference
}

Graphable messages:

Messages implementing Graphable provide entity ID and triples directly:

type Graphable interface {
    EntityID() string
    Triples() []message.Triple
}
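
To make the relationship between the two interfaces concrete, here is a small sketch of how a storage reference might be carried into an entity state. EntityState, Triple, and StorageReference are simplified stand-ins with assumed fields, and toEntityState is a hypothetical helper, not a function exported by this package.

```go
package main

import "fmt"

// Simplified stand-ins for the message and gtypes types (fields assumed).
type Triple struct{ Subject, Predicate, Object string }

type StorageReference struct{ Key string }

type EntityState struct {
	ID         string
	Triples    []Triple
	StorageRef *StorageReference
}

type Graphable interface {
	EntityID() string
	Triples() []Triple
}

type Storable interface {
	Graphable
	StorageRef() *StorageReference
}

// toEntityState copies the ID and triples, and attaches the storage
// reference only when the message also implements Storable.
func toEntityState(msg Graphable) EntityState {
	state := EntityState{ID: msg.EntityID(), Triples: msg.Triples()}
	if s, ok := msg.(Storable); ok {
		state.StorageRef = s.StorageRef()
	}
	return state
}

type reading struct{ id, temp string }

func (r reading) EntityID() string { return r.id }
func (r reading) Triples() []Triple {
	return []Triple{{Subject: r.id, Predicate: "temperature", Object: r.temp}}
}

type storedReading struct {
	reading
	key string
}

func (s storedReading) StorageRef() *StorageReference {
	return &StorageReference{Key: s.key}
}

func main() {
	plain := toEntityState(reading{id: "sensor-1", temp: "21.5"})
	stored := toEntityState(storedReading{
		reading: reading{id: "sensor-2", temp: "19.0"},
		key:     "objects/sensor-2",
	})
	fmt.Println(plain.ID, len(plain.Triples), plain.StorageRef == nil)
	fmt.Println(stored.ID, stored.StorageRef.Key)
}
```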

Map messages:

JSON objects (map[string]any) are converted to entities with auto-generated IDs; each map key becomes a triple predicate.
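
A minimal sketch of the key-to-predicate idea follows. The mapToTriples helper, the Triple fields, the string conversion of values, and the entity ID format are all assumptions made for illustration; the package's real conversion and ID scheme are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sort"
)

// Triple is a simplified stand-in for message.Triple (fields assumed).
type Triple struct{ Subject, Predicate, Object string }

// mapToTriples is a hypothetical helper: it emits one triple per map key,
// using the key as the predicate. Keys are sorted so output is deterministic.
func mapToTriples(entityID string, m map[string]any) []Triple {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	triples := make([]Triple, 0, len(keys))
	for _, k := range keys {
		triples = append(triples, Triple{
			Subject:   entityID,
			Predicate: k,
			Object:    fmt.Sprint(m[k]), // values are stringified in this sketch
		})
	}
	return triples
}

func main() {
	// The ID format below is invented for the example.
	id := "default.semstreams.generated-1"
	msg := map[string]any{"name": "pump", "rpm": 1200}
	for _, t := range mapToTriples(id, msg) {
		fmt.Printf("%s %s %s\n", t.Subject, t.Predicate, t.Object)
	}
}
```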

Entity Merging

When processing messages for existing entities, the manager:

  1. Fetches existing entity state (if any)
  2. Merges triples using gtypes.MergeTriples
  3. Increments entity version
  4. Uses UpsertEntity for atomic persistence

Persisting through a single atomic upsert avoids time-of-check to time-of-use (TOCTOU) race conditions when messages for the same entity are processed concurrently.
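
The four steps above can be sketched with an in-memory store that performs fetch, merge, version bump, and write under one lock. Everything here is illustrative: the store type, the EntityState fields, and in particular the merge rule (an incoming triple replaces an existing triple with the same predicate) are assumptions, and gtypes.MergeTriples may behave differently.

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins; field names are assumptions.
type Triple struct{ Subject, Predicate, Object string }

type EntityState struct {
	ID      string
	Triples []Triple
	Version int
}

// mergeTriples uses an assumed rule: an incoming triple replaces an existing
// triple with the same predicate, otherwise it is appended.
func mergeTriples(existing, incoming []Triple) []Triple {
	merged := append([]Triple(nil), existing...)
	for _, in := range incoming {
		replaced := false
		for i := range merged {
			if merged[i].Predicate == in.Predicate {
				merged[i] = in
				replaced = true
				break
			}
		}
		if !replaced {
			merged = append(merged, in)
		}
	}
	return merged
}

type store struct {
	mu       sync.Mutex
	entities map[string]EntityState
}

// upsert runs the whole read-modify-write cycle while holding the lock, so no
// other writer can slip in between the fetch and the write.
func (s *store) upsert(id string, incoming []Triple) EntityState {
	s.mu.Lock()
	defer s.mu.Unlock()
	state := s.entities[id] // zero value when the entity does not exist yet
	state.ID = id
	state.Triples = mergeTriples(state.Triples, incoming)
	state.Version++
	s.entities[id] = state
	return state
}

func main() {
	s := &store{entities: map[string]EntityState{}}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			s.upsert("pump-1", []Triple{{Subject: "pump-1", Predicate: "rpm", Object: fmt.Sprint(n)}})
		}(i)
	}
	wg.Wait()
	final := s.entities["pump-1"]
	fmt.Println(final.Version, len(final.Triples))
}
```

With a separate fetch followed by a later write, two concurrent writers could both read version N and both store N+1, losing one update; keeping the cycle inside one critical section is what rules that out in this sketch.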

Alias Resolution

Entity IDs can be aliases resolved via IndexManager.ResolveAlias. If resolution fails, the original ID is used as-is.

Configuration

Configuration options:

DefaultNamespace: "default"      # Namespace for auto-generated entity IDs
DefaultPlatform:  "semstreams"   # Platform for auto-generated entity IDs

Thread Safety

The Manager is safe for concurrent use. Message processing uses atomic operations for statistics and upsert semantics for entity persistence.

Metrics

The package exports Prometheus metrics:

  • messages_processed_total: Total messages processed
  • messages_failed_total: Messages that failed processing
  • entities_extracted_total: Entities extracted from messages
  • entities_update_attempts_total: Entity upsert attempts
  • entities_update_success_total: Successful entity upserts
  • entities_update_failed_total: Failed entity upserts

See Also

Related packages:

Package messagemanager provides the MessageHandler interface and Manager implementation

Package messagemanager handles all message processing business logic

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	DefaultNamespace string
	DefaultPlatform  string
}

Config holds message manager configuration

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default configuration

type Dependencies

type Dependencies struct {
	EntityManager   datamanager.EntityManager
	IndexManager    IndexManager
	Logger          Logger
	MetricsRegistry *metric.MetricsRegistry
}

Dependencies defines all dependencies needed by message manager

type IndexManager

type IndexManager interface {
	ResolveAlias(ctx context.Context, aliasOrID string) (string, error)
}

IndexManager interface for index operations

type Logger

type Logger interface {
	Debug(msg string, keysAndValues ...any)
	Error(msg string, keysAndValues ...any)
	Warn(msg string, keysAndValues ...any)
}

Logger interface for logging

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager implements the MessageHandler interface

func NewManager

func NewManager(config Config, deps Dependencies, errorCallback func(string)) *Manager

NewManager creates a new message manager

func (*Manager) GetStats

func (mp *Manager) GetStats() ProcessingStats

GetStats returns processing statistics

func (*Manager) ProcessMessage

func (mp *Manager) ProcessMessage(ctx context.Context, msg any) ([]*gtypes.EntityState, error)

ProcessMessage processes any message type into entity states

func (*Manager) ProcessWork

func (mp *Manager) ProcessWork(ctx context.Context, data []byte) error

ProcessWork processes raw message data from worker pool

func (*Manager) SetIndexManager

func (mp *Manager) SetIndexManager(indexManager IndexManager)

SetIndexManager sets the index manager dependency (for circular dependency resolution)

type MessageHandler

type MessageHandler interface {
	// ProcessMessage processes any message type into entity states
	ProcessMessage(ctx context.Context, msg any) ([]*gtypes.EntityState, error)

	// ProcessWork processes raw message data from worker pool
	ProcessWork(ctx context.Context, data []byte) error

	// SetIndexManager sets the index manager dependency (for circular dependency resolution)
	SetIndexManager(indexManager IndexManager)
}

MessageHandler handles all message processing business logic

type Metrics

type Metrics struct {
	// Entity extraction metrics
	EntitiesExtracted prometheus.Counter

	// Entity update metrics (StoredMessage path + generic path)
	EntitiesUpdateAttempts prometheus.Counter
	EntitiesUpdateSuccess  prometheus.Counter
	EntitiesUpdateFailed   prometheus.Counter

	// Entity create metrics
	EntitiesCreateAttempts prometheus.Counter
	EntitiesCreateSuccess  prometheus.Counter
	EntitiesCreateFailed   prometheus.Counter

	// Message processing metrics
	MessagesProcessed prometheus.Counter
	MessagesFailed    prometheus.Counter
}

Metrics holds Prometheus metrics for message manager operations

func NewMetrics

func NewMetrics(registry *metric.MetricsRegistry) *Metrics

NewMetrics creates and registers metrics with the provided registry.

type ProcessingStats

type ProcessingStats struct {
	MessagesProcessed int64     `json:"messages_processed"`
	LastActivity      time.Time `json:"last_activity"`
}

ProcessingStats holds processing statistics
