Documentation ¶
Overview ¶
Package rule - Action execution for ECA rules
Package rule - Prometheus metrics for the cron scheduler.
The cron scheduler exposes seven metrics under the `semstreams_cron_*` namespace. They mirror what an operator typically wants to see for a periodically-firing component:
- fires_total{rule_id, status} — counter; status is success / error / panic / cooldown_skipped. Lets ops see fire rates per rule and spot rules whose cron expression ticks faster than actions complete (cooldown_skipped grows).
- fire_duration_seconds{rule_id} — histogram of action-dispatch latency. Surfaces slow `publish_agent` rules that risk overlapping with the next tick before the cooldown gate triggers.
- registered — gauge; total registered cron rules. Flips on hot-reload; useful for confirming a config change took effect.
- missed_fires_total{rule_id} — counter; incremented once per missed fire detected on Start. Per ADR-031 the policy is log-only, so the metric tracks cumulative missed fires rather than auto-replay attempts.
- missed_fires_capped_total{rule_id} — counter; incremented when a rule's missed-fire count hits the detection cap (currently 100). Lets operators alert on long downtimes without grepping logs.
- next_fire_timestamp_seconds{rule_id} — gauge; Unix epoch seconds of the rule's next scheduled fire. Enables alerts like "no fire in last 2× period" by comparing scrape time against the gauge.
- scheduler_running — gauge; 1 when Start has been called and Stop hasn't, 0 otherwise. Catches the "processor still alive but scheduler crashed" pathology.
The constructor follows the routerMetrics pattern in agentic-dispatch: per-registry instances cached so multiple processors sharing a registry get the same collectors, plus a nil-registry singleton fallback for production deployments using the default Prometheus registerer. Both paths support nil-tolerant recordX helpers so callers don't have to nil-check before every observation.
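The caching and nil-tolerance described above can be sketched without the Prometheus client. Everything here is an assumption made for illustration: `registry` stands in for a Prometheus registerer, `cronMetrics` for the real collector bundle, and a plain map replaces the counters.

```go
package main

import (
	"fmt"
	"sync"
)

// registry stands in for a Prometheus registerer (hypothetical type).
type registry struct{ name string }

// cronMetrics stands in for the real collector bundle (hypothetical type).
type cronMetrics struct {
	mu    sync.Mutex
	fires map[string]int // keyed by rule_id + "/" + status
}

var (
	cacheMu sync.Mutex
	cache   = map[*registry]*cronMetrics{}
)

// metricsFor returns one cronMetrics per registry, so processors sharing a
// registry share collectors. A nil registry is a valid map key, which gives
// the singleton fallback for the default-registerer case.
func metricsFor(r *registry) *cronMetrics {
	cacheMu.Lock()
	defer cacheMu.Unlock()
	if m, ok := cache[r]; ok {
		return m
	}
	m := &cronMetrics{fires: map[string]int{}}
	cache[r] = m
	return m
}

// recordFire is nil-tolerant: on a nil receiver it is a no-op, so callers
// do not need to nil-check before every observation.
func (m *cronMetrics) recordFire(ruleID, status string) {
	if m == nil {
		return
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.fires[ruleID+"/"+status]++
}

func main() {
	shared := &registry{name: "shared"}
	a, b := metricsFor(shared), metricsFor(shared)
	fmt.Println(a == b) // true

	a.recordFire("nightly", "success")
	var none *cronMetrics
	none.recordFire("nightly", "success") // safe no-op on a nil receiver
	fmt.Println(b.fires["nightly/success"]) // 1
}
```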
Package rule - Cron rule type for time-driven actions.
Cron rules are a third firing path alongside KV-watch and message-path rules. Unlike expression rules, cron rules have no condition — they fire on a clock tick alone, dispatched by CronScheduler against the same ActionExecutor that expression rules use.
CronRule is intentionally NOT an implementation of the Rule interface. Subscribe / Evaluate / ExecuteEvents are message-driven concepts that don't fit time-driven firing. Keeping cron rules outside the Rule registry means they don't accidentally appear in evaluateRulesForMessage dispatch and don't need no-op stubs cluttering the type.
Package rule - Cron scheduler for time-driven rule firing.
CronScheduler is the third firing path of the rule processor, parallel to message-driven and KV-watch-driven evaluation. It wraps robfig/cron/v3.Cron and dispatches each registered CronRule's actions through the shared ActionExecutor at the times described by the rule's cron expression.
Lifecycle is owned by the rule processor:
- Processor.Start (via initializeCronScheduler) constructs the scheduler and registers the cron rules loaded by Initialize.
- Processor.run launches the scheduler with Start(ctx); robfig spawns its own internal goroutine that walks the schedule and dispatches each fire callback on its own goroutine.
- Processor.run defers Stop on shutdown, draining in-flight fires up to the shutdown grace period.
- Hot reload (applyRuleChanges) calls Register / Deregister under the processor's mu.Lock; robfig supports live add/remove without a scheduler restart.
The fire path is best-effort: per-rule panic-recover keeps one bad action from crashing the scheduler goroutine, and action errors are logged without stopping sibling actions in the same tick. This matches StatefulEvaluator behaviour so the operator's mental model is consistent across rule kinds.
Package rule - Cron-specific substitution context.
CronRule fires have no entity in scope at MVP: the scheduler ticks on a clock alone. Action templates therefore cannot use the `$entity.*` / `$related.*` / `$state.*` namespaces that expression rules populate from a matched entity. Instead, cron actions read schedule-shaped context (`$schedule.id`, `$schedule.spec`, `$schedule.last_fired_at`) plus the existing `$now` token, which SubstituteVariables already handles for every rule kind.
Retrofit-safe namespace seam ¶
`$schedule.*` is intentionally disjoint from `$entity.*`. When the deferred per-entity fan-out extension lands (Definition.ForEach), each iteration of the fan-out gets the existing `$entity.*` namespace populated identically to expression-rule firing — the cron path supplies an EntityID, an Entity (or *EntityState fetched on demand), and a State; the substitution layer needs no changes. Adding `for_each` is purely additive.
A future `$trigger.*` namespace (planned for richer tick metadata — e.g. `$trigger.attempt`, `$trigger.scheduled_for`) would land here as a sibling shim. Do not overload `$schedule.*` to carry per-tick metadata; keeping spec-shaped data and tick-shaped data in separate namespaces preserves the door for that extension.
Package rule provides rule-based processing of semantic message streams with support for entity state watching and event generation.
Overview ¶
The rule processor evaluates conditions against streaming messages and entity state changes, generating graph events when rules trigger. Rules are defined using JSON configuration and implemented using the factory pattern.
Architecture ¶
Rules consist of three main components:
- Definition: JSON configuration (conditions, logic, metadata)
- Factory: Creates Rule instances from definitions
- Rule: Evaluates conditions and generates events
Rule Interface ¶
Rules must implement this interface:
type Rule interface {
	Name() string
	Subscribe() []string
	Evaluate(messages []message.Message) bool
	ExecuteEvents(messages []message.Message) ([]rtypes.Event, error)
}
Creating Custom Rules ¶
1. Create a RuleFactory:
type MyRuleFactory struct {
	ruleType string
}

func (f *MyRuleFactory) Create(id string, def Definition, deps Dependencies) (rtypes.Rule, error) {
	// Populate the remaining MyRule fields from def as needed.
	return &MyRule{id: id}, nil
}

func (f *MyRuleFactory) Validate(def Definition) error {
	// Validate rule configuration
	return nil
}
2. Implement the Rule interface:
type MyRule struct {
	id         string
	conditions []expression.ConditionExpression
}

func (r *MyRule) ExecuteEvents(messages []message.Message) ([]rtypes.Event, error) {
	// Generate events directly in Go code
	// gtypes.Event implements rtypes.Event interface
	event := &gtypes.Event{
		Type:     gtypes.EventEntityUpdate,
		EntityID: "alert.my-rule." + r.id,
		Properties: map[string]interface{}{
			"triggered": true,
			"timestamp": time.Now(),
		},
	}
	return []rtypes.Event{event}, nil
}
3. Register the factory:
func init() {
	RegisterRuleFactory("my_rule", &MyRuleFactory{ruleType: "my_rule"})
}
Entity State Watching ¶
Rules can watch entity state changes via NATS KV buckets:
{
  "id": "my-rule",
  "type": "my_rule",
  "entity": {
    "pattern": "c360.*.robotics.drone.>",
    "watch_buckets": ["ENTITY_STATES"]
  }
}
The processor watches these buckets using the KV watch pattern and converts entity state updates into messages for rule evaluation.
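The `pattern` value above looks like a NATS subject filter. Assuming it follows NATS wildcard semantics (`*` matches exactly one token, `>` matches one or more trailing tokens), the matching can be sketched as follows. `matchSubject` is a hypothetical helper; the processor itself presumably delegates filtering to the KV watcher.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject applies NATS-style wildcard rules to a dot-separated key:
// "*" matches exactly one token, ">" matches one or more trailing tokens
// and is only honoured as the last token of the pattern.
func matchSubject(pattern, key string) bool {
	p := strings.Split(pattern, ".")
	k := strings.Split(key, ".")
	for i, tok := range p {
		if tok == ">" {
			return i == len(p)-1 && len(k) > i
		}
		if i >= len(k) {
			return false
		}
		if tok != "*" && tok != k[i] {
			return false
		}
	}
	return len(p) == len(k)
}

func main() {
	pattern := "c360.*.robotics.drone.>"
	fmt.Println(matchSubject(pattern, "c360.alpha.robotics.drone.001")) // true
	fmt.Println(matchSubject(pattern, "c360.alpha.robotics.rover.001")) // false
	fmt.Println(matchSubject(pattern, "c360.alpha.robotics.drone"))     // false
}
```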
Runtime Configuration ¶
Rules support dynamic runtime updates via ApplyConfigUpdate():
- Add/remove rules without restart
- Update rule conditions and metadata
- Enable/disable rules on the fly
Event Generation ¶
Rules generate gtypes.Event directly (no template system):
- Events contain: Type, EntityID, Properties, Metadata
- Published to NATS graph subjects
- Consumed by graph processor for entity storage
Metrics ¶
The processor exposes Prometheus metrics:
- semstreams_rule_evaluations_total
- semstreams_rule_triggers_total
- semstreams_rule_evaluation_duration_seconds
- semstreams_rule_errors_total
Example Usage ¶
config := rule.DefaultConfig()
config.Ports = &component.PortConfig{
	Inputs: []component.PortDefinition{
		{
			Name:    "semantic_messages",
			Type:    "nats",
			Subject: "process.>",
		},
	},
}
processor, err := rule.NewProcessor(natsClient, &config)
if err != nil {
	log.Fatal(err)
}
// Check both lifecycle errors instead of overwriting err unchecked.
if err := processor.Initialize(); err != nil {
	log.Fatal(err)
}
if err := processor.Start(ctx); err != nil {
	log.Fatal(err)
}
For comprehensive documentation, see:
- /Users/coby/Code/c360/semdocs/docs/guides/rules-engine.md
- /Users/coby/Code/c360/semdocs/docs/specs/SPEC-001-generic-rules-engine.md
Package rule - Execution context for rule actions ¶
Package rule - Expression Rule Factory for condition-based rules ¶
Package rule provides interfaces and implementations for rule processing ¶
Package rule - NATS KV Configuration Integration for Rules ¶
Package rule - KV write action support for rule-driven state machine orchestration ¶
Package rule provides a rule processing component that implements the Discoverable interface for processing message streams through rules
Package rule - Rule Factory Pattern for Dynamic Rule Creation ¶
Package rule - Schedule tracking for cron rule firing.
ScheduleTracker persists per-rule last-fired timestamps to the RULE_SCHEDULES KV bucket so that:
- Restart-recovery can detect missed fires by comparing the persisted last-fired timestamp against the cron schedule's expected next fire.
- Operators (and downstream components like governance startup hooks) can read last-fire state without coupling to scheduler internals.
The bucket is intentionally simple: one JSON record per rule, keyed by rule ID. Records are upserted on every successful fire; they have no TTL (retention is the rule's own enable/disable lifecycle).
Key shape (retrofit-safe) ¶
MVP keys are the bare rule ID — `{ruleID}` — because every cron rule fires once globally per tick. When the deferred per-entity fan-out extension lands (Definition.ForEach), keys will gain a `.{entityID}` suffix to track last-fired per fanout target. Existing single-fire rules keep the bare-rule-ID shape; the suffix is purely additive and requires no migration. See ADR-031 retrofit seams for context.
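The additive key shape can be expressed in a few lines. `scheduleKey` is a hypothetical helper written for this illustration; the fan-out form reflects the planned suffix described above, not shipped behaviour.

```go
package main

import "fmt"

// scheduleKey returns the bare rule ID for single-fire rules and appends
// ".{entityID}" when a fan-out target is supplied. Hypothetical helper.
func scheduleKey(ruleID, entityID string) string {
	if entityID == "" {
		return ruleID
	}
	return ruleID + "." + entityID
}

func main() {
	fmt.Println(scheduleKey("nightly-sweep", ""))          // nightly-sweep
	fmt.Println(scheduleKey("nightly-sweep", "drone.001")) // nightly-sweep.drone.001
}
```

Because the bare form is a strict prefix of the suffixed form, records written today stay addressable after the extension lands, which is the "no migration" property the paragraph describes.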
External read access ¶
Governance startup hooks (kill-switch sweeps, chain-hash audits) need to read last-fired timestamps to issue catch-up sweeps when a missed fire crosses a meaningful interval. Two access patterns are supported:
- In-process callers hold a *ScheduleTracker reference and call LastFiredAt(ctx, ruleID).
- Out-of-process callers read RULE_SCHEDULES KV directly using the documented record shape (LastFireRecord). Bucket name and JSON shape are part of the framework's public contract.
Package rule - State tracking for stateful ECA rules ¶
Package rule - Stateful rule evaluation with state tracking ¶
Package rule - Test Rule Factory for Integration Tests ¶
Package rule provides triple mutation support via NATS request/response
Package rule - Workflow trigger payload for rule-to-workflow integration
Index ¶
- Constants
- Variables
- func AssertEventuallyTrue(t *testing.T, condition func() bool, timeout time.Duration, tick time.Duration, ...)
- func CreateBatteryEntity(id string, level float64, voltage float64) *gtypes.EntityState
- func CreateDroneEntity(id string, armed bool, mode string, altitude float64) *gtypes.EntityState
- func CreateRuleProcessor(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func CreateTestEntityPattern(domain, category, instance string) string
- func GetRegisteredRuleTypes() []string
- func GetRuleSchemas() map[string]Schema
- func Register(registry *component.Registry) error
- func RegisterPayloads(reg *payloadregistry.Registry) error
- func RegisterRuleFactory(ruleType string, factory Factory) error
- func SetupKVBucketForTesting(t *testing.T, js jetstream.JetStream, bucketName string) jetstream.KeyValue
- func UnregisterRuleFactory(ruleType string) error
- func ValidateDefinition(def Definition) error
- func VerifyRuleTriggered(_ *testing.T, processor *Processor, ruleName string) bool
- func WaitForEntityProcessing(_ *testing.T, _ time.Duration)
- func WaitForKVWatcher(t *testing.T, processor *Processor, timeout time.Duration)
- type Action
- type ActionExecutor
- func NewActionExecutor(logger *slog.Logger) *ActionExecutor
- func NewActionExecutorComplete(logger *slog.Logger, mutator TripleMutator, publisher Publisher, ...) *ActionExecutor
- func NewActionExecutorFull(logger *slog.Logger, mutator TripleMutator, publisher Publisher) *ActionExecutor
- func NewActionExecutorWithMutator(logger *slog.Logger, mutator TripleMutator) *ActionExecutor
- func (e *ActionExecutor) Execute(ctx context.Context, action Action, ec *ExecutionContext) error
- func (e *ActionExecutor) ExecuteAddTriple(ctx context.Context, action Action, ec *ExecutionContext) (message.Triple, error)
- func (e *ActionExecutor) ExecuteRemoveTriple(ctx context.Context, action Action, ec *ExecutionContext) error
- func (e *ActionExecutor) SetToolRegistry(r component.ToolRegistryReader)
- type ActionExecutorInterface
- type BaseRuleFactory
- type Config
- type ConfigManager
- func (rcm *ConfigManager) DeleteRule(ctx context.Context, ruleID string) error
- func (rcm *ConfigManager) GetRule(ctx context.Context, ruleID string) (*Definition, error)
- func (rcm *ConfigManager) InitializeKVStore(natsClient *natsclient.Client) error
- func (rcm *ConfigManager) ListRules(ctx context.Context) (map[string]Definition, error)
- func (rcm *ConfigManager) ReconcileCount() int
- func (rcm *ConfigManager) SaveRule(ctx context.Context, ruleID string, ruleDef Definition) error
- func (rcm *ConfigManager) SeedFromRuntime(ctx context.Context) error
- func (rcm *ConfigManager) Start(_ context.Context) error
- func (rcm *ConfigManager) Stop() error
- func (rcm *ConfigManager) WatchRules(_ context.Context, _ func(ruleID string, rule Rule, operation string)) error
- type CronRule
- func (r *CronRule) Actions() []Action
- func (r *CronRule) Cooldown() time.Duration
- func (r *CronRule) Description() string
- func (r *CronRule) Enabled() bool
- func (r *CronRule) FireEveryN() int
- func (r *CronRule) ID() string
- func (r *CronRule) Metadata() map[string]any
- func (r *CronRule) Name() string
- func (r *CronRule) Schedule() cronlib.Schedule
- func (r *CronRule) ScheduleString() string
- type CronScheduler
- type CronSchedulerConfig
- type Definition
- type Dependencies
- type EntityConfig
- type EntityStateEvaluator
- type Evaluation
- type Event
- type Example
- type ExecutionContext
- type ExpressionRule
- func (r *ExpressionRule) Evaluate(messages []message.Message) bool
- func (r *ExpressionRule) EvaluateEntityState(entityState *gtypes.EntityState) bool
- func (r *ExpressionRule) ExecuteEvents(messages []message.Message) ([]Event, error)
- func (r *ExpressionRule) Name() string
- func (r *ExpressionRule) Subscribe() []string
- type ExpressionRuleFactory
- type Factory
- type KVTestHelper
- type KVWriter
- type LastFireRecord
- type MatchState
- type Metrics
- type Processor
- func (rp *Processor) ApplyConfigUpdate(changes map[string]any) error
- func (rp *Processor) ConfigSchema() component.ConfigSchema
- func (rp *Processor) DataFlow() component.FlowMetrics
- func (rp *Processor) DebugStatus() any
- func (rp *Processor) GetRuleMetrics() map[string]any
- func (rp *Processor) GetRuntimeConfig() map[string]any
- func (rp *Processor) Health() component.HealthStatus
- func (rp *Processor) Initialize() error
- func (rp *Processor) InputPorts() []component.Port
- func (rp *Processor) Meta() component.Metadata
- func (rp *Processor) OutputPorts() []component.Port
- func (rp *Processor) SetDecoder(d *message.Decoder)
- func (rp *Processor) SetToolRegistry(r component.ToolRegistryReader)
- func (rp *Processor) Start(ctx context.Context) error
- func (rp *Processor) Stop(_ time.Duration) error
- func (rp *Processor) UpdateWatchBuckets(newBuckets map[string][]string) error
- func (rp *Processor) UpdateWatchPatterns(newPatterns []string) error
- func (rp *Processor) ValidateConfigUpdate(changes map[string]any) error
- type Publisher
- type Rule
- type ScheduleContext
- type ScheduleTracker
- func (st *ScheduleTracker) Bucket() jetstream.KeyValue
- func (st *ScheduleTracker) Delete(ctx context.Context, ruleID string) error
- func (st *ScheduleTracker) LastFiredAt(ctx context.Context, ruleID string) (LastFireRecord, error)
- func (st *ScheduleTracker) List(ctx context.Context) ([]LastFireRecord, error)
- func (st *ScheduleTracker) RecordFire(ctx context.Context, ruleID, scheduleSpec string, firedAt time.Time) error
- type Schema
- type StateTracker
- func (st *StateTracker) Delete(ctx context.Context, ruleID, entityKey string) error
- func (st *StateTracker) DeleteAllForEntity(ctx context.Context, entityID string) error
- func (st *StateTracker) Get(ctx context.Context, ruleID, entityKey string) (MatchState, error)
- func (st *StateTracker) Set(ctx context.Context, state MatchState) error
- type StatefulEvaluator
- type Status
- type TestRule
- type TestRuleFactory
- type Transition
- type TripleMutator
- type ValidationError
- type WorkflowTriggerPayload
Constants ¶
const (
	// ActionTypePublish publishes a message to a NATS subject
	ActionTypePublish = "publish"
	// ActionTypeAddTriple creates a relationship triple in the graph
	ActionTypeAddTriple = "add_triple"
	// ActionTypeRemoveTriple removes a relationship triple from the graph
	ActionTypeRemoveTriple = "remove_triple"
	// ActionTypeUpdateTriple updates metadata on an existing triple
	ActionTypeUpdateTriple = "update_triple"
	// ActionTypePublishAgent triggers an agentic loop by publishing a TaskMessage
	ActionTypePublishAgent = "publish_agent"
	// ActionTypeTriggerWorkflow triggers a reactive workflow by publishing to workflow.trigger.<workflow_id>
	ActionTypeTriggerWorkflow = "trigger_workflow"
	// ActionTypePublishBoidSignal publishes a Boid steering signal for agent coordination
	ActionTypePublishBoidSignal = "publish_boid_signal"
	// ActionTypeUpdateKV writes JSON to a named KV bucket with optional CAS merge
	ActionTypeUpdateKV = "update_kv"
)
Action type constants define the supported action types for rule execution.
const (
	SubjectTripleAdd    = "graph.mutation.triple.add"
	SubjectTripleRemove = "graph.mutation.triple.remove"
	MutationTimeout     = 5 * time.Second
)
NATS subjects for graph mutations (must match processor/graph/mutations.go)
const (
	WorkflowTriggerDomain   = "rule"
	WorkflowTriggerCategory = "workflow_trigger"
	WorkflowTriggerVersion  = "v1"
)
Workflow trigger payload type constants. Uses "rule" domain to avoid conflict with processor/workflow's workflow.trigger.v1
const CronRuleType = "cron"
CronRuleType is the string used as Definition.Type for cron rules.
const ScheduleBucketName = "RULE_SCHEDULES"
ScheduleBucketName is the canonical NATS KV bucket name for per-rule last-fired timestamps. Exported so out-of-process readers (governance startup hooks, ops dashboards) can address the bucket without stringly-typed duplication.
Variables ¶
var ErrScheduleRecordNotFound = errors.New("schedule record not found")
ErrScheduleRecordNotFound is returned by LastFiredAt when no fire has been recorded for the requested rule. Callers should treat this as "never fired" rather than an error condition — it is the expected state for a freshly-deployed rule.
var ErrStateNotFound = errors.New("rule state not found")
ErrStateNotFound is returned when no rule state exists for the given key
Functions ¶
func AssertEventuallyTrue ¶
func AssertEventuallyTrue(t *testing.T, condition func() bool, timeout time.Duration, tick time.Duration, msgAndArgs ...interface{})
AssertEventuallyTrue asserts that a condition becomes true within timeout
func CreateBatteryEntity ¶
func CreateBatteryEntity(id string, level float64, voltage float64) *gtypes.EntityState
CreateBatteryEntity creates a battery entity for testing
func CreateDroneEntity ¶
func CreateDroneEntity(id string, armed bool, mode string, altitude float64) *gtypes.EntityState
CreateDroneEntity creates a drone entity for testing
func CreateRuleProcessor ¶
func CreateRuleProcessor(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
CreateRuleProcessor creates a rule processor with the new factory pattern
func CreateTestEntityPattern ¶
func CreateTestEntityPattern(domain, category, instance string) string
CreateTestEntityPattern creates entity IDs matching test patterns
func GetRegisteredRuleTypes ¶
func GetRegisteredRuleTypes() []string
GetRegisteredRuleTypes returns all registered rule types.
Note: built-in rule kinds that do not go through the factory registry (today: "cron" — see CronRuleType) are not returned here. Callers that need the complete known-types surface should also include them explicitly. See isKnownRuleType for the validation-side equivalent.
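A caller that needs the complete set could merge the built-in kind in by hand, roughly as below. `allKnownRuleTypes` is a hypothetical helper; the `registered` argument stands in for the result of GetRegisteredRuleTypes, and the local constant mirrors CronRuleType.

```go
package main

import (
	"fmt"
	"sort"
)

// Local mirror of rule.CronRuleType.
const cronRuleType = "cron"

// allKnownRuleTypes returns the registered factory types plus the built-in
// cron kind, sorted and without duplicating "cron" if it is already present.
func allKnownRuleTypes(registered []string) []string {
	out := make([]string, 0, len(registered)+1)
	seen := false
	for _, t := range registered {
		if t == cronRuleType {
			seen = true
		}
		out = append(out, t)
	}
	if !seen {
		out = append(out, cronRuleType)
	}
	sort.Strings(out)
	return out
}

func main() {
	fmt.Println(allKnownRuleTypes([]string{"expression", "my_rule"}))
	// [cron expression my_rule]
}
```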
func GetRuleSchemas ¶
func GetRuleSchemas() map[string]Schema
GetRuleSchemas returns schemas for all registered rule types
func RegisterPayloads ¶
func RegisterPayloads(reg *payloadregistry.Registry) error
RegisterPayloads registers the workflow-trigger payload type with the supplied registry. Called from payloadbuiltins.Register at process bootstrap.
func RegisterRuleFactory ¶
func RegisterRuleFactory(ruleType string, factory Factory) error
RegisterRuleFactory registers a rule factory
func SetupKVBucketForTesting ¶
func SetupKVBucketForTesting(t *testing.T, js jetstream.JetStream, bucketName string) jetstream.KeyValue
SetupKVBucketForTesting creates and initializes a KV bucket for testing
func UnregisterRuleFactory ¶
func UnregisterRuleFactory(ruleType string) error
UnregisterRuleFactory removes a rule factory for a given type. This is primarily for testing or dynamic factory management.
func ValidateDefinition ¶
func ValidateDefinition(def Definition) error
ValidateDefinition validates a rule Definition for fields that are processor-level concerns (not delegated to individual rule factories). Call this before passing a Definition to CreateRuleFromDefinition.
func VerifyRuleTriggered ¶
func VerifyRuleTriggered(_ *testing.T, processor *Processor, ruleName string) bool
VerifyRuleTriggered checks if a rule has been triggered by examining metrics or events
func WaitForEntityProcessing ¶
func WaitForEntityProcessing(_ *testing.T, _ time.Duration)
WaitForEntityProcessing waits for an entity update to be processed. This is useful when testing async KV watchers.
Types ¶
type Action ¶
type Action struct {
// Type specifies the action type; see the ActionType* constants (publish, add_triple, remove_triple,
// update_triple, publish_agent, trigger_workflow, publish_boid_signal, update_kv)
Type string `json:"type"`
// Subject is the NATS subject for publish actions
Subject string `json:"subject,omitempty"`
// Predicate is the relationship type for triple actions
Predicate string `json:"predicate,omitempty"`
// Object is the target entity or value for triple actions
Object string `json:"object,omitempty"`
// TTL specifies optional expiration time for triples (e.g., "5m", "1h")
TTL string `json:"ttl,omitempty"`
// Properties contains additional metadata for the action
Properties map[string]any `json:"properties,omitempty"`
// Role is the agent role for publish_agent actions (e.g., "general", "architect", "editor")
Role string `json:"role,omitempty"`
// Model is the model endpoint name for publish_agent actions
Model string `json:"model,omitempty"`
// Prompt is the task prompt template for publish_agent actions
// Supports variable substitution: $entity.id, $related.id
Prompt string `json:"prompt,omitempty"`
// Tools is the per-spawned-agent tool allowlist for publish_agent actions.
// Tool names are resolved to agentic.ToolDefinition against the global
// tool registry at dispatch time; unknown names are logged and dropped.
// Empty/nil leaves TaskMessage.Tools unset, which makes the spawned loop
// fall back to global tool discovery (existing behaviour).
//
// This is the product-layer hook for scoping which tools a given role
// can see. Putting it on the rule — not in agentic-tools Config — keeps
// role→tools decisions in the workflow config that already owns the
// role name, model, and prompt for the spawned agent.
Tools []string `json:"tools,omitempty"`
// WorkflowID is the workflow identifier for trigger_workflow actions
WorkflowID string `json:"workflow_id,omitempty"`
// ContextData provides additional context passed to the workflow
ContextData map[string]any `json:"context_data,omitempty"`
// BoidSignalType specifies the type of boid signal: separation, cohesion, or alignment
BoidSignalType string `json:"boid_signal_type,omitempty"`
// BoidStrength specifies the steering strength for boid signals (0.0-1.0)
BoidStrength float64 `json:"boid_strength,omitempty"`
// WorkflowSlug identifies the workflow for publish_agent actions (e.g., "github-issue-to-pr")
WorkflowSlug string `json:"workflow_slug,omitempty"`
// WorkflowStep identifies the step within the workflow (e.g., "qualify", "develop", "review")
WorkflowStep string `json:"workflow_step,omitempty"`
// When is an optional guard clause for conditional action execution.
// If present, all conditions must match (AND logic) against the entity's current
// state and $state.* fields for the action to execute. Actions without When always execute.
When []expression.ConditionExpression `json:"when,omitempty"`
// Bucket is the KV bucket name for update_kv actions (e.g., "PLAN_STATES").
// Supports variable substitution.
Bucket string `json:"bucket,omitempty"`
// Key is the KV key for update_kv actions. Supports variable substitution.
Key string `json:"key,omitempty"`
// Payload is the data to write for update_kv actions.
// Supports variable substitution in string values (including nested maps).
Payload map[string]any `json:"payload,omitempty"`
// Merge controls write semantics for update_kv:
// true = CAS read-modify-write (merge payload into existing document)
// false = overwrite entire document (last writer wins)
Merge bool `json:"merge,omitempty"`
}
Action represents an action to execute when a rule fires. Actions are triggered by state transitions (OnEnter, OnExit) or while a condition remains true (WhileTrue).
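To show how the JSON tags above map onto a rule file, the sketch below decodes an `update_kv` action into a local struct that mirrors a subset of the Action fields. The bucket name comes from the field comment; the key template and payload values are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// action mirrors a subset of the Action fields and JSON tags listed above.
type action struct {
	Type    string         `json:"type"`
	Bucket  string         `json:"bucket,omitempty"`
	Key     string         `json:"key,omitempty"`
	Payload map[string]any `json:"payload,omitempty"`
	Merge   bool           `json:"merge,omitempty"`
}

// raw is an illustrative update_kv action; key and payload are hypothetical.
const raw = `{
  "type": "update_kv",
  "bucket": "PLAN_STATES",
  "key": "plan.$entity.id",
  "payload": {"status": "approved"},
  "merge": true
}`

// parseAction decodes one JSON action object.
func parseAction(data []byte) (action, error) {
	var a action
	err := json.Unmarshal(data, &a)
	return a, err
}

func main() {
	a, err := parseAction([]byte(raw))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(a.Type, a.Bucket, a.Merge, a.Payload["status"])
	// update_kv PLAN_STATES true approved
}
```

With `merge` set to true the write is described as a CAS read-modify-write; omitting it (or setting it to false) selects the overwrite path.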
type ActionExecutor ¶
type ActionExecutor struct {
// contains filtered or unexported fields
}
ActionExecutor executes actions for rules. It handles triple mutations, NATS publishing, KV writes, and other action types.
func NewActionExecutor ¶
func NewActionExecutor(logger *slog.Logger) *ActionExecutor
NewActionExecutor creates a new ActionExecutor with the given logger. If logger is nil, uses the default logger.
func NewActionExecutorComplete ¶
func NewActionExecutorComplete(logger *slog.Logger, mutator TripleMutator, publisher Publisher, kvWriter KVWriter) *ActionExecutor
NewActionExecutorComplete creates an ActionExecutor with all capabilities including KV writes.
func NewActionExecutorFull ¶
func NewActionExecutorFull(logger *slog.Logger, mutator TripleMutator, publisher Publisher) *ActionExecutor
NewActionExecutorFull creates a new ActionExecutor with full functionality. The mutator enables triple persistence, and the publisher enables NATS publishing.
func NewActionExecutorWithMutator ¶
func NewActionExecutorWithMutator(logger *slog.Logger, mutator TripleMutator) *ActionExecutor
NewActionExecutorWithMutator creates a new ActionExecutor with triple mutation support. The mutator enables actual persistence of triple operations via NATS request/response.
func (*ActionExecutor) Execute ¶
func (e *ActionExecutor) Execute(ctx context.Context, action Action, ec *ExecutionContext) error
Execute runs the given action using the execution context. The ExecutionContext provides the entity ID, related entity ID, full entity state, and match state for rich action execution.
func (*ActionExecutor) ExecuteAddTriple ¶
func (e *ActionExecutor) ExecuteAddTriple(ctx context.Context, action Action, ec *ExecutionContext) (message.Triple, error)
ExecuteAddTriple executes an add_triple action, creating a new semantic triple. Returns the created triple and any error that occurred. If a TripleMutator is configured, the triple is persisted via NATS request/response.
func (*ActionExecutor) ExecuteRemoveTriple ¶
func (e *ActionExecutor) ExecuteRemoveTriple(ctx context.Context, action Action, ec *ExecutionContext) error
ExecuteRemoveTriple executes a remove_triple action, removing a semantic triple. If a TripleMutator is configured, the triple is removed via NATS request/response.
func (*ActionExecutor) SetToolRegistry ¶
func (e *ActionExecutor) SetToolRegistry(r component.ToolRegistryReader)
SetToolRegistry installs the shared tool registry used by resolveToolNames during publish_agent action execution. nil-valued arg disables tool name resolution (tools list passed to the agent is left empty). Set explicitly after construction by the rule processor when it has access to deps.ToolRegistry.
type ActionExecutorInterface ¶
type ActionExecutorInterface interface {
Execute(ctx context.Context, action Action, ec *ExecutionContext) error
}
ActionExecutorInterface defines the interface for action execution. Actions receive an ExecutionContext with the full entity state and match state, replacing the previous (entityID, relatedID string) signature.
type BaseRuleFactory ¶
type BaseRuleFactory struct {
// contains filtered or unexported fields
}
BaseRuleFactory provides common factory functionality
func NewBaseRuleFactory ¶
func NewBaseRuleFactory(ruleType, displayName, description, category string) *BaseRuleFactory
NewBaseRuleFactory creates a base factory implementation
func (*BaseRuleFactory) ValidateExpression ¶
func (f *BaseRuleFactory) ValidateExpression(def Definition) error
ValidateExpression validates expression configuration
type Config ¶
type Config struct {
// Port configuration for inputs and outputs
Ports *component.PortConfig `` /* 168-byte string literal not displayed */
// Rule configuration sources
RulesFiles []string `json:"rules_files" schema:"type:array,description:Paths to JSON rule definition files,default:[],category:basic"`
InlineRules []Definition `json:"inline_rules,omitempty" schema:"type:array,description:Inline rule definitions (alternative to files),category:basic"`
// Message cache configuration (not exposed in schema - internal config)
MessageCache cache.Config `json:"message_cache"`
// Buffer window size for time-window analysis
BufferWindowSize string `` /* 135-byte string literal not displayed */
// Alert cooldown period to prevent spam
AlertCooldownPeriod string `` /* 139-byte string literal not displayed */
// Graph processor integration
EnableGraphIntegration bool `` /* 130-byte string literal not displayed */
// NATS KV patterns to watch for entity changes (e.g., 'telemetry.robotics.>')
// DEPRECATED: Use EntityWatchBuckets for multi-bucket support. This field is still
// supported for backwards compatibility and applies to ENTITY_STATES bucket.
EntityWatchPatterns []string `` /* 153-byte string literal not displayed */
// EntityWatchBuckets maps bucket names to watch patterns.
// This enables rules to observe operational results from multiple components.
// Example: {"ENTITY_STATES": ["telemetry.>"], "WORKFLOW_EXECUTIONS": ["COMPLETE_*"]}
// If not specified, falls back to EntityWatchPatterns for ENTITY_STATES bucket.
EntityWatchBuckets map[string][]string `` /* 147-byte string literal not displayed */
// Debounce delay for rule evaluation (settling time for entity state)
// Default is 0 (disabled) to ensure rules evaluate against each state change.
// Set to a positive value (e.g., 100) to batch rapid updates and evaluate final state only.
DebounceDelayMs time.Duration `` /* 146-byte string literal not displayed */
// JetStream consumer configuration (not exposed in schema - internal config)
Consumer struct {
Enabled bool `json:"enabled"` // Enable JetStream consumer
AckWaitSeconds int `json:"ack_wait_seconds"` // Acknowledgment timeout
MaxDeliver int `json:"max_deliver"` // Max delivery attempts
ReplayPolicy string `json:"replay_policy"` // "instant" or "original"
} `json:"consumer"`
}
Config holds configuration for the RuleProcessor
func CreateRuleTestConfig ¶
CreateRuleTestConfig creates a test configuration for rule processor
func (Config) MarshalJSON ¶
MarshalJSON implements custom JSON marshaling for Config
func (*Config) UnmarshalJSON ¶
UnmarshalJSON implements custom JSON unmarshaling for Config
type ConfigManager ¶
type ConfigManager struct {
// contains filtered or unexported fields
}
ConfigManager manages rules through NATS KV configuration.
Two instances of ConfigManager coexist in a running binary:
- The Pattern-B CRUD manager constructed in cmd/semstreams/main.go (buildRuleManager) with processor=nil. It handles agent tool writes (create_rule, update_rule, delete_rule) and has no watcher.
- The component-internal manager constructed inside Processor.Start with processor non-nil. It owns the KV watcher and applies hot-reload updates to the running processor.
Both read/write semstreams_config:rules.*. The component watcher picks up writes from the CRUD manager via normal KV semantics.
func NewConfigManager ¶
func NewConfigManager(processor *Processor, configMgr *config.Manager, logger *slog.Logger) *ConfigManager
NewConfigManager creates a new rule configuration manager
func (*ConfigManager) DeleteRule ¶
func (rcm *ConfigManager) DeleteRule(ctx context.Context, ruleID string) error
DeleteRule removes a rule configuration from NATS KV.
func (*ConfigManager) GetRule ¶
func (rcm *ConfigManager) GetRule(ctx context.Context, ruleID string) (*Definition, error)
GetRule retrieves a rule configuration from NATS KV.
func (*ConfigManager) InitializeKVStore ¶
func (rcm *ConfigManager) InitializeKVStore(natsClient *natsclient.Client) error
InitializeKVStore initializes the KVStore for direct KV operations
func (*ConfigManager) ListRules ¶
func (rcm *ConfigManager) ListRules(ctx context.Context) (map[string]Definition, error)
ListRules returns all rule configurations from the KV store.
Prefers direct KV reads when a kvStore is wired (consistent with SaveRule/GetRule/DeleteRule). Falls back to the processor's runtime config for deployments that haven't called InitializeKVStore — kept so the existing runtime-config-only path still works. Callers that need full Definition data should ensure kvStore is initialised; the fallback returns a stub Definition carrying only ID/Type/Name/Enabled.
func (*ConfigManager) ReconcileCount ¶
func (rcm *ConfigManager) ReconcileCount() int
ReconcileCount returns the number of times reconcileFromKV has completed successfully. It exists for integration tests that verify debounce coalescing and is not intended for production callers.
func (*ConfigManager) SaveRule ¶
func (rcm *ConfigManager) SaveRule(ctx context.Context, ruleID string, ruleDef Definition) error
SaveRule saves a rule configuration to NATS KV.
func (*ConfigManager) SeedFromRuntime ¶
func (rcm *ConfigManager) SeedFromRuntime(ctx context.Context) error
SeedFromRuntime writes file-loaded rules from the running processor into KV idempotently. Uses Create (not Put) so operator edits already in KV are never overwritten — a key-already-exists error is treated as a no-op.
Reads from rp.ruleDefinitions (populated by loadRules/Initialize) since rp.ruleConfigs (from GetRuntimeConfig) is only populated by ApplyConfigUpdate, not by the initial file/inline load path.
Partial failures are logged but do not abort seeding. The method returns nil even when individual writes fail so that hot-reload startup is not blocked.
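The create-not-put seeding described above can be sketched against an in-memory map. Everything here (`memKV`, `errKeyExists`, `seed`) is a hypothetical stand-in and not the package's own code; the real method writes through NATS KV.

```go
package main

import (
	"errors"
	"fmt"
)

// errKeyExists is a hypothetical sentinel for "key already exists".
var errKeyExists = errors.New("key already exists")

// memKV is a hypothetical in-memory stand-in for a KV bucket.
type memKV struct {
	data map[string]string
}

// Create writes the value only when the key is absent.
func (kv *memKV) Create(key, value string) error {
	if _, ok := kv.data[key]; ok {
		return errKeyExists
	}
	kv.data[key] = value
	return nil
}

// seed writes each file-loaded rule with Create and returns how many keys it
// actually wrote. Existing keys are left alone; other failures are logged and
// do not abort the loop.
func seed(kv *memKV, rules map[string]string) int {
	written := 0
	for id, body := range rules {
		err := kv.Create("rules."+id, body)
		switch {
		case err == nil:
			written++
		case errors.Is(err, errKeyExists):
			// An operator edit is already in KV; treat as a no-op.
		default:
			fmt.Println("seed failed:", id, err)
		}
	}
	return written
}

func main() {
	kv := &memKV{data: map[string]string{"rules.a": "operator edit"}}
	n := seed(kv, map[string]string{"a": "from file", "b": "from file"})
	fmt.Println(n, kv.data["rules.a"])
}
```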
func (*ConfigManager) Start ¶
func (rcm *ConfigManager) Start(_ context.Context) error
Start begins watching for rule configuration updates.
When processor is nil (Pattern-B CRUD path), Start is a no-op so that cmd/semstreams/main.go can safely call it without wiring a watcher.
When processor is non-nil (component-internal path), Start:
- Seeds file-loaded rules into KV idempotently via SeedFromRuntime.
- Opens a KV watcher on "rules.*".
- Spawns a goroutine that debounces watcher events and calls reconcileFromKV on each coalesced burst.
func (*ConfigManager) Stop ¶
func (rcm *ConfigManager) Stop() error
Stop stops the configuration manager and waits for the watcher goroutine to exit cleanly.
func (*ConfigManager) WatchRules ¶
func (rcm *ConfigManager) WatchRules(_ context.Context, _ func(ruleID string, rule Rule, operation string)) error
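WatchRules is declared for watching rule changes. As the signature shows, it returns only an error and discards both the context and the callback, so the callback is never invoked and no rules are returned.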
type CronRule ¶
type CronRule struct {
// contains filtered or unexported fields
}
CronRule holds a parsed cron-rule definition ready for the scheduler to register and fire. It carries everything the scheduler needs at fire time without re-reading the source Definition.
Fields are unexported behind accessor methods so the scheduler and metrics layer can hold a CronRule reference without coupling to its internals — important for the future per-tenant fan-out path which will need to build per-iteration Actions copies without mutating the shared CronRule.
func NewCronRule ¶
func NewCronRule(def Definition) (*CronRule, error)
NewCronRule builds a CronRule from a Definition. Returns an error if any cron-specific validation fails: wrong Type, missing schedule, schedule fails to parse, cooldown unparseable, forbidden field present, or any action with an empty Type.
func (*CronRule) Actions ¶
Actions returns a clone of the action list to execute on each fire. The clone protects callers (per-iteration fan-out, in particular) from accidentally mutating the shared CronRule's slice across fires.
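The effect of returning a clone can be shown with a short sketch. The `action` and `cronRule` types are hypothetical stand-ins; the copy here is shallow, so it is an assumption that the real Action type holds no shared maps or slices that would also need copying.

```go
package main

import "fmt"

// action is a hypothetical stand-in for the package's Action type.
type action struct {
	Type string
}

// cronRule is a hypothetical stand-in holding the shared action list.
type cronRule struct {
	actions []action
}

// Actions returns a copy of the slice so callers can modify their copy
// without affecting later fires.
func (r *cronRule) Actions() []action {
	out := make([]action, len(r.actions))
	copy(out, r.actions)
	return out
}

func main() {
	r := &cronRule{actions: []action{{Type: "publish"}}}
	perFire := r.Actions()
	perFire[0].Type = "mutated"
	fmt.Println(r.actions[0].Type, perFire[0].Type)
}
```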
func (*CronRule) Cooldown ¶
Cooldown returns the minimum duration that must elapse after a fire before the rule's actions may run again. Zero means no cooldown.
func (*CronRule) Description ¶
Description returns the rule's description string.
func (*CronRule) Enabled ¶
Enabled reports whether the rule should be registered with the scheduler.
func (*CronRule) FireEveryN ¶
FireEveryN returns the action-gating factor (every Nth tick fires actions). Zero or one means "fire every tick."
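The gating rule can be written as a one-line predicate. `shouldFire` is a hypothetical helper, and treating negative values like zero is an assumption.

```go
package main

import "fmt"

// shouldFire reports whether the tick with the given 1-based count passes an
// every-Nth gate. Values of n at or below one mean "fire every tick".
func shouldFire(n, tick int) bool {
	if n <= 1 {
		return true
	}
	return tick%n == 0
}

func main() {
	for tick := 1; tick <= 6; tick++ {
		fmt.Println(tick, shouldFire(2, tick))
	}
}
```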
func (*CronRule) Metadata ¶
Metadata returns the rule's metadata map, or nil if none was supplied. Useful for carrying provenance (e.g. operating-model entry IDs) from the source Definition through to action substitution.
func (*CronRule) Schedule ¶
Schedule returns the parsed cron schedule. Used by the scheduler to compute next-fire times and by the missed-fire detector to compute expected fire intervals.
func (*CronRule) ScheduleString ¶
ScheduleString returns the original cron expression, useful for metrics labels and substitution variables.
type CronScheduler ¶
type CronScheduler struct {
// contains filtered or unexported fields
}
CronScheduler dispatches CronRule actions on a cron schedule. Methods are safe for concurrent use; the entries map is guarded by mu.
func NewCronScheduler ¶
func NewCronScheduler(cfg CronSchedulerConfig) (*CronScheduler, error)
NewCronScheduler builds a scheduler that will dispatch actions through cfg.Executor. Returns an error only when cfg.Executor is nil — the other fields are all optional with documented degraded-mode behaviour.
func (*CronScheduler) Deregister ¶
func (s *CronScheduler) Deregister(ruleID string)
Deregister removes a cron rule from the scheduler. It is a no-op when the rule isn't registered, so hot-reload can call it unconditionally.
func (*CronScheduler) Register ¶
func (s *CronScheduler) Register(rule *CronRule) error
Register schedules a CronRule for periodic firing. Disabled rules are skipped silently (logged at Debug). Returns an error if the rule is already registered — callers performing hot-reload should call Deregister first.
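The Deregister-then-Register sequence for hot-reload can be sketched with a map guarded by a mutex. The `scheduler` type and its `replace` helper are hypothetical; note that in this sketch the two steps are not atomic with respect to other callers.

```go
package main

import (
	"fmt"
	"sync"
)

// scheduler is a hypothetical stand-in keyed by rule ID.
type scheduler struct {
	mu      sync.Mutex
	entries map[string]string // rule ID -> schedule spec
}

// Register fails when the rule ID is already present.
func (s *scheduler) Register(id, spec string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.entries[id]; ok {
		return fmt.Errorf("rule %q already registered", id)
	}
	s.entries[id] = spec
	return nil
}

// Deregister is a no-op for unknown IDs.
func (s *scheduler) Deregister(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.entries, id)
}

// replace is the hot-reload path: deregister unconditionally, then register.
func (s *scheduler) replace(id, spec string) error {
	s.Deregister(id)
	return s.Register(id, spec)
}

func main() {
	s := &scheduler{entries: map[string]string{}}
	fmt.Println(s.Register("nightly", "@daily"))
	fmt.Println(s.Register("nightly", "@hourly"))
	fmt.Println(s.replace("nightly", "@hourly"), s.entries["nightly"])
}
```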
func (*CronScheduler) RegisteredCount ¶
func (s *CronScheduler) RegisteredCount() int
RegisteredCount returns the number of rules currently registered. Used by the processor for logging and for the registered-rules gauge.
func (*CronScheduler) Start ¶
func (s *CronScheduler) Start(ctx context.Context) error
Start kicks off the scheduler ticker. The supplied context is captured and passed to every subsequent fire callback so action dispatches inherit the processor's cancellation. Calling Start twice is an error; the scheduler is single-shot per Start/Stop cycle.
Before the ticker starts, Start runs a one-shot restoreFromTracker pass against the persisted last-fired records (when a tracker is configured). It seeds each entry's in-memory lastFiredNanos cache so the cooldown gate and `$schedule.last_fired_at` substitution behave correctly across restarts, and it logs a Warn for any rule whose schedule expected at least one fire between the persisted timestamp and now (log-only per ADR-031 product direction). Tracker failures are logged but do not block startup; a clean cron tick is more important than a perfect audit log.
func (*CronScheduler) Stop ¶
func (s *CronScheduler) Stop() context.Context
Stop signals the scheduler to halt. It returns the context returned by robfig's Cron.Stop, which closes when all in-flight fires have completed. Callers should select on the returned context with their shutdown deadline. Calling Stop on a never-started scheduler is safe.
type CronSchedulerConfig ¶
type CronSchedulerConfig struct {
// Executor is required. A nil executor is rejected because every
// fire would silently no-op, hiding misconfiguration.
Executor ActionExecutorInterface
// Tracker is optional. Pass nil to run without persistence — no
// cross-restart missed-fire detection, no cooldown hydration on
// startup. Tests typically pass nil; the production processor
// wires in a tracker bound to the RULE_SCHEDULES bucket.
Tracker *ScheduleTracker
// Metrics is optional. Pass nil to run without Prometheus
// observability — every recordX call short-circuits on the nil
// receiver. Tests that don't assert on metrics pass nil; the
// production processor wires in metrics scoped to its registry.
Metrics *cronMetrics
// Logger is optional. Defaults to slog.Default() when nil so the
// scheduler always has something to log to.
Logger *slog.Logger
}
CronSchedulerConfig groups the collaborators NewCronScheduler needs. It is constructed by initializeCronScheduler in production; tests build it inline with whichever fields they exercise. It follows the team's "4+ args → request struct" convention so that future additions (timeouts, custom parsers, observers) don't grow a positional argument list.
type Definition ¶
type Definition struct {
ID string `json:"id"`
Type string `json:"type"`
Name string `json:"name"`
Description string `json:"description"`
Enabled bool `json:"enabled"`
Conditions []expression.ConditionExpression `json:"conditions"`
Logic string `json:"logic"`
Cooldown string `json:"cooldown,omitempty"`
Entity EntityConfig `json:"entity,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
OnEnter []Action `json:"on_enter,omitempty"` // Fires on false→true transition
OnExit []Action `json:"on_exit,omitempty"` // Fires on true→false transition
WhileTrue []Action `json:"while_true,omitempty"` // Fires on every update while true
OnRecovery []Action `json:"on_recovery,omitempty"` // Fires once on restart if still matching; falls back to OnEnter
RelatedPatterns []string `json:"related_patterns,omitempty"` // For pair rules
// RerunOnRecovery opts rules with only OnEnter/WhileTrue defined into the
// bootstrap recovery path. When false (default), such rules must declare
// OnRecovery explicitly to re-fire on restart. Rules that already declare
// OnRecovery always participate regardless of this flag.
RerunOnRecovery bool `json:"rerun_on_recovery,omitempty"`
// MaxIterations limits how many times a rule can enter the matching state for an entity.
// 0 means no limit. Use $state.iteration and $state.max_iterations in When clauses
// for retry budgets (e.g., retry up to 3 times then escalate).
MaxIterations int `json:"max_iterations,omitempty"`
// FireEveryNEvents makes the rule fire its action only every Nth matching
// event. N=0 and N=1 both mean "fire on every match" (backward-compatible
// default). N=2 fires on the 2nd, 4th, 6th matching events; N=10 fires on
// the 10th, 20th, 30th, etc.
//
// The MATCH check still runs on every event — only the action is gated.
// Counter state is per-rule (not per-entity) and resets when the rule
// definition is replaced via applyRuleChanges, so a policy change begins
// with a fresh rhythm.
//
// This is a parallel dimension to time-based AlertCooldownPeriod; neither
// replaces the other.
FireEveryNEvents int `json:"fire_every_n_events,omitempty"`
// Schedule is the cron expression for `type: "cron"` rules. Required
// when Type == "cron"; ignored otherwise. Supports POSIX 5-field
// (`min hour dom mon dow`) and the descriptors @hourly / @daily /
// @weekly / @monthly / @yearly. Parsed via robfig/cron/v3 at config
// load time; bad expressions fail rule construction.
Schedule string `json:"schedule,omitempty"`
// Actions is the action list for `type: "cron"` rules. Cron rules use
// this field instead of OnEnter/OnExit/WhileTrue because they have no
// state-transition semantics — every scheduled tick fires every Action
// here. Required when Type == "cron"; ignored otherwise.
Actions []Action `json:"actions,omitempty"`
}
Definition represents a JSON rule configuration
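As an illustration of the cron-specific fields, the sketch below decodes a `type: "cron"` definition and applies the two requirements stated in the field comments (a schedule and a non-empty action list). The `cronDefinition` struct copies only a subset of the JSON tags shown above; `actionSketch` and its `type` tag are assumptions, since the Action type's JSON shape is not shown in this section, and the schedule is not parsed here.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// actionSketch is a hypothetical stand-in; the "type" tag is an assumption.
type actionSketch struct {
	Type string `json:"type"`
}

// cronDefinition mirrors a subset of Definition's JSON tags.
type cronDefinition struct {
	ID       string         `json:"id"`
	Type     string         `json:"type"`
	Enabled  bool           `json:"enabled"`
	Schedule string         `json:"schedule,omitempty"`
	Actions  []actionSketch `json:"actions,omitempty"`
}

// parseCronDefinition decodes raw JSON and checks the cron-only requirements.
func parseCronDefinition(raw []byte) (cronDefinition, error) {
	var def cronDefinition
	if err := json.Unmarshal(raw, &def); err != nil {
		return def, err
	}
	if def.Type != "cron" {
		return def, fmt.Errorf("type %q is not a cron rule", def.Type)
	}
	if def.Schedule == "" {
		return def, errors.New("cron rule requires a schedule")
	}
	if len(def.Actions) == 0 {
		return def, errors.New("cron rule requires at least one action")
	}
	for i, a := range def.Actions {
		if a.Type == "" {
			return def, fmt.Errorf("action %d has an empty type", i)
		}
	}
	return def, nil
}

func main() {
	raw := []byte(`{"id":"nightly","type":"cron","enabled":true,
		"schedule":"@daily","actions":[{"type":"publish_agent"}]}`)
	def, err := parseCronDefinition(raw)
	fmt.Println(def.ID, def.Schedule, err)
}
```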
type Dependencies ¶
type Dependencies struct {
NATSClient *natsclient.Client
Logger *slog.Logger
}
Dependencies provides dependencies for rule creation
type EntityConfig ¶
type EntityConfig struct {
Pattern string `json:"pattern"` // Entity ID pattern to match
WatchBuckets []string `json:"watch_buckets"` // KV buckets to watch
}
EntityConfig defines entity-specific configuration
type EntityStateEvaluator ¶
type EntityStateEvaluator interface {
// EvaluateEntityState evaluates the rule directly against EntityState triples.
// Rule conditions should use full predicate paths (e.g., "sensor.measurement.fahrenheit").
EvaluateEntityState(entityState *gtypes.EntityState) bool
}
EntityStateEvaluator is an optional interface for rules that can evaluate directly against EntityState triples, bypassing the message transformation layer. This is more efficient for rules that need to access triple predicates directly.
type Evaluation ¶
type Evaluation struct {
Rule Definition
EntityID string
RelatedID string // "" for single-entity rules
CurrentlyMatching bool
Entity *gtypes.EntityState // nil for message-path rules
Related *gtypes.EntityState // nil for single-entity or message-path
Revision uint64 // KV revision that triggered this evaluation; 0 if unknown
Bootstrap bool // true during watcher initial-state replay
}
Evaluation groups the inputs to a single stateful rule evaluation. Zero values are meaningful: Revision=0 means "no KV revision available" (message-path rules), Bootstrap=false means live traffic (not startup replay), Entity/Related=nil means message-path rule.
type Event ¶
type Event interface {
// EventType returns the type identifier for this event
EventType() string
// Subject returns the NATS subject for publishing this event
Subject() string
// Payload returns the event data as a generic map
Payload() map[string]any
// Validate checks if the event is valid and ready to publish
Validate() error
}
Event represents a generic event that can be published by rules. This interface allows rules to be decoupled from specific event types while still integrating with graph events when EnableGraphIntegration is enabled.
type Example ¶
type Example struct {
Name string `json:"name"`
Description string `json:"description"`
Config Definition `json:"config"`
}
Example provides example configurations
type ExecutionContext ¶
type ExecutionContext struct {
// EntityID is the primary entity identifier.
EntityID string
// RelatedID is the related entity identifier (empty for single-entity rules).
RelatedID string
// Entity is the full entity state with triples (nil for message-path rules).
Entity *gtypes.EntityState
// Related is the related entity state (nil for single-entity rules or message-path).
Related *gtypes.EntityState
// State is the current match state including iteration tracking.
// May be nil for first evaluation before state is persisted.
State *MatchState
// Schedule carries cron-rule context (rule ID, spec, prior fire
// timestamp). Populated by CronScheduler.fire on time-driven
// dispatches; nil for message-path and KV-watch rules. The
// `$schedule.*` substitution layer reads this; see
// cron_substitution.go for the namespace contract.
Schedule *ScheduleContext
}
ExecutionContext carries typed data through the rule evaluation → action pipeline. It replaces the previous (entityID, relatedID string) action signature, providing actions with the full entity state and match state for richer execution logic.
func (*ExecutionContext) RuleID ¶
func (ec *ExecutionContext) RuleID() string
RuleID returns the originating rule's identifier, or an empty string if the execution context has no associated match state (e.g. message-path actions invoked outside a stateful evaluator). Callers use this to scope feedback loop prevention to the rule that caused the write.
func (*ExecutionContext) SubstituteVariables ¶
func (ec *ExecutionContext) SubstituteVariables(template string) string
SubstituteVariables replaces template variables with values from the execution context. Supported variables:
- $now: Current wallclock as RFC3339 UTC (always available)
- $entity.id: The primary entity ID
- $related.id: The related entity ID (for pair rules)
- $state.iteration: Current iteration count
- $state.max_iterations: Configured max iterations
- $schedule.id: Cron rule ID (cron rules only)
- $schedule.spec: Cron expression (cron rules only)
- $schedule.last_fired_at: Prior fire timestamp, RFC3339 UTC; empty on first fire
Entity triple values can be accessed via $entity.triple.<predicate> syntax.
If any template variable survives substitution, the literal stays in the output and a warning is logged so the author notices the unresolved token. This happens, for example, when $entity.triple.X names a predicate X that is not on the entity at fire time (a common race with late-arriving triples), or when a cron-only $schedule.* token reaches an expression rule. Downstream callers that feed the result into an identifier (KV key, NATS subject) will then get a loud failure instead of mysteriously wrong behaviour.
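A rough sketch of substitution with leftover detection is shown below. The `substitute` function, the variable map, and the token pattern are all hypothetical; the package's own matching rules for variable names are not documented here, so the regular expression is an assumption.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
	"strings"
)

// tokenPattern is an assumed shape for template variables: a dollar sign, a
// lowercase name, then optional dot-separated segments.
var tokenPattern = regexp.MustCompile(`\$[a-z_]+(?:\.[A-Za-z0-9_]+)*`)

// substitute replaces known variables and returns the result together with
// any variable-shaped tokens that survived.
func substitute(template string, vars map[string]string) (string, []string) {
	keys := make([]string, 0, len(vars))
	for k := range vars {
		keys = append(keys, k)
	}
	// Longer names first, so a short name never clobbers a longer one
	// that shares its prefix.
	sort.Slice(keys, func(i, j int) bool { return len(keys[i]) > len(keys[j]) })
	pairs := make([]string, 0, 2*len(keys))
	for _, k := range keys {
		pairs = append(pairs, k, vars[k])
	}
	out := strings.NewReplacer(pairs...).Replace(template)
	return out, tokenPattern.FindAllString(out, -1)
}

func main() {
	out, leftover := substitute(
		"alerts.$entity.id fired by $schedule.id",
		map[string]string{"$entity.id": "sensor-1"},
	)
	fmt.Println(out)
	fmt.Println("unresolved:", leftover)
}
```

A caller following the documented behaviour would log a warning whenever the second return value is non-empty and still pass the output along unchanged.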
type ExpressionRule ¶
type ExpressionRule struct {
// contains filtered or unexported fields
}
ExpressionRule implements Rule interface for expression-based condition evaluation
func NewExpressionRule ¶
func NewExpressionRule(def Definition) (*ExpressionRule, error)
NewExpressionRule creates a new expression-based rule
func (*ExpressionRule) Evaluate ¶
func (r *ExpressionRule) Evaluate(messages []message.Message) bool
Evaluate evaluates the rule against messages
func (*ExpressionRule) EvaluateEntityState ¶
func (r *ExpressionRule) EvaluateEntityState(entityState *gtypes.EntityState) bool
EvaluateEntityState evaluates the rule directly against EntityState triples. This bypasses the message transformation layer and evaluates conditions directly against triple predicates (e.g., "sensor.measurement.fahrenheit").
func (*ExpressionRule) ExecuteEvents ¶
func (r *ExpressionRule) ExecuteEvents(messages []message.Message) ([]Event, error)
ExecuteEvents generates events when rule triggers
func (*ExpressionRule) Subscribe ¶
func (r *ExpressionRule) Subscribe() []string
Subscribe returns subjects this rule subscribes to
type ExpressionRuleFactory ¶
type ExpressionRuleFactory struct {
// contains filtered or unexported fields
}
ExpressionRuleFactory creates expression-based rules
func NewExpressionRuleFactory ¶
func NewExpressionRuleFactory() *ExpressionRuleFactory
NewExpressionRuleFactory creates a new expression rule factory
func (*ExpressionRuleFactory) Create ¶
func (f *ExpressionRuleFactory) Create(_ string, def Definition, _ Dependencies) (Rule, error)
Create creates an expression rule from definition
func (*ExpressionRuleFactory) Schema ¶
func (f *ExpressionRuleFactory) Schema() Schema
Schema returns the expression rule schema
func (*ExpressionRuleFactory) Type ¶
func (f *ExpressionRuleFactory) Type() string
Type returns the factory type
func (*ExpressionRuleFactory) Validate ¶
func (f *ExpressionRuleFactory) Validate(def Definition) error
Validate validates the rule definition
type Factory ¶
type Factory interface {
// Create creates a rule instance from configuration
Create(id string, config Definition, deps Dependencies) (Rule, error)
// Type returns the rule type this factory creates
Type() string
// Schema returns the configuration schema for UI discovery
Schema() Schema
// Validate validates a rule configuration
Validate(config Definition) error
}
Factory creates rules from configuration
func GetRuleFactory ¶
GetRuleFactory returns a registered rule factory
type KVTestHelper ¶
type KVTestHelper struct {
// contains filtered or unexported fields
}
KVTestHelper provides utilities for KV-based rule testing
func NewKVTestHelper ¶
func NewKVTestHelper(t *testing.T, bucket jetstream.KeyValue) *KVTestHelper
NewKVTestHelper creates a new KV test helper
func (*KVTestHelper) DeleteEntity ¶
func (h *KVTestHelper) DeleteEntity(entityID string)
DeleteEntity removes an entity from the KV bucket
func (*KVTestHelper) UpdateEntityProperty ¶
func (h *KVTestHelper) UpdateEntityProperty(entityID string, predicateFull string, value interface{})
UpdateEntityProperty updates a specific property of an entity
func (*KVTestHelper) WriteEntityState ¶
func (h *KVTestHelper) WriteEntityState(entity *gtypes.EntityState)
WriteEntityState writes an entity state to the KV bucket
type KVWriter ¶
type KVWriter interface {
// UpdateJSON performs a CAS read-modify-write on a JSON document in the named bucket.
// The updateFn receives the current value (empty map if key doesn't exist) and mutates it.
// Uses exponential backoff retry on CAS conflicts (via natsclient.KVStore.UpdateJSON).
UpdateJSON(ctx context.Context, bucket, key string, updateFn func(current map[string]any) error) error
// PutJSON writes a JSON value to the named bucket (last writer wins, no CAS).
PutJSON(ctx context.Context, bucket, key string, value map[string]any) error
}
KVWriter performs read-modify-write operations on domain KV buckets. It abstracts the underlying NATS KV infrastructure for testability.
type LastFireRecord ¶
type LastFireRecord struct {
// RuleID is the cron rule's stable identifier. Duplicates the KV key
// so that List() consumers can decode a record without re-threading
// the key alongside it.
RuleID string `json:"rule_id"`
// ScheduleSpec is the rule's cron expression at the time of the
// recorded fire. Useful for operators reading the bucket directly
// and for missed-fire detection that wants to compute the expected
// fire interval without re-loading rule definitions.
ScheduleSpec string `json:"schedule_spec"`
// LastFiredAt is the wallclock at which the most recent fire began
// dispatching actions. UTC; serialised RFC3339Nano via Go's default
// time.Time JSON encoder. Nanosecond precision is preserved best-
// effort across platforms — callers that depend on exact equality
// (e.g. round-trip tests) should truncate to a known resolution
// before write rather than relying on the encoder.
LastFiredAt time.Time `json:"last_fired_at"`
}
LastFireRecord is the on-disk shape of a per-rule last-fired record. JSON field names are part of the framework's public contract: out-of-process readers depend on them, so renames require a migration.
type MatchState ¶
type MatchState struct {
RuleID string `json:"rule_id"`
EntityKey string `json:"entity_key"`
IsMatching bool `json:"is_matching"`
LastTransition string `json:"last_transition"` // ""|"entered"|"exited"
TransitionAt time.Time `json:"transition_at,omitempty"`
SourceRevision uint64 `json:"source_revision"`
LastChecked time.Time `json:"last_checked"`
// Iteration tracks how many times this rule has entered the matching state
// for this entity. Incremented on each TransitionEntered, preserved through exits.
Iteration int `json:"iteration"`
// MaxIterations is the configured limit from the rule Definition.
// 0 means no limit. Used with $state.iteration in When clauses for retry budgets.
MaxIterations int `json:"max_iterations,omitempty"`
// FieldValues tracks the last-seen values of fields used in transition conditions.
// Keys are field names (e.g., "workflow.plan.status"), values are the string
// representation of the last observed value. Used by the transition operator
// to detect field-level state changes across evaluations.
FieldValues map[string]string `json:"field_values,omitempty"`
}
MatchState tracks whether a rule's condition is currently matching for a specific entity or entity pair.
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics holds Prometheus metrics for RuleProcessor component
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is a component that processes messages through rules
func NewProcessor ¶
func NewProcessor(natsClient *natsclient.Client, config *Config) (*Processor, error)
NewProcessor creates a new rule processor
func NewProcessorWithMetrics ¶
func NewProcessorWithMetrics(natsClient *natsclient.Client, config *Config, metricsRegistry *metric.MetricsRegistry) (*Processor, error)
NewProcessorWithMetrics creates a new rule processor with optional metrics
func (*Processor) ApplyConfigUpdate ¶
ApplyConfigUpdate applies validated configuration changes
func (*Processor) ConfigSchema ¶
func (rp *Processor) ConfigSchema() component.ConfigSchema
ConfigSchema returns configuration schema for component interface
func (*Processor) DataFlow ¶
func (rp *Processor) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Processor) DebugStatus ¶
DebugStatus returns extended debug information for the rule processor. Implements component.DebugStatusProvider.
func (*Processor) GetRuleMetrics ¶
GetRuleMetrics returns metrics for all rules
func (*Processor) GetRuntimeConfig ¶
GetRuntimeConfig returns current runtime configuration
func (*Processor) Health ¶
func (rp *Processor) Health() component.HealthStatus
Health returns current health status
func (*Processor) Initialize ¶
Initialize loads rules and prepares the processor
func (*Processor) InputPorts ¶
InputPorts returns declared input ports
func (*Processor) OutputPorts ¶
OutputPorts returns declared output ports
func (*Processor) SetDecoder ¶
SetDecoder installs the payload Decoder used to unmarshal incoming BaseMessage envelopes. Called by the component factory after construction with message.NewDecoder(deps.PayloadRegistry). Mirrors SetToolRegistry. A nil arg leaves the processor unable to handle semantic messages — handleSemanticMessage will fail-fast.
func (*Processor) SetToolRegistry ¶
func (rp *Processor) SetToolRegistry(r component.ToolRegistryReader)
SetToolRegistry installs the shared tool registry. Called by the component factory after construction with deps.ToolRegistry. The registry flows from here to ActionExecutor at Initialize time so publish_agent's default_tools resolution sees the right tools.
A nil arg is allowed and disables tool name resolution (deployments without agentic-tools).
func (*Processor) UpdateWatchBuckets ¶
UpdateWatchBuckets dynamically updates the entity watch buckets and patterns.
func (*Processor) UpdateWatchPatterns ¶
UpdateWatchPatterns dynamically updates the entity watch patterns for ENTITY_STATES.
Deprecated: Use UpdateWatchBuckets for multi-bucket support.
type Publisher ¶
type Publisher interface {
// Publish sends a message to a NATS subject.
// The implementation determines whether to use core NATS or JetStream
// based on port configuration.
Publish(ctx context.Context, subject string, data []byte) error
}
Publisher handles publishing messages to NATS subjects. It abstracts the decision between core NATS and JetStream publishing.
type Rule ¶
type Rule interface {
// Name returns the human-readable name of this rule
Name() string
// Subscribe returns the NATS subjects this rule should listen to
Subscribe() []string
// Evaluate checks if the rule conditions are met for the given messages
Evaluate(messages []message.Message) bool
// ExecuteEvents generates events when rule conditions are satisfied
ExecuteEvents(messages []message.Message) ([]Event, error)
}
Rule interface defines the event-based contract for processing rules. Rules evaluate messages and generate events when conditions are met.
func CreateRuleFromDefinition ¶
func CreateRuleFromDefinition(def Definition, deps Dependencies) (Rule, error)
CreateRuleFromDefinition creates a rule using the appropriate factory
type ScheduleContext ¶
type ScheduleContext struct {
// ID is the firing rule's stable identifier — same value as
// Definition.ID and CronRule.ID. Always non-empty for a cron fire.
ID string
// Spec is the cron expression as the rule was registered with it.
// Captured at fire time so a hot-reload that changes the schedule
// doesn't retroactively alter substitution output for in-flight
// dispatches. Always non-empty for a cron fire.
Spec string
// LastFiredAt is the wallclock of the most recent successful fire
// before this one. Zero value (time.IsZero) on the first fire of
// a freshly-registered rule, before any persisted record exists.
// The substitution renders zero as the empty string so authors can
// detect first-fire via `len("$schedule.last_fired_at") == 0` in
// downstream conditional templates.
LastFiredAt time.Time
}
ScheduleContext carries the cron-rule context that's available at fire time but not derivable from any other rule field. ExecutionContext holds a *ScheduleContext on cron-fire dispatches; expression-rule dispatches leave the field nil and the substitution is a no-op.
Fields are exported because the substitution layer reads them directly; the type is constructed by the cron scheduler in fire() and never mutated thereafter.
type ScheduleTracker ¶
type ScheduleTracker struct {
// contains filtered or unexported fields
}
ScheduleTracker persists last-fired timestamps for cron rules to KV. Methods are safe for concurrent use; the underlying NATS KV client already serialises writes.
func NewScheduleTracker ¶
func NewScheduleTracker(bucket jetstream.KeyValue, logger *slog.Logger) *ScheduleTracker
NewScheduleTracker builds a tracker bound to the supplied bucket. A nil bucket is permitted so the rule processor can degrade gracefully when bucket creation fails on startup — every method returns an error when bucket is nil rather than panicking, mirroring StateTracker's nil-tolerant behaviour.
func (*ScheduleTracker) Bucket ¶
func (st *ScheduleTracker) Bucket() jetstream.KeyValue
Bucket returns the underlying jetstream.KeyValue handle. Exposed so in-process callers (governance startup hooks, ops dashboards in the same binary) can use the full KV API — Watch for live updates, ListKeys for filtered scans — without re-resolving the bucket via js.KeyValue. The returned handle may be nil if the tracker was constructed in nil-bucket mode (degraded startup); callers must nil-check.
func (*ScheduleTracker) Delete ¶
func (st *ScheduleTracker) Delete(ctx context.Context, ruleID string) error
Delete removes the persisted record for a rule. Called when a rule is removed from configuration so stale records don't linger. Missing records are treated as success — Delete is idempotent.
func (*ScheduleTracker) LastFiredAt ¶
func (st *ScheduleTracker) LastFiredAt(ctx context.Context, ruleID string) (LastFireRecord, error)
LastFiredAt returns the most recent fire timestamp for the given rule. Returns ErrScheduleRecordNotFound when no fire has been recorded — the caller should treat that as "never fired", not as an error to surface.
The schedule spec stored alongside the timestamp is returned too so missed-fire detection can compute the expected interval without re-loading the live rule definition (which may have been hot-reloaded to a different spec since the last fire).
func (*ScheduleTracker) List ¶
func (st *ScheduleTracker) List(ctx context.Context) ([]LastFireRecord, error)
List returns every persisted last-fire record. Used by the scheduler on Start to detect missed fires and by operators inspecting bucket state. Records are returned in undefined order; callers that need a stable order should sort by RuleID.
A bucket with no records returns an empty slice and no error.
func (*ScheduleTracker) RecordFire ¶
func (st *ScheduleTracker) RecordFire(ctx context.Context, ruleID, scheduleSpec string, firedAt time.Time) error
RecordFire upserts the last-fired record for the given rule. Called by CronScheduler.fire after a successful action dispatch. The schedule spec is captured at the moment of the fire so a hot-reload that changes the cron expression doesn't retroactively rewrite history.
Errors are surfaced to the caller; the scheduler logs them as warnings (a transient KV write failure must not crash the scheduler goroutine — the next fire will overwrite the record).
type Schema ¶
type Schema struct {
Type string `json:"type"`
DisplayName string `json:"display_name"`
Description string `json:"description"`
Category string `json:"category"`
Icon string `json:"icon,omitempty"`
Properties map[string]component.PropertySchema `json:"properties"`
Required []string `json:"required"`
Examples []Example `json:"examples,omitempty"`
}
Schema describes the configuration schema for a rule type
type StateTracker ¶
type StateTracker struct {
// contains filtered or unexported fields
}
StateTracker manages rule match state persistence in NATS KV. It tracks which rules are currently matching for which entities to enable proper transition detection for stateful ECA rules.
func NewStateTracker ¶
func NewStateTracker(bucket jetstream.KeyValue, logger *slog.Logger) *StateTracker
NewStateTracker creates a new StateTracker with the given KV bucket.
func (*StateTracker) Delete ¶
func (st *StateTracker) Delete(ctx context.Context, ruleID, entityKey string) error
Delete removes the match state for a rule and entity key.
func (*StateTracker) DeleteAllForEntity ¶
func (st *StateTracker) DeleteAllForEntity(ctx context.Context, entityID string) error
DeleteAllForEntity removes all rule states associated with an entity. This is called when an entity is deleted to clean up orphaned state.
func (*StateTracker) Get ¶
func (st *StateTracker) Get(ctx context.Context, ruleID, entityKey string) (MatchState, error)
Get retrieves the current match state for a rule and entity key. Returns ErrStateNotFound if no state exists for this combination.
func (*StateTracker) Set ¶
func (st *StateTracker) Set(ctx context.Context, state MatchState) error
Set persists the match state for a rule and entity.
type StatefulEvaluator ¶
type StatefulEvaluator struct {
// contains filtered or unexported fields
}
StatefulEvaluator handles rule evaluation with state tracking. It manages state transitions (entered/exited/none) and executes the appropriate actions based on the transition type.
func NewStatefulEvaluator ¶
func NewStatefulEvaluator(stateTracker *StateTracker, actionExecutor ActionExecutorInterface, logger *slog.Logger) *StatefulEvaluator
NewStatefulEvaluator creates a new stateful evaluator with the given dependencies. If logger is nil, uses the default logger.
func (*StatefulEvaluator) Evaluate ¶
func (e *StatefulEvaluator) Evaluate(ctx context.Context, ev Evaluation) (Transition, error)
Evaluate runs a stateful rule evaluation and fires the appropriate actions for the detected transition:
- TransitionEntered → OnEnter (increments iteration)
- TransitionExited → OnExit (preserves iteration)
- TransitionNone+match → WhileTrue
Action-level When clauses are evaluated before each action runs. New state is persisted to the StateTracker on the way out.
When Evaluation.Bootstrap is true, the watcher is replaying state after a restart. A persisted IsMatching=true state that still matches would otherwise produce TransitionNone — we promote it to a synthetic Entered (fires OnRecovery, or OnEnter under RerunOnRecovery) so state machines resume instead of hanging on restart.
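The dispatch rules above, including the bootstrap promotion, can be summarized in a small decision function. This is a sketch under stated assumptions: `selectActions` is a hypothetical helper that returns the name of the hook that would run, and the flags are passed as plain booleans rather than read from the real Evaluation or rule types.

```go
package main

import "fmt"

// selectActions names the action list a stateful evaluation would run.
// The returned names mirror the hooks in the text; the function itself is hypothetical.
func selectActions(wasMatching, nowMatching, bootstrap, rerunOnRecovery bool) string {
	switch {
	case !wasMatching && nowMatching:
		return "OnEnter" // TransitionEntered
	case wasMatching && !nowMatching:
		return "OnExit" // TransitionExited
	case wasMatching && nowMatching && bootstrap:
		// A persisted match replayed after a restart is promoted to a synthetic Entered.
		if rerunOnRecovery {
			return "OnEnter"
		}
		return "OnRecovery"
	case nowMatching:
		return "WhileTrue" // TransitionNone while the condition still matches
	default:
		return "" // TransitionNone and not matching: nothing to run
	}
}

func main() {
	fmt.Println(selectActions(false, true, false, false)) // OnEnter
	fmt.Println(selectActions(true, true, false, false))  // WhileTrue
	fmt.Println(selectActions(true, true, true, false))   // OnRecovery
	fmt.Println(selectActions(true, true, true, true))    // OnEnter
}
```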
type Status ¶
type Status struct {
DebounceDelayMs int `json:"debounce_delay_ms"`
PendingEvaluations int `json:"pending_evaluations"`
TotalEvaluations int `json:"total_evaluations"`
TotalTriggers int `json:"total_triggers"`
DebouncedCount int `json:"debounced_count"` // Matches Tester's test expectations
RulesLoaded int `json:"rules_loaded"`
LastEvaluationTime time.Time `json:"last_evaluation_time,omitempty"`
// TrackedRevisions is the total number of (rule,entity,revision) tuples
// currently in the feedback-loop-prevention map. Monitor this to detect
// leaks from writes that the watcher never delivers — a healthy
// processor's count should stay roughly proportional to in-flight rule
// actions, bounded by the sweeper.
TrackedRevisions int `json:"tracked_revisions"`
}
Status represents the current status of rule evaluation for debug observability
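One detail worth knowing when consuming this struct as JSON: with the standard `encoding/json` package, `omitempty` has no effect on struct-typed fields such as `time.Time`. Assuming Status is marshalled with `encoding/json` exactly as declared, a processor that has not evaluated anything yet still emits `last_evaluation_time`, set to the zero time. The sketch below redeclares the struct locally to show this; `encode` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Status is a local copy of the struct shown above, used only for this demo.
type Status struct {
	DebounceDelayMs    int       `json:"debounce_delay_ms"`
	PendingEvaluations int       `json:"pending_evaluations"`
	TotalEvaluations   int       `json:"total_evaluations"`
	TotalTriggers      int       `json:"total_triggers"`
	DebouncedCount     int       `json:"debounced_count"`
	RulesLoaded        int       `json:"rules_loaded"`
	LastEvaluationTime time.Time `json:"last_evaluation_time,omitempty"`
	TrackedRevisions   int       `json:"tracked_revisions"`
}

// encode marshals s and decodes the result into a generic map for inspection.
func encode(s Status) map[string]any {
	raw, err := json.Marshal(s)
	if err != nil {
		panic(err)
	}
	var out map[string]any
	if err := json.Unmarshal(raw, &out); err != nil {
		panic(err)
	}
	return out
}

func main() {
	m := encode(Status{RulesLoaded: 3})
	fmt.Println(m["last_evaluation_time"]) // 0001-01-01T00:00:00Z
	fmt.Println(m["rules_loaded"])         // 3
}
```

Consumers that want to distinguish "never evaluated" can therefore compare against the zero time rather than checking for a missing key.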
type TestRule ¶
type TestRule struct {
// contains filtered or unexported fields
}
TestRule is a functional rule implementation for testing
func NewTestRule ¶
func NewTestRule(id, name string, subjects []string, conditions []expression.ConditionExpression) *TestRule
NewTestRule creates a new test rule
func (*TestRule) EvaluateEntityState ¶
func (r *TestRule) EvaluateEntityState(entityState *gtypes.EntityState) bool
EvaluateEntityState evaluates the rule directly against EntityState triples. This implements the EntityStateEvaluator interface for KV watch-based evaluation.
func (*TestRule) ExecuteEvents ¶
ExecuteEvents generates events when rule triggers
type TestRuleFactory ¶
type TestRuleFactory struct {
// contains filtered or unexported fields
}
TestRuleFactory creates test rules for integration testing
func NewTestRuleFactory ¶
func NewTestRuleFactory() *TestRuleFactory
NewTestRuleFactory creates a new test rule factory
func (*TestRuleFactory) Create ¶
func (f *TestRuleFactory) Create(ruleID string, def Definition, _ Dependencies) (Rule, error)
Create creates a test rule from definition
func (*TestRuleFactory) Examples ¶
func (f *TestRuleFactory) Examples() []Example
Examples returns test rule examples
func (*TestRuleFactory) Schema ¶
func (f *TestRuleFactory) Schema() Schema
Schema returns the test rule schema
func (*TestRuleFactory) Type ¶
func (f *TestRuleFactory) Type() string
Type returns the factory type
func (*TestRuleFactory) Validate ¶
func (f *TestRuleFactory) Validate(_ Definition) error
Validate validates the rule definition
type Transition ¶
type Transition string
Transition represents the type of state change detected
const (
	// TransitionNone indicates no state change
	TransitionNone Transition = ""
	// TransitionEntered indicates condition became true
	TransitionEntered Transition = "entered"
	// TransitionExited indicates condition became false
	TransitionExited Transition = "exited"
)
func DetectTransition ¶
func DetectTransition(wasMatching, nowMatching bool) Transition
DetectTransition compares previous and current match state to determine transition type. Returns TransitionEntered if condition changed from false to true. Returns TransitionExited if condition changed from true to false. Returns TransitionNone if state did not change.
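The truth table described above is small enough to write out. The following is a sketch of logic consistent with that description, not the package's source; the type and constants are redeclared locally and the function is named `detectTransition` to make clear it is a stand-in.

```go
package main

import "fmt"

// Transition and its constants are redeclared here so the sketch is self-contained.
type Transition string

const (
	TransitionNone    Transition = ""
	TransitionEntered Transition = "entered"
	TransitionExited  Transition = "exited"
)

// detectTransition compares the previous and current match state.
func detectTransition(wasMatching, nowMatching bool) Transition {
	switch {
	case !wasMatching && nowMatching:
		return TransitionEntered // false -> true
	case wasMatching && !nowMatching:
		return TransitionExited // true -> false
	default:
		return TransitionNone // unchanged
	}
}

func main() {
	fmt.Printf("%q %q %q\n",
		detectTransition(false, true),
		detectTransition(true, false),
		detectTransition(true, true),
	) // "entered" "exited" ""
}
```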
type TripleMutator ¶
type TripleMutator interface {
// AddTriple adds a triple via NATS request/response and returns the KV revision.
AddTriple(ctx context.Context, ruleID string, triple message.Triple) (uint64, error)
// RemoveTriple removes a triple via NATS request/response and returns the KV revision.
RemoveTriple(ctx context.Context, ruleID, subject, predicate string) (uint64, error)
}
TripleMutator handles triple mutations via NATS request/response. The returned uint64 is the KV revision after the write, used for per-rule feedback loop prevention. The ruleID identifies the originating rule so the revision can be scoped to that rule only — pass an empty ruleID for ad-hoc mutations that should not be tracked.
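For unit tests, an in-memory fake of this interface can stand in for the NATS round trip. The sketch below is hypothetical throughout: `Triple` is a local stand-in for `message.Triple` (whose real fields may differ), the interface is redeclared against it, and `fakeMutator` keeps its own `tracked` map purely to illustrate the empty-ruleID convention; the real revision tracking may live elsewhere.

```go
package main

import (
	"context"
	"fmt"
)

// Triple is a local stand-in for message.Triple; the real fields may differ.
type Triple struct {
	Subject, Predicate string
	Object             any
}

// TripleMutator mirrors the interface above, using the stand-in Triple.
type TripleMutator interface {
	AddTriple(ctx context.Context, ruleID string, triple Triple) (uint64, error)
	RemoveTriple(ctx context.Context, ruleID, subject, predicate string) (uint64, error)
}

// fakeMutator stores triples in memory and hands out increasing revisions.
type fakeMutator struct {
	rev     uint64
	triples map[string]Triple
	tracked map[string][]uint64 // ruleID -> revisions written by that rule
}

var _ TripleMutator = (*fakeMutator)(nil) // compile-time interface check

func newFakeMutator() *fakeMutator {
	return &fakeMutator{triples: map[string]Triple{}, tracked: map[string][]uint64{}}
}

// bump advances the revision and records it unless the write is ad hoc (empty ruleID).
func (f *fakeMutator) bump(ruleID string) uint64 {
	f.rev++
	if ruleID != "" {
		f.tracked[ruleID] = append(f.tracked[ruleID], f.rev)
	}
	return f.rev
}

func (f *fakeMutator) AddTriple(_ context.Context, ruleID string, t Triple) (uint64, error) {
	f.triples[t.Subject+"|"+t.Predicate] = t
	return f.bump(ruleID), nil
}

func (f *fakeMutator) RemoveTriple(_ context.Context, ruleID, subject, predicate string) (uint64, error) {
	delete(f.triples, subject+"|"+predicate)
	return f.bump(ruleID), nil
}

// demo performs one tracked write and one ad-hoc write and reports the outcome.
func demo() (rev1, rev2 uint64, trackedForRule, trackedAdHoc int) {
	m := newFakeMutator()
	ctx := context.Background()
	rev1, _ = m.AddTriple(ctx, "rule-a", Triple{Subject: "drone.1", Predicate: "status", Object: "low"})
	rev2, _ = m.RemoveTriple(ctx, "", "drone.1", "status")
	return rev1, rev2, len(m.tracked["rule-a"]), len(m.tracked[""])
}

func main() {
	fmt.Println(demo()) // 1 2 1 0
}
```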
type ValidationError ¶
ValidationError represents a payload validation error.
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
type WorkflowTriggerPayload ¶
type WorkflowTriggerPayload struct {
WorkflowID string `json:"workflow_id"`
EntityID string `json:"entity_id"`
TriggeredAt time.Time `json:"triggered_at"`
RelatedID string `json:"related_id,omitempty"`
Context map[string]any `json:"context,omitempty"`
}
WorkflowTriggerPayload represents a message sent by the rule processor to trigger a reactive workflow. This payload is wrapped in a BaseMessage for proper deserialization by the reactive workflow engine.
func (*WorkflowTriggerPayload) MarshalJSON ¶
func (p *WorkflowTriggerPayload) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler. This marshals just the payload fields - BaseMessage handles the wrapping.
func (*WorkflowTriggerPayload) Schema ¶
func (p *WorkflowTriggerPayload) Schema() message.Type
Schema returns the message type for workflow trigger payloads.
func (*WorkflowTriggerPayload) UnmarshalJSON ¶
func (p *WorkflowTriggerPayload) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler.
func (*WorkflowTriggerPayload) Validate ¶
func (p *WorkflowTriggerPayload) Validate() error
Validate ensures the payload has required fields.
Source Files ¶
- actions.go
- config.go
- config_validation.go
- cron_metrics.go
- cron_rule.go
- cron_scheduler.go
- cron_substitution.go
- doc.go
- entity_watcher.go
- execution_context.go
- expression_factory.go
- factory.go
- interfaces.go
- kv_config_integration.go
- kv_test_helpers.go
- kv_writer.go
- message_handler.go
- metrics.go
- processor.go
- publisher.go
- rule_factory.go
- rule_loader.go
- runtime_config.go
- schedule_tracker.go
- state_tracker.go
- stateful_evaluator.go
- status.go
- test_rule_factory.go
- triple_mutator.go
- workflow_trigger_payload.go
Directories ¶
| Path | Synopsis |
|---|---|
| boid | Package boid implements Boids-inspired local coordination rules for multi-agent teams. |
| expression | Package expression - Expression evaluator implementation |