Documentation
¶
Index ¶
- Constants
- Variables
- func DebounceKey(triggerSubject, actionSubject string) string
- func ExtractJSONValue(data interface{}, path []string) interface{}
- func ExtractVariable(template string) string
- func HasJSONPath(data interface{}, path []string) bool
- func IsTemplate(s string) bool
- func SplitPathRespectingBraces(path string) ([]string, error)
- func TraverseJSONPath(data interface{}, path []string) (interface{}, error)
- func TraverseJSONPathString(data interface{}, pathStr string) (interface{}, error)
- func ValidatePattern(pattern string) error
- type Action
- type CompiledPattern
- type Condition
- type Conditions
- type Debouncer
- type ElementError
- type EvaluationContext
- type Evaluator
- type ForEachResult
- type HTTPAction
- type HTTPRequestContext
- type HTTPTrigger
- type IndexStats
- type JSONPathTraverser
- func (t *JSONPathTraverser) ExtractValue(data interface{}, path []string) interface{}
- func (t *JSONPathTraverser) HasPath(data interface{}, path []string) bool
- func (t *JSONPathTraverser) TraversePath(data interface{}, path []string) (interface{}, error)
- func (t *JSONPathTraverser) TraversePathString(data interface{}, pathStr string) (interface{}, error)
- type KVContext
- func (kv *KVContext) GetAllBuckets() []string
- func (kv *KVContext) GetField(field string) (interface{}, bool)
- func (kv *KVContext) GetFieldWithContext(field string, msgData map[string]interface{}, timeCtx *TimeContext, ...) (interface{}, bool)
- func (kv *KVContext) GetStats() map[string]interface{}
- func (kv *KVContext) HasBucket(bucketName string) bool
- type LocalKVCache
- func (c *LocalKVCache) Clear()
- func (c *LocalKVCache) Delete(bucket, key string)
- func (c *LocalKVCache) Get(bucket, key string) (interface{}, bool)
- func (c *LocalKVCache) GetAllBuckets() []string
- func (c *LocalKVCache) GetAllKeys(bucket string) []string
- func (c *LocalKVCache) GetStats() map[string]interface{}
- func (c *LocalKVCache) IsEnabled() bool
- func (c *LocalKVCache) Set(bucket, key string, value interface{})
- func (c *LocalKVCache) SetEnabled(enabled bool)
- type MockTimeProvider
- type NATSAction
- type NATSTrigger
- type PatternCache
- type PatternMatcher
- type PatternRule
- type Processor
- func (p *Processor) GetStats() ProcessorStats
- func (p *Processor) GetSubjects() []string
- func (p *Processor) LoadRules(rules []Rule) error
- func (p *Processor) ProcessForSubscription(triggerSubject, messageSubject string, payload []byte, ...) ([]*Action, error)
- func (p *Processor) ProcessHTTP(path, method string, payload []byte, headers map[string]string) ([]*Action, error)
- func (p *Processor) ProcessNATS(subject string, payload []byte, headers map[string]string) ([]*Action, error)
- func (p *Processor) ProcessWithSubject(subject string, payload []byte, headers map[string]string) ([]*Action, error)
- func (p *Processor) ReplaceHTTPRules(rules map[string][]*Rule)
- func (p *Processor) ReplaceRules(rules map[string][]*Rule)
- func (p *Processor) SetMaxForEachIterations(max int)
- func (p *Processor) SetTimeProvider(provider TimeProvider)
- type ProcessorStats
- type RetryConfig
- type Rule
- type RuleIndex
- func (idx *RuleIndex) Add(rule *Rule)
- func (idx *RuleIndex) Clear()
- func (idx *RuleIndex) FindAllMatching(subject string) []*Rule
- func (idx *RuleIndex) GetRuleCounts() (exactCount, patternCount int)
- func (idx *RuleIndex) GetStats() IndexStats
- func (idx *RuleIndex) GetSubjects() []string
- func (idx *RuleIndex) GetSubscriptionSubjects() []string
- type RulesLoader
- type SignatureVerification
- type SubjectContext
- type SystemTimeProvider
- type TemplateEngine
- type TimeContext
- type TimeProvider
- type Trigger
- type WildcardType
Constants ¶
const ( // DefaultMaxForEachIterations is the default limit for forEach array processing. // Prevents runaway iteration on large arrays. Can be overridden via configuration. DefaultMaxForEachIterations = 100 )
Variables ¶
var ErrMalformedPayload = errors.New("malformed payload")
ErrMalformedPayload indicates the message payload cannot be parsed and retrying will not help.
Functions ¶
func DebounceKey ¶
DebounceKey builds the debounce map key from trigger and action subjects.
func ExtractJSONValue ¶
func ExtractJSONValue(data interface{}, path []string) interface{}
ExtractJSONValue is a package-level convenience function that returns nil on error
func ExtractVariable ¶
ExtractVariable extracts the variable name from a template string Examples:
"{temperature}" -> "temperature"
"{@time.hour}" -> "@time.hour"
"{@kv.config.sensor:max}" -> "@kv.config.sensor:max"
"temperature" -> "" (not a template)
"{}" -> "" (malformed)
Exported for use in validation (loader.go)
func HasJSONPath ¶
HasJSONPath is a package-level convenience function to check path existence
func IsTemplate ¶
IsTemplate checks if a string looks like a template variable: "{something}" Simple check - if it contains braces, we treat it as a template Exported for use in validation (loader.go)
func SplitPathRespectingBraces ¶
SplitPathRespectingBraces splits a path on dots while treating {...} as atomic tokens This is critical for KV field parsing where variables can contain dots.
Examples:
- "profile.tier" → ["profile", "tier"]
- "{user.name}" → ["{user.name}"]
- "profile.{user.name}.tier" → ["profile", "{user.name}", "tier"]
- "{outer.{inner}.x}" → ["{outer.{inner}.x}"] (nested braces preserved)
Returns error for unmatched braces.
func TraverseJSONPath ¶
TraverseJSONPath is a package-level convenience function using the default traverser This is what most code will use
func TraverseJSONPathString ¶
TraverseJSONPathString is a package-level convenience function for string paths
func ValidatePattern ¶
ValidatePattern validates NATS pattern syntax
Types ¶
type Action ¶
type Action struct {
NATS *NATSAction `json:"nats,omitempty" yaml:"nats,omitempty"`
HTTP *HTTPAction `json:"http,omitempty" yaml:"http,omitempty"`
RuleName string `json:"-" yaml:"-"`
TraceID string `json:"-" yaml:"-"`
}
Action defines what happens when rule matches (NATS or HTTP)
type CompiledPattern ¶
type CompiledPattern struct {
// contains filtered or unexported fields
}
CompiledPattern represents a pre-compiled pattern for efficient matching
type Condition ¶
type Condition struct {
Field string `json:"field" yaml:"field"` // Template: "{temperature}" or "{@time.hour}"
Operator string `json:"operator" yaml:"operator"` // eq, gt, contains, etc.
Value interface{} `json:"value,omitempty" yaml:"value,omitempty"` // Literal or template: 30 or "{@kv.config:max_temp}"
// For array operators (any/all/none) - nested conditions to evaluate against array elements
Conditions *Conditions `json:"conditions,omitempty" yaml:"conditions,omitempty"`
}
Condition represents a single evaluation criterion with template-based variable resolution
Both Field and Value now support template syntax for dynamic comparisons:
Field Examples:
- Message field: "{temperature}"
- Nested field: "{sensor.reading.value}"
- System time: "{@time.hour}"
- Subject token: "{@subject.1}"
- KV lookup: "{@kv.sensor_config.{sensor_id}:max_temp}"
Value Examples:
- Literal: 30
- Literal string: "active"
- Message field: "{threshold}"
- KV lookup: "{@kv.config.global:max_temperature}"
- System variable: "{@time.hour}"
This enables powerful variable-to-variable comparisons:
field: "{temperature}"
operator: gt
value: "{@kv.sensor_config.{sensor_id}:max_temp}"
Type preservation:
- Numbers remain numbers for accurate numeric comparison
- Strings remain strings
- Booleans remain booleans
- Type coercion is performed automatically when needed
type Conditions ¶
type Conditions struct {
Operator string `json:"operator" yaml:"operator"` // "and" or "or"
Items []Condition `json:"items" yaml:"items"`
Groups []Conditions `json:"groups,omitempty" yaml:"groups,omitempty"` // For nested condition groups
}
Conditions represents a group of conditions with a logical operator
type Debouncer ¶
type Debouncer struct {
// contains filtered or unexported fields
}
Debouncer tracks per-rule last-fired timestamps to suppress rapid re-fires. Thread-safe via sync.Map for concurrent worker access. State is in-memory only — resets on restart (first message always fires).
func NewDebouncer ¶
func NewDebouncer() *Debouncer
func (*Debouncer) ShouldFire ¶
ShouldFire returns true if the debounce window has expired (or no prior fire exists). Uses LoadOrStore for the initial case and CompareAndSwap at the window boundary so exactly one concurrent caller fires when the window expires.
type ElementError ¶
ElementError tracks individual element processing failures
type EvaluationContext ¶
type EvaluationContext struct {
// Message data
Msg map[string]interface{} // CURRENT context (root message OR array element during forEach)
RawPayload []byte
Headers map[string]string
// Original message reference for @msg prefix
// ALWAYS points to root message, even when Msg points to array element
OriginalMsg map[string]interface{}
// Context (NATS or HTTP, one will be nil)
Subject *SubjectContext
HTTP *HTTPRequestContext
// Shared contexts
Time *TimeContext
KV *KVContext
// contains filtered or unexported fields
}
EvaluationContext provides all data needed for condition evaluation and template processing Supports both NATS and HTTP contexts, and now includes support for forEach array iteration
func NewEvaluationContext ¶
func NewEvaluationContext( payload []byte, headers map[string]string, subjectCtx *SubjectContext, httpCtx *HTTPRequestContext, timeCtx *TimeContext, kvCtx *KVContext, sigVerification *SignatureVerification, logger *slog.Logger, ) (*EvaluationContext, error)
NewEvaluationContext creates a new evaluation context Either subjectCtx OR httpCtx should be provided (not both)
func (*EvaluationContext) ResolveValue ¶
func (c *EvaluationContext) ResolveValue(path string) (interface{}, bool)
ResolveValue resolves a field value from the context Supports message fields, system fields (@subject, @path, @header, @time, @kv, @signature) Also supports @msg prefix for explicit root message access during forEach
func (*EvaluationContext) WithElement ¶
func (c *EvaluationContext) WithElement(element map[string]interface{}) *EvaluationContext
WithElement creates a child context for processing an array element. The new context has Msg set to the element while preserving OriginalMsg for @msg access to the root message. All other fields are inherited.
type Evaluator ¶
type Evaluator struct {
// contains filtered or unexported fields
}
Evaluator processes rule conditions against an EvaluationContext
func NewEvaluator ¶
NewEvaluator creates a new Evaluator
func (*Evaluator) Evaluate ¶
func (e *Evaluator) Evaluate(conditions *Conditions, context *EvaluationContext) bool
Evaluate checks if a message satisfies the conditions
func (*Evaluator) SetMetrics ¶
SetMetrics sets the metrics collector (optional)
type ForEachResult ¶
type ForEachResult struct {
TotalElements int
ProcessedElements int
FilteredElements int
FailedElements int
Errors []ElementError
}
ForEachResult tracks detailed forEach processing statistics
type HTTPAction ¶
type HTTPAction struct {
URL string `json:"url" yaml:"url"`
Method string `json:"method" yaml:"method"`
Payload string `json:"payload,omitempty" yaml:"payload,omitempty"`
Passthrough bool `json:"passthrough,omitempty" yaml:"passthrough,omitempty"`
Headers map[string]string `json:"headers,omitempty" yaml:"headers,omitempty"`
RawPayload []byte `json:"-" yaml:"-"` // Populated during processing
Retry *RetryConfig `json:"retry,omitempty" yaml:"retry,omitempty"`
// Array iteration fields for forEach functionality
// ForEach must use template syntax: "{arrayField}" or "{nested.array}" or "{@items}"
ForEach string `json:"forEach,omitempty" yaml:"forEach,omitempty"`
Filter *Conditions `json:"filter,omitempty" yaml:"filter,omitempty"`
}
HTTPAction represents making an HTTP request
type HTTPRequestContext ¶
type HTTPRequestContext struct {
Path string // Full path: "/webhooks/github/pr"
PathTokens []string // Tokens: ["webhooks", "github", "pr"]
Method string // HTTP method: "POST", "GET", etc.
Count int // Token count
}
HTTPRequestContext provides HTTP-specific context for rule evaluation
func NewHTTPRequestContext ¶
func NewHTTPRequestContext(path, method string) *HTTPRequestContext
NewHTTPRequestContext creates an HTTPRequestContext from path and method
func (*HTTPRequestContext) GetField ¶
func (hc *HTTPRequestContext) GetField(fieldName string) (interface{}, bool)
GetField retrieves an HTTP field for template/condition processing Supports: "@path", "@path.0", "@path.1", "@path.count", "@method"
func (*HTTPRequestContext) GetToken ¶
func (hc *HTTPRequestContext) GetToken(index int) string
GetToken safely retrieves a path token by index
type HTTPTrigger ¶
type HTTPTrigger struct {
Path string `json:"path" yaml:"path"`
Method string `json:"method,omitempty" yaml:"method,omitempty"` // Optional, defaults to all methods
}
HTTPTrigger represents an HTTP endpoint-based trigger
type IndexStats ¶
type IndexStats struct {
// contains filtered or unexported fields
}
IndexStats tracks index performance metrics
type JSONPathTraverser ¶
type JSONPathTraverser struct {
}
JSONPathTraverser provides unified JSON path traversal for message fields, KV values, and any other JSON data Supports both object property access and array index access using dot notation Examples:
- "user.name" → object property access
- "readings.0.value" → array index access
- "data.sensors.2.readings.5.timestamp" → mixed nested access
func NewJSONPathTraverser ¶
func NewJSONPathTraverser() *JSONPathTraverser
NewJSONPathTraverser creates a new path traverser instance
func (*JSONPathTraverser) ExtractValue ¶
func (t *JSONPathTraverser) ExtractValue(data interface{}, path []string) interface{}
ExtractValue is a convenience function for simple path extraction Returns nil if the path doesn't exist (no error)
func (*JSONPathTraverser) HasPath ¶
func (t *JSONPathTraverser) HasPath(data interface{}, path []string) bool
HasPath checks if a path exists in the data structure
func (*JSONPathTraverser) TraversePath ¶
func (t *JSONPathTraverser) TraversePath(data interface{}, path []string) (interface{}, error)
TraversePath navigates through a JSON structure using a dot-separated path Supports both map[string]interface{} for objects and []interface{} for arrays Returns the value at the path and an error if the path is invalid
func (*JSONPathTraverser) TraversePathString ¶
func (t *JSONPathTraverser) TraversePathString(data interface{}, pathStr string) (interface{}, error)
TraversePathString is a convenience method that takes a dot-separated string path Example: "user.profile.email" or "readings.0.value"
type KVContext ¶
type KVContext struct {
// contains filtered or unexported fields
}
KVContext provides access to NATS Key-Value stores for rule evaluation and templating Now includes local cache support for improved performance
func NewKVContext ¶
func NewKVContext(stores map[string]jetstream.KeyValue, logger *slog.Logger, localCache *LocalKVCache) *KVContext
NewKVContext creates a new KV context with the provided KV stores and optional local cache
func (*KVContext) GetAllBuckets ¶
GetAllBuckets returns the names of all configured KV buckets
func (*KVContext) GetField ¶
GetField retrieves a value from KV store (cache first, then NATS KV fallback) Supports format: "@kv.bucket_name.key_name[:json.path.to.field]" The colon (:) delimiter is now optional. Returns the value and whether it was found successfully
func (*KVContext) GetFieldWithContext ¶
func (kv *KVContext) GetFieldWithContext(field string, msgData map[string]interface{}, timeCtx *TimeContext, subjectCtx *SubjectContext) (interface{}, bool)
GetFieldWithContext retrieves a value with variable substitution support Now properly handles missing variables by returning empty strings
type LocalKVCache ¶
type LocalKVCache struct {
// contains filtered or unexported fields
}
LocalKVCache provides in-memory caching for NATS KV buckets Uses simple map structure for fast access and easy debugging
func NewLocalKVCache ¶
func NewLocalKVCache(logger *slog.Logger) *LocalKVCache
NewLocalKVCache creates a new local KV cache instance
func (*LocalKVCache) Clear ¶
func (c *LocalKVCache) Clear()
Clear removes all entries from the cache Useful for testing or manual cache invalidation
func (*LocalKVCache) Delete ¶
func (c *LocalKVCache) Delete(bucket, key string)
Delete removes a value from the local cache Used when KV stream indicates a key was deleted
func (*LocalKVCache) Get ¶
func (c *LocalKVCache) Get(bucket, key string) (interface{}, bool)
Get retrieves a value from the local cache Returns the value and whether it was found
func (*LocalKVCache) GetAllBuckets ¶
func (c *LocalKVCache) GetAllBuckets() []string
GetAllBuckets returns all bucket names in the cache
func (*LocalKVCache) GetAllKeys ¶
func (c *LocalKVCache) GetAllKeys(bucket string) []string
GetAllKeys returns all keys for a specific bucket Useful for debugging and monitoring
func (*LocalKVCache) GetStats ¶
func (c *LocalKVCache) GetStats() map[string]interface{}
GetStats returns cache statistics for monitoring
func (*LocalKVCache) IsEnabled ¶
func (c *LocalKVCache) IsEnabled() bool
IsEnabled returns whether the cache is currently enabled
func (*LocalKVCache) Set ¶
func (c *LocalKVCache) Set(bucket, key string, value interface{})
Set stores a value in the local cache Value should be the parsed JSON object, not raw bytes
func (*LocalKVCache) SetEnabled ¶
func (c *LocalKVCache) SetEnabled(enabled bool)
SetEnabled enables or disables the cache Useful for runtime configuration or troubleshooting
type MockTimeProvider ¶
type MockTimeProvider struct {
// contains filtered or unexported fields
}
MockTimeProvider for testing with fixed time
func NewMockTimeProvider ¶
func NewMockTimeProvider(fixedTime time.Time) *MockTimeProvider
NewMockTimeProvider creates a mock provider with fixed time
func (*MockTimeProvider) GetContextAt ¶
func (m *MockTimeProvider) GetContextAt(t time.Time) *TimeContext
GetContextAt returns context for specified time
func (*MockTimeProvider) GetCurrentContext ¶
func (m *MockTimeProvider) GetCurrentContext() *TimeContext
GetCurrentContext returns context for the fixed time
func (*MockTimeProvider) SetTime ¶
func (m *MockTimeProvider) SetTime(t time.Time)
SetTime updates the fixed time
type NATSAction ¶
type NATSAction struct {
Subject string `json:"subject" yaml:"subject"`
Mode string `json:"mode,omitempty" yaml:"mode,omitempty"`
Payload string `json:"payload,omitempty" yaml:"payload,omitempty"`
Passthrough bool `json:"passthrough,omitempty" yaml:"passthrough,omitempty"`
Headers map[string]string `json:"headers,omitempty" yaml:"headers,omitempty"`
RawPayload []byte `json:"-" yaml:"-"` // Populated during processing
// Array iteration fields for forEach functionality
// ForEach must use template syntax: "{arrayField}" or "{nested.array}" or "{@items}"
ForEach string `json:"forEach,omitempty" yaml:"forEach,omitempty"`
Filter *Conditions `json:"filter,omitempty" yaml:"filter,omitempty"`
}
NATSAction represents publishing to a NATS subject
type NATSTrigger ¶
type NATSTrigger struct {
Subject string `json:"subject" yaml:"subject"`
}
NATSTrigger represents a NATS subject-based trigger
type PatternCache ¶
type PatternCache struct {
// contains filtered or unexported fields
}
PatternCache provides optional caching for frequently matched subjects
func NewPatternCache ¶
func NewPatternCache(maxSize int) *PatternCache
NewPatternCache creates a new pattern cache
func (*PatternCache) Get ¶
func (pc *PatternCache) Get(subject string) (bool, bool)
Get retrieves a cached result
func (*PatternCache) Set ¶
func (pc *PatternCache) Set(subject string, matches bool)
Set stores a result in the cache
func (*PatternCache) Stats ¶
func (pc *PatternCache) Stats() map[string]int
Stats returns cache statistics
type PatternMatcher ¶
type PatternMatcher struct {
// contains filtered or unexported fields
}
PatternMatcher handles NATS-style subject pattern matching
func NewPatternMatcher ¶
func NewPatternMatcher(pattern string) (*PatternMatcher, error)
NewPatternMatcher creates a new pattern matcher for the given pattern
func (*PatternMatcher) GetPattern ¶
func (pm *PatternMatcher) GetPattern() string
GetPattern returns the original pattern string
func (*PatternMatcher) IsPattern ¶
func (pm *PatternMatcher) IsPattern() bool
IsPattern returns true if this contains wildcards
func (*PatternMatcher) Match ¶
func (pm *PatternMatcher) Match(subject string) bool
Match checks if the given subject matches this pattern
type PatternRule ¶
type PatternRule struct {
Rule *Rule
Matcher *PatternMatcher
}
PatternRule wraps a rule with its compiled pattern matcher
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
func NewProcessor ¶
func NewProcessor(log *slog.Logger, metrics *metrics.Metrics, kvCtx *KVContext, sigVerification *SignatureVerification) *Processor
NewProcessor creates a new processor with optional signature verification
func (*Processor) GetStats ¶
func (p *Processor) GetStats() ProcessorStats
GetStats returns processor statistics
func (*Processor) GetSubjects ¶
GetSubjects returns all NATS subjects for subscription setup
func (*Processor) LoadRules ¶
LoadRules is used by CLI tooling (lint, test, check) to load rules from files. In serve mode, rules arrive via KV Watch which calls ReplaceRules/ReplaceHTTPRules directly.
func (*Processor) ProcessForSubscription ¶
func (p *Processor) ProcessForSubscription(triggerSubject, messageSubject string, payload []byte, headers map[string]string) ([]*Action, error)
ProcessForSubscription processes a message using O(1) lookup by trigger subject. triggerSubject is the trigger pattern (e.g., "sensors.tank.>") used for rule lookup. messageSubject is the actual message subject (e.g., "sensors.tank.001") used for template variables. Falls back to ProcessNATS (pattern matching) if no KV rules are loaded.
func (*Processor) ProcessHTTP ¶
func (p *Processor) ProcessHTTP(path, method string, payload []byte, headers map[string]string) ([]*Action, error)
ProcessHTTP processes an HTTP request through the rule engine
func (*Processor) ProcessNATS ¶
func (p *Processor) ProcessNATS(subject string, payload []byte, headers map[string]string) ([]*Action, error)
ProcessNATS processes a NATS message through the rule engine
func (*Processor) ProcessWithSubject ¶
func (p *Processor) ProcessWithSubject(subject string, payload []byte, headers map[string]string) ([]*Action, error)
ProcessWithSubject is kept for backward compatibility. Used by internal/broker/subscription.go.
func (*Processor) ReplaceHTTPRules ¶
ReplaceHTTPRules atomically swaps the KV-loaded HTTP rule set. Called by RuleKVManager when KV Watch detects changes. The map is keyed by HTTP path (e.g., "/webhook/github").
func (*Processor) ReplaceRules ¶
ReplaceRules atomically swaps the KV-loaded rule set. Called by RuleKVManager when KV Watch detects changes. The map is keyed by trigger subject (e.g., "sensors.tank.>").
func (*Processor) SetMaxForEachIterations ¶
SetMaxForEachIterations sets the maximum allowed forEach iterations
func (*Processor) SetTimeProvider ¶
func (p *Processor) SetTimeProvider(provider TimeProvider)
SetTimeProvider allows injecting a mock time provider for testing
type ProcessorStats ¶
type RetryConfig ¶
type RetryConfig struct {
MaxAttempts int `json:"maxAttempts" yaml:"maxAttempts"`
InitialDelay string `json:"initialDelay" yaml:"initialDelay"` // e.g., "1s"
MaxDelay string `json:"maxDelay" yaml:"maxDelay"` // e.g., "30s"
}
RetryConfig defines retry behavior for HTTP actions
type Rule ¶
type Rule struct {
Name string `json:"name,omitempty" yaml:"name,omitempty"`
Trigger Trigger `json:"trigger" yaml:"trigger"`
Conditions *Conditions `json:"conditions,omitempty" yaml:"conditions,omitempty"`
Action Action `json:"action" yaml:"action"`
Debounce string `json:"debounce,omitempty" yaml:"debounce,omitempty"`
DebounceDuration time.Duration `json:"-" yaml:"-"`
}
Rule represents a generic rule with trigger and action
type RuleIndex ¶
type RuleIndex struct {
// contains filtered or unexported fields
}
RuleIndex provides efficient rule lookup for NATS subjects Supports both exact matches (O(1)) and wildcard patterns (O(n))
func NewRuleIndex ¶
NewRuleIndex creates a new rule index
func (*RuleIndex) Add ¶
Add indexes a NATS-triggered rule for efficient lookup HTTP-triggered rules are ignored (they're stored separately in Processor)
func (*RuleIndex) FindAllMatching ¶
FindAllMatching returns all rules that match the given NATS subject First checks exact matches (O(1)), then pattern matches (O(n))
func (*RuleIndex) GetRuleCounts ¶
GetRuleCounts returns the number of exact and pattern rules
func (*RuleIndex) GetStats ¶
func (idx *RuleIndex) GetStats() IndexStats
GetStats returns index statistics
func (*RuleIndex) GetSubjects ¶
GetSubjects returns all unique NATS subjects (both exact and patterns)
func (*RuleIndex) GetSubscriptionSubjects ¶
GetSubscriptionSubjects returns subjects that should be used for JetStream subscriptions Converts wildcard patterns to NATS subscription format
type RulesLoader ¶
type RulesLoader struct {
// contains filtered or unexported fields
}
RulesLoader handles loading and validating rule definitions from YAML files
func NewRulesLoader ¶
func NewRulesLoader(log *slog.Logger, kvBuckets []string) *RulesLoader
NewRulesLoader creates a new rules loader
func (*RulesLoader) ExpandEnvironmentVariables ¶
func (l *RulesLoader) ExpandEnvironmentVariables(rule *Rule)
ExpandEnvironmentVariables expands ${VAR_NAME} placeholders in a rule. Public wrapper for KV-loaded rules that need env var expansion outside the file-based loading path.
func (*RulesLoader) LoadFromDirectory ¶
func (l *RulesLoader) LoadFromDirectory(dirPath string) ([]Rule, error)
LoadFromDirectory loads all .yaml and .yml files from a directory, recursively, while skipping any directories with a "_test" suffix.
func (*RulesLoader) LoadFromFile ¶
func (l *RulesLoader) LoadFromFile(filePath string) ([]Rule, error)
LoadFromFile loads rules from a single YAML file
func (*RulesLoader) ValidateRule ¶
func (l *RulesLoader) ValidateRule(rule *Rule) error
ValidateRule validates a single rule (trigger + conditions + action). Public wrapper for KV-loaded rules that need validation outside the file-based loading path.
type SignatureVerification ¶
SignatureVerification holds configuration for signature verification. This is a lightweight struct passed to EvaluationContext instead of the full config.
func NewSignatureVerification ¶
func NewSignatureVerification(enabled bool, pubKeyHeader, sigHeader string) *SignatureVerification
NewSignatureVerification creates a SignatureVerification from config values
type SubjectContext ¶
type SubjectContext struct {
Full string `json:"full"` // Full subject: "sensors.temperature.room1"
Tokens []string `json:"tokens"` // Split tokens: ["sensors", "temperature", "room1"]
Count int `json:"count"` // Token count: 3
}
SubjectContext provides access to NATS subject information for templates and conditions
func NewSubjectContext ¶
func NewSubjectContext(subject string) *SubjectContext
NewSubjectContext creates a SubjectContext from a NATS subject string
func (*SubjectContext) GetField ¶
func (sc *SubjectContext) GetField(fieldName string) (interface{}, bool)
GetField retrieves a subject field for template/condition processing Supports: "@subject", "@subject.0", "@subject.1", "@subject.count"
func (*SubjectContext) GetToken ¶
func (sc *SubjectContext) GetToken(index int) string
GetToken safely retrieves a token by index, returns empty string if out of bounds
type SystemTimeProvider ¶
type SystemTimeProvider struct{}
SystemTimeProvider uses system time
func NewSystemTimeProvider ¶
func NewSystemTimeProvider() *SystemTimeProvider
NewSystemTimeProvider creates a new system time provider
func (*SystemTimeProvider) GetContextAt ¶
func (p *SystemTimeProvider) GetContextAt(t time.Time) *TimeContext
GetContextAt returns time context for specific time (useful for testing)
func (*SystemTimeProvider) GetCurrentContext ¶
func (p *SystemTimeProvider) GetCurrentContext() *TimeContext
GetCurrentContext returns time context for current moment
type TemplateEngine ¶
type TemplateEngine struct {
// contains filtered or unexported fields
}
TemplateEngine processes rule template strings.
func NewTemplateEngine ¶
func NewTemplateEngine(log *slog.Logger) *TemplateEngine
NewTemplateEngine creates a new TemplateEngine.
func (*TemplateEngine) Execute ¶
func (te *TemplateEngine) Execute(template string, context *EvaluationContext) (string, error)
Execute renders a template string using the provided context. Optimized: Uses a single-pass recursive scanner instead of Regex.
type TimeContext ¶
type TimeContext struct {
// contains filtered or unexported fields
}
TimeContext holds current time information and pre-computed fields
func (*TimeContext) GetAllFieldNames ¶
func (tc *TimeContext) GetAllFieldNames() []string
GetAllFieldNames returns list of all available time field names
func (*TimeContext) GetField ¶
func (tc *TimeContext) GetField(fieldName string) (interface{}, bool)
GetField safely retrieves a time field value
type TimeProvider ¶
type TimeProvider interface {
GetCurrentContext() *TimeContext
GetContextAt(time.Time) *TimeContext
}
TimeProvider interface for creating time contexts (enables mocking)
type Trigger ¶
type Trigger struct {
NATS *NATSTrigger `json:"nats,omitempty" yaml:"nats,omitempty"`
HTTP *HTTPTrigger `json:"http,omitempty" yaml:"http,omitempty"`
}
Trigger defines what initiates rule evaluation (NATS or HTTP)
type WildcardType ¶
type WildcardType int
WildcardType represents the type of token in a pattern
const ( ExactToken WildcardType = iota SingleWildcard // * GreedyWildcard // > )