orchid

package module
v0.3.0

Warning: This package is not in the latest version of its module.

Published: Oct 25, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

README

Orchid: Dynamic Dataflow Orchestration

Orchid is a lightweight Go framework for orchestrating data-driven workflows. It combines concepts from Flow-Based Programming (FBP) and workflow engines to provide a simple, fault-tolerant solution for managing data flows and task execution within applications.

Inspired by tools like Uber Cadence and Temporal.io, Orchid offers a minimalistic approach to workflow orchestration without the complexity and heavy dependencies often found in other solutions. It was created because of the lack of a simple executor such as temporalite, which is no longer maintained.

Orchid is designed with the following principles in mind:

  • Simplicity: Designed to be easy to understand and use, with minimal boilerplate. Orchid's core is around 1k lines of code.
  • Data Passing: Facilitates data passing between nodes using byte arrays, aligning with flow-based programming paradigms.
  • Dynamic Routing: Supports dynamic routing based on data and error conditions, enabling flexible workflow logic.
  • Hybrid Execution: Allows both synchronous and asynchronous task execution.
  • Modular: Encourages modular code by encapsulating functionality within independent nodes.
  • Retry Policies: Allows defining custom retry policies for each task.
  • Context Propagation: Utilizes Go's context package to pass metadata and cancellation signals throughout workflow execution.
  • Graph-Based Workflows: Workflows are directed graphs, making complex processes easier to visualize.
  • Fault Tolerance: Provides optional state persistence and workflow recovery.
  • JSON Workflow Definitions: Supports a human-readable DSL (a basic JSON format) for defining workflows, in addition to code-based definitions.

Use Cases:

  • Data Pipelines: Orchestrate data processing tasks, including data ingestion, transformation, and analysis.
  • Automation Workflows: Manage automated tasks and interactions with machine or human agents.
  • Service Orchestration: Coordinate interactions between application services and manage distributed API workflows.
  • Human-in-the-Loop Processes: Handle workflows that require human intervention or approval.
  • Agent-Based Models: Manage the interactions and behaviors of agents in simulations.

Connections to Related Concepts:

  • Workflow Engines: Orchid shares foundational similarities with DAG-based workflow engines like Airflow and Prefect, but focuses on simplicity and ease of use and, unlike a strict DAG, allows loops for retries. It provides a streamlined configuration format, error-based routing, and supports both local and remote execution.
  • Flow-Based Programming: The serial execution and data passing between nodes resemble flow-based programming paradigms, but Orchid offers additional features like triggers, callbacks, and conditional branching for more complex orchestration.
  • Agent-Based Modeling (ABM): Orchid's ability to represent dependencies and trigger workflows based on events aligns well with the interaction patterns of agent-based models.
  • Large language model (LLM) agents: The flexibility to pass data and handle conditional branching can be applied in orchestrating tasks involving language models.

Trade-offs, Constraints, and Considerations

Orchid is a lot simpler than other workflow engines: it is designed to be easy to understand and to keep its code base small. This simplicity comes with trade-offs and constraints that users should be aware of:

  • Fixed Activity Interface: For simple fault tolerance, all activities must adhere to a simple interface with an input and output byte array. This design choice simplifies the implementation but may require additional client-side logic for complex data types.
  • Parallelism via Merge Points: Orchid focuses primarily on serial workflow execution, but parallelism can be achieved through merge points where multiple branches converge into a single node. This implies that every workflow ends with a single node, which can be a merge point where parallel branches converge.
  • Idempotency and Error Handling: Orchid ensures that workflows and activities have unique execution IDs to prevent duplicate processing. All activities are executed at-least-once, so they MUST BE idempotent to ensure that retries do not cause unintended side effects. Pass the ActivityToken to external systems to ensure idempotency and fault tolerance.
  • Atomicity and Side Effects: Activities SHALL perform atomic operations; non-atomic operations (like partial database updates) may leave the system in an inconsistent state if they fail midway.
  • Avoid shared mutable state in Activities: Activities SHOULD NOT share mutable state, as this can lead to race conditions and data corruption. Use the context package to pass data between activities, and atomic datastores or synchronization primitives for any shared state.
  • Execution and Recovery Trade-off: If an optional persister is provided, Orchid can recover the state of workflows after failures.
    • Note that passing objects in NodeConfig DOES NOT work for restoration. NodeConfig is included in the serialized form of the workflow when exporting or persisting it, so it must be serializable to JSON along with the rest of the workflow data.
    • Also note that NodeConfig is currently stored as plaintext JSON in the persister.
  • Distributed Execution: Clients can utilize the Activity interface to implement an executor pattern for remote execution of tasks. Each activity executes with a stable ActivityToken that can be used to register callbacks for human-in-the-loop scenarios. The token will be restored in failure cases to ensure the workflow can continue.
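Because activities run at least once, a retried activity must not repeat its side effect. The sketch below illustrates the idea with a hypothetical in-memory `idempotencyStore` and a hypothetical `chargeCustomer` activity body. In Orchid the token would come from `orchid.ActivityToken(ctx)`; here it is an explicit parameter so the example runs without the library, and a real external system (e.g. an API that accepts idempotency keys) would take the store's place.

```go
package main

import (
	"fmt"
	"sync"
)

// idempotencyStore is a hypothetical stand-in for an external system that
// remembers results by token.
type idempotencyStore struct {
	mu      sync.Mutex
	results map[string][]byte
	calls   int // how often the side effect actually ran
}

func newStore() *idempotencyStore {
	return &idempotencyStore{results: make(map[string][]byte)}
}

// do runs effect at most once per token and returns the stored result
// on every later call with the same token.
func (s *idempotencyStore) do(token string, effect func() []byte) []byte {
	s.mu.Lock()
	defer s.mu.Unlock()
	if out, ok := s.results[token]; ok {
		return out
	}
	s.calls++
	out := effect()
	s.results[token] = out
	return out
}

// chargeCustomer is a hypothetical activity body; in Orchid the token
// would be obtained with orchid.ActivityToken(ctx).
func chargeCustomer(store *idempotencyStore, token string, input []byte) []byte {
	return store.do(token, func() []byte {
		return append([]byte("charged:"), input...)
	})
}

func main() {
	store := newStore()
	first := chargeCustomer(store, "wf-1/charge", []byte("42"))
	retry := chargeCustomer(store, "wf-1/charge", []byte("42")) // simulated retry
	fmt.Println(string(first), string(retry), store.calls)
}
```

The retry returns the stored result and the side effect runs only once.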

Usage

Install the package:

go get github.com/kyodo-tech/orchid

Define a Workflow: Create a new workflow and add nodes (tasks) and edges (data routes) to define the processing steps and how data moves between them.

wf := orchid.NewWorkflow("example_workflow")
wf.AddNode(orchid.NewNode("task1"))
wf.AddNode(orchid.NewNode("task2"))
wf.Link("task1", "task2")

Or use the fluent API for linear use cases:

wf := orchid.NewWorkflow("example_workflow").
    AddNode(orchid.NewNode("task1")).
    Then(orchid.NewNode("task2"))

Implement an Activity: Define the logic for each task in the workflow.

func task1Handler(ctx context.Context, input []byte) ([]byte, error) {
    // Task logic here; this placeholder simply prefixes the input.
    output := append([]byte("task1: "), input...)
    return output, nil
}

Execute the Workflow: Initialize the orchestrator, register task handlers, and execute the workflow.

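o := orchid.NewOrchestrator()
o.RegisterActivity("task1", task1Handler)
o.RegisterActivity("task2", task2Handler)
o.LoadWorkflow(wf)

// RunTriggers returns the final output and any execution error.
output, err := o.RunTriggers(context.Background(), nil)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("result: %s\n", output)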

Context-based logging is available when the orchestrator is configured with an slog logger:

logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

o := orchid.NewOrchestrator(
    orchid.WithLogger(logger),
)

The logger can then be retrieved inside activities:

orchid.Logger(ctx).Error("Error executing task", "task", orchid.ActivityName(ctx), "error", err)

Persistence and Fault Tolerance

Orchid supports fault tolerance through state persistence and recovery mechanisms. By providing an optional persister, Orchid can recover the state of workflows after failures. The persistence layer MUST accurately reflect the state of the workflow to avoid incorrect restoration. Cyclic dependencies, disconnected graphs, or malformed workflows CAN cause the restoration to fail or behave unpredictably. Well-formed workflows recover at the last non-parallel node, replay successful node executions until the failure point, and retry the failed nodes.
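The replay behavior can be pictured as follows: outputs of nodes that already succeeded are read back from persisted state instead of being recomputed, and execution resumes at the first node without a recorded result. This is a simplified, linear sketch in which a hypothetical `resume` helper and an in-memory `persisted` map stand in for the orchestrator and the persister; Orchid's persister interface and its handling of parallel branches are not modeled.

```go
package main

import (
	"errors"
	"fmt"
)

type step struct {
	name string
	run  func(in string) (string, error)
}

// resume replays recorded outputs for completed steps and executes only
// the remaining ones, recording each new success in persisted.
func resume(steps []step, input string, persisted map[string]string) (string, []string, error) {
	data := input
	var executed []string
	for _, s := range steps {
		if out, ok := persisted[s.name]; ok {
			data = out // replay: reuse the persisted output
			continue
		}
		out, err := s.run(data)
		if err != nil {
			return "", executed, fmt.Errorf("%s: %w", s.name, err)
		}
		persisted[s.name] = out
		executed = append(executed, s.name)
		data = out
	}
	return data, executed, nil
}

func main() {
	fail := true
	steps := []step{
		{"a", func(in string) (string, error) { return in + "a", nil }},
		{"b", func(in string) (string, error) {
			if fail {
				return "", errors.New("crash")
			}
			return in + "b", nil
		}},
	}
	persisted := map[string]string{}
	_, _, err := resume(steps, "", persisted) // "a" succeeds, "b" fails
	fmt.Println(err)
	fail = false
	out, executed, _ := resume(steps, "", persisted) // "a" is replayed, "b" retried
	fmt.Println(out, executed)
}
```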

To enable persistence, create a persister instance and pass it to the orchestrator:

persister, err := persistence.NewSQLitePersister("orchid.db")
if err != nil {
    // Handle error
}
defer persister.DB.Close()

o := orchid.NewOrchestrator(orchid.WithPersistence(persister))

We can also pass a default retry policy for all nodes if desired:

o := orchid.NewOrchestratorWithWorkflow(wf,
    orchid.WithDefaultRetryPolicy(&orchid.RetryPolicy{
        MaxRetries:         3,
        InitInterval:       2 * time.Second,
        MaxInterval:        30 * time.Second,
        BackoffCoefficient: 2.0,
    }),
    // ... other options ...
)

Middleware Support

Middlewares allow wrapping activities with additional functionality, such as logging, error handling, or custom serialization. We can apply cross-cutting concerns consistently across activities without modifying their core logic.

A middleware is a function that takes an Activity and returns a new Activity. This allows us to compose additional behavior.

type Middleware func(Activity) Activity

To apply a middleware, register it with the orchestrator before registering activities:

o := orchid.NewOrchestrator(
    orchid.WithLogger(logger),
    // Other options...
)

// Register global middlewares
o.Use(middleware.Logging)
// ...

// Register activities after adding middlewares
o.RegisterActivity("ActivityA", activityA)
o.RegisterActivity("ActivityB", activityB)

For example, a middleware that runs code before and after the wrapped activity can be defined as follows:

func SomeMiddleware(activity orchid.Activity) orchid.Activity {
    return func(ctx context.Context, input []byte) ([]byte, error) {
        // pre-process
        output, outErr := activity(ctx, input)
        // post-process
        return output, outErr
    }
}

Typed Activities

Activities use []byte for inputs and outputs to facilitate easy serialization and deserialization, which is important for simple state persistence and recovery. To work with custom types, you can use the TypedActivity helper, which handles JSON marshaling and unmarshaling. Define a custom, JSON-serializable struct for your data:

type flow struct {
    Data   []byte
    Rating int
}

Then, write the activity against your type:

func fnA(ctx context.Context, input *flow) (*flow, error) {
    fmt.Println("fnA input:", input)
    return &flow{
        Data:   []byte("A"),
        Rating: 60,
    }, nil
}

Register your activities using TypedActivity:

o.RegisterActivity("A", orchid.TypedActivity(fnA))

See the ./examples directory for different usage scenarios.

Documentation

Constants

This section is empty.

Variables

var (
	ErrNodeNotFound                 = fmt.Errorf("node not found")
	ErrWorkflowNodeAlreadyExists    = fmt.Errorf("node already exists")
	ErrWorkflowInvalid              = fmt.Errorf("invalid, it has no trigger or no starting node")
	ErrWorkflowHasNoTriggers        = fmt.Errorf("no triggers found")
	ErrWorkflowHasNoNodes           = fmt.Errorf("workflow has no nodes")
	ErrOrchestratorActivityNotFound = fmt.Errorf("activity not found")
	ErrOrchestratorHasNoPersister   = fmt.Errorf("no persister set")
	ErrNoWorkflowID                 = fmt.Errorf("no execution ID set")
)

Functions

func ActivityName

func ActivityName(ctx context.Context) string

func ActivityToken

func ActivityToken(ctx context.Context) string

func Config

func Config(ctx context.Context, key string) (interface{}, bool)

func ConfigString

func ConfigString(ctx context.Context, key string) (string, bool)

func IsNonRestorable

func IsNonRestorable(ctx context.Context) bool

func Logger

func Logger(ctx context.Context) *slog.Logger

func RouteTo

func RouteTo(nodeKey string) error

func WithNonRestorable

func WithNonRestorable(ctx context.Context) context.Context

func WithWorkflowID

func WithWorkflowID(ctx context.Context, id string) context.Context

func WorkflowID

func WorkflowID(ctx context.Context) string

Types

type Activity

type Activity func(ctx context.Context, input []byte) (output []byte, err error)

func AsyncExecutor

func AsyncExecutor(ctx context.Context) (Activity, bool)

func SyncExecutor

func SyncExecutor(ctx context.Context) (Activity, bool)

func TypedActivity

func TypedActivity[T any](activity func(ctx context.Context, input T) (T, error)) Activity

type DynamicRoute

type DynamicRoute struct {
	Key string `json:"key"`
	Err error  `json:"error"`
}

func (*DynamicRoute) Error

func (e *DynamicRoute) Error() string

func (*DynamicRoute) MarshalJSON

func (e *DynamicRoute) MarshalJSON() ([]byte, error)

func (*DynamicRoute) UnmarshalJSON

func (e *DynamicRoute) UnmarshalJSON(data []byte) error

func (*DynamicRoute) Unwrap

func (e *DynamicRoute) Unwrap() error

type Edge

type Edge struct {
	From string `json:"from"`
	To   string `json:"to"`
}

type Middleware

type Middleware func(Activity) Activity

type Node

type Node struct {
	ID           int64                  `json:"id"`
	ActivityName string                 `json:"activity"`
	Config       map[string]interface{} `json:"config,omitempty"`
	Type         NodeType               `json:"type,omitempty"`
	RetryPolicy  *RetryPolicy           `json:"retry,omitempty"`
	EditLink     *string                `json:"link,omitempty"`
}

func NewNode

func NewNode(activity string, options ...NodeOption) *Node

func (*Node) ActivityStartTime

func (n *Node) ActivityStartTime(ctx context.Context) time.Time

func (*Node) IsRetryableError

func (n *Node) IsRetryableError(err error) bool

func (*Node) IsTrigger

func (n *Node) IsTrigger() bool

func (*Node) RetryAttempts

func (n *Node) RetryAttempts(ctx context.Context) int

func (*Node) RetryPolicyOrDefault

func (n *Node) RetryPolicyOrDefault(d *RetryPolicy) *RetryPolicy

type NodeOption

type NodeOption func(*Node)

func WithNodeConfig

func WithNodeConfig(config map[string]interface{}) NodeOption

func WithNodeEditLink

func WithNodeEditLink(link string) NodeOption

func WithNodeID

func WithNodeID(id int64) NodeOption

func WithNodeRetryPolicy

func WithNodeRetryPolicy(policy *RetryPolicy) NodeOption

func WithNodeType

func WithNodeType(t NodeType) NodeOption

type NodeType

type NodeType string

const (
	Trigger NodeType = "trigger"
	Action  NodeType = "action"
)

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

func NewOrchestrator

func NewOrchestrator(options ...OrchestratorOption) *Orchestrator

func NewOrchestratorWithWorkflow added in v0.2.0

func NewOrchestratorWithWorkflow(w *Workflow, options ...OrchestratorOption) *Orchestrator

func (*Orchestrator) GetActivity

func (o *Orchestrator) GetActivity(node *Node) (Activity, bool)

func (*Orchestrator) GetNode

func (o *Orchestrator) GetNode(graphNode graph.Node) (*Node, bool)

func (*Orchestrator) GetReducer

func (o *Orchestrator) GetReducer(node *Node) (Reducer, bool)

func (*Orchestrator) LoadWorkflow

func (o *Orchestrator) LoadWorkflow(w *Workflow)

func (*Orchestrator) RegisterActivity

func (o *Orchestrator) RegisterActivity(name string, activity Activity)

func (*Orchestrator) RegisterReducer

func (o *Orchestrator) RegisterReducer(name string, reducer Reducer)

func (*Orchestrator) RestorableWorkflows

func (o *Orchestrator) RestorableWorkflows(ctx context.Context) (RestorableWorkflows, error)

func (*Orchestrator) RestoreWorkflowsAsync

func (o *Orchestrator) RestoreWorkflowsAsync(ctx context.Context) error

func (*Orchestrator) RunTriggers

func (o *Orchestrator) RunTriggers(ctx context.Context, input []byte) ([]byte, error)

func (*Orchestrator) Start

func (o *Orchestrator) Start(ctx context.Context, data []byte) (output []byte, err error)

func (*Orchestrator) StartAsync

func (o *Orchestrator) StartAsync(ctx context.Context, data []byte) (output []byte, err error)

func (*Orchestrator) Use

func (o *Orchestrator) Use(mw Middleware)

type OrchestratorOption

type OrchestratorOption func(*Orchestrator)

func WithDefaultRetryPolicy added in v0.2.0

func WithDefaultRetryPolicy(policy *RetryPolicy) OrchestratorOption

func WithLogger

func WithLogger(logger *slog.Logger) OrchestratorOption

func WithPersistence

func WithPersistence(persister persistence.Persister) OrchestratorOption

type Reducer

type Reducer func([][]byte) []byte

type RestorableWorkflows

type RestorableWorkflows map[string]map[string]func(ctx context.Context) ([]byte, error)

RestorableWorkflows maps each workflow name to a map from workflow ID to a function that the client can call to restore that workflow execution.

type RetryPolicy

type RetryPolicy struct {
	MaxRetries               int
	InitInterval             time.Duration
	MaxInterval              time.Duration
	BackoffCoefficient       float64
	NonRetriableErrorReasons []string
}

func DefaultRetryPolicy

func DefaultRetryPolicy() *RetryPolicy

Default retry policy

type Workflow

type Workflow struct {
	Name  string           `json:"name"`
	Nodes map[string]*Node `json:"nodes"`
	Edges []*Edge          `json:"edges"`
	// contains filtered or unexported fields
}

func NewWorkflow

func NewWorkflow(name string) *Workflow

func (*Workflow) AddEdge

func (wf *Workflow) AddEdge(edge *Edge) error

func (*Workflow) AddNode

func (wf *Workflow) AddNode(node *Node) *Workflow

AddNode adds a new task to the workflow.

func (*Workflow) AddNodes

func (wf *Workflow) AddNodes(nodes ...*Node) *Workflow

func (*Workflow) Export

func (wf *Workflow) Export() ([]byte, error)

func (*Workflow) ExportDot

func (wf *Workflow) ExportDot(indent string, optionalChildWorkflows map[string]*Workflow) []byte

func (*Workflow) ExportDotToFile

func (wf *Workflow) ExportDotToFile(filename string, optionalChildWorkflows map[string]*Workflow) error
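`ExportDot` renders the workflow as Graphviz DOT. The sketch below shows the basic idea for a graph given as a list of edges, using a hypothetical `toDot` helper; Orchid's real output (node labels, styling, and child-workflow subgraphs passed via `optionalChildWorkflows`) is not reproduced.

```go
package main

import (
	"fmt"
	"strings"
)

type Edge struct {
	From string
	To   string
}

// toDot emits a minimal directed graph; %q quotes node names so that
// identifiers containing spaces or dashes remain valid DOT IDs.
func toDot(name string, edges []Edge) string {
	var b strings.Builder
	fmt.Fprintf(&b, "digraph %q {\n", name)
	for _, e := range edges {
		fmt.Fprintf(&b, "  %q -> %q;\n", e.From, e.To)
	}
	b.WriteString("}\n")
	return b.String()
}

func main() {
	fmt.Print(toDot("example_workflow", []Edge{{"task1", "task2"}}))
}
```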

func (*Workflow) ExportMermaid

func (wf *Workflow) ExportMermaid(indent string, optionalChildWorkflows map[string]*Workflow) []byte

ExportMermaid generates the Mermaid representation of the workflow.

func (*Workflow) ExportMermaidHTML added in v0.3.0

func (wf *Workflow) ExportMermaidHTML(indent string, optionalChildWorkflows map[string]*Workflow) ([]byte, error)

func (*Workflow) ExportMermaidToFile

func (wf *Workflow) ExportMermaidToFile(filename string, optionalChildWorkflows map[string]*Workflow) error

func (*Workflow) Import

func (wf *Workflow) Import(workflow []byte) error

func (*Workflow) Link

func (wf *Workflow) Link(from, to string) *Workflow

func (*Workflow) Then added in v0.2.0

func (wf *Workflow) Then(node *Node) *Workflow

Directories

examples/abm (command)
examples/childworkflow (command)
examples/cron (command)
examples/fbp (command)
examples/middlewares (command)
examples/parallel (command)
examples/webhook (command)
