reactive

package
v1.0.0-alpha.23 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package reactive provides a reactive workflow engine built on rules engine primitives.

The reactive workflow engine replaces imperative step sequencing with typed rules that react to KV state changes and NATS messages. Workflows are defined in Go code, enabling compile-time type checking of all data flows.

Key concepts:

  • TriggerSource: unified reactive primitive (KV watch, subject consumer, or both)
  • RuleContext: provides typed access to execution state and triggering message
  • ConditionFunc: typed condition evaluation against RuleContext
  • StateMutatorFunc: typed state mutation after action completion

See ADR-021 for architectural details.

Index

Constants

This section is empty.

Variables

View Source
var ConditionHelpers = struct {
	// PhaseIs returns a condition that checks if the execution phase matches.
	PhaseIs func(phase string) ConditionFunc

	// PhaseIn returns a condition that checks if the execution phase is in the list.
	PhaseIn func(phases ...string) ConditionFunc

	// StatusIs returns a condition that checks if the execution status matches.
	StatusIs func(status ExecutionStatus) ConditionFunc

	// IterationLessThan returns a condition that checks if iteration is below limit.
	IterationLessThan func(max int) ConditionFunc

	// IterationEquals returns a condition that checks if iteration equals value.
	IterationEquals func(n int) ConditionFunc

	// HasError returns a condition that checks if execution has an error.
	HasError func() ConditionFunc

	// NoError returns a condition that checks if execution has no error.
	NoError func() ConditionFunc

	// IsWaiting returns a condition that checks if execution is waiting for callback.
	IsWaiting func() ConditionFunc

	// NotWaiting returns a condition that checks if execution is not waiting.
	NotWaiting func() ConditionFunc
}{
	PhaseIs: func(phase string) ConditionFunc {
		return func(ctx *RuleContext) bool {
			if ctx.State == nil {
				return false
			}
			base := GetExecutionState(ctx.State)
			return base != nil && base.Phase == phase
		}
	},

	PhaseIn: func(phases ...string) ConditionFunc {
		return func(ctx *RuleContext) bool {
			if ctx.State == nil {
				return false
			}
			base := GetExecutionState(ctx.State)
			if base == nil {
				return false
			}
			for _, p := range phases {
				if base.Phase == p {
					return true
				}
			}
			return false
		}
	},

	StatusIs: func(status ExecutionStatus) ConditionFunc {
		return func(ctx *RuleContext) bool {
			if ctx.State == nil {
				return false
			}
			base := GetExecutionState(ctx.State)
			return base != nil && base.Status == status
		}
	},

	IterationLessThan: func(max int) ConditionFunc {
		return func(ctx *RuleContext) bool {
			if ctx.State == nil {
				return false
			}
			base := GetExecutionState(ctx.State)
			return base != nil && base.Iteration < max
		}
	},

	IterationEquals: func(n int) ConditionFunc {
		return func(ctx *RuleContext) bool {
			if ctx.State == nil {
				return false
			}
			base := GetExecutionState(ctx.State)
			return base != nil && base.Iteration == n
		}
	},

	HasError: func() ConditionFunc {
		return func(ctx *RuleContext) bool {
			if ctx.State == nil {
				return false
			}
			base := GetExecutionState(ctx.State)
			return base != nil && base.Error != ""
		}
	},

	NoError: func() ConditionFunc {
		return func(ctx *RuleContext) bool {
			if ctx.State == nil {
				return true
			}
			base := GetExecutionState(ctx.State)
			return base == nil || base.Error == ""
		}
	},

	IsWaiting: func() ConditionFunc {
		return func(ctx *RuleContext) bool {
			if ctx.State == nil {
				return false
			}
			base := GetExecutionState(ctx.State)
			return base != nil && base.Status == StatusWaiting
		}
	},

	NotWaiting: func() ConditionFunc {
		return func(ctx *RuleContext) bool {
			if ctx.State == nil {
				return true
			}
			base := GetExecutionState(ctx.State)
			return base == nil || base.Status != StatusWaiting
		}
	},
}

ConditionHelpers provides common condition functions for use in rule definitions.

Functions

func CanProceed

func CanProceed(state any) bool

CanProceed returns true if the execution can proceed (not terminal, not waiting, not expired).

func ClearError

func ClearError(state any)

ClearError clears the error message from the execution state.

func ClearPendingTask

func ClearPendingTask(state any)

ClearPendingTask clears the pending task after callback received.

func CompleteExecution

func CompleteExecution(state any, phase string)

CompleteExecution marks the execution as completed successfully.

func EscalateExecution

func EscalateExecution(state any, reason string)

EscalateExecution marks the execution as escalated.

func FailExecution

func FailExecution(state any, errMsg string)

FailExecution marks the execution as failed.

func GetID

func GetID(state any) string

GetID returns the execution ID.

func GetIteration

func GetIteration(state any) int

GetIteration returns the current iteration count.

func GetPhase

func GetPhase(state any) string

GetPhase returns the current phase of the execution.

func GetWorkflowID

func GetWorkflowID(state any) string

GetWorkflowID returns the workflow ID.

func IncrementIteration

func IncrementIteration(state any) int

IncrementIteration increments the iteration counter.

func InitializeExecution

func InitializeExecution(state any, id, workflowID string, timeout time.Duration)

InitializeExecution initializes a new execution state.

func IsExpired

func IsExpired(state any) bool

IsExpired checks if the execution has exceeded its deadline.

func IsTerminal

func IsTerminal(state any) bool

IsTerminal returns true if the execution is in a terminal state.

func IsTerminalStatus

func IsTerminalStatus(status ExecutionStatus) bool

IsTerminalStatus returns true if the status is a terminal state.

func MatchesPattern

func MatchesPattern(key, pattern string) bool

MatchesPattern checks if a key matches a NATS-style wildcard pattern. Supports * (single token) and > (multi-token suffix) wildcards.

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new reactive workflow engine component.

func RecordTimelineEntry

func RecordTimelineEntry(state any, entry TimelineEntry)

RecordTimelineEntry adds a timeline entry to the execution state.

func Register

func Register(registry *component.Registry) error

Register registers the reactive workflow engine component factory with the registry.

func SetDeadline

func SetDeadline(state any, deadline time.Time)

SetDeadline sets the execution deadline.

func SetError

func SetError(state any, errMsg string)

SetError sets the error message on the execution state.

func SetIteration

func SetIteration(state any, iteration int)

SetIteration sets the iteration counter to a specific value.

func SetPendingTask

func SetPendingTask(state any, taskID string, ruleID string)

SetPendingTask marks the execution as waiting for an async callback.

func SetPhase

func SetPhase(state any, phase string, ruleID string, triggerMode TriggerMode, triggerInfo string, action string)

SetPhase updates the execution phase and records a timeline entry.

func SetStatus

func SetStatus(state any, status ExecutionStatus)

SetStatus updates the execution status.

func TimeoutExecution

func TimeoutExecution(state any)

TimeoutExecution marks the execution as timed out.

Types

type Action

type Action struct {
	// Type determines the action behavior.
	Type ActionType

	// PublishSubject for publish/publish_async actions.
	PublishSubject string

	// ExpectedResultType is the registered payload type name for async results.
	// Required for ActionPublishAsync so the engine can deserialize the callback.
	// Example: "workflow.planner-result.v1"
	ExpectedResultType string

	// BuildPayload constructs the typed payload from the rule context.
	// This replaces string interpolation entirely — it's a Go function that reads
	// typed fields from state and/or message and produces a typed payload.
	// Required for ActionPublish and ActionPublishAsync.
	BuildPayload PayloadBuilderFunc

	// MutateState updates the execution entity after the action completes.
	// For sync actions, called immediately with result=nil.
	// For async actions, called when the callback arrives with the typed result.
	// The mutated state is written back to KV, which may trigger other rules.
	MutateState StateMutatorFunc
}

Action defines what happens when a rule fires.

func (*Action) Validate

func (a *Action) Validate() error

Validate checks if the action is properly configured.

type ActionType

type ActionType int

ActionType determines the action behavior.

const (
	// ActionPublishAsync publishes to NATS with callback tracking. The engine
	// parks the execution until the callback arrives, then calls MutateState.
	ActionPublishAsync ActionType = iota

	// ActionPublish publishes to NATS fire-and-forget, then calls MutateState immediately.
	ActionPublish

	// ActionMutate updates KV state without publishing. The state change may
	// trigger other rules via KV watch.
	ActionMutate

	// ActionComplete marks the execution as completed and publishes terminal events.
	ActionComplete
)

func (ActionType) String

func (a ActionType) String() string

String returns a human-readable name for the action type.

type CallbackFields

type CallbackFields struct {
	// TaskID uniquely identifies this async task.
	TaskID string `json:"task_id"`

	// CallbackSubject is where the executor should publish the result.
	CallbackSubject string `json:"callback_subject"`

	// ExecutionID is the workflow execution ID for direct lookup.
	ExecutionID string `json:"execution_id"`
}

CallbackFields are injected into async task payloads for callback correlation. Used by the reactive workflow engine to track async step execution.

type CallbackInjectable

type CallbackInjectable interface {
	InjectCallback(fields CallbackFields)
}

CallbackInjectable is implemented by payloads that can receive callback fields.

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the reactive workflow engine as a discoverable component.

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema.

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics. TODO: Integrate with EngineMetrics to return real-time flow data.

func (*Component) DebugStatus

func (c *Component) DebugStatus() any

DebugStatus returns extended debug information for the component.

func (*Component) Engine

func (c *Component) Engine() *Engine

Engine returns the underlying workflow engine. This allows callers to register workflows and interact with the engine directly.

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns current health status.

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component for starting.

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns input port definitions.

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata.

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns output port definitions.

func (*Component) RegisterWorkflow

func (c *Component) RegisterWorkflow(def *Definition) error

RegisterWorkflow registers a workflow definition with the engine. Workflows can be registered before or after Start() is called. Registering before Start() is recommended so triggers are active from the beginning.

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start starts the component.

func (*Component) Stop

func (c *Component) Stop(_ time.Duration) error

Stop stops the component within the given timeout. The Engine.Stop() method performs synchronous cleanup (cancels context, stops tickers, stops watchers/consumers), so the timeout parameter is reserved for future async cleanup operations if needed.

type Condition

type Condition struct {
	// Description is human-readable (for logging/debugging).
	Description string

	// Evaluate is the typed condition function.
	Evaluate ConditionFunc
}

Condition is evaluated against a RuleContext using a ConditionFunc. No more string-based "${steps.reviewer.verdict} == 'approved'" comparisons.

type ConditionFunc

type ConditionFunc func(ctx *RuleContext) bool

ConditionFunc evaluates a condition against a RuleContext. Returns true if the condition is met. Implementations use type assertions to access typed fields on State and/or Message.

func Always

func Always() ConditionFunc

Always returns a condition that always evaluates to true.

func And

func And(conditions ...ConditionFunc) ConditionFunc

And combines multiple conditions with logical AND.

func HasMessage

func HasMessage() ConditionFunc

HasMessage returns a condition that checks if a message is present.

func HasState

func HasState() ConditionFunc

HasState returns a condition that checks if state is present.

func Never

func Never() ConditionFunc

Never returns a condition that always evaluates to false.

func Not

func Not(condition ConditionFunc) ConditionFunc

Not negates a condition.

func Or

func Or(conditions ...ConditionFunc) ConditionFunc

Or combines multiple conditions with logical OR.

func PhaseIs

func PhaseIs(phase string) ConditionFunc

PhaseIs returns a condition that checks if the execution phase matches.

func StatusIs

func StatusIs(status ExecutionStatus) ConditionFunc

StatusIs returns a condition that checks if the execution status matches.

type ConditionResult

type ConditionResult struct {
	// Description is the condition's description.
	Description string

	// Passed indicates whether the condition evaluated to true.
	Passed bool
}

ConditionResult contains the result of evaluating a single condition.

type Config

type Config struct {
	// Port configuration for inputs and outputs
	Ports *component.PortConfig `json:"ports,omitempty" schema:"type:ports,description:Port configuration for workflow inputs and outputs,category:basic"`

	// StateBucket is the KV bucket for workflow execution state
	StateBucket string `` /* 155-byte string literal not displayed */

	// CallbackStreamName is the JetStream stream for callback messages
	CallbackStreamName string `` /* 158-byte string literal not displayed */

	// EventStreamName is the JetStream stream for workflow events
	EventStreamName string `` /* 150-byte string literal not displayed */

	// DefaultTimeout is the default timeout for workflows without explicit timeout
	DefaultTimeout string `json:"default_timeout" schema:"type:string,description:Default timeout for workflows (e.g. 10m),default:10m,category:basic"`

	// DefaultMaxIterations is the default max iterations for loop workflows
	DefaultMaxIterations int `` /* 141-byte string literal not displayed */

	// CleanupRetention is how long to retain completed executions
	CleanupRetention string `` /* 142-byte string literal not displayed */

	// CleanupInterval is how often to run cleanup
	CleanupInterval string `` /* 134-byte string literal not displayed */

	// TaskTimeoutDefault is the default timeout for async tasks
	TaskTimeoutDefault string `json:"task_timeout_default" schema:"type:string,description:Default timeout for async tasks,default:5m,category:advanced"`

	// ConsumerNamePrefix is prepended to consumer names for uniqueness
	ConsumerNamePrefix string `json:"consumer_name_prefix,omitempty" schema:"type:string,description:Prefix for NATS consumer names,category:advanced"`

	// EnableMetrics enables Prometheus metrics
	EnableMetrics bool `json:"enable_metrics" schema:"type:bool,description:Enable Prometheus metrics,default:true,category:advanced"`
}

Config represents the configuration for the reactive workflow engine.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration.

func (Config) GetCleanupInterval

func (c Config) GetCleanupInterval() time.Duration

GetCleanupInterval returns the parsed cleanup interval duration.

func (Config) GetCleanupRetention

func (c Config) GetCleanupRetention() time.Duration

GetCleanupRetention returns the parsed cleanup retention duration.

func (Config) GetDefaultTimeout

func (c Config) GetDefaultTimeout() time.Duration

GetDefaultTimeout returns the parsed default timeout duration.

func (Config) GetTaskTimeoutDefault

func (c Config) GetTaskTimeoutDefault() time.Duration

GetTaskTimeoutDefault returns the parsed task timeout duration.

func (Config) Validate

func (c Config) Validate() error

Validate validates the configuration.

type ConsumerError

type ConsumerError struct {
	Stream   string
	Consumer string
	Op       string
	Cause    error
}

ConsumerError represents an error with a subject consumer.

func (*ConsumerError) Error

func (e *ConsumerError) Error() string

Error implements the error interface.

func (*ConsumerError) Unwrap

func (e *ConsumerError) Unwrap() error

Unwrap returns the underlying error.

type Definition

type Definition struct {
	// ID is the unique workflow identifier.
	ID string

	// Description is a human-readable description.
	Description string

	// StateBucket is the KV bucket for storing execution state.
	StateBucket string

	// StateFactory creates a zero-value instance of the workflow's concrete state type.
	// Must return a pointer to a struct that embeds ExecutionState.
	StateFactory func() any

	// MaxIterations limits loop iterations (0 = unlimited).
	MaxIterations int

	// Timeout is the maximum execution duration.
	Timeout time.Duration

	// Rules defines the reactive rules for this workflow.
	// Rules are evaluated in order; the first matching rule fires.
	Rules []RuleDef

	// Events defines typed event subjects for workflow lifecycle events.
	Events EventConfig
}

Definition defines a reactive workflow as Go code.

func (*Definition) Validate

func (d *Definition) Validate() error

Validate checks if the workflow definition is properly configured.

type DispatchError

type DispatchError struct {
	Action  string
	Subject string
	Message string
	Cause   error
}

DispatchError represents an error during action dispatch.

func (*DispatchError) Error

func (e *DispatchError) Error() string

Error implements the error interface.

func (*DispatchError) Unwrap

func (e *DispatchError) Unwrap() error

Unwrap returns the underlying error.

type DispatchResult

type DispatchResult struct {
	// TaskID is set for async actions to correlate callbacks.
	TaskID string

	// NewRevision is the KV revision after state mutation.
	NewRevision uint64

	// Published indicates if a message was published.
	Published bool

	// StateUpdated indicates if state was written to KV.
	StateUpdated bool

	// PartialFailure indicates the action partially succeeded but had errors.
	// For example, publish succeeded but state write failed.
	PartialFailure bool

	// PartialError contains the error message for partial failures.
	PartialError string
}

DispatchResult contains the result of dispatching an action.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher executes actions for reactive workflow rules. It handles NATS publishing, KV state mutations, and async callback tracking.

func NewDispatcher

func NewDispatcher(logger *slog.Logger, opts ...DispatcherOption) *Dispatcher

NewDispatcher creates a new action dispatcher.

func (*Dispatcher) DispatchAction

func (d *Dispatcher) DispatchAction(
	ctx context.Context,
	ruleCtx *RuleContext,
	rule *RuleDef,
	def *Definition,
) (*DispatchResult, error)

DispatchAction executes the action defined in a rule. The ctx contains both the triggering data and the current state. Returns the dispatch result and any error that occurred.

func (*Dispatcher) HandleCallback

func (d *Dispatcher) HandleCallback(
	ctx context.Context,
	ruleCtx *RuleContext,
	rule *RuleDef,
	result any,
	def *Definition,
) (*DispatchResult, error)

HandleCallback processes an async callback result. It looks up the execution by task ID, deserializes the result, invokes the state mutator, and writes the updated state to KV.

func (*Dispatcher) HandleEscalation

func (d *Dispatcher) HandleEscalation(
	ctx context.Context,
	ruleCtx *RuleContext,
	reason string,
	def *Definition,
) (*DispatchResult, error)

HandleEscalation records an escalation and optionally publishes escalation event.

func (*Dispatcher) HandleFailure

func (d *Dispatcher) HandleFailure(
	ctx context.Context,
	ruleCtx *RuleContext,
	errMsg string,
	def *Definition,
) (*DispatchResult, error)

HandleFailure records a failure and optionally publishes failure event.

type DispatcherOption

type DispatcherOption func(*Dispatcher)

DispatcherOption configures a Dispatcher.

func WithKVWatcher

func WithKVWatcher(w *KVWatcher) DispatcherOption

WithKVWatcher sets the KV watcher for feedback loop prevention.

func WithPublisher

func WithPublisher(p Publisher) DispatcherOption

WithPublisher sets the NATS publisher for the dispatcher.

func WithSource

func WithSource(source string) DispatcherOption

WithSource sets the source identifier for published messages.

func WithStateStore

func WithStateStore(s StateStore) DispatcherOption

WithStateStore sets the KV state store for the dispatcher.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine orchestrates the reactive workflow engine. It manages all sub-components: triggers, evaluator, and dispatcher.

func NewEngine

func NewEngine(
	config Config,
	natsClient *natsclient.Client,
	opts ...EngineOption,
) *Engine

NewEngine creates a new reactive workflow engine.

func (*Engine) Initialize

func (e *Engine) Initialize(ctx context.Context) error

Initialize prepares the engine for starting. Call this after registering workflows but before Start.

func (*Engine) IsRunning

func (e *Engine) IsRunning() bool

IsRunning returns whether the engine is running.

func (*Engine) RegisterWorkflow

func (e *Engine) RegisterWorkflow(def *Definition) error

RegisterWorkflow registers a workflow and starts its triggers if the engine is running. This enables registering workflows after the engine has started.

func (*Engine) Registry

func (e *Engine) Registry() *WorkflowRegistry

Registry returns the workflow registry.

func (*Engine) Start

func (e *Engine) Start(ctx context.Context) error

Start starts the engine and all trigger loops.

func (*Engine) StateBucket

func (e *Engine) StateBucket() string

StateBucket returns the configured KV bucket name for workflow state.

func (*Engine) Stop

func (e *Engine) Stop()

Stop stops the engine and all trigger loops.

func (*Engine) Uptime

func (e *Engine) Uptime() time.Duration

Uptime returns how long the engine has been running.

type EngineError

type EngineError struct {
	Op      string
	Message string
	Cause   error
}

EngineError represents an error from the engine.

func (*EngineError) Error

func (e *EngineError) Error() string

Error implements the error interface.

func (*EngineError) Unwrap

func (e *EngineError) Unwrap() error

Unwrap returns the underlying error.

type EngineMetrics

type EngineMetrics struct {
	// contains filtered or unexported fields
}

EngineMetrics holds Prometheus metrics for the reactive workflow engine. It implements MetricsRecorder interface.

func GetMetrics

func GetMetrics(registry *metric.MetricsRegistry) *EngineMetrics

GetMetrics returns the singleton metrics instance, creating and registering it if needed.

func (*EngineMetrics) DecrementPendingCallbacks

func (m *EngineMetrics) DecrementPendingCallbacks()

DecrementPendingCallbacks decrements the pending callbacks counter.

func (*EngineMetrics) IncrementPendingCallbacks

func (m *EngineMetrics) IncrementPendingCallbacks()

IncrementPendingCallbacks increments the pending callbacks counter.

func (*EngineMetrics) RecordActionDispatch

func (m *EngineMetrics) RecordActionDispatch(workflowID, ruleID, actionType string)

RecordActionDispatch records an action dispatch (implements MetricsRecorder).

func (*EngineMetrics) RecordCallbackReceived

func (m *EngineMetrics) RecordCallbackReceived(workflowID, ruleID, status string, latencySeconds float64)

RecordCallbackReceived records receiving an async callback.

func (*EngineMetrics) RecordExecutionCompleted

func (m *EngineMetrics) RecordExecutionCompleted(workflowID string, durationSeconds float64)

RecordExecutionCompleted records a successful execution completion.

func (*EngineMetrics) RecordExecutionCreated

func (m *EngineMetrics) RecordExecutionCreated(workflowID string)

RecordExecutionCreated records a new execution being created (implements MetricsRecorder).

func (*EngineMetrics) RecordExecutionEscalated

func (m *EngineMetrics) RecordExecutionEscalated(workflowID, reason string, durationSeconds float64)

RecordExecutionEscalated records an execution that was escalated.

func (*EngineMetrics) RecordExecutionFailed

func (m *EngineMetrics) RecordExecutionFailed(workflowID, reason string, durationSeconds float64)

RecordExecutionFailed records a failed execution.

func (*EngineMetrics) RecordExecutionTimedOut

func (m *EngineMetrics) RecordExecutionTimedOut(workflowID string, durationSeconds float64)

RecordExecutionTimedOut records an execution that timed out.

func (*EngineMetrics) RecordRuleDuration

func (m *EngineMetrics) RecordRuleDuration(workflowID, ruleID string, durationSeconds float64)

RecordRuleDuration records the duration of a rule evaluation and execution.

func (*EngineMetrics) RecordRuleEvaluation

func (m *EngineMetrics) RecordRuleEvaluation(workflowID, ruleID string, fired bool)

RecordRuleEvaluation records a rule evaluation (implements MetricsRecorder).

func (*EngineMetrics) SetPendingCallbacks

func (m *EngineMetrics) SetPendingCallbacks(count int)

SetPendingCallbacks sets the current number of pending callbacks.

type EngineOption

type EngineOption func(*Engine)

EngineOption configures an Engine.

func WithEngineLogger

func WithEngineLogger(logger *slog.Logger) EngineOption

WithEngineLogger sets the logger for the engine.

func WithEngineMetrics

func WithEngineMetrics(metrics MetricsRecorder) EngineOption

WithEngineMetrics sets the metrics recorder for the engine.

type EvaluationResult

type EvaluationResult struct {
	// RuleID is the ID of the rule that was evaluated.
	RuleID string

	// ShouldFire indicates whether the rule's conditions are met and it should fire.
	ShouldFire bool

	// Reason explains why the rule should or should not fire.
	Reason string

	// ConditionResults contains the result of each condition evaluation.
	ConditionResults []ConditionResult
}

EvaluationResult contains the result of evaluating a rule.

type Evaluator

type Evaluator struct {
	// contains filtered or unexported fields
}

Evaluator evaluates rules against RuleContext to determine which rules should fire. It tracks cooldowns and firing counts to enforce rule constraints.

func NewEvaluator

func NewEvaluator(logger *slog.Logger) *Evaluator

NewEvaluator creates a new rule evaluator.

func (*Evaluator) CleanupExpiredCooldowns

func (e *Evaluator) CleanupExpiredCooldowns(maxAge time.Duration) int

CleanupExpiredCooldowns removes cooldown entries older than maxAge. This prevents unbounded memory growth in long-running engines. Returns the number of entries cleaned up.

func (*Evaluator) ClearExecutionState

func (e *Evaluator) ClearExecutionState(executionID string)

ClearExecutionState clears firing counts for a completed execution. Call this when an execution completes to free memory.

func (*Evaluator) EvaluateRule

func (e *Evaluator) EvaluateRule(
	ctx *RuleContext,
	rule *RuleDef,
	workflowID string,
	executionID string,
) EvaluationResult

EvaluateRule evaluates a rule against a RuleContext and returns whether it should fire. This checks: 1. Terminal state (completed/failed/escalated/timed_out executions never fire) 2. Cooldown constraints 3. Max firings constraints 4. All conditions (using AND/OR logic based on rule.Logic)

func (*Evaluator) EvaluateRules

func (e *Evaluator) EvaluateRules(
	ctx *RuleContext,
	def *Definition,
	executionID string,
) (*RuleDef, *EvaluationResult)

EvaluateRules evaluates all rules in a workflow definition against a RuleContext. Returns the first rule that should fire, or nil if no rule matches. Rules are evaluated in definition order for deterministic behavior.

func (*Evaluator) RecordFiring

func (e *Evaluator) RecordFiring(workflowID, executionID, ruleID string)

RecordFiring records that a rule has fired, updating cooldown and firing count.

type EventConfig

type EventConfig struct {
	// OnComplete is the subject to publish when the workflow completes successfully.
	OnComplete string

	// OnFail is the subject to publish when the workflow fails.
	OnFail string

	// OnEscalate is the subject to publish when the workflow is escalated.
	OnEscalate string
}

EventConfig defines typed event subjects for external consumers.

type ExecutionState

type ExecutionState struct {
	// ID is the unique execution identifier (KV key).
	ID string `json:"id"`

	// WorkflowID references the workflow definition.
	WorkflowID string `json:"workflow_id"`

	// Phase is the current execution phase (typed enum per workflow).
	Phase string `json:"phase"`

	// Iteration tracks retry/loop count.
	Iteration int `json:"iteration"`

	// Status is the overall execution status.
	Status ExecutionStatus `json:"status"`

	// Error holds the last error message if any.
	Error string `json:"error,omitempty"`

	// PendingTaskID is set when waiting for an async callback.
	PendingTaskID string `json:"pending_task_id,omitempty"`

	// PendingRuleID identifies which rule is awaiting a callback.
	PendingRuleID string `json:"pending_rule_id,omitempty"`

	// Timestamps
	CreatedAt   time.Time  `json:"created_at"`
	UpdatedAt   time.Time  `json:"updated_at"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`

	// Deadline is the absolute time when this execution times out.
	Deadline *time.Time `json:"deadline,omitempty"`

	// Timeline records rule firings for debugging.
	Timeline []TimelineEntry `json:"timeline,omitempty"`
}

ExecutionState represents the typed state of a workflow execution. Each workflow definition declares its own concrete state type that embeds ExecutionState.

func ExtractExecutionState

func ExtractExecutionState(state any) *ExecutionState

ExtractExecutionState extracts the ExecutionState from a typed state struct. It first checks if the state implements StateAccessor, then falls back to reflection. Returns nil if the state is nil or doesn't contain an ExecutionState.

func GetExecutionState

func GetExecutionState(state any) *ExecutionState

GetExecutionState extracts the embedded ExecutionState from a typed state struct. Returns nil if the state is nil or does not contain an ExecutionState.

type ExecutionStatus

type ExecutionStatus string

ExecutionStatus represents the overall status of a workflow execution.

const (
	// StatusPending indicates the execution has been created but not started.
	StatusPending ExecutionStatus = "pending"
	// StatusRunning indicates the execution is actively processing rules.
	StatusRunning ExecutionStatus = "running"
	// StatusWaiting indicates the execution is waiting for an async callback.
	StatusWaiting ExecutionStatus = "waiting"
	// StatusCompleted indicates the execution finished successfully.
	StatusCompleted ExecutionStatus = "completed"
	// StatusFailed indicates the execution failed with an error.
	StatusFailed ExecutionStatus = "failed"
	// StatusEscalated indicates the execution was escalated (e.g., max iterations exceeded).
	StatusEscalated ExecutionStatus = "escalated"
	// StatusTimedOut indicates the execution exceeded its timeout.
	StatusTimedOut ExecutionStatus = "timed_out"
)

func GetStatus

func GetStatus(state any) ExecutionStatus

GetStatus returns the current status of the execution.

type KVOperation

type KVOperation int

KVOperation indicates the type of KV state change.

const (
	// KVOperationPut indicates a create or update.
	KVOperationPut KVOperation = iota
	// KVOperationDelete indicates a deletion.
	KVOperationDelete
)

func (KVOperation) String

func (op KVOperation) String() string

String returns a human-readable name for the operation.

type KVWatchEvent

type KVWatchEvent struct {
	// Bucket is the KV bucket name.
	Bucket string

	// Key is the KV entry key.
	Key string

	// Value is the raw KV entry value (JSON).
	Value []byte

	// Revision is the KV entry revision.
	Revision uint64

	// Operation indicates the type of change.
	Operation KVOperation

	// Timestamp is when the event was received.
	Timestamp time.Time
}

KVWatchEvent represents a KV state change event.

type KVWatcher

type KVWatcher struct {
	// contains filtered or unexported fields
}

KVWatcher manages KV watch operations for state-triggered rules. It watches one or more KV buckets for state changes and triggers rule evaluation when matching keys are updated.

func NewKVWatcher

func NewKVWatcher(logger *slog.Logger) *KVWatcher

NewKVWatcher creates a new KV watcher.

func (*KVWatcher) RecordOwnRevision

func (w *KVWatcher) RecordOwnRevision(key string, revision uint64)

RecordOwnRevision records a revision that we wrote, to prevent feedback loops. Call this after successfully writing to KV so we skip evaluation when we see our own update.

func (*KVWatcher) StartWatch

func (w *KVWatcher) StartWatch(
	ctx context.Context,
	bucket jetstream.KeyValue,
	pattern string,
	handler WatchHandler,
) error

StartWatch starts watching a KV bucket for changes matching the pattern. Multiple handlers can be registered for the same bucket+pattern; all will be called when a matching update occurs.

func (*KVWatcher) StopAll

func (w *KVWatcher) StopAll()

StopAll stops all active watchers. Safe to call multiple times.

func (*KVWatcher) StopWatch

func (w *KVWatcher) StopWatch(bucketName, pattern string) error

StopWatch stops watching a specific bucket+pattern.

type MessageDeserializeError

type MessageDeserializeError struct {
	Subject string
	Cause   error
}

MessageDeserializeError represents an error deserializing a message.

func (*MessageDeserializeError) Error

func (e *MessageDeserializeError) Error() string

Error implements the error interface.

func (*MessageDeserializeError) Unwrap

func (e *MessageDeserializeError) Unwrap() error

Unwrap returns the underlying error.

type MetricsRecorder

type MetricsRecorder interface {
	RecordRuleEvaluation(workflowID, ruleID string, fired bool)
	RecordActionDispatch(workflowID, ruleID, actionType string)
	RecordExecutionCreated(workflowID string)
}

MetricsRecorder records engine metrics.

type PayloadBuilderFunc

type PayloadBuilderFunc func(ctx *RuleContext) (message.Payload, error)

PayloadBuilderFunc constructs a typed NATS message payload from rule context. Has access to both state and message for maximum flexibility. Output: a typed payload struct ready for json.Marshal + NATS publish.

func SimplePayloadBuilder

func SimplePayloadBuilder(payload message.Payload) PayloadBuilderFunc

SimplePayloadBuilder creates a PayloadBuilderFunc that returns a fixed payload. Useful for simple cases where the payload doesn't depend on context.

type Publisher

type Publisher interface {
	// Publish sends a message to a NATS subject.
	Publish(ctx context.Context, subject string, data []byte) error
}

Publisher handles publishing messages to NATS subjects.

type RegistryError

type RegistryError struct {
	Op         string
	WorkflowID string
	Message    string
	Cause      error
}

RegistryError represents an error from the workflow registry.

func (*RegistryError) Error

func (e *RegistryError) Error() string

Error implements the error interface.

func (*RegistryError) Unwrap

func (e *RegistryError) Unwrap() error

Unwrap returns the underlying error.

type RuleBuilder

type RuleBuilder struct {
	// contains filtered or unexported fields
}

RuleBuilder provides a fluent API for building rule definitions.

func NewRule

func NewRule(id string) *RuleBuilder

NewRule starts building a new rule definition.

func (*RuleBuilder) Build

func (b *RuleBuilder) Build() (RuleDef, error)

Build validates and returns the rule definition.

func (*RuleBuilder) Complete

func (b *RuleBuilder) Complete() *RuleBuilder

Complete sets the action to mark the execution as completed.

func (*RuleBuilder) CompleteWithEvent

func (b *RuleBuilder) CompleteWithEvent(subject string, buildPayload PayloadBuilderFunc) *RuleBuilder

CompleteWithEvent sets the action to mark the execution as completed and publish a completion event.

func (*RuleBuilder) CompleteWithMutation

func (b *RuleBuilder) CompleteWithMutation(mutateState StateMutatorFunc) *RuleBuilder

CompleteWithMutation sets the action to mark the execution as completed after applying a final state mutation.

func (*RuleBuilder) MustBuild

func (b *RuleBuilder) MustBuild() RuleDef

MustBuild validates and returns the rule definition, panicking on error. Use this only in tests or during application initialization.

func (*RuleBuilder) Mutate

func (b *RuleBuilder) Mutate(mutateState StateMutatorFunc) *RuleBuilder

Mutate sets the action to update state without publishing.

func (*RuleBuilder) OnJetStreamSubject

func (b *RuleBuilder) OnJetStreamSubject(streamName, subject string, messageFactory func() any) *RuleBuilder

OnJetStreamSubject sets the rule to trigger on JetStream messages.

func (*RuleBuilder) OnSubject

func (b *RuleBuilder) OnSubject(subject string, messageFactory func() any) *RuleBuilder

OnSubject sets the rule to trigger on NATS subject messages.

func (*RuleBuilder) Publish

func (b *RuleBuilder) Publish(subject string, buildPayload PayloadBuilderFunc) *RuleBuilder

Publish sets the action to publish a message fire-and-forget.

func (*RuleBuilder) PublishAsync

func (b *RuleBuilder) PublishAsync(
	subject string,
	buildPayload PayloadBuilderFunc,
	expectedResultType string,
	mutateState StateMutatorFunc,
) *RuleBuilder

PublishAsync sets the action to publish a message and wait for a callback. Note: Only one action method should be called per rule. If multiple action methods are called, only the last one takes effect (actions are mutually exclusive).

func (*RuleBuilder) PublishWithMutation

func (b *RuleBuilder) PublishWithMutation(
	subject string,
	buildPayload PayloadBuilderFunc,
	mutateState StateMutatorFunc,
) *RuleBuilder

PublishWithMutation sets the action to publish a message and mutate state.

func (*RuleBuilder) WatchKV

func (b *RuleBuilder) WatchKV(bucket, pattern string) *RuleBuilder

WatchKV sets the rule to trigger on KV state changes.

func (*RuleBuilder) When

func (b *RuleBuilder) When(description string, condition ConditionFunc) *RuleBuilder

When adds a condition that must be true for the rule to fire.

func (*RuleBuilder) WhenAll

func (b *RuleBuilder) WhenAll() *RuleBuilder

