rule

package
v0.1.3
Published: May 7, 2026 | License: Apache-2.0 | Imports: 23 | Imported by: 0

Documentation

Constants

const (
	// DefaultMaxForEachIterations is the default limit for forEach array processing.
	// Prevents runaway iteration on large arrays. Can be overridden via configuration.
	DefaultMaxForEachIterations = 100
)

Variables

var ErrMalformedPayload = errors.New("malformed payload")

ErrMalformedPayload indicates the message payload cannot be parsed and retrying will not help.

Functions

func DebounceKey

func DebounceKey(triggerSubject, actionSubject string) string

DebounceKey builds the debounce map key from trigger and action subjects.

func ExtractJSONValue

func ExtractJSONValue(data interface{}, path []string) interface{}

ExtractJSONValue is a package-level convenience function that returns nil on error

func ExtractVariable

func ExtractVariable(template string) string

ExtractVariable extracts the variable name from a template string. Examples:

"{temperature}" -> "temperature"
"{@time.hour}" -> "@time.hour"
"{@kv.config.sensor:max}" -> "@kv.config.sensor:max"
"temperature" -> "" (not a template)
"{}" -> "" (malformed)

Exported for use in validation (loader.go)
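
The examples above can be reproduced with a few lines of Go. This is a minimal sketch written against the documented input/output pairs only; `extractVariable` is a hypothetical local function, not the package's implementation, and the real function may accept or reject inputs this one does not.

```go
package main

import (
	"fmt"
	"strings"
)

// extractVariable is a hypothetical stand-in for the documented ExtractVariable.
// It returns the text between a leading "{" and a trailing "}", or "" when the
// input is not wrapped in braces or the braces are empty.
func extractVariable(template string) string {
	if len(template) < 3 || !strings.HasPrefix(template, "{") || !strings.HasSuffix(template, "}") {
		return ""
	}
	return template[1 : len(template)-1]
}

func main() {
	inputs := []string{"{temperature}", "{@time.hour}", "{@kv.config.sensor:max}", "temperature", "{}"}
	for _, in := range inputs {
		fmt.Printf("%q -> %q\n", in, extractVariable(in))
	}
}
```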

func HasJSONPath

func HasJSONPath(data interface{}, path []string) bool

HasJSONPath is a package-level convenience function to check path existence

func IsTemplate

func IsTemplate(s string) bool

IsTemplate reports whether a string looks like a template variable such as "{something}". The check is deliberately simple: if the string contains braces, it is treated as a template. Exported for use in validation (loader.go).

func SplitPathRespectingBraces

func SplitPathRespectingBraces(path string) ([]string, error)

SplitPathRespectingBraces splits a path on dots while treating {...} as atomic tokens. This is critical for KV field parsing, where variables can contain dots.

Examples:

  • "profile.tier" → ["profile", "tier"]
  • "{user.name}" → ["{user.name}"]
  • "profile.{user.name}.tier" → ["profile", "{user.name}", "tier"]
  • "{outer.{inner}.x}" → ["{outer.{inner}.x}"] (nested braces preserved)

Returns error for unmatched braces.
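
One way to get the behaviour described above is to track brace depth while scanning and split only on dots seen at depth zero. The sketch below is an assumption about how such a splitter could work; `splitPath` is a hypothetical name, and the package's own error messages and edge-case handling are not documented here.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitPath is a hypothetical re-implementation for illustration only.
// It splits on "." at brace depth 0 and keeps {...} groups, including
// nested ones, intact as single tokens.
func splitPath(path string) ([]string, error) {
	var parts []string
	var cur strings.Builder
	depth := 0
	for _, r := range path {
		switch r {
		case '{':
			depth++
			cur.WriteRune(r)
		case '}':
			depth--
			if depth < 0 {
				return nil, errors.New("unmatched closing brace")
			}
			cur.WriteRune(r)
		case '.':
			if depth == 0 {
				// Dot outside any braces: finish the current token.
				parts = append(parts, cur.String())
				cur.Reset()
			} else {
				cur.WriteRune(r)
			}
		default:
			cur.WriteRune(r)
		}
	}
	if depth != 0 {
		return nil, errors.New("unmatched opening brace")
	}
	return append(parts, cur.String()), nil
}

func main() {
	for _, p := range []string{"profile.tier", "profile.{user.name}.tier", "{outer.{inner}.x}", "{broken.path"} {
		parts, err := splitPath(p)
		fmt.Printf("%q -> %q (err: %v)\n", p, parts, err)
	}
}
```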

func TraverseJSONPath

func TraverseJSONPath(data interface{}, path []string) (interface{}, error)

TraverseJSONPath is a package-level convenience function that uses the default traverser. This is what most code will use.

func TraverseJSONPathString

func TraverseJSONPathString(data interface{}, pathStr string) (interface{}, error)

TraverseJSONPathString is a package-level convenience function for string paths

func ValidatePattern

func ValidatePattern(pattern string) error

ValidatePattern validates NATS pattern syntax

Types

type Action

type Action struct {
	NATS     *NATSAction `json:"nats,omitempty" yaml:"nats,omitempty"`
	HTTP     *HTTPAction `json:"http,omitempty" yaml:"http,omitempty"`
	RuleName string      `json:"-" yaml:"-"`
	TraceID  string      `json:"-" yaml:"-"`
}

Action defines what happens when rule matches (NATS or HTTP)

type CompiledPattern

type CompiledPattern struct {
	// contains filtered or unexported fields
}

CompiledPattern represents a pre-compiled pattern for efficient matching

type Condition

type Condition struct {
	Field    string      `json:"field" yaml:"field"`                     // Template: "{temperature}" or "{@time.hour}"
	Operator string      `json:"operator" yaml:"operator"`               // eq, gt, contains, etc.
	Value    interface{} `json:"value,omitempty" yaml:"value,omitempty"` // Literal or template: 30 or "{@kv.config:max_temp}"

	// For array operators (any/all/none) - nested conditions to evaluate against array elements
	Conditions *Conditions `json:"conditions,omitempty" yaml:"conditions,omitempty"`
}

Condition represents a single evaluation criterion with template-based variable resolution

Both Field and Value now support template syntax for dynamic comparisons:

Field Examples:

  • Message field: "{temperature}"
  • Nested field: "{sensor.reading.value}"
  • System time: "{@time.hour}"
  • Subject token: "{@subject.1}"
  • KV lookup: "{@kv.sensor_config.{sensor_id}:max_temp}"

Value Examples:

  • Literal: 30
  • Literal string: "active"
  • Message field: "{threshold}"
  • KV lookup: "{@kv.config.global:max_temperature}"
  • System variable: "{@time.hour}"

This enables powerful variable-to-variable comparisons:

field: "{temperature}"
operator: gt
value: "{@kv.sensor_config.{sensor_id}:max_temp}"

Type preservation:

  • Numbers remain numbers for accurate numeric comparison
  • Strings remain strings
  • Booleans remain booleans
  • Type coercion is performed automatically when needed

type Conditions

type Conditions struct {
	Operator string       `json:"operator" yaml:"operator"` // "and" or "or"
	Items    []Condition  `json:"items" yaml:"items"`
	Groups   []Conditions `json:"groups,omitempty" yaml:"groups,omitempty"` // For nested condition groups
}

Conditions represents a group of conditions with a logical operator
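
To make the and/or grouping concrete, here is a small sketch of a recursive evaluator. It is not the package's Evaluator: `group` is a hypothetical local type in which items are already reduced to booleans, and two behaviours are assumptions made for the sketch (nested groups are combined with items under the parent's operator, and an empty group evaluates to true).

```go
package main

import "fmt"

// group mirrors the documented Conditions shape, with items reduced to
// pre-computed booleans so the sketch can focus on the and/or logic.
type group struct {
	Operator string  // "and" or "or"
	Items    []bool  // stand-ins for evaluated Condition items
	Groups   []group // nested groups
}

// evaluate is a hypothetical evaluator. Assumptions: nested groups are
// combined with items under the parent's operator, and an empty group
// evaluates to true.
func evaluate(g group) bool {
	results := append([]bool{}, g.Items...)
	for _, sub := range g.Groups {
		results = append(results, evaluate(sub))
	}
	if len(results) == 0 {
		return true
	}
	if g.Operator == "or" {
		for _, r := range results {
			if r {
				return true
			}
		}
		return false
	}
	// Anything other than "or" is treated as "and" in this sketch.
	for _, r := range results {
		if !r {
			return false
		}
	}
	return true
}

func main() {
	g := group{
		Operator: "and",
		Items:    []bool{true},
		Groups:   []group{{Operator: "or", Items: []bool{false, true}}},
	}
	fmt.Println(evaluate(g))
}
```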

type Debouncer

type Debouncer struct {
	// contains filtered or unexported fields
}

Debouncer tracks per-rule last-fired timestamps to suppress rapid re-fires. Thread-safe via sync.Map for concurrent worker access. State is in-memory only — resets on restart (first message always fires).

func NewDebouncer

func NewDebouncer() *Debouncer

func (*Debouncer) ShouldFire

func (d *Debouncer) ShouldFire(key string, duration time.Duration) bool

ShouldFire returns true if the debounce window has expired (or no prior fire exists). Uses LoadOrStore for the initial case and CompareAndSwap at the window boundary so exactly one concurrent caller fires when the window expires.
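
The LoadOrStore / CompareAndSwap combination described above can be sketched as follows. This is an illustration under stated assumptions, not the package's code: `debouncer` and `shouldFireAt` are hypothetical names, timestamps are stored as Unix nanoseconds, and the current time is passed in explicitly so the logic can be exercised deterministically. `sync.Map.CompareAndSwap` requires Go 1.20 or later.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// debouncer is a hypothetical sketch, not the package's Debouncer.
// It stores the last-fired time per key as Unix nanoseconds.
type debouncer struct {
	lastFired sync.Map // key string -> int64 (UnixNano)
}

// shouldFireAt reports whether key may fire at nowNs given the window.
func (d *debouncer) shouldFireAt(key string, window time.Duration, nowNs int64) bool {
	// First sighting of the key: LoadOrStore records it and reports loaded == false.
	prev, loaded := d.lastFired.LoadOrStore(key, nowNs)
	if !loaded {
		return true
	}
	last := prev.(int64)
	if nowNs-last < int64(window) {
		return false // still inside the debounce window
	}
	// Window expired: only the caller whose CompareAndSwap succeeds fires,
	// so concurrent callers racing at the boundary cannot both return true.
	return d.lastFired.CompareAndSwap(key, last, nowNs)
}

func main() {
	d := &debouncer{}
	window := 50 * time.Millisecond
	fmt.Println(d.shouldFireAt("rule-a", window, time.Now().UnixNano()))
	fmt.Println(d.shouldFireAt("rule-a", window, time.Now().UnixNano()))
}
```

Storing an integer rather than a `time.Time` keeps the compared value trivially comparable, which is what CompareAndSwap needs.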

type ElementError

type ElementError struct {
	Index     int
	ErrorType string
	Error     error
}

ElementError tracks individual element processing failures

type EvaluationContext

type EvaluationContext struct {
	// Message data
	Msg        map[string]interface{} // CURRENT context (root message OR array element during forEach)
	RawPayload []byte
	Headers    map[string]string

	// Original message reference for @msg prefix
	// ALWAYS points to root message, even when Msg points to array element
	OriginalMsg map[string]interface{}

	// Context (NATS or HTTP, one will be nil)
	Subject *SubjectContext
	HTTP    *HTTPRequestContext

	// Shared contexts
	Time *TimeContext
	KV   *KVContext
	// contains filtered or unexported fields
}

EvaluationContext provides all data needed for condition evaluation and template processing. It supports both NATS and HTTP contexts, as well as forEach array iteration.

func NewEvaluationContext

func NewEvaluationContext(
	payload []byte,
	headers map[string]string,
	subjectCtx *SubjectContext,
	httpCtx *HTTPRequestContext,
	timeCtx *TimeContext,
	kvCtx *KVContext,
	sigVerification *SignatureVerification,
	logger *slog.Logger,
) (*EvaluationContext, error)

NewEvaluationContext creates a new evaluation context. Either subjectCtx or httpCtx should be provided, not both.

func (*EvaluationContext) ResolveValue

func (c *EvaluationContext) ResolveValue(path string) (interface{}, bool)

ResolveValue resolves a field value from the context. It supports message fields and system fields (@subject, @path, @header, @time, @kv, @signature), plus the @msg prefix for explicit root message access during forEach.

func (*EvaluationContext) WithElement

func (c *EvaluationContext) WithElement(element map[string]interface{}) *EvaluationContext

WithElement creates a child context for processing an array element. The new context has Msg set to the element while preserving OriginalMsg for @msg access to the root message. All other fields are inherited.
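
The relationship between Msg and OriginalMsg during forEach can be illustrated with a shallow copy. The sketch below uses a hypothetical, trimmed-down `evalContext` type; the real EvaluationContext has more fields and its constructor is not used here.

```go
package main

import "fmt"

// evalContext is a hypothetical, trimmed-down stand-in for EvaluationContext.
type evalContext struct {
	Msg         map[string]interface{} // current scope: root message or array element
	OriginalMsg map[string]interface{} // always the root message
	Headers     map[string]string
}

// withElement returns a shallow copy whose Msg points at the element.
// OriginalMsg and the remaining fields are inherited from the parent.
func (c *evalContext) withElement(element map[string]interface{}) *evalContext {
	child := *c
	child.Msg = element
	return &child
}

func main() {
	root := map[string]interface{}{
		"site": "plant-7",
		"readings": []interface{}{
			map[string]interface{}{"value": 21.5},
			map[string]interface{}{"value": 30.1},
		},
	}
	parent := &evalContext{Msg: root, OriginalMsg: root}
	for _, item := range root["readings"].([]interface{}) {
		child := parent.withElement(item.(map[string]interface{}))
		// Msg sees the element; OriginalMsg still reaches root-level fields.
		fmt.Println(child.Msg["value"], child.OriginalMsg["site"])
	}
}
```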

type Evaluator

type Evaluator struct {
	// contains filtered or unexported fields
}

Evaluator processes rule conditions against an EvaluationContext

func NewEvaluator

func NewEvaluator(log *slog.Logger) *Evaluator

NewEvaluator creates a new Evaluator

func (*Evaluator) Evaluate

func (e *Evaluator) Evaluate(conditions *Conditions, context *EvaluationContext) bool

Evaluate checks if a message satisfies the conditions

func (*Evaluator) SetMetrics

func (e *Evaluator) SetMetrics(m *metrics.Metrics)

SetMetrics sets the metrics collector (optional)

type ForEachResult

type ForEachResult struct {
	TotalElements     int
	ProcessedElements int
	FilteredElements  int
	FailedElements    int
	Errors            []ElementError
}

ForEachResult tracks detailed forEach processing statistics

type HTTPAction

type HTTPAction struct {
	URL         string            `json:"url" yaml:"url"`
	Method      string            `json:"method" yaml:"method"`
	Payload     string            `json:"payload,omitempty" yaml:"payload,omitempty"`
	Passthrough bool              `json:"passthrough,omitempty" yaml:"passthrough,omitempty"`
	Headers     map[string]string `json:"headers,omitempty" yaml:"headers,omitempty"`
	RawPayload  []byte            `json:"-" yaml:"-"` // Populated during processing
	Retry       *RetryConfig      `json:"retry,omitempty" yaml:"retry,omitempty"`

	// Array iteration fields for forEach functionality
	// ForEach must use template syntax: "{arrayField}" or "{nested.array}" or "{@items}"
	ForEach string      `json:"forEach,omitempty" yaml:"forEach,omitempty"`
	Filter  *Conditions `json:"filter,omitempty" yaml:"filter,omitempty"`
}

HTTPAction represents making an HTTP request

type HTTPRequestContext

type HTTPRequestContext struct {
	Path       string   // Full path: "/webhooks/github/pr"
	PathTokens []string // Tokens: ["webhooks", "github", "pr"]
	Method     string   // HTTP method: "POST", "GET", etc.
	Count      int      // Token count
}

HTTPRequestContext provides HTTP-specific context for rule evaluation

func NewHTTPRequestContext

func NewHTTPRequestContext(path, method string) *HTTPRequestContext

NewHTTPRequestContext creates an HTTPRequestContext from path and method

func (*HTTPRequestContext) GetField

func (hc *HTTPRequestContext) GetField(fieldName string) (interface{}, bool)

GetField retrieves an HTTP field for template/condition processing. Supports: "@path", "@path.0", "@path.1", "@path.count", "@method".

func (*HTTPRequestContext) GetToken

func (hc *HTTPRequestContext) GetToken(index int) string

GetToken safely retrieves a path token by index

type HTTPTrigger

type HTTPTrigger struct {
	Path   string `json:"path" yaml:"path"`
	Method string `json:"method,omitempty" yaml:"method,omitempty"` // Optional, defaults to all methods
}

HTTPTrigger represents an HTTP endpoint-based trigger

type IndexStats

type IndexStats struct {
	// contains filtered or unexported fields
}

IndexStats tracks index performance metrics

type JSONPathTraverser

type JSONPathTraverser struct {
}

JSONPathTraverser provides unified JSON path traversal for message fields, KV values, and any other JSON data. It supports both object property access and array index access using dot notation. Examples:

  • "user.name" → object property access
  • "readings.0.value" → array index access
  • "data.sensors.2.readings.5.timestamp" → mixed nested access
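
A traversal of this kind can be sketched with a type switch over the two container shapes that `encoding/json` produces when decoding into `interface{}`. The function `traverse` below is hypothetical and only illustrates the dot-notation idea; the package's error values and handling of unusual segments are not specified here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
)

// traverse is a hypothetical sketch of dot-notation traversal.
// Object keys are looked up in maps; numeric segments index into arrays.
func traverse(data interface{}, path string) (interface{}, error) {
	cur := data
	for _, seg := range strings.Split(path, ".") {
		switch node := cur.(type) {
		case map[string]interface{}:
			v, ok := node[seg]
			if !ok {
				return nil, fmt.Errorf("key %q not found", seg)
			}
			cur = v
		case []interface{}:
			i, err := strconv.Atoi(seg)
			if err != nil || i < 0 || i >= len(node) {
				return nil, fmt.Errorf("invalid array index %q", seg)
			}
			cur = node[i]
		default:
			return nil, fmt.Errorf("cannot descend into %T at %q", cur, seg)
		}
	}
	return cur, nil
}

func main() {
	var doc interface{}
	raw := `{"user":{"name":"ada"},"readings":[{"value":21.5},{"value":22}]}`
	if err := json.Unmarshal([]byte(raw), &doc); err != nil {
		panic(err)
	}
	for _, p := range []string{"user.name", "readings.0.value", "readings.5.value"} {
		v, err := traverse(doc, p)
		fmt.Println(p, "=>", v, err)
	}
}
```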

func NewJSONPathTraverser

func NewJSONPathTraverser() *JSONPathTraverser

NewJSONPathTraverser creates a new path traverser instance

func (*JSONPathTraverser) ExtractValue

func (t *JSONPathTraverser) ExtractValue(data interface{}, path []string) interface{}

ExtractValue is a convenience function for simple path extraction. It returns nil if the path doesn't exist (no error).

func (*JSONPathTraverser) HasPath

func (t *JSONPathTraverser) HasPath(data interface{}, path []string) bool

HasPath checks if a path exists in the data structure

func (*JSONPathTraverser) TraversePath

func (t *JSONPathTraverser) TraversePath(data interface{}, path []string) (interface{}, error)

TraversePath navigates through a JSON structure along the given path segments. It supports both map[string]interface{} for objects and []interface{} for arrays. It returns the value at the path, or an error if the path is invalid.

func (*JSONPathTraverser) TraversePathString

func (t *JSONPathTraverser) TraversePathString(data interface{}, pathStr string) (interface{}, error)

TraversePathString is a convenience method that takes a dot-separated string path. Example: "user.profile.email" or "readings.0.value".

type KVContext

type KVContext struct {
	// contains filtered or unexported fields
}

KVContext provides access to NATS Key-Value stores for rule evaluation and templating. It includes local cache support for improved performance.

func NewKVContext

func NewKVContext(stores map[string]jetstream.KeyValue, logger *slog.Logger, localCache *LocalKVCache) *KVContext

NewKVContext creates a new KV context with the provided KV stores and optional local cache

func (*KVContext) GetAllBuckets

func (kv *KVContext) GetAllBuckets() []string

GetAllBuckets returns the names of all configured KV buckets

func (*KVContext) GetField

func (kv *KVContext) GetField(field string) (interface{}, bool)

GetField retrieves a value from the KV store (cache first, then NATS KV fallback). Supported format: "@kv.bucket_name.key_name[:json.path.to.field]". The colon (:) delimiter is now optional. Returns the value and whether it was found.

func (*KVContext) GetFieldWithContext

func (kv *KVContext) GetFieldWithContext(field string, msgData map[string]interface{}, timeCtx *TimeContext, subjectCtx *SubjectContext) (interface{}, bool)

GetFieldWithContext retrieves a value with variable substitution support. Missing variables are handled by returning empty strings.

func (*KVContext) GetStats

func (kv *KVContext) GetStats() map[string]interface{}

GetStats returns basic statistics about the KV context

func (*KVContext) HasBucket

func (kv *KVContext) HasBucket(bucketName string) bool

HasBucket checks if a specific bucket is configured

type LocalKVCache

type LocalKVCache struct {
	// contains filtered or unexported fields
}

LocalKVCache provides in-memory caching for NATS KV buckets. It uses a simple map structure for fast access and easy debugging.

func NewLocalKVCache

func NewLocalKVCache(logger *slog.Logger) *LocalKVCache

NewLocalKVCache creates a new local KV cache instance

func (*LocalKVCache) Clear

func (c *LocalKVCache) Clear()

Clear removes all entries from the cache. Useful for testing or manual cache invalidation.

func (*LocalKVCache) Delete

func (c *LocalKVCache) Delete(bucket, key string)

Delete removes a value from the local cache. Used when the KV stream indicates a key was deleted.

func (*LocalKVCache) Get

func (c *LocalKVCache) Get(bucket, key string) (interface{}, bool)

Get retrieves a value from the local cache. Returns the value and whether it was found.

func (*LocalKVCache) GetAllBuckets

func (c *LocalKVCache) GetAllBuckets() []string

GetAllBuckets returns all bucket names in the cache

func (*LocalKVCache) GetAllKeys

func (c *LocalKVCache) GetAllKeys(bucket string) []string

GetAllKeys returns all keys for a specific bucket. Useful for debugging and monitoring.

func (*LocalKVCache) GetStats

func (c *LocalKVCache) GetStats() map[string]interface{}

GetStats returns cache statistics for monitoring

func (*LocalKVCache) IsEnabled

func (c *LocalKVCache) IsEnabled() bool

IsEnabled returns whether the cache is currently enabled

func (*LocalKVCache) Set

func (c *LocalKVCache) Set(bucket, key string, value interface{})

Set stores a value in the local cache. The value should be the parsed JSON object, not raw bytes.

func (*LocalKVCache) SetEnabled

func (c *LocalKVCache) SetEnabled(enabled bool)

SetEnabled enables or disables the cache. Useful for runtime configuration or troubleshooting.

type MockTimeProvider

type MockTimeProvider struct {
	// contains filtered or unexported fields
}

MockTimeProvider for testing with fixed time

func NewMockTimeProvider

func NewMockTimeProvider(fixedTime time.Time) *MockTimeProvider

NewMockTimeProvider creates a mock provider with fixed time

func (*MockTimeProvider) GetContextAt

func (m *MockTimeProvider) GetContextAt(t time.Time) *TimeContext

GetContextAt returns context for specified time

func (*MockTimeProvider) GetCurrentContext

func (m *MockTimeProvider) GetCurrentContext() *TimeContext

GetCurrentContext returns context for the fixed time

func (*MockTimeProvider) SetTime

func (m *MockTimeProvider) SetTime(t time.Time)

SetTime updates the fixed time

type NATSAction

type NATSAction struct {
	Subject     string            `json:"subject" yaml:"subject"`
	Mode        string            `json:"mode,omitempty" yaml:"mode,omitempty"`
	Payload     string            `json:"payload,omitempty" yaml:"payload,omitempty"`
	Passthrough bool              `json:"passthrough,omitempty" yaml:"passthrough,omitempty"`
	Headers     map[string]string `json:"headers,omitempty" yaml:"headers,omitempty"`
	RawPayload  []byte            `json:"-" yaml:"-"` // Populated during processing

	// Array iteration fields for forEach functionality
	// ForEach must use template syntax: "{arrayField}" or "{nested.array}" or "{@items}"
	ForEach string      `json:"forEach,omitempty" yaml:"forEach,omitempty"`
	Filter  *Conditions `json:"filter,omitempty" yaml:"filter,omitempty"`
}

NATSAction represents publishing to a NATS subject

type NATSTrigger

type NATSTrigger struct {
	Subject string `json:"subject" yaml:"subject"`
}

NATSTrigger represents a NATS subject-based trigger

type PatternCache

type PatternCache struct {
	// contains filtered or unexported fields
}

PatternCache provides optional caching for frequently matched subjects

func NewPatternCache

func NewPatternCache(maxSize int) *PatternCache

NewPatternCache creates a new pattern cache

func (*PatternCache) Clear

func (pc *PatternCache) Clear()

Clear empties the cache

func (*PatternCache) Get

func (pc *PatternCache) Get(subject string) (bool, bool)

Get retrieves a cached result

func (*PatternCache) Set

func (pc *PatternCache) Set(subject string, matches bool)

Set stores a result in the cache

func (*PatternCache) Stats

func (pc *PatternCache) Stats() map[string]int

Stats returns cache statistics

type PatternMatcher

type PatternMatcher struct {
	// contains filtered or unexported fields
}

PatternMatcher handles NATS-style subject pattern matching

func NewPatternMatcher

func NewPatternMatcher(pattern string) (*PatternMatcher, error)

NewPatternMatcher creates a new pattern matcher for the given pattern

func (*PatternMatcher) GetPattern

func (pm *PatternMatcher) GetPattern() string

GetPattern returns the original pattern string

func (*PatternMatcher) IsPattern

func (pm *PatternMatcher) IsPattern() bool

IsPattern returns true if this contains wildcards

func (*PatternMatcher) Match

func (pm *PatternMatcher) Match(subject string) bool

Match checks if the given subject matches this pattern
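
For readers unfamiliar with NATS wildcards: `*` matches exactly one subject token and `>` matches one or more trailing tokens. The sketch below implements those rules token by token. `matchSubject` is a hypothetical function written for illustration; the package's PatternMatcher pre-compiles patterns and may differ in edge cases.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject is a hypothetical sketch of NATS-style matching:
// "*" matches exactly one token and ">" matches one or more trailing tokens.
func matchSubject(pattern, subject string) bool {
	pt := strings.Split(pattern, ".")
	st := strings.Split(subject, ".")
	for i, tok := range pt {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one
			// subject token left to consume.
			return i == len(pt)-1 && len(st) > i
		}
		if i >= len(st) {
			return false
		}
		if tok != "*" && tok != st[i] {
			return false
		}
	}
	// Without a trailing ">", token counts must match exactly.
	return len(pt) == len(st)
}

func main() {
	cases := [][2]string{
		{"sensors.tank.>", "sensors.tank.001"},
		{"sensors.tank.>", "sensors.tank"},
		{"sensors.*.temp", "sensors.room1.temp"},
		{"sensors.*", "sensors.room1.temp"},
	}
	for _, c := range cases {
		fmt.Printf("%-16s vs %-20s => %v\n", c[0], c[1], matchSubject(c[0], c[1]))
	}
}
```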

type PatternRule

type PatternRule struct {
	Rule    *Rule
	Matcher *PatternMatcher
}

PatternRule wraps a rule with its compiled pattern matcher

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

func NewProcessor

func NewProcessor(log *slog.Logger, metrics *metrics.Metrics, kvCtx *KVContext, sigVerification *SignatureVerification) *Processor

NewProcessor creates a new processor with optional signature verification

func (*Processor) GetStats

func (p *Processor) GetStats() ProcessorStats

GetStats returns processor statistics

func (*Processor) GetSubjects

func (p *Processor) GetSubjects() []string

GetSubjects returns all NATS subjects for subscription setup

func (*Processor) LoadRules

func (p *Processor) LoadRules(rules []Rule) error

LoadRules is used by CLI tooling (lint, test, check) to load rules from files. In serve mode, rules arrive via KV Watch which calls ReplaceRules/ReplaceHTTPRules directly.

func (*Processor) ProcessForSubscription

func (p *Processor) ProcessForSubscription(triggerSubject, messageSubject string, payload []byte, headers map[string]string) ([]*Action, error)

ProcessForSubscription processes a message using O(1) lookup by trigger subject. triggerSubject is the trigger pattern (e.g., "sensors.tank.>") used for rule lookup. messageSubject is the actual message subject (e.g., "sensors.tank.001") used for template variables. Falls back to ProcessNATS (pattern matching) if no KV rules are loaded.

func (*Processor) ProcessHTTP

func (p *Processor) ProcessHTTP(path, method string, payload []byte, headers map[string]string) ([]*Action, error)

ProcessHTTP processes an HTTP request through the rule engine

func (*Processor) ProcessNATS

func (p *Processor) ProcessNATS(subject string, payload []byte, headers map[string]string) ([]*Action, error)

ProcessNATS processes a NATS message through the rule engine

func (*Processor) ProcessWithSubject

func (p *Processor) ProcessWithSubject(subject string, payload []byte, headers map[string]string) ([]*Action, error)

ProcessWithSubject is kept for backward compatibility. Used by internal/broker/subscription.go.

func (*Processor) ReplaceHTTPRules

func (p *Processor) ReplaceHTTPRules(rules map[string][]*Rule)

ReplaceHTTPRules atomically swaps the KV-loaded HTTP rule set. Called by RuleKVManager when KV Watch detects changes. The map is keyed by HTTP path (e.g., "/webhook/github").

func (*Processor) ReplaceRules

func (p *Processor) ReplaceRules(rules map[string][]*Rule)

ReplaceRules atomically swaps the KV-loaded rule set. Called by RuleKVManager when KV Watch detects changes. The map is keyed by trigger subject (e.g., "sensors.tank.>").

func (*Processor) SetMaxForEachIterations

func (p *Processor) SetMaxForEachIterations(max int)

SetMaxForEachIterations sets the maximum allowed forEach iterations

func (*Processor) SetTimeProvider

func (p *Processor) SetTimeProvider(provider TimeProvider)

SetTimeProvider allows injecting a mock time provider for testing

type ProcessorStats

type ProcessorStats struct {
	Processed uint64
	Matched   uint64
	Errors    uint64
}

type RetryConfig

type RetryConfig struct {
	MaxAttempts  int    `json:"maxAttempts" yaml:"maxAttempts"`
	InitialDelay string `json:"initialDelay" yaml:"initialDelay"` // e.g., "1s"
	MaxDelay     string `json:"maxDelay" yaml:"maxDelay"`         // e.g., "30s"
}

RetryConfig defines retry behavior for HTTP actions

type Rule

type Rule struct {
	Name             string        `json:"name,omitempty" yaml:"name,omitempty"`
	Trigger          Trigger       `json:"trigger" yaml:"trigger"`
	Conditions       *Conditions   `json:"conditions,omitempty" yaml:"conditions,omitempty"`
	Action           Action        `json:"action" yaml:"action"`
	Debounce         string        `json:"debounce,omitempty" yaml:"debounce,omitempty"`
	DebounceDuration time.Duration `json:"-" yaml:"-"`
}

Rule represents a generic rule with trigger and action

func ParseYAML

func ParseYAML(data []byte) ([]Rule, error)

ParseYAML parses a YAML byte slice into a slice of rules. Used by RuleKVManager to parse KV values. The format is identical to file-based rules (trigger + conditions + action).

func (*Rule) RuleName

func (r *Rule) RuleName() string

RuleName returns a human-readable identifier for the rule. Prefers explicit Name, falls back to trigger subject (NATS) or path (HTTP), then "unknown".

type RuleIndex

type RuleIndex struct {
	// contains filtered or unexported fields
}

RuleIndex provides efficient rule lookup for NATS subjects. It supports both exact matches (O(1)) and wildcard patterns (O(n)).

func NewRuleIndex

func NewRuleIndex(log *slog.Logger) *RuleIndex

NewRuleIndex creates a new rule index

func (*RuleIndex) Add

func (idx *RuleIndex) Add(rule *Rule)

Add indexes a NATS-triggered rule for efficient lookup. HTTP-triggered rules are ignored (they're stored separately in Processor).

func (*RuleIndex) Clear

func (idx *RuleIndex) Clear()

Clear removes all indexed rules

func (*RuleIndex) FindAllMatching

func (idx *RuleIndex) FindAllMatching(subject string) []*Rule

FindAllMatching returns all rules that match the given NATS subject. It first checks exact matches (O(1)), then pattern matches (O(n)).

func (*RuleIndex) GetRuleCounts

func (idx *RuleIndex) GetRuleCounts() (exactCount, patternCount int)

GetRuleCounts returns the number of exact and pattern rules

func (*RuleIndex) GetStats

func (idx *RuleIndex) GetStats() IndexStats

GetStats returns index statistics

func (*RuleIndex) GetSubjects

func (idx *RuleIndex) GetSubjects() []string

GetSubjects returns all unique NATS subjects (both exact and patterns)

func (*RuleIndex) GetSubscriptionSubjects

func (idx *RuleIndex) GetSubscriptionSubjects() []string

GetSubscriptionSubjects returns the subjects that should be used for JetStream subscriptions. It converts wildcard patterns to NATS subscription format.

type RulesLoader

type RulesLoader struct {
	// contains filtered or unexported fields
}

RulesLoader handles loading and validating rule definitions from YAML files

func NewRulesLoader

func NewRulesLoader(log *slog.Logger, kvBuckets []string) *RulesLoader

NewRulesLoader creates a new rules loader

func (*RulesLoader) ExpandEnvironmentVariables

func (l *RulesLoader) ExpandEnvironmentVariables(rule *Rule)

ExpandEnvironmentVariables expands ${VAR_NAME} placeholders in a rule. Public wrapper for KV-loaded rules that need env var expansion outside the file-based loading path.
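
As a rough illustration of `${VAR_NAME}` expansion on a single string, the sketch below uses a regular expression and a lookup function. Everything here is an assumption for demonstration: `expand` is a hypothetical helper, the accepted variable-name syntax is guessed, and leaving unknown placeholders untouched is a choice made for this sketch rather than documented behaviour.

```go
package main

import (
	"fmt"
	"os"
	"regexp"
)

// placeholder matches ${NAME} where NAME looks like a shell variable name.
var placeholder = regexp.MustCompile(`\$\{([A-Za-z_][A-Za-z0-9_]*)\}`)

// expand is a hypothetical helper that replaces ${NAME} using lookup.
// Assumption: unknown names are left in place rather than blanked out.
func expand(s string, lookup func(string) (string, bool)) string {
	return placeholder.ReplaceAllStringFunc(s, func(m string) string {
		name := m[2 : len(m)-1] // strip "${" and "}"
		if v, ok := lookup(name); ok {
			return v
		}
		return m
	})
}

func main() {
	if err := os.Setenv("API_HOST", "example.internal"); err != nil {
		panic(err)
	}
	// os.LookupEnv has the right shape to serve as the lookup function.
	fmt.Println(expand("https://${API_HOST}/hook", os.LookupEnv))
}
```

Passing the lookup as a parameter keeps the helper testable without touching the process environment.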

func (*RulesLoader) LoadFromDirectory

func (l *RulesLoader) LoadFromDirectory(dirPath string) ([]Rule, error)

LoadFromDirectory loads all .yaml and .yml files from a directory, recursively, while skipping any directories with a "_test" suffix.
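
The directory walk described above maps naturally onto `filepath.WalkDir` with `filepath.SkipDir`. The sketch below only collects matching file paths; it does not parse or validate rules. `findRuleFiles` and `buildSampleTree` are hypothetical helpers, and treating the extension match as case-sensitive is an assumption.

```go
package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
)

// findRuleFiles is a hypothetical sketch: it walks root recursively, skips
// directories whose name ends in "_test", and returns .yaml/.yml file paths.
func findRuleFiles(root string) ([]string, error) {
	var files []string
	err := filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.IsDir() {
			if path != root && strings.HasSuffix(d.Name(), "_test") {
				return filepath.SkipDir // do not descend into test fixtures
			}
			return nil
		}
		switch filepath.Ext(d.Name()) {
		case ".yaml", ".yml":
			files = append(files, path)
		}
		return nil
	})
	return files, err
}

// buildSampleTree creates a throwaway directory tree for the demonstration.
func buildSampleTree() (string, error) {
	root, err := os.MkdirTemp("", "rules-*")
	if err != nil {
		return "", err
	}
	for _, rel := range []string{"a.yaml", "nested/b.yml", "nested/notes.txt", "cases_test/c.yaml"} {
		full := filepath.Join(root, filepath.FromSlash(rel))
		if err := os.MkdirAll(filepath.Dir(full), 0o755); err != nil {
			return "", err
		}
		if err := os.WriteFile(full, []byte("# placeholder\n"), 0o644); err != nil {
			return "", err
		}
	}
	return root, nil
}

func main() {
	root, err := buildSampleTree()
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(root)
	files, err := findRuleFiles(root)
	if err != nil {
		panic(err)
	}
	for _, f := range files {
		rel, _ := filepath.Rel(root, f)
		fmt.Println(rel)
	}
}
```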

func (*RulesLoader) LoadFromFile

func (l *RulesLoader) LoadFromFile(filePath string) ([]Rule, error)

LoadFromFile loads rules from a single YAML file

func (*RulesLoader) ValidateRule

func (l *RulesLoader) ValidateRule(rule *Rule) error

ValidateRule validates a single rule (trigger + conditions + action). Public wrapper for KV-loaded rules that need validation outside the file-based loading path.

type SignatureVerification

type SignatureVerification struct {
	Enabled         bool
	PublicKeyHeader string
	SignatureHeader string
}

SignatureVerification holds configuration for signature verification. This is a lightweight struct passed to EvaluationContext instead of the full config.

func NewSignatureVerification

func NewSignatureVerification(enabled bool, pubKeyHeader, sigHeader string) *SignatureVerification

NewSignatureVerification creates a SignatureVerification from config values

type SubjectContext

type SubjectContext struct {
	Full   string   `json:"full"`   // Full subject: "sensors.temperature.room1"
	Tokens []string `json:"tokens"` // Split tokens: ["sensors", "temperature", "room1"]
	Count  int      `json:"count"`  // Token count: 3
}

SubjectContext provides access to NATS subject information for templates and conditions

func NewSubjectContext

func NewSubjectContext(subject string) *SubjectContext

NewSubjectContext creates a SubjectContext from a NATS subject string

func (*SubjectContext) GetField

func (sc *SubjectContext) GetField(fieldName string) (interface{}, bool)

GetField retrieves a subject field for template/condition processing. Supports: "@subject", "@subject.0", "@subject.1", "@subject.count".

func (*SubjectContext) GetToken

func (sc *SubjectContext) GetToken(index int) string

GetToken safely retrieves a token by index, returns empty string if out of bounds
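
The token model behind SubjectContext is simple enough to sketch in a few lines: split the subject on dots, record the count, and guard index access. The type and function names below are hypothetical local stand-ins that mirror the documented fields; they are not the package's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectContext is a hypothetical stand-in mirroring the documented fields.
type subjectContext struct {
	Full   string
	Tokens []string
	Count  int
}

// newSubjectContext splits a NATS subject into dot-separated tokens.
func newSubjectContext(subject string) *subjectContext {
	tokens := strings.Split(subject, ".")
	return &subjectContext{Full: subject, Tokens: tokens, Count: len(tokens)}
}

// token returns the token at index i, or "" when i is out of bounds.
func (sc *subjectContext) token(i int) string {
	if i < 0 || i >= len(sc.Tokens) {
		return ""
	}
	return sc.Tokens[i]
}

func main() {
	sc := newSubjectContext("sensors.temperature.room1")
	fmt.Println(sc.Count, sc.token(0), sc.token(2))
	fmt.Printf("%q\n", sc.token(7)) // out of bounds yields an empty string
}
```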

type SystemTimeProvider

type SystemTimeProvider struct{}

SystemTimeProvider uses system time

func NewSystemTimeProvider

func NewSystemTimeProvider() *SystemTimeProvider

NewSystemTimeProvider creates a new system time provider

func (*SystemTimeProvider) GetContextAt

func (p *SystemTimeProvider) GetContextAt(t time.Time) *TimeContext

GetContextAt returns time context for specific time (useful for testing)

func (*SystemTimeProvider) GetCurrentContext

func (p *SystemTimeProvider) GetCurrentContext() *TimeContext

GetCurrentContext returns time context for current moment

type TemplateEngine

type TemplateEngine struct {
	// contains filtered or unexported fields
}

TemplateEngine processes rule template strings.

func NewTemplateEngine

func NewTemplateEngine(log *slog.Logger) *TemplateEngine

NewTemplateEngine creates a new TemplateEngine.

func (*TemplateEngine) Execute

func (te *TemplateEngine) Execute(template string, context *EvaluationContext) (string, error)

Execute renders a template string using the provided context. Optimized: Uses a single-pass recursive scanner instead of Regex.

type TimeContext

type TimeContext struct {
	// contains filtered or unexported fields
}

TimeContext holds current time information and pre-computed fields

func (*TimeContext) GetAllFieldNames

func (tc *TimeContext) GetAllFieldNames() []string

GetAllFieldNames returns list of all available time field names

func (*TimeContext) GetField

func (tc *TimeContext) GetField(fieldName string) (interface{}, bool)

GetField safely retrieves a time field value

type TimeProvider

type TimeProvider interface {
	GetCurrentContext() *TimeContext
	GetContextAt(time.Time) *TimeContext
}

TimeProvider interface for creating time contexts (enables mocking)

type Trigger

type Trigger struct {
	NATS *NATSTrigger `json:"nats,omitempty" yaml:"nats,omitempty"`
	HTTP *HTTPTrigger `json:"http,omitempty" yaml:"http,omitempty"`
}

Trigger defines what initiates rule evaluation (NATS or HTTP)

type WildcardType

type WildcardType int

WildcardType represents the type of token in a pattern

const (
	ExactToken     WildcardType = iota
	SingleWildcard              // *
	GreedyWildcard              // >
)
