rule

package
v1.0.0-beta.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2026 License: MIT Imports: 28 Imported by: 0

README

Rule Processor

Processes message streams through configurable rules and evaluates conditions against semantic messages and entity state changes.

What It Does

  • Watches ENTITY_STATES KV bucket for entity changes
  • Evaluates rules against semantic messages and entity state
  • Publishes rule events and graph mutation requests
  • Provides time-window analysis with message buffering

Quick Start

config := rule.DefaultConfig()
config.Ports = &component.PortsDefinition{
    Inputs: []component.PortDefinition{{Subject: "process.>"}},
    Outputs: []component.PortDefinition{{Subject: "rule.events.>"}},
}

processor, err := rule.NewProcessor(natsClient, &config)
if err != nil {
    log.Fatal(err)
}

if err := processor.Initialize(); err != nil {
    log.Fatal(err)
}

if err := processor.Start(ctx); err != nil {
    log.Fatal(err)
}

Rule Definition

Rules are defined in JSON:

{
  "id": "battery-low",
  "name": "Battery Low Alert",
  "enabled": true,
  "conditions": [
    {"field": "drone.telemetry.battery", "operator": "lt", "value": 20}
  ],
  "on_enter": [
    {"type": "add_triple", "predicate": "alert.status", "object": "battery_low"},
    {"type": "publish", "subject": "alerts.battery"}
  ],
  "on_exit": [
    {"type": "remove_triple", "predicate": "alert.status"}
  ]
}

Documentation

Topic Location
Conceptual Overview docs/advanced/06-rules-engine.md
Rule Syntax docs/syntax.md
Conditions docs/conditions.md
Actions docs/actions.md
State Tracking docs/state-tracking.md
Entity Watching docs/entity-watching.md
Configuration docs/configuration.md
Custom Rules docs/custom-rules.md
Operations docs/operations.md
Examples docs/examples.md

Package Structure

File Purpose
processor.go Core processor, lifecycle, ports
config.go Config struct and defaults
factory.go Component registration
entity_watcher.go KV entity state watching
message_handler.go Message processing and rule evaluation
rule_loader.go Rule loading from JSON files
publisher.go Event publishing to NATS
metrics.go Prometheus metrics

Metrics

Key metrics exposed:

Metric Type Description
semstreams_rule_evaluations_total Counter Rule evaluations performed
semstreams_rule_triggers_total Counter Successful rule triggers
semstreams_rule_evaluation_duration_seconds Histogram Evaluation latency
semstreams_rule_active_rules Gauge Active rules count
semstreams_rule_errors_total Counter Processing errors

Design Decisions

Entity ID Format

Uses 6-part hierarchical dotted notation for global uniqueness:

<org>.<platform>.<system>.<domain>.<type>.<instance>
Example: c360.platform1.gcs1.robotics.drone.1
Nil Safety Pattern

Metrics use "nil input = nil feature" pattern - zero overhead when disabled.

Graph Integration

When enabled (default), rule actions directly affect the graph via add_triple and remove_triple actions.

Documentation

Overview

Package rule - Action execution for ECA rules

Package rule provides rule-based processing of semantic message streams with support for entity state watching and event generation.

Overview

The rule processor evaluates conditions against streaming messages and entity state changes, generating graph events when rules trigger. Rules are defined using JSON configuration and implemented using the factory pattern.

Architecture

Rules consist of three main components:

  • Definition: JSON configuration (conditions, logic, metadata)
  • Factory: Creates Rule instances from definitions
  • Rule: Evaluates conditions and generates events

Rule Interface

Rules must implement this interface:

type Rule interface {
    Name() string
    Subscribe() []string
    Evaluate(messages []message.Message) bool
    ExecuteEvents(messages []message.Message) ([]rtypes.Event, error)
}

Creating Custom Rules

1. Create a RuleFactory:

type MyRuleFactory struct {
    ruleType string
}

func (f *MyRuleFactory) Create(id string, def Definition, deps Dependencies) (rtypes.Rule, error) {
    return &MyRule{...}, nil
}

func (f *MyRuleFactory) Validate(def Definition) error {
    // Validate rule configuration
    return nil
}

2. Implement the Rule interface:

type MyRule struct {
    id         string
    conditions []expression.ConditionExpression
}

func (r *MyRule) ExecuteEvents(messages []message.Message) ([]rtypes.Event, error) {
    // Generate events directly in Go code
    // gtypes.Event implements rtypes.Event interface
    event := &gtypes.Event{
        Type:     gtypes.EventEntityUpdate,
        EntityID: "alert.my-rule." + r.id,
        Properties: map[string]interface{}{
            "triggered": true,
            "timestamp": time.Now(),
        },
    }
    return []rtypes.Event{event}, nil
}

3. Register the factory:

func init() {
    RegisterRuleFactory("my_rule", &MyRuleFactory{ruleType: "my_rule"})
}

Entity State Watching

Rules can watch entity state changes via NATS KV buckets:

{
  "id": "my-rule",
  "type": "my_rule",
  "entity": {
    "pattern": "c360.*.robotics.drone.>",
    "watch_buckets": ["ENTITY_STATES"]
  }
}

The processor watches these buckets using the KV watch pattern and converts entity state updates into messages for rule evaluation.

Runtime Configuration

Rules support dynamic runtime updates via ApplyConfigUpdate():

  • Add/remove rules without restart
  • Update rule conditions and metadata
  • Enable/disable rules on the fly

Event Generation

Rules generate gtypes.Event directly (no template system):

  • Events contain: Type, EntityID, Properties, Metadata
  • Published to NATS graph subjects
  • Consumed by graph processor for entity storage

Metrics

The processor exposes Prometheus metrics:

  • semstreams_rule_evaluations_total
  • semstreams_rule_triggers_total
  • semstreams_rule_evaluation_duration_seconds
  • semstreams_rule_errors_total

Example Usage

config := rule.DefaultConfig()
config.Ports = &component.PortConfig{
    Inputs: []component.PortDefinition{
        {
            Name:    "semantic_messages",
            Type:    "nats",
            Subject: "process.>",
        },
    },
}

processor, err := rule.NewProcessor(natsClient, &config)
if err != nil {
    log.Fatal(err)
}
err = processor.Initialize()
err = processor.Start(ctx)

For comprehensive documentation, see:

  • /Users/coby/Code/c360/semdocs/docs/guides/rules-engine.md
  • /Users/coby/Code/c360/semdocs/docs/specs/SPEC-001-generic-rules-engine.md

Package rule - Execution context for rule actions

Package rule - Expression Rule Factory for condition-based rules

Package rule provides interfaces and implementations for rule processing

Package rule - NATS KV Configuration Integration for Rules

Package rule - KV write action support for rule-driven state machine orchestration

Package rule provides a rule processing component that implements the Discoverable interface for processing message streams through rules

Package rule - Rule Factory Pattern for Dynamic Rule Creation

Package rule - State tracking for stateful ECA rules

Package rule - Stateful rule evaluation with state tracking

Package rule - Test Rule Factory for Integration Tests

Package rule provides triple mutation support via NATS request/response

Package rule - Workflow trigger payload for rule-to-workflow integration

Index

Constants

View Source
const (
	// ActionTypePublish publishes a message to a NATS subject
	ActionTypePublish = "publish"
	// ActionTypeAddTriple creates a relationship triple in the graph
	ActionTypeAddTriple = "add_triple"
	// ActionTypeRemoveTriple removes a relationship triple from the graph
	ActionTypeRemoveTriple = "remove_triple"
	// ActionTypeUpdateTriple updates metadata on an existing triple
	ActionTypeUpdateTriple = "update_triple"
	// ActionTypePublishAgent triggers an agentic loop by publishing a TaskMessage
	ActionTypePublishAgent = "publish_agent"
	// ActionTypeTriggerWorkflow triggers a reactive workflow by publishing to workflow.trigger.<workflow_id>
	ActionTypeTriggerWorkflow = "trigger_workflow"
	// ActionTypePublishBoidSignal publishes a Boid steering signal for agent coordination
	ActionTypePublishBoidSignal = "publish_boid_signal"
	// ActionTypeUpdateKV writes JSON to a named KV bucket with optional CAS merge
	ActionTypeUpdateKV = "update_kv"
)

Action type constants define the supported action types for rule execution.

View Source
const (
	SubjectTripleAdd    = "graph.mutation.triple.add"
	SubjectTripleRemove = "graph.mutation.triple.remove"
	MutationTimeout     = 5 * time.Second
)

NATS subjects for graph mutations (must match processor/graph/mutations.go)

View Source
const (
	WorkflowTriggerDomain   = "rule"
	WorkflowTriggerCategory = "workflow_trigger"
	WorkflowTriggerVersion  = "v1"
)

Workflow trigger payload type constants. Uses "rule" domain to avoid conflict with processor/workflow's workflow.trigger.v1

Variables

View Source
var ErrStateNotFound = errors.New("rule state not found")

ErrStateNotFound is returned when no rule state exists for the given key

Functions

func AssertEventuallyTrue

func AssertEventuallyTrue(t *testing.T, condition func() bool, timeout time.Duration, tick time.Duration, msgAndArgs ...interface{})

AssertEventuallyTrue asserts that a condition becomes true within timeout

func CreateBatteryEntity

func CreateBatteryEntity(id string, level float64, voltage float64) *gtypes.EntityState

CreateBatteryEntity creates a battery entity for testing

func CreateDroneEntity

func CreateDroneEntity(id string, armed bool, mode string, altitude float64) *gtypes.EntityState

CreateDroneEntity creates a drone entity for testing

func CreateRuleProcessor

