rule

package
v1.0.0-beta.28 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2026 License: MIT Imports: 30 Imported by: 0

README

Rule Processor

Processes message streams through configurable rules and evaluates conditions against semantic messages and entity state changes.

What It Does

  • Watches ENTITY_STATES KV bucket for entity changes
  • Evaluates rules against semantic messages and entity state
  • Publishes rule events and graph mutation requests
  • Provides time-window analysis with message buffering

Quick Start

config := rule.DefaultConfig()
config.Ports = &component.PortsDefinition{
    Inputs: []component.PortDefinition{{Subject: "process.>"}},
    Outputs: []component.PortDefinition{{Subject: "rule.events.>"}},
}

processor, err := rule.NewProcessor(natsClient, &config)
if err != nil {
    log.Fatal(err)
}

if err := processor.Initialize(); err != nil {
    log.Fatal(err)
}

if err := processor.Start(ctx); err != nil {
    log.Fatal(err)
}

Rule Definition

Rules are defined in JSON:

{
  "id": "battery-low",
  "name": "Battery Low Alert",
  "enabled": true,
  "conditions": [
    {"field": "drone.telemetry.battery", "operator": "lt", "value": 20}
  ],
  "on_enter": [
    {"type": "add_triple", "predicate": "alert.status", "object": "battery_low"},
    {"type": "publish", "subject": "alerts.battery"}
  ],
  "on_exit": [
    {"type": "remove_triple", "predicate": "alert.status"}
  ]
}

Documentation

Topic Location
Conceptual Overview docs/advanced/06-rules-engine.md
Rule Syntax docs/syntax.md
Conditions docs/conditions.md
Actions docs/actions.md
State Tracking docs/state-tracking.md
Entity Watching docs/entity-watching.md
Configuration docs/configuration.md
Custom Rules docs/custom-rules.md
Operations docs/operations.md
Examples docs/examples.md

Package Structure

File Purpose
processor.go Core processor, lifecycle, ports
config.go Config struct and defaults
factory.go Component registration
entity_watcher.go KV entity state watching
message_handler.go Message processing and rule evaluation
rule_loader.go Rule loading from JSON files
publisher.go Event publishing to NATS
metrics.go Prometheus metrics

Metrics

Key metrics exposed:

Metric Type Description
semstreams_rule_evaluations_total Counter Rule evaluations performed
semstreams_rule_triggers_total Counter Successful rule triggers
semstreams_rule_evaluation_duration_seconds Histogram Evaluation latency
semstreams_rule_active_rules Gauge Active rules count
semstreams_rule_errors_total Counter Processing errors

Design Decisions

Entity ID Format

Uses 6-part hierarchical dotted notation for global uniqueness:

<org>.<platform>.<system>.<domain>.<type>.<instance>
Example: c360.platform1.gcs1.robotics.drone.1
Nil Safety Pattern

Metrics use "nil input = nil feature" pattern - zero overhead when disabled.

Graph Integration

When enabled (default), rule actions directly affect the graph via add_triple and remove_triple actions.

Documentation

Overview

Package rule - Action execution for ECA rules

Package rule - Prometheus metrics for the cron scheduler.

The cron scheduler exposes seven metrics under the `semstreams_cron_*` namespace. They mirror what an operator typically wants to see for a periodically-firing component:

  • fires_total{rule_id, status} — counter; status is success / error / panic / cooldown_skipped. Lets ops see fire rates per rule and spot rules whose cron expression ticks faster than actions complete (cooldown_skipped grows).
  • fire_duration_seconds{rule_id} — histogram of action-dispatch latency. Surfaces slow `publish_agent` rules that risk overlapping with the next tick before the cooldown gate triggers.
  • registered — gauge; total registered cron rules. Flips on hot-reload; useful for confirming a config change took effect.
  • missed_fires_total{rule_id} — counter; incremented once per missed fire detected on Start. Per ADR-031 the policy is log-only, so the metric tracks cumulative missed fires rather than auto-replay attempts.
  • missed_fires_capped_total{rule_id} — counter; incremented when a rule's missed-fire count hits the detection cap (currently 100). Lets operators alert on long downtimes without grepping logs.
  • next_fire_timestamp_seconds{rule_id} — gauge; Unix epoch seconds of the rule's next scheduled fire. Enables alerts like "no fire in last 2× period" by comparing scrape time against the gauge.
  • scheduler_running — gauge; 1 when Start has been called and Stop hasn't, 0 otherwise. Catches the "processor still alive but scheduler crashed" pathology.

The constructor follows the routerMetrics pattern in agentic-dispatch: per-registry instances cached so multiple processors sharing a registry get the same collectors, plus a nil-registry singleton fallback for production deployments using the default Prometheus registerer. Both paths support nil-tolerant recordX helpers so callers don't have to nil-check before every observation.

Package rule - Cron rule type for time-driven actions.

Cron rules are a third firing path alongside KV-watch and message-path rules. Unlike expression rules, cron rules have no condition — they fire on a clock tick alone, dispatched by CronScheduler against the same ActionExecutor that expression rules use.

CronRule is intentionally NOT an implementation of the Rule interface. Subscribe / Evaluate / ExecuteEvents are message-driven concepts that don't fit time-driven firing. Keeping cron rules outside the Rule registry means they don't accidentally appear in evaluateRulesForMessage dispatch and don't need no-op stubs cluttering the type.

Package rule - Cron scheduler for time-driven rule firing.

CronScheduler is the third firing path of the rule processor, parallel to message-driven and KV-watch-driven evaluation. It wraps robfig/cron/v3.Cron and dispatches each registered CronRule's actions through the shared ActionExecutor at the times described by the rule's cron expression.

Lifecycle is owned by the rule processor:

  1. Processor.Start (via initializeCronScheduler) constructs the scheduler and registers the cron rules loaded by Initialize.
  2. Processor.run launches the scheduler with Start(ctx); robfig spawns its own internal goroutine that walks the schedule and dispatches each fire callback on its own goroutine.
  3. Processor.run defers Stop on shutdown, draining in-flight fires up to the shutdown grace period.
  4. Hot reload (applyRuleChanges) calls Register / Deregister under the processor's mu.Lock; robfig supports live add/remove without a scheduler restart.

The fire path is best-effort: per-rule panic-recover keeps one bad action from crashing the scheduler goroutine, action errors are logged and do not stop sibling actions in the same tick. This matches StatefulEvaluator behaviour so the operator's mental model is consistent across rule kinds.

Package rule - Cron-specific substitution context.

CronRule fires have no entity in scope at MVP — the scheduler ticks on a clock alone. Action templates therefore cannot use the `$entity.*` / `$related.*` / `$state.*` namespaces that expression rules populate from a matched entity. Instead, cron actions read schedule-shaped context: `$schedule.id`, `$schedule.spec`, `$schedule.last_fired_at`. Plus the existing `$now` token, which SubstituteVariables already handles for every rule kind.

Retrofit-safe namespace seam

`$schedule.*` is intentionally disjoint from `$entity.*`. When the deferred per-entity fan-out extension lands (Definition.ForEach), each iteration of the fan-out gets the existing `$entity.*` namespace populated identically to expression-rule firing — the cron path supplies an EntityID, an Entity (or *EntityState fetched on demand), and a State; the substitution layer needs no changes. Adding `for_each` is purely additive.

A future `$trigger.*` namespace (planned for richer tick metadata — e.g. `$trigger.attempt`, `$trigger.scheduled_for`) would land here as a sibling shim. Do not overload `$schedule.*` to carry per-tick metadata; keeping spec-shaped data and tick-shaped data in separate namespaces preserves the door for that extension.

Package rule provides rule-based processing of semantic message streams with support for entity state watching and event generation.

Overview

The rule processor evaluates conditions against streaming messages and entity state changes, generating graph events when rules trigger. Rules are defined using JSON configuration and implemented using the factory pattern.

Architecture

Rules consist of three main components:

  • Definition: JSON configuration (conditions, logic, metadata)
  • Factory: Creates Rule instances from definitions
  • Rule: Evaluates conditions and generates events

Rule Interface

Rules must implement this interface:

type Rule interface {
    Name() string
    Subscribe() []string
    Evaluate(messages []message.Message) bool
    ExecuteEvents(messages []message.Message) ([]rtypes.Event, error)
}

Creating Custom Rules

1. Create a RuleFactory:

type MyRuleFactory struct {
    ruleType string
}

func (f *MyRuleFactory) Create(id string, def Definition, deps Dependencies) (rtypes.Rule, error) {
    return &MyRule{...}, nil
}

func (f *MyRuleFactory) Validate(def Definition) error {
    // Validate rule configuration
    return nil
}

2. Implement the Rule interface:

type MyRule struct {
    id         string
    conditions []expression.ConditionExpression
}

func (r *MyRule) ExecuteEvents(messages []message.Message) ([]rtypes.Event, error) {
    // Generate events directly in Go code
    // gtypes.Event implements rtypes.Event interface
    event := &gtypes.Event{
        Type:     gtypes.EventEntityUpdate,
        EntityID: "alert.my-rule." + r.id,
        Properties: map[string]interface{}{
            "triggered": true,
            "timestamp": time.Now(),
        },
    }
    return []rtypes.Event{event}, nil
}

3. Register the factory:

func init() {
    RegisterRuleFactory("my_rule", &MyRuleFactory{ruleType: "my_rule"})
}

Entity State Watching

Rules can watch entity state changes via NATS KV buckets:

{
  "id": "my-rule",
  "type": "my_rule",
  "entity": {
    "pattern": "c360.*.robotics.drone.>",
    "watch_buckets": ["ENTITY_STATES"]
  }
}

The processor watches these buckets using the KV watch pattern and converts entity state updates into messages for rule evaluation.

Runtime Configuration

Rules support dynamic runtime updates via ApplyConfigUpdate():

  • Add/remove rules without restart
  • Update rule conditions and metadata
  • Enable/disable rules on the fly

Event Generation

Rules generate gtypes.Event directly (no template system):

  • Events contain: Type, EntityID, Properties, Metadata
  • Published to NATS graph subjects
  • Consumed by graph processor for entity storage

Metrics

The processor exposes Prometheus metrics:

  • semstreams_rule_evaluations_total
  • semstreams_rule_triggers_total
  • semstreams_rule_evaluation_duration_seconds
  • semstreams_rule_errors_total

Example Usage

config := rule.DefaultConfig()
config.Ports = &component.PortConfig{
    Inputs: []component.PortDefinition{
        {
            Name:    "semantic_messages",
            Type:    "nats",
            Subject: "process.>",
        },
    },
}

processor, err := rule.NewProcessor(natsClient, &config)
if err != nil {
    log.Fatal(err)
}
err = processor.Initialize()
err = processor.Start(ctx)

For comprehensive documentation, see:

  • /Users/coby/Code/c360/semdocs/docs/guides/rules-engine.md
  • /Users/coby/Code/c360/semdocs/docs/specs/SPEC-001-generic-rules-engine.md

Package rule - Execution context for rule actions

Package rule - Expression Rule Factory for condition-based rules

Package rule provides interfaces and implementations for rule processing

Package rule - NATS KV Configuration Integration for Rules

Package rule - KV write action support for rule-driven state machine orchestration

Package rule provides a rule processing component that implements the Discoverable interface for processing message streams through rules

Package rule - Rule Factory Pattern for Dynamic Rule Creation

Package rule - Schedule tracking for cron rule firing.

ScheduleTracker persists per-rule last-fired timestamps to the RULE_SCHEDULES KV bucket so that:

  1. Restart-recovery can detect missed fires by comparing the persisted last-fired timestamp against the cron schedule's expected next fire.
  2. Operators (and downstream components like governance startup hooks) can read last-fire state without coupling to scheduler internals.

The bucket is intentionally simple: one JSON record per rule, keyed by rule ID. Records are upserted on every successful fire; they have no TTL (retention is the rule's own enable/disable lifecycle).

Key shape (retrofit-safe)

MVP keys are the bare rule ID — `{ruleID}` — because every cron rule fires once globally per tick. When the deferred per-entity fan-out extension lands (Definition.ForEach), keys will gain a `.{entityID}` suffix to track last-fired per fanout target. Existing single-fire rules keep the bare-rule-ID shape; the suffix is purely additive and requires no migration. See ADR-031 retrofit seams for context.

External read access

Governance startup hooks (kill-switch sweeps, chain-hash audits) need to read last-fired timestamps to issue catch-up sweeps when a missed fire crosses a meaningful interval. Two access patterns are supported:

  1. In-process callers hold a *ScheduleTracker reference and call LastFiredAt(ctx, ruleID).
  2. Out-of-process callers read RULE_SCHEDULES KV directly using the documented record shape (LastFireRecord). Bucket name and JSON shape are part of the framework's public contract.

Package rule - State tracking for stateful ECA rules

Package rule - Stateful rule evaluation with state tracking

Package rule - Test Rule Factory for Integration Tests

Package rule provides triple mutation support via NATS request/response

Package rule - Workflow trigger payload for rule-to-workflow integration

Index

Constants

View Source
const (
	// ActionTypePublish publishes a message to a NATS subject
	ActionTypePublish = "publish"
	// ActionTypeAddTriple creates a relationship triple in the graph
	ActionTypeAddTriple = "add_triple"
	// ActionTypeRemoveTriple removes a relationship triple from the graph
	ActionTypeRemoveTriple = "remove_triple"
	// ActionTypeUpdateTriple updates metadata on an existing triple
	ActionTypeUpdateTriple = "update_triple"
	// ActionTypePublishAgent triggers an agentic loop by publishing a TaskMessage
	ActionTypePublishAgent = "publish_agent"
	// ActionTypeTriggerWorkflow triggers a reactive workflow by publishing to workflow.trigger.<workflow_id>
	ActionTypeTriggerWorkflow = "trigger_workflow"
	// ActionTypePublishBoidSignal publishes a Boid steering signal for agent coordination
	ActionTypePublishBoidSignal = "publish_boid_signal"
	// ActionTypeUpdateKV writes JSON to a named KV bucket with optional CAS merge
	ActionTypeUpdateKV = "update_kv"
)

Action type constants define the supported action types for rule execution.

View Source
const (
	SubjectTripleAdd    = "graph.mutation.triple.add"
	SubjectTripleRemove = "graph.mutation.triple.remove"
	MutationTimeout     = 5 * time.Second
)

NATS subjects for graph mutations (must match processor/graph/mutations.go)

View Source
const (
	WorkflowTriggerDomain   = "rule"
	WorkflowTriggerCategory = "workflow_trigger"
	WorkflowTriggerVersion  = "v1"
)

Workflow trigger payload type constants. Uses "rule" domain to avoid conflict with processor/workflow's workflow.trigger.v1

View Source
const CronRuleType = "cron"

CronRuleType is the string used as Definition.Type for cron rules.

View Source
const ScheduleBucketName = "RULE_SCHEDULES"

ScheduleBucketName is the canonical NATS KV bucket name for per-rule last-fired timestamps. Exported so out-of-process readers (governance startup hooks, ops dashboards) can address the bucket without stringly-typed duplication.

Variables

View Source
var ErrScheduleRecordNotFound = errors.New("schedule record not found")

ErrScheduleRecordNotFound is returned by LastFiredAt when no fire has been recorded for the requested rule. Callers should treat this as "never fired" rather than an error condition — it is the expected state for a freshly-deployed rule.

View Source
var ErrStateNotFound = errors.New("rule state not found")

ErrStateNotFound is returned when no rule state exists for the given key

Functions

func AssertEventuallyTrue

func AssertEventuallyTrue(t *testing.T, condition func() bool, timeout time.Duration, tick time.Duration, msgAndArgs ...interface{})

AssertEventuallyTrue asserts that a condition becomes true within timeout

func CreateBatteryEntity

func CreateBatteryEntity(id string, level float64, voltage float64) *gtypes.EntityState

CreateBatteryEntity creates a battery entity for testing

func CreateDroneEntity

func CreateDroneEntity(id string, armed bool, mode string, altitude float64) *gtypes.EntityState

CreateDroneEntity creates a drone entity for testing

func CreateRuleProcessor

func CreateRuleProcessor(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

CreateRuleProcessor creates a rule processor with the new factory pattern

func CreateTestEntityPattern

func CreateTestEntityPattern(domain, category, instance string) string

CreateTestEntityPattern creates entity IDs matching test patterns

func GetRegisteredRuleTypes

func GetRegisteredRuleTypes() []string

GetRegisteredRuleTypes returns all registered rule types.

Note: built-in rule kinds that do not go through the factory registry (today: "cron" — see CronRuleType) are not returned here. Callers that need the complete known-types surface should also include them explicitly. See isKnownRuleType for the validation-side equivalent.

func GetRuleSchemas

func GetRuleSchemas() map[string]Schema

GetRuleSchemas returns schemas for all registered rule types

func Register

func Register(registry *component.Registry) error

Register registers the rule processor component with the given registry

func RegisterPayloads

func RegisterPayloads(reg *payloadregistry.Registry) error

RegisterPayloads registers the workflow-trigger payload type with the supplied registry. Called from payloadbuiltins.Register at process bootstrap.

func RegisterRuleFactory

func RegisterRuleFactory(ruleType string, factory Factory) error

RegisterRuleFactory registers a rule factory

func SetupKVBucketForTesting

func SetupKVBucketForTesting(t *testing.T, js jetstream.JetStream, bucketName string) jetstream.KeyValue

SetupKVBucketForTesting creates and initializes a KV bucket for testing

func UnregisterRuleFactory

func UnregisterRuleFactory(ruleType string) error

UnregisterRuleFactory removes a rule factory for a given type This is primarily for testing or dynamic factory management

func ValidateDefinition

func ValidateDefinition(def Definition) error

ValidateDefinition validates a rule Definition for fields that are processor-level concerns (not delegated to individual rule factories). Call this before passing a Definition to CreateRuleFromDefinition.

func VerifyRuleTriggered

func VerifyRuleTriggered(_ *testing.T, processor *Processor, ruleName string) bool

VerifyRuleTriggered checks if a rule has been triggered by examining metrics or events

func WaitForEntityProcessing

func WaitForEntityProcessing(_ *testing.T, _ time.Duration)

WaitForEntityProcessing waits for an entity update to be processed This is useful when testing async KV watchers

func WaitForKVWatcher

func WaitForKVWatcher(t *testing.T, processor *Processor, timeout time.Duration)

WaitForKVWatcher waits for a KV watcher to be ready

Types

type Action

type Action struct {
	// Type specifies the action type (publish, add_triple, remove_triple, update_triple, publish_agent)
	Type string `json:"type"`

	// Subject is the NATS subject for publish actions
	Subject string `json:"subject,omitempty"`

	// Predicate is the relationship type for triple actions
	Predicate string `json:"predicate,omitempty"`

	// Object is the target entity or value for triple actions
	Object string `json:"object,omitempty"`

	// TTL specifies optional expiration time for triples (e.g., "5m", "1h")
	TTL string `json:"ttl,omitempty"`

	// Properties contains additional metadata for the action
	Properties map[string]any `json:"properties,omitempty"`

	// Role is the agent role for publish_agent actions (e.g., "general", "architect", "editor")
	Role string `json:"role,omitempty"`

	// Model is the model endpoint name for publish_agent actions
	Model string `json:"model,omitempty"`

	// Prompt is the task prompt template for publish_agent actions
	// Supports variable substitution: $entity.id, $related.id
	Prompt string `json:"prompt,omitempty"`

	// Tools is the per-spawned-agent tool allowlist for publish_agent actions.
	// Tool names are resolved to agentic.ToolDefinition against the global
	// tool registry at dispatch time; unknown names are logged and dropped.
	// Empty/nil leaves TaskMessage.Tools unset, which makes the spawned loop
	// fall back to global tool discovery (existing behaviour).
	//
	// This is the product-layer hook for scoping which tools a given role
	// can see. Putting it on the rule — not in agentic-tools Config — keeps
	// role→tools decisions in the workflow config that already owns the
	// role name, model, and prompt for the spawned agent.
	Tools []string `json:"tools,omitempty"`

	// WorkflowID is the workflow identifier for trigger_workflow actions
	WorkflowID string `json:"workflow_id,omitempty"`

	// ContextData provides additional context passed to the workflow
	ContextData map[string]any `json:"context_data,omitempty"`

	// BoidSignalType specifies the type of boid signal: separation, cohesion, or alignment
	BoidSignalType string `json:"boid_signal_type,omitempty"`

	// BoidStrength specifies the steering strength for boid signals (0.0-1.0)
	BoidStrength float64 `json:"boid_strength,omitempty"`

	// WorkflowSlug identifies the workflow for publish_agent actions (e.g., "github-issue-to-pr")
	WorkflowSlug string `json:"workflow_slug,omitempty"`

	// WorkflowStep identifies the step within the workflow (e.g., "qualify", "develop", "review")
	WorkflowStep string `json:"workflow_step,omitempty"`

	// When is an optional guard clause for conditional action execution.
	// If present, all conditions must match (AND logic) against the entity's current
	// state and $state.* fields for the action to execute. Actions without When always execute.
	When []expression.ConditionExpression `json:"when,omitempty"`

	// Bucket is the KV bucket name for update_kv actions (e.g., "PLAN_STATES").
	// Supports variable substitution.
	Bucket string `json:"bucket,omitempty"`

	// Key is the KV key for update_kv actions. Supports variable substitution.
	Key string `json:"key,omitempty"`

	// Payload is the data to write for update_kv actions.
	// Supports variable substitution in string values (including nested maps).
	Payload map[string]any `json:"payload,omitempty"`

	// Merge controls write semantics for update_kv:
	// true = CAS read-modify-write (merge payload into existing document)
	// false = overwrite entire document (last writer wins)
	Merge bool `json:"merge,omitempty"`
}

Action represents an action to execute when a rule fires. Actions are triggered by state transitions (OnEnter, OnExit) or while a condition remains true (WhileTrue).

func (Action) ParseTTL

func (a Action) ParseTTL() (time.Duration, error)

ParseTTL parses the TTL string into a duration. Returns 0 duration if TTL is empty (no expiration). Returns an error if the TTL format is invalid or negative.

type ActionExecutor

type ActionExecutor struct {
	// contains filtered or unexported fields
}

ActionExecutor executes actions for rules. It handles triple mutations, NATS publishing, KV writes, and other action types.

func NewActionExecutor

func NewActionExecutor(logger *slog.Logger) *ActionExecutor

NewActionExecutor creates a new ActionExecutor with the given logger. If logger is nil, uses the default logger.

func NewActionExecutorComplete

func NewActionExecutorComplete(logger *slog.Logger, mutator TripleMutator, publisher Publisher, kvWriter KVWriter) *ActionExecutor

NewActionExecutorComplete creates an ActionExecutor with all capabilities including KV writes.

func NewActionExecutorFull

func NewActionExecutorFull(logger *slog.Logger, mutator TripleMutator, publisher Publisher) *ActionExecutor

NewActionExecutorFull creates a new ActionExecutor with full functionality. The mutator enables triple persistence, and the publisher enables NATS publishing.

func NewActionExecutorWithMutator

func NewActionExecutorWithMutator(logger *slog.Logger, mutator TripleMutator) *ActionExecutor

NewActionExecutorWithMutator creates a new ActionExecutor with triple mutation support. The mutator enables actual persistence of triple operations via NATS request/response.

func (*ActionExecutor) Execute

func (e *ActionExecutor) Execute(ctx context.Context, action Action, ec *ExecutionContext) error

Execute runs the given action using the execution context. The ExecutionContext provides the entity ID, related entity ID, full entity state, and match state for rich action execution.

func (*ActionExecutor) ExecuteAddTriple

func (e *ActionExecutor) ExecuteAddTriple(ctx context.Context, action Action, ec *ExecutionContext) (message.Triple, error)

ExecuteAddTriple executes an add_triple action, creating a new semantic triple. Returns the created triple and any error that occurred. If a TripleMutator is configured, the triple is persisted via NATS request/response.

func (*ActionExecutor) ExecuteRemoveTriple

func (e *ActionExecutor) ExecuteRemoveTriple(ctx context.Context, action Action, ec *ExecutionContext) error

ExecuteRemoveTriple executes a remove_triple action, removing a semantic triple. If a TripleMutator is configured, the triple is removed via NATS request/response.

func (*ActionExecutor) SetToolRegistry

func (e *ActionExecutor) SetToolRegistry(r component.ToolRegistryReader)

SetToolRegistry installs the shared tool registry used by resolveToolNames during publish_agent action execution. nil-valued arg disables tool name resolution (tools list passed to the agent is left empty). Set explicitly after construction by the rule processor when it has access to deps.ToolRegistry.

type ActionExecutorInterface

type ActionExecutorInterface interface {
	Execute(ctx context.Context, action Action, ec *ExecutionContext) error
}

ActionExecutorInterface defines the interface for action execution. Actions receive an ExecutionContext with the full entity state and match state, replacing the previous (entityID, relatedID string) signature.

type BaseRuleFactory

type BaseRuleFactory struct {
	// contains filtered or unexported fields
}

BaseRuleFactory provides common factory functionality

func NewBaseRuleFactory

func NewBaseRuleFactory(ruleType, displayName, description, category string) *BaseRuleFactory

NewBaseRuleFactory creates a base factory implementation

func (*BaseRuleFactory) Type

func (f *BaseRuleFactory) Type() string

Type returns the rule type

func (*BaseRuleFactory) ValidateExpression

func (f *BaseRuleFactory) ValidateExpression(def Definition) error

ValidateExpression validates expression configuration

type Config

type Config struct {
	// Port configuration for inputs and outputs
	Ports *component.PortConfig `` /* 168-byte string literal not displayed */

	// Rule configuration sources
	RulesFiles  []string     `json:"rules_files" schema:"type:array,description:Paths to JSON rule definition files,default:[],category:basic"`
	InlineRules []Definition `json:"inline_rules,omitempty" schema:"type:array,description:Inline rule definitions (alternative to files),category:basic"`

	// Message cache configuration (not exposed in schema - internal config)
	MessageCache cache.Config `json:"message_cache"`

	// Buffer window size for time-window analysis
	BufferWindowSize string `` /* 135-byte string literal not displayed */

	// Alert cooldown period to prevent spam
	AlertCooldownPeriod string `` /* 139-byte string literal not displayed */

	// Graph processor integration
	EnableGraphIntegration bool `` /* 130-byte string literal not displayed */

	// NATS KV patterns to watch for entity changes (e.g., 'telemetry.robotics.>')
	// DEPRECATED: Use EntityWatchBuckets for multi-bucket support. This field is still
	// supported for backwards compatibility and applies to ENTITY_STATES bucket.
	EntityWatchPatterns []string `` /* 153-byte string literal not displayed */

	// EntityWatchBuckets maps bucket names to watch patterns.
	// This enables rules to observe operational results from multiple components.
	// Example: {"ENTITY_STATES": ["telemetry.>"], "WORKFLOW_EXECUTIONS": ["COMPLETE_*"]}
	// If not specified, falls back to EntityWatchPatterns for ENTITY_STATES bucket.
	EntityWatchBuckets map[string][]string `` /* 147-byte string literal not displayed */

	// Debounce delay for rule evaluation (settling time for entity state)
	// Default is 0 (disabled) to ensure rules evaluate against each state change.
	// Set to a positive value (e.g., 100) to batch rapid updates and evaluate final state only.
	DebounceDelayMs time.Duration `` /* 146-byte string literal not displayed */

	// JetStream consumer configuration (not exposed in schema - internal config)
	Consumer struct {
		Enabled        bool   `json:"enabled"`          // Enable JetStream consumer
		AckWaitSeconds int    `json:"ack_wait_seconds"` // Acknowledgment timeout
		MaxDeliver     int    `json:"max_deliver"`      // Max delivery attempts
		ReplayPolicy   string `json:"replay_policy"`    // "instant" or "original"
	} `json:"consumer"`
}

Config holds configuration for the RuleProcessor

func CreateRuleTestConfig

func CreateRuleTestConfig(watchPatterns []string) Config

CreateRuleTestConfig creates a test configuration for rule processor

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns sensible defaults

func (Config) MarshalJSON

func (c Config) MarshalJSON() ([]byte, error)

MarshalJSON implements custom JSON marshaling for Config

func (*Config) UnmarshalJSON

func (c *Config) UnmarshalJSON(data []byte) error

UnmarshalJSON implements custom JSON unmarshaling for Config

type ConfigManager

type ConfigManager struct {
	// contains filtered or unexported fields
}

ConfigManager manages rules through NATS KV configuration.

Two instances of ConfigManager coexist in a running binary:

  1. The Pattern-B CRUD manager constructed in cmd/semstreams/main.go (buildRuleManager) with processor=nil. It handles agent tool writes (create_rule, update_rule, delete_rule) and has no watcher.
  2. The component-internal manager constructed inside Processor.Start with processor non-nil. It owns the KV watcher and applies hot-reload updates to the running processor.

Both read/write semstreams_config:rules.*. The component watcher picks up writes from the CRUD manager via normal KV semantics.

func NewConfigManager

func NewConfigManager(processor *Processor, configMgr *config.Manager, logger *slog.Logger) *ConfigManager

NewConfigManager creates a new rule configuration manager

func (*ConfigManager) DeleteRule

func (rcm *ConfigManager) DeleteRule(ctx context.Context, ruleID string) error

DeleteRule removes a rule configuration from NATS KV.

func (*ConfigManager) GetRule

func (rcm *ConfigManager) GetRule(ctx context.Context, ruleID string) (*Definition, error)

GetRule retrieves a rule configuration from NATS KV.

func (*ConfigManager) InitializeKVStore

func (rcm *ConfigManager) InitializeKVStore(natsClient *natsclient.Client) error

InitializeKVStore initializes the KVStore for direct KV operations

func (*ConfigManager) ListRules

func (rcm *ConfigManager) ListRules(ctx context.Context) (map[string]Definition, error)

ListRules returns all rule configurations from the KV store.

Prefers direct KV reads when a kvStore is wired (consistent with SaveRule/GetRule/DeleteRule). Falls back to the processor's runtime config for deployments that haven't called InitializeKVStore — kept so the existing runtime-config-only path still works. Callers that need full Definition data should ensure kvStore is initialised; the fallback returns a stub Definition carrying only ID/Type/Name/Enabled.

func (*ConfigManager) ReconcileCount

func (rcm *ConfigManager) ReconcileCount() int

ReconcileCount returns the total number of times reconcileFromKV has completed successfully. Intended for integration tests that verify debounce coalescing. For tests only.

func (*ConfigManager) SaveRule

func (rcm *ConfigManager) SaveRule(ctx context.Context, ruleID string, ruleDef Definition) error

SaveRule saves a rule configuration to NATS KV.

func (*ConfigManager) SeedFromRuntime

func (rcm *ConfigManager) SeedFromRuntime(ctx context.Context) error

SeedFromRuntime writes file-loaded rules from the running processor into KV idempotently. Uses Create (not Put) so operator edits already in KV are never overwritten — a key-already-exists error is treated as a no-op.

Reads from rp.ruleDefinitions (populated by loadRules/Initialize) since rp.ruleConfigs (from GetRuntimeConfig) is only populated by ApplyConfigUpdate, not by the initial file/inline load path.

Partial failures are logged but do not abort seeding. The method returns nil even when individual writes fail so that hot-reload startup is not blocked.

func (*ConfigManager) Start

func (rcm *ConfigManager) Start(_ context.Context) error

Start begins watching for rule configuration updates.

When processor is nil (Pattern-B CRUD path), Start is a no-op so that cmd/semstreams/main.go can safely call it without wiring a watcher.

When processor is non-nil (component-internal path), Start:

  1. Seeds file-loaded rules into KV idempotently via SeedFromRuntime.
  2. Opens a KV watcher on "rules.*".
  3. Spawns a goroutine that debounces watcher events and calls reconcileFromKV on each coalesced burst.

func (*ConfigManager) Stop

func (rcm *ConfigManager) Stop() error

Stop stops the configuration manager and waits for the watcher goroutine to exit cleanly.

func (*ConfigManager) WatchRules

func (rcm *ConfigManager) WatchRules(_ context.Context, _ func(ruleID string, rule Rule, operation string)) error

WatchRules watches for rule changes and returns active rules

type CronRule

type CronRule struct {
	// contains filtered or unexported fields
}

CronRule holds a parsed cron-rule definition ready for the scheduler to register and fire. It carries everything the scheduler needs at fire time without re-reading the source Definition.

Fields are unexported behind accessor methods so the scheduler and metrics layer can hold a CronRule reference without coupling to its internals — important for the future per-tenant fan-out path which will need to build per-iteration Actions copies without mutating the shared CronRule.

func NewCronRule

func NewCronRule(def Definition) (*CronRule, error)

NewCronRule builds a CronRule from a Definition. Returns an error if any cron-specific validation fails: wrong Type, missing schedule, schedule fails to parse, cooldown unparseable, forbidden field present, or any action with an empty Type.

func (*CronRule) Actions

func (r *CronRule) Actions() []Action

Actions returns a clone of the action list to execute on each fire. The clone protects callers (per-iteration fan-out, in particular) from accidentally mutating the shared CronRule's slice across fires.

func (*CronRule) Cooldown

func (r *CronRule) Cooldown() time.Duration

Cooldown returns the duration after which a fire may run again. Zero means no cooldown.

func (*CronRule) Description

func (r *CronRule) Description() string

Description returns the rule's description string.

func (*CronRule) Enabled

func (r *CronRule) Enabled() bool

Enabled reports whether the rule should be registered with the scheduler.

func (*CronRule) FireEveryN

func (r *CronRule) FireEveryN() int

FireEveryN returns the action-gating factor (every Nth tick fires actions). Zero or one means "fire every tick."

func (*CronRule) ID

func (r *CronRule) ID() string

ID returns the rule's stable identifier.

func (*CronRule) Metadata

func (r *CronRule) Metadata() map[string]any

Metadata returns the rule's metadata map, or nil if none was supplied. Useful for carrying provenance (e.g. operating-model entry IDs) from the source Definition through to action substitution.

func (*CronRule) Name

func (r *CronRule) Name() string

Name returns the human-readable rule name.

func (*CronRule) Schedule

func (r *CronRule) Schedule() cronlib.Schedule

Schedule returns the parsed cron schedule. Used by the scheduler to compute next-fire times and by the missed-fire detector to compute expected fire intervals.

func (*CronRule) ScheduleString

func (r *CronRule) ScheduleString() string

ScheduleString returns the original cron expression, useful for metrics labels and substitution variables.

type CronScheduler

type CronScheduler struct {
	// contains filtered or unexported fields
}

CronScheduler dispatches CronRule actions on a cron schedule. Methods are safe for concurrent use; the entries map is guarded by mu.

func NewCronScheduler

func NewCronScheduler(cfg CronSchedulerConfig) (*CronScheduler, error)

NewCronScheduler builds a scheduler that will dispatch actions through cfg.Executor. Returns an error only when cfg.Executor is nil — the other fields are all optional with documented degraded-mode behaviour.

func (*CronScheduler) Deregister

func (s *CronScheduler) Deregister(ruleID string)

Deregister removes a cron rule from the scheduler. It is a no-op when the rule isn't registered, so hot-reload can call it unconditionally.

func (*CronScheduler) Register

func (s *CronScheduler) Register(rule *CronRule) error

Register schedules a CronRule for periodic firing. Disabled rules are skipped silently (logged at Debug). Returns an error if the rule is already registered — callers performing hot-reload should call Deregister first.

func (*CronScheduler) RegisteredCount

func (s *CronScheduler) RegisteredCount() int

RegisteredCount returns the number of rules currently registered. Used by the processor for logging and (Chunk 5) the registered-rules gauge.

func (*CronScheduler) Start

func (s *CronScheduler) Start(ctx context.Context) error

Start kicks off the scheduler ticker. The supplied context is captured and passed to every subsequent fire callback so action dispatches inherit the processor's cancellation. Calling Start twice is an error; the scheduler is single-shot per Start/Stop cycle.

Before the ticker starts, Start runs a one-shot restoreFromTracker pass against the persisted last-fired records (when a tracker is configured). It seeds each entry's in-memory lastFiredNanos cache so the cooldown gate and `$schedule.last_fired_at` substitution behave correctly across restarts, and it logs a Warn for any rule whose schedule expected at least one fire between the persisted timestamp and now (log-only per ADR-031 product direction). Tracker failures are logged but do not block startup; a clean cron tick is more important than a perfect audit log.

func (*CronScheduler) Stop

func (s *CronScheduler) Stop() context.Context

Stop signals the scheduler to halt. It returns the context returned by robfig's Cron.Stop, which closes when all in-flight fires have completed. Callers should select on the returned context with their shutdown deadline. Calling Stop on a never-started scheduler is safe.

type CronSchedulerConfig

type CronSchedulerConfig struct {
	// Executor is required. A nil executor is rejected because every
	// fire would silently no-op, hiding misconfiguration.
	Executor ActionExecutorInterface

	// Tracker is optional. Pass nil to run without persistence — no
	// cross-restart missed-fire detection, no cooldown hydration on
	// startup. Tests typically pass nil; the production processor
	// wires in a tracker bound to the RULE_SCHEDULES bucket.
	Tracker *ScheduleTracker

	// Metrics is optional. Pass nil to run without Prometheus
	// observability — every recordX call short-circuits on the nil
	// receiver. Tests that don't assert on metrics pass nil; the
	// production processor wires in metrics scoped to its registry.
	Metrics *cronMetrics

	// Logger is optional. Defaults to slog.Default() when nil so the
	// scheduler always has something to log to.
	Logger *slog.Logger
}

CronSchedulerConfig groups the collaborators NewCronScheduler needs. Constructed by initializeCronScheduler in production; tests build it inline with whichever fields they exercise. Following the team's "4+ args → request struct" convention so future additions (timeouts, custom parsers, observers) don't grow a positional arg list.

type Definition

type Definition struct {
	ID              string                           `json:"id"`
	Type            string                           `json:"type"`
	Name            string                           `json:"name"`
	Description     string                           `json:"description"`
	Enabled         bool                             `json:"enabled"`
	Conditions      []expression.ConditionExpression `json:"conditions"`
	Logic           string                           `json:"logic"`
	Cooldown        string                           `json:"cooldown,omitempty"`
	Entity          EntityConfig                     `json:"entity,omitempty"`
	Metadata        map[string]interface{}           `json:"metadata,omitempty"`
	OnEnter         []Action                         `json:"on_enter,omitempty"`         // Fires on false→true transition
	OnExit          []Action                         `json:"on_exit,omitempty"`          // Fires on true→false transition
	WhileTrue       []Action                         `json:"while_true,omitempty"`       // Fires on every update while true
	OnRecovery      []Action                         `json:"on_recovery,omitempty"`      // Fires once on restart if still matching; falls back to OnEnter
	RelatedPatterns []string                         `json:"related_patterns,omitempty"` // For pair rules

	// RerunOnRecovery opts rules with only OnEnter/WhileTrue defined into the
	// bootstrap recovery path. When false (default), such rules must declare
	// OnRecovery explicitly to re-fire on restart. Rules that already declare
	// OnRecovery always participate regardless of this flag.
	RerunOnRecovery bool `json:"rerun_on_recovery,omitempty"`

	// MaxIterations limits how many times a rule can enter the matching state for an entity.
	// 0 means no limit. Use $state.iteration and $state.max_iterations in When clauses
	// for retry budgets (e.g., retry up to 3 times then escalate).
	MaxIterations int `json:"max_iterations,omitempty"`

	// FireEveryNEvents makes the rule fire its action only every Nth matching
	// event. N=0 and N=1 both mean "fire on every match" (backward-compatible
	// default). N=2 fires on the 2nd, 4th, 6th matching events; N=10 fires on
	// the 10th, 20th, 30th, etc.
	//
	// The MATCH check still runs on every event — only the action is gated.
	// Counter state is per-rule (not per-entity) and resets when the rule
	// definition is replaced via applyRuleChanges, so a policy change begins
	// with a fresh rhythm.
	//
	// This is a parallel dimension to time-based AlertCooldownPeriod; neither
	// replaces the other.
	FireEveryNEvents int `json:"fire_every_n_events,omitempty"`

	// Schedule is the cron expression for `type: "cron"` rules. Required
	// when Type == "cron"; ignored otherwise. Supports POSIX 5-field
	// (`min hour dom mon dow`) and the descriptors @hourly / @daily /
	// @weekly / @monthly / @yearly. Parsed via robfig/cron/v3 at config
	// load time; bad expressions fail rule construction.
	Schedule string `json:"schedule,omitempty"`

	// Actions is the action list for `type: "cron"` rules. Cron rules use
	// this field instead of OnEnter/OnExit/WhileTrue because they have no
	// state-transition semantics — every scheduled tick fires every Action
	// here. Required when Type == "cron"; ignored otherwise.
	Actions []Action `json:"actions,omitempty"`
}

Definition represents a JSON rule configuration

type Dependencies

type Dependencies struct {
	NATSClient *natsclient.Client
	Logger     *slog.Logger
}

Dependencies provides dependencies for rule creation

type EntityConfig

type EntityConfig struct {
	Pattern      string   `json:"pattern"`       // Entity ID pattern to match
	WatchBuckets []string `json:"watch_buckets"` // KV buckets to watch
}

EntityConfig defines entity-specific configuration

type EntityStateEvaluator

type EntityStateEvaluator interface {
	// EvaluateEntityState evaluates the rule directly against EntityState triples.
	// Rule conditions should use full predicate paths (e.g., "sensor.measurement.fahrenheit").
	EvaluateEntityState(entityState *gtypes.EntityState) bool
}

EntityStateEvaluator is an optional interface for rules that can evaluate directly against EntityState triples, bypassing the message transformation layer. This is more efficient for rules that need to access triple predicates directly.

type Evaluation

type Evaluation struct {
	Rule              Definition
	EntityID          string
	RelatedID         string // "" for single-entity rules
	CurrentlyMatching bool
	Entity            *gtypes.EntityState // nil for message-path rules
	Related           *gtypes.EntityState // nil for single-entity or message-path
	Revision          uint64              // KV revision that triggered this evaluation; 0 if unknown
	Bootstrap         bool                // true during watcher initial-state replay
}

Evaluation groups the inputs to a single stateful rule evaluation. Zero values are meaningful: Revision=0 means "no KV revision available" (message-path rules), Bootstrap=false means live traffic (not startup replay), Entity/Related=nil means message-path rule.

type Event

type Event interface {
	// EventType returns the type identifier for this event
	EventType() string

	// Subject returns the NATS subject for publishing this event
	Subject() string

	// Payload returns the event data as a generic map
	Payload() map[string]any

	// Validate checks if the event is valid and ready to publish
	Validate() error
}

Event represents a generic event that can be published by rules. This interface allows rules to be decoupled from specific event types while still integrating with graph events when EnableGraphIntegration is enabled.

type Example

type Example struct {
	Name        string     `json:"name"`
	Description string     `json:"description"`
	Config      Definition `json:"config"`
}

Example provides example configurations

type ExecutionContext

type ExecutionContext struct {
	// EntityID is the primary entity identifier.
	EntityID string

	// RelatedID is the related entity identifier (empty for single-entity rules).
	RelatedID string

	// Entity is the full entity state with triples (nil for message-path rules).
	Entity *gtypes.EntityState

	// Related is the related entity state (nil for single-entity rules or message-path).
	Related *gtypes.EntityState

	// State is the current match state including iteration tracking.
	// May be nil for first evaluation before state is persisted.
	State *MatchState

	// Schedule carries cron-rule context (rule ID, spec, prior fire
	// timestamp). Populated by CronScheduler.fire on time-driven
	// dispatches; nil for message-path and KV-watch rules. The
	// `$schedule.*` substitution layer reads this; see
	// cron_substitution.go for the namespace contract.
	Schedule *ScheduleContext
}

ExecutionContext carries typed data through the rule evaluation → action pipeline. It replaces the previous (entityID, relatedID string) action signature, providing actions with the full entity state and match state for richer execution logic.

func (*ExecutionContext) RuleID

func (ec *ExecutionContext) RuleID() string

RuleID returns the originating rule's identifier, or an empty string if the execution context has no associated match state (e.g. message-path actions invoked outside a stateful evaluator). Callers use this to scope feedback loop prevention to the rule that caused the write.

func (*ExecutionContext) SubstituteVariables

func (ec *ExecutionContext) SubstituteVariables(template string) string

SubstituteVariables replaces template variables with values from the execution context. Supported variables:

  • $now: Current wallclock as RFC3339 UTC (always available)
  • $entity.id: The primary entity ID
  • $related.id: The related entity ID (for pair rules)
  • $state.iteration: Current iteration count
  • $state.max_iterations: Configured max iterations
  • $schedule.id: Cron rule ID (cron rules only)
  • $schedule.spec: Cron expression (cron rules only)
  • $schedule.last_fired_at: Prior fire timestamp, RFC3339 UTC; empty on first fire

Entity triple values can be accessed via $entity.triple.<predicate> syntax.

If any template variable survives substitution (e.g. $entity.triple.X where X isn't on the entity at fire time — a common race with late-arriving triples, or a cron-only $schedule.* token reaching an expression rule), the literal stays in the output and a warning is logged so the author sees the silent-pass. Downstream callers that feed the result into an identifier (KV key, NATS subject) will then get a loud failure instead of mysteriously wrong behaviour.

type ExpressionRule

type ExpressionRule struct {
	// contains filtered or unexported fields
}

ExpressionRule implements Rule interface for expression-based condition evaluation

func NewExpressionRule

func NewExpressionRule(def Definition) (*ExpressionRule, error)

NewExpressionRule creates a new expression-based rule

func (*ExpressionRule) Evaluate

func (r *ExpressionRule) Evaluate(messages []message.Message) bool

Evaluate evaluates the rule against messages

func (*ExpressionRule) EvaluateEntityState

func (r *ExpressionRule) EvaluateEntityState(entityState *gtypes.EntityState) bool

EvaluateEntityState evaluates the rule directly against EntityState triples. This bypasses the message transformation layer and evaluates conditions directly against triple predicates (e.g., "sensor.measurement.fahrenheit").

func (*ExpressionRule) ExecuteEvents

func (r *ExpressionRule) ExecuteEvents(messages []message.Message) ([]Event, error)

ExecuteEvents generates events when rule triggers

func (*ExpressionRule) Name

func (r *ExpressionRule) Name() string

Name returns the rule name

func (*ExpressionRule) Subscribe

func (r *ExpressionRule) Subscribe() []string

Subscribe returns subjects this rule subscribes to

type ExpressionRuleFactory

type ExpressionRuleFactory struct {
	// contains filtered or unexported fields
}

ExpressionRuleFactory creates expression-based rules

func NewExpressionRuleFactory

func NewExpressionRuleFactory() *ExpressionRuleFactory

NewExpressionRuleFactory creates a new expression rule factory

func (*ExpressionRuleFactory) Create

Create creates an expression rule from definition

func (*ExpressionRuleFactory) Schema

func (f *ExpressionRuleFactory) Schema() Schema

Schema returns the expression rule schema

func (*ExpressionRuleFactory) Type

func (f *ExpressionRuleFactory) Type() string

Type returns the factory type

func (*ExpressionRuleFactory) Validate

func (f *ExpressionRuleFactory) Validate(def Definition) error

Validate validates the rule definition

type Factory

type Factory interface {
	// Create creates a rule instance from configuration
	Create(id string, config Definition, deps Dependencies) (Rule, error)

	// Type returns the rule type this factory creates
	Type() string

	// Schema returns the configuration schema for UI discovery
	Schema() Schema

	// Validate validates a rule configuration
	Validate(config Definition) error
}

Factory creates rules from configuration

func GetRuleFactory

func GetRuleFactory(ruleType string) (Factory, bool)

GetRuleFactory returns a registered rule factory

type KVTestHelper

type KVTestHelper struct {
	// contains filtered or unexported fields
}

KVTestHelper provides utilities for KV-based rule testing

func NewKVTestHelper

func NewKVTestHelper(t *testing.T, bucket jetstream.KeyValue) *KVTestHelper

NewKVTestHelper creates a new KV test helper

func (*KVTestHelper) DeleteEntity

func (h *KVTestHelper) DeleteEntity(entityID string)

DeleteEntity removes an entity from the KV bucket

func (*KVTestHelper) UpdateEntityProperty

func (h *KVTestHelper) UpdateEntityProperty(entityID string, predicateFull string, value interface{})

UpdateEntityProperty updates a specific property of an entity

func (*KVTestHelper) WriteEntityState

func (h *KVTestHelper) WriteEntityState(entity *gtypes.EntityState)

WriteEntityState writes an entity state to the KV bucket

type KVWriter

type KVWriter interface {
	// UpdateJSON performs a CAS read-modify-write on a JSON document in the named bucket.
	// The updateFn receives the current value (empty map if key doesn't exist) and mutates it.
	// Uses exponential backoff retry on CAS conflicts (via natsclient.KVStore.UpdateJSON).
	UpdateJSON(ctx context.Context, bucket, key string, updateFn func(current map[string]any) error) error

	// PutJSON writes a JSON value to the named bucket (last writer wins, no CAS).
	PutJSON(ctx context.Context, bucket, key string, value map[string]any) error
}

KVWriter performs read-modify-write operations on domain KV buckets. It abstracts the underlying NATS KV infrastructure for testability.

type LastFireRecord

type LastFireRecord struct {
	// RuleID is the cron rule's stable identifier. Duplicates the KV key
	// so that List() consumers can decode a record without re-threading
	// the key alongside it.
	RuleID string `json:"rule_id"`

	// ScheduleSpec is the rule's cron expression at the time of the
	// recorded fire. Useful for operators reading the bucket directly
	// and for missed-fire detection that wants to compute the expected
	// fire interval without re-loading rule definitions.
	ScheduleSpec string `json:"schedule_spec"`

	// LastFiredAt is the wallclock at which the most recent fire began
	// dispatching actions. UTC; serialised RFC3339Nano via Go's default
	// time.Time JSON encoder. Nanosecond precision is preserved best-
	// effort across platforms — callers that depend on exact equality
	// (e.g. round-trip tests) should truncate to a known resolution
	// before write rather than relying on the encoder.
	LastFiredAt time.Time `json:"last_fired_at"`
}

LastFireRecord is the on-disk shape of a per-rule last-fired record. JSON field names are part of the framework's public contract — out-of- process readers depend on them; renames require a migration.

type MatchState

type MatchState struct {
	RuleID         string    `json:"rule_id"`
	EntityKey      string    `json:"entity_key"`
	IsMatching     bool      `json:"is_matching"`
	LastTransition string    `json:"last_transition"` // ""|"entered"|"exited"
	TransitionAt   time.Time `json:"transition_at,omitempty"`
	SourceRevision uint64    `json:"source_revision"`
	LastChecked    time.Time `json:"last_checked"`

	// Iteration tracks how many times this rule has entered the matching state
	// for this entity. Incremented on each TransitionEntered, preserved through exits.
	Iteration int `json:"iteration"`

	// MaxIterations is the configured limit from the rule Definition.
	// 0 means no limit. Used with $state.iteration in When clauses for retry budgets.
	MaxIterations int `json:"max_iterations,omitempty"`

	// FieldValues tracks the last-seen values of fields used in transition conditions.
	// Keys are field names (e.g., "workflow.plan.status"), values are the string
	// representation of the last observed value. Used by the transition operator
	// to detect field-level state changes across evaluations.
	FieldValues map[string]string `json:"field_values,omitempty"`
}

MatchState tracks whether a rule's condition is currently matching for a specific entity or entity pair.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics holds Prometheus metrics for RuleProcessor component

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is a component that processes messages through rules

func NewProcessor

func NewProcessor(natsClient *natsclient.Client, config *Config) (*Processor, error)

NewProcessor creates a new rule processor

func NewProcessorWithMetrics

func NewProcessorWithMetrics(natsClient *natsclient.Client, config *Config, metricsRegistry *metric.MetricsRegistry) (*Processor, error)

NewProcessorWithMetrics creates a new rule processor with optional metrics

func (*Processor) ApplyConfigUpdate

func (rp *Processor) ApplyConfigUpdate(changes map[string]any) error

ApplyConfigUpdate applies validated configuration changes

func (*Processor) ConfigSchema

func (rp *Processor) ConfigSchema() component.ConfigSchema

ConfigSchema returns configuration schema for component interface

func (*Processor) DataFlow

func (rp *Processor) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Processor) DebugStatus

func (rp *Processor) DebugStatus() any

DebugStatus returns extended debug information for the rule processor. Implements component.DebugStatusProvider.

func (*Processor) GetRuleMetrics

func (rp *Processor) GetRuleMetrics() map[string]any

GetRuleMetrics returns metrics for all rules

func (*Processor) GetRuntimeConfig

func (rp *Processor) GetRuntimeConfig() map[string]any

GetRuntimeConfig returns current runtime configuration

func (*Processor) Health

func (rp *Processor) Health() component.HealthStatus

Health returns current health status

func (*Processor) Initialize

func (rp *Processor) Initialize() error

Initialize loads rules and prepares the processor

func (*Processor) InputPorts

func (rp *Processor) InputPorts() []component.Port

InputPorts returns declared input ports

func (*Processor) Meta

func (rp *Processor) Meta() component.Metadata

Meta returns component metadata

func (*Processor) OutputPorts

func (rp *Processor) OutputPorts() []component.Port

OutputPorts returns declared output ports

func (*Processor) SetDecoder

func (rp *Processor) SetDecoder(d *message.Decoder)

SetDecoder installs the payload Decoder used to unmarshal incoming BaseMessage envelopes. Called by the component factory after construction with message.NewDecoder(deps.PayloadRegistry). Mirrors SetToolRegistry. A nil arg leaves the processor unable to handle semantic messages — handleSemanticMessage will fail-fast.

func (*Processor) SetToolRegistry

func (rp *Processor) SetToolRegistry(r component.ToolRegistryReader)

SetToolRegistry installs the shared tool registry. Called by the component factory after construction with deps.ToolRegistry. The registry flows from here to ActionExecutor at Initialize time so publish_agent's default_tools resolution sees the right tools.

A nil arg is allowed and disables tool name resolution (deployments without agentic-tools).

func (*Processor) Start

func (rp *Processor) Start(ctx context.Context) error

Start begins processing messages through rules

func (*Processor) Stop

func (rp *Processor) Stop(_ time.Duration) error

Stop stops the processor and cleans up resources

func (*Processor) UpdateWatchBuckets

func (rp *Processor) UpdateWatchBuckets(newBuckets map[string][]string) error

UpdateWatchBuckets dynamically updates the entity watch buckets and patterns.

func (*Processor) UpdateWatchPatterns

func (rp *Processor) UpdateWatchPatterns(newPatterns []string) error

UpdateWatchPatterns dynamically updates the entity watch patterns for ENTITY_STATES. DEPRECATED: Use UpdateWatchBuckets for multi-bucket support.

func (*Processor) ValidateConfigUpdate

func (rp *Processor) ValidateConfigUpdate(changes map[string]any) error

ValidateConfigUpdate validates proposed configuration changes

type Publisher

type Publisher interface {
	// Publish sends a message to a NATS subject.
	// The implementation determines whether to use core NATS or JetStream
	// based on port configuration.
	Publish(ctx context.Context, subject string, data []byte) error
}

Publisher handles publishing messages to NATS subjects. It abstracts the decision between core NATS and JetStream publishing.

type Rule

type Rule interface {
	// Name returns the human-readable name of this rule
	Name() string

	// Subscribe returns the NATS subjects this rule should listen to
	Subscribe() []string

	// Evaluate checks if the rule conditions are met for the given messages
	Evaluate(messages []message.Message) bool

	// ExecuteEvents generates events when rule conditions are satisfied
	ExecuteEvents(messages []message.Message) ([]Event, error)
}

Rule interface defines the event-based contract for processing rules. Rules evaluate messages and generate events when conditions are met.

func CreateRuleFromDefinition

func CreateRuleFromDefinition(def Definition, deps Dependencies) (Rule, error)

CreateRuleFromDefinition creates a rule using the appropriate factory

type ScheduleContext

type ScheduleContext struct {
	// ID is the firing rule's stable identifier — same value as
	// Definition.ID and CronRule.ID. Always non-empty for a cron fire.
	ID string

	// Spec is the cron expression as the rule was registered with it.
	// Captured at fire time so a hot-reload that changes the schedule
	// doesn't retroactively alter substitution output for in-flight
	// dispatches. Always non-empty for a cron fire.
	Spec string

	// LastFiredAt is the wallclock of the most recent successful fire
	// before this one. Zero value (time.IsZero) on the first fire of
	// a freshly-registered rule, before any persisted record exists.
	// The substitution renders zero as the empty string so authors can
	// detect first-fire via `len("$schedule.last_fired_at") == 0` in
	// downstream conditional templates.
	LastFiredAt time.Time
}

ScheduleContext carries the cron-rule context that's available at fire time but not derivable from any other rule field. ExecutionContext holds a *ScheduleContext on cron-fire dispatches; expression-rule dispatches leave the field nil and the substitution is a no-op.

Fields are exported because the substitution layer reads them directly; the type is constructed by the cron scheduler in fire() and never mutated thereafter.

type ScheduleTracker

type ScheduleTracker struct {
	// contains filtered or unexported fields
}

ScheduleTracker persists last-fired timestamps for cron rules to KV. Methods are safe for concurrent use; the underlying NATS KV client already serialises writes.

func NewScheduleTracker

func NewScheduleTracker(bucket jetstream.KeyValue, logger *slog.Logger) *ScheduleTracker

NewScheduleTracker builds a tracker bound to the supplied bucket. A nil bucket is permitted so the rule processor can degrade gracefully when bucket creation fails on startup — every method returns an error when bucket is nil rather than panicking, mirroring StateTracker's nil-tolerant behaviour.

func (*ScheduleTracker) Bucket

func (st *ScheduleTracker) Bucket() jetstream.KeyValue

Bucket returns the underlying jetstream.KeyValue handle. Exposed so in-process callers (governance startup hooks, ops dashboards in the same binary) can use the full KV API — Watch for live updates, ListKeys for filtered scans — without re-resolving the bucket via js.KeyValue. The returned handle may be nil if the tracker was constructed in nil-bucket mode (degraded startup); callers must nil-check.

func (*ScheduleTracker) Delete

func (st *ScheduleTracker) Delete(ctx context.Context, ruleID string) error

Delete removes the persisted record for a rule. Called when a rule is removed from configuration so stale records don't linger. Missing records are treated as success — Delete is idempotent.

func (*ScheduleTracker) LastFiredAt

func (st *ScheduleTracker) LastFiredAt(ctx context.Context, ruleID string) (LastFireRecord, error)

LastFiredAt returns the most recent fire timestamp for the given rule. Returns ErrScheduleRecordNotFound when no fire has been recorded — the caller should treat that as "never fired", not as an error to surface.

The schedule spec stored alongside the timestamp is returned too so missed-fire detection can compute the expected interval without re-loading the live rule definition (which may have been hot-reloaded to a different spec since the last fire).

func (*ScheduleTracker) List

func (st *ScheduleTracker) List(ctx context.Context) ([]LastFireRecord, error)

List returns every persisted last-fire record. Used by the scheduler on Start to detect missed fires and by operators inspecting bucket state. Records are returned in undefined order; callers that need a stable order should sort by RuleID.

A bucket with no records returns an empty slice and no error.

func (*ScheduleTracker) RecordFire

func (st *ScheduleTracker) RecordFire(ctx context.Context, ruleID, scheduleSpec string, firedAt time.Time) error

RecordFire upserts the last-fired record for the given rule. Called by CronScheduler.fire after a successful action dispatch. The schedule spec is captured at the moment of the fire so a hot-reload that changes the cron expression doesn't retroactively rewrite history.

Errors are surfaced to the caller; the scheduler logs them as warnings (a transient KV write failure must not crash the scheduler goroutine — the next fire will overwrite the record).

type Schema

type Schema struct {
	Type        string                              `json:"type"`
	DisplayName string                              `json:"display_name"`
	Description string                              `json:"description"`
	Category    string                              `json:"category"`
	Icon        string                              `json:"icon,omitempty"`
	Properties  map[string]component.PropertySchema `json:"properties"`
	Required    []string                            `json:"required"`
	Examples    []Example                           `json:"examples,omitempty"`
}

Schema describes the configuration schema for a rule type

type StateTracker

type StateTracker struct {
	// contains filtered or unexported fields
}

StateTracker manages rule match state persistence in NATS KV. It tracks which rules are currently matching for which entities to enable proper transition detection for stateful ECA rules.

func NewStateTracker

func NewStateTracker(bucket jetstream.KeyValue, logger *slog.Logger) *StateTracker

NewStateTracker creates a new StateTracker with the given KV bucket.

func (*StateTracker) Delete

func (st *StateTracker) Delete(ctx context.Context, ruleID, entityKey string) error

Delete removes the match state for a rule and entity key.

func (*StateTracker) DeleteAllForEntity

func (st *StateTracker) DeleteAllForEntity(ctx context.Context, entityID string) error

DeleteAllForEntity removes all rule states associated with an entity. This is called when an entity is deleted to clean up orphaned state.

func (*StateTracker) Get

func (st *StateTracker) Get(ctx context.Context, ruleID, entityKey string) (MatchState, error)

Get retrieves the current match state for a rule and entity key. Returns ErrStateNotFound if no state exists for this combination.

func (*StateTracker) Set

func (st *StateTracker) Set(ctx context.Context, state MatchState) error

Set persists the match state for a rule and entity.

type StatefulEvaluator

type StatefulEvaluator struct {
	// contains filtered or unexported fields
}

StatefulEvaluator handles rule evaluation with state tracking. It manages state transitions (entered/exited/none) and executes the appropriate actions based on the transition type.

func NewStatefulEvaluator

func NewStatefulEvaluator(stateTracker *StateTracker, actionExecutor ActionExecutorInterface, logger *slog.Logger) *StatefulEvaluator

NewStatefulEvaluator creates a new stateful evaluator with the given dependencies. If logger is nil, uses the default logger.

func (*StatefulEvaluator) Evaluate

func (e *StatefulEvaluator) Evaluate(ctx context.Context, ev Evaluation) (Transition, error)

Evaluate runs a stateful rule evaluation and fires the appropriate actions for the detected transition:

  • TransitionEntered → OnEnter (increments iteration)
  • TransitionExited → OnExit (preserves iteration)
  • TransitionNone+match → WhileTrue

Action-level When clauses are evaluated before each action runs. New state is persisted to the StateTracker on the way out.

When Evaluation.Bootstrap is true, the watcher is replaying state after a restart. A persisted IsMatching=true state that still matches would otherwise produce TransitionNone — we promote it to a synthetic Entered (fires OnRecovery, or OnEnter under RerunOnRecovery) so state machines resume instead of hanging on restart.

type Status

type Status struct {
	DebounceDelayMs    int       `json:"debounce_delay_ms"`
	PendingEvaluations int       `json:"pending_evaluations"`
	TotalEvaluations   int       `json:"total_evaluations"`
	TotalTriggers      int       `json:"total_triggers"`
	DebouncedCount     int       `json:"debounced_count"` // Matches Tester's test expectations
	RulesLoaded        int       `json:"rules_loaded"`
	LastEvaluationTime time.Time `json:"last_evaluation_time,omitempty"`

	// TrackedRevisions is the total number of (rule,entity,revision) tuples
	// currently in the feedback-loop-prevention map. Monitor this to detect
	// leaks from writes that the watcher never delivers — a healthy
	// processor's count should stay roughly proportional to in-flight rule
	// actions, bounded by the sweeper.
	TrackedRevisions int `json:"tracked_revisions"`
}

Status represents the current status of rule evaluation for debug observability

type TestRule

type TestRule struct {
	// contains filtered or unexported fields
}

TestRule is a functional rule implementation for testing

func NewTestRule

func NewTestRule(id, name string, subjects []string, conditions []expression.ConditionExpression) *TestRule

NewTestRule creates a new test rule

func (*TestRule) Evaluate

func (r *TestRule) Evaluate(messages []message.Message) bool

Evaluate evaluates the rule against messages

func (*TestRule) EvaluateEntityState

func (r *TestRule) EvaluateEntityState(entityState *gtypes.EntityState) bool

EvaluateEntityState evaluates the rule directly against EntityState triples. This implements the EntityStateEvaluator interface for KV watch-based evaluation.

func (*TestRule) ExecuteEvents

func (r *TestRule) ExecuteEvents(messages []message.Message) ([]Event, error)

ExecuteEvents generates events when rule triggers

func (*TestRule) Name

func (r *TestRule) Name() string

Name returns the rule name

func (*TestRule) Subscribe

func (r *TestRule) Subscribe() []string

Subscribe returns subjects this rule subscribes to

type TestRuleFactory

type TestRuleFactory struct {
	// contains filtered or unexported fields
}

TestRuleFactory creates test rules for integration testing

func NewTestRuleFactory

func NewTestRuleFactory() *TestRuleFactory

NewTestRuleFactory creates a new test rule factory

func (*TestRuleFactory) Create

func (f *TestRuleFactory) Create(ruleID string, def Definition, _ Dependencies) (Rule, error)

Create creates a test rule from definition

func (*TestRuleFactory) Examples

func (f *TestRuleFactory) Examples() []Example

Examples returns test rule examples

func (*TestRuleFactory) Schema

func (f *TestRuleFactory) Schema() Schema

Schema returns the test rule schema

func (*TestRuleFactory) Type

func (f *TestRuleFactory) Type() string

Type returns the factory type

func (*TestRuleFactory) Validate

func (f *TestRuleFactory) Validate(_ Definition) error

Validate validates the rule definition

type Transition

type Transition string

Transition represents the type of state change detected

const (
	// TransitionNone indicates no state change
	TransitionNone Transition = ""
	// TransitionEntered indicates condition became true
	TransitionEntered Transition = "entered"
	// TransitionExited indicates condition became false
	TransitionExited Transition = "exited"
)

func DetectTransition

func DetectTransition(wasMatching, nowMatching bool) Transition

DetectTransition compares previous and current match state to determine transition type. Returns TransitionEntered if condition changed from false to true. Returns TransitionExited if condition changed from true to false. Returns TransitionNone if state did not change.

type TripleMutator

type TripleMutator interface {
	// AddTriple adds a triple via NATS request/response and returns the KV revision.
	AddTriple(ctx context.Context, ruleID string, triple message.Triple) (uint64, error)
	// RemoveTriple removes a triple via NATS request/response and returns the KV revision.
	RemoveTriple(ctx context.Context, ruleID, subject, predicate string) (uint64, error)
}

TripleMutator handles triple mutations via NATS request/response. The returned uint64 is the KV revision after the write, used for per-rule feedback loop prevention. The ruleID identifies the originating rule so the revision can be scoped to that rule only — pass an empty ruleID for ad-hoc mutations that should not be tracked.

type ValidationError

type ValidationError struct {
	Field   string
	Message string
}

ValidationError represents a payload validation error.

func (*ValidationError) Error

func (e *ValidationError) Error() string

type WorkflowTriggerPayload

type WorkflowTriggerPayload struct {
	WorkflowID  string         `json:"workflow_id"`
	EntityID    string         `json:"entity_id"`
	TriggeredAt time.Time      `json:"triggered_at"`
	RelatedID   string         `json:"related_id,omitempty"`
	Context     map[string]any `json:"context,omitempty"`
}

WorkflowTriggerPayload represents a message sent by the rule processor to trigger a reactive workflow. This payload is wrapped in a BaseMessage for proper deserialization by the reactive workflow engine.

func (*WorkflowTriggerPayload) MarshalJSON

func (p *WorkflowTriggerPayload) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler. This marshals just the payload fields - BaseMessage handles the wrapping.

func (*WorkflowTriggerPayload) Schema

func (p *WorkflowTriggerPayload) Schema() message.Type

Schema returns the message type for workflow trigger payloads.

func (*WorkflowTriggerPayload) UnmarshalJSON

func (p *WorkflowTriggerPayload) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler.

func (*WorkflowTriggerPayload) Validate

func (p *WorkflowTriggerPayload) Validate() error

Validate ensures the payload has required fields.

Directories

Path Synopsis
Package boid implements Boids-inspired local coordination rules for multi-agent teams.
Package boid implements Boids-inspired local coordination rules for multi-agent teams.
Package expression - Expression evaluator implementation
Package expression - Expression evaluator implementation

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL