Documentation
¶
Index ¶
- Variables
- func AsCommander[T any](flow Flow[T]) command.Commander[T]
- func BuildFlows[T command.Message](ctx context.Context, cfg FlowSet, bctx BuildContext[T]) (map[string]Flow[T], error)
- func ExecuteBatch[T command.Message](ctx context.Context, messages []T, handler command.CommandFunc[T], ...) error
- func MarshalFlowSet(cfg FlowSet) ([]byte, error)
- func ParallelExecute[T any](ctx context.Context, msg T, handlers []command.CommandFunc[T], ...) error
- func SerialExecute[T any](ctx context.Context, msg T, handlers []command.CommandFunc[T], ...) error
- type ActionRegistry
- func (r *ActionRegistry[T]) Lookup(name string) (func(context.Context, T) error, bool)
- func (r *ActionRegistry[T]) Register(name string, action func(context.Context, T) error) error
- func (r *ActionRegistry[T]) RegisterNamespaced(namespace, name string, action func(context.Context, T) error) error
- func (r *ActionRegistry[T]) SetNamespacer(fn func(string, string) string)
- type AggregateErrorStrategy
- type BatchConfig
- type BatchExecutor
- type BatchExecutorOption
- type BuildContext
- type CircuitBreaker
- type CircuitBreakerOption
- type Conditional
- type ConditionalBranch
- type ConditionalConfig
- type ConditionalExecutor
- type ConditionalOption
- type DecoratorConfig
- type ErrorStrategy
- type FailFastStrategy
- type Flow
- type FlowCommander
- type FlowDefinition
- type FlowOptions
- type FlowSet
- type GuardRegistry
- func (g *GuardRegistry[T]) Lookup(name string) (func(T) bool, bool)
- func (g *GuardRegistry[T]) Register(name string, guard func(T) bool) error
- func (g *GuardRegistry[T]) RegisterNamespaced(namespace, name string, guard func(T) bool) error
- func (g *GuardRegistry[T]) SetNamespacer(fn func(string, string) string)
- type HandlerRegistry
- func (r *HandlerRegistry[T]) Lookup(id string) (command.Commander[T], bool)
- func (r *HandlerRegistry[T]) Register(id string, h command.Commander[T]) error
- func (r *HandlerRegistry[T]) RegisterNamespaced(namespace, id string, h command.Commander[T]) error
- func (r *HandlerRegistry[T]) SetNamespacer(fn func(string, string) string)
- type HandlerResolver
- type InMemoryStateStore
- type MetricsDecorator
- type MetricsRecorder
- type MetricsRecorderRegistry
- func (r *MetricsRecorderRegistry) Lookup(name string) (MetricsRecorder, bool)
- func (r *MetricsRecorderRegistry) Register(name string, mr MetricsRecorder) error
- func (r *MetricsRecorderRegistry) RegisterNamespaced(namespace, name string, mr MetricsRecorder) error
- func (r *MetricsRecorderRegistry) SetNamespacer(fn func(string, string) string)
- type MuxResolver
- type Option
- type ParallelConfig
- type ParallelExecutor
- type RedisClient
- type RedisStateStore
- type Resolver
- type RetryableFlow
- type SQLiteStateStore
- type Saga
- type SagaConfig
- type SagaStep
- type SagaStepConfig
- type SerialConfig
- type SerialExecutor
- type StateConfig
- type StateMachine
- type StateMachineConfig
- type StateMachineOption
- type StateStore
- type TransitionConfig
- type TransitionRequest
Constants ¶
This section is empty.
Variables ¶
var ErrCircuitOpen = fmt.Errorf("circuit open")
Functions ¶
func AsCommander ¶ added in v0.7.0
AsCommander converts a Flow to a command.Commander.
func BuildFlows ¶ added in v0.7.0
func BuildFlows[T command.Message](ctx context.Context, cfg FlowSet, bctx BuildContext[T]) (map[string]Flow[T], error)
BuildFlows constructs flows from config using provided registries.
func ExecuteBatch ¶
func ExecuteBatch[T command.Message](ctx context.Context, messages []T, handler command.CommandFunc[T], batchSize, concurrency int, opts ...runner.Option) error
ExecuteBatch processes messages in batches with a function handler
func MarshalFlowSet ¶ added in v0.7.0
MarshalFlowSet renders FlowSet as JSON (useful for fixtures).
func ParallelExecute ¶
func ParallelExecute[T any](ctx context.Context, msg T, handlers []command.CommandFunc[T], opts ...runner.Option) error
ParallelExecute runs handlers concurrently with function handlers
func SerialExecute ¶ added in v0.7.0
func SerialExecute[T any](ctx context.Context, msg T, handlers []command.CommandFunc[T], opts ...runner.Option) error
SerialExecute will run each handler in sequence with function handlers
Types ¶
type ActionRegistry ¶ added in v0.7.0
type ActionRegistry[T any] struct { // contains filtered or unexported fields }
ActionRegistry stores named actions executed during transitions.
func NewActionRegistry ¶ added in v0.7.0
func NewActionRegistry[T any]() *ActionRegistry[T]
NewActionRegistry creates an empty registry.
func (*ActionRegistry[T]) RegisterNamespaced ¶ added in v0.7.0
func (r *ActionRegistry[T]) RegisterNamespaced(namespace, name string, action func(context.Context, T) error) error
RegisterNamespaced adds an action under namespace+name.
func (*ActionRegistry[T]) SetNamespacer ¶ added in v0.7.0
func (r *ActionRegistry[T]) SetNamespacer(fn func(string, string) string)
SetNamespacer customizes how action IDs are namespaced.
type AggregateErrorStrategy ¶ added in v0.7.0
type AggregateErrorStrategy struct{}
AggregateErrorStrategy combines all errors into one
func (AggregateErrorStrategy) HandleErrors ¶ added in v0.7.0
func (a AggregateErrorStrategy) HandleErrors(errs []error) error
type BatchConfig ¶ added in v0.7.0
type BatchConfig struct {
Handler string `json:"handler" yaml:"handler"`
BatchSize int `json:"batch_size,omitempty" yaml:"batch_size,omitempty"`
Concurrency int `json:"concurrency,omitempty" yaml:"concurrency,omitempty"`
Opts FlowOptions `json:"options,omitempty" yaml:"options,omitempty"`
}
type BatchExecutor ¶
BatchExecutor processes commands in batches
func NewBatchExecutor ¶
func NewBatchExecutor[T command.Message](handler command.Commander[T], opts ...BatchExecutorOption[T]) *BatchExecutor[T]
type BatchExecutorOption ¶
type BatchExecutorOption[T command.Message] func(*BatchExecutor[T])
func WithBatchSize ¶
func WithBatchSize[T command.Message](size int) BatchExecutorOption[T]
func WithConcurrency ¶
func WithConcurrency[T command.Message](n int) BatchExecutorOption[T]
func WithRunnerOptions ¶ added in v0.7.0
func WithRunnerOptions[T command.Message](opts ...Option) BatchExecutorOption[T]
WithRunnerOptions attaches runner options to the batch executor.
type BuildContext ¶ added in v0.7.0
type BuildContext[T command.Message] struct { Handlers *HandlerRegistry[T] Guards *GuardRegistry[T] Actions *ActionRegistry[T] Store StateStore Request TransitionRequest[T] }
BuildContext bundles registries and stores needed to construct flows from config.
type CircuitBreaker ¶ added in v0.7.0
CircuitBreaker implements the circuit breaker pattern
func NewCircuitBreaker ¶ added in v0.7.0
func NewCircuitBreaker[T command.Message]( flow Flow[T], failureThreshold int, resetTimeout time.Duration, opts ...CircuitBreakerOption[T], ) *CircuitBreaker[T]
type CircuitBreakerOption ¶ added in v0.7.0
type CircuitBreakerOption[T command.Message] func(*CircuitBreaker[T])
CircuitBreakerOption allows customizing breaker behavior.
func WithHalfOpenProbe ¶ added in v0.7.0
func WithHalfOpenProbe[T command.Message](probe Flow[T]) CircuitBreakerOption[T]
WithHalfOpenProbe sets the flow executed when probing a half-open circuit.
type Conditional ¶ added in v0.7.0
type ConditionalBranch ¶ added in v0.7.0
type ConditionalConfig ¶ added in v0.7.0
type ConditionalConfig struct {
Branches []ConditionalBranch `json:"branches" yaml:"branches"`
DefaultHandler string `json:"default_handler,omitempty" yaml:"default_handler,omitempty"`
}
type ConditionalExecutor ¶ added in v0.7.0
ConditionalExecutor enables conditional command execution based on predicates or guards.
func NewConditionalExecutor ¶ added in v0.7.0
func NewConditionalExecutor[T command.Message](branches []Conditional[T], opts ...ConditionalOption[T]) *ConditionalExecutor[T]
type ConditionalOption ¶ added in v0.7.0
type ConditionalOption[T command.Message] func(*ConditionalExecutor[T])
ConditionalOption customizes conditional executors.
func WithDefaultHandler ¶ added in v0.7.0
func WithDefaultHandler[T command.Message](handler func(context.Context, T) error) ConditionalOption[T]
WithDefaultHandler sets the handler executed when no branch matches.
func WithGuardRegistry ¶ added in v0.7.0
func WithGuardRegistry[T command.Message](registry *GuardRegistry[T]) ConditionalOption[T]
WithGuardRegistry wires a guard registry for guard-based branches.
type DecoratorConfig ¶ added in v0.7.0
type ErrorStrategy ¶ added in v0.7.0
ErrorStrategy defines how to handle multiple errors from parallel execution
type FailFastStrategy ¶ added in v0.7.0
type FailFastStrategy struct{}
FailFastStrategy returns the first error encountered
func (FailFastStrategy) HandleErrors ¶ added in v0.7.0
func (f FailFastStrategy) HandleErrors(errs []error) error
type FlowCommander ¶ added in v0.7.0
type FlowCommander[T any] struct { // contains filtered or unexported fields }
FlowCommander wraps a Flow so it can be registered as a command.Commander.
type FlowDefinition ¶ added in v0.7.0
type FlowDefinition struct {
ID string `json:"id" yaml:"id"`
Type string `json:"type" yaml:"type"`
Options FlowOptions `json:"options,omitempty" yaml:"options,omitempty"`
Serial *SerialConfig `json:"serial,omitempty" yaml:"serial,omitempty"`
Parallel *ParallelConfig `json:"parallel,omitempty" yaml:"parallel,omitempty"`
Batch *BatchConfig `json:"batch,omitempty" yaml:"batch,omitempty"`
Conditional *ConditionalConfig `json:"conditional,omitempty" yaml:"conditional,omitempty"`
Saga *SagaConfig `json:"saga,omitempty" yaml:"saga,omitempty"`
StateMachine *StateMachineConfig `json:"state_machine,omitempty" yaml:"state_machine,omitempty"`
Decorators []DecoratorConfig `json:"decorators,omitempty" yaml:"decorators,omitempty"`
}
FlowDefinition describes a single flow instance.
func (FlowDefinition) Validate ¶ added in v0.7.0
func (d FlowDefinition) Validate() error
Validate checks required fields for the flow definition.
type FlowOptions ¶ added in v0.7.0
type FlowOptions struct {
Timeout time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
NoTimeout bool `json:"no_timeout,omitempty" yaml:"no_timeout,omitempty"`
MaxRetries int `json:"max_retries,omitempty" yaml:"max_retries,omitempty"`
MaxRuns int `json:"max_runs,omitempty" yaml:"max_runs,omitempty"`
RunOnce bool `json:"run_once,omitempty" yaml:"run_once,omitempty"`
ExitOnError bool `json:"exit_on_error,omitempty" yaml:"exit_on_error,omitempty"`
Deadline time.Time `json:"deadline,omitempty" yaml:"deadline,omitempty"`
}
FlowOptions captures common runner options.
type FlowSet ¶ added in v0.7.0
type FlowSet struct {
Version int `json:"version" yaml:"version"`
Flows []FlowDefinition `json:"flows" yaml:"flows"`
Options FlowOptions `json:"options,omitempty" yaml:"options,omitempty"`
Meta map[string]any `json:"meta,omitempty" yaml:"meta,omitempty"`
}
FlowSet represents a collection of flows loaded from config.
func ParseFlowSet ¶ added in v0.7.0
ParseFlowSet attempts to parse JSON or YAML into a FlowSet.
type GuardRegistry ¶ added in v0.7.0
type GuardRegistry[T any] struct { // contains filtered or unexported fields }
GuardRegistry stores named guard functions.
func NewGuardRegistry ¶ added in v0.7.0
func NewGuardRegistry[T any]() *GuardRegistry[T]
NewGuardRegistry creates an empty registry.
func (*GuardRegistry[T]) Lookup ¶ added in v0.7.0
func (g *GuardRegistry[T]) Lookup(name string) (func(T) bool, bool)
Lookup retrieves a guard by name.
func (*GuardRegistry[T]) Register ¶ added in v0.7.0
func (g *GuardRegistry[T]) Register(name string, guard func(T) bool) error
Register stores a guard by name.
func (*GuardRegistry[T]) RegisterNamespaced ¶ added in v0.7.0
func (g *GuardRegistry[T]) RegisterNamespaced(namespace, name string, guard func(T) bool) error
RegisterNamespaced stores a guard using namespace+name.
func (*GuardRegistry[T]) SetNamespacer ¶ added in v0.7.0
func (g *GuardRegistry[T]) SetNamespacer(fn func(string, string) string)
SetNamespacer customizes how guard IDs are namespaced.
type HandlerRegistry ¶ added in v0.7.0
type HandlerRegistry[T any] struct { // contains filtered or unexported fields }
HandlerRegistry stores named commanders.
func NewHandlerRegistry ¶ added in v0.7.0
func NewHandlerRegistry[T any]() *HandlerRegistry[T]
NewHandlerRegistry creates an empty registry.
func (*HandlerRegistry[T]) Lookup ¶ added in v0.7.0
func (r *HandlerRegistry[T]) Lookup(id string) (command.Commander[T], bool)
Lookup returns a commander by id.
func (*HandlerRegistry[T]) Register ¶ added in v0.7.0
func (r *HandlerRegistry[T]) Register(id string, h command.Commander[T]) error
Register stores a commander by id.
func (*HandlerRegistry[T]) RegisterNamespaced ¶ added in v0.7.0
func (r *HandlerRegistry[T]) RegisterNamespaced(namespace, id string, h command.Commander[T]) error
RegisterNamespaced stores a commander using a namespace + id.
func (*HandlerRegistry[T]) SetNamespacer ¶ added in v0.7.0
func (r *HandlerRegistry[T]) SetNamespacer(fn func(string, string) string)
SetNamespacer customizes how IDs are namespaced.
type HandlerResolver ¶ added in v0.7.0
type HandlerResolver[T any] struct { // contains filtered or unexported fields }
HandlerResolver resolves a static list of handlers.
func NewHandlerResolver ¶ added in v0.7.0
func NewHandlerResolver[T any](handlers ...command.Commander[T]) *HandlerResolver[T]
NewHandlerResolver constructs a resolver backed by explicit handlers.
type InMemoryStateStore ¶ added in v0.7.0
type InMemoryStateStore struct {
// contains filtered or unexported fields
}
InMemoryStateStore is a thread-safe in-memory store.
func NewInMemoryStateStore ¶ added in v0.7.0
func NewInMemoryStateStore() *InMemoryStateStore
NewInMemoryStateStore constructs an empty store.
type MetricsDecorator ¶ added in v0.7.0
MetricsDecorator adds metrics to any flow
func NewMetricsDecorator ¶ added in v0.7.0
func NewMetricsDecorator[T command.Message]( flow interface { Execute(context.Context, T) error }, recorder MetricsRecorder, ) *MetricsDecorator[T]
type MetricsRecorder ¶ added in v0.7.0
type MetricsRecorderRegistry ¶ added in v0.7.0
type MetricsRecorderRegistry struct {
// contains filtered or unexported fields
}
MetricsRecorderRegistry stores named metrics recorders.
func NewMetricsRecorderRegistry ¶ added in v0.7.0
func NewMetricsRecorderRegistry() *MetricsRecorderRegistry
NewMetricsRecorderRegistry constructs an empty registry.
func (*MetricsRecorderRegistry) Lookup ¶ added in v0.7.0
func (r *MetricsRecorderRegistry) Lookup(name string) (MetricsRecorder, bool)
Lookup retrieves a recorder by name.
func (*MetricsRecorderRegistry) Register ¶ added in v0.7.0
func (r *MetricsRecorderRegistry) Register(name string, mr MetricsRecorder) error
Register stores a recorder by name.
func (*MetricsRecorderRegistry) RegisterNamespaced ¶ added in v0.7.0
func (r *MetricsRecorderRegistry) RegisterNamespaced(namespace, name string, mr MetricsRecorder) error
RegisterNamespaced stores a recorder by namespace+name.
func (*MetricsRecorderRegistry) SetNamespacer ¶ added in v0.7.0
func (r *MetricsRecorderRegistry) SetNamespacer(fn func(string, string) string)
SetNamespacer customizes namespacing.
type MuxResolver ¶ added in v0.7.0
type MuxResolver[T any] struct { // contains filtered or unexported fields }
MuxResolver resolves handlers from a router mux using the message type.
func NewMuxResolver ¶ added in v0.7.0
func NewMuxResolver[T any](mux *router.Mux) *MuxResolver[T]
NewMuxResolver builds a resolver using the provided mux (or a new one when nil).
type Option ¶ added in v0.7.0
Option mirrors runner.Option so flows can share runner configuration knobs.
type ParallelConfig ¶ added in v0.7.0
type ParallelConfig struct {
Steps []string `json:"steps" yaml:"steps"`
ErrorStrategy string `json:"error_strategy,omitempty" yaml:"error_strategy,omitempty"`
Opts FlowOptions `json:"options,omitempty" yaml:"options,omitempty"`
}
type ParallelExecutor ¶
type ParallelExecutor[T any] struct { // contains filtered or unexported fields }
func NewParallelExecutor ¶
func NewParallelExecutor[T any](handlers []command.Commander[T], opts ...runner.Option) *ParallelExecutor[T]
NewParallelExecutor creates a new ParallelExecutor with the provided handlers
func (*ParallelExecutor[T]) Execute ¶
func (p *ParallelExecutor[T]) Execute(ctx context.Context, msg T) error
func (*ParallelExecutor[T]) WithErrorStrategy ¶ added in v0.7.0
func (p *ParallelExecutor[T]) WithErrorStrategy(strategy ErrorStrategy) *ParallelExecutor[T]
WithErrorStrategy overrides the error strategy used to combine handler errors.
type RedisClient ¶ added in v0.7.0
type RedisClient interface {
Get(ctx context.Context, key string) (string, error)
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error
}
RedisClient captures the minimal commands needed from a redis client.
type RedisStateStore ¶ added in v0.7.0
type RedisStateStore struct {
// contains filtered or unexported fields
}
RedisStateStore persists state via a minimal redis client interface.
func NewRedisStateStore ¶ added in v0.7.0
func NewRedisStateStore(client RedisClient, ttl time.Duration) *RedisStateStore
NewRedisStateStore builds a store using the provided client and TTL.
type Resolver ¶ added in v0.7.0
type Resolver[T any] interface { Resolve(ctx context.Context, msg T) ([]command.Commander[T], error) }
Resolver returns the handlers that should run for a given message.
type RetryableFlow ¶ added in v0.7.0
Example usage of retry with backoff for any flow pattern
func NewRetryableFlow ¶ added in v0.7.0
func NewRetryableFlow[T command.Message]( flow Flow[T], retryStrategy runner.RetryStrategy, maxRetries int, ) *RetryableFlow[T]
type SQLiteStateStore ¶ added in v0.7.0
type SQLiteStateStore struct {
// contains filtered or unexported fields
}
SQLiteStateStore persists state in a SQL table using a provided *sql.DB. The table is expected to have schema: CREATE TABLE IF NOT EXISTS states (k TEXT PRIMARY KEY, v TEXT);
func NewSQLiteStateStore ¶ added in v0.7.0
func NewSQLiteStateStore(db *sql.DB, table string) *SQLiteStateStore
NewSQLiteStateStore builds a store using the given DB and table name.
type Saga ¶ added in v0.7.0
Saga implements the saga pattern for distributed transactions a compensating transaction must be idempotent and retryable
type SagaConfig ¶ added in v0.7.0
type SagaConfig struct {
Steps []SagaStepConfig `json:"steps" yaml:"steps"`
CompensateOnError bool `json:"compensate_on_error,omitempty" yaml:"compensate_on_error,omitempty"`
}
type SagaStepConfig ¶ added in v0.7.0
type SerialConfig ¶ added in v0.7.0
type SerialConfig struct {
Steps []string `json:"steps" yaml:"steps"`
Opts FlowOptions `json:"options,omitempty" yaml:"options,omitempty"`
}
type SerialExecutor ¶ added in v0.7.0
type SerialExecutor[T any] struct { // contains filtered or unexported fields }
SerialExecutor runs multiple command handlers in sequence
func NewSerialExecutor ¶ added in v0.7.0
func NewSerialExecutor[T any](handlers []command.Commander[T], opts ...runner.Option) *SerialExecutor[T]
NewSerialExecutor creates a new SerialExecutor with the provided handlers
type StateConfig ¶ added in v0.7.0
type StateMachine ¶ added in v0.7.0
StateMachine executes transitions with optional guards/actions and persistence.
func NewStateMachine ¶ added in v0.7.0
func NewStateMachine[T command.Message]( cfg StateMachineConfig, store StateStore, req TransitionRequest[T], guards *GuardRegistry[T], actions *ActionRegistry[T], opts ...StateMachineOption[T], ) (*StateMachine[T], error)
NewStateMachine constructs a state machine flow.
type StateMachineConfig ¶ added in v0.7.0
type StateMachineConfig struct {
Entity string `json:"entity" yaml:"entity"`
States []StateConfig `json:"states" yaml:"states"`
Transitions []TransitionConfig `json:"transitions" yaml:"transitions"`
PersistWith string `json:"persist_with,omitempty" yaml:"persist_with,omitempty"`
}
func (StateMachineConfig) Validate ¶ added in v0.7.0
func (s StateMachineConfig) Validate() error
Validate ensures the state machine definition is well formed.
type StateMachineOption ¶ added in v0.7.0
type StateMachineOption[T command.Message] func(*StateMachine[T])
StateMachineOption customizes state machine behavior.
func WithInitialFallback ¶ added in v0.7.0
func WithInitialFallback[T command.Message](enable bool) StateMachineOption[T]
WithInitialFallback allows falling back to the initial state when both store and CurrentState are empty.
type StateStore ¶ added in v0.7.0
type StateStore interface {
Load(ctx context.Context, key string) (string, error)
Save(ctx context.Context, key, state string) error
}
StateStore persists workflow state by key.
type TransitionConfig ¶ added in v0.7.0
type TransitionRequest ¶ added in v0.7.0
type TransitionRequest[T any] struct { StateKey func(T) string Event func(T) string CurrentState func(T) string }
TransitionRequest extracts state machine metadata from a message.
func TransitionRequestFromState ¶ added in v0.7.0
func TransitionRequestFromState[T any](idFn func(T) string, stateFn func(T) string, eventFn func(T) string) TransitionRequest[T]
TransitionRequestFromState creates a TransitionRequest using ID and current state fields.
Source Files
¶
- action_registry.go
- batch_executor.go
- circuit_breaker.go
- commander_adapter.go
- conditional.go
- config.go
- config_loader.go
- error_strategy.go
- flow.go
- guard_registry.go
- metrics_decorator.go
- metrics_registry.go
- namespacing.go
- options.go
- parallel_executor.go
- registry.go
- saga.go
- serial_executor.go
- state_machine.go
- state_machine_helpers.go
- state_store.go