flow

package
v0.12.0
Published: Jan 5, 2026 License: MIT Imports: 13 Imported by: 0

README

Flow Package

The Flow package provides execution patterns for the go-command framework, offering flexible ways to run command handlers in different configurations.

Key Features

  • Serial Execution: Run handlers sequentially
  • Parallel Execution: Run handlers concurrently
  • Batch Processing: Process multiple messages in batches
  • Conditional Execution: Execute handlers based on predicates
  • Chain Execution: Sequential execution with message transformation

Installation

go get github.com/goliatone/go-command/flow

Usage Examples

Serial Execution

Run handlers one after another:

// Create handlers
inventoryHandler := &InventoryHandler{}
paymentHandler := &PaymentHandler{}
notificationHandler := &NotificationHandler{}

// Create serial executor with struct handlers
serialExecutor := flow.NewSerialExecutor(
    []command.Commander[ProcessOrderMessage]{
        inventoryHandler,
        paymentHandler,
        notificationHandler,
    },
    runner.WithTimeout(5*time.Second),
)

// Execute
if err := serialExecutor.Execute(ctx, order); err != nil {
    fmt.Printf("Serial execution failed: %v\n", err)
}

// Alternative using functions
err := flow.SerialExecute(
    ctx,
    order,
    []command.CommandFunc[ProcessOrderMessage]{
        logOrderHandler,
        func(ctx context.Context, msg ProcessOrderMessage) error {
            fmt.Printf("Processing order %s inline\n", msg.OrderID)
            return nil
        },
    },
)

Parallel Execution

Run handlers concurrently:

// Create parallel executor
parallelExecutor := flow.NewParallelExecutor(
    []command.Commander[ProcessOrderMessage]{
        inventoryHandler,
        paymentHandler,
        notificationHandler,
    },
    runner.WithTimeout(5*time.Second),
)

if err := parallelExecutor.Execute(ctx, order); err != nil {
    fmt.Printf("Parallel execution failed: %v\n", err)
}

Batch Processing

Process messages in batches with a single handler:

// Create multiple orders
orders := []ProcessOrderMessage{
    {OrderID: "ORD-1001", CustomerID: "C1", Items: []string{"Item1"}, TotalAmount: 10.99},
    {OrderID: "ORD-1002", CustomerID: "C2", Items: []string{"Item2"}, TotalAmount: 20.99},
    // ...more orders
}

// Create batch executor
batchExecutor := flow.NewBatchExecutor(
    logOrderHandler,
    flow.WithBatchSize[ProcessOrderMessage](2),
    flow.WithConcurrency[ProcessOrderMessage](2),
)

if err := batchExecutor.Execute(ctx, orders); err != nil {
    fmt.Printf("Batch execution failed: %v\n", err)
}

Conditional Execution

Execute handlers based on conditions:

// Create conditional executor
conditionalExecutor := flow.NewConditionalExecutor(
    []flow.Conditional[ProcessOrderMessage]{
        {
            Predicate: func(msg ProcessOrderMessage) bool {
                return msg.TotalAmount > 1000.0
            },
            Handler: func(ctx context.Context, msg ProcessOrderMessage) error {
                fmt.Println("High-value order, need manager approval")
                return nil
            },
        },
        {
            Predicate: func(msg ProcessOrderMessage) bool {
                return msg.TotalAmount <= 1000.0
            },
            Handler: func(ctx context.Context, msg ProcessOrderMessage) error {
                fmt.Println("Standard order processing")
                return nil
            },
        },
    },
)

if err := conditionalExecutor.Execute(ctx, order); err != nil {
    fmt.Printf("Conditional execution failed: %v\n", err)
}

Chain Execution

Execute handlers sequentially with message transformation:

// Create chain executor
chainExecutor := flow.NewChainExecutor(
    // Transform message after first handler
    func(msg ProcessOrderMessage) ProcessOrderMessage {
        msg.Status = "INVENTORY_CHECKED"
        return msg
    },
    // Transform message after second handler
    func(msg ProcessOrderMessage) ProcessOrderMessage {
        msg.Status = "PAYMENT_PROCESSED"
        return msg
    },
)

if err := chainExecutor.Execute(ctx, order); err != nil {
    fmt.Printf("Chain execution failed: %v\n", err)
}

Combined Execution

Combine different execution patterns:

// Parallel inventory check
parallelInventory := flow.NewParallelExecutor(
    []command.Commander[ProcessOrderMessage]{
        &InventoryHandler{Name: "Inventory-US"},
        &InventoryHandler{Name: "Inventory-EU"},
    },
)

// Serial payment and notification
serialNotifyPay := flow.NewSerialExecutor(
    []command.Commander[ProcessOrderMessage]{
        paymentHandler,
        notificationHandler,
    },
)

// Combine them
combinedExecutor := flow.NewSerialExecutor(
    []command.Commander[ProcessOrderMessage]{
        parallelInventory,
        serialNotifyPay,
    },
)

if err := combinedExecutor.Execute(ctx, order); err != nil {
    fmt.Printf("Combined execution failed: %v\n", err)
}

Configuration Options

Runner Options

All executors accept runner options for configuration:

executor := flow.NewSerialExecutor(
    handlers,
    runner.WithTimeout(5*time.Second),
    runner.WithMaxRetries(3),
    runner.WithExitOnError(true),
    runner.WithLogger(customLogger),
)

Batch Executor Options

Batch executors have additional configuration options:

batchExecutor := flow.NewBatchExecutor(
    handler,
    flow.WithBatchSize[MyMessage](100),   // Process 100 messages per batch
    flow.WithConcurrency[MyMessage](5),   // Run 5 batches concurrently
)

Error Handling

All executors provide consistent error handling:

  • Serial Executor: Returns on the first error or aggregates all errors, depending on runner.WithExitOnError
  • Parallel Executor: Aggregates all errors from concurrent handlers
  • Batch Executor: Aggregates errors from batch processing
  • Conditional Executor: Returns error from matched handler
  • Chain Executor: Returns error from any handler in the chain

Configure error behavior with runner.WithExitOnError:

// Stop on first error
failFast := flow.NewSerialExecutor(
    handlers,
    runner.WithExitOnError(true),
)

// Continue on error (aggregate all errors)
collectAll := flow.NewSerialExecutor(
    handlers,
    runner.WithExitOnError(false),
)

Design Benefits

  • Composable: Mix and match execution patterns
  • Type-safe: Leverages Go generics for type safety
  • Flexible: Configure timeouts, retries, and error handling
  • Consistent: Same error handling and context propagation across patterns
  • Extensible: Easy to add new execution patterns

Configuration (JSON/YAML)

Flows can be defined in config files and built via flow/config_loader.go. Example:

version: 1
flows:
  - id: order_pipeline
    type: serial
    serial:
      steps: ["inventory", "payment", "notify"]
    options:
      timeout: 5s
      max_retries: 2
      exit_on_error: true

Registries (handlers, guards, actions, metrics recorders) resolve the IDs referenced in config. Namespacing helpers avoid ID collisions (namespace::id).
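
The namespacing idea can be illustrated with a simplified registry. This is a standalone sketch and not the package's code: the generic Registry type is hypothetical, and rejecting empty or duplicate IDs is an assumption. Only the namespace::id key format and the method names (Register, RegisterNamespaced, SetNamespacer, Lookup) come from the documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// Registry is a simplified, hypothetical stand-in for the package's registries.
type Registry[V any] struct {
	mu         sync.RWMutex
	items      map[string]V
	namespacer func(namespace, id string) string
}

// NewRegistry creates an empty registry that joins namespace and id with "::".
func NewRegistry[V any]() *Registry[V] {
	return &Registry[V]{
		items:      make(map[string]V),
		namespacer: func(namespace, id string) string { return namespace + "::" + id },
	}
}

// SetNamespacer replaces the function that builds namespaced keys.
func (r *Registry[V]) SetNamespacer(fn func(namespace, id string) string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.namespacer = fn
}

// Register stores v under id. Rejecting duplicates is an assumption of this sketch.
func (r *Registry[V]) Register(id string, v V) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if id == "" {
		return fmt.Errorf("registry: empty id")
	}
	if _, exists := r.items[id]; exists {
		return fmt.Errorf("registry: %q already registered", id)
	}
	r.items[id] = v
	return nil
}

// RegisterNamespaced stores v under the key built from namespace and id.
func (r *Registry[V]) RegisterNamespaced(namespace, id string, v V) error {
	r.mu.RLock()
	key := r.namespacer(namespace, id)
	r.mu.RUnlock()
	return r.Register(key, v)
}

// Lookup returns the value stored under id, if any.
func (r *Registry[V]) Lookup(id string) (V, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	v, ok := r.items[id]
	return v, ok
}

func main() {
	handlers := NewRegistry[string]()
	_ = handlers.RegisterNamespaced("billing", "audit", "billing audit handler")
	_ = handlers.RegisterNamespaced("shipping", "audit", "shipping audit handler")

	h, ok := handlers.Lookup("billing::audit")
	fmt.Println(h, ok)
	fmt.Println(handlers.Register("billing::audit", "duplicate"))
}
```

Two modules can both register an ID named audit because each ends up under a different key, and a config file refers to the full namespaced key.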

State Machine Usage

Define states, transitions, guards, and actions, with pluggable state stores (in-memory, sqlite, redis). You must provide either a stored state or a CurrentState extractor; otherwise the state machine returns an error instead of silently resetting (use WithInitialFallback(true) to allow resetting to the initial state):

smCfg := flow.StateMachineConfig{
  Entity: "order",
  States: []flow.StateConfig{{Name: "draft", Initial: true}, {Name: "approved"}},
  Transitions: []flow.TransitionConfig{
    {Name: "approve", From: "draft", To: "approved", Guard: "is_admin", Action: "audit"},
  },
}
guards := flow.NewGuardRegistry[OrderMsg]()
guards.Register("is_admin", func(m OrderMsg) bool { return m.Admin })
actions := flow.NewActionRegistry[OrderMsg]()
actions.Register("audit", func(ctx context.Context, m OrderMsg) error { return nil })
store := flow.NewInMemoryStateStore()
req := flow.TransitionRequest[OrderMsg]{
  StateKey:     func(m OrderMsg) string { return m.ID },
  Event:        func(m OrderMsg) string { return m.Event },
  CurrentState: func(m OrderMsg) string { return m.State }, // fallback when store has no entry
}
sm, err := flow.NewStateMachine(smCfg, store, req, guards, actions)
if err != nil {
  // handle the construction error
}

Helper: flow.TransitionRequestFromState(idFn, stateFn, eventFn) builds a request using ID/current state/event to reduce boilerplate. Option flow.WithInitialFallback(true) re-enables the legacy behavior of falling back to the initial state when both store and CurrentState are empty.

Metrics/Tracing Decorators

Use MetricsDecorator to wrap any Flow[T] with metrics; provide a recorder implementation and optionally register it in MetricsRecorderRegistry for config-driven wiring. CircuitBreaker supports half-open probes; RetryableFlow wraps any Flow[T] with a retry strategy.

Hybrid Handler/Mux Usage

Flows accept explicit handlers; a mux resolver adapter is available for registry-driven dispatch. Compose nested flows by converting them to command.Commander[T] via flow.AsCommander. Use the namespacing helpers and registries to prevent ID conflicts across modules.

Registry Resolver Notes

The mux resolver relies on go-command registry metadata. If your command uses an interface message parameter, implement command.MessageFactory to provide a concrete, non-nil message value; otherwise, resolver-based registration treats the command as unsupported and skips metadata-driven integrations.

Legacy Dispatcher Helpers

chain_dispatcher.go and parallel_dispatcher.go are legacy wrappers over the global dispatcher; the recommended approach is to use the flow executors with handler/mux resolvers.

Documentation

Index

Constants

This section is empty.

Variables

var ErrCircuitOpen = fmt.Errorf("circuit open")

Functions

func AsCommander added in v0.7.0

func AsCommander[T any](flow Flow[T]) command.Commander[T]

AsCommander converts a Flow to a command.Commander.

func BuildFlows added in v0.7.0

func BuildFlows[T command.Message](ctx context.Context, cfg FlowSet, bctx BuildContext[T]) (map[string]Flow[T], error)

BuildFlows constructs flows from config using provided registries.

func ExecuteBatch

func ExecuteBatch[T command.Message](ctx context.Context, messages []T, handler command.CommandFunc[T], batchSize, concurrency int, opts ...runner.Option) error

ExecuteBatch processes messages in batches with a function handler.

func MarshalFlowSet added in v0.7.0

func MarshalFlowSet(cfg FlowSet) ([]byte, error)

MarshalFlowSet renders FlowSet as JSON (useful for fixtures).

func ParallelExecute

func ParallelExecute[T any](ctx context.Context, msg T, handlers []command.CommandFunc[T], opts ...runner.Option) error

ParallelExecute runs function handlers concurrently.

func SerialExecute added in v0.7.0

func SerialExecute[T any](ctx context.Context, msg T, handlers []command.CommandFunc[T], opts ...runner.Option) error

SerialExecute runs function handlers in sequence.

Types

type ActionRegistry added in v0.7.0

type ActionRegistry[T any] struct {
	// contains filtered or unexported fields
}

ActionRegistry stores named actions executed during transitions.

func NewActionRegistry added in v0.7.0

func NewActionRegistry[T any]() *ActionRegistry[T]

NewActionRegistry creates an empty registry.

func (*ActionRegistry[T]) Lookup added in v0.7.0

func (r *ActionRegistry[T]) Lookup(name string) (func(context.Context, T) error, bool)

Lookup retrieves an action by name.

func (*ActionRegistry[T]) Register added in v0.7.0

func (r *ActionRegistry[T]) Register(name string, action func(context.Context, T) error) error

Register adds an action by name.

func (*ActionRegistry[T]) RegisterNamespaced added in v0.7.0

func (r *ActionRegistry[T]) RegisterNamespaced(namespace, name string, action func(context.Context, T) error) error

RegisterNamespaced adds an action under namespace+name.

func (*ActionRegistry[T]) SetNamespacer added in v0.7.0

func (r *ActionRegistry[T]) SetNamespacer(fn func(string, string) string)

SetNamespacer customizes how action IDs are namespaced.

type AggregateErrorStrategy added in v0.7.0

type AggregateErrorStrategy struct{}

AggregateErrorStrategy combines all errors into one

func (AggregateErrorStrategy) HandleErrors added in v0.7.0

func (a AggregateErrorStrategy) HandleErrors(errs []error) error

type BatchConfig added in v0.7.0

type BatchConfig struct {
	Handler     string      `json:"handler" yaml:"handler"`
	BatchSize   int         `json:"batch_size,omitempty" yaml:"batch_size,omitempty"`
	Concurrency int         `json:"concurrency,omitempty" yaml:"concurrency,omitempty"`
	Opts        FlowOptions `json:"options,omitempty" yaml:"options,omitempty"`
}

type BatchExecutor

type BatchExecutor[T command.Message] struct {
	// contains filtered or unexported fields
}

BatchExecutor processes commands in batches

func NewBatchExecutor

func NewBatchExecutor[T command.Message](handler command.Commander[T], opts ...BatchExecutorOption[T]) *BatchExecutor[T]

func (*BatchExecutor[T]) Execute

func (b *BatchExecutor[T]) Execute(ctx context.Context, messages []T) error

type BatchExecutorOption

type BatchExecutorOption[T command.Message] func(*BatchExecutor[T])

func WithBatchSize

func WithBatchSize[T command.Message](size int) BatchExecutorOption[T]

func WithConcurrency

func WithConcurrency[T command.Message](n int) BatchExecutorOption[T]

func WithRunnerOptions added in v0.7.0

func WithRunnerOptions[T command.Message](opts ...Option) BatchExecutorOption[T]

WithRunnerOptions attaches runner options to the batch executor.

type BuildContext added in v0.7.0

type BuildContext[T command.Message] struct {
	Handlers *HandlerRegistry[T]
	Guards   *GuardRegistry[T]
	Actions  *ActionRegistry[T]
	Store    StateStore
	Request  TransitionRequest[T]
}

BuildContext bundles registries and stores needed to construct flows from config.

type CircuitBreaker added in v0.7.0

type CircuitBreaker[T command.Message] struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern

func NewCircuitBreaker added in v0.7.0

func NewCircuitBreaker[T command.Message](
	flow Flow[T],
	failureThreshold int,
	resetTimeout time.Duration,
	opts ...CircuitBreakerOption[T],
) *CircuitBreaker[T]

func (*CircuitBreaker[T]) Execute added in v0.7.0

func (c *CircuitBreaker[T]) Execute(ctx context.Context, msg T) error

type CircuitBreakerOption added in v0.7.0

type CircuitBreakerOption[T command.Message] func(*CircuitBreaker[T])

CircuitBreakerOption allows customizing breaker behavior.

func WithHalfOpenProbe added in v0.7.0

func WithHalfOpenProbe[T command.Message](probe Flow[T]) CircuitBreakerOption[T]

WithHalfOpenProbe sets the flow executed when probing a half-open circuit.

type Conditional added in v0.7.0

type Conditional[T command.Message] struct {
	// Predicate is evaluated when provided.
	Predicate func(T) bool
	// Guard references a named guard in the registry when Predicate is nil.
	Guard   string
	Handler func(context.Context, T) error
}

type ConditionalBranch added in v0.7.0

type ConditionalBranch struct {
	Guard   string `json:"guard" yaml:"guard"`
	Handler string `json:"handler" yaml:"handler"`
}

type ConditionalConfig added in v0.7.0

type ConditionalConfig struct {
	Branches       []ConditionalBranch `json:"branches" yaml:"branches"`
	DefaultHandler string              `json:"default_handler,omitempty" yaml:"default_handler,omitempty"`
}

type ConditionalExecutor added in v0.7.0

type ConditionalExecutor[T command.Message] struct {
	// contains filtered or unexported fields
}

ConditionalExecutor enables conditional command execution based on predicates or guards.

func NewConditionalExecutor added in v0.7.0

func NewConditionalExecutor[T command.Message](branches []Conditional[T], opts ...ConditionalOption[T]) *ConditionalExecutor[T]

func (*ConditionalExecutor[T]) Execute added in v0.7.0

func (b *ConditionalExecutor[T]) Execute(ctx context.Context, msg T) error

type ConditionalOption added in v0.7.0

type ConditionalOption[T command.Message] func(*ConditionalExecutor[T])

ConditionalOption customizes conditional executors.

func WithDefaultHandler added in v0.7.0

func WithDefaultHandler[T command.Message](handler func(context.Context, T) error) ConditionalOption[T]

WithDefaultHandler sets the handler executed when no branch matches.

func WithGuardRegistry added in v0.7.0

func WithGuardRegistry[T command.Message](registry *GuardRegistry[T]) ConditionalOption[T]

WithGuardRegistry wires a guard registry for guard-based branches.

type DecoratorConfig added in v0.7.0

type DecoratorConfig struct {
	Type   string         `json:"type" yaml:"type"`
	Config map[string]any `json:"config,omitempty" yaml:"config,omitempty"`
}

type ErrorStrategy added in v0.7.0

type ErrorStrategy interface {
	HandleErrors([]error) error
}

ErrorStrategy defines how to handle multiple errors from parallel execution

type FailFastStrategy added in v0.7.0

type FailFastStrategy struct{}

FailFastStrategy returns the first error encountered

func (FailFastStrategy) HandleErrors added in v0.7.0

func (f FailFastStrategy) HandleErrors(errs []error) error

type Flow added in v0.7.0

type Flow[T any] interface {
	Execute(ctx context.Context, msg T) error
}

Flow is the common contract for all flow executors.

type FlowCommander added in v0.7.0

type FlowCommander[T any] struct {
	// contains filtered or unexported fields
}

FlowCommander wraps a Flow so it can be registered as a command.Commander.

func (*FlowCommander[T]) Execute added in v0.7.0

func (f *FlowCommander[T]) Execute(ctx context.Context, msg T) error

Execute delegates to the underlying flow.

type FlowDefinition added in v0.7.0

type FlowDefinition struct {
	ID           string              `json:"id" yaml:"id"`
	Type         string              `json:"type" yaml:"type"`
	Options      FlowOptions         `json:"options,omitempty" yaml:"options,omitempty"`
	Serial       *SerialConfig       `json:"serial,omitempty" yaml:"serial,omitempty"`
	Parallel     *ParallelConfig     `json:"parallel,omitempty" yaml:"parallel,omitempty"`
	Batch        *BatchConfig        `json:"batch,omitempty" yaml:"batch,omitempty"`
	Conditional  *ConditionalConfig  `json:"conditional,omitempty" yaml:"conditional,omitempty"`
	Saga         *SagaConfig         `json:"saga,omitempty" yaml:"saga,omitempty"`
	StateMachine *StateMachineConfig `json:"state_machine,omitempty" yaml:"state_machine,omitempty"`
	Decorators   []DecoratorConfig   `json:"decorators,omitempty" yaml:"decorators,omitempty"`
}

FlowDefinition describes a single flow instance.

func (FlowDefinition) Validate added in v0.7.0

func (d FlowDefinition) Validate() error

Validate checks required fields for the flow definition.

type FlowOptions added in v0.7.0

type FlowOptions struct {
	Timeout     time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
	NoTimeout   bool          `json:"no_timeout,omitempty" yaml:"no_timeout,omitempty"`
	MaxRetries  int           `json:"max_retries,omitempty" yaml:"max_retries,omitempty"`
	MaxRuns     int           `json:"max_runs,omitempty" yaml:"max_runs,omitempty"`
	RunOnce     bool          `json:"run_once,omitempty" yaml:"run_once,omitempty"`
	ExitOnError bool          `json:"exit_on_error,omitempty" yaml:"exit_on_error,omitempty"`
	Deadline    time.Time     `json:"deadline,omitempty" yaml:"deadline,omitempty"`
}

FlowOptions captures common runner options.

type FlowSet added in v0.7.0

type FlowSet struct {
	Version int              `json:"version" yaml:"version"`
	Flows   []FlowDefinition `json:"flows" yaml:"flows"`
	Options FlowOptions      `json:"options,omitempty" yaml:"options,omitempty"`
	Meta    map[string]any   `json:"meta,omitempty" yaml:"meta,omitempty"`
}

FlowSet represents a collection of flows loaded from config.

func ParseFlowSet added in v0.7.0

func ParseFlowSet(data []byte) (FlowSet, error)

ParseFlowSet attempts to parse JSON or YAML into a FlowSet.

func (FlowSet) Validate added in v0.7.0

func (c FlowSet) Validate() error

Validate performs basic structural validation.

type GuardRegistry added in v0.7.0

type GuardRegistry[T any] struct {
	// contains filtered or unexported fields
}

GuardRegistry stores named guard functions.

func NewGuardRegistry added in v0.7.0

func NewGuardRegistry[T any]() *GuardRegistry[T]

NewGuardRegistry creates an empty registry.

func (*GuardRegistry[T]) Lookup added in v0.7.0

func (g *GuardRegistry[T]) Lookup(name string) (func(T) bool, bool)

Lookup retrieves a guard by name.

func (*GuardRegistry[T]) Register added in v0.7.0

func (g *GuardRegistry[T]) Register(name string, guard func(T) bool) error

Register stores a guard by name.

func (*GuardRegistry[T]) RegisterNamespaced added in v0.7.0

func (g *GuardRegistry[T]) RegisterNamespaced(namespace, name string, guard func(T) bool) error

RegisterNamespaced stores a guard using namespace+name.

func (*GuardRegistry[T]) SetNamespacer added in v0.7.0

func (g *GuardRegistry[T]) SetNamespacer(fn func(string, string) string)

SetNamespacer customizes how guard IDs are namespaced.

type HandlerRegistry added in v0.7.0

type HandlerRegistry[T any] struct {
	// contains filtered or unexported fields
}

HandlerRegistry stores named commanders.

func NewHandlerRegistry added in v0.7.0

func NewHandlerRegistry[T any]() *HandlerRegistry[T]

NewHandlerRegistry creates an empty registry.

func (*HandlerRegistry[T]) Lookup added in v0.7.0

func (r *HandlerRegistry[T]) Lookup(id string) (command.Commander[T], bool)

Lookup returns a commander by id.

func (*HandlerRegistry[T]) Register added in v0.7.0

func (r *HandlerRegistry[T]) Register(id string, h command.Commander[T]) error

Register stores a commander by id.

func (*HandlerRegistry[T]) RegisterNamespaced added in v0.7.0

func (r *HandlerRegistry[T]) RegisterNamespaced(namespace, id string, h command.Commander[T]) error

RegisterNamespaced stores a commander using a namespace + id.

func (*HandlerRegistry[T]) SetNamespacer added in v0.7.0

func (r *HandlerRegistry[T]) SetNamespacer(fn func(string, string) string)

SetNamespacer customizes how IDs are namespaced.

type HandlerResolver added in v0.7.0

type HandlerResolver[T any] struct {
	// contains filtered or unexported fields
}

HandlerResolver resolves a static list of handlers.

func NewHandlerResolver added in v0.7.0

func NewHandlerResolver[T any](handlers ...command.Commander[T]) *HandlerResolver[T]

NewHandlerResolver constructs a resolver backed by explicit handlers.

func (*HandlerResolver[T]) Resolve added in v0.7.0

func (r *HandlerResolver[T]) Resolve(_ context.Context, msg T) ([]command.Commander[T], error)

Resolve returns the configured handlers or an error if none exist.

type InMemoryStateStore added in v0.7.0

type InMemoryStateStore struct {
	// contains filtered or unexported fields
}

InMemoryStateStore is a thread-safe in-memory store.

func NewInMemoryStateStore added in v0.7.0

func NewInMemoryStateStore() *InMemoryStateStore

NewInMemoryStateStore constructs an empty store.

func (*InMemoryStateStore) Load added in v0.7.0

func (s *InMemoryStateStore) Load(_ context.Context, key string) (string, error)

Load returns stored state or empty string.

func (*InMemoryStateStore) Save added in v0.7.0

func (s *InMemoryStateStore) Save(_ context.Context, key, state string) error

Save writes the state for the key.

type MetricsDecorator added in v0.7.0

type MetricsDecorator[T command.Message] struct {
	// contains filtered or unexported fields
}

MetricsDecorator adds metrics to any flow

func NewMetricsDecorator added in v0.7.0

func NewMetricsDecorator[T command.Message](
	flow interface {
		Execute(context.Context, T) error
	},
	recorder MetricsRecorder,
) *MetricsDecorator[T]

func (*MetricsDecorator[T]) Execute added in v0.7.0

func (m *MetricsDecorator[T]) Execute(ctx context.Context, msg T) error

type MetricsRecorder added in v0.7.0

type MetricsRecorder interface {
	RecordDuration(name string, duration time.Duration)
	RecordError(name string)
	RecordSuccess(name string)
}

type MetricsRecorderRegistry added in v0.7.0

type MetricsRecorderRegistry struct {
	// contains filtered or unexported fields
}

MetricsRecorderRegistry stores named metrics recorders.

func NewMetricsRecorderRegistry added in v0.7.0

func NewMetricsRecorderRegistry() *MetricsRecorderRegistry

NewMetricsRecorderRegistry constructs an empty registry.

func (*MetricsRecorderRegistry) Lookup added in v0.7.0

Lookup retrieves a recorder by name.

func (*MetricsRecorderRegistry) Register added in v0.7.0

func (r *MetricsRecorderRegistry) Register(name string, mr MetricsRecorder) error

Register stores a recorder by name.

func (*MetricsRecorderRegistry) RegisterNamespaced added in v0.7.0

func (r *MetricsRecorderRegistry) RegisterNamespaced(namespace, name string, mr MetricsRecorder) error

RegisterNamespaced stores a recorder by namespace+name.

func (*MetricsRecorderRegistry) SetNamespacer added in v0.7.0

func (r *MetricsRecorderRegistry) SetNamespacer(fn func(string, string) string)

SetNamespacer customizes namespacing.

type MuxResolver added in v0.7.0

type MuxResolver[T any] struct {
	// contains filtered or unexported fields
}

MuxResolver resolves handlers from a router mux using the message type.

func NewMuxResolver added in v0.7.0

func NewMuxResolver[T any](mux *router.Mux) *MuxResolver[T]

NewMuxResolver builds a resolver using the provided mux (or a new one when nil).

func (*MuxResolver[T]) Resolve added in v0.7.0

func (r *MuxResolver[T]) Resolve(_ context.Context, msg T) ([]command.Commander[T], error)

Resolve looks up handlers by message type and converts them to Commanders.

type Option added in v0.7.0

type Option = runner.Option

Option mirrors runner.Option so flows can share runner configuration knobs.

type ParallelConfig added in v0.7.0

type ParallelConfig struct {
	Steps         []string    `json:"steps" yaml:"steps"`
	ErrorStrategy string      `json:"error_strategy,omitempty" yaml:"error_strategy,omitempty"`
	Opts          FlowOptions `json:"options,omitempty" yaml:"options,omitempty"`
}

type ParallelExecutor

type ParallelExecutor[T any] struct {
	// contains filtered or unexported fields
}

func NewParallelExecutor

func NewParallelExecutor[T any](handlers []command.Commander[T], opts ...runner.Option) *ParallelExecutor[T]

NewParallelExecutor creates a new ParallelExecutor with the provided handlers

func (*ParallelExecutor[T]) Execute

func (p *ParallelExecutor[T]) Execute(ctx context.Context, msg T) error

func (*ParallelExecutor[T]) WithErrorStrategy added in v0.7.0

func (p *ParallelExecutor[T]) WithErrorStrategy(strategy ErrorStrategy) *ParallelExecutor[T]

WithErrorStrategy overrides the error strategy used to combine handler errors.

type RedisClient added in v0.7.0

type RedisClient interface {
	Get(ctx context.Context, key string) (string, error)
	Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error
}

RedisClient captures the minimal commands needed from a redis client.

type RedisStateStore added in v0.7.0

type RedisStateStore struct {
	// contains filtered or unexported fields
}

RedisStateStore persists state via a minimal redis client interface.

func NewRedisStateStore added in v0.7.0

func NewRedisStateStore(client RedisClient, ttl time.Duration) *RedisStateStore

NewRedisStateStore builds a store using the provided client and TTL.

func (*RedisStateStore) Load added in v0.7.0

func (s *RedisStateStore) Load(ctx context.Context, key string) (string, error)

Load reads state from redis.

func (*RedisStateStore) Save added in v0.7.0

func (s *RedisStateStore) Save(ctx context.Context, key, state string) error

Save writes state to redis.

type Resolver added in v0.7.0

type Resolver[T any] interface {
	Resolve(ctx context.Context, msg T) ([]command.Commander[T], error)
}

Resolver returns the handlers that should run for a given message.

type RetryableFlow added in v0.7.0

type RetryableFlow[T command.Message] struct {
	// contains filtered or unexported fields
}

RetryableFlow wraps any flow with a retry strategy (for example, retry with backoff).

func NewRetryableFlow added in v0.7.0

func NewRetryableFlow[T command.Message](
	flow Flow[T],
	retryStrategy runner.RetryStrategy,
	maxRetries int,
) *RetryableFlow[T]

func (*RetryableFlow[T]) Execute added in v0.7.0

func (r *RetryableFlow[T]) Execute(ctx context.Context, msg T) error

type SQLiteStateStore added in v0.7.0

type SQLiteStateStore struct {
	// contains filtered or unexported fields
}

SQLiteStateStore persists state in a SQL table using a provided *sql.DB. The table is expected to have schema: CREATE TABLE IF NOT EXISTS states (k TEXT PRIMARY KEY, v TEXT);

func NewSQLiteStateStore added in v0.7.0

func NewSQLiteStateStore(db *sql.DB, table string) *SQLiteStateStore

NewSQLiteStateStore builds a store using the given DB and table name.

func (*SQLiteStateStore) Load added in v0.7.0

func (s *SQLiteStateStore) Load(ctx context.Context, key string) (string, error)

Load reads state for key.

func (*SQLiteStateStore) Save added in v0.7.0

func (s *SQLiteStateStore) Save(ctx context.Context, key, state string) error

Save upserts state for key.

type Saga added in v0.7.0

type Saga[T command.Message] struct {
	// contains filtered or unexported fields
}

Saga implements the saga pattern for distributed transactions. A compensating transaction must be idempotent and retryable.

func NewSaga added in v0.7.0

func NewSaga[T command.Message](steps []SagaStep[T], compensate bool) *Saga[T]

func (*Saga[T]) Execute added in v0.7.0

func (s *Saga[T]) Execute(ctx context.Context, msg T) error

type SagaConfig added in v0.7.0

type SagaConfig struct {
	Steps             []SagaStepConfig `json:"steps" yaml:"steps"`
	CompensateOnError bool             `json:"compensate_on_error,omitempty" yaml:"compensate_on_error,omitempty"`
}

type SagaStep added in v0.7.0

type SagaStep[T command.Message] struct {
	Name       string
	Execute    func(context.Context, T) error
	Compensate func(context.Context, T) error
}

type SagaStepConfig added in v0.7.0

type SagaStepConfig struct {
	Do         string `json:"do" yaml:"do"`
	Compensate string `json:"compensate,omitempty" yaml:"compensate,omitempty"`
}

type SerialConfig added in v0.7.0

type SerialConfig struct {
	Steps []string    `json:"steps" yaml:"steps"`
	Opts  FlowOptions `json:"options,omitempty" yaml:"options,omitempty"`
}

type SerialExecutor added in v0.7.0

type SerialExecutor[T any] struct {
	// contains filtered or unexported fields
}

SerialExecutor runs multiple command handlers in sequence

func NewSerialExecutor added in v0.7.0

func NewSerialExecutor[T any](handlers []command.Commander[T], opts ...runner.Option) *SerialExecutor[T]

NewSerialExecutor creates a new SerialExecutor with the provided handlers

func (*SerialExecutor[T]) Execute added in v0.7.0

func (s *SerialExecutor[T]) Execute(ctx context.Context, msg T) error

type StateConfig added in v0.7.0

type StateConfig struct {
	Name        string `json:"name" yaml:"name"`
	Description string `json:"description,omitempty" yaml:"description,omitempty"`
	Terminal    bool   `json:"terminal,omitempty" yaml:"terminal,omitempty"`
	Initial     bool   `json:"initial,omitempty" yaml:"initial,omitempty"`
}

type StateMachine added in v0.7.0

type StateMachine[T command.Message] struct {
	// contains filtered or unexported fields
}

StateMachine executes transitions with optional guards/actions and persistence.

func NewStateMachine added in v0.7.0

func NewStateMachine[T command.Message](
	cfg StateMachineConfig,
	store StateStore,
	req TransitionRequest[T],
	guards *GuardRegistry[T],
	actions *ActionRegistry[T],
	opts ...StateMachineOption[T],
) (*StateMachine[T], error)

NewStateMachine constructs a state machine flow.

func (*StateMachine[T]) Execute added in v0.7.0

func (s *StateMachine[T]) Execute(ctx context.Context, msg T) error

Execute applies a transition based on the incoming message.

type StateMachineConfig added in v0.7.0

type StateMachineConfig struct {
	Entity      string             `json:"entity" yaml:"entity"`
	States      []StateConfig      `json:"states" yaml:"states"`
	Transitions []TransitionConfig `json:"transitions" yaml:"transitions"`
	PersistWith string             `json:"persist_with,omitempty" yaml:"persist_with,omitempty"`
}

func (StateMachineConfig) Validate added in v0.7.0

func (s StateMachineConfig) Validate() error

Validate ensures the state machine definition is well formed.

type StateMachineOption added in v0.7.0

type StateMachineOption[T command.Message] func(*StateMachine[T])

StateMachineOption customizes state machine behavior.

func WithInitialFallback added in v0.7.0

func WithInitialFallback[T command.Message](enable bool) StateMachineOption[T]

WithInitialFallback allows falling back to the initial state when both store and CurrentState are empty.

type StateStore added in v0.7.0

type StateStore interface {
	Load(ctx context.Context, key string) (string, error)
	Save(ctx context.Context, key, state string) error
}

StateStore persists workflow state by key.

type TransitionConfig added in v0.7.0

type TransitionConfig struct {
	Name   string `json:"name" yaml:"name"`
	From   string `json:"from" yaml:"from"`
	To     string `json:"to" yaml:"to"`
	Guard  string `json:"guard,omitempty" yaml:"guard,omitempty"`
	Action string `json:"action,omitempty" yaml:"action,omitempty"`
}

type TransitionRequest added in v0.7.0

type TransitionRequest[T any] struct {
	StateKey     func(T) string
	Event        func(T) string
	CurrentState func(T) string
}

TransitionRequest extracts state machine metadata from a message.

func TransitionRequestFromState added in v0.7.0

func TransitionRequestFromState[T any](idFn func(T) string, stateFn func(T) string, eventFn func(T) string) TransitionRequest[T]

TransitionRequestFromState creates a TransitionRequest using ID and current state fields.
