Overview
Package messagemanager provides message processing and entity extraction for the knowledge graph.
The messagemanager package converts incoming messages into EntityState objects that can be stored in the knowledge graph. It handles multiple message types: Graphable interfaces, Storable interfaces with ObjectStore references, and legacy map-based messages.
Messages are processed through a unified pipeline that extracts entity IDs, triples (the single source of truth for properties and relationships), and optional storage references. Entity merging is performed atomically using upsert semantics to avoid race conditions.
Architecture
                      Incoming Message ([]byte)
                                  ↓
    ┌────────────────────────────────────────────────────────────┐
    │ Manager.ProcessWork                                        │
    │   1. Parse BaseMessage envelope                            │
    │   2. Extract payload type                                  │
    │   3. Route to appropriate handler                          │
    └────────────────────────────────────────────────────────────┘
                                  ↓
    ┌────────────────┬─────────────────┬─────────────────────────┐
    │ Storable       │ Graphable       │ Map/Legacy              │
    │ (ObjectStore   │ (EntityID +     │ (JSON map to            │
    │  reference)    │  Triples)       │  entity state)          │
    └────────────────┴─────────────────┴─────────────────────────┘
                                  ↓
    ┌────────────────────────────────────────────────────────────┐
    │ EntityState (Triples = source of truth)                    │
    │      ↓                                                     │
    │ EntityManager.UpsertEntity                                 │
    └────────────────────────────────────────────────────────────┘
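The first box of the diagram (parse the envelope, read the payload type, route) can be sketched with encoding/json. This is a minimal sketch under stated assumptions: the `type` and `payload` JSON field names, the type strings, and the `envelope` and `route` names are all hypothetical, since the real BaseMessage schema is not shown here. Sending unknown types to the map handler is likewise an assumption of this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical stand-in for the BaseMessage wrapper;
// the real field names and JSON tags may differ.
type envelope struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload"`
}

// route parses the raw bytes, reads the payload type, and returns the name
// of the handler that would receive it (hypothetical routing table).
func route(data []byte) (string, error) {
	var env envelope
	if err := json.Unmarshal(data, &env); err != nil {
		return "", fmt.Errorf("parse envelope: %w", err)
	}
	if len(env.Payload) == 0 {
		return "", fmt.Errorf("envelope has no payload")
	}
	switch env.Type {
	case "storable": // hypothetical type string
		return "storable", nil
	case "graphable": // hypothetical type string
		return "graphable", nil
	default:
		// Assumed fallback: anything else goes to the legacy map handler.
		return "map", nil
	}
}

func main() {
	h, err := route([]byte(`{"type":"graphable","payload":{"id":"x"}}`))
	fmt.Println(h, err)
}
```

Keeping the payload as json.RawMessage defers decoding until the handler is known, which is a common way to structure envelope-then-payload parsing in Go.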
Usage
Create and configure the message manager:
    config := messagemanager.DefaultConfig()
    deps := messagemanager.Dependencies{
        EntityManager:   entityManager,
        IndexManager:    indexManager,
        Logger:          logger,
        MetricsRegistry: registry,
    }
    manager := messagemanager.NewManager(config, deps, func(errMsg string) {
        logger.Error("Message processing error", "error", errMsg)
    })
Process messages from a worker pool:
    // ProcessWork handles raw message bytes from the worker pool.
    err := manager.ProcessWork(ctx, messageData)
Process messages directly:
    // ProcessMessage handles typed messages.
    entityStates, err := manager.ProcessMessage(ctx, graphableMsg)
Message Types
Storable messages:
Messages implementing the Storable interface have their content stored in ObjectStore. The manager extracts the StorageReference and passes it to the resulting EntityState:
    type Storable interface {
        Graphable
        StorageRef() *message.StorageReference
    }
Graphable messages:
Messages implementing Graphable provide the entity ID and triples directly:
    type Graphable interface {
        EntityID() string
        Triples() []message.Triple
    }
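Because Storable embeds Graphable, every Storable value also satisfies Graphable, so a dispatcher built on a Go type switch has to test for Storable first (the first matching case wins). This sketch uses local mirrors of both interfaces; Triple, StorageReference, sensorReading, storedDoc, and classify are hypothetical stand-ins, not the package's types.

```go
package main

import "fmt"

// Triple is a hypothetical, simplified stand-in for message.Triple.
type Triple struct {
	Subject, Predicate string
	Object             any
}

// StorageReference is a hypothetical stand-in for message.StorageReference.
type StorageReference struct {
	Key string
}

// Local mirrors of the documented interfaces.
type Graphable interface {
	EntityID() string
	Triples() []Triple
}

type Storable interface {
	Graphable
	StorageRef() *StorageReference
}

// sensorReading is an example Graphable message (hypothetical type).
type sensorReading struct {
	ID    string
	Value float64
}

func (s sensorReading) EntityID() string { return s.ID }

func (s sensorReading) Triples() []Triple {
	return []Triple{{Subject: s.ID, Predicate: "sensor.value", Object: s.Value}}
}

// storedDoc is a hypothetical Storable: it embeds a Graphable and adds a
// reference to content kept in an object store.
type storedDoc struct {
	sensorReading
	Ref *StorageReference
}

func (d storedDoc) StorageRef() *StorageReference { return d.Ref }

// classify checks Storable before Graphable; reversing the cases would
// send every Storable message down the Graphable path.
func classify(msg any) string {
	switch msg.(type) {
	case Storable:
		return "storable"
	case Graphable:
		return "graphable"
	case map[string]any:
		return "map"
	default:
		return "unsupported"
	}
}

func main() {
	fmt.Println(classify(sensorReading{ID: "sensor-1", Value: 21.5}))
}
```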
Map messages:
JSON objects (map[string]any) are converted to entities with auto-generated IDs, and each map key becomes a triple predicate.
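A minimal sketch of the key-to-predicate conversion, assuming the auto-generated entity ID has already been chosen and using a simplified, hypothetical Triple type in place of message.Triple. Keys are sorted only to make the output deterministic (Go map iteration order is randomized); whether the package orders triples this way is not documented.

```go
package main

import (
	"fmt"
	"sort"
)

// Triple is a hypothetical, simplified stand-in for message.Triple.
type Triple struct {
	Subject, Predicate string
	Object             any
}

// mapToTriples turns each key of a JSON object into a triple predicate on
// the given entity ID, with the map value as the triple's object.
func mapToTriples(entityID string, m map[string]any) []Triple {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order for this sketch

	triples := make([]Triple, 0, len(keys))
	for _, k := range keys {
		triples = append(triples, Triple{Subject: entityID, Predicate: k, Object: m[k]})
	}
	return triples
}

func main() {
	for _, t := range mapToTriples("entity-1", map[string]any{"temp": 21.5, "name": "probe"}) {
		fmt.Println(t.Predicate, t.Object)
	}
}
```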
Entity Merging
When processing messages for existing entities, the manager:
- Fetches existing entity state (if any)
- Merges triples using gtypes.MergeTriples
- Increments entity version
- Uses UpsertEntity for atomic persistence
This avoids time-of-check-to-time-of-use (TOCTOU) race conditions during concurrent message processing.
Alias Resolution
Entity IDs can be aliases resolved via IndexManager.ResolveAlias. If resolution fails, the original ID is used as-is.
Configuration
Configuration options:
    DefaultNamespace: "default"     # Namespace for auto-generated entity IDs
    DefaultPlatform:  "semstreams"  # Platform for auto-generated entity IDs
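A sketch of how the two options might feed auto-generated entity IDs. The local config type mirrors only the documented defaults, and generateID, including its dot-separated format, is a hypothetical helper: the actual ID format is not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// config mirrors the two documented options; it is a local sketch,
// not the package's Config type.
type config struct {
	DefaultNamespace string
	DefaultPlatform  string
}

// defaultConfig returns the documented default values.
func defaultConfig() config {
	return config{DefaultNamespace: "default", DefaultPlatform: "semstreams"}
}

// generateID is hypothetical: it joins platform, namespace, kind, and a
// suffix with dots purely for illustration.
func generateID(cfg config, kind, suffix string) string {
	return strings.Join([]string{cfg.DefaultPlatform, cfg.DefaultNamespace, kind, suffix}, ".")
}

func main() {
	cfg := defaultConfig()
	cfg.DefaultNamespace = "telemetry" // override one default
	fmt.Println(generateID(cfg, "message", "42"))
}
```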
Thread Safety
The Manager is safe for concurrent use. Message processing uses atomic operations for statistics and upsert semantics for entity persistence.
Metrics
The package exports Prometheus metrics:
- messages_processed_total: Total messages processed
- messages_failed_total: Messages that failed processing
- entities_extracted_total: Entities extracted from messages
- entities_update_attempts_total: Entity upsert attempts
- entities_update_success_total: Successful entity upserts
- entities_update_failed_total: Failed entity upserts
See Also
Related packages:
- github.com/c360studio/semstreams/graph: EntityState and Graphable interface
- github.com/c360studio/semstreams/graph/datamanager: Entity persistence
- github.com/c360studio/semstreams/message: Message types and Triple
- github.com/c360studio/semstreams/storage/objectstore: Large content storage
Package messagemanager provides the MessageHandler interface and Manager implementation.
Package messagemanager handles all message processing business logic.
Types
type Dependencies
    type Dependencies struct {
        EntityManager   datamanager.EntityManager
        IndexManager    IndexManager
        Logger          Logger
        MetricsRegistry *metric.MetricsRegistry
    }
Dependencies defines all dependencies needed by the message manager.
type IndexManager
IndexManager is the interface for index operations.
type Logger
    type Logger interface {
        Debug(msg string, keysAndValues ...any)
        Error(msg string, keysAndValues ...any)
        Warn(msg string, keysAndValues ...any)
    }
Logger is the interface used for logging.
type Manager
    type Manager struct {
        // contains filtered or unexported fields
    }
Manager implements the MessageHandler interface.
func NewManager
    func NewManager(config Config, deps Dependencies, errorCallback func(string)) *Manager
NewManager creates a new message manager.
func (*Manager) GetStats
    func (mp *Manager) GetStats() ProcessingStats
GetStats returns processing statistics.
func (*Manager) ProcessMessage
    func (mp *Manager) ProcessMessage(ctx context.Context, msg any) ([]*gtypes.EntityState, error)
ProcessMessage processes any message type into entity states.
func (*Manager) ProcessWork
    func (mp *Manager) ProcessWork(ctx context.Context, data []byte) error
ProcessWork processes raw message data from the worker pool.
func (*Manager) SetIndexManager
    func (mp *Manager) SetIndexManager(indexManager IndexManager)
SetIndexManager sets the index manager dependency after construction. It exists to resolve a circular dependency.
type MessageHandler
    type MessageHandler interface {
        // ProcessMessage processes any message type into entity states
        ProcessMessage(ctx context.Context, msg any) ([]*gtypes.EntityState, error)
        // ProcessWork processes raw message data from worker pool
        ProcessWork(ctx context.Context, data []byte) error
        // SetIndexManager sets the index manager dependency (for circular dependency resolution)
        SetIndexManager(indexManager IndexManager)
    }
MessageHandler handles all message processing business logic.
type Metrics
    type Metrics struct {
        // Entity extraction metrics
        EntitiesExtracted prometheus.Counter
        // Entity update metrics (StoredMessage path + generic path)
        EntitiesUpdateAttempts prometheus.Counter
        EntitiesUpdateSuccess  prometheus.Counter
        EntitiesUpdateFailed   prometheus.Counter
        // Entity create metrics
        EntitiesCreateAttempts prometheus.Counter
        EntitiesCreateSuccess  prometheus.Counter
        EntitiesCreateFailed   prometheus.Counter
        // Message processing metrics
        MessagesProcessed prometheus.Counter
        MessagesFailed    prometheus.Counter
    }
Metrics holds Prometheus metrics for message manager operations.
func NewMetrics
    func NewMetrics(registry *metric.MetricsRegistry) *Metrics
NewMetrics creates and registers metrics with the provided registry.
type ProcessingStats
    type ProcessingStats struct {
        MessagesProcessed int64     `json:"messages_processed"`
        LastActivity      time.Time `json:"last_activity"`
    }
ProcessingStats holds processing statistics.