Documentation
¶
Overview ¶
Package rule - Action execution for ECA rules
Package rule provides rule-based processing of semantic message streams with support for entity state watching and event generation.
Overview ¶
The rule processor evaluates conditions against streaming messages and entity state changes, generating graph events when rules trigger. Rules are defined using JSON configuration and implemented using the factory pattern.
Architecture ¶
Rules consist of three main components:
- Definition: JSON configuration (conditions, logic, metadata)
- Factory: Creates Rule instances from definitions
- Rule: Evaluates conditions and generates events
Rule Interface ¶
Rules must implement this interface:
type Rule interface {
Name() string
Subscribe() []string
Evaluate(messages []message.Message) bool
ExecuteEvents(messages []message.Message) ([]rtypes.Event, error)
}
Creating Custom Rules ¶
1. Create a RuleFactory:
type MyRuleFactory struct {
ruleType string
}
func (f *MyRuleFactory) Create(id string, def Definition, deps Dependencies) (rtypes.Rule, error) {
return &MyRule{...}, nil
}
func (f *MyRuleFactory) Validate(def Definition) error {
// Validate rule configuration
return nil
}
2. Implement the Rule interface:
type MyRule struct {
id string
conditions []expression.ConditionExpression
}
func (r *MyRule) ExecuteEvents(messages []message.Message) ([]rtypes.Event, error) {
// Generate events directly in Go code
// gtypes.Event implements rtypes.Event interface
event := >ypes.Event{
Type: gtypes.EventEntityUpdate,
EntityID: "alert.my-rule." + r.id,
Properties: map[string]interface{}{
"triggered": true,
"timestamp": time.Now(),
},
}
return []rtypes.Event{event}, nil
}
3. Register the factory:
func init() {
RegisterRuleFactory("my_rule", &MyRuleFactory{ruleType: "my_rule"})
}
Entity State Watching ¶
Rules can watch entity state changes via NATS KV buckets:
{
"id": "my-rule",
"type": "my_rule",
"entity": {
"pattern": "c360.*.robotics.drone.>",
"watch_buckets": ["ENTITY_STATES"]
}
}
The processor watches these buckets using the KV watch pattern and converts entity state updates into messages for rule evaluation.
Runtime Configuration ¶
Rules support dynamic runtime updates via ApplyConfigUpdate():
- Add/remove rules without restart
- Update rule conditions and metadata
- Enable/disable rules on the fly
Event Generation ¶
Rules generate gtypes.Event directly (no template system):
- Events contain: Type, EntityID, Properties, Metadata
- Published to NATS graph subjects
- Consumed by graph processor for entity storage
Metrics ¶
The processor exposes Prometheus metrics:
- semstreams_rule_evaluations_total
- semstreams_rule_triggers_total
- semstreams_rule_evaluation_duration_seconds
- semstreams_rule_errors_total
Example Usage ¶
config := rule.DefaultConfig()
config.Ports = &component.PortConfig{
Inputs: []component.PortDefinition{
{
Name: "semantic_messages",
Type: "nats",
Subject: "process.>",
},
},
}
processor, err := rule.NewProcessor(natsClient, &config)
if err != nil {
log.Fatal(err)
}
err = processor.Initialize()
err = processor.Start(ctx)
For comprehensive documentation, see:
- /Users/coby/Code/c360/semdocs/docs/guides/rules-engine.md
- /Users/coby/Code/c360/semdocs/docs/specs/SPEC-001-generic-rules-engine.md
Package rule - Expression Rule Factory for condition-based rules ¶
Package rule provides interfaces and implementations for rule processing ¶
Package rule - NATS KV Configuration Integration for Rules ¶
Package rule provides a rule processing component that implements the Discoverable interface for processing message streams through rules
Package rule - Rule Factory Pattern for Dynamic Rule Creation ¶
Package rule - State tracking for stateful ECA rules ¶
Package rule - Stateful rule evaluation with state tracking ¶
Package rule - Test Rule Factory for Integration Tests ¶
Package rule provides triple mutation support via NATS request/response
Package rule - Workflow trigger payload for rule-to-workflow integration
Index ¶
- Constants
- Variables
- func AssertEventuallyTrue(t *testing.T, condition func() bool, timeout time.Duration, tick time.Duration, ...)
- func CreateBatteryEntity(id string, level float64, voltage float64) *gtypes.EntityState
- func CreateDroneEntity(id string, armed bool, mode string, altitude float64) *gtypes.EntityState
- func CreateRuleProcessor(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func CreateTestEntityPattern(domain, category, instance string) string
- func GetRegisteredRuleTypes() []string
- func GetRuleSchemas() map[string]Schema
- func Register(registry *component.Registry) error
- func RegisterRuleFactory(ruleType string, factory Factory) error
- func SetupKVBucketForTesting(t *testing.T, js jetstream.JetStream, bucketName string) jetstream.KeyValue
- func UnregisterRuleFactory(ruleType string) error
- func VerifyRuleTriggered(_ *testing.T, processor *Processor, ruleName string) bool
- func WaitForEntityProcessing(_ *testing.T, _ time.Duration)
- func WaitForKVWatcher(t *testing.T, processor *Processor, timeout time.Duration)
- type Action
- type ActionExecutor
- func (e *ActionExecutor) Execute(ctx context.Context, action Action, entityID string, relatedID string) error
- func (e *ActionExecutor) ExecuteAddTriple(ctx context.Context, action Action, entityID, relatedID string) (message.Triple, error)
- func (e *ActionExecutor) ExecuteRemoveTriple(ctx context.Context, action Action, entityID, relatedID string) error
- type ActionExecutorInterface
- type BaseRuleFactory
- type Config
- type ConfigManager
- func (rcm *ConfigManager) DeleteRule(ctx context.Context, ruleID string) error
- func (rcm *ConfigManager) GetRule(ctx context.Context, ruleID string) (*Definition, error)
- func (rcm *ConfigManager) InitializeKVStore(natsClient *natsclient.Client) error
- func (rcm *ConfigManager) ListRules(_ context.Context) (map[string]Definition, error)
- func (rcm *ConfigManager) SaveRule(ctx context.Context, ruleID string, ruleDef Definition) error
- func (rcm *ConfigManager) Start(_ context.Context) error
- func (rcm *ConfigManager) Stop() error
- func (rcm *ConfigManager) WatchRules(_ context.Context, _ func(ruleID string, rule Rule, operation string)) error
- type Definition
- type Dependencies
- type EntityConfig
- type EntityContext
- type EntityStateEvaluator
- type Event
- type Example
- type ExpressionRule
- func (r *ExpressionRule) Evaluate(messages []message.Message) bool
- func (r *ExpressionRule) EvaluateEntityState(entityState *gtypes.EntityState) bool
- func (r *ExpressionRule) ExecuteEvents(messages []message.Message) ([]Event, error)
- func (r *ExpressionRule) Name() string
- func (r *ExpressionRule) Subscribe() []string
- type ExpressionRuleFactory
- type Factory
- type KVTestHelper
- type MatchState
- type Metrics
- type Processor
- func (rp *Processor) ApplyConfigUpdate(changes map[string]any) error
- func (rp *Processor) ConfigSchema() component.ConfigSchema
- func (rp *Processor) DataFlow() component.FlowMetrics
- func (rp *Processor) DebugStatus() any
- func (rp *Processor) GetRuleMetrics() map[string]any
- func (rp *Processor) GetRuntimeConfig() map[string]any
- func (rp *Processor) Health() component.HealthStatus
- func (rp *Processor) Initialize() error
- func (rp *Processor) InputPorts() []component.Port
- func (rp *Processor) Meta() component.Metadata
- func (rp *Processor) OutputPorts() []component.Port
- func (rp *Processor) Start(ctx context.Context) error
- func (rp *Processor) Stop(_ time.Duration) error
- func (rp *Processor) UpdateWatchBuckets(newBuckets map[string][]string) error
- func (rp *Processor) UpdateWatchPatterns(newPatterns []string) error
- func (rp *Processor) ValidateConfigUpdate(changes map[string]any) error
- type Publisher
- type Rule
- type Schema
- type StateTracker
- func (st *StateTracker) Delete(ctx context.Context, ruleID, entityKey string) error
- func (st *StateTracker) DeleteAllForEntity(ctx context.Context, entityID string) error
- func (st *StateTracker) Get(ctx context.Context, ruleID, entityKey string) (MatchState, error)
- func (st *StateTracker) Set(ctx context.Context, state MatchState) error
- type StatefulEvaluator
- type Status
- type TestRule
- type TestRuleFactory
- type Transition
- type TripleMutator
- type ValidationError
- type WorkflowTriggerPayload
Constants ¶
const ( // ActionTypePublish publishes a message to a NATS subject ActionTypePublish = "publish" // ActionTypeAddTriple creates a relationship triple in the graph ActionTypeAddTriple = "add_triple" // ActionTypeRemoveTriple removes a relationship triple from the graph ActionTypeRemoveTriple = "remove_triple" // ActionTypeUpdateTriple updates metadata on an existing triple ActionTypeUpdateTriple = "update_triple" // ActionTypePublishAgent triggers an agentic loop by publishing a TaskMessage ActionTypePublishAgent = "publish_agent" // ActionTypeTriggerWorkflow triggers a reactive workflow by publishing to workflow.trigger.<workflow_id> ActionTypeTriggerWorkflow = "trigger_workflow" // ActionTypePublishBoidSignal publishes a Boid steering signal for agent coordination ActionTypePublishBoidSignal = "publish_boid_signal" )
Action type constants define the supported action types for rule execution.
const ( SubjectTripleAdd = "graph.mutation.triple.add" SubjectTripleRemove = "graph.mutation.triple.remove" MutationTimeout = 5 * time.Second )
NATS subjects for graph mutations (must match processor/graph/mutations.go)
const ( WorkflowTriggerDomain = "rule" WorkflowTriggerCategory = "workflow_trigger" WorkflowTriggerVersion = "v1" )
Workflow trigger payload type constants. Uses "rule" domain to avoid conflict with processor/workflow's workflow.trigger.v1
Variables ¶
var ErrStateNotFound = errors.New("rule state not found")
ErrStateNotFound is returned when no rule state exists for the given key
Functions ¶
func AssertEventuallyTrue ¶
func AssertEventuallyTrue(t *testing.T, condition func() bool, timeout time.Duration, tick time.Duration, msgAndArgs ...interface{})
AssertEventuallyTrue asserts that a condition becomes true within timeout
func CreateBatteryEntity ¶
func CreateBatteryEntity(id string, level float64, voltage float64) *gtypes.EntityState
CreateBatteryEntity creates a battery entity for testing
func CreateDroneEntity ¶
CreateDroneEntity creates a drone entity for testing
func CreateRuleProcessor ¶
func CreateRuleProcessor(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
CreateRuleProcessor creates a rule processor with the new factory pattern
func CreateTestEntityPattern ¶
CreateTestEntityPattern creates entity IDs matching test patterns
func GetRegisteredRuleTypes ¶
func GetRegisteredRuleTypes() []string
GetRegisteredRuleTypes returns all registered rule types
func GetRuleSchemas ¶
GetRuleSchemas returns schemas for all registered rule types
func RegisterRuleFactory ¶
RegisterRuleFactory registers a rule factory
func SetupKVBucketForTesting ¶
func SetupKVBucketForTesting(t *testing.T, js jetstream.JetStream, bucketName string) jetstream.KeyValue
SetupKVBucketForTesting creates and initializes a KV bucket for testing
func UnregisterRuleFactory ¶
UnregisterRuleFactory removes a rule factory for a given type This is primarily for testing or dynamic factory management
func VerifyRuleTriggered ¶
VerifyRuleTriggered checks if a rule has been triggered by examining metrics or events
func WaitForEntityProcessing ¶
WaitForEntityProcessing waits for an entity update to be processed This is useful when testing async KV watchers
Types ¶
type Action ¶
type Action struct {
// Type specifies the action type (publish, add_triple, remove_triple, update_triple, publish_agent)
Type string `json:"type"`
// Subject is the NATS subject for publish actions
Subject string `json:"subject,omitempty"`
// Predicate is the relationship type for triple actions
Predicate string `json:"predicate,omitempty"`
// Object is the target entity or value for triple actions
Object string `json:"object,omitempty"`
// TTL specifies optional expiration time for triples (e.g., "5m", "1h")
TTL string `json:"ttl,omitempty"`
// Properties contains additional metadata for the action
Properties map[string]any `json:"properties,omitempty"`
// Role is the agent role for publish_agent actions (e.g., "general", "architect", "editor")
Role string `json:"role,omitempty"`
// Model is the model endpoint name for publish_agent actions
Model string `json:"model,omitempty"`
// Prompt is the task prompt template for publish_agent actions
// Supports variable substitution: $entity.id, $related.id
Prompt string `json:"prompt,omitempty"`
// WorkflowID is the workflow identifier for trigger_workflow actions
WorkflowID string `json:"workflow_id,omitempty"`
// ContextData provides additional context passed to the workflow
ContextData map[string]any `json:"context_data,omitempty"`
// BoidSignalType specifies the type of boid signal: separation, cohesion, or alignment
BoidSignalType string `json:"boid_signal_type,omitempty"`
// BoidStrength specifies the steering strength for boid signals (0.0-1.0)
BoidStrength float64 `json:"boid_strength,omitempty"`
}
Action represents an action to execute when a rule fires. Actions are triggered by state transitions (OnEnter, OnExit) or while a condition remains true (WhileTrue).
type ActionExecutor ¶
type ActionExecutor struct {
// contains filtered or unexported fields
}
ActionExecutor executes actions for rules. It handles triple mutations, NATS publishing, and other action types.
func NewActionExecutor ¶
func NewActionExecutor(logger *slog.Logger) *ActionExecutor
NewActionExecutor creates a new ActionExecutor with the given logger. If logger is nil, uses the default logger.
func NewActionExecutorFull ¶
func NewActionExecutorFull(logger *slog.Logger, mutator TripleMutator, publisher Publisher) *ActionExecutor
NewActionExecutorFull creates a new ActionExecutor with full functionality. The mutator enables triple persistence, and the publisher enables NATS publishing.
func NewActionExecutorWithMutator ¶
func NewActionExecutorWithMutator(logger *slog.Logger, mutator TripleMutator) *ActionExecutor
NewActionExecutorWithMutator creates a new ActionExecutor with triple mutation support. The mutator enables actual persistence of triple operations via NATS request/response.
func (*ActionExecutor) Execute ¶
func (e *ActionExecutor) Execute(ctx context.Context, action Action, entityID string, relatedID string) error
Execute runs the given action in the context of an entity. The entityID is the subject entity, and relatedID is an optional related entity (used for pair rules and relationship triples).
func (*ActionExecutor) ExecuteAddTriple ¶
func (e *ActionExecutor) ExecuteAddTriple(ctx context.Context, action Action, entityID, relatedID string) (message.Triple, error)
ExecuteAddTriple executes an add_triple action, creating a new semantic triple. Returns the created triple and any error that occurred. If a TripleMutator is configured, the triple is persisted via NATS request/response.
func (*ActionExecutor) ExecuteRemoveTriple ¶
func (e *ActionExecutor) ExecuteRemoveTriple(ctx context.Context, action Action, entityID, relatedID string) error
ExecuteRemoveTriple executes a remove_triple action, removing a semantic triple. If a TripleMutator is configured, the triple is removed via NATS request/response.
type ActionExecutorInterface ¶
type ActionExecutorInterface interface {
Execute(ctx context.Context, action Action, entityID string, relatedID string) error
}
ActionExecutorInterface defines the interface for action execution. This allows for easy mocking in tests.
type BaseRuleFactory ¶
type BaseRuleFactory struct {
// contains filtered or unexported fields
}
BaseRuleFactory provides common factory functionality
func NewBaseRuleFactory ¶
func NewBaseRuleFactory(ruleType, displayName, description, category string) *BaseRuleFactory
NewBaseRuleFactory creates a base factory implementation
func (*BaseRuleFactory) ValidateExpression ¶
func (f *BaseRuleFactory) ValidateExpression(def Definition) error
ValidateExpression validates expression configuration
type Config ¶
type Config struct {
// Port configuration for inputs and outputs
Ports *component.PortConfig `` /* 168-byte string literal not displayed */
// Rule configuration sources
RulesFiles []string `json:"rules_files" schema:"type:array,description:Paths to JSON rule definition files,default:[],category:basic"`
InlineRules []Definition `json:"inline_rules,omitempty" schema:"type:array,description:Inline rule definitions (alternative to files),category:basic"`
// Message cache configuration (not exposed in schema - internal config)
MessageCache cache.Config `json:"message_cache"`
// Buffer window size for time-window analysis
BufferWindowSize string `` /* 135-byte string literal not displayed */
// Alert cooldown period to prevent spam
AlertCooldownPeriod string `` /* 139-byte string literal not displayed */
// Graph processor integration
EnableGraphIntegration bool `` /* 130-byte string literal not displayed */
// NATS KV patterns to watch for entity changes (e.g., 'telemetry.robotics.>')
// DEPRECATED: Use EntityWatchBuckets for multi-bucket support. This field is still
// supported for backwards compatibility and applies to ENTITY_STATES bucket.
EntityWatchPatterns []string `` /* 153-byte string literal not displayed */
// EntityWatchBuckets maps bucket names to watch patterns.
// This enables rules to observe operational results from multiple components.
// Example: {"ENTITY_STATES": ["telemetry.>"], "WORKFLOW_EXECUTIONS": ["COMPLETE_*"]}
// If not specified, falls back to EntityWatchPatterns for ENTITY_STATES bucket.
EntityWatchBuckets map[string][]string `` /* 147-byte string literal not displayed */
// Debounce delay for rule evaluation (settling time for entity state)
// Default is 0 (disabled) to ensure rules evaluate against each state change.
// Set to a positive value (e.g., 100) to batch rapid updates and evaluate final state only.
DebounceDelayMs time.Duration `` /* 146-byte string literal not displayed */
// JetStream consumer configuration (not exposed in schema - internal config)
Consumer struct {
Enabled bool `json:"enabled"` // Enable JetStream consumer
AckWaitSeconds int `json:"ack_wait_seconds"` // Acknowledgment timeout
MaxDeliver int `json:"max_deliver"` // Max delivery attempts
ReplayPolicy string `json:"replay_policy"` // "instant" or "original"
} `json:"consumer"`
}
Config holds configuration for the RuleProcessor
func CreateRuleTestConfig ¶
CreateRuleTestConfig creates a test configuration for rule processor
func (Config) MarshalJSON ¶
MarshalJSON implements custom JSON marshaling for Config
func (*Config) UnmarshalJSON ¶
UnmarshalJSON implements custom JSON unmarshaling for Config
type ConfigManager ¶
type ConfigManager struct {
// contains filtered or unexported fields
}
ConfigManager manages rules through NATS KV configuration
func NewConfigManager ¶
func NewConfigManager(processor *Processor, configMgr *config.Manager, logger *slog.Logger) *ConfigManager
NewConfigManager creates a new rule configuration manager
func (*ConfigManager) DeleteRule ¶
func (rcm *ConfigManager) DeleteRule(ctx context.Context, ruleID string) error
DeleteRule removes a rule configuration from NATS KV
func (*ConfigManager) GetRule ¶
func (rcm *ConfigManager) GetRule(ctx context.Context, ruleID string) (*Definition, error)
GetRule retrieves a rule configuration from NATS KV
func (*ConfigManager) InitializeKVStore ¶
func (rcm *ConfigManager) InitializeKVStore(natsClient *natsclient.Client) error
InitializeKVStore initializes the KVStore for direct KV operations
func (*ConfigManager) ListRules ¶
func (rcm *ConfigManager) ListRules(_ context.Context) (map[string]Definition, error)
ListRules returns all rule configurations
func (*ConfigManager) SaveRule ¶
func (rcm *ConfigManager) SaveRule(ctx context.Context, ruleID string, ruleDef Definition) error
SaveRule saves a rule configuration to NATS KV
func (*ConfigManager) Start ¶
func (rcm *ConfigManager) Start(_ context.Context) error
Start begins watching for rule configuration updates
func (*ConfigManager) Stop ¶
func (rcm *ConfigManager) Stop() error
Stop stops the configuration manager
func (*ConfigManager) WatchRules ¶
func (rcm *ConfigManager) WatchRules(_ context.Context, _ func(ruleID string, rule Rule, operation string)) error
WatchRules watches for rule changes and returns active rules
type Definition ¶
type Definition struct {
ID string `json:"id"`
Type string `json:"type"`
Name string `json:"name"`
Description string `json:"description"`
Enabled bool `json:"enabled"`
Conditions []expression.ConditionExpression `json:"conditions"`
Logic string `json:"logic"`
Cooldown string `json:"cooldown,omitempty"`
Entity EntityConfig `json:"entity,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
OnEnter []Action `json:"on_enter,omitempty"` // Fires on false→true transition
OnExit []Action `json:"on_exit,omitempty"` // Fires on true→false transition
WhileTrue []Action `json:"while_true,omitempty"` // Fires on every update while true
RelatedPatterns []string `json:"related_patterns,omitempty"` // For pair rules
}
Definition represents a JSON rule configuration
type Dependencies ¶
type Dependencies struct {
NATSClient *natsclient.Client
Logger *slog.Logger
}
Dependencies provides dependencies for rule creation
type EntityConfig ¶
type EntityConfig struct {
Pattern string `json:"pattern"` // Entity ID pattern to match
WatchBuckets []string `json:"watch_buckets"` // KV buckets to watch
}
EntityConfig defines entity-specific configuration
type EntityContext ¶
type EntityContext struct {
ID string // The entity/loop ID
Role string // Agent role (architect, editor, general, etc.)
Result string // The result/output content
Model string // Model used for the agent
TaskID string // Task identifier
ParentLoop string // Parent loop ID (for chained workflows)
Iterations int // Number of iterations completed
}
EntityContext provides entity data for variable substitution in rules. This is used for rules-based workflow orchestration where completion state from agentic loops is used to trigger follow-up actions.
type EntityStateEvaluator ¶
type EntityStateEvaluator interface {
// EvaluateEntityState evaluates the rule directly against EntityState triples.
// Rule conditions should use full predicate paths (e.g., "sensor.measurement.fahrenheit").
EvaluateEntityState(entityState *gtypes.EntityState) bool
}
EntityStateEvaluator is an optional interface for rules that can evaluate directly against EntityState triples, bypassing the message transformation layer. This is more efficient for rules that need to access triple predicates directly.
type Event ¶
type Event interface {
// EventType returns the type identifier for this event
EventType() string
// Subject returns the NATS subject for publishing this event
Subject() string
// Payload returns the event data as a generic map
Payload() map[string]any
// Validate checks if the event is valid and ready to publish
Validate() error
}
Event represents a generic event that can be published by rules. This interface allows rules to be decoupled from specific event types while still integrating with graph events when EnableGraphIntegration is enabled.
type Example ¶
type Example struct {
Name string `json:"name"`
Description string `json:"description"`
Config Definition `json:"config"`
}
Example provides example configurations
type ExpressionRule ¶
type ExpressionRule struct {
// contains filtered or unexported fields
}
ExpressionRule implements Rule interface for expression-based condition evaluation
func NewExpressionRule ¶
func NewExpressionRule(def Definition) (*ExpressionRule, error)
NewExpressionRule creates a new expression-based rule
func (*ExpressionRule) Evaluate ¶
func (r *ExpressionRule) Evaluate(messages []message.Message) bool
Evaluate evaluates the rule against messages
func (*ExpressionRule) EvaluateEntityState ¶
func (r *ExpressionRule) EvaluateEntityState(entityState *gtypes.EntityState) bool
EvaluateEntityState evaluates the rule directly against EntityState triples. This bypasses the message transformation layer and evaluates conditions directly against triple predicates (e.g., "sensor.measurement.fahrenheit").
func (*ExpressionRule) ExecuteEvents ¶
func (r *ExpressionRule) ExecuteEvents(messages []message.Message) ([]Event, error)
ExecuteEvents generates events when rule triggers
func (*ExpressionRule) Subscribe ¶
func (r *ExpressionRule) Subscribe() []string
Subscribe returns subjects this rule subscribes to
type ExpressionRuleFactory ¶
type ExpressionRuleFactory struct {
// contains filtered or unexported fields
}
ExpressionRuleFactory creates expression-based rules
func NewExpressionRuleFactory ¶
func NewExpressionRuleFactory() *ExpressionRuleFactory
NewExpressionRuleFactory creates a new expression rule factory
func (*ExpressionRuleFactory) Create ¶
func (f *ExpressionRuleFactory) Create(_ string, def Definition, _ Dependencies) (Rule, error)
Create creates an expression rule from definition
func (*ExpressionRuleFactory) Schema ¶
func (f *ExpressionRuleFactory) Schema() Schema
Schema returns the expression rule schema
func (*ExpressionRuleFactory) Type ¶
func (f *ExpressionRuleFactory) Type() string
Type returns the factory type
func (*ExpressionRuleFactory) Validate ¶
func (f *ExpressionRuleFactory) Validate(def Definition) error
Validate validates the rule definition
type Factory ¶
type Factory interface {
// Create creates a rule instance from configuration
Create(id string, config Definition, deps Dependencies) (Rule, error)
// Type returns the rule type this factory creates
Type() string
// Schema returns the configuration schema for UI discovery
Schema() Schema
// Validate validates a rule configuration
Validate(config Definition) error
}
Factory creates rules from configuration
func GetRuleFactory ¶
GetRuleFactory returns a registered rule factory
type KVTestHelper ¶
type KVTestHelper struct {
// contains filtered or unexported fields
}
KVTestHelper provides utilities for KV-based rule testing
func NewKVTestHelper ¶
func NewKVTestHelper(t *testing.T, bucket jetstream.KeyValue) *KVTestHelper
NewKVTestHelper creates a new KV test helper
func (*KVTestHelper) DeleteEntity ¶
func (h *KVTestHelper) DeleteEntity(entityID string)
DeleteEntity removes an entity from the KV bucket
func (*KVTestHelper) UpdateEntityProperty ¶
func (h *KVTestHelper) UpdateEntityProperty(entityID string, predicateFull string, value interface{})
UpdateEntityProperty updates a specific property of an entity
func (*KVTestHelper) WriteEntityState ¶
func (h *KVTestHelper) WriteEntityState(entity *gtypes.EntityState)
WriteEntityState writes an entity state to the KV bucket
type MatchState ¶
type MatchState struct {
RuleID string `json:"rule_id"`
EntityKey string `json:"entity_key"`
IsMatching bool `json:"is_matching"`
LastTransition string `json:"last_transition"` // ""|"entered"|"exited"
TransitionAt time.Time `json:"transition_at,omitempty"`
SourceRevision uint64 `json:"source_revision"`
LastChecked time.Time `json:"last_checked"`
}
MatchState tracks whether a rule's condition is currently matching for a specific entity or entity pair.
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics holds Prometheus metrics for RuleProcessor component
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is a component that processes messages through rules
func NewProcessor ¶
func NewProcessor(natsClient *natsclient.Client, config *Config) (*Processor, error)
NewProcessor creates a new rule processor
func NewProcessorWithMetrics ¶
func NewProcessorWithMetrics(natsClient *natsclient.Client, config *Config, metricsRegistry *metric.MetricsRegistry) (*Processor, error)
NewProcessorWithMetrics creates a new rule processor with optional metrics
func (*Processor) ApplyConfigUpdate ¶
ApplyConfigUpdate applies validated configuration changes
func (*Processor) ConfigSchema ¶
func (rp *Processor) ConfigSchema() component.ConfigSchema
ConfigSchema returns configuration schema for component interface
func (*Processor) DataFlow ¶
func (rp *Processor) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Processor) DebugStatus ¶
DebugStatus returns extended debug information for the rule processor. Implements component.DebugStatusProvider.
func (*Processor) GetRuleMetrics ¶
GetRuleMetrics returns metrics for all rules
func (*Processor) GetRuntimeConfig ¶
GetRuntimeConfig returns current runtime configuration
func (*Processor) Health ¶
func (rp *Processor) Health() component.HealthStatus
Health returns current health status
func (*Processor) Initialize ¶
Initialize loads rules and prepares the processor
func (*Processor) InputPorts ¶
InputPorts returns declared input ports
func (*Processor) OutputPorts ¶
OutputPorts returns declared output ports
func (*Processor) UpdateWatchBuckets ¶
UpdateWatchBuckets dynamically updates the entity watch buckets and patterns.
func (*Processor) UpdateWatchPatterns ¶
UpdateWatchPatterns dynamically updates the entity watch patterns for ENTITY_STATES. DEPRECATED: Use UpdateWatchBuckets for multi-bucket support.
type Publisher ¶
type Publisher interface {
// Publish sends a message to a NATS subject.
// The implementation determines whether to use core NATS or JetStream
// based on port configuration.
Publish(ctx context.Context, subject string, data []byte) error
}
Publisher handles publishing messages to NATS subjects. It abstracts the decision between core NATS and JetStream publishing.
type Rule ¶
type Rule interface {
// Name returns the human-readable name of this rule
Name() string
// Subscribe returns the NATS subjects this rule should listen to
Subscribe() []string
// Evaluate checks if the rule conditions are met for the given messages
Evaluate(messages []message.Message) bool
// ExecuteEvents generates events when rule conditions are satisfied
ExecuteEvents(messages []message.Message) ([]Event, error)
}
Rule interface defines the event-based contract for processing rules. Rules evaluate messages and generate events when conditions are met.
func CreateRuleFromDefinition ¶
func CreateRuleFromDefinition(def Definition, deps Dependencies) (Rule, error)
CreateRuleFromDefinition creates a rule using the appropriate factory
type Schema ¶
type Schema struct {
Type string `json:"type"`
DisplayName string `json:"display_name"`
Description string `json:"description"`
Category string `json:"category"`
Icon string `json:"icon,omitempty"`
Properties map[string]component.PropertySchema `json:"properties"`
Required []string `json:"required"`
Examples []Example `json:"examples,omitempty"`
}
Schema describes the configuration schema for a rule type
type StateTracker ¶
type StateTracker struct {
// contains filtered or unexported fields
}
StateTracker manages rule match state persistence in NATS KV. It tracks which rules are currently matching for which entities to enable proper transition detection for stateful ECA rules.
func NewStateTracker ¶
func NewStateTracker(bucket jetstream.KeyValue, logger *slog.Logger) *StateTracker
NewStateTracker creates a new StateTracker with the given KV bucket.
func (*StateTracker) Delete ¶
func (st *StateTracker) Delete(ctx context.Context, ruleID, entityKey string) error
Delete removes the match state for a rule and entity key.
func (*StateTracker) DeleteAllForEntity ¶
func (st *StateTracker) DeleteAllForEntity(ctx context.Context, entityID string) error
DeleteAllForEntity removes all rule states associated with an entity. This is called when an entity is deleted to clean up orphaned state.
func (*StateTracker) Get ¶
func (st *StateTracker) Get(ctx context.Context, ruleID, entityKey string) (MatchState, error)
Get retrieves the current match state for a rule and entity key. Returns ErrStateNotFound if no state exists for this combination.
func (*StateTracker) Set ¶
func (st *StateTracker) Set(ctx context.Context, state MatchState) error
Set persists the match state for a rule and entity.
type StatefulEvaluator ¶
type StatefulEvaluator struct {
// contains filtered or unexported fields
}
StatefulEvaluator handles rule evaluation with state tracking. It manages state transitions (entered/exited/none) and executes the appropriate actions based on the transition type.
func NewStatefulEvaluator ¶
func NewStatefulEvaluator(stateTracker *StateTracker, actionExecutor ActionExecutorInterface, logger *slog.Logger) *StatefulEvaluator
NewStatefulEvaluator creates a new stateful evaluator with the given dependencies. If logger is nil, uses the default logger.
func (*StatefulEvaluator) EvaluateWithState ¶
func (e *StatefulEvaluator) EvaluateWithState( ctx context.Context, ruleDef Definition, entityID string, relatedID string, currentlyMatching bool, ) (Transition, error)
EvaluateWithState evaluates a rule and fires appropriate actions based on state transitions. It:
- Retrieves previous state from StateTracker (treats missing state as false)
- Detects transition (entered/exited/none) by comparing previous and current match state
- Fires appropriate actions based on transition: - TransitionEntered: Execute all OnEnter actions - TransitionExited: Execute all OnExit actions - TransitionNone + currentlyMatching: Execute all WhileTrue actions
- Persists new state to StateTracker
Returns the transition that occurred and any error encountered.
type Status ¶
type Status struct {
DebounceDelayMs int `json:"debounce_delay_ms"`
PendingEvaluations int `json:"pending_evaluations"`
TotalEvaluations int `json:"total_evaluations"`
TotalTriggers int `json:"total_triggers"`
DebouncedCount int `json:"debounced_count"` // Matches Tester's test expectations
RulesLoaded int `json:"rules_loaded"`
LastEvaluationTime time.Time `json:"last_evaluation_time,omitempty"`
}
Status represents the current status of rule evaluation for debug observability
type TestRule ¶
type TestRule struct {
// contains filtered or unexported fields
}
TestRule is a functional rule implementation for testing
func NewTestRule ¶
func NewTestRule(id, name string, subjects []string, conditions []expression.ConditionExpression) *TestRule
NewTestRule creates a new test rule
func (*TestRule) EvaluateEntityState ¶
func (r *TestRule) EvaluateEntityState(entityState *gtypes.EntityState) bool
EvaluateEntityState evaluates the rule directly against EntityState triples. This implements the EntityStateEvaluator interface for KV watch-based evaluation.
func (*TestRule) ExecuteEvents ¶
ExecuteEvents generates events when rule triggers
type TestRuleFactory ¶
type TestRuleFactory struct {
// contains filtered or unexported fields
}
TestRuleFactory creates test rules for integration testing
func NewTestRuleFactory ¶
func NewTestRuleFactory() *TestRuleFactory
NewTestRuleFactory creates a new test rule factory
func (*TestRuleFactory) Create ¶
func (f *TestRuleFactory) Create(ruleID string, def Definition, _ Dependencies) (Rule, error)
Create creates a test rule from definition
func (*TestRuleFactory) Examples ¶
func (f *TestRuleFactory) Examples() []Example
Examples returns test rule examples
func (*TestRuleFactory) Schema ¶
func (f *TestRuleFactory) Schema() Schema
Schema returns the test rule schema
func (*TestRuleFactory) Type ¶
func (f *TestRuleFactory) Type() string
Type returns the factory type
func (*TestRuleFactory) Validate ¶
func (f *TestRuleFactory) Validate(_ Definition) error
Validate validates the rule definition
type Transition ¶
type Transition string
Transition represents the type of state change detected
const ( // TransitionNone indicates no state change TransitionNone Transition = "" // TransitionEntered indicates condition became true TransitionEntered Transition = "entered" // TransitionExited indicates condition became false TransitionExited Transition = "exited" )
func DetectTransition ¶
func DetectTransition(wasMatching, nowMatching bool) Transition
DetectTransition compares previous and current match state to determine transition type. Returns TransitionEntered if condition changed from false to true. Returns TransitionExited if condition changed from true to false. Returns TransitionNone if state did not change.
type TripleMutator ¶
type TripleMutator interface {
// AddTriple adds a triple via NATS request/response and returns the KV revision
AddTriple(ctx context.Context, triple message.Triple) (uint64, error)
// RemoveTriple removes a triple via NATS request/response and returns the KV revision
RemoveTriple(ctx context.Context, subject, predicate string) (uint64, error)
}
TripleMutator handles triple mutations via NATS request/response. The returned uint64 is the KV revision after the write, used for feedback loop prevention.
type ValidationError ¶
ValidationError represents a payload validation error.
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
type WorkflowTriggerPayload ¶
type WorkflowTriggerPayload struct {
WorkflowID string `json:"workflow_id"`
EntityID string `json:"entity_id"`
TriggeredAt time.Time `json:"triggered_at"`
RelatedID string `json:"related_id,omitempty"`
Context map[string]any `json:"context,omitempty"`
}
WorkflowTriggerPayload represents a message sent by the rule processor to trigger a reactive workflow. This payload is wrapped in a BaseMessage for proper deserialization by the reactive workflow engine.
func (*WorkflowTriggerPayload) MarshalJSON ¶
func (p *WorkflowTriggerPayload) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler. This marshals just the payload fields - BaseMessage handles the wrapping.
func (*WorkflowTriggerPayload) Schema ¶
func (p *WorkflowTriggerPayload) Schema() message.Type
Schema returns the message type for workflow trigger payloads.
func (*WorkflowTriggerPayload) UnmarshalJSON ¶
func (p *WorkflowTriggerPayload) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler.
func (*WorkflowTriggerPayload) Validate ¶
func (p *WorkflowTriggerPayload) Validate() error
Validate ensures the payload has required fields.
Source Files
¶
- actions.go
- config.go
- config_validation.go
- doc.go
- entity_watcher.go
- expression_factory.go
- factory.go
- interfaces.go
- kv_config_integration.go
- kv_test_helpers.go
- message_handler.go
- metrics.go
- processor.go
- publisher.go
- rule_factory.go
- rule_loader.go
- runtime_config.go
- state_tracker.go
- stateful_evaluator.go
- status.go
- test_rule_factory.go
- triple_mutator.go
- workflow_trigger_payload.go
Directories
¶
| Path | Synopsis |
|---|---|
|
Package boid implements Boids-inspired local coordination rules for multi-agent teams.
|
Package boid implements Boids-inspired local coordination rules for multi-agent teams. |
|
Package expression - Expression evaluator implementation
|
Package expression - Expression evaluator implementation |