WhenAll sets the rule to require all conditions to be true (AND logic). This is the default behavior.

func (*RuleBuilder) WhenAny

func (b *RuleBuilder) WhenAny() *RuleBuilder

WhenAny sets the rule to require any condition to be true (OR logic).

func (*RuleBuilder) WithCooldown

func (b *RuleBuilder) WithCooldown(d time.Duration) *RuleBuilder

WithCooldown sets the cooldown period to prevent rapid re-firing.

func (*RuleBuilder) WithMaxFirings

func (b *RuleBuilder) WithMaxFirings(n int) *RuleBuilder

WithMaxFirings limits how many times this rule can fire per execution.

func (*RuleBuilder) WithStateLookup

func (b *RuleBuilder) WithStateLookup(bucket string, keyFunc func(msg any) string) *RuleBuilder

WithStateLookup configures state loading for message-triggered rules. This enables "event + state" patterns where the message arrival triggers the rule but conditions evaluate against both message and KV state.

type RuleContext

type RuleContext struct {
	// State is the typed execution entity from KV.
	// Nil when trigger mode is TriggerMessageOnly.
	// Type assertion to concrete state type: ctx.State.(*PlanReviewState)
	State any

	// Message is the typed triggering message from NATS.
	// Nil when trigger mode is TriggerStateOnly.
	// Type assertion to concrete message type: ctx.Message.(*PlanReviewResult)
	Message any

	// KVRevision is the KV entry revision (for optimistic concurrency).
	// Zero when no KV state is involved.
	KVRevision uint64

	// Subject is the NATS subject the message arrived on (for subject-triggered rules).
	// Empty when trigger mode is TriggerStateOnly.
	Subject string

	// KVKey is the KV key that triggered the rule (for KV-triggered rules).
	// Empty when trigger mode is TriggerMessageOnly without state lookup.
	KVKey string
}

RuleContext provides typed access to both the accumulated state and the triggering event. Which fields are populated depends on the TriggerMode:

TriggerStateOnly:       State is set, Message is nil
TriggerMessageOnly:     Message is set, State is nil
TriggerMessageAndState: Both State and Message are set

func BuildRuleContextFromKV

func BuildRuleContextFromKV(event KVWatchEvent, stateFactory func() any) (*RuleContext, error)

BuildRuleContextFromKV builds a RuleContext from a KV watch event. The stateFactory is used to create a typed instance for unmarshaling.

func BuildRuleContextFromMessage

func BuildRuleContextFromMessage(
	event SubjectMessageEvent,
	messageFactory func() any,
	stateLoader func(key string) (any, uint64, error),
	stateKeyFunc func(msg any) string,
) (*RuleContext, error)

BuildRuleContextFromMessage builds a RuleContext from a subject message event. If stateLoader is provided, it loads state from KV using the extracted key.

func BuildRuleContextFromMessageWithKV

func BuildRuleContextFromMessageWithKV(
	ctx context.Context,
	event SubjectMessageEvent,
	messageFactory func() any,
	bucket jetstream.KeyValue,
	stateFactory func() any,
	stateKeyFunc func(msg any) string,
) (*RuleContext, error)

BuildRuleContextFromMessageWithKV builds a RuleContext from a message with KV state lookup. This is the combined message+state trigger pattern.

type RuleDef

type RuleDef struct {
	// ID is the unique rule identifier within a workflow.
	ID string

	// Trigger defines what causes this rule to evaluate.
	Trigger TriggerSource

	// Conditions that must all be true for the rule to fire.
	// Evaluated against RuleContext which provides typed access to
	// both KV state and triggering message.
	Conditions []Condition

	// Logic determines how conditions are combined ("and" or "or").
	// Default is "and" (all conditions must be true).
	Logic string

	// Action to perform when conditions are met.
	Action Action

	// Cooldown prevents re-firing within this duration.
	Cooldown time.Duration

	// MaxFirings limits how many times this rule can fire per execution (0 = unlimited).
	MaxFirings int
}

RuleDef defines a reactive rule. This is the Go-native equivalent of the current JSON workflow step definitions.

func (*RuleDef) Validate

func (r *RuleDef) Validate() error

Validate checks if the rule definition is properly configured.

type RuleWithWorkflow

type RuleWithWorkflow struct {
	Rule       *RuleDef
	Definition *Definition
}

RuleWithWorkflow pairs a rule with its parent workflow definition.

type StateAccessor

type StateAccessor interface {
	GetExecutionState() *ExecutionState
}

StateAccessor provides a standard interface for accessing the embedded ExecutionState from custom workflow state structs. Implementing this interface allows the engine to access and modify the base execution state without reflection.

Example implementation:

type PlanReviewState struct {
    reactive.ExecutionState
    PlanContent *PlanContent
}

func (s *PlanReviewState) GetExecutionState() *reactive.ExecutionState {
    return &s.ExecutionState
}

type StateKeyError

type StateKeyError struct {
	Subject string
	Message string
}

StateKeyError represents an error extracting the state key from a message.

func (*StateKeyError) Error

func (e *StateKeyError) Error() string

Error implements the error interface.

type StateLoadError

type StateLoadError struct {
	Key   string
	Cause error
}

StateLoadError represents an error loading state from KV.

func (*StateLoadError) Error

func (e *StateLoadError) Error() string

Error implements the error interface.

func (*StateLoadError) Unwrap

func (e *StateLoadError) Unwrap() error

Unwrap returns the underlying error.

type StateMutatorFunc

type StateMutatorFunc func(ctx *RuleContext, result any) error

StateMutatorFunc updates the execution state after an action completes. For sync actions (Mutate, Complete, Publish), result is nil. For async actions (PublishAsync), result is the typed callback result. The mutated state is written back to KV, potentially triggering other rules.

func ChainMutators

func ChainMutators(mutators ...StateMutatorFunc) StateMutatorFunc

ChainMutators combines multiple state mutators into a single mutator. They are executed in order.

func IncrementIterationMutator

func IncrementIterationMutator() StateMutatorFunc

IncrementIterationMutator creates a StateMutatorFunc that increments the iteration counter.

func PhaseTransition

func PhaseTransition(newPhase string) StateMutatorFunc

PhaseTransition creates a StateMutatorFunc that updates the execution phase. This is a common pattern for workflows that progress through phases.

func SetErrorMutator

func SetErrorMutator(errorMsg string) StateMutatorFunc

SetErrorMutator creates a StateMutatorFunc that sets an error on the execution state.

type StateStore

type StateStore interface {
	// Get retrieves an entry by key.
	Get(ctx context.Context, key string) (jetstream.KeyValueEntry, error)

	// Put stores a value and returns the new revision.
	Put(ctx context.Context, key string, value []byte) (uint64, error)

	// Update performs a CAS update with explicit revision.
	// Returns ErrKVRevisionMismatch on conflict.
	Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error)
}

StateStore provides read/write access to execution state in KV.

type SubjectConsumer

type SubjectConsumer struct {
	// contains filtered or unexported fields
}

SubjectConsumer manages JetStream subject consumers for message-triggered rules. It consumes messages from NATS subjects and triggers rule evaluation when messages arrive.

func NewSubjectConsumer

func NewSubjectConsumer(logger *slog.Logger) *SubjectConsumer

NewSubjectConsumer creates a new subject consumer.

func (*SubjectConsumer) StartConsumer

func (c *SubjectConsumer) StartConsumer(
	ctx context.Context,
	js jetstream.JetStream,
	streamName string,
	subject string,
	consumerName string,
	handler SubjectMessageHandler,
) error

StartConsumer starts consuming messages from a JetStream stream/subject. The handler is called for each message received.

func (*SubjectConsumer) StopAll

func (c *SubjectConsumer) StopAll()

StopAll stops all active consumers. Safe to call multiple times.

func (*SubjectConsumer) StopConsumer

func (c *SubjectConsumer) StopConsumer(streamName, consumerName string)

StopConsumer stops a specific consumer.

type SubjectMessageEvent

type SubjectMessageEvent struct {
	// Subject is the NATS subject the message was received on.
	Subject string

	// Data is the raw message payload.
	Data []byte

	// Timestamp is when the message was received.
	Timestamp time.Time

	// StreamSequence is the sequence number in the stream.
	StreamSequence uint64

	// DeliveryCount is how many times this message has been delivered.
	DeliveryCount uint64
}

SubjectMessageEvent represents a message received on a subject.

type SubjectMessageHandler

type SubjectMessageHandler func(ctx context.Context, event SubjectMessageEvent, msg jetstream.Msg)

SubjectMessageHandler is called when a message arrives on a subject. The handler should call msg.Ack(), msg.Nak(), or msg.Term() to acknowledge the message.

type TimelineEntry

type TimelineEntry struct {
	Timestamp   time.Time `json:"timestamp"`
	RuleID      string    `json:"rule_id"`
	TriggerMode string    `json:"trigger_mode"` // "kv", "message", "message+state"
	TriggerInfo string    `json:"trigger_info"` // KV key or NATS subject that triggered
	Action      string    `json:"action"`
	Phase       string    `json:"phase_after"`
	Iteration   int       `json:"iteration"`
}

TimelineEntry records a rule firing event.

type TriggerMode

type TriggerMode int

TriggerMode indicates which reactive primitive(s) a trigger uses.

const (
	// TriggerInvalid indicates an invalid or unconfigured trigger.
	TriggerInvalid TriggerMode = iota
	// TriggerStateOnly uses KV watch only.
	TriggerStateOnly
	// TriggerMessageOnly uses subject consumer only.
	TriggerMessageOnly
	// TriggerMessageAndState uses subject consumer with KV state lookup.
	TriggerMessageAndState
)

func (TriggerMode) String

func (m TriggerMode) String() string

String returns a human-readable name for the trigger mode.

type TriggerSource

type TriggerSource struct {

	// WatchBucket is the KV bucket to watch for state changes.
	// When set without Subject, KV changes trigger rule evaluation.
	WatchBucket string `json:"watch_bucket,omitempty"`

	// WatchPattern is the key pattern within the bucket (supports NATS wildcards).
	WatchPattern string `json:"watch_pattern,omitempty"`

	// Subject is the NATS subject to consume messages from.
	// When set, message arrival triggers rule evaluation.
	// Supports NATS wildcards (e.g., "workflow.callback.plan-review.>").
	Subject string `json:"subject,omitempty"`

	// StreamName is the JetStream stream to consume from.
	// Required when Subject is set and durable consumption is needed.
	// When empty with Subject set, uses Core NATS subscription (ephemeral).
	StreamName string `json:"stream_name,omitempty"`

	// MessageFactory creates a zero-value instance of the expected message type
	// for typed deserialization. Required when Subject is set.
	MessageFactory func() any `json:"-"`

	// StateBucket is the KV bucket to load state from when a message triggers
	// the rule. This enables "event + state" patterns where the message arrival
	// is the trigger but conditions evaluate against both the message and the
	// accumulated KV state.
	//
	// When Subject is set AND StateBucket is set:
	//   - Message arrival triggers the rule
	//   - Engine loads state from StateBucket using StateKeyFunc
	//   - RuleContext contains both .Message and .State
	//
	// When only WatchBucket is set (no Subject):
	//   - KV change triggers the rule
	//   - RuleContext contains .State only (.Message is nil)
	//
	// When only Subject is set (no StateBucket or WatchBucket):
	//   - Message arrival triggers the rule
	//   - RuleContext contains .Message only (.State is nil)
	StateBucket string `json:"state_bucket,omitempty"`

	// StateKeyFunc extracts the KV key from the triggering message to load state.
	// Required when Subject + StateBucket are both set.
	// Example: func(msg any) string {
	//     return "plan-review." + msg.(*MyCallbackResult).TaskID
	// }
	StateKeyFunc func(msg any) string `json:"-"`
}

TriggerSource defines what causes a rule to evaluate. A rule can watch KV state, consume stream messages, or both. When both are configured, the message arrival is the trigger and KV state is loaded for condition evaluation — this enables "event + state" patterns in a single rule.

func (*TriggerSource) Mode

func (t *TriggerSource) Mode() TriggerMode

Mode returns which reactive primitive(s) this trigger uses.

func (*TriggerSource) Validate

func (t *TriggerSource) Validate() error

Validate checks if the trigger source is properly configured.

type UnmarshalError

type UnmarshalError struct {
	Key   string
	Cause error
}

UnmarshalError represents an error unmarshaling KV state.

func (*UnmarshalError) Error

func (e *UnmarshalError) Error() string

Error implements the error interface.

func (*UnmarshalError) Unwrap

func (e *UnmarshalError) Unwrap() error

Unwrap returns the underlying error.

type ValidationError

type ValidationError struct {
	Field   string
	Message string
}

ValidationError represents a validation error for a specific field.

func (*ValidationError) Error

func (e *ValidationError) Error() string

Error implements the error interface.

type WatchError

type WatchError struct {
	Bucket  string
	Pattern string
	Cause   error
}

WatchError represents an error starting a KV watch.

func (*WatchError) Error

func (e *WatchError) Error() string

Error implements the error interface.

func (*WatchError) Unwrap

func (e *WatchError) Unwrap() error

Unwrap returns the underlying error.

type WatchHandler

type WatchHandler func(ctx context.Context, event KVWatchEvent)

WatchHandler is called when a KV entry is updated. The handler should return quickly; long-running operations should be done asynchronously.

type WorkflowBuilder

type WorkflowBuilder struct {
	// contains filtered or unexported fields
}

WorkflowBuilder provides a fluent API for building workflow definitions.

func NewWorkflow

func NewWorkflow(id string) *WorkflowBuilder

NewWorkflow starts building a new workflow definition.

func (*WorkflowBuilder) AddRule

func (b *WorkflowBuilder) AddRule(rule RuleDef) *WorkflowBuilder

AddRule adds a rule to the workflow.

func (*WorkflowBuilder) AddRuleFromBuilder

func (b *WorkflowBuilder) AddRuleFromBuilder(rb *RuleBuilder) *WorkflowBuilder

AddRuleFromBuilder adds a rule built using RuleBuilder.

func (*WorkflowBuilder) Build

func (b *WorkflowBuilder) Build() (*Definition, error)

Build validates and returns the workflow definition.

func (*WorkflowBuilder) MustBuild

func (b *WorkflowBuilder) MustBuild() *Definition

MustBuild validates and returns the workflow definition, panicking on error. Use this only in tests or during application initialization.

func (*WorkflowBuilder) WithDescription

func (b *WorkflowBuilder) WithDescription(desc string) *WorkflowBuilder

WithDescription sets the workflow description.

func (*WorkflowBuilder) WithEvents

func (b *WorkflowBuilder) WithEvents(events EventConfig) *WorkflowBuilder

WithEvents sets the event configuration for workflow lifecycle events.

func (*WorkflowBuilder) WithMaxIterations

func (b *WorkflowBuilder) WithMaxIterations(n int) *WorkflowBuilder

WithMaxIterations sets the maximum loop iterations for this workflow.

func (*WorkflowBuilder) WithOnComplete

func (b *WorkflowBuilder) WithOnComplete(subject string) *WorkflowBuilder

WithOnComplete sets the subject to publish when the workflow completes successfully.

func (*WorkflowBuilder) WithOnEscalate

func (b *WorkflowBuilder) WithOnEscalate(subject string) *WorkflowBuilder

WithOnEscalate sets the subject to publish when the workflow is escalated.

func (*WorkflowBuilder) WithOnFail

func (b *WorkflowBuilder) WithOnFail(subject string) *WorkflowBuilder

WithOnFail sets the subject to publish when the workflow fails.

func (*WorkflowBuilder) WithStateBucket

func (b *WorkflowBuilder) WithStateBucket(bucket string) *WorkflowBuilder

WithStateBucket sets the KV bucket for storing execution state.

func (*WorkflowBuilder) WithStateFactory

func (b *WorkflowBuilder) WithStateFactory(factory func() any) *WorkflowBuilder

WithStateFactory sets the factory function for creating typed state instances. The factory must return a pointer to a struct that embeds ExecutionState.

func (*WorkflowBuilder) WithTimeout

func (b *WorkflowBuilder) WithTimeout(d time.Duration) *WorkflowBuilder

WithTimeout sets the maximum execution duration for this workflow.

type WorkflowCompletionEvent

type WorkflowCompletionEvent struct {
	ExecutionID string    `json:"execution_id"`
	WorkflowID  string    `json:"workflow_id"`
	Status      string    `json:"status"`
	Phase       string    `json:"final_phase"`
	CompletedAt time.Time `json:"completed_at"`
}

WorkflowCompletionEvent is published when a workflow completes successfully.

func (*WorkflowCompletionEvent) MarshalJSON

func (e *WorkflowCompletionEvent) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler.

func (*WorkflowCompletionEvent) Schema

func (e *WorkflowCompletionEvent) Schema() message.Type

Schema returns the message type for WorkflowCompletionEvent.

func (*WorkflowCompletionEvent) UnmarshalJSON

func (e *WorkflowCompletionEvent) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler.

func (*WorkflowCompletionEvent) Validate

func (e *WorkflowCompletionEvent) Validate() error

Validate validates the completion event.

type WorkflowEscalationEvent

type WorkflowEscalationEvent struct {
	ExecutionID string    `json:"execution_id"`
	WorkflowID  string    `json:"workflow_id"`
	Phase       string    `json:"phase"`
	Reason      string    `json:"reason"`
	EscalatedAt time.Time `json:"escalated_at"`
}

WorkflowEscalationEvent is published when a workflow is escalated.

func (*WorkflowEscalationEvent) MarshalJSON

func (e *WorkflowEscalationEvent) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler.

func (*WorkflowEscalationEvent) Schema

func (e *WorkflowEscalationEvent) Schema() message.Type

Schema returns the message type for WorkflowEscalationEvent.

func (*WorkflowEscalationEvent) UnmarshalJSON

func (e *WorkflowEscalationEvent) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler.

func (*WorkflowEscalationEvent) Validate

func (e *WorkflowEscalationEvent) Validate() error

Validate validates the escalation event.

type WorkflowFailureEvent

type WorkflowFailureEvent struct {
	ExecutionID string    `json:"execution_id"`
	WorkflowID  string    `json:"workflow_id"`
	Phase       string    `json:"phase"`
	Error       string    `json:"error"`
	FailedAt    time.Time `json:"failed_at"`
}

WorkflowFailureEvent is published when a workflow fails.

func (*WorkflowFailureEvent) MarshalJSON

func (e *WorkflowFailureEvent) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler.

func (*WorkflowFailureEvent) Schema

func (e *WorkflowFailureEvent) Schema() message.Type

Schema returns the message type for WorkflowFailureEvent.

func (*WorkflowFailureEvent) UnmarshalJSON

func (e *WorkflowFailureEvent) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler.

func (*WorkflowFailureEvent) Validate

func (e *WorkflowFailureEvent) Validate() error

Validate validates the failure event.

type WorkflowRegistry

type WorkflowRegistry struct {
	// contains filtered or unexported fields
}

WorkflowRegistry manages workflow definitions and their associated types. It provides thread-safe registration and lookup of workflows.

func NewWorkflowRegistry

func NewWorkflowRegistry(logger *slog.Logger) *WorkflowRegistry

NewWorkflowRegistry creates a new workflow registry.

func (*WorkflowRegistry) Clear

func (r *WorkflowRegistry) Clear()

Clear removes all registered workflows and result types. Used primarily for testing.

func (*WorkflowRegistry) Count

func (r *WorkflowRegistry) Count() int

Count returns the number of registered workflows.

func (*WorkflowRegistry) Get

func (r *WorkflowRegistry) Get(workflowID string) *Definition

Get returns a workflow definition by ID. Returns nil if not found.

func (*WorkflowRegistry) GetAll

func (r *WorkflowRegistry) GetAll() []*Definition

GetAll returns all registered workflow definitions.

func (*WorkflowRegistry) GetResultTypeFactory

func (r *WorkflowRegistry) GetResultTypeFactory(typeKey string) func() message.Payload

GetResultTypeFactory returns the factory function for a result type. Returns nil if not found.

func (*WorkflowRegistry) GetRulesForSubject

func (r *WorkflowRegistry) GetRulesForSubject(subject string) []*RuleWithWorkflow

GetRulesForSubject returns all rules that trigger on the given subject pattern.

func (*WorkflowRegistry) GetRulesForTrigger

func (r *WorkflowRegistry) GetRulesForTrigger(triggerMode TriggerMode, bucket, _ string) []*RuleWithWorkflow

GetRulesForTrigger returns all rules across all workflows that match the given trigger. This is used for routing incoming events to the appropriate rules. Note: keyPattern is reserved for future pattern-based filtering.

func (*WorkflowRegistry) List

func (r *WorkflowRegistry) List() []string

List returns all registered workflow IDs.

func (*WorkflowRegistry) Register

func (r *WorkflowRegistry) Register(def *Definition) error

Register registers a workflow definition. Returns an error if the workflow ID is already registered.

func (*WorkflowRegistry) RegisterResultType

func (r *WorkflowRegistry) RegisterResultType(typeKey string, factory func() message.Payload) error

RegisterResultType registers a factory function for deserializing callback results. The typeKey should be in format "domain.category.version".

func (*WorkflowRegistry) Unregister

func (r *WorkflowRegistry) Unregister(workflowID string)

Unregister removes a workflow definition.

func (*WorkflowRegistry) ValidationSummary

func (r *WorkflowRegistry) ValidationSummary() string

ValidationSummary returns a summary of all registered workflows for debugging.

Directories

Path Synopsis
Package testutil provides testing utilities for the reactive workflow engine.
Package testutil provides testing utilities for the reactive workflow engine.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL