fluxor

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2025 License: Apache-2.0 Imports: 28 Imported by: 2

README

Fluxor: A Generic Workflow Engine

Fluxor is a powerful, Go-based workflow engine that allows you to define, execute, and manage complex workflows with custom actions and executors. This README provides an overview of Fluxor's architecture, features, and how to use it effectively.

Table of Contents

Overview

Fluxor is designed to provide a flexible, extensible workflow engine that can be integrated into any Go application. It supports:

  • Complex workflow definitions with nested tasks
  • Task dependencies and conditional execution
  • Custom actions and executors
  • State management and persistence
  • Pausable and resumable workflows
  • Dynamic task allocation

Key Features

  • Declarative Workflow Definitions: Define workflows in YAML or JSON
  • Task Dependencies: Specify dependencies between tasks to control execution order
  • Custom Actions: Create and register custom actions to extend the engine
  • State Management: Track workflow state and execution history
  • Concurrency Control: Control the level of parallelism in workflow execution
  • Extensible Architecture: Easily extend the engine with custom components

Architecture

Fluxor consists of the following main components:

  • Workflow Model: Defines the structure of workflows, tasks, and transitions
  • Process Engine: Manages the lifecycle of workflow processes
  • Task Executor: Executes individual tasks within a workflow
  • Task Allocator: Allocates tasks to execution workers
  • Persistence Layer: Stores workflow definitions and execution state
  • Extension System: Allows for custom actions and executors
Component Diagram
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Workflow      │    │      Task       │    │   Extension     │
│   Definition    │───▶│    Allocator    │───▶│     System      │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                              │                        │
                              ▼                        ▼
                       ┌─────────────────┐    ┌─────────────────┐
                       │     Process     │    │      Task       │
                       │     Engine      │◀───│     Executor    │
                       └─────────────────┘    └─────────────────┘
                              │
                              ▼
                       ┌─────────────────┐
                       │   Persistence   │
                       │      Layer      │
                       └─────────────────┘

Getting Started

Installation
go get github.com/viant/fluxor
Basic Usage
package main

import (
	"context"
	"fmt"
	"github.com/viant/fluxor"
	"time"
)

func main() {
	err := runIt()
	if err != nil {
		panic(err)
	}
}

func runIt() error {
	srv := fluxor.New()
	runtime := srv.Runtime()
	ctx := context.Background()
	workflow, err := runtime.LoadWorkflow(ctx, "parent.yaml")
	if err != nil {
		return err
	}
	_ = runtime.Start(ctx)
	process, wait, err := runtime.StartProcess(ctx, workflow, map[string]interface{}{})
	if err != nil {
		return err
	}
	fmt.Println("process:", process.ID)
	output, err := wait(ctx, time.Minute)
	if err != nil {
		return err
	}
	fmt.Printf("output: %+v\n", output)
	return nil
}

Defining Workflows

Fluxor workflows are defined in YAML or JSON. Here's a simple example:

init:
  i: 0

pipeline:
  start:
    action: printer:print
    input:
      message: 'Parent started'
  loop:
    inc:
      action: nop:nop
      post:
        i: ${i + 1}

    runChildren:
      action: workflow:run
      input:
        location: children
        context:
          iteration: $i
    body:
      action: printer:print
      input:
        message: 'Iteration: $i'
      goto:
        when: i < 3
        task: loop

  stop:
    action: printer:print
    input:
      message: 'Parent stoped'

Custom Actions

Custom actions allow you to extend Fluxor with your own functionality:

package myaction

import (
    "context"
    "github.com/viant/fluxor/model/types"
    "reflect"
)

type Input struct {
    Param1 string `json:"param1"`
    Param2 int    `json:"param2"`
}

type Output struct {
    Result string `json:"result"`
}

type Service struct{}

func (s *Service) Name() string {
    return "my/action"
}

func (s *Service) Methods() types.Signatures {
    return []types.Signature{
        {
            Name:   "execute",
            Input:  reflect.TypeOf(&Input{}),
            Output: reflect.TypeOf(&Output{}),
        },
    }
}

func (s *Service) Method(name string) (types.Executable, error) {
    if name == "execute" {
        return s.execute, nil
    }
    return nil, types.NewMethodNotFoundError(name)
}

func (s *Service) execute(ctx context.Context, in, out interface{}) error {
    input := in.(*Input)
    output := out.(*Output)
    
    // Implement your action logic
    output.Result = fmt.Sprintf("Processed %s with value %d", input.Param1, input.Param2)
    
    return nil
}

Register your custom action:

actions.Register(&myaction.Service{})

Executors

Executors handle the execution of tasks within workflows:

task:
  action: system/executor
  commands:
    - echo "Hello, World!"

The built-in system/executor action allows you to run shell commands.

Task Allocation

The task allocator is responsible for assigning tasks to execution workers and managing dependencies between tasks. It ensures tasks are executed in the correct order based on their dependencies.

alloc := allocator.New(
    processDAO,
    taskExecutionDAO,
    queue,
    allocator.DefaultConfig(),
)
go alloc.Start(ctx)

Advanced Features

Task Dependencies
taskB:
  dependsOn: taskA
  action: printer:print
  input:
    message: "Task B depends on Task A"
Parallel Execution
parallelTasks:
  tasks:
    - id: task1
      async: true
      action: printer:print
      input:
        message: "Running in parallel 1"
    
    - id: task2
      async: true
      action: printer:print
      input:
        message: "Running in parallel 2"
Template Tasks

Template tasks allow you to repeat a sub-task over each element of a collection. Use the template keyword with a selector and an inner task definition:

pipeline:
  processOrders:
    template:
      selector:
        - name: order
          value: "$orders"
      task:
        processOne:
          action:
            service: printer
            method: print
          input:
            message: "Order: $order"

In this example, processOrders will spawn one processOne task for each element in orders, binding the current element to $order in each task.

Contributing

Contributions to Fluxor are welcome! Please feel free to submit a Pull Request.

License

Fluxor is licensed under the LICENSE file in the root directory of this source tree.


© 2012-2023 Viant, inc. All rights reserved.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(s *Service)

Service represents fluxor service

func WithEventService

func WithEventService(service *event.Service) Option

func WithExtensionServices

func WithExtensionServices(services ...types.Service) Option

WithExtensionServices sets the extension services

func WithExtensionTypes

func WithExtensionTypes(types ...*x.Type) Option

WithExtensionTypes sets the extension types

func WithMetaBaseURL

func WithMetaBaseURL(url string) Option

WithMetaBaseURL sets the meta base URL

func WithMetaFsOptions

func WithMetaFsOptions(options ...storage.Option) Option

WithMetaFsOptions with meta file system options

func WithMetaService

func WithMetaService(service *meta.Service) Option

WithMetaService sets the meta service

func WithProcessDAO

func WithProcessDAO(dao dao.Service[string, execution.Process]) Option

WithProcessDAO sets the processor DAO

func WithProcessorWorkers

func WithProcessorWorkers(count int) Option

WithProcessorWorkers sets the processor workers

func WithQueue

func WithQueue(queue messaging.Queue[execution.Execution]) Option

WithQueue sets the message queue

func WithRootTaskNodeName

func WithRootTaskNodeName(name string) Option

WithRootTaskNodeName sets the root task node name

func WithTaskExecutionDAO

func WithTaskExecutionDAO(dao dao.Service[string, execution.Execution]) Option

WithTaskExecutionDAO sets the task execution DAO

func WithTracing

func WithTracing(serviceName, serviceVersion, outputFile string) Option

WithTracing configures OpenTelemetry tracing for the service. If outputFile is empty the stdout exporter is used; otherwise traces are written to the supplied file path. The function is safe to call multiple times – the first successful initialisation wins.

func WithTracingExporter

func WithTracingExporter(serviceName, serviceVersion string, exporter sdktrace.SpanExporter) Option

WithTracingExporter configures OpenTelemetry tracing using a custom SpanExporter. This enables integrations with exporters other than the built-in stdout exporter, for example OTLP, Jaeger or Zipkin. The function is safe to call multiple times – the first successful initialisation wins.

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

Runtime represents a workflow engine runtime

func (*Runtime) DecodeYAMLWorkflow

func (r *Runtime) DecodeYAMLWorkflow(data []byte) (*model.Workflow, error)

DecideYAMLWorkflow loads a workflow

func (*Runtime) Execution

func (r *Runtime) Execution(ctx context.Context, id string) (*execution.Execution, error)

Execution returns an execution

func (*Runtime) LoadWorkflow

func (r *Runtime) LoadWorkflow(ctx context.Context, location string) (*model.Workflow, error)

LoadWorkflow loads a workflow

func (*Runtime) Process

func (r *Runtime) Process(ctx context.Context, id string) (*execution.Process, error)

Process returns a process

func (*Runtime) Processes

func (r *Runtime) Processes(ctx context.Context, parameter ...*dao.Parameter) ([]*execution.Process, error)

Processes returns a list of processes

func (*Runtime) SaveExecution

func (r *Runtime) SaveExecution(ctx context.Context, anExecution *execution.Execution) error

Processes saves execution

func (*Runtime) Shutdown

func (r *Runtime) Shutdown(ctx context.Context) error

Shutdown shutdowns runtime

func (*Runtime) Start

func (r *Runtime) Start(ctx context.Context) error

Start starts runtime

func (*Runtime) StartProcess

func (r *Runtime) StartProcess(ctx context.Context, aWorkflow *model.Workflow, initialState map[string]interface{}, tasks ...string) (*execution.Process, execution.Wait, error)

StartProcess starts a new process

type Service

type Service struct {
	// contains filtered or unexported fields
}

func New

func New(options ...Option) *Service

func (*Service) Actions

func (s *Service) Actions() *extension.Actions

func (*Service) EventService

func (s *Service) EventService() *event.Service

EventService returns event service

func (*Service) NewContext

func (s *Service) NewContext(ctx context.Context) context.Context

func (*Service) RegisterExtensionServices

func (s *Service) RegisterExtensionServices(services ...types.Service)

func (*Service) RegisterExtensionType

func (s *Service) RegisterExtensionType(aType *x.Type)

func (*Service) RegisterExtensionTypes

func (s *Service) RegisterExtensionTypes(types ...*x.Type)

func (*Service) Runtime

func (s *Service) Runtime() *Runtime

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL