rule

package
v1.0.0-alpha.35 Latest
This package is not in the latest version of its module.
Published: Mar 12, 2026 License: MIT Imports: 26 Imported by: 0

README

Rule Processor

Processes message streams through configurable rules and evaluates conditions against semantic messages and entity state changes.

What It Does

  • Watches ENTITY_STATES KV bucket for entity changes
  • Evaluates rules against semantic messages and entity state
  • Publishes rule events and graph mutation requests
  • Provides time-window analysis with message buffering

Quick Start

config := rule.DefaultConfig()
config.Ports = &component.PortConfig{
    Inputs: []component.PortDefinition{{Subject: "process.>"}},
    Outputs: []component.PortDefinition{{Subject: "rule.events.>"}},
}

processor, err := rule.NewProcessor(natsClient, &config)
if err != nil {
    log.Fatal(err)
}

if err := processor.Initialize(); err != nil {
    log.Fatal(err)
}

if err := processor.Start(ctx); err != nil {
    log.Fatal(err)
}

Rule Definition

Rules are defined in JSON:

{
  "id": "battery-low",
  "name": "Battery Low Alert",
  "enabled": true,
  "conditions": [
    {"field": "drone.telemetry.battery", "operator": "lt", "value": 20}
  ],
  "on_enter": [
    {"type": "add_triple", "predicate": "alert.status", "object": "battery_low"},
    {"type": "publish", "subject": "alerts.battery"}
  ],
  "on_exit": [
    {"type": "remove_triple", "predicate": "alert.status"}
  ]
}
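
As a rough illustration of how such a definition maps onto Go values, the sketch below decodes the JSON above with the standard library. The types RuleDef, Condition, and Action and the helper ParseRule are local stand-ins written for this example; they mirror only the fields shown above and are not the package's own types.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Condition, Action and RuleDef are local stand-ins that mirror only the
// JSON fields used in the example; they are not the package's types.
type Condition struct {
	Field    string  `json:"field"`
	Operator string  `json:"operator"`
	Value    float64 `json:"value"`
}

type Action struct {
	Type      string `json:"type"`
	Subject   string `json:"subject,omitempty"`
	Predicate string `json:"predicate,omitempty"`
	Object    string `json:"object,omitempty"`
}

type RuleDef struct {
	ID         string      `json:"id"`
	Name       string      `json:"name"`
	Enabled    bool        `json:"enabled"`
	Conditions []Condition `json:"conditions"`
	OnEnter    []Action    `json:"on_enter,omitempty"`
	OnExit     []Action    `json:"on_exit,omitempty"`
}

const batteryLowJSON = `{
  "id": "battery-low",
  "name": "Battery Low Alert",
  "enabled": true,
  "conditions": [
    {"field": "drone.telemetry.battery", "operator": "lt", "value": 20}
  ],
  "on_enter": [
    {"type": "add_triple", "predicate": "alert.status", "object": "battery_low"},
    {"type": "publish", "subject": "alerts.battery"}
  ],
  "on_exit": [
    {"type": "remove_triple", "predicate": "alert.status"}
  ]
}`

// ParseRule decodes a rule definition and rejects one without an ID.
func ParseRule(data []byte) (RuleDef, error) {
	var def RuleDef
	if err := json.Unmarshal(data, &def); err != nil {
		return RuleDef{}, fmt.Errorf("decode rule: %w", err)
	}
	if def.ID == "" {
		return RuleDef{}, errors.New("rule is missing an id")
	}
	return def, nil
}

func main() {
	def, err := ParseRule([]byte(batteryLowJSON))
	if err != nil {
		panic(err)
	}
	fmt.Println(def.ID, len(def.Conditions), len(def.OnEnter), len(def.OnExit))
}
```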

Documentation

Topic                Location
Conceptual Overview  docs/advanced/06-rules-engine.md
Rule Syntax          docs/syntax.md
Conditions           docs/conditions.md
Actions              docs/actions.md
State Tracking       docs/state-tracking.md
Entity Watching      docs/entity-watching.md
Configuration        docs/configuration.md
Custom Rules         docs/custom-rules.md
Operations           docs/operations.md
Examples             docs/examples.md

Package Structure

File                Purpose
processor.go        Core processor, lifecycle, ports
config.go           Config struct and defaults
factory.go          Component registration
entity_watcher.go   KV entity state watching
message_handler.go  Message processing and rule evaluation
rule_loader.go      Rule loading from JSON files
publisher.go        Event publishing to NATS
metrics.go          Prometheus metrics

Metrics

Key metrics exposed:

Metric                                       Type       Description
semstreams_rule_evaluations_total            Counter    Rule evaluations performed
semstreams_rule_triggers_total               Counter    Successful rule triggers
semstreams_rule_evaluation_duration_seconds  Histogram  Evaluation latency
semstreams_rule_active_rules                 Gauge      Active rules count
semstreams_rule_errors_total                 Counter    Processing errors

Design Decisions

Entity ID Format

Entity IDs use a 6-part hierarchical dotted notation for global uniqueness:

<org>.<platform>.<system>.<domain>.<type>.<instance>
Example: c360.platform1.gcs1.robotics.drone.1
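
A minimal sketch of splitting such an ID into its six parts, using only the standard library. The EntityID struct and ParseEntityID function are hypothetical names for this example; the package may expose its own parsing helper that is not shown here.

```go
package main

import (
	"fmt"
	"strings"
)

// EntityID holds the six parts of the dotted identifier.
// It is an illustrative type, not one exported by the package.
type EntityID struct {
	Org, Platform, System, Domain, Type, Instance string
}

// ParseEntityID splits a dotted ID and rejects anything that does not have
// exactly six non-empty parts.
func ParseEntityID(id string) (EntityID, error) {
	parts := strings.Split(id, ".")
	if len(parts) != 6 {
		return EntityID{}, fmt.Errorf("entity ID %q has %d parts, want 6", id, len(parts))
	}
	for i, p := range parts {
		if p == "" {
			return EntityID{}, fmt.Errorf("entity ID %q has an empty part at position %d", id, i)
		}
	}
	return EntityID{
		Org:      parts[0],
		Platform: parts[1],
		System:   parts[2],
		Domain:   parts[3],
		Type:     parts[4],
		Instance: parts[5],
	}, nil
}

func main() {
	id, err := ParseEntityID("c360.platform1.gcs1.robotics.drone.1")
	if err != nil {
		panic(err)
	}
	fmt.Println(id.Domain, id.Type, id.Instance) // prints "robotics drone 1"
}
```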

Nil Safety Pattern

Metrics use "nil input = nil feature" pattern - zero overhead when disabled.
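
One common way to get this behavior in Go is a nil-safe pointer receiver, sketched below. This is an interpretation of the pattern, not the package's implementation; the counters type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counters is a hypothetical stand-in for a metrics holder.
type counters struct {
	evaluations atomic.Int64
}

// recordEvaluation is safe to call on a nil receiver: when metrics were never
// created, the call returns immediately and does no further work.
func (c *counters) recordEvaluation() {
	if c == nil {
		return
	}
	c.evaluations.Add(1)
}

// newCounters returns nil when metrics are disabled, so callers never need
// to branch on an "enabled" flag themselves.
func newCounters(enabled bool) *counters {
	if !enabled {
		return nil
	}
	return &counters{}
}

func main() {
	on, off := newCounters(true), newCounters(false)
	on.recordEvaluation()
	off.recordEvaluation() // no-op on the nil receiver
	fmt.Println(on.evaluations.Load(), off == nil)
}
```

Call sites stay identical whether metrics are enabled or not; the only cost when disabled is a nil check.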

Graph Integration

When enabled (default), rule actions directly affect the graph via add_triple and remove_triple actions.

Documentation

Overview

Package rule - Action execution for ECA rules

Package rule provides rule-based processing of semantic message streams with support for entity state watching and event generation.

Overview

The rule processor evaluates conditions against streaming messages and entity state changes, generating graph events when rules trigger. Rules are defined using JSON configuration and implemented using the factory pattern.

Architecture

Rules consist of three main components:

  • Definition: JSON configuration (conditions, logic, metadata)
  • Factory: Creates Rule instances from definitions
  • Rule: Evaluates conditions and generates events

Rule Interface

Rules must implement this interface:

type Rule interface {
    Name() string
    Subscribe() []string
    Evaluate(messages []message.Message) bool
    ExecuteEvents(messages []message.Message) ([]rtypes.Event, error)
}

Creating Custom Rules

1. Create a RuleFactory:

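type MyRuleFactory struct {
    ruleType string
}

func (f *MyRuleFactory) Create(id string, def Definition, deps Dependencies) (rtypes.Rule, error) {
    // Carry the ID and conditions over from the JSON definition.
    return &MyRule{id: id, conditions: def.Conditions}, nil
}

func (f *MyRuleFactory) Validate(def Definition) error {
    // Validate rule configuration
    return nil
}

// The Factory interface also requires Type() and Schema(); both are omitted here.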

2. Implement the Rule interface:

type MyRule struct {
    id         string
    conditions []expression.ConditionExpression
}

func (r *MyRule) ExecuteEvents(messages []message.Message) ([]rtypes.Event, error) {
    // Generate events directly in Go code
    // gtypes.Event implements rtypes.Event interface
    event := &gtypes.Event{
        Type:     gtypes.EventEntityUpdate,
        EntityID: "alert.my-rule." + r.id,
        Properties: map[string]interface{}{
            "triggered": true,
            "timestamp": time.Now(),
        },
    }
    return []rtypes.Event{event}, nil
}

3. Register the factory:

func init() {
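    // RegisterRuleFactory returns an error, so do not discard it.
    if err := RegisterRuleFactory("my_rule", &MyRuleFactory{ruleType: "my_rule"}); err != nil {
        panic(err)
    }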
}
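
To illustrate why registration can fail, here is a small sketch of a type-keyed factory registry. It assumes that duplicate registrations are rejected, which the documentation does not state; ruleFactory, staticFactory, and registry are hypothetical names and not the package's internals.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// ruleFactory is a minimal, hypothetical stand-in for the Factory interface.
type ruleFactory interface {
	Type() string
}

type staticFactory struct{ ruleType string }

func (f staticFactory) Type() string { return f.ruleType }

// registry maps rule types to factories and is safe for concurrent use.
type registry struct {
	mu        sync.RWMutex
	factories map[string]ruleFactory
}

func newRegistry() *registry {
	return &registry{factories: make(map[string]ruleFactory)}
}

// register rejects empty types, nil factories, and duplicates (assumed behavior).
func (r *registry) register(ruleType string, f ruleFactory) error {
	if ruleType == "" || f == nil {
		return errors.New("rule type and factory are required")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.factories[ruleType]; exists {
		return fmt.Errorf("rule factory %q is already registered", ruleType)
	}
	r.factories[ruleType] = f
	return nil
}

// types returns the registered rule types in sorted order.
func (r *registry) types() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.factories))
	for t := range r.factories {
		out = append(out, t)
	}
	sort.Strings(out)
	return out
}

func main() {
	reg := newRegistry()
	fmt.Println(reg.register("my_rule", staticFactory{ruleType: "my_rule"}))
	fmt.Println(reg.register("my_rule", staticFactory{ruleType: "my_rule"}))
	fmt.Println(reg.types())
}
```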

Entity State Watching

Rules can watch entity state changes via NATS KV buckets:

{
  "id": "my-rule",
  "type": "my_rule",
  "entity": {
    "pattern": "c360.*.*.robotics.drone.>",
    "watch_buckets": ["ENTITY_STATES"]
  }
}

The processor watches these buckets using the KV watch pattern and converts entity state updates into messages for rule evaluation.
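
The sketch below shows how a dotted pattern could be matched against an entity key, assuming NATS-style wildcards where "*" matches exactly one token and a trailing ">" matches one or more. The matchPattern function is hypothetical; the package's own matcher is not shown in this documentation.

```go
package main

import (
	"fmt"
	"strings"
)

// matchPattern reports whether a dotted key matches a pattern in which "*"
// stands for exactly one token and a trailing ">" stands for one or more.
func matchPattern(pattern, key string) bool {
	p := strings.Split(pattern, ".")
	k := strings.Split(key, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" is only valid as the last token and needs at least one token left.
			return i == len(p)-1 && len(k) > i
		}
		if i >= len(k) {
			return false
		}
		if tok != "*" && tok != k[i] {
			return false
		}
	}
	// Without ">", pattern and key must have the same number of tokens.
	return len(p) == len(k)
}

func main() {
	pattern := "c360.*.*.robotics.drone.>"
	for _, key := range []string{
		"c360.platform1.gcs1.robotics.drone.1",
		"c360.platform1.gcs1.maritime.vessel.7",
		"c360.platform1.gcs1.robotics.drone",
	} {
		fmt.Printf("%s -> %v\n", key, matchPattern(pattern, key))
	}
}
```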

Runtime Configuration

Rules support dynamic runtime updates via ApplyConfigUpdate():

  • Add/remove rules without restart
  • Update rule conditions and metadata
  • Enable/disable rules on the fly

Event Generation

Rules generate gtypes.Event directly (no template system):

  • Events contain: Type, EntityID, Properties, Metadata
  • Published to NATS graph subjects
  • Consumed by graph processor for entity storage

Metrics

The processor exposes Prometheus metrics:

  • semstreams_rule_evaluations_total
  • semstreams_rule_triggers_total
  • semstreams_rule_evaluation_duration_seconds
  • semstreams_rule_errors_total

Example Usage

config := rule.DefaultConfig()
config.Ports = &component.PortConfig{
    Inputs: []component.PortDefinition{
        {
            Name:    "semantic_messages",
            Type:    "nats",
            Subject: "process.>",
        },
    },
}

processor, err := rule.NewProcessor(natsClient, &config)
if err != nil {
    log.Fatal(err)
}
if err := processor.Initialize(); err != nil {
    log.Fatal(err)
}
if err := processor.Start(ctx); err != nil {
    log.Fatal(err)
}

For comprehensive documentation, see:

  • semdocs/docs/guides/rules-engine.md
  • semdocs/docs/specs/SPEC-001-generic-rules-engine.md

Package rule - Execution context for rule actions

Package rule - Expression Rule Factory for condition-based rules

Package rule provides interfaces and implementations for rule processing

Package rule - NATS KV Configuration Integration for Rules

Package rule provides a rule processing component that implements the Discoverable interface for processing message streams through rules

Package rule - Rule Factory Pattern for Dynamic Rule Creation

Package rule - State tracking for stateful ECA rules

Package rule - Stateful rule evaluation with state tracking

Package rule - Test Rule Factory for Integration Tests

Package rule provides triple mutation support via NATS request/response

Package rule - Workflow trigger payload for rule-to-workflow integration

Constants

const (
	// ActionTypePublish publishes a message to a NATS subject
	ActionTypePublish = "publish"
	// ActionTypeAddTriple creates a relationship triple in the graph
	ActionTypeAddTriple = "add_triple"
	// ActionTypeRemoveTriple removes a relationship triple from the graph
	ActionTypeRemoveTriple = "remove_triple"
	// ActionTypeUpdateTriple updates metadata on an existing triple
	ActionTypeUpdateTriple = "update_triple"
	// ActionTypePublishAgent triggers an agentic loop by publishing a TaskMessage
	ActionTypePublishAgent = "publish_agent"
	// ActionTypeTriggerWorkflow triggers a reactive workflow by publishing to workflow.trigger.<workflow_id>
	ActionTypeTriggerWorkflow = "trigger_workflow"
	// ActionTypePublishBoidSignal publishes a Boid steering signal for agent coordination
	ActionTypePublishBoidSignal = "publish_boid_signal"
)

Action type constants define the supported action types for rule execution.

const (
	SubjectTripleAdd    = "graph.mutation.triple.add"
	SubjectTripleRemove = "graph.mutation.triple.remove"
	MutationTimeout     = 5 * time.Second
)

NATS subjects and request timeout for graph mutations. The subjects must match processor/graph/mutations.go.

const (
	WorkflowTriggerDomain   = "rule"
	WorkflowTriggerCategory = "workflow_trigger"
	WorkflowTriggerVersion  = "v1"
)

Workflow trigger payload type constants. The "rule" domain avoids a conflict with the workflow.trigger.v1 type in processor/workflow.

Variables

var ErrStateNotFound = errors.New("rule state not found")

ErrStateNotFound is returned when no rule state exists for the given key

Functions

func AssertEventuallyTrue

func AssertEventuallyTrue(t *testing.T, condition func() bool, timeout time.Duration, tick time.Duration, msgAndArgs ...interface{})

AssertEventuallyTrue asserts that a condition becomes true within timeout

func CreateBatteryEntity

func CreateBatteryEntity(id string, level float64, voltage float64) *gtypes.EntityState

CreateBatteryEntity creates a battery entity for testing

func CreateDroneEntity

func CreateDroneEntity(id string, armed bool, mode string, altitude float64) *gtypes.EntityState

CreateDroneEntity creates a drone entity for testing

func CreateRuleProcessor

func CreateRuleProcessor(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

CreateRuleProcessor creates a rule processor with the new factory pattern

func CreateTestEntityPattern

func CreateTestEntityPattern(domain, category, instance string) string

CreateTestEntityPattern creates entity IDs matching test patterns

func GetRegisteredRuleTypes

func GetRegisteredRuleTypes() []string

GetRegisteredRuleTypes returns all registered rule types

func GetRuleSchemas

func GetRuleSchemas() map[string]Schema

GetRuleSchemas returns schemas for all registered rule types

func Register

func Register(registry *component.Registry) error

Register registers the rule processor component with the given registry

func RegisterRuleFactory

func RegisterRuleFactory(ruleType string, factory Factory) error

RegisterRuleFactory registers a rule factory

func SetupKVBucketForTesting

func SetupKVBucketForTesting(t *testing.T, js jetstream.JetStream, bucketName string) jetstream.KeyValue

SetupKVBucketForTesting creates and initializes a KV bucket for testing

func UnregisterRuleFactory

func UnregisterRuleFactory(ruleType string) error

UnregisterRuleFactory removes the rule factory for a given type. This is primarily for testing or dynamic factory management.

func VerifyRuleTriggered

func VerifyRuleTriggered(_ *testing.T, processor *Processor, ruleName string) bool

VerifyRuleTriggered checks if a rule has been triggered by examining metrics or events

func WaitForEntityProcessing

func WaitForEntityProcessing(_ *testing.T, _ time.Duration)

WaitForEntityProcessing waits for an entity update to be processed. This is useful when testing async KV watchers.

func WaitForKVWatcher

func WaitForKVWatcher(t *testing.T, processor *Processor, timeout time.Duration)

WaitForKVWatcher waits for a KV watcher to be ready

Types

type Action

type Action struct {
	// Type specifies the action type (publish, add_triple, remove_triple, update_triple,
	// publish_agent, trigger_workflow, publish_boid_signal)
	Type string `json:"type"`

	// Subject is the NATS subject for publish actions
	Subject string `json:"subject,omitempty"`

	// Predicate is the relationship type for triple actions
	Predicate string `json:"predicate,omitempty"`

	// Object is the target entity or value for triple actions
	Object string `json:"object,omitempty"`

	// TTL specifies optional expiration time for triples (e.g., "5m", "1h")
	TTL string `json:"ttl,omitempty"`

	// Properties contains additional metadata for the action
	Properties map[string]any `json:"properties,omitempty"`

	// Role is the agent role for publish_agent actions (e.g., "general", "architect", "editor")
	Role string `json:"role,omitempty"`

	// Model is the model endpoint name for publish_agent actions
	Model string `json:"model,omitempty"`

	// Prompt is the task prompt template for publish_agent actions
	// Supports variable substitution: $entity.id, $related.id
	Prompt string `json:"prompt,omitempty"`

	// WorkflowID is the workflow identifier for trigger_workflow actions
	WorkflowID string `json:"workflow_id,omitempty"`

	// ContextData provides additional context passed to the workflow
	ContextData map[string]any `json:"context_data,omitempty"`

	// BoidSignalType specifies the type of boid signal: separation, cohesion, or alignment
	BoidSignalType string `json:"boid_signal_type,omitempty"`

	// BoidStrength specifies the steering strength for boid signals (0.0-1.0)
	BoidStrength float64 `json:"boid_strength,omitempty"`

	// WorkflowSlug identifies the workflow for publish_agent actions (e.g., "github-issue-to-pr")
	WorkflowSlug string `json:"workflow_slug,omitempty"`

	// WorkflowStep identifies the step within the workflow (e.g., "qualify", "develop", "review")
	WorkflowStep string `json:"workflow_step,omitempty"`

	// When is an optional guard clause for conditional action execution.
	// If present, all conditions must match (AND logic) against the entity's current
	// state and $state.* fields for the action to execute. Actions without When always execute.
	When []expression.ConditionExpression `json:"when,omitempty"`
}

Action represents an action to execute when a rule fires. Actions are triggered by state transitions (OnEnter, OnExit) or while a condition remains true (WhileTrue).

func (Action) ParseTTL

func (a Action) ParseTTL() (time.Duration, error)

ParseTTL parses the TTL string into a duration. Returns 0 duration if TTL is empty (no expiration). Returns an error if the TTL format is invalid or negative.

type ActionExecutor

type ActionExecutor struct {
	// contains filtered or unexported fields
}

ActionExecutor executes actions for rules. It handles triple mutations, NATS publishing, and other action types.

func NewActionExecutor

func NewActionExecutor(logger *slog.Logger) *ActionExecutor

NewActionExecutor creates a new ActionExecutor with the given logger. If logger is nil, uses the default logger.

func NewActionExecutorFull

func NewActionExecutorFull(logger *slog.Logger, mutator TripleMutator, publisher Publisher) *ActionExecutor

NewActionExecutorFull creates a new ActionExecutor with full functionality. The mutator enables triple persistence, and the publisher enables NATS publishing.

func NewActionExecutorWithMutator

func NewActionExecutorWithMutator(logger *slog.Logger, mutator TripleMutator) *ActionExecutor

NewActionExecutorWithMutator creates a new ActionExecutor with triple mutation support. The mutator enables actual persistence of triple operations via NATS request/response.

func (*ActionExecutor) Execute

func (e *ActionExecutor) Execute(ctx context.Context, action Action, ec *ExecutionContext) error

Execute runs the given action using the execution context. The ExecutionContext provides the entity ID, related entity ID, full entity state, and match state for rich action execution.

func (*ActionExecutor) ExecuteAddTriple

func (e *ActionExecutor) ExecuteAddTriple(ctx context.Context, action Action, ec *ExecutionContext) (message.Triple, error)

ExecuteAddTriple executes an add_triple action, creating a new semantic triple. Returns the created triple and any error that occurred. If a TripleMutator is configured, the triple is persisted via NATS request/response.

func (*ActionExecutor) ExecuteRemoveTriple

func (e *ActionExecutor) ExecuteRemoveTriple(ctx context.Context, action Action, ec *ExecutionContext) error

ExecuteRemoveTriple executes a remove_triple action, removing a semantic triple. If a TripleMutator is configured, the triple is removed via NATS request/response.

type ActionExecutorInterface

type ActionExecutorInterface interface {
	Execute(ctx context.Context, action Action, ec *ExecutionContext) error
}

ActionExecutorInterface defines the interface for action execution. Actions receive an ExecutionContext with the full entity state and match state, replacing the previous (entityID, relatedID string) signature.

type BaseRuleFactory

type BaseRuleFactory struct {
	// contains filtered or unexported fields
}

BaseRuleFactory provides common factory functionality

func NewBaseRuleFactory

func NewBaseRuleFactory(ruleType, displayName, description, category string) *BaseRuleFactory

NewBaseRuleFactory creates a base factory implementation

func (*BaseRuleFactory) Type

func (f *BaseRuleFactory) Type() string

Type returns the rule type

func (*BaseRuleFactory) ValidateExpression

func (f *BaseRuleFactory) ValidateExpression(def Definition) error

ValidateExpression validates expression configuration

type Config

type Config struct {
	// Port configuration for inputs and outputs
	Ports *component.PortConfig `` /* 168-byte string literal not displayed */

	// Rule configuration sources
	RulesFiles  []string     `json:"rules_files" schema:"type:array,description:Paths to JSON rule definition files,default:[],category:basic"`
	InlineRules []Definition `json:"inline_rules,omitempty" schema:"type:array,description:Inline rule definitions (alternative to files),category:basic"`

	// Message cache configuration (not exposed in schema - internal config)
	MessageCache cache.Config `json:"message_cache"`

	// Buffer window size for time-window analysis
	BufferWindowSize string `` /* 135-byte string literal not displayed */

	// Alert cooldown period to prevent spam
	AlertCooldownPeriod string `` /* 139-byte string literal not displayed */

	// Graph processor integration
	EnableGraphIntegration bool `` /* 130-byte string literal not displayed */

	// NATS KV patterns to watch for entity changes (e.g., 'telemetry.robotics.>')
	// DEPRECATED: Use EntityWatchBuckets for multi-bucket support. This field is still
	// supported for backwards compatibility and applies to ENTITY_STATES bucket.
	EntityWatchPatterns []string `` /* 153-byte string literal not displayed */

	// EntityWatchBuckets maps bucket names to watch patterns.
	// This enables rules to observe operational results from multiple components.
	// Example: {"ENTITY_STATES": ["telemetry.>"], "WORKFLOW_EXECUTIONS": ["COMPLETE_*"]}
	// If not specified, falls back to EntityWatchPatterns for ENTITY_STATES bucket.
	EntityWatchBuckets map[string][]string `` /* 147-byte string literal not displayed */

	// Debounce delay for rule evaluation (settling time for entity state)
	// Default is 0 (disabled) to ensure rules evaluate against each state change.
	// Set to a positive value (e.g., 100) to batch rapid updates and evaluate final state only.
	DebounceDelayMs time.Duration `` /* 146-byte string literal not displayed */

	// JetStream consumer configuration (not exposed in schema - internal config)
	Consumer struct {
		Enabled        bool   `json:"enabled"`          // Enable JetStream consumer
		AckWaitSeconds int    `json:"ack_wait_seconds"` // Acknowledgment timeout
		MaxDeliver     int    `json:"max_deliver"`      // Max delivery attempts
		ReplayPolicy   string `json:"replay_policy"`    // "instant" or "original"
	} `json:"consumer"`
}

Config holds configuration for the RuleProcessor
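
The fallback described in the EntityWatchBuckets comment can be sketched as follows. The resolveWatchBuckets helper is a hypothetical name; it only illustrates the documented precedence (multi-bucket map first, then the deprecated pattern list applied to ENTITY_STATES).

```go
package main

import "fmt"

// resolveWatchBuckets returns the bucket-to-patterns map to watch. It
// prefers the multi-bucket map and otherwise falls back to the deprecated
// pattern list, applied to the ENTITY_STATES bucket.
func resolveWatchBuckets(buckets map[string][]string, legacyPatterns []string) map[string][]string {
	if len(buckets) > 0 {
		return buckets
	}
	if len(legacyPatterns) == 0 {
		return map[string][]string{}
	}
	return map[string][]string{"ENTITY_STATES": legacyPatterns}
}

func main() {
	// Only the deprecated field is set: patterns apply to ENTITY_STATES.
	fmt.Println(resolveWatchBuckets(nil, []string{"telemetry.robotics.>"}))

	// Both are set: the multi-bucket map wins.
	fmt.Println(resolveWatchBuckets(
		map[string][]string{"WORKFLOW_EXECUTIONS": {"COMPLETE_*"}},
		[]string{"telemetry.robotics.>"},
	))
}
```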

func CreateRuleTestConfig

func CreateRuleTestConfig(watchPatterns []string) Config

CreateRuleTestConfig creates a test configuration for rule processor

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns sensible defaults

func (Config) MarshalJSON

func (c Config) MarshalJSON() ([]byte, error)

MarshalJSON implements custom JSON marshaling for Config

func (*Config) UnmarshalJSON

func (c *Config) UnmarshalJSON(data []byte) error

UnmarshalJSON implements custom JSON unmarshaling for Config

type ConfigManager

type ConfigManager struct {
	// contains filtered or unexported fields
}

ConfigManager manages rules through NATS KV configuration

func NewConfigManager

func NewConfigManager(processor *Processor, configMgr *config.Manager, logger *slog.Logger) *ConfigManager

NewConfigManager creates a new rule configuration manager

func (*ConfigManager) DeleteRule

func (rcm *ConfigManager) DeleteRule(ctx context.Context, ruleID string) error

DeleteRule removes a rule configuration from NATS KV

func (*ConfigManager) GetRule

func (rcm *ConfigManager) GetRule(ctx context.Context, ruleID string) (*Definition, error)

GetRule retrieves a rule configuration from NATS KV

func (*ConfigManager) InitializeKVStore

func (rcm *ConfigManager) InitializeKVStore(natsClient *natsclient.Client) error

InitializeKVStore initializes the KVStore for direct KV operations

func (*ConfigManager) ListRules

func (rcm *ConfigManager) ListRules(_ context.Context) (map[string]Definition, error)

ListRules returns all rule configurations

func (*ConfigManager) SaveRule

func (rcm *ConfigManager) SaveRule(ctx context.Context, ruleID string, ruleDef Definition) error

SaveRule saves a rule configuration to NATS KV

func (*ConfigManager) Start

func (rcm *ConfigManager) Start(_ context.Context) error

Start begins watching for rule configuration updates

func (*ConfigManager) Stop

func (rcm *ConfigManager) Stop() error

Stop stops the configuration manager

func (*ConfigManager) WatchRules

func (rcm *ConfigManager) WatchRules(_ context.Context, _ func(ruleID string, rule Rule, operation string)) error

WatchRules watches for rule changes and returns active rules

type Definition

type Definition struct {
	ID              string                           `json:"id"`
	Type            string                           `json:"type"`
	Name            string                           `json:"name"`
	Description     string                           `json:"description"`
	Enabled         bool                             `json:"enabled"`
	Conditions      []expression.ConditionExpression `json:"conditions"`
	Logic           string                           `json:"logic"`
	Cooldown        string                           `json:"cooldown,omitempty"`
	Entity          EntityConfig                     `json:"entity,omitempty"`
	Metadata        map[string]interface{}           `json:"metadata,omitempty"`
	OnEnter         []Action                         `json:"on_enter,omitempty"`         // Fires on false→true transition
	OnExit          []Action                         `json:"on_exit,omitempty"`          // Fires on true→false transition
	WhileTrue       []Action                         `json:"while_true,omitempty"`       // Fires on every update while true
	RelatedPatterns []string                         `json:"related_patterns,omitempty"` // For pair rules

	// MaxIterations limits how many times a rule can enter the matching state for an entity.
	// 0 means no limit. Use $state.iteration and $state.max_iterations in When clauses
	// for retry budgets (e.g., retry up to 3 times then escalate).
	MaxIterations int `json:"max_iterations,omitempty"`
}

Definition represents a JSON rule configuration

type Dependencies

type Dependencies struct {
	NATSClient *natsclient.Client
	Logger     *slog.Logger
}

Dependencies provides dependencies for rule creation

type EntityConfig

type EntityConfig struct {
	Pattern      string   `json:"pattern"`       // Entity ID pattern to match
	WatchBuckets []string `json:"watch_buckets"` // KV buckets to watch
}

EntityConfig defines entity-specific configuration

type EntityStateEvaluator

type EntityStateEvaluator interface {
	// EvaluateEntityState evaluates the rule directly against EntityState triples.
	// Rule conditions should use full predicate paths (e.g., "sensor.measurement.fahrenheit").
	EvaluateEntityState(entityState *gtypes.EntityState) bool
}

EntityStateEvaluator is an optional interface for rules that can evaluate directly against EntityState triples, bypassing the message transformation layer. This is more efficient for rules that need to access triple predicates directly.

type Event

type Event interface {
	// EventType returns the type identifier for this event
	EventType() string

	// Subject returns the NATS subject for publishing this event
	Subject() string

	// Payload returns the event data as a generic map
	Payload() map[string]any

	// Validate checks if the event is valid and ready to publish
	Validate() error
}

Event represents a generic event that can be published by rules. This interface allows rules to be decoupled from specific event types while still integrating with graph events when EnableGraphIntegration is enabled.

type Example

type Example struct {
	Name        string     `json:"name"`
	Description string     `json:"description"`
	Config      Definition `json:"config"`
}

Example provides example configurations

type ExecutionContext

type ExecutionContext struct {
	// EntityID is the primary entity identifier.
	EntityID string

	// RelatedID is the related entity identifier (empty for single-entity rules).
	RelatedID string

	// Entity is the full entity state with triples (nil for message-path rules).
	Entity *gtypes.EntityState

	// Related is the related entity state (nil for single-entity rules or message-path).
	Related *gtypes.EntityState

	// State is the current match state including iteration tracking.
	// May be nil for first evaluation before state is persisted.
	State *MatchState
}

ExecutionContext carries typed data through the rule evaluation → action pipeline. It replaces the previous (entityID, relatedID string) action signature, providing actions with the full entity state and match state for richer execution logic.

func (*ExecutionContext) SubstituteVariables

func (ec *ExecutionContext) SubstituteVariables(template string) string

SubstituteVariables replaces template variables with values from the execution context. Supported variables:

  • $entity.id: The primary entity ID
  • $related.id: The related entity ID (for pair rules)
  • $state.iteration: Current iteration count
  • $state.max_iterations: Configured max iterations

Entity triple values can be accessed via $entity.triple.<predicate> syntax.
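
A simplified sketch of this substitution for the four listed variables, using strings.NewReplacer. The execContext type is a reduced, hypothetical stand-in for ExecutionContext, and the $entity.triple.<predicate> form is left out because its lookup rules are not described here.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// execContext is a reduced, hypothetical stand-in for ExecutionContext.
type execContext struct {
	EntityID      string
	RelatedID     string
	Iteration     int
	MaxIterations int
}

// substitute replaces the four listed variables in a template string.
func (ec execContext) substitute(template string) string {
	r := strings.NewReplacer(
		"$entity.id", ec.EntityID,
		"$related.id", ec.RelatedID,
		"$state.iteration", strconv.Itoa(ec.Iteration),
		"$state.max_iterations", strconv.Itoa(ec.MaxIterations),
	)
	return r.Replace(template)
}

func main() {
	ec := execContext{
		EntityID:      "c360.platform1.gcs1.robotics.drone.1",
		Iteration:     2,
		MaxIterations: 3,
	}
	fmt.Println(ec.substitute("Attempt $state.iteration of $state.max_iterations for $entity.id"))
}
```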

type ExpressionRule

type ExpressionRule struct {
	// contains filtered or unexported fields
}

ExpressionRule implements Rule interface for expression-based condition evaluation

func NewExpressionRule

func NewExpressionRule(def Definition) (*ExpressionRule, error)

NewExpressionRule creates a new expression-based rule

func (*ExpressionRule) Evaluate

func (r *ExpressionRule) Evaluate(messages []message.Message) bool

Evaluate evaluates the rule against messages

func (*ExpressionRule) EvaluateEntityState

func (r *ExpressionRule) EvaluateEntityState(entityState *gtypes.EntityState) bool

EvaluateEntityState evaluates the rule directly against EntityState triples. This bypasses the message transformation layer and evaluates conditions directly against triple predicates (e.g., "sensor.measurement.fahrenheit").

func (*ExpressionRule) ExecuteEvents

func (r *ExpressionRule) ExecuteEvents(messages []message.Message) ([]Event, error)

ExecuteEvents generates events when rule triggers

func (*ExpressionRule) Name

func (r *ExpressionRule) Name() string

Name returns the rule name

func (*ExpressionRule) Subscribe

func (r *ExpressionRule) Subscribe() []string

Subscribe returns subjects this rule subscribes to

type ExpressionRuleFactory

type ExpressionRuleFactory struct {
	// contains filtered or unexported fields
}

ExpressionRuleFactory creates expression-based rules

func NewExpressionRuleFactory

func NewExpressionRuleFactory() *ExpressionRuleFactory

NewExpressionRuleFactory creates a new expression rule factory

func (*ExpressionRuleFactory) Create

Create creates an expression rule from definition

func (*ExpressionRuleFactory) Schema

func (f *ExpressionRuleFactory) Schema() Schema

Schema returns the expression rule schema

func (*ExpressionRuleFactory) Type

func (f *ExpressionRuleFactory) Type() string

Type returns the factory type

func (*ExpressionRuleFactory) Validate

func (f *ExpressionRuleFactory) Validate(def Definition) error

Validate validates the rule definition

type Factory

type Factory interface {
	// Create creates a rule instance from configuration
	Create(id string, config Definition, deps Dependencies) (Rule, error)

	// Type returns the rule type this factory creates
	Type() string

	// Schema returns the configuration schema for UI discovery
	Schema() Schema

	// Validate validates a rule configuration
	Validate(config Definition) error
}

Factory creates rules from configuration

func GetRuleFactory

func GetRuleFactory(ruleType string) (Factory, bool)

GetRuleFactory returns a registered rule factory

type KVTestHelper

type KVTestHelper struct {
	// contains filtered or unexported fields
}

KVTestHelper provides utilities for KV-based rule testing

func NewKVTestHelper

func NewKVTestHelper(t *testing.T, bucket jetstream.KeyValue) *KVTestHelper

NewKVTestHelper creates a new KV test helper

func (*KVTestHelper) DeleteEntity

func (h *KVTestHelper) DeleteEntity(entityID string)

DeleteEntity removes an entity from the KV bucket

func (*KVTestHelper) UpdateEntityProperty

func (h *KVTestHelper) UpdateEntityProperty(entityID string, predicateFull string, value interface{})

UpdateEntityProperty updates a specific property of an entity

func (*KVTestHelper) WriteEntityState

func (h *KVTestHelper) WriteEntityState(entity *gtypes.EntityState)

WriteEntityState writes an entity state to the KV bucket

type MatchState

type MatchState struct {
	RuleID         string    `json:"rule_id"`
	EntityKey      string    `json:"entity_key"`
	IsMatching     bool      `json:"is_matching"`
	LastTransition string    `json:"last_transition"` // ""|"entered"|"exited"
	TransitionAt   time.Time `json:"transition_at,omitempty"`
	SourceRevision uint64    `json:"source_revision"`
	LastChecked    time.Time `json:"last_checked"`

	// Iteration tracks how many times this rule has entered the matching state
	// for this entity. Incremented on each TransitionEntered, preserved through exits.
	Iteration int `json:"iteration"`

	// MaxIterations is the configured limit from the rule Definition.
	// 0 means no limit. Used with $state.iteration in When clauses for retry budgets.
	MaxIterations int `json:"max_iterations,omitempty"`
}

MatchState tracks whether a rule's condition is currently matching for a specific entity or entity pair.
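
The transition logic implied by OnEnter, OnExit, and WhileTrue can be sketched as a small state machine. This sketch assumes the three action lists are mutually exclusive per evaluation and ignores MaxIterations; the matchState type and advance method are hypothetical and keep only the fields needed here.

```go
package main

import "fmt"

// matchState keeps only the MatchState fields this sketch needs.
type matchState struct {
	IsMatching     bool
	LastTransition string // "", "entered" or "exited"
	Iteration      int
}

// advance applies one evaluation result and returns which action list would
// fire: "on_enter", "on_exit", "while_true", or "" for none.
func (s *matchState) advance(nowMatching bool) string {
	switch {
	case nowMatching && !s.IsMatching:
		// false -> true: count the entry and fire on_enter.
		s.IsMatching = true
		s.LastTransition = "entered"
		s.Iteration++
		return "on_enter"
	case !nowMatching && s.IsMatching:
		// true -> false: the iteration count is preserved through exits.
		s.IsMatching = false
		s.LastTransition = "exited"
		return "on_exit"
	case nowMatching:
		return "while_true"
	default:
		return ""
	}
}

func main() {
	s := &matchState{}
	for _, matching := range []bool{true, true, false, false, true} {
		fmt.Printf("matching=%v fires=%q iteration=%d\n", matching, s.advance(matching), s.Iteration)
	}
}
```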

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics holds Prometheus metrics for RuleProcessor component

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is a component that processes messages through rules

func NewProcessor

func NewProcessor(natsClient *natsclient.Client, config *Config) (*Processor, error)

NewProcessor creates a new rule processor

func NewProcessorWithMetrics

func NewProcessorWithMetrics(natsClient *natsclient.Client, config *Config, metricsRegistry *metric.MetricsRegistry) (*Processor, error)

NewProcessorWithMetrics creates a new rule processor with optional metrics

func (*Processor) ApplyConfigUpdate

func (rp *Processor) ApplyConfigUpdate(changes map[string]any) error

ApplyConfigUpdate applies validated configuration changes

func (*Processor) ConfigSchema

func (rp *Processor) ConfigSchema() component.ConfigSchema

ConfigSchema returns configuration schema for component interface

func (*Processor) DataFlow

func (rp *Processor) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Processor) DebugStatus

func (rp *Processor) DebugStatus() any

DebugStatus returns extended debug information for the rule processor. Implements component.DebugStatusProvider.

func (*Processor) GetRuleMetrics

func (rp *Processor) GetRuleMetrics() map[string]any

GetRuleMetrics returns metrics for all rules

func (*Processor) GetRuntimeConfig

func (rp *Processor) GetRuntimeConfig() map[string]any

GetRuntimeConfig returns current runtime configuration

func (*Processor) Health

func (rp *Processor) Health() component.HealthStatus

Health returns current health status

func (*Processor) Initialize

func (rp *Processor) Initialize() error

Initialize loads rules and prepares the processor

func (*Processor) InputPorts

func (rp *Processor) InputPorts() []component.Port

InputPorts returns declared input ports

func (*Processor) Meta

func (rp *Processor) Meta() component.Metadata

Meta returns component metadata

func (*Processor) OutputPorts

func (rp *Processor) OutputPorts() []component.Port

OutputPorts returns declared output ports

func (*Processor) Start

func (rp *Processor) Start(ctx context.Context) error

Start begins processing messages through rules

func (*Processor) Stop

func (rp *Processor) Stop(_ time.Duration) error

Stop stops the processor and cleans up resources

func (*Processor) UpdateWatchBuckets

func (rp *Processor) UpdateWatchBuckets(newBuckets map[string][]string) error

UpdateWatchBuckets dynamically updates the entity watch buckets and patterns.

func (*Processor) UpdateWatchPatterns

func (rp *Processor) UpdateWatchPatterns(newPatterns []string) error

UpdateWatchPatterns dynamically updates the entity watch patterns for ENTITY_STATES.

Deprecated: Use UpdateWatchBuckets for multi-bucket support.

func (*Processor) ValidateConfigUpdate

func (rp *Processor) ValidateConfigUpdate(changes map[string]any) error

ValidateConfigUpdate validates proposed configuration changes

type Publisher

type Publisher interface {
	// Publish sends a message to a NATS subject.
	// The implementation determines whether to use core NATS or JetStream
	// based on port configuration.
	Publish(ctx context.Context, subject string, data []byte) error
}

Publisher handles publishing messages to NATS subjects. It abstracts the decision between core NATS and JetStream publishing.

type Rule

type Rule interface {
	// Name returns the human-readable name of this rule
	Name() string

	// Subscribe returns the NATS subjects this rule should listen to
	Subscribe() []string

	// Evaluate checks if the rule conditions are met for the given messages
	Evaluate(messages []message.Message) bool

	// ExecuteEvents generates events when rule conditions are satisfied
	ExecuteEvents(messages []message.Message) ([]Event, error)
}

Rule interface defines the event-based contract for processing rules. Rules evaluate messages and generate events when conditions are met.
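
To show the shape of the contract, here is a minimal sketch of a type that satisfies a Rule-like interface. Message and Event below are simplified local stand-ins, not the real message.Message and rule.Event types, and batteryLowRule with its threshold field is a hypothetical example loosely based on the battery-low rule in the README.

```go
package main

import "fmt"

// Message and Event are simplified stand-ins for message.Message and
// rule.Event. Their fields are assumptions made for this sketch.
type Message struct {
	Subject string
	Fields  map[string]float64
}

type Event struct {
	RuleName string
	Subject  string
}

// Rule mirrors the four-method contract, using the stand-in types.
type Rule interface {
	Name() string
	Subscribe() []string
	Evaluate(messages []Message) bool
	ExecuteEvents(messages []Message) ([]Event, error)
}

// batteryLowRule is a hypothetical rule that matches when any message
// reports a battery level below the threshold.
type batteryLowRule struct {
	threshold float64
}

func (r batteryLowRule) Name() string        { return "Battery Low Alert" }
func (r batteryLowRule) Subscribe() []string { return []string{"process.>"} }

func (r batteryLowRule) Evaluate(messages []Message) bool {
	for _, m := range messages {
		if level, ok := m.Fields["battery"]; ok && level < r.threshold {
			return true
		}
	}
	return false
}

// ExecuteEvents emits one event when the condition holds and none otherwise.
func (r batteryLowRule) ExecuteEvents(messages []Message) ([]Event, error) {
	if !r.Evaluate(messages) {
		return nil, nil
	}
	return []Event{{RuleName: r.Name(), Subject: "alerts.battery"}}, nil
}

func main() {
	var r Rule = batteryLowRule{threshold: 20}
	msgs := []Message{{Subject: "process.drone.1", Fields: map[string]float64{"battery": 15}}}
	events, err := r.ExecuteEvents(msgs)
	fmt.Println(len(events), err)
}
```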

func CreateRuleFromDefinition

func CreateRuleFromDefinition(def Definition, deps Dependencies) (Rule, error)

CreateRuleFromDefinition creates a rule using the appropriate factory

type Schema

type Schema struct {
	Type        string                              `json:"type"`
	DisplayName string                              `json:"display_name"`
	Description string                              `json:"description"`
	Category    string                              `json:"category"`
	Icon        string                              `json:"icon,omitempty"`
	Properties  map[string]component.PropertySchema `json:"properties"`
	Required    []string                            `json:"required"`
	Examples    []Example                           `json:"examples,omitempty"`
}

Schema describes the configuration schema for a rule type

type StateTracker

type StateTracker struct {
	// contains filtered or unexported fields
}

StateTracker manages rule match state persistence in NATS KV. It tracks which rules are currently matching for which entities to enable proper transition detection for stateful ECA rules.

func NewStateTracker

func NewStateTracker(bucket jetstream.KeyValue, logger *slog.Logger) *StateTracker

NewStateTracker creates a new StateTracker with the given KV bucket.

func (*StateTracker) Delete

func (st *StateTracker) Delete(ctx context.Context, ruleID, entityKey string) error

Delete removes the match state for a rule and entity key.

func (*StateTracker) DeleteAllForEntity

func (st *StateTracker) DeleteAllForEntity(ctx context.Context, entityID string) error

DeleteAllForEntity removes all rule states associated with an entity. This is called when an entity is deleted to clean up orphaned state.

func (*StateTracker) Get

func (st *StateTracker) Get(ctx context.Context, ruleID, entityKey string) (MatchState, error)

Get retrieves the current match state for a rule and entity key. Returns ErrStateNotFound if no state exists for this combination.

func (*StateTracker) Set

func (st *StateTracker) Set(ctx context.Context, state MatchState) error

Set persists the match state for a rule and entity.

type StatefulEvaluator

type StatefulEvaluator struct {
	// contains filtered or unexported fields
}

StatefulEvaluator handles rule evaluation with state tracking. It manages state transitions (entered/exited/none) and executes the appropriate actions based on the transition type.

func NewStatefulEvaluator

func NewStatefulEvaluator(stateTracker *StateTracker, actionExecutor ActionExecutorInterface, logger *slog.Logger) *StatefulEvaluator

NewStatefulEvaluator creates a new stateful evaluator with the given dependencies. If logger is nil, uses the default logger.

func (*StatefulEvaluator) EvaluateWithState

func (e *StatefulEvaluator) EvaluateWithState(
	ctx context.Context,
	ruleDef Definition,
	entityID string,
	relatedID string,
	currentlyMatching bool,
	entity *gtypes.EntityState,
	related *gtypes.EntityState,
) (Transition, error)

EvaluateWithState evaluates a rule and fires appropriate actions based on state transitions. It:

  1. Retrieves previous state from StateTracker (treats missing state as false)
  2. Detects transition (entered/exited/none) by comparing previous and current match state
  3. Fires appropriate actions based on transition:
     - TransitionEntered: Execute all OnEnter actions (increments iteration)
     - TransitionExited: Execute all OnExit actions (preserves iteration)
     - TransitionNone + currentlyMatching: Execute all WhileTrue actions
  4. Evaluates When clauses on each action before execution
  5. Persists new state to StateTracker

The entity and related parameters provide typed entity state for When clause evaluation and are passed through to actions via ExecutionContext. Pass nil for message-path rules.

Returns the transition that occurred and any error encountered.

type Status

type Status struct {
	DebounceDelayMs    int       `json:"debounce_delay_ms"`
	PendingEvaluations int       `json:"pending_evaluations"`
	TotalEvaluations   int       `json:"total_evaluations"`
	TotalTriggers      int       `json:"total_triggers"`
	DebouncedCount     int       `json:"debounced_count"` // Matches Tester's test expectations
	RulesLoaded        int       `json:"rules_loaded"`
	LastEvaluationTime time.Time `json:"last_evaluation_time,omitempty"`
}

Status represents the current status of rule evaluation for debug observability

type TestRule

type TestRule struct {
	// contains filtered or unexported fields
}

TestRule is a functional rule implementation for testing

func NewTestRule

func NewTestRule(id, name string, subjects []string, conditions []expression.ConditionExpression) *TestRule

NewTestRule creates a new test rule

func (*TestRule) Evaluate

func (r *TestRule) Evaluate(messages []message.Message) bool

Evaluate evaluates the rule against messages

func (*TestRule) EvaluateEntityState

func (r *TestRule) EvaluateEntityState(entityState *gtypes.EntityState) bool

EvaluateEntityState evaluates the rule directly against EntityState triples. This implements the EntityStateEvaluator interface for KV watch-based evaluation.

func (*TestRule) ExecuteEvents

func (r *TestRule) ExecuteEvents(messages []message.Message) ([]Event, error)

ExecuteEvents generates events when rule triggers

func (*TestRule) Name

func (r *TestRule) Name() string

Name returns the rule name

func (*TestRule) Subscribe

func (r *TestRule) Subscribe() []string

Subscribe returns subjects this rule subscribes to

type TestRuleFactory

type TestRuleFactory struct {
	// contains filtered or unexported fields
}

TestRuleFactory creates test rules for integration testing

func NewTestRuleFactory

func NewTestRuleFactory() *TestRuleFactory

NewTestRuleFactory creates a new test rule factory

func (*TestRuleFactory) Create

func (f *TestRuleFactory) Create(ruleID string, def Definition, _ Dependencies) (Rule, error)

Create creates a test rule from definition

func (*TestRuleFactory) Examples

func (f *TestRuleFactory) Examples() []Example

Examples returns test rule examples

func (*TestRuleFactory) Schema

func (f *TestRuleFactory) Schema() Schema

Schema returns the test rule schema

func (*TestRuleFactory) Type

func (f *TestRuleFactory) Type() string

Type returns the factory type

func (*TestRuleFactory) Validate

func (f *TestRuleFactory) Validate(_ Definition) error

Validate validates the rule definition

type Transition

type Transition string

Transition represents the type of state change detected

const (
	// TransitionNone indicates no state change
	TransitionNone Transition = ""
	// TransitionEntered indicates condition became true
	TransitionEntered Transition = "entered"
	// TransitionExited indicates condition became false
	TransitionExited Transition = "exited"
)

func DetectTransition

func DetectTransition(wasMatching, nowMatching bool) Transition

DetectTransition compares previous and current match state to determine transition type. Returns TransitionEntered if condition changed from false to true. Returns TransitionExited if condition changed from true to false. Returns TransitionNone if state did not change.
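
The documented behaviour amounts to a four-row truth table. The sketch below reimplements it locally (as detectTransition, a stand-in written from the description above rather than copied from the package) and prints each combination.

```go
package main

import "fmt"

type Transition string

const (
	TransitionNone    Transition = ""
	TransitionEntered Transition = "entered"
	TransitionExited  Transition = "exited"
)

// detectTransition follows the documented contract: false to true is
// entered, true to false is exited, and anything else is none.
func detectTransition(wasMatching, nowMatching bool) Transition {
	switch {
	case !wasMatching && nowMatching:
		return TransitionEntered
	case wasMatching && !nowMatching:
		return TransitionExited
	default:
		return TransitionNone
	}
}

func main() {
	cases := [][2]bool{{false, false}, {false, true}, {true, true}, {true, false}}
	for _, c := range cases {
		fmt.Printf("was=%t now=%t -> %q\n", c[0], c[1], detectTransition(c[0], c[1]))
	}
}
```

Note that TransitionNone is the empty string, so it is also the zero value of Transition.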

type TripleMutator

type TripleMutator interface {
	// AddTriple adds a triple via NATS request/response and returns the KV revision
	AddTriple(ctx context.Context, triple message.Triple) (uint64, error)
	// RemoveTriple removes a triple via NATS request/response and returns the KV revision
	RemoveTriple(ctx context.Context, subject, predicate string) (uint64, error)
}

TripleMutator handles triple mutations via NATS request/response. The returned uint64 is the KV revision after the write, used for feedback loop prevention.
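
How the returned revision prevents feedback loops is not spelled out here. One plausible approach, sketched below purely as an assumption, is to remember the revision of the processor's own latest write per entity and skip the KV watch update that carries the same revision. revisionGuard and its methods are hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// revisionGuard remembers the KV revision produced by our own most recent
// write to each entity. Hypothetical helper for illustration only.
type revisionGuard struct {
	mu  sync.Mutex
	own map[string]uint64
}

func newRevisionGuard() *revisionGuard {
	return &revisionGuard{own: make(map[string]uint64)}
}

// recordWrite stores the revision returned by a mutation such as AddTriple.
func (g *revisionGuard) recordWrite(entityID string, rev uint64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if rev > g.own[entityID] {
		g.own[entityID] = rev
	}
}

// isOwnWrite reports whether a watched update at rev matches the revision
// of our own latest write, in which case re-evaluation can be skipped.
func (g *revisionGuard) isOwnWrite(entityID string, rev uint64) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	last, ok := g.own[entityID]
	return ok && rev == last
}

func main() {
	g := newRevisionGuard()

	// Pretend AddTriple returned revision 42 for drone.1.
	g.recordWrite("drone.1", 42)

	fmt.Println(g.isOwnWrite("drone.1", 42)) // update caused by our own write
	fmt.Println(g.isOwnWrite("drone.1", 43)) // later update from another writer
}
```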

type ValidationError

type ValidationError struct {
	Field   string
	Message string
}

ValidationError represents a payload validation error.

func (*ValidationError) Error

func (e *ValidationError) Error() string

type WorkflowTriggerPayload

type WorkflowTriggerPayload struct {
	WorkflowID  string         `json:"workflow_id"`
	EntityID    string         `json:"entity_id"`
	TriggeredAt time.Time      `json:"triggered_at"`
	RelatedID   string         `json:"related_id,omitempty"`
	Context     map[string]any `json:"context,omitempty"`
}

WorkflowTriggerPayload represents a message sent by the rule processor to trigger a reactive workflow. This payload is wrapped in a BaseMessage for proper deserialization by the reactive workflow engine.

func (*WorkflowTriggerPayload) MarshalJSON

func (p *WorkflowTriggerPayload) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler. This marshals just the payload fields - BaseMessage handles the wrapping.

func (*WorkflowTriggerPayload) Schema

func (p *WorkflowTriggerPayload) Schema() message.Type

Schema returns the message type for workflow trigger payloads.

func (*WorkflowTriggerPayload) UnmarshalJSON

func (p *WorkflowTriggerPayload) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler.

func (*WorkflowTriggerPayload) Validate

func (p *WorkflowTriggerPayload) Validate() error

Validate ensures the payload has required fields.

Directories

Path Synopsis
Package boid implements Boids-inspired local coordination rules for multi-agent teams.
Package expression - Expression evaluator implementation