func CreateRuleProcessor(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

CreateRuleProcessor creates a rule processor with the new factory pattern

func CreateTestEntityPattern

func CreateTestEntityPattern(domain, category, instance string) string

CreateTestEntityPattern creates entity IDs matching test patterns

func GetRegisteredRuleTypes

func GetRegisteredRuleTypes() []string

GetRegisteredRuleTypes returns all registered rule types

func GetRuleSchemas

func GetRuleSchemas() map[string]Schema

GetRuleSchemas returns schemas for all registered rule types

func Register

func Register(registry *component.Registry) error

Register registers the rule processor component with the given registry

func RegisterRuleFactory

func RegisterRuleFactory(ruleType string, factory Factory) error

RegisterRuleFactory registers a rule factory

func SetupKVBucketForTesting

func SetupKVBucketForTesting(t *testing.T, js jetstream.JetStream, bucketName string) jetstream.KeyValue

SetupKVBucketForTesting creates and initializes a KV bucket for testing

func UnregisterRuleFactory

func UnregisterRuleFactory(ruleType string) error

UnregisterRuleFactory removes a rule factory for a given type This is primarily for testing or dynamic factory management

func ValidateDefinition

func ValidateDefinition(def Definition) error

ValidateDefinition validates a rule Definition for fields that are processor-level concerns (not delegated to individual rule factories). Call this before passing a Definition to CreateRuleFromDefinition.

func VerifyRuleTriggered

func VerifyRuleTriggered(_ *testing.T, processor *Processor, ruleName string) bool

VerifyRuleTriggered checks if a rule has been triggered by examining metrics or events

func WaitForEntityProcessing

func WaitForEntityProcessing(_ *testing.T, _ time.Duration)

WaitForEntityProcessing waits for an entity update to be processed This is useful when testing async KV watchers

func WaitForKVWatcher

func WaitForKVWatcher(t *testing.T, processor *Processor, timeout time.Duration)

WaitForKVWatcher waits for a KV watcher to be ready

Types

type Action

type Action struct {
	// Type specifies the action type (publish, add_triple, remove_triple, update_triple, publish_agent)
	Type string `json:"type"`

	// Subject is the NATS subject for publish actions
	Subject string `json:"subject,omitempty"`

	// Predicate is the relationship type for triple actions
	Predicate string `json:"predicate,omitempty"`

	// Object is the target entity or value for triple actions
	Object string `json:"object,omitempty"`

	// TTL specifies optional expiration time for triples (e.g., "5m", "1h")
	TTL string `json:"ttl,omitempty"`

	// Properties contains additional metadata for the action
	Properties map[string]any `json:"properties,omitempty"`

	// Role is the agent role for publish_agent actions (e.g., "general", "architect", "editor")
	Role string `json:"role,omitempty"`

	// Model is the model endpoint name for publish_agent actions
	Model string `json:"model,omitempty"`

	// Prompt is the task prompt template for publish_agent actions
	// Supports variable substitution: $entity.id, $related.id
	Prompt string `json:"prompt,omitempty"`

	// Tools is the per-spawned-agent tool allowlist for publish_agent actions.
	// Tool names are resolved to agentic.ToolDefinition against the global
	// tool registry at dispatch time; unknown names are logged and dropped.
	// Empty/nil leaves TaskMessage.Tools unset, which makes the spawned loop
	// fall back to global tool discovery (existing behaviour).
	//
	// This is the product-layer hook for scoping which tools a given role
	// can see. Putting it on the rule — not in agentic-tools Config — keeps
	// role→tools decisions in the workflow config that already owns the
	// role name, model, and prompt for the spawned agent.
	Tools []string `json:"tools,omitempty"`

	// WorkflowID is the workflow identifier for trigger_workflow actions
	WorkflowID string `json:"workflow_id,omitempty"`

	// ContextData provides additional context passed to the workflow
	ContextData map[string]any `json:"context_data,omitempty"`

	// BoidSignalType specifies the type of boid signal: separation, cohesion, or alignment
	BoidSignalType string `json:"boid_signal_type,omitempty"`

	// BoidStrength specifies the steering strength for boid signals (0.0-1.0)
	BoidStrength float64 `json:"boid_strength,omitempty"`

	// WorkflowSlug identifies the workflow for publish_agent actions (e.g., "github-issue-to-pr")
	WorkflowSlug string `json:"workflow_slug,omitempty"`

	// WorkflowStep identifies the step within the workflow (e.g., "qualify", "develop", "review")
	WorkflowStep string `json:"workflow_step,omitempty"`

	// When is an optional guard clause for conditional action execution.
	// If present, all conditions must match (AND logic) against the entity's current
	// state and $state.* fields for the action to execute. Actions without When always execute.
	When []expression.ConditionExpression `json:"when,omitempty"`

	// Bucket is the KV bucket name for update_kv actions (e.g., "PLAN_STATES").
	// Supports variable substitution.
	Bucket string `json:"bucket,omitempty"`

	// Key is the KV key for update_kv actions. Supports variable substitution.
	Key string `json:"key,omitempty"`

	// Payload is the data to write for update_kv actions.
	// Supports variable substitution in string values (including nested maps).
	Payload map[string]any `json:"payload,omitempty"`

	// Merge controls write semantics for update_kv:
	// true = CAS read-modify-write (merge payload into existing document)
	// false = overwrite entire document (last writer wins)
	Merge bool `json:"merge,omitempty"`
}

Action represents an action to execute when a rule fires. Actions are triggered by state transitions (OnEnter, OnExit) or while a condition remains true (WhileTrue).

func (Action) ParseTTL

func (a Action) ParseTTL() (time.Duration, error)

ParseTTL parses the TTL string into a duration. Returns 0 duration if TTL is empty (no expiration). Returns an error if the TTL format is invalid or negative.

type ActionExecutor

type ActionExecutor struct {
	// contains filtered or unexported fields
}

ActionExecutor executes actions for rules. It handles triple mutations, NATS publishing, KV writes, and other action types.

func NewActionExecutor

func NewActionExecutor(logger *slog.Logger) *ActionExecutor

NewActionExecutor creates a new ActionExecutor with the given logger. If logger is nil, uses the default logger.

func NewActionExecutorComplete

func NewActionExecutorComplete(logger *slog.Logger, mutator TripleMutator, publisher Publisher, kvWriter KVWriter) *ActionExecutor

NewActionExecutorComplete creates an ActionExecutor with all capabilities including KV writes.

func NewActionExecutorFull

func NewActionExecutorFull(logger *slog.Logger, mutator TripleMutator, publisher Publisher) *ActionExecutor

NewActionExecutorFull creates a new ActionExecutor with full functionality. The mutator enables triple persistence, and the publisher enables NATS publishing.

func NewActionExecutorWithMutator

func NewActionExecutorWithMutator(logger *slog.Logger, mutator TripleMutator) *ActionExecutor

NewActionExecutorWithMutator creates a new ActionExecutor with triple mutation support. The mutator enables actual persistence of triple operations via NATS request/response.

func (*ActionExecutor) Execute

func (e *ActionExecutor) Execute(ctx context.Context, action Action, ec *ExecutionContext) error

Execute runs the given action using the execution context. The ExecutionContext provides the entity ID, related entity ID, full entity state, and match state for rich action execution.

func (*ActionExecutor) ExecuteAddTriple

func (e *ActionExecutor) ExecuteAddTriple(ctx context.Context, action Action, ec *ExecutionContext) (message.Triple, error)

ExecuteAddTriple executes an add_triple action, creating a new semantic triple. Returns the created triple and any error that occurred. If a TripleMutator is configured, the triple is persisted via NATS request/response.

func (*ActionExecutor) ExecuteRemoveTriple

func (e *ActionExecutor) ExecuteRemoveTriple(ctx context.Context, action Action, ec *ExecutionContext) error

ExecuteRemoveTriple executes a remove_triple action, removing a semantic triple. If a TripleMutator is configured, the triple is removed via NATS request/response.

type ActionExecutorInterface

type ActionExecutorInterface interface {
	Execute(ctx context.Context, action Action, ec *ExecutionContext) error
}

ActionExecutorInterface defines the interface for action execution. Actions receive an ExecutionContext with the full entity state and match state, replacing the previous (entityID, relatedID string) signature.

type BaseRuleFactory

type BaseRuleFactory struct {
	// contains filtered or unexported fields
}

BaseRuleFactory provides common factory functionality

func NewBaseRuleFactory

func NewBaseRuleFactory(ruleType, displayName, description, category string) *BaseRuleFactory

NewBaseRuleFactory creates a base factory implementation

func (*BaseRuleFactory) Type

func (f *BaseRuleFactory) Type() string

Type returns the rule type

func (*BaseRuleFactory) ValidateExpression

func (f *BaseRuleFactory) ValidateExpression(def Definition) error

ValidateExpression validates expression configuration

type Config

type Config struct {
	// Port configuration for inputs and outputs
	Ports *component.PortConfig `` /* 168-byte string literal not displayed */

	// Rule configuration sources
	RulesFiles  []string     `json:"rules_files" schema:"type:array,description:Paths to JSON rule definition files,default:[],category:basic"`
	InlineRules []Definition `json:"inline_rules,omitempty" schema:"type:array,description:Inline rule definitions (alternative to files),category:basic"`

	// Message cache configuration (not exposed in schema - internal config)
	MessageCache cache.Config `json:"message_cache"`

	// Buffer window size for time-window analysis
	BufferWindowSize string `` /* 135-byte string literal not displayed */

	// Alert cooldown period to prevent spam
	AlertCooldownPeriod string `` /* 139-byte string literal not displayed */

	// Graph processor integration
	EnableGraphIntegration bool `` /* 130-byte string literal not displayed */

	// NATS KV patterns to watch for entity changes (e.g., 'telemetry.robotics.>')
	// DEPRECATED: Use EntityWatchBuckets for multi-bucket support. This field is still
	// supported for backwards compatibility and applies to ENTITY_STATES bucket.
	EntityWatchPatterns []string `` /* 153-byte string literal not displayed */

	// EntityWatchBuckets maps bucket names to watch patterns.
	// This enables rules to observe operational results from multiple components.
	// Example: {"ENTITY_STATES": ["telemetry.>"], "WORKFLOW_EXECUTIONS": ["COMPLETE_*"]}
	// If not specified, falls back to EntityWatchPatterns for ENTITY_STATES bucket.
	EntityWatchBuckets map[string][]string `` /* 147-byte string literal not displayed */

	// Debounce delay for rule evaluation (settling time for entity state)
	// Default is 0 (disabled) to ensure rules evaluate against each state change.
	// Set to a positive value (e.g., 100) to batch rapid updates and evaluate final state only.
	DebounceDelayMs time.Duration `` /* 146-byte string literal not displayed */

	// JetStream consumer configuration (not exposed in schema - internal config)
	Consumer struct {
		Enabled        bool   `json:"enabled"`          // Enable JetStream consumer
		AckWaitSeconds int    `json:"ack_wait_seconds"` // Acknowledgment timeout
		MaxDeliver     int    `json:"max_deliver"`      // Max delivery attempts
		ReplayPolicy   string `json:"replay_policy"`    // "instant" or "original"
	} `json:"consumer"`
}

Config holds configuration for the RuleProcessor

func CreateRuleTestConfig

func CreateRuleTestConfig(watchPatterns []string) Config

CreateRuleTestConfig creates a test configuration for rule processor

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns sensible defaults

func (Config) MarshalJSON

func (c Config) MarshalJSON() ([]byte, error)

MarshalJSON implements custom JSON marshaling for Config

func (*Config) UnmarshalJSON

func (c *Config) UnmarshalJSON(data []byte) error

UnmarshalJSON implements custom JSON unmarshaling for Config

type ConfigManager

type ConfigManager struct {
	// contains filtered or unexported fields
}

ConfigManager manages rules through NATS KV configuration.

Two instances of ConfigManager coexist in a running binary:

  1. The Pattern-B CRUD manager constructed in cmd/semstreams/main.go (buildRuleManager) with processor=nil. It handles agent tool writes (create_rule, update_rule, delete_rule) and has no watcher.
  2. The component-internal manager constructed inside Processor.Start with processor non-nil. It owns the KV watcher and applies hot-reload updates to the running processor.

Both read/write semstreams_config:rules.*. The component watcher picks up writes from the CRUD manager via normal KV semantics.

func NewConfigManager

func NewConfigManager(processor *Processor, configMgr *config.Manager, logger *slog.Logger) *ConfigManager

NewConfigManager creates a new rule configuration manager

func (*ConfigManager) DeleteRule

func (rcm *ConfigManager) DeleteRule(ctx context.Context, ruleID string) error

DeleteRule removes a rule configuration from NATS KV.

func (*ConfigManager) GetRule

func (rcm *ConfigManager) GetRule(ctx context.Context, ruleID string) (*Definition, error)

GetRule retrieves a rule configuration from NATS KV.

func (*ConfigManager) InitializeKVStore

func (rcm *ConfigManager) InitializeKVStore(natsClient *natsclient.Client) error

InitializeKVStore initializes the KVStore for direct KV operations

func (*ConfigManager) ListRules

func (rcm *ConfigManager) ListRules(ctx context.Context) (map[string]Definition, error)

ListRules returns all rule configurations from the KV store.

Prefers direct KV reads when a kvStore is wired (consistent with SaveRule/GetRule/DeleteRule). Falls back to the processor's runtime config for deployments that haven't called InitializeKVStore — kept so the existing runtime-config-only path still works. Callers that need full Definition data should ensure kvStore is initialised; the fallback returns a stub Definition carrying only ID/Type/Name/Enabled.

func (*ConfigManager) ReconcileCount

func (rcm *ConfigManager) ReconcileCount() int

ReconcileCount returns the total number of times reconcileFromKV has completed successfully. Intended for integration tests that verify debounce coalescing. For tests only.

func (*ConfigManager) SaveRule

func (rcm *ConfigManager) SaveRule(ctx context.Context, ruleID string, ruleDef Definition) error

SaveRule saves a rule configuration to NATS KV.

func (*ConfigManager) SeedFromRuntime

func (rcm *ConfigManager) SeedFromRuntime(ctx context.Context) error

SeedFromRuntime writes file-loaded rules from the running processor into KV idempotently. Uses Create (not Put) so operator edits already in KV are never overwritten — a key-already-exists error is treated as a no-op.

Reads from rp.ruleDefinitions (populated by loadRules/Initialize) since rp.ruleConfigs (from GetRuntimeConfig) is only populated by ApplyConfigUpdate, not by the initial file/inline load path.

Partial failures are logged but do not abort seeding. The method returns nil even when individual writes fail so that hot-reload startup is not blocked.

func (*ConfigManager) Start

func (rcm *ConfigManager) Start(_ context.Context) error

Start begins watching for rule configuration updates.

When processor is nil (Pattern-B CRUD path), Start is a no-op so that cmd/semstreams/main.go can safely call it without wiring a watcher.

When processor is non-nil (component-internal path), Start:

  1. Seeds file-loaded rules into KV idempotently via SeedFromRuntime.
  2. Opens a KV watcher on "rules.*".
  3. Spawns a goroutine that debounces watcher events and calls reconcileFromKV on each coalesced burst.

func (*ConfigManager) Stop

func (rcm *ConfigManager) Stop() error

Stop stops the configuration manager and waits for the watcher goroutine to exit cleanly.

func (*ConfigManager) WatchRules

func (rcm *ConfigManager) WatchRules(_ context.Context, _ func(ruleID string, rule Rule, operation string)) error

WatchRules watches for rule changes and returns active rules

type Definition

type Definition struct {
	ID              string                           `json:"id"`
	Type            string                           `json:"type"`
	Name            string                           `json:"name"`
	Description     string                           `json:"description"`
	Enabled         bool                             `json:"enabled"`
	Conditions      []expression.ConditionExpression `json:"conditions"`
	Logic           string                           `json:"logic"`
	Cooldown        string                           `json:"cooldown,omitempty"`
	Entity          EntityConfig                     `json:"entity,omitempty"`
	Metadata        map[string]interface{}           `json:"metadata,omitempty"`
	OnEnter         []Action                         `json:"on_enter,omitempty"`         // Fires on false→true transition
	OnExit          []Action                         `json:"on_exit,omitempty"`          // Fires on true→false transition
	WhileTrue       []Action                         `json:"while_true,omitempty"`       // Fires on every update while true
	OnRecovery      []Action                         `json:"on_recovery,omitempty"`      // Fires once on restart if still matching; falls back to OnEnter
	RelatedPatterns []string                         `json:"related_patterns,omitempty"` // For pair rules

	// RerunOnRecovery opts rules with only OnEnter/WhileTrue defined into the
	// bootstrap recovery path. When false (default), such rules must declare
	// OnRecovery explicitly to re-fire on restart. Rules that already declare
	// OnRecovery always participate regardless of this flag.
	RerunOnRecovery bool `json:"rerun_on_recovery,omitempty"`

	// MaxIterations limits how many times a rule can enter the matching state for an entity.
	// 0 means no limit. Use $state.iteration and $state.max_iterations in When clauses
	// for retry budgets (e.g., retry up to 3 times then escalate).
	MaxIterations int `json:"max_iterations,omitempty"`

	// FireEveryNEvents makes the rule fire its action only every Nth matching
	// event. N=0 and N=1 both mean "fire on every match" (backward-compatible
	// default). N=2 fires on the 2nd, 4th, 6th matching events; N=10 fires on
	// the 10th, 20th, 30th, etc.
	//
	// The MATCH check still runs on every event — only the action is gated.
	// Counter state is per-rule (not per-entity) and resets when the rule
	// definition is replaced via applyRuleChanges, so a policy change begins
	// with a fresh rhythm.
	//
	// This is a parallel dimension to time-based AlertCooldownPeriod; neither
	// replaces the other.
	FireEveryNEvents int `json:"fire_every_n_events,omitempty"`
}

Definition represents a JSON rule configuration

type Dependencies

type Dependencies struct {
	NATSClient *natsclient.Client
	Logger     *slog.Logger
}

Dependencies provides dependencies for rule creation

type EntityConfig

type EntityConfig struct {
	Pattern      string   `json:"pattern"`       // Entity ID pattern to match
	WatchBuckets []string `json:"watch_buckets"` // KV buckets to watch
}

EntityConfig defines entity-specific configuration

type EntityStateEvaluator

type EntityStateEvaluator interface {
	// EvaluateEntityState evaluates the rule directly against EntityState triples.
	// Rule conditions should use full predicate paths (e.g., "sensor.measurement.fahrenheit").
	EvaluateEntityState(entityState *gtypes.EntityState) bool
}

EntityStateEvaluator is an optional interface for rules that can evaluate directly against EntityState triples, bypassing the message transformation layer. This is more efficient for rules that need to access triple predicates directly.

type Evaluation

type Evaluation struct {
	Rule              Definition
	EntityID          string
	RelatedID         string // "" for single-entity rules
	CurrentlyMatching bool
	Entity            *gtypes.EntityState // nil for message-path rules
	Related           *gtypes.EntityState // nil for single-entity or message-path
	Revision          uint64              // KV revision that triggered this evaluation; 0 if unknown
	Bootstrap         bool                // true during watcher initial-state replay
}

Evaluation groups the inputs to a single stateful rule evaluation. Zero values are meaningful: Revision=0 means "no KV revision available" (message-path rules), Bootstrap=false means live traffic (not startup replay), Entity/Related=nil means message-path rule.

type Event

type Event interface {
	// EventType returns the type identifier for this event
	EventType() string

	// Subject returns the NATS subject for publishing this event
	Subject() string

	// Payload returns the event data as a generic map
	Payload() map[string]any

	// Validate checks if the event is valid and ready to publish
	Validate() error
}

Event represents a generic event that can be published by rules. This interface allows rules to be decoupled from specific event types while still integrating with graph events when EnableGraphIntegration is enabled.

type Example

type Example struct {
	Name        string     `json:"name"`
	Description string     `json:"description"`
	Config      Definition `json:"config"`
}

Example provides example configurations

type ExecutionContext

type ExecutionContext struct {
	// EntityID is the primary entity identifier.
	EntityID string

	// RelatedID is the related entity identifier (empty for single-entity rules).
	RelatedID string

	// Entity is the full entity state with triples (nil for message-path rules).
	Entity *gtypes.EntityState

	// Related is the related entity state (nil for single-entity rules or message-path).
	Related *gtypes.EntityState

	// State is the current match state including iteration tracking.
	// May be nil for first evaluation before state is persisted.
	State *MatchState
}

ExecutionContext carries typed data through the rule evaluation → action pipeline. It replaces the previous (entityID, relatedID string) action signature, providing actions with the full entity state and match state for richer execution logic.

func (*ExecutionContext) RuleID

func (ec *ExecutionContext) RuleID() string

RuleID returns the originating rule's identifier, or an empty string if the execution context has no associated match state (e.g. message-path actions invoked outside a stateful evaluator). Callers use this to scope feedback loop prevention to the rule that caused the write.

func (*ExecutionContext) SubstituteVariables

func (ec *ExecutionContext) SubstituteVariables(template string) string

SubstituteVariables replaces template variables with values from the execution context. Supported variables:

  • $entity.id: The primary entity ID
  • $related.id: The related entity ID (for pair rules)
  • $state.iteration: Current iteration count
  • $state.max_iterations: Configured max iterations

Entity triple values can be accessed via $entity.triple.<predicate> syntax.

If any template variable survives substitution (e.g. $entity.triple.X where X isn't on the entity at fire time — a common race with late-arriving triples), the literal stays in the output and a warning is logged so the author sees the silent-pass. Downstream callers that feed the result into an identifier (KV key, NATS subject) will then get a loud failure instead of mysteriously wrong behaviour.

type ExpressionRule

type ExpressionRule struct {
	// contains filtered or unexported fields
}

ExpressionRule implements Rule interface for expression-based condition evaluation

func NewExpressionRule

func NewExpressionRule(def Definition) (*ExpressionRule, error)

NewExpressionRule creates a new expression-based rule

func (*ExpressionRule) Evaluate

func (r *ExpressionRule) Evaluate(messages []message.Message) bool

Evaluate evaluates the rule against messages

func (*ExpressionRule) EvaluateEntityState

func (r *ExpressionRule) EvaluateEntityState(entityState *gtypes.EntityState) bool

EvaluateEntityState evaluates the rule directly against EntityState triples. This bypasses the message transformation layer and evaluates conditions directly against triple predicates (e.g., "sensor.measurement.fahrenheit").

func (*ExpressionRule) ExecuteEvents

func (r *ExpressionRule) ExecuteEvents(messages []message.Message) ([]Event, error)

ExecuteEvents generates events when rule triggers

func (*ExpressionRule) Name

func (r *ExpressionRule) Name() string

Name returns the rule name

func (*ExpressionRule) Subscribe

func (r *ExpressionRule) Subscribe() []string

Subscribe returns subjects this rule subscribes to

type ExpressionRuleFactory

type ExpressionRuleFactory struct {
	// contains filtered or unexported fields
}

ExpressionRuleFactory creates expression-based rules

func NewExpressionRuleFactory

func NewExpressionRuleFactory() *ExpressionRuleFactory

NewExpressionRuleFactory creates a new expression rule factory

func (*ExpressionRuleFactory) Create

Create creates an expression rule from definition

func (*ExpressionRuleFactory) Schema

func (f *ExpressionRuleFactory) Schema() Schema

Schema returns the expression rule schema

func (*ExpressionRuleFactory) Type

func (f *ExpressionRuleFactory) Type() string

Type returns the factory type

func (*ExpressionRuleFactory) Validate

func (f *ExpressionRuleFactory) Validate(def Definition) error

Validate validates the rule definition

type Factory

type Factory interface {
	// Create creates a rule instance from configuration
	Create(id string, config Definition, deps Dependencies) (Rule, error)

	// Type returns the rule type this factory creates
	Type() string

	// Schema returns the configuration schema for UI discovery
	Schema() Schema

	// Validate validates a rule configuration
	Validate(config Definition) error
}

Factory creates rules from configuration

func GetRuleFactory

func GetRuleFactory(ruleType string) (Factory, bool)

GetRuleFactory returns a registered rule factory

type KVTestHelper

type KVTestHelper struct {
	// contains filtered or unexported fields
}

KVTestHelper provides utilities for KV-based rule testing

func NewKVTestHelper

func NewKVTestHelper(t *testing.T, bucket jetstream.KeyValue) *KVTestHelper

NewKVTestHelper creates a new KV test helper

func (*KVTestHelper) DeleteEntity

func (h *KVTestHelper) DeleteEntity(entityID string)

DeleteEntity removes an entity from the KV bucket

func (*KVTestHelper) UpdateEntityProperty

func (h *KVTestHelper) UpdateEntityProperty(entityID string, predicateFull string, value interface{})

UpdateEntityProperty updates a specific property of an entity

func (*KVTestHelper) WriteEntityState

func (h *KVTestHelper) WriteEntityState(entity *gtypes.EntityState)

WriteEntityState writes an entity state to the KV bucket

type KVWriter

type KVWriter interface {
	// UpdateJSON performs a CAS read-modify-write on a JSON document in the named bucket.
	// The updateFn receives the current value (empty map if key doesn't exist) and mutates it.
	// Uses exponential backoff retry on CAS conflicts (via natsclient.KVStore.UpdateJSON).
	UpdateJSON(ctx context.Context, bucket, key string, updateFn func(current map[string]any) error) error

	// PutJSON writes a JSON value to the named bucket (last writer wins, no CAS).
	PutJSON(ctx context.Context, bucket, key string, value map[string]any) error
}

KVWriter performs read-modify-write operations on domain KV buckets. It abstracts the underlying NATS KV infrastructure for testability.

type MatchState

type MatchState struct {
	RuleID         string    `json:"rule_id"`
	EntityKey      string    `json:"entity_key"`
	IsMatching     bool      `json:"is_matching"`
	LastTransition string    `json:"last_transition"` // ""|"entered"|"exited"
	TransitionAt   time.Time `json:"transition_at,omitempty"`
	SourceRevision uint64    `json:"source_revision"`
	LastChecked    time.Time `json:"last_checked"`

	// Iteration tracks how many times this rule has entered the matching state
	// for this entity. Incremented on each TransitionEntered, preserved through exits.
	Iteration int `json:"iteration"`

	// MaxIterations is the configured limit from the rule Definition.
	// 0 means no limit. Used with $state.iteration in When clauses for retry budgets.
	MaxIterations int `json:"max_iterations,omitempty"`

	// FieldValues tracks the last-seen values of fields used in transition conditions.
	// Keys are field names (e.g., "workflow.plan.status"), values are the string
	// representation of the last observed value. Used by the transition operator
	// to detect field-level state changes across evaluations.
	FieldValues map[string]string `json:"field_values,omitempty"`
}

MatchState tracks whether a rule's condition is currently matching for a specific entity or entity pair.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics holds Prometheus metrics for RuleProcessor component

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is a component that processes messages through rules

func NewProcessor

func NewProcessor(natsClient *natsclient.Client, config *Config) (*Processor, error)

NewProcessor creates a new rule processor

func NewProcessorWithMetrics

func NewProcessorWithMetrics(natsClient *natsclient.Client, config *Config, metricsRegistry *metric.MetricsRegistry) (*Processor, error)

NewProcessorWithMetrics creates a new rule processor with optional metrics

func (*Processor) ApplyConfigUpdate

func (rp *Processor) ApplyConfigUpdate(changes map[string]any) error

ApplyConfigUpdate applies validated configuration changes

func (*Processor) ConfigSchema

func (rp *Processor) ConfigSchema() component.ConfigSchema

ConfigSchema returns configuration schema for component interface

func (*Processor) DataFlow

func (rp *Processor) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Processor) DebugStatus

func (rp *Processor) DebugStatus() any

DebugStatus returns extended debug information for the rule processor. Implements component.DebugStatusProvider.

func (*Processor) GetRuleMetrics

func (rp *Processor) GetRuleMetrics() map[string]any

GetRuleMetrics returns metrics for all rules

func (*Processor) GetRuntimeConfig

func (rp *Processor) GetRuntimeConfig() map[string]any

GetRuntimeConfig returns current runtime configuration

func (*Processor) Health

func (rp *Processor) Health() component.HealthStatus

Health returns current health status

func (*Processor) Initialize

func (rp *Processor) Initialize() error

Initialize loads rules and prepares the processor

func (*Processor) InputPorts

func (rp *Processor) InputPorts() []component.Port

InputPorts returns declared input ports

func (*Processor) Meta

func (rp *Processor) Meta() component.Metadata

Meta returns component metadata

func (*Processor) OutputPorts

func (rp *Processor) OutputPorts() []component.Port

OutputPorts returns declared output ports

func (*Processor) Start

func (rp *Processor) Start(ctx context.Context) error

Start begins processing messages through rules

func (*Processor) Stop

func (rp *Processor) Stop(_ time.Duration) error

Stop stops the processor and cleans up resources

func (*Processor) UpdateWatchBuckets

func (rp *Processor) UpdateWatchBuckets(newBuckets map[string][]string) error

UpdateWatchBuckets dynamically updates the entity watch buckets and patterns.

func (*Processor) UpdateWatchPatterns

func (rp *Processor) UpdateWatchPatterns(newPatterns []string) error

UpdateWatchPatterns dynamically updates the entity watch patterns for ENTITY_STATES. DEPRECATED: Use UpdateWatchBuckets for multi-bucket support.

func (*Processor) ValidateConfigUpdate

func (rp *Processor) ValidateConfigUpdate(changes map[string]any) error

ValidateConfigUpdate validates proposed configuration changes

type Publisher

type Publisher interface {
	// Publish sends a message to a NATS subject.
	// The implementation determines whether to use core NATS or JetStream
	// based on port configuration.
	Publish(ctx context.Context, subject string, data []byte) error
}

Publisher handles publishing messages to NATS subjects. It abstracts the decision between core NATS and JetStream publishing.

type Rule

type Rule interface {
	// Name returns the human-readable name of this rule
	Name() string

	// Subscribe returns the NATS subjects this rule should listen to
	Subscribe() []string

	// Evaluate checks if the rule conditions are met for the given messages
	Evaluate(messages []message.Message) bool

	// ExecuteEvents generates events when rule conditions are satisfied
	ExecuteEvents(messages []message.Message) ([]Event, error)
}

Rule interface defines the event-based contract for processing rules. Rules evaluate messages and generate events when conditions are met.

func CreateRuleFromDefinition

func CreateRuleFromDefinition(def Definition, deps Dependencies) (Rule, error)

CreateRuleFromDefinition creates a rule using the appropriate factory

type Schema

type Schema struct {
	Type        string                              `json:"type"`
	DisplayName string                              `json:"display_name"`
	Description string                              `json:"description"`
	Category    string                              `json:"category"`
	Icon        string                              `json:"icon,omitempty"`
	Properties  map[string]component.PropertySchema `json:"properties"`
	Required    []string                            `json:"required"`
	Examples    []Example                           `json:"examples,omitempty"`
}

Schema describes the configuration schema for a rule type

type StateTracker

type StateTracker struct {
	// contains filtered or unexported fields
}

StateTracker manages rule match state persistence in NATS KV. It tracks which rules are currently matching for which entities to enable proper transition detection for stateful ECA rules.

func NewStateTracker

func NewStateTracker(bucket jetstream.KeyValue, logger *slog.Logger) *StateTracker

NewStateTracker creates a new StateTracker with the given KV bucket.

func (*StateTracker) Delete

func (st *StateTracker) Delete(ctx context.Context, ruleID, entityKey string) error

Delete removes the match state for a rule and entity key.

func (*StateTracker) DeleteAllForEntity

func (st *StateTracker) DeleteAllForEntity(ctx context.Context, entityID string) error

DeleteAllForEntity removes all rule states associated with an entity. This is called when an entity is deleted to clean up orphaned state.

func (*StateTracker) Get

func (st *StateTracker) Get(ctx context.Context, ruleID, entityKey string) (MatchState, error)

Get retrieves the current match state for a rule and entity key. Returns ErrStateNotFound if no state exists for this combination.

func (*StateTracker) Set

func (st *StateTracker) Set(ctx context.Context, state MatchState) error

Set persists the match state for a rule and entity.

type StatefulEvaluator

type StatefulEvaluator struct {
	// contains filtered or unexported fields
}

StatefulEvaluator handles rule evaluation with state tracking. It manages state transitions (entered/exited/none) and executes the appropriate actions based on the transition type.

func NewStatefulEvaluator

func NewStatefulEvaluator(stateTracker *StateTracker, actionExecutor ActionExecutorInterface, logger *slog.Logger) *StatefulEvaluator

NewStatefulEvaluator creates a new stateful evaluator with the given dependencies. If logger is nil, uses the default logger.

func (*StatefulEvaluator) Evaluate

func (e *StatefulEvaluator) Evaluate(ctx context.Context, ev Evaluation) (Transition, error)

Evaluate runs a stateful rule evaluation and fires the appropriate actions for the detected transition:

  • TransitionEntered → OnEnter (increments iteration)
  • TransitionExited → OnExit (preserves iteration)
  • TransitionNone+match → WhileTrue

Action-level When clauses are evaluated before each action runs. New state is persisted to the StateTracker on the way out.

When Evaluation.Bootstrap is true, the watcher is replaying state after a restart. A persisted IsMatching=true state that still matches would otherwise produce TransitionNone — we promote it to a synthetic Entered (fires OnRecovery, or OnEnter under RerunOnRecovery) so state machines resume instead of hanging on restart.

type Status

type Status struct {
	DebounceDelayMs    int       `json:"debounce_delay_ms"`
	PendingEvaluations int       `json:"pending_evaluations"`
	TotalEvaluations   int       `json:"total_evaluations"`
	TotalTriggers      int       `json:"total_triggers"`
	DebouncedCount     int       `json:"debounced_count"` // Matches Tester's test expectations
	RulesLoaded        int       `json:"rules_loaded"`
	LastEvaluationTime time.Time `json:"last_evaluation_time,omitempty"`

	// TrackedRevisions is the total number of (rule,entity,revision) tuples
	// currently in the feedback-loop-prevention map. Monitor this to detect
	// leaks from writes that the watcher never delivers — a healthy
	// processor's count should stay roughly proportional to in-flight rule
	// actions, bounded by the sweeper.
	TrackedRevisions int `json:"tracked_revisions"`
}

Status represents the current status of rule evaluation for debug observability

type TestRule

type TestRule struct {
	// contains filtered or unexported fields
}

TestRule is a functional rule implementation for testing

func NewTestRule

func NewTestRule(id, name string, subjects []string, conditions []expression.ConditionExpression) *TestRule

NewTestRule creates a new test rule

func (*TestRule) Evaluate

func (r *TestRule) Evaluate(messages []message.Message) bool

Evaluate evaluates the rule against messages

func (*TestRule) EvaluateEntityState

func (r *TestRule) EvaluateEntityState(entityState *gtypes.EntityState) bool

EvaluateEntityState evaluates the rule directly against EntityState triples. This implements the EntityStateEvaluator interface for KV watch-based evaluation.

func (*TestRule) ExecuteEvents

func (r *TestRule) ExecuteEvents(messages []message.Message) ([]Event, error)

ExecuteEvents generates events when rule triggers

func (*TestRule) Name

func (r *TestRule) Name() string

Name returns the rule name

func (*TestRule) Subscribe

func (r *TestRule) Subscribe() []string

Subscribe returns subjects this rule subscribes to

type TestRuleFactory

type TestRuleFactory struct {
	// contains filtered or unexported fields
}

TestRuleFactory creates test rules for integration testing

func NewTestRuleFactory

func NewTestRuleFactory() *TestRuleFactory

NewTestRuleFactory creates a new test rule factory

func (*TestRuleFactory) Create

func (f *TestRuleFactory) Create(ruleID string, def Definition, _ Dependencies) (Rule, error)

Create creates a test rule from definition

func (*TestRuleFactory) Examples

func (f *TestRuleFactory) Examples() []Example

Examples returns test rule examples

func (*TestRuleFactory) Schema

func (f *TestRuleFactory) Schema() Schema

Schema returns the test rule schema

func (*TestRuleFactory) Type

func (f *TestRuleFactory) Type() string

Type returns the factory type

func (*TestRuleFactory) Validate

func (f *TestRuleFactory) Validate(_ Definition) error

Validate validates the rule definition

type Transition

type Transition string

Transition represents the type of state change detected

const (
	// TransitionNone indicates no state change
	TransitionNone Transition = ""
	// TransitionEntered indicates condition became true
	TransitionEntered Transition = "entered"
	// TransitionExited indicates condition became false
	TransitionExited Transition = "exited"
)

func DetectTransition

func DetectTransition(wasMatching, nowMatching bool) Transition

DetectTransition compares previous and current match state to determine transition type. Returns TransitionEntered if condition changed from false to true. Returns TransitionExited if condition changed from true to false. Returns TransitionNone if state did not change.

type TripleMutator

type TripleMutator interface {
	// AddTriple adds a triple via NATS request/response and returns the KV revision.
	AddTriple(ctx context.Context, ruleID string, triple message.Triple) (uint64, error)
	// RemoveTriple removes a triple via NATS request/response and returns the KV revision.
	RemoveTriple(ctx context.Context, ruleID, subject, predicate string) (uint64, error)
}

TripleMutator handles triple mutations via NATS request/response. The returned uint64 is the KV revision after the write, used for per-rule feedback loop prevention. The ruleID identifies the originating rule so the revision can be scoped to that rule only — pass an empty ruleID for ad-hoc mutations that should not be tracked.

type ValidationError

type ValidationError struct {
	Field   string
	Message string
}

ValidationError represents a payload validation error.

func (*ValidationError) Error

func (e *ValidationError) Error() string

type WorkflowTriggerPayload

type WorkflowTriggerPayload struct {
	WorkflowID  string         `json:"workflow_id"`
	EntityID    string         `json:"entity_id"`
	TriggeredAt time.Time      `json:"triggered_at"`
	RelatedID   string         `json:"related_id,omitempty"`
	Context     map[string]any `json:"context,omitempty"`
}

WorkflowTriggerPayload represents a message sent by the rule processor to trigger a reactive workflow. This payload is wrapped in a BaseMessage for proper deserialization by the reactive workflow engine.

func (*WorkflowTriggerPayload) MarshalJSON

func (p *WorkflowTriggerPayload) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler. This marshals just the payload fields - BaseMessage handles the wrapping.

func (*WorkflowTriggerPayload) Schema

func (p *WorkflowTriggerPayload) Schema() message.Type

Schema returns the message type for workflow trigger payloads.

func (*WorkflowTriggerPayload) UnmarshalJSON

func (p *WorkflowTriggerPayload) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler.

func (*WorkflowTriggerPayload) Validate

func (p *WorkflowTriggerPayload) Validate() error

Validate ensures the payload has required fields.

Directories

Path Synopsis
Package boid implements Boids-inspired local coordination rules for multi-agent teams.
Package boid implements Boids-inspired local coordination rules for multi-agent teams.
Package expression - Expression evaluator implementation
Package expression - Expression evaluator implementation

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL