Documentation
¶
Overview ¶
Package reactive provides a reactive workflow engine built on rules engine primitives.
The reactive workflow engine replaces imperative step sequencing with typed rules that react to KV state changes and NATS messages. Workflows are defined in Go code, enabling compile-time type checking of all data flows.
Key concepts:
- TriggerSource: unified reactive primitive (KV watch, subject consumer, or both)
- RuleContext: provides typed access to execution state and triggering message
- ConditionFunc: typed condition evaluation against RuleContext
- StateMutatorFunc: typed state mutation after action completion
See ADR-021 for architectural details.
Index ¶
- Variables
- func CanProceed(state any) bool
- func ClearError(state any)
- func ClearPendingTask(state any)
- func CompleteExecution(state any, phase string)
- func EscalateExecution(state any, reason string)
- func FailExecution(state any, errMsg string)
- func GetID(state any) string
- func GetIteration(state any) int
- func GetPhase(state any) string
- func GetWorkflowID(state any) string
- func IncrementIteration(state any) int
- func InitializeExecution(state any, id, workflowID string, timeout time.Duration)
- func IsExpired(state any) bool
- func IsTerminal(state any) bool
- func IsTerminalStatus(status ExecutionStatus) bool
- func MatchesPattern(key, pattern string) bool
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func RecordTimelineEntry(state any, entry TimelineEntry)
- func Register(registry *component.Registry) error
- func SetDeadline(state any, deadline time.Time)
- func SetError(state any, errMsg string)
- func SetIteration(state any, iteration int)
- func SetPendingTask(state any, taskID string, ruleID string)
- func SetPhase(state any, phase string, ruleID string, triggerMode TriggerMode, ...)
- func SetStatus(state any, status ExecutionStatus)
- func TimeoutExecution(state any)
- type Action
- type ActionType
- type CallbackFields
- type CallbackInjectable
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) DebugStatus() any
- func (c *Component) Engine() *Engine
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) RegisterWorkflow(def *Definition) error
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(_ time.Duration) error
- type Condition
- type ConditionFunc
- func Always() ConditionFunc
- func And(conditions ...ConditionFunc) ConditionFunc
- func HasMessage() ConditionFunc
- func HasState() ConditionFunc
- func Never() ConditionFunc
- func Not(condition ConditionFunc) ConditionFunc
- func Or(conditions ...ConditionFunc) ConditionFunc
- func PhaseIs(phase string) ConditionFunc
- func StatusIs(status ExecutionStatus) ConditionFunc
- type ConditionResult
- type Config
- type ConsumerError
- type Definition
- type DispatchError
- type DispatchResult
- type Dispatcher
- func (d *Dispatcher) DispatchAction(ctx context.Context, ruleCtx *RuleContext, rule *RuleDef, def *Definition) (*DispatchResult, error)
- func (d *Dispatcher) HandleCallback(ctx context.Context, ruleCtx *RuleContext, rule *RuleDef, result any, ...) (*DispatchResult, error)
- func (d *Dispatcher) HandleEscalation(ctx context.Context, ruleCtx *RuleContext, reason string, def *Definition) (*DispatchResult, error)
- func (d *Dispatcher) HandleFailure(ctx context.Context, ruleCtx *RuleContext, errMsg string, def *Definition) (*DispatchResult, error)
- type DispatcherOption
- type Engine
- func (e *Engine) Initialize(ctx context.Context) error
- func (e *Engine) IsRunning() bool
- func (e *Engine) RegisterWorkflow(def *Definition) error
- func (e *Engine) Registry() *WorkflowRegistry
- func (e *Engine) Start(ctx context.Context) error
- func (e *Engine) StateBucket() string
- func (e *Engine) Stop()
- func (e *Engine) Uptime() time.Duration
- type EngineError
- type EngineMetrics
- func (m *EngineMetrics) DecrementPendingCallbacks()
- func (m *EngineMetrics) IncrementPendingCallbacks()
- func (m *EngineMetrics) RecordActionDispatch(workflowID, ruleID, actionType string)
- func (m *EngineMetrics) RecordCallbackReceived(workflowID, ruleID, status string, latencySeconds float64)
- func (m *EngineMetrics) RecordExecutionCompleted(workflowID string, durationSeconds float64)
- func (m *EngineMetrics) RecordExecutionCreated(workflowID string)
- func (m *EngineMetrics) RecordExecutionEscalated(workflowID, reason string, durationSeconds float64)
- func (m *EngineMetrics) RecordExecutionFailed(workflowID, reason string, durationSeconds float64)
- func (m *EngineMetrics) RecordExecutionTimedOut(workflowID string, durationSeconds float64)
- func (m *EngineMetrics) RecordRuleDuration(workflowID, ruleID string, durationSeconds float64)
- func (m *EngineMetrics) RecordRuleEvaluation(workflowID, ruleID string, fired bool)
- func (m *EngineMetrics) SetPendingCallbacks(count int)
- type EngineOption
- type EvaluationResult
- type Evaluator
- func (e *Evaluator) CleanupExpiredCooldowns(maxAge time.Duration) int
- func (e *Evaluator) ClearExecutionState(executionID string)
- func (e *Evaluator) EvaluateRule(ctx *RuleContext, rule *RuleDef, workflowID string, executionID string) EvaluationResult
- func (e *Evaluator) EvaluateRules(ctx *RuleContext, def *Definition, executionID string) (*RuleDef, *EvaluationResult)
- func (e *Evaluator) RecordFiring(workflowID, executionID, ruleID string)
- type EventConfig
- type ExecutionState
- type ExecutionStatus
- type KVOperation
- type KVWatchEvent
- type KVWatcher
- type MessageDeserializeError
- type MetricsRecorder
- type PayloadBuilderFunc
- type Publisher
- type RegistryError
- type RuleBuilder
- func (b *RuleBuilder) Build() (RuleDef, error)
- func (b *RuleBuilder) Complete() *RuleBuilder
- func (b *RuleBuilder) CompleteWithEvent(subject string, buildPayload PayloadBuilderFunc) *RuleBuilder
- func (b *RuleBuilder) CompleteWithMutation(mutateState StateMutatorFunc) *RuleBuilder
- func (b *RuleBuilder) MustBuild() RuleDef
- func (b *RuleBuilder) Mutate(mutateState StateMutatorFunc) *RuleBuilder
- func (b *RuleBuilder) OnJetStreamSubject(streamName, subject string, messageFactory func() any) *RuleBuilder
- func (b *RuleBuilder) OnSubject(subject string, messageFactory func() any) *RuleBuilder
- func (b *RuleBuilder) Publish(subject string, buildPayload PayloadBuilderFunc) *RuleBuilder
- func (b *RuleBuilder) PublishAsync(subject string, buildPayload PayloadBuilderFunc, expectedResultType string, ...) *RuleBuilder
- func (b *RuleBuilder) PublishWithMutation(subject string, buildPayload PayloadBuilderFunc, mutateState StateMutatorFunc) *RuleBuilder
- func (b *RuleBuilder) WatchKV(bucket, pattern string) *RuleBuilder
- func (b *RuleBuilder) When(description string, condition ConditionFunc) *RuleBuilder
- func (b *RuleBuilder) WhenAll() *RuleBuilder
- func (b *RuleBuilder) WhenAny() *RuleBuilder
- func (b *RuleBuilder) WithCooldown(d time.Duration) *RuleBuilder
- func (b *RuleBuilder) WithMaxFirings(n int) *RuleBuilder
- func (b *RuleBuilder) WithStateLookup(bucket string, keyFunc func(msg any) string) *RuleBuilder
- type RuleContext
- func BuildRuleContextFromKV(event KVWatchEvent, stateFactory func() any) (*RuleContext, error)
- func BuildRuleContextFromMessage(event SubjectMessageEvent, messageFactory func() any, ...) (*RuleContext, error)
- func BuildRuleContextFromMessageWithKV(ctx context.Context, event SubjectMessageEvent, messageFactory func() any, ...) (*RuleContext, error)
- type RuleDef
- type RuleWithWorkflow
- type StateAccessor
- type StateKeyError
- type StateLoadError
- type StateMutatorFunc
- type StateStore
- type SubjectConsumer
- type SubjectMessageEvent
- type SubjectMessageHandler
- type TimelineEntry
- type TriggerMode
- type TriggerSource
- type UnmarshalError
- type ValidationError
- type WatchError
- type WatchHandler
- type WorkflowBuilder
- func (b *WorkflowBuilder) AddRule(rule RuleDef) *WorkflowBuilder
- func (b *WorkflowBuilder) AddRuleFromBuilder(rb *RuleBuilder) *WorkflowBuilder
- func (b *WorkflowBuilder) Build() (*Definition, error)
- func (b *WorkflowBuilder) MustBuild() *Definition
- func (b *WorkflowBuilder) WithDescription(desc string) *WorkflowBuilder
- func (b *WorkflowBuilder) WithEvents(events EventConfig) *WorkflowBuilder
- func (b *WorkflowBuilder) WithMaxIterations(n int) *WorkflowBuilder
- func (b *WorkflowBuilder) WithOnComplete(subject string) *WorkflowBuilder
- func (b *WorkflowBuilder) WithOnEscalate(subject string) *WorkflowBuilder
- func (b *WorkflowBuilder) WithOnFail(subject string) *WorkflowBuilder
- func (b *WorkflowBuilder) WithStateBucket(bucket string) *WorkflowBuilder
- func (b *WorkflowBuilder) WithStateFactory(factory func() any) *WorkflowBuilder
- func (b *WorkflowBuilder) WithTimeout(d time.Duration) *WorkflowBuilder
- type WorkflowCompletionEvent
- type WorkflowEscalationEvent
- type WorkflowFailureEvent
- type WorkflowRegistry
- func (r *WorkflowRegistry) Clear()
- func (r *WorkflowRegistry) Count() int
- func (r *WorkflowRegistry) Get(workflowID string) *Definition
- func (r *WorkflowRegistry) GetAll() []*Definition
- func (r *WorkflowRegistry) GetResultTypeFactory(typeKey string) func() message.Payload
- func (r *WorkflowRegistry) GetRulesForSubject(subject string) []*RuleWithWorkflow
- func (r *WorkflowRegistry) GetRulesForTrigger(triggerMode TriggerMode, bucket, _ string) []*RuleWithWorkflow
- func (r *WorkflowRegistry) List() []string
- func (r *WorkflowRegistry) Register(def *Definition) error
- func (r *WorkflowRegistry) RegisterResultType(typeKey string, factory func() message.Payload) error
- func (r *WorkflowRegistry) Unregister(workflowID string)
- func (r *WorkflowRegistry) ValidationSummary() string
Constants ¶
This section is empty.
Variables ¶
var ConditionHelpers = struct { // PhaseIs returns a condition that checks if the execution phase matches. PhaseIs func(phase string) ConditionFunc // PhaseIn returns a condition that checks if the execution phase is in the list. PhaseIn func(phases ...string) ConditionFunc // StatusIs returns a condition that checks if the execution status matches. StatusIs func(status ExecutionStatus) ConditionFunc // IterationLessThan returns a condition that checks if iteration is below limit. IterationLessThan func(max int) ConditionFunc // IterationEquals returns a condition that checks if iteration equals value. IterationEquals func(n int) ConditionFunc // HasError returns a condition that checks if execution has an error. HasError func() ConditionFunc // NoError returns a condition that checks if execution has no error. NoError func() ConditionFunc // IsWaiting returns a condition that checks if execution is waiting for callback. IsWaiting func() ConditionFunc // NotWaiting returns a condition that checks if execution is not waiting. NotWaiting func() ConditionFunc }{ PhaseIs: func(phase string) ConditionFunc { return func(ctx *RuleContext) bool { if ctx.State == nil { return false } base := GetExecutionState(ctx.State) return base != nil && base.Phase == phase } }, PhaseIn: func(phases ...string) ConditionFunc { return func(ctx *RuleContext) bool { if ctx.State == nil { return false } base := GetExecutionState(ctx.State) if base == nil { return false } for _, p := range phases { if base.Phase == p { return true } } return false } }, StatusIs: func(status ExecutionStatus) ConditionFunc { return func(ctx *RuleContext) bool { if ctx.State == nil { return false } base := GetExecutionState(ctx.State) return base != nil && base.Status == status } }, IterationLessThan: func(max int) ConditionFunc { return func(ctx *RuleContext) bool { if ctx.State == nil { return false } base := GetExecutionState(ctx.State) return base != nil && base.Iteration < max } }, IterationEquals: func(n int) ConditionFunc { return func(ctx *RuleContext) bool { if ctx.State == nil { return false } base := GetExecutionState(ctx.State) return base != nil && base.Iteration == n } }, HasError: func() ConditionFunc { return func(ctx *RuleContext) bool { if ctx.State == nil { return false } base := GetExecutionState(ctx.State) return base != nil && base.Error != "" } }, NoError: func() ConditionFunc { return func(ctx *RuleContext) bool { if ctx.State == nil { return true } base := GetExecutionState(ctx.State) return base == nil || base.Error == "" } }, IsWaiting: func() ConditionFunc { return func(ctx *RuleContext) bool { if ctx.State == nil { return false } base := GetExecutionState(ctx.State) return base != nil && base.Status == StatusWaiting } }, NotWaiting: func() ConditionFunc { return func(ctx *RuleContext) bool { if ctx.State == nil { return true } base := GetExecutionState(ctx.State) return base == nil || base.Status != StatusWaiting } }, }
ConditionHelpers provides common condition functions for use in rule definitions.
Functions ¶
func CanProceed ¶
CanProceed returns true if the execution can proceed (not terminal, not waiting, not expired).
func ClearError ¶
func ClearError(state any)
ClearError clears the error message from the execution state.
func ClearPendingTask ¶
func ClearPendingTask(state any)
ClearPendingTask clears the pending task after callback received.
func CompleteExecution ¶
CompleteExecution marks the execution as completed successfully.
func EscalateExecution ¶
EscalateExecution marks the execution as escalated.
func FailExecution ¶
FailExecution marks the execution as failed.
func GetIteration ¶
GetIteration returns the current iteration count.
func IncrementIteration ¶
IncrementIteration increments the iteration counter.
func InitializeExecution ¶
InitializeExecution initializes a new execution state.
func IsTerminal ¶
IsTerminal returns true if the execution is in a terminal state.
func IsTerminalStatus ¶
func IsTerminalStatus(status ExecutionStatus) bool
IsTerminalStatus returns true if the status is a terminal state.
func MatchesPattern ¶
MatchesPattern checks if a key matches a NATS-style wildcard pattern. Supports * (single token) and > (multi-token suffix) wildcards.
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new reactive workflow engine component.
func RecordTimelineEntry ¶
func RecordTimelineEntry(state any, entry TimelineEntry)
RecordTimelineEntry adds a timeline entry to the execution state.
func Register ¶
Register registers the reactive workflow engine component factory with the registry.
func SetDeadline ¶
SetDeadline sets the execution deadline.
func SetIteration ¶
SetIteration sets the iteration counter to a specific value.
func SetPendingTask ¶
SetPendingTask marks the execution as waiting for an async callback.
func SetPhase ¶
func SetPhase(state any, phase string, ruleID string, triggerMode TriggerMode, triggerInfo string, action string)
SetPhase updates the execution phase and records a timeline entry.
func SetStatus ¶
func SetStatus(state any, status ExecutionStatus)
SetStatus updates the execution status.
func TimeoutExecution ¶
func TimeoutExecution(state any)
TimeoutExecution marks the execution as timed out.
Types ¶
type Action ¶
type Action struct {
// Type determines the action behavior.
Type ActionType
// PublishSubject for publish/publish_async actions.
PublishSubject string
// ExpectedResultType is the registered payload type name for async results.
// Required for ActionPublishAsync so the engine can deserialize the callback.
// Example: "workflow.planner-result.v1"
ExpectedResultType string
// BuildPayload constructs the typed payload from the rule context.
// This replaces string interpolation entirely — it's a Go function that reads
// typed fields from state and/or message and produces a typed payload.
// Required for ActionPublish and ActionPublishAsync.
BuildPayload PayloadBuilderFunc
// MutateState updates the execution entity after the action completes.
// For sync actions, called immediately with result=nil.
// For async actions, called when the callback arrives with the typed result.
// The mutated state is written back to KV, which may trigger other rules.
MutateState StateMutatorFunc
}
Action defines what happens when a rule fires.
type ActionType ¶
type ActionType int
ActionType determines the action behavior.
const ( // ActionPublishAsync publishes to NATS with callback tracking. The engine // parks the execution until the callback arrives, then calls MutateState. ActionPublishAsync ActionType = iota // ActionPublish publishes to NATS fire-and-forget, then calls MutateState immediately. ActionPublish // ActionMutate updates KV state without publishing. The state change may // trigger other rules via KV watch. ActionMutate // ActionComplete marks the execution as completed and publishes terminal events. ActionComplete )
func (ActionType) String ¶
func (a ActionType) String() string
String returns a human-readable name for the action type.
type CallbackFields ¶
type CallbackFields struct {
// TaskID uniquely identifies this async task.
TaskID string `json:"task_id"`
// CallbackSubject is where the executor should publish the result.
CallbackSubject string `json:"callback_subject"`
// ExecutionID is the workflow execution ID for direct lookup.
ExecutionID string `json:"execution_id"`
}
CallbackFields are injected into async task payloads for callback correlation. Used by the reactive workflow engine to track async step execution.
type CallbackInjectable ¶
type CallbackInjectable interface {
InjectCallback(fields CallbackFields)
}
CallbackInjectable is implemented by payloads that can receive callback fields.
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the reactive workflow engine as a discoverable component.
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema.
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics. TODO: Integrate with EngineMetrics to return real-time flow data.
func (*Component) DebugStatus ¶
DebugStatus returns extended debug information for the component.
func (*Component) Engine ¶
Engine returns the underlying workflow engine. This allows callers to register workflows and interact with the engine directly.
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns current health status.
func (*Component) Initialize ¶
Initialize prepares the component for starting.
func (*Component) InputPorts ¶
InputPorts returns input port definitions.
func (*Component) OutputPorts ¶
OutputPorts returns output port definitions.
func (*Component) RegisterWorkflow ¶
func (c *Component) RegisterWorkflow(def *Definition) error
RegisterWorkflow registers a workflow definition with the engine. Workflows can be registered before or after Start() is called. Registering before Start() is recommended so triggers are active from the beginning.
type Condition ¶
type Condition struct {
// Description is human-readable (for logging/debugging).
Description string
// Evaluate is the typed condition function.
Evaluate ConditionFunc
}
Condition is evaluated against a RuleContext using a ConditionFunc. No more string-based "${steps.reviewer.verdict} == 'approved'" comparisons.
type ConditionFunc ¶
type ConditionFunc func(ctx *RuleContext) bool
ConditionFunc evaluates a condition against a RuleContext. Returns true if the condition is met. Implementations use type assertions to access typed fields on State and/or Message.
func And ¶
func And(conditions ...ConditionFunc) ConditionFunc
And combines multiple conditions with logical AND.
func HasMessage ¶
func HasMessage() ConditionFunc
HasMessage returns a condition that checks if a message is present.
func HasState ¶
func HasState() ConditionFunc
HasState returns a condition that checks if state is present.
func Or ¶
func Or(conditions ...ConditionFunc) ConditionFunc
Or combines multiple conditions with logical OR.
func PhaseIs ¶
func PhaseIs(phase string) ConditionFunc
PhaseIs returns a condition that checks if the execution phase matches.
func StatusIs ¶
func StatusIs(status ExecutionStatus) ConditionFunc
StatusIs returns a condition that checks if the execution status matches.
type ConditionResult ¶
type ConditionResult struct {
// Description is the condition's description.
Description string
// Passed indicates whether the condition evaluated to true.
Passed bool
}
ConditionResult contains the result of evaluating a single condition.
type Config ¶
type Config struct {
// Port configuration for inputs and outputs
Ports *component.PortConfig `json:"ports,omitempty" schema:"type:ports,description:Port configuration for workflow inputs and outputs,category:basic"`
// StateBucket is the KV bucket for workflow execution state
StateBucket string `` /* 155-byte string literal not displayed */
// CallbackStreamName is the JetStream stream for callback messages
CallbackStreamName string `` /* 158-byte string literal not displayed */
// EventStreamName is the JetStream stream for workflow events
EventStreamName string `` /* 150-byte string literal not displayed */
// DefaultTimeout is the default timeout for workflows without explicit timeout
DefaultTimeout string `json:"default_timeout" schema:"type:string,description:Default timeout for workflows (e.g. 10m),default:10m,category:basic"`
// DefaultMaxIterations is the default max iterations for loop workflows
DefaultMaxIterations int `` /* 141-byte string literal not displayed */
// CleanupRetention is how long to retain completed executions
CleanupRetention string `` /* 142-byte string literal not displayed */
// CleanupInterval is how often to run cleanup
CleanupInterval string `` /* 134-byte string literal not displayed */
// TaskTimeoutDefault is the default timeout for async tasks
TaskTimeoutDefault string `json:"task_timeout_default" schema:"type:string,description:Default timeout for async tasks,default:5m,category:advanced"`
// ConsumerNamePrefix is prepended to consumer names for uniqueness
ConsumerNamePrefix string `json:"consumer_name_prefix,omitempty" schema:"type:string,description:Prefix for NATS consumer names,category:advanced"`
// EnableMetrics enables Prometheus metrics
EnableMetrics bool `json:"enable_metrics" schema:"type:bool,description:Enable Prometheus metrics,default:true,category:advanced"`
}
Config represents the configuration for the reactive workflow engine.
func (Config) GetCleanupInterval ¶
GetCleanupInterval returns the parsed cleanup interval duration.
func (Config) GetCleanupRetention ¶
GetCleanupRetention returns the parsed cleanup retention duration.
func (Config) GetDefaultTimeout ¶
GetDefaultTimeout returns the parsed default timeout duration.
func (Config) GetTaskTimeoutDefault ¶
GetTaskTimeoutDefault returns the parsed task timeout duration.
type ConsumerError ¶
ConsumerError represents an error with a subject consumer.
func (*ConsumerError) Error ¶
func (e *ConsumerError) Error() string
Error implements the error interface.
func (*ConsumerError) Unwrap ¶
func (e *ConsumerError) Unwrap() error
Unwrap returns the underlying error.
type Definition ¶
type Definition struct {
// ID is the unique workflow identifier.
ID string
// Description is a human-readable description.
Description string
// StateBucket is the KV bucket for storing execution state.
StateBucket string
// StateFactory creates a zero-value instance of the workflow's concrete state type.
// Must return a pointer to a struct that embeds ExecutionState.
StateFactory func() any
// MaxIterations limits loop iterations (0 = unlimited).
MaxIterations int
// Timeout is the maximum execution duration.
Timeout time.Duration
// Rules defines the reactive rules for this workflow.
// Rules are evaluated in order; the first matching rule fires.
Rules []RuleDef
// Events defines typed event subjects for workflow lifecycle events.
Events EventConfig
}
Definition defines a reactive workflow as Go code.
func (*Definition) Validate ¶
func (d *Definition) Validate() error
Validate checks if the workflow definition is properly configured.
type DispatchError ¶
DispatchError represents an error during action dispatch.
func (*DispatchError) Error ¶
func (e *DispatchError) Error() string
Error implements the error interface.
func (*DispatchError) Unwrap ¶
func (e *DispatchError) Unwrap() error
Unwrap returns the underlying error.
type DispatchResult ¶
type DispatchResult struct {
// TaskID is set for async actions to correlate callbacks.
TaskID string
// NewRevision is the KV revision after state mutation.
NewRevision uint64
// Published indicates if a message was published.
Published bool
// StateUpdated indicates if state was written to KV.
StateUpdated bool
// PartialFailure indicates the action partially succeeded but had errors.
// For example, publish succeeded but state write failed.
PartialFailure bool
// PartialError contains the error message for partial failures.
PartialError string
}
DispatchResult contains the result of dispatching an action.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher executes actions for reactive workflow rules. It handles NATS publishing, KV state mutations, and async callback tracking.
func NewDispatcher ¶
func NewDispatcher(logger *slog.Logger, opts ...DispatcherOption) *Dispatcher
NewDispatcher creates a new action dispatcher.
func (*Dispatcher) DispatchAction ¶
func (d *Dispatcher) DispatchAction( ctx context.Context, ruleCtx *RuleContext, rule *RuleDef, def *Definition, ) (*DispatchResult, error)
DispatchAction executes the action defined in a rule. The ctx contains both the triggering data and the current state. Returns the dispatch result and any error that occurred.
func (*Dispatcher) HandleCallback ¶
func (d *Dispatcher) HandleCallback( ctx context.Context, ruleCtx *RuleContext, rule *RuleDef, result any, def *Definition, ) (*DispatchResult, error)
HandleCallback processes an async callback result. It looks up the execution by task ID, deserializes the result, invokes the state mutator, and writes the updated state to KV.
func (*Dispatcher) HandleEscalation ¶
func (d *Dispatcher) HandleEscalation( ctx context.Context, ruleCtx *RuleContext, reason string, def *Definition, ) (*DispatchResult, error)
HandleEscalation records an escalation and optionally publishes escalation event.
func (*Dispatcher) HandleFailure ¶
func (d *Dispatcher) HandleFailure( ctx context.Context, ruleCtx *RuleContext, errMsg string, def *Definition, ) (*DispatchResult, error)
HandleFailure records a failure and optionally publishes failure event.
type DispatcherOption ¶
type DispatcherOption func(*Dispatcher)
DispatcherOption configures a Dispatcher.
func WithKVWatcher ¶
func WithKVWatcher(w *KVWatcher) DispatcherOption
WithKVWatcher sets the KV watcher for feedback loop prevention.
func WithPublisher ¶
func WithPublisher(p Publisher) DispatcherOption
WithPublisher sets the NATS publisher for the dispatcher.
func WithSource ¶
func WithSource(source string) DispatcherOption
WithSource sets the source identifier for published messages.
func WithStateStore ¶
func WithStateStore(s StateStore) DispatcherOption
WithStateStore sets the KV state store for the dispatcher.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine orchestrates the reactive workflow engine. It manages all sub-components: triggers, evaluator, and dispatcher.
func NewEngine ¶
func NewEngine( config Config, natsClient *natsclient.Client, opts ...EngineOption, ) *Engine
NewEngine creates a new reactive workflow engine.
func (*Engine) Initialize ¶
Initialize prepares the engine for starting. Call this after registering workflows but before Start.
func (*Engine) RegisterWorkflow ¶
func (e *Engine) RegisterWorkflow(def *Definition) error
RegisterWorkflow registers a workflow and starts its triggers if the engine is running. This enables registering workflows after the engine has started.
func (*Engine) Registry ¶
func (e *Engine) Registry() *WorkflowRegistry
Registry returns the workflow registry.
func (*Engine) StateBucket ¶
StateBucket returns the configured KV bucket name for workflow state.
type EngineError ¶
EngineError represents an error from the engine.
func (*EngineError) Error ¶
func (e *EngineError) Error() string
Error implements the error interface.
func (*EngineError) Unwrap ¶
func (e *EngineError) Unwrap() error
Unwrap returns the underlying error.
type EngineMetrics ¶
type EngineMetrics struct {
// contains filtered or unexported fields
}
EngineMetrics holds Prometheus metrics for the reactive workflow engine. It implements MetricsRecorder interface.
func GetMetrics ¶
func GetMetrics(registry *metric.MetricsRegistry) *EngineMetrics
GetMetrics returns the singleton metrics instance, creating and registering it if needed.
func (*EngineMetrics) DecrementPendingCallbacks ¶
func (m *EngineMetrics) DecrementPendingCallbacks()
DecrementPendingCallbacks decrements the pending callbacks counter.
func (*EngineMetrics) IncrementPendingCallbacks ¶
func (m *EngineMetrics) IncrementPendingCallbacks()
IncrementPendingCallbacks increments the pending callbacks counter.
func (*EngineMetrics) RecordActionDispatch ¶
func (m *EngineMetrics) RecordActionDispatch(workflowID, ruleID, actionType string)
RecordActionDispatch records an action dispatch (implements MetricsRecorder).
func (*EngineMetrics) RecordCallbackReceived ¶
func (m *EngineMetrics) RecordCallbackReceived(workflowID, ruleID, status string, latencySeconds float64)
RecordCallbackReceived records receiving an async callback.
func (*EngineMetrics) RecordExecutionCompleted ¶
func (m *EngineMetrics) RecordExecutionCompleted(workflowID string, durationSeconds float64)
RecordExecutionCompleted records a successful execution completion.
func (*EngineMetrics) RecordExecutionCreated ¶
func (m *EngineMetrics) RecordExecutionCreated(workflowID string)
RecordExecutionCreated records a new execution being created (implements MetricsRecorder).
func (*EngineMetrics) RecordExecutionEscalated ¶
func (m *EngineMetrics) RecordExecutionEscalated(workflowID, reason string, durationSeconds float64)
RecordExecutionEscalated records an execution that was escalated.
func (*EngineMetrics) RecordExecutionFailed ¶
func (m *EngineMetrics) RecordExecutionFailed(workflowID, reason string, durationSeconds float64)
RecordExecutionFailed records a failed execution.
func (*EngineMetrics) RecordExecutionTimedOut ¶
func (m *EngineMetrics) RecordExecutionTimedOut(workflowID string, durationSeconds float64)
RecordExecutionTimedOut records an execution that timed out.
func (*EngineMetrics) RecordRuleDuration ¶
func (m *EngineMetrics) RecordRuleDuration(workflowID, ruleID string, durationSeconds float64)
RecordRuleDuration records the duration of a rule evaluation and execution.
func (*EngineMetrics) RecordRuleEvaluation ¶
func (m *EngineMetrics) RecordRuleEvaluation(workflowID, ruleID string, fired bool)
RecordRuleEvaluation records a rule evaluation (implements MetricsRecorder).
func (*EngineMetrics) SetPendingCallbacks ¶
func (m *EngineMetrics) SetPendingCallbacks(count int)
SetPendingCallbacks sets the current number of pending callbacks.
type EngineOption ¶
type EngineOption func(*Engine)
EngineOption configures an Engine.
func WithEngineLogger ¶
func WithEngineLogger(logger *slog.Logger) EngineOption
WithEngineLogger sets the logger for the engine.
func WithEngineMetrics ¶
func WithEngineMetrics(metrics MetricsRecorder) EngineOption
WithEngineMetrics sets the metrics recorder for the engine.
type EvaluationResult ¶
type EvaluationResult struct {
// RuleID is the ID of the rule that was evaluated.
RuleID string
// ShouldFire indicates whether the rule's conditions are met and it should fire.
ShouldFire bool
// Reason explains why the rule should or should not fire.
Reason string
// ConditionResults contains the result of each condition evaluation.
ConditionResults []ConditionResult
}
EvaluationResult contains the result of evaluating a rule.
type Evaluator ¶
type Evaluator struct {
// contains filtered or unexported fields
}
Evaluator evaluates rules against RuleContext to determine which rules should fire. It tracks cooldowns and firing counts to enforce rule constraints.
func NewEvaluator ¶
NewEvaluator creates a new rule evaluator.
func (*Evaluator) CleanupExpiredCooldowns ¶
CleanupExpiredCooldowns removes cooldown entries older than maxAge. This prevents unbounded memory growth in long-running engines. Returns the number of entries cleaned up.
func (*Evaluator) ClearExecutionState ¶
ClearExecutionState clears firing counts for a completed execution. Call this when an execution completes to free memory.
func (*Evaluator) EvaluateRule ¶
func (e *Evaluator) EvaluateRule( ctx *RuleContext, rule *RuleDef, workflowID string, executionID string, ) EvaluationResult
EvaluateRule evaluates a rule against a RuleContext and returns whether it should fire. This checks: 1. Terminal state (completed/failed/escalated/timed_out executions never fire) 2. Cooldown constraints 3. Max firings constraints 4. All conditions (using AND/OR logic based on rule.Logic)
func (*Evaluator) EvaluateRules ¶
func (e *Evaluator) EvaluateRules( ctx *RuleContext, def *Definition, executionID string, ) (*RuleDef, *EvaluationResult)
EvaluateRules evaluates all rules in a workflow definition against a RuleContext. Returns the first rule that should fire, or nil if no rule matches. Rules are evaluated in definition order for deterministic behavior.
func (*Evaluator) RecordFiring ¶
RecordFiring records that a rule has fired, updating cooldown and firing count.
type EventConfig ¶
type EventConfig struct {
// OnComplete is the subject to publish when the workflow completes successfully.
OnComplete string
// OnFail is the subject to publish when the workflow fails.
OnFail string
// OnEscalate is the subject to publish when the workflow is escalated.
OnEscalate string
}
EventConfig defines typed event subjects for external consumers.
type ExecutionState ¶
type ExecutionState struct {
// ID is the unique execution identifier (KV key).
ID string `json:"id"`
// WorkflowID references the workflow definition.
WorkflowID string `json:"workflow_id"`
// Phase is the current execution phase (typed enum per workflow).
Phase string `json:"phase"`
// Iteration tracks retry/loop count.
Iteration int `json:"iteration"`
// Status is the overall execution status.
Status ExecutionStatus `json:"status"`
// Error holds the last error message if any.
Error string `json:"error,omitempty"`
// PendingTaskID is set when waiting for an async callback.
PendingTaskID string `json:"pending_task_id,omitempty"`
// PendingRuleID identifies which rule is awaiting a callback.
PendingRuleID string `json:"pending_rule_id,omitempty"`
// Timestamps
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
// Deadline is the absolute time when this execution times out.
Deadline *time.Time `json:"deadline,omitempty"`
// Timeline records rule firings for debugging.
Timeline []TimelineEntry `json:"timeline,omitempty"`
}
ExecutionState represents the typed state of a workflow execution. Each workflow definition declares its own concrete state type that embeds ExecutionState.
func ExtractExecutionState ¶
func ExtractExecutionState(state any) *ExecutionState
ExtractExecutionState extracts the ExecutionState from a typed state struct. It first checks if the state implements StateAccessor, then falls back to reflection. Returns nil if the state is nil or doesn't contain an ExecutionState.
func GetExecutionState ¶
func GetExecutionState(state any) *ExecutionState
GetExecutionState extracts the embedded ExecutionState from a typed state struct. Returns nil if the state is nil or does not contain an ExecutionState.
type ExecutionStatus ¶
type ExecutionStatus string
ExecutionStatus represents the overall status of a workflow execution.
const ( // StatusPending indicates the execution has been created but not started. StatusPending ExecutionStatus = "pending" // StatusRunning indicates the execution is actively processing rules. StatusRunning ExecutionStatus = "running" // StatusWaiting indicates the execution is waiting for an async callback. StatusWaiting ExecutionStatus = "waiting" // StatusCompleted indicates the execution finished successfully. StatusCompleted ExecutionStatus = "completed" // StatusFailed indicates the execution failed with an error. StatusFailed ExecutionStatus = "failed" // StatusEscalated indicates the execution was escalated (e.g., max iterations exceeded). StatusEscalated ExecutionStatus = "escalated" // StatusTimedOut indicates the execution exceeded its timeout. StatusTimedOut ExecutionStatus = "timed_out" )
func GetStatus ¶
func GetStatus(state any) ExecutionStatus
GetStatus returns the current status of the execution.
type KVOperation ¶
type KVOperation int
KVOperation indicates the type of KV state change.
const ( // KVOperationPut indicates a create or update. KVOperationPut KVOperation = iota // KVOperationDelete indicates a deletion. KVOperationDelete )
func (KVOperation) String ¶
func (op KVOperation) String() string
String returns a human-readable name for the operation.
type KVWatchEvent ¶
type KVWatchEvent struct {
// Bucket is the KV bucket name.
Bucket string
// Key is the KV entry key.
Key string
// Value is the raw KV entry value (JSON).
Value []byte
// Revision is the KV entry revision.
Revision uint64
// Operation indicates the type of change.
Operation KVOperation
// Timestamp is when the event was received.
Timestamp time.Time
}
KVWatchEvent represents a KV state change event.
type KVWatcher ¶
type KVWatcher struct {
// contains filtered or unexported fields
}
KVWatcher manages KV watch operations for state-triggered rules. It watches one or more KV buckets for state changes and triggers rule evaluation when matching keys are updated.
func NewKVWatcher ¶
NewKVWatcher creates a new KV watcher.
func (*KVWatcher) RecordOwnRevision ¶
RecordOwnRevision records a revision that we wrote, to prevent feedback loops. Call this after successfully writing to KV so we skip evaluation when we see our own update.
func (*KVWatcher) StartWatch ¶
func (w *KVWatcher) StartWatch( ctx context.Context, bucket jetstream.KeyValue, pattern string, handler WatchHandler, ) error
StartWatch starts watching a KV bucket for changes matching the pattern. Multiple handlers can be registered for the same bucket+pattern; all will be called when a matching update occurs.
type MessageDeserializeError ¶
MessageDeserializeError represents an error deserializing a message.
func (*MessageDeserializeError) Error ¶
func (e *MessageDeserializeError) Error() string
Error implements the error interface.
func (*MessageDeserializeError) Unwrap ¶
func (e *MessageDeserializeError) Unwrap() error
Unwrap returns the underlying error.
type MetricsRecorder ¶
type MetricsRecorder interface {
RecordRuleEvaluation(workflowID, ruleID string, fired bool)
RecordActionDispatch(workflowID, ruleID, actionType string)
RecordExecutionCreated(workflowID string)
}
MetricsRecorder records engine metrics.
type PayloadBuilderFunc ¶
type PayloadBuilderFunc func(ctx *RuleContext) (message.Payload, error)
PayloadBuilderFunc constructs a typed NATS message payload from rule context. Has access to both state and message for maximum flexibility. Output: a typed payload struct ready for json.Marshal + NATS publish.
func SimplePayloadBuilder ¶
func SimplePayloadBuilder(payload message.Payload) PayloadBuilderFunc
SimplePayloadBuilder creates a PayloadBuilderFunc that returns a fixed payload. Useful for simple cases where the payload doesn't depend on context.
type Publisher ¶
type Publisher interface {
// Publish sends a message to a NATS subject.
Publish(ctx context.Context, subject string, data []byte) error
}
Publisher handles publishing messages to NATS subjects.
type RegistryError ¶
RegistryError represents an error from the workflow registry.
func (*RegistryError) Error ¶
func (e *RegistryError) Error() string
Error implements the error interface.
func (*RegistryError) Unwrap ¶
func (e *RegistryError) Unwrap() error
Unwrap returns the underlying error.
type RuleBuilder ¶
type RuleBuilder struct {
// contains filtered or unexported fields
}
RuleBuilder provides a fluent API for building rule definitions.
func (*RuleBuilder) Build ¶
func (b *RuleBuilder) Build() (RuleDef, error)
Build validates and returns the rule definition.
func (*RuleBuilder) Complete ¶
func (b *RuleBuilder) Complete() *RuleBuilder
Complete sets the action to mark the execution as completed.
func (*RuleBuilder) CompleteWithEvent ¶
func (b *RuleBuilder) CompleteWithEvent(subject string, buildPayload PayloadBuilderFunc) *RuleBuilder
CompleteWithEvent sets the action to mark the execution as completed and publish a completion event.
func (*RuleBuilder) CompleteWithMutation ¶
func (b *RuleBuilder) CompleteWithMutation(mutateState StateMutatorFunc) *RuleBuilder
CompleteWithMutation sets the action to mark the execution as completed after applying a final state mutation.
func (*RuleBuilder) MustBuild ¶
func (b *RuleBuilder) MustBuild() RuleDef
MustBuild validates and returns the rule definition, panicking on error. Use this only in tests or during application initialization.
func (*RuleBuilder) Mutate ¶
func (b *RuleBuilder) Mutate(mutateState StateMutatorFunc) *RuleBuilder
Mutate sets the action to update state without publishing.
func (*RuleBuilder) OnJetStreamSubject ¶
func (b *RuleBuilder) OnJetStreamSubject(streamName, subject string, messageFactory func() any) *RuleBuilder
OnJetStreamSubject sets the rule to trigger on JetStream messages.
func (*RuleBuilder) OnSubject ¶
func (b *RuleBuilder) OnSubject(subject string, messageFactory func() any) *RuleBuilder
OnSubject sets the rule to trigger on NATS subject messages.
func (*RuleBuilder) Publish ¶
func (b *RuleBuilder) Publish(subject string, buildPayload PayloadBuilderFunc) *RuleBuilder
Publish sets the action to publish a message fire-and-forget.
func (*RuleBuilder) PublishAsync ¶
func (b *RuleBuilder) PublishAsync( subject string, buildPayload PayloadBuilderFunc, expectedResultType string, mutateState StateMutatorFunc, ) *RuleBuilder
PublishAsync sets the action to publish a message and wait for a callback. Note: Only one action method should be called per rule. If multiple action methods are called, only the last one takes effect (actions are mutually exclusive).
func (*RuleBuilder) PublishWithMutation ¶
func (b *RuleBuilder) PublishWithMutation( subject string, buildPayload PayloadBuilderFunc, mutateState StateMutatorFunc, ) *RuleBuilder
PublishWithMutation sets the action to publish a message and mutate state.
func (*RuleBuilder) WatchKV ¶
func (b *RuleBuilder) WatchKV(bucket, pattern string) *RuleBuilder
WatchKV sets the rule to trigger on KV state changes.
func (*RuleBuilder) When ¶
func (b *RuleBuilder) When(description string, condition ConditionFunc) *RuleBuilder
When adds a condition that must be true for the rule to fire.
func (*RuleBuilder) WhenAll ¶
func (b *RuleBuilder) WhenAll() *RuleBuilder
WhenAll sets the rule to require all conditions to be true (AND logic). This is the default behavior.
func (*RuleBuilder) WhenAny ¶
func (b *RuleBuilder) WhenAny() *RuleBuilder
WhenAny sets the rule to require any condition to be true (OR logic).
func (*RuleBuilder) WithCooldown ¶
func (b *RuleBuilder) WithCooldown(d time.Duration) *RuleBuilder
WithCooldown sets the cooldown period to prevent rapid re-firing.
func (*RuleBuilder) WithMaxFirings ¶
func (b *RuleBuilder) WithMaxFirings(n int) *RuleBuilder
WithMaxFirings limits how many times this rule can fire per execution.
func (*RuleBuilder) WithStateLookup ¶
func (b *RuleBuilder) WithStateLookup(bucket string, keyFunc func(msg any) string) *RuleBuilder
WithStateLookup configures state loading for message-triggered rules. This enables "event + state" patterns where the message arrival triggers the rule but conditions evaluate against both message and KV state.
type RuleContext ¶
type RuleContext struct {
// State is the typed execution entity from KV.
// Nil when trigger mode is TriggerMessageOnly.
// Type assertion to concrete state type: ctx.State.(*PlanReviewState)
State any
// Message is the typed triggering message from NATS.
// Nil when trigger mode is TriggerStateOnly.
// Type assertion to concrete message type: ctx.Message.(*PlanReviewResult)
Message any
// KVRevision is the KV entry revision (for optimistic concurrency).
// Zero when no KV state is involved.
KVRevision uint64
// Subject is the NATS subject the message arrived on (for subject-triggered rules).
// Empty when trigger mode is TriggerStateOnly.
Subject string
// KVKey is the KV key that triggered the rule (for KV-triggered rules).
// Empty when trigger mode is TriggerMessageOnly without state lookup.
KVKey string
}
RuleContext provides typed access to both the accumulated state and the triggering event. Which fields are populated depends on the TriggerMode:
TriggerStateOnly: State is set, Message is nil TriggerMessageOnly: Message is set, State is nil TriggerMessageAndState: Both State and Message are set
func BuildRuleContextFromKV ¶
func BuildRuleContextFromKV(event KVWatchEvent, stateFactory func() any) (*RuleContext, error)
BuildRuleContextFromKV builds a RuleContext from a KV watch event. The stateFactory is used to create a typed instance for unmarshaling.
func BuildRuleContextFromMessage ¶
func BuildRuleContextFromMessage( event SubjectMessageEvent, messageFactory func() any, stateLoader func(key string) (any, uint64, error), stateKeyFunc func(msg any) string, ) (*RuleContext, error)
BuildRuleContextFromMessage builds a RuleContext from a subject message event. If stateLoader is provided, it loads state from KV using the extracted key.
func BuildRuleContextFromMessageWithKV ¶
func BuildRuleContextFromMessageWithKV( ctx context.Context, event SubjectMessageEvent, messageFactory func() any, bucket jetstream.KeyValue, stateFactory func() any, stateKeyFunc func(msg any) string, ) (*RuleContext, error)
BuildRuleContextFromMessageWithKV builds a RuleContext from a message with KV state lookup. This is the combined message+state trigger pattern.
type RuleDef ¶
type RuleDef struct {
// ID is the unique rule identifier within a workflow.
ID string
// Trigger defines what causes this rule to evaluate.
Trigger TriggerSource
// Conditions that must all be true for the rule to fire.
// Evaluated against RuleContext which provides typed access to
// both KV state and triggering message.
Conditions []Condition
// Logic determines how conditions are combined ("and" or "or").
// Default is "and" (all conditions must be true).
Logic string
// Action to perform when conditions are met.
Action Action
// Cooldown prevents re-firing within this duration.
Cooldown time.Duration
// MaxFirings limits how many times this rule can fire per execution (0 = unlimited).
MaxFirings int
}
RuleDef defines a reactive rule. This is the Go-native equivalent of the current JSON workflow step definitions.
type RuleWithWorkflow ¶
type RuleWithWorkflow struct {
Rule *RuleDef
Definition *Definition
}
RuleWithWorkflow pairs a rule with its parent workflow definition.
type StateAccessor ¶
type StateAccessor interface {
GetExecutionState() *ExecutionState
}
StateAccessor provides a standard interface for accessing the embedded ExecutionState from custom workflow state structs. Implementing this interface allows the engine to access and modify the base execution state without reflection.
Example implementation:
type PlanReviewState struct {
reactive.ExecutionState
PlanContent *PlanContent
}
func (s *PlanReviewState) GetExecutionState() *reactive.ExecutionState {
return &s.ExecutionState
}
type StateKeyError ¶
StateKeyError represents an error extracting the state key from a message.
func (*StateKeyError) Error ¶
func (e *StateKeyError) Error() string
Error implements the error interface.
type StateLoadError ¶
StateLoadError represents an error loading state from KV.
func (*StateLoadError) Error ¶
func (e *StateLoadError) Error() string
Error implements the error interface.
func (*StateLoadError) Unwrap ¶
func (e *StateLoadError) Unwrap() error
Unwrap returns the underlying error.
type StateMutatorFunc ¶
type StateMutatorFunc func(ctx *RuleContext, result any) error
StateMutatorFunc updates the execution state after an action completes. For sync actions (Mutate, Complete, Publish), result is nil. For async actions (PublishAsync), result is the typed callback result. The mutated state is written back to KV, potentially triggering other rules.
func ChainMutators ¶
func ChainMutators(mutators ...StateMutatorFunc) StateMutatorFunc
ChainMutators combines multiple state mutators into a single mutator. They are executed in order.
func IncrementIterationMutator ¶
func IncrementIterationMutator() StateMutatorFunc
IncrementIterationMutator creates a StateMutatorFunc that increments the iteration counter.
func PhaseTransition ¶
func PhaseTransition(newPhase string) StateMutatorFunc
PhaseTransition creates a StateMutatorFunc that updates the execution phase. This is a common pattern for workflows that progress through phases.
func SetErrorMutator ¶
func SetErrorMutator(errorMsg string) StateMutatorFunc
SetErrorMutator creates a StateMutatorFunc that sets an error on the execution state.
type StateStore ¶
type StateStore interface {
// Get retrieves an entry by key.
Get(ctx context.Context, key string) (jetstream.KeyValueEntry, error)
// Put stores a value and returns the new revision.
Put(ctx context.Context, key string, value []byte) (uint64, error)
// Update performs a CAS update with explicit revision.
// Returns ErrKVRevisionMismatch on conflict.
Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error)
}
StateStore provides read/write access to execution state in KV.
type SubjectConsumer ¶
type SubjectConsumer struct {
// contains filtered or unexported fields
}
SubjectConsumer manages JetStream subject consumers for message-triggered rules. It consumes messages from NATS subjects and triggers rule evaluation when messages arrive.
func NewSubjectConsumer ¶
func NewSubjectConsumer(logger *slog.Logger) *SubjectConsumer
NewSubjectConsumer creates a new subject consumer.
func (*SubjectConsumer) StartConsumer ¶
func (c *SubjectConsumer) StartConsumer( ctx context.Context, js jetstream.JetStream, streamName string, subject string, consumerName string, handler SubjectMessageHandler, ) error
StartConsumer starts consuming messages from a JetStream stream/subject. The handler is called for each message received.
func (*SubjectConsumer) StopAll ¶
func (c *SubjectConsumer) StopAll()
StopAll stops all active consumers. Safe to call multiple times.
func (*SubjectConsumer) StopConsumer ¶
func (c *SubjectConsumer) StopConsumer(streamName, consumerName string)
StopConsumer stops a specific consumer.
type SubjectMessageEvent ¶
type SubjectMessageEvent struct {
// Subject is the NATS subject the message was received on.
Subject string
// Data is the raw message payload.
Data []byte
// Timestamp is when the message was received.
Timestamp time.Time
// StreamSequence is the sequence number in the stream.
StreamSequence uint64
// DeliveryCount is how many times this message has been delivered.
DeliveryCount uint64
}
SubjectMessageEvent represents a message received on a subject.
type SubjectMessageHandler ¶
type SubjectMessageHandler func(ctx context.Context, event SubjectMessageEvent, msg jetstream.Msg)
SubjectMessageHandler is called when a message arrives on a subject. The handler should call msg.Ack(), msg.Nak(), or msg.Term() to acknowledge the message.
type TimelineEntry ¶
type TimelineEntry struct {
Timestamp time.Time `json:"timestamp"`
RuleID string `json:"rule_id"`
TriggerMode string `json:"trigger_mode"` // "kv", "message", "message+state"
TriggerInfo string `json:"trigger_info"` // KV key or NATS subject that triggered
Action string `json:"action"`
Phase string `json:"phase_after"`
Iteration int `json:"iteration"`
}
TimelineEntry records a rule firing event.
type TriggerMode ¶
type TriggerMode int
TriggerMode indicates which reactive primitive(s) a trigger uses.
const ( // TriggerInvalid indicates an invalid or unconfigured trigger. TriggerInvalid TriggerMode = iota // TriggerStateOnly uses KV watch only. TriggerStateOnly // TriggerMessageOnly uses subject consumer only. TriggerMessageOnly // TriggerMessageAndState uses subject consumer with KV state lookup. TriggerMessageAndState )
func (TriggerMode) String ¶
func (m TriggerMode) String() string
String returns a human-readable name for the trigger mode.
type TriggerSource ¶
type TriggerSource struct {
// WatchBucket is the KV bucket to watch for state changes.
// When set without Subject, KV changes trigger rule evaluation.
WatchBucket string `json:"watch_bucket,omitempty"`
// WatchPattern is the key pattern within the bucket (supports NATS wildcards).
WatchPattern string `json:"watch_pattern,omitempty"`
// Subject is the NATS subject to consume messages from.
// When set, message arrival triggers rule evaluation.
// Supports NATS wildcards (e.g., "workflow.callback.plan-review.>").
Subject string `json:"subject,omitempty"`
// StreamName is the JetStream stream to consume from.
// Required when Subject is set and durable consumption is needed.
// When empty with Subject set, uses Core NATS subscription (ephemeral).
StreamName string `json:"stream_name,omitempty"`
// MessageFactory creates a zero-value instance of the expected message type
// for typed deserialization. Required when Subject is set.
MessageFactory func() any `json:"-"`
// StateBucket is the KV bucket to load state from when a message triggers
// the rule. This enables "event + state" patterns where the message arrival
// is the trigger but conditions evaluate against both the message and the
// accumulated KV state.
//
// When Subject is set AND StateBucket is set:
// - Message arrival triggers the rule
// - Engine loads state from StateBucket using StateKeyFunc
// - RuleContext contains both .Message and .State
//
// When only WatchBucket is set (no Subject):
// - KV change triggers the rule
// - RuleContext contains .State only (.Message is nil)
//
// When only Subject is set (no StateBucket or WatchBucket):
// - Message arrival triggers the rule
// - RuleContext contains .Message only (.State is nil)
StateBucket string `json:"state_bucket,omitempty"`
// StateKeyFunc extracts the KV key from the triggering message to load state.
// Required when Subject + StateBucket are both set.
// Example: func(msg any) string {
// return "plan-review." + msg.(*MyCallbackResult).TaskID
// }
StateKeyFunc func(msg any) string `json:"-"`
}
TriggerSource defines what causes a rule to evaluate. A rule can watch KV state, consume stream messages, or both. When both are configured, the message arrival is the trigger and KV state is loaded for condition evaluation — this enables "event + state" patterns in a single rule.
func (*TriggerSource) Mode ¶
func (t *TriggerSource) Mode() TriggerMode
Mode returns which reactive primitive(s) this trigger uses.
func (*TriggerSource) Validate ¶
func (t *TriggerSource) Validate() error
Validate checks if the trigger source is properly configured.
type UnmarshalError ¶
UnmarshalError represents an error unmarshaling KV state.
func (*UnmarshalError) Error ¶
func (e *UnmarshalError) Error() string
Error implements the error interface.
func (*UnmarshalError) Unwrap ¶
func (e *UnmarshalError) Unwrap() error
Unwrap returns the underlying error.
type ValidationError ¶
ValidationError represents a validation error for a specific field.
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
Error implements the error interface.
type WatchError ¶
WatchError represents an error starting a KV watch.
func (*WatchError) Error ¶
func (e *WatchError) Error() string
Error implements the error interface.
func (*WatchError) Unwrap ¶
func (e *WatchError) Unwrap() error
Unwrap returns the underlying error.
type WatchHandler ¶
type WatchHandler func(ctx context.Context, event KVWatchEvent)
WatchHandler is called when a KV entry is updated. The handler should return quickly; long-running operations should be done asynchronously.
type WorkflowBuilder ¶
type WorkflowBuilder struct {
// contains filtered or unexported fields
}
WorkflowBuilder provides a fluent API for building workflow definitions.
func NewWorkflow ¶
func NewWorkflow(id string) *WorkflowBuilder
NewWorkflow starts building a new workflow definition.
func (*WorkflowBuilder) AddRule ¶
func (b *WorkflowBuilder) AddRule(rule RuleDef) *WorkflowBuilder
AddRule adds a rule to the workflow.
func (*WorkflowBuilder) AddRuleFromBuilder ¶
func (b *WorkflowBuilder) AddRuleFromBuilder(rb *RuleBuilder) *WorkflowBuilder
AddRuleFromBuilder adds a rule built using RuleBuilder.
func (*WorkflowBuilder) Build ¶
func (b *WorkflowBuilder) Build() (*Definition, error)
Build validates and returns the workflow definition.
func (*WorkflowBuilder) MustBuild ¶
func (b *WorkflowBuilder) MustBuild() *Definition
MustBuild validates and returns the workflow definition, panicking on error. Use this only in tests or during application initialization.
func (*WorkflowBuilder) WithDescription ¶
func (b *WorkflowBuilder) WithDescription(desc string) *WorkflowBuilder
WithDescription sets the workflow description.
func (*WorkflowBuilder) WithEvents ¶
func (b *WorkflowBuilder) WithEvents(events EventConfig) *WorkflowBuilder
WithEvents sets the event configuration for workflow lifecycle events.
func (*WorkflowBuilder) WithMaxIterations ¶
func (b *WorkflowBuilder) WithMaxIterations(n int) *WorkflowBuilder
WithMaxIterations sets the maximum loop iterations for this workflow.
func (*WorkflowBuilder) WithOnComplete ¶
func (b *WorkflowBuilder) WithOnComplete(subject string) *WorkflowBuilder
WithOnComplete sets the subject to publish when the workflow completes successfully.
func (*WorkflowBuilder) WithOnEscalate ¶
func (b *WorkflowBuilder) WithOnEscalate(subject string) *WorkflowBuilder
WithOnEscalate sets the subject to publish when the workflow is escalated.
func (*WorkflowBuilder) WithOnFail ¶
func (b *WorkflowBuilder) WithOnFail(subject string) *WorkflowBuilder
WithOnFail sets the subject to publish when the workflow fails.
func (*WorkflowBuilder) WithStateBucket ¶
func (b *WorkflowBuilder) WithStateBucket(bucket string) *WorkflowBuilder
WithStateBucket sets the KV bucket for storing execution state.
func (*WorkflowBuilder) WithStateFactory ¶
func (b *WorkflowBuilder) WithStateFactory(factory func() any) *WorkflowBuilder
WithStateFactory sets the factory function for creating typed state instances. The factory must return a pointer to a struct that embeds ExecutionState.
func (*WorkflowBuilder) WithTimeout ¶
func (b *WorkflowBuilder) WithTimeout(d time.Duration) *WorkflowBuilder
WithTimeout sets the maximum execution duration for this workflow.
type WorkflowCompletionEvent ¶
type WorkflowCompletionEvent struct {
ExecutionID string `json:"execution_id"`
WorkflowID string `json:"workflow_id"`
Status string `json:"status"`
Phase string `json:"final_phase"`
CompletedAt time.Time `json:"completed_at"`
}
WorkflowCompletionEvent is published when a workflow completes successfully.
func (*WorkflowCompletionEvent) MarshalJSON ¶
func (e *WorkflowCompletionEvent) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler.
func (*WorkflowCompletionEvent) Schema ¶
func (e *WorkflowCompletionEvent) Schema() message.Type
Schema returns the message type for WorkflowCompletionEvent.
func (*WorkflowCompletionEvent) UnmarshalJSON ¶
func (e *WorkflowCompletionEvent) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler.
func (*WorkflowCompletionEvent) Validate ¶
func (e *WorkflowCompletionEvent) Validate() error
Validate validates the completion event.
type WorkflowEscalationEvent ¶
type WorkflowEscalationEvent struct {
ExecutionID string `json:"execution_id"`
WorkflowID string `json:"workflow_id"`
Phase string `json:"phase"`
Reason string `json:"reason"`
EscalatedAt time.Time `json:"escalated_at"`
}
WorkflowEscalationEvent is published when a workflow is escalated.
func (*WorkflowEscalationEvent) MarshalJSON ¶
func (e *WorkflowEscalationEvent) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler.
func (*WorkflowEscalationEvent) Schema ¶
func (e *WorkflowEscalationEvent) Schema() message.Type
Schema returns the message type for WorkflowEscalationEvent.
func (*WorkflowEscalationEvent) UnmarshalJSON ¶
func (e *WorkflowEscalationEvent) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler.
func (*WorkflowEscalationEvent) Validate ¶
func (e *WorkflowEscalationEvent) Validate() error
Validate validates the escalation event.
type WorkflowFailureEvent ¶
type WorkflowFailureEvent struct {
ExecutionID string `json:"execution_id"`
WorkflowID string `json:"workflow_id"`
Phase string `json:"phase"`
Error string `json:"error"`
FailedAt time.Time `json:"failed_at"`
}
WorkflowFailureEvent is published when a workflow fails.
func (*WorkflowFailureEvent) MarshalJSON ¶
func (e *WorkflowFailureEvent) MarshalJSON() ([]byte, error)
MarshalJSON implements json.Marshaler.
func (*WorkflowFailureEvent) Schema ¶
func (e *WorkflowFailureEvent) Schema() message.Type
Schema returns the message type for WorkflowFailureEvent.
func (*WorkflowFailureEvent) UnmarshalJSON ¶
func (e *WorkflowFailureEvent) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler.
func (*WorkflowFailureEvent) Validate ¶
func (e *WorkflowFailureEvent) Validate() error
Validate validates the failure event.
type WorkflowRegistry ¶
type WorkflowRegistry struct {
// contains filtered or unexported fields
}
WorkflowRegistry manages workflow definitions and their associated types. It provides thread-safe registration and lookup of workflows.
func NewWorkflowRegistry ¶
func NewWorkflowRegistry(logger *slog.Logger) *WorkflowRegistry
NewWorkflowRegistry creates a new workflow registry.
func (*WorkflowRegistry) Clear ¶
func (r *WorkflowRegistry) Clear()
Clear removes all registered workflows and result types. Used primarily for testing.
func (*WorkflowRegistry) Count ¶
func (r *WorkflowRegistry) Count() int
Count returns the number of registered workflows.
func (*WorkflowRegistry) Get ¶
func (r *WorkflowRegistry) Get(workflowID string) *Definition
Get returns a workflow definition by ID. Returns nil if not found.
func (*WorkflowRegistry) GetAll ¶
func (r *WorkflowRegistry) GetAll() []*Definition
GetAll returns all registered workflow definitions.
func (*WorkflowRegistry) GetResultTypeFactory ¶
func (r *WorkflowRegistry) GetResultTypeFactory(typeKey string) func() message.Payload
GetResultTypeFactory returns the factory function for a result type. Returns nil if not found.
func (*WorkflowRegistry) GetRulesForSubject ¶
func (r *WorkflowRegistry) GetRulesForSubject(subject string) []*RuleWithWorkflow
GetRulesForSubject returns all rules that trigger on the given subject pattern.
func (*WorkflowRegistry) GetRulesForTrigger ¶
func (r *WorkflowRegistry) GetRulesForTrigger(triggerMode TriggerMode, bucket, _ string) []*RuleWithWorkflow
GetRulesForTrigger returns all rules across all workflows that match the given trigger. This is used for routing incoming events to the appropriate rules. Note: keyPattern is reserved for future pattern-based filtering.
func (*WorkflowRegistry) List ¶
func (r *WorkflowRegistry) List() []string
List returns all registered workflow IDs.
func (*WorkflowRegistry) Register ¶
func (r *WorkflowRegistry) Register(def *Definition) error
Register registers a workflow definition. Returns an error if the workflow ID is already registered.
func (*WorkflowRegistry) RegisterResultType ¶
func (r *WorkflowRegistry) RegisterResultType(typeKey string, factory func() message.Payload) error
RegisterResultType registers a factory function for deserializing callback results. The typeKey should be in format "domain.category.version".
func (*WorkflowRegistry) Unregister ¶
func (r *WorkflowRegistry) Unregister(workflowID string)
Unregister removes a workflow definition.
func (*WorkflowRegistry) ValidationSummary ¶
func (r *WorkflowRegistry) ValidationSummary() string
ValidationSummary returns a summary of all registered workflows for debugging.