fluxor

package module
v0.1.21
Published: Jul 17, 2025 License: Apache-2.0 Imports: 33 Imported by: 2

README

Fluxor: A Generic Workflow Engine

Fluxor is a powerful, Go-based workflow engine that lets you define, execute, and manage complex workflows with custom actions and executors. This README gives an overview of Fluxor's architecture and features and shows how to use it effectively.

Overview

Fluxor is designed to provide a flexible, extensible workflow engine that can be integrated into any Go application. It supports:

  • Complex workflow definitions with nested tasks
  • Task dependencies and conditional execution
  • Custom actions and executors
  • State management and persistence
  • Pausable and resumable workflows
  • Dynamic task allocation

Key Features

  • Declarative Workflow Definitions: Define workflows in YAML or JSON
  • Task Dependencies: Specify dependencies between tasks to control execution order
  • Custom Actions: Create and register custom actions to extend the engine
  • State Management: Track workflow state and execution history
  • Concurrency Control: Control the level of parallelism in workflow execution
  • Extensible Architecture: Easily extend the engine with custom components
  • Optional Human Approval: Pause selected tasks until a human (or custom logic) approves them
  • Ad-hoc Execution: Run a single task or sub-pipeline on demand for quick streaming, tests, debugging or manual reruns without launching the full workflow

Architecture

Fluxor consists of the following main components:

  • Workflow Model: Defines the structure of workflows, tasks, and transitions
  • Process Engine: Manages the lifecycle of workflow processes
  • Task Executor: Executes individual tasks within a workflow
  • Task Allocator: Allocates tasks to execution workers
  • Persistence Layer: Stores workflow definitions and execution state
  • Extension System: Allows for custom actions and executors
Component Diagram
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Workflow      │    │      Task       │    │   Extension     │
│   Definition    │───▶│    Allocator    │───▶│     System      │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                              │                        │
                              ▼                        ▼
                       ┌─────────────────┐    ┌─────────────────┐
                       │     Process     │    │      Task       │
                       │     Engine      │◀───│     Executor    │
                       └─────────────────┘    └─────────────────┘
                              │
                              ▼
                       ┌─────────────────┐
                       │   Persistence   │
                       │      Layer      │
                       └─────────────────┘

                              │ (approval request)
                              ▼
                       ┌─────────────────┐
                       │  Approval Queue │
                       └────────┬────────┘
                                │ consume
                                ▼
                       ┌─────────────────┐
                       │  Approval       │
                       │   Service       │
                       └─────────────────┘

The Approval components are initialised only when the optional policy layer (see below) is enabled.

Getting Started

Installation
go get github.com/viant/fluxor
Basic Usage
package main

import (
	"context"
	"fmt"
	"github.com/viant/fluxor"
	"time"
)

func main() {
	err := runIt()
	if err != nil {
		panic(err)
	}
}

func runIt() error {
	srv := fluxor.New()
	runtime := srv.Runtime()
	ctx := context.Background()
	workflow, err := runtime.LoadWorkflow(ctx, "parent.yaml")
	if err != nil {
		return err
	}
	_ = runtime.Start(ctx)
	process, wait, err := runtime.StartProcess(ctx, workflow, map[string]interface{}{})
	if err != nil {
		return err
	}
	output, err := wait(ctx, time.Minute)
	if err != nil {
		return err
	}
	fmt.Printf("output: %+v\n", output)
	return nil
}

Defining Workflows

Fluxor workflows are defined in YAML or JSON. Here's a simple example:

init:
  i: 0

pipeline:
  start:
    action: printer:print
    input:
      message: 'Parent started'
  loop:
    inc:
      action: nop:nop
      post:
        i: ${i + 1}

    runChildren:
      action: workflow:run
      input:
        location: children
        context:
          iteration: $i
    body:
      action: printer:print
      input:
        message: 'Iteration: $i'
      goto:
        when: i < 3
        task: loop

  stop:
    action: printer:print
    input:
      message: 'Parent stopped'

Alternatively, you can define the pipeline as an ordered sequence of task definitions:

pipeline:
  - id: list
    service: system/storage
    action: list
    with:
      URL: "file://."
  - id: show
    service: printer
    action: print
    with:
      message: ${ $.list.entries }

Custom Actions

Custom actions allow you to extend Fluxor with your own functionality:

package myaction

import (
    "context"
    "fmt"
    "github.com/viant/fluxor/model/types"
    "reflect"
)

type Input struct {
    Param1 string `json:"param1"`
    Param2 int    `json:"param2"`
}

type Output struct {
    Result string `json:"result"`
}

type Service struct{}

func (s *Service) Name() string {
    return "my/action"
}

func (s *Service) Methods() types.Signatures {
    return []types.Signature{
        {
            Name:        "execute",
            Description: "Executes custom action logic using provided input parameters to produce output.",
            Input:       reflect.TypeOf(&Input{}),
            Output:      reflect.TypeOf(&Output{}),
        },
    }
}

func (s *Service) Method(name string) (types.Executable, error) {
    if name == "execute" {
        return s.execute, nil
    }
    return nil, types.NewMethodNotFoundError(name)
}

func (s *Service) execute(ctx context.Context, in, out interface{}) error {
    input := in.(*Input)
    output := out.(*Output)
    
    // Implement your action logic
    output.Result = fmt.Sprintf("Processed %s with value %d", input.Param1, input.Param2)
    
    return nil
}

Register your custom action:

actions.Register(&myaction.Service{})

Executors

Executors handle the execution of tasks within workflows:

task:
  action: system/executor
  commands:
    - echo "Hello, World!"

The built-in system/executor action allows you to run shell commands.

Task Allocation

The task allocator is responsible for assigning tasks to execution workers and managing dependencies between tasks. It ensures tasks are executed in the correct order based on their dependencies.

alloc := allocator.New(
    processDAO,
    taskExecutionDAO,
    queue,
    allocator.DefaultConfig(),
)
go alloc.Start(ctx)
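The ordering guarantee can be pictured as a topological sort over the dependency graph. A minimal, self-contained sketch of that idea (Kahn's algorithm; this is not the allocator's actual code, which also tracks state, retries and concurrency):

```go
package main

import "fmt"

// topoOrder returns task IDs so that every task appears after all of its
// dependencies. deps maps task -> tasks it depends on.
func topoOrder(deps map[string][]string) []string {
	indeg := map[string]int{}
	for t, ds := range deps {
		if _, ok := indeg[t]; !ok {
			indeg[t] = 0
		}
		for _, d := range ds {
			if _, ok := indeg[d]; !ok {
				indeg[d] = 0
			}
			indeg[t]++ // t waits on d
		}
	}
	var queue, order []string
	for t, n := range indeg {
		if n == 0 {
			queue = append(queue, t)
		}
	}
	for len(queue) > 0 {
		t := queue[0]
		queue, order = queue[1:], append(order, t)
		// Releasing t unblocks every task that depended on it.
		for other, ds := range deps {
			for _, d := range ds {
				if d == t {
					if indeg[other]--; indeg[other] == 0 {
						queue = append(queue, other)
					}
				}
			}
		}
	}
	return order
}

func main() {
	fmt.Println(topoOrder(map[string][]string{"taskB": {"taskA"}, "taskA": {}}))
	// prints [taskA taskB]
}
```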

Advanced Features

Task Dependencies
taskB:
  dependsOn: taskA
  action: printer:print
  input:
    message: "Task B depends on Task A"
Parallel Execution
parallelTasks:
  tasks:
    - id: task1
      async: true
      action: printer:print
      input:
        message: "Running in parallel 1"
    
    - id: task2
      async: true
      action: printer:print
      input:
        message: "Running in parallel 2"
Template Tasks

Template tasks allow you to repeat a sub-task over each element of a collection. Use the template keyword with a selector and an inner task definition:

pipeline:
  processOrders:
    template:
      selector:
        - name: order
          value: "$orders"
      task:
        processOne:
          action:
            service: printer
            method: print
            input:
              message: $order

In this example, processOrders will spawn one processOne task for each element in orders, binding the current element to $order in each task.

⚠️ Runtime validation: at the moment the selector expression is expanded, the resolved value must be a slice or array. If it evaluates to any other type (string, map, scalar, nil, …) Fluxor fails the task with an explicit error message and the workflow follows its normal failure path (or retries if a retry strategy is defined).
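That kind of check is straightforward to express with reflection. The sketch below is a hedged illustration of the validation rule described above, not Fluxor's exact code:

```go
package main

import (
	"fmt"
	"reflect"
)

// validateSelector returns an error unless v resolved to a slice or array,
// mirroring the template-task rule described above (illustrative only).
func validateSelector(name string, v interface{}) error {
	k := reflect.ValueOf(v).Kind()
	if k != reflect.Slice && k != reflect.Array {
		return fmt.Errorf("selector %q: expected slice or array, got %s", name, k)
	}
	return nil
}

func main() {
	fmt.Println(validateSelector("order", []string{"a", "b"})) // <nil>
	fmt.Println(validateSelector("order", "not-a-slice"))      // error
}
```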

Ad-hoc Execution

Sometimes you only want to run one task (or a small subset of the pipeline) to verify behaviour, re-process a failed item or perform a quick manual job. Fluxor exposes a convenience helper that executes a task outside the context of a full workflow run:

taskID  := "transformCsv"
input   := map[string]any{"file": "customers.csv"}

// Execute the task once and wait synchronously for the result.
output, err := runtime.RunTaskOnce(ctx, workflow, taskID, input)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("%s output: %+v\n", taskID, output)

Under the hood the engine still uses the same allocator / processor / executor infrastructure, so semantics stay identical; you just avoid spinning up the entire parent workflow.

Optional Task Approval & Policy Layer

Fluxor includes an opt-in policy subsystem that can pause task execution until it is manually approved – perfect for production safety-nets, dry-runs or interactive debugging.

Runtime Control Plane (pause / resume / cancel)

Starting with v0.4 Fluxor ships a lightweight control-plane that lets operators interact with a live workflow run after it has started.

Action        Effect
pause         Stops scheduling new tasks. In-flight executions finish normally.
resume        Re-enables scheduling after a pause.
cancel        Immediately cancels the per-process context (ctx.Done()) so long-running tasks terminate early, and stops further scheduling.
resumeFailed  Rewinds a workflow that ended in a failed/cancelled state and restarts it.

Simple API calls:

// pause / resume
_ = processor.PauseProcess(ctx,   "workflow/123")
_ = processor.ResumeProcess(ctx,  "workflow/123")

// graceful cancellation
_ = processor.CancelProcess(ctx,  "workflow/123", "cli kill")

// give the workflow another try after fixing the root cause
_ = processor.ResumeFailedProcess(ctx, "workflow/123")
Life-cycle diagram
          ┌────────────┐               ┌───────────────┐
   start  │  running   │     pause     │ pauseRequested│
─────────►│            │──────────────►│               │
          └─────┬──────┘               └──────┬────────┘
                ▲ resume                      │ all running
                │                             │ tasks drained
                │                             ▼
                │                      ┌───────────────┐
                └──────────────────────│    paused     │
                                       └───────────────┘

cancel        ┌───────────────┐
─────────────►│cancelRequested│── allocator ──► cancelled
              └───────────────┘

One-line Progress Tracker

Every root workflow carries a context-local Progress tracker. Allocator, Processor and Executor update it with a single helper:

progress.UpdateCtx(ctx, progress.Delta{Pending:-1, Running:+1})

The tracker keeps aggregate counters (total / completed / skipped / failed / …) for all sub-workflows and exposes an optional callback to stream live updates to dashboards.

Snapshot at any time:

if p, ok := progress.GetSnapshot(ctx); ok {
    fmt.Printf("%.1f%% done (%d/%d)\n",
        100*float64(p.CompletedTasks)/float64(p.TotalTasks),
        p.CompletedTasks, p.TotalTasks)
}
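Conceptually the tracker is an aggregate set of counters mutated by deltas. A self-contained sketch of that pattern (the field names here are hypothetical; Fluxor's progress package is context-aware and tracks more states):

```go
package main

import (
	"fmt"
	"sync"
)

// Delta describes a counter change; Progress aggregates deltas safely
// across goroutines. Illustrative stand-ins for the real progress types.
type Delta struct{ Pending, Running, Completed int }

type Progress struct {
	mu                          sync.Mutex
	Pending, Running, Completed int
}

func (p *Progress) Update(d Delta) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.Pending += d.Pending
	p.Running += d.Running
	p.Completed += d.Completed
}

func main() {
	p := &Progress{Pending: 3}
	p.Update(Delta{Pending: -1, Running: +1})  // a worker picked up a task
	p.Update(Delta{Running: -1, Completed: 1}) // the task finished
	fmt.Printf("pending=%d running=%d completed=%d\n",
		p.Pending, p.Running, p.Completed)
	// prints pending=2 running=0 completed=1
}
```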
Execution Modes
Mode   Behaviour
auto   Run tasks immediately (default).
ask    Create an approval request before running each task (unless whitelisted).
deny   Skip the task and mark the execution as failed.
Minimal Example
ctx := policy.WithPolicy(context.Background(), &policy.Policy{
    Mode: policy.ModeAsk,
})

proc, wait, _ := runtime.StartProcess(ctx, wf, nil)
// ... respond to approval prompts ...
End-to-end Flow
  1. The executor evaluates the policy before every task.
  2. If Mode==ask and no decision has been recorded yet, it emits an approval.Request message and suspends the execution (waitForApproval).
  3. The Approval Service consumes the request, calls your AskFunc, persists the decision and – if approved – re-enqueues the execution so a worker can continue processing.

The default AskFunc bundled with Fluxor simply returns true (auto-approve) so that enabling the feature never blocks development. Provide your own callback via fluxor.WithApprovalAskFunc for real-world use cases.

Contributing

Contributions to Fluxor are welcome! Please feel free to submit a Pull Request.

License

Fluxor is released under the Apache-2.0 license; see the LICENSE file in the root directory of this source tree.


© 2012-2023 Viant, inc. All rights reserved.

Documentation

Overview

Package fluxor provides a generic, extensible workflow engine.

The engine executes workflows defined declaratively (for example in YAML or JSON) and comes with pluggable service layers such as:

  • runtime – orchestration of workflow execution
  • allocator – task allocation and state management
  • executor – task execution through custom actions
  • approval – optional human-in-the-loop task approval

Fluxor is designed to be embedded in host applications. End-users typically interact with the engine via the high-level Service façade exposed by the root package:

srv := fluxor.New()
rt  := srv.Runtime()
wf, _ := rt.LoadWorkflow(ctx, "workflow.yaml")
_, wait, _ := rt.StartProcess(ctx, wf, nil)
out, _ := wait(ctx, time.Minute)

For more details see the README and individual sub-packages.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTaskNotFound   = errors.New("task not found in workflow")
	ErrMethodNotFound = errors.New("method not found in service")
)

Functions

This section is empty.

Types

type Config added in v0.1.1

type Config struct {
	Processor ProcessorConfig `json:"processor" yaml:"processor"`
}

func DefaultConfig added in v0.1.1

func DefaultConfig() *Config

DefaultConfig returns a Config populated with exactly the same default values that were previously hard-coded in the constructors. Callers may modify the returned struct before passing it to NewFromConfig.

func (*Config) Validate added in v0.1.1

func (c *Config) Validate() error

Validate returns an aggregated error describing invalid settings, or nil.

type Option

type Option func(s *Service)

Option customises the fluxor Service.

func WithApprovalService added in v0.1.1

func WithApprovalService(svc approval.Service) Option

WithApprovalService sets the approval service

func WithEventService

func WithEventService(service *event.Service) Option

func WithExecutorOptions added in v0.1.1

func WithExecutorOptions(opts ...executor.Option) Option

WithExecutorOptions lets the caller supply additional options passed to executor.NewService (e.g. disabling the default StdoutListener).

func WithExtensionServices

func WithExtensionServices(services ...types.Service) Option

WithExtensionServices sets the extension services

func WithExtensionTypes

func WithExtensionTypes(types ...*x.Type) Option

WithExtensionTypes sets the extension types

func WithMetaBaseURL

func WithMetaBaseURL(url string) Option

WithMetaBaseURL sets the meta base URL

func WithMetaFsOptions

func WithMetaFsOptions(options ...storage.Option) Option

WithMetaFsOptions sets the meta file system options

func WithMetaService

func WithMetaService(service *meta.Service) Option

WithMetaService sets the meta service

func WithProcessDAO

func WithProcessDAO(dao dao.Service[string, execution.Process]) Option

WithProcessDAO sets the process DAO

func WithProcessorWorkers

func WithProcessorWorkers(count int) Option

WithProcessorWorkers sets the processor workers

func WithQueue

func WithQueue(queue messaging.Queue[execution.Execution]) Option

WithQueue sets the message queue

func WithRootTaskNodeName

func WithRootTaskNodeName(name string) Option

WithRootTaskNodeName sets the root task node name

func WithStateListeners added in v0.1.2

func WithStateListeners(listeners ...execution.StateListener) Option

func WithTaskExecutionDAO

func WithTaskExecutionDAO(dao dao.Service[string, execution.Execution]) Option

WithTaskExecutionDAO sets the task execution DAO

func WithTracing

func WithTracing(serviceName, serviceVersion, outputFile string) Option

WithTracing configures OpenTelemetry tracing for the service. If outputFile is empty the stdout exporter is used; otherwise traces are written to the supplied file path. The function is safe to call multiple times – the first successful initialisation wins.

func WithTracingExporter

func WithTracingExporter(serviceName, serviceVersion string, exporter sdktrace.SpanExporter) Option

WithTracingExporter configures OpenTelemetry tracing using a custom SpanExporter. This enables integrations with exporters other than the built-in stdout exporter, for example OTLP, Jaeger or Zipkin. The function is safe to call multiple times – the first successful initialisation wins.

func WithWhenListeners added in v0.1.2

func WithWhenListeners(listeners ...execution.WhenListener) Option

WithWhenListeners registers callbacks invoked after every when-condition evaluation.

type ProcessorConfig added in v0.1.1

type ProcessorConfig struct {
	WorkerCount int `json:"workers" yaml:"workers"`
}

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

Runtime represents a workflow engine runtime

func (*Runtime) DecodeYAMLWorkflow

func (r *Runtime) DecodeYAMLWorkflow(data []byte) (*model.Workflow, error)

DecodeYAMLWorkflow decodes a workflow from YAML data

func (*Runtime) Execution

func (r *Runtime) Execution(ctx context.Context, id string) (*execution.Execution, error)

Execution returns an execution

func (*Runtime) LoadWorkflow

func (r *Runtime) LoadWorkflow(ctx context.Context, location string) (*model.Workflow, error)

LoadWorkflow loads a workflow

func (*Runtime) Process

func (r *Runtime) Process(ctx context.Context, id string) (*execution.Process, error)

Process returns a process

func (*Runtime) ProcessFromContext added in v0.1.7

func (r *Runtime) ProcessFromContext(ctx context.Context) *execution.Process

ProcessFromContext returns the process stored in the context

func (*Runtime) Processes

func (r *Runtime) Processes(ctx context.Context, parameter ...*dao.Parameter) ([]*execution.Process, error)

Processes returns a list of processes

func (*Runtime) RefreshWorkflow added in v0.1.19

func (r *Runtime) RefreshWorkflow(location string) error

RefreshWorkflow discards any cached copy of the workflow definition located at the given URL/location. The next LoadWorkflow call will reload the file via the configured meta-service (i.e. one extra disk/cloud round-trip).

func (*Runtime) RunTaskOnce added in v0.1.9

func (r *Runtime) RunTaskOnce(ctx context.Context, wf *model.Workflow, taskID string, input interface{}) (interface{}, error)

RunTaskOnce is a convenience helper that executes a *single* task from the supplied workflow and waits for its completion. It is intended for quick ad-hoc jobs, debugging and unit tests where launching the entire workflow would be unnecessary overhead.

The helper works by submitting an ad-hoc execution to the shared allocator/processor queue, so semantics (retries, policies, tracing, etc.) are identical to regular executions. The returned value is whatever the task's action populates as its output.

func (*Runtime) ScheduleExecution added in v0.1.7

func (r *Runtime) ScheduleExecution(ctx context.Context, exec *execution.Execution) (func(duration time.Duration) (*execution.Execution, error), error)

func (*Runtime) Shutdown

func (r *Runtime) Shutdown(ctx context.Context) error

Shutdown shuts down the runtime

func (*Runtime) Start

func (r *Runtime) Start(ctx context.Context) error

Start starts the runtime

func (*Runtime) StartProcess

func (r *Runtime) StartProcess(ctx context.Context, aWorkflow *model.Workflow, initialState map[string]interface{}, tasks ...string) (*execution.Process, execution.Wait, error)

StartProcess starts a new process

func (*Runtime) UpsertDefinition added in v0.1.19

func (r *Runtime) UpsertDefinition(location string, data []byte) error

UpsertDefinition parses the supplied YAML bytes and stores the resulting workflow definition in the in-memory cache under the specified location. When data is nil the call falls back to RefreshWorkflow, causing a lazy reload on next use.

type Service

type Service struct {
	// contains filtered or unexported fields
}

func New

func New(options ...Option) *Service

func NewFromConfig added in v0.1.1

func NewFromConfig(cfg *Config, opts ...Option) *Service

NewFromConfig constructs the Fluxor engine using a declarative configuration that can be further customised by functional options. The precedence order is:

  1. package defaults (via DefaultConfig)
  2. values present in cfg (may be nil – treated as empty)
  3. values set by Option functions (highest priority)

func (*Service) Actions

func (s *Service) Actions() *extension.Actions

func (*Service) ApprovalService added in v0.1.1

func (s *Service) ApprovalService() approval.Service

func (*Service) EventService

func (s *Service) EventService() *event.Service

EventService returns event service

func (*Service) NewContext

func (s *Service) NewContext(ctx context.Context) context.Context

func (*Service) RegisterExtensionServices

func (s *Service) RegisterExtensionServices(services ...types.Service)

func (*Service) RegisterExtensionType

func (s *Service) RegisterExtensionType(aType *x.Type)

func (*Service) RegisterExtensionTypes

func (s *Service) RegisterExtensionTypes(types ...*x.Type)

func (*Service) Runtime

func (s *Service) Runtime() *Runtime

Directories

Path Synopsis
examples
workflow command
Package extension provides run-time registries that allow Fluxor to work with user-defined Go types (for example custom action inputs or outputs).
Package extension provides run-time registries that allow Fluxor to work with user-defined Go types (for example custom action inputs or outputs).
internal
clock
Package clock provides test-friendly wrappers around the standard library time utilities.
Package clock provides test-friendly wrappers around the standard library time utilities.
idgen
Package idgen wraps the UUID generator so that it can be stubbed in tests.
Package idgen wraps the UUID generator so that it can be stubbed in tests.
Package model contains the in-memory representation of workflow definitions, runtime state and supporting types used by the Fluxor engine.
Package model contains the in-memory representation of workflow definitions, runtime state and supporting types used by the Fluxor engine.
Package policy provides optional declarative rules that can be applied on top of a running Fluxor engine – for example to require human approval for selected tasks or to enforce execution constraints.
Package policy provides optional declarative rules that can be applied on top of a running Fluxor engine – for example to require human approval for selected tasks or to enforce execution constraints.
Package progress defines primitives for reporting and aggregating the progress of long-running tasks executed by the Fluxor runtime.
Package progress defines primitives for reporting and aggregating the progress of long-running tasks executed by the Fluxor runtime.
runtime
evaluator
Package evaluator is responsible for evaluating (interpolating) dynamic expressions found in workflow definitions at run-time.
Package evaluator is responsible for evaluating (interpolating) dynamic expressions found in workflow definitions at run-time.
execution
Package execution contains the core entities representing workflow execution – processes, sessions and individual task executions.
Package execution contains the core entities representing workflow execution – processes, sessions and individual task executions.
expander
Package expander converts concise workflow definitions that use the "matrix" shortcut notation into an explicit representation understood by the executor.
Package expander converts concise workflow definitions that use the "matrix" shortcut notation into an explicit representation understood by the executor.
service
allocator
Package allocator owns the execution queue and is the only service allowed to mutate `Process` instances according to the project guidelines.
Package allocator owns the execution queue and is the only service allowed to mutate `Process` instances according to the project guidelines.
approval
Package approval implements the optional human-in-the-loop approval layer.
Package approval implements the optional human-in-the-loop approval layer.
dao
event
Package event defines Fluxor's publish-subscribe mechanism that allows observers to react to execution lifecycle events.
Package event defines Fluxor's publish-subscribe mechanism that allows observers to react to execution lifecycle events.
executor
Package executor defines the interface that bridges tasks enqueued by the processor with the backing implementation of actions.
Package executor defines the interface that bridges tasks enqueued by the processor with the backing implementation of actions.
messaging
Package messaging contains queue implementations used to decouple the allocator, processor and approval services.
Package messaging contains queue implementations used to decouple the allocator, processor and approval services.
processor
Package processor hosts the workers that execute individual task executions.
Package processor hosts the workers that execute individual task executions.
Package tracing integrates observability back-ends with the Fluxor engine to provide distributed tracing information.
Package tracing integrates observability back-ends with the Fluxor engine to provide distributed tracing information.
