compose

package
v0.0.0-...-0f749ce

This package is not in the latest version of its module.
Published: Aug 15, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Constants

const (
	START = "start"
	END   = "end"
)

Graph constants

const (
	ComponentOfUnknown     component = "Unknown"
	ComponentOfGraph       component = "Graph"
	ComponentOfWorkflow    component = "Workflow"
	ComponentOfChain       component = "Chain"
	ComponentOfPassthrough component = "Passthrough"
	ComponentOfToolsNode   component = "ToolsNode"
	ComponentOfLambda      component = "Lambda"
	ComponentOfActivity    component = "TemporalActivity"
	ComponentOfSubGraph    component = "SubGraph"
)

Variables

var DAGInvalidLoopErr = errors.New("DAG is invalid, has loop")
var ErrGraphCompiled = errors.New("graph has been compiled, cannot be modified")

Error definitions
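
Both values are sentinel errors, so callers can match them with errors.Is even after they have been wrapped. The sketch below redeclares the two variables locally, because the package's import path is not given here. The addEdge helper is hypothetical and only stands in for any call that refuses to modify a compiled graph.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented sentinel errors. In real code
// these would be referenced from the compose package instead.
var (
	DAGInvalidLoopErr = errors.New("DAG is invalid, has loop")
	ErrGraphCompiled  = errors.New("graph has been compiled, cannot be modified")
)

// addEdge is a hypothetical helper: it wraps ErrGraphCompiled with context
// when the (pretend) graph has already been compiled.
func addEdge(compiled bool, from, to string) error {
	if compiled {
		return fmt.Errorf("add edge %s -> %s: %w", from, to, ErrGraphCompiled)
	}
	return nil
}

// isCompiledErr reports whether err is, or wraps, ErrGraphCompiled.
func isCompiledErr(err error) bool {
	return errors.Is(err, ErrGraphCompiled)
}

func main() {
	err := addEdge(true, "a", "b")
	fmt.Println(isCompiledErr(err), errors.Is(err, DAGInvalidLoopErr)) // true false
}
```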

Functions

This section is empty.

Types

type AnyGraph

type AnyGraph interface {
	// contains filtered or unexported methods
}

type FieldMapping

type FieldMapping struct {
	// contains filtered or unexported fields
}

func FromField

func FromField(from string) *FieldMapping

FromField creates a FieldMapping that maps a single predecessor field to the entire successor input. This is an exclusive mapping: once set, no other field mappings can be added, because the successor input has already been fully mapped. The field may be either a struct field or a map key.

func FromFieldPath

func FromFieldPath(fromFieldPath FieldPath) *FieldMapping

FromFieldPath creates a FieldMapping that maps a single predecessor field path to the entire successor input. This is an exclusive mapping: once set, no other field mappings can be added, because the successor input has already been fully mapped.

Example:

// Maps the 'name' field from nested 'user.profile' to the entire successor input
FromFieldPath(FieldPath{"user", "profile", "name"})

Note: The field path elements must not contain the internal path separator character ('\x1F').
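
The separator restriction can be checked before a path is handed to any of the mapping constructors. The following is a minimal sketch of such a pre-check. FieldPath is redeclared locally, and validatePath is a hypothetical helper, not part of the documented API; how the package itself reacts to an offending element is not stated here.

```go
package main

import (
	"fmt"
	"strings"
)

// FieldPath is a local stand-in for the documented type.
type FieldPath []string

// pathSeparator is the reserved character named in the note above.
const pathSeparator = "\x1F"

// validatePath is a hypothetical pre-check: it returns an error if any
// element of the path contains the reserved separator.
func validatePath(p FieldPath) error {
	for i, elem := range p {
		if strings.Contains(elem, pathSeparator) {
			return fmt.Errorf("field path element %d (%q) contains the reserved separator", i, elem)
		}
	}
	return nil
}

func main() {
	fmt.Println(validatePath(FieldPath{"user", "profile", "name"})) // <nil>
	fmt.Println(validatePath(FieldPath{"user", "bad\x1Fkey"}) != nil) // true
}
```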

func MapFieldPaths

func MapFieldPaths(fromFieldPath, toFieldPath FieldPath) *FieldMapping

MapFieldPaths creates a FieldMapping that maps a single predecessor field path to a single successor field path.

Example:

// Maps user.profile.name to response.userName
MapFieldPaths(
    FieldPath{"user", "profile", "name"},
    FieldPath{"response", "userName"},
)

Note: The field path elements must not contain the internal path separator character ('\x1F').

func MapFields

func MapFields(from, to string) *FieldMapping

MapFields creates a FieldMapping that maps a single predecessor field to a single successor field. Each field may be either a struct field or a map key.

func ToField

func ToField(to string, opts ...FieldMappingOption) *FieldMapping

ToField creates a FieldMapping that maps the entire predecessor output to a single successor field. The field may be either a struct field or a map key.

func ToFieldPath

func ToFieldPath(toFieldPath FieldPath, opts ...FieldMappingOption) *FieldMapping

ToFieldPath creates a FieldMapping that maps the entire predecessor output to a single successor field path.

Example:

// Maps the entire predecessor output to response.data.userName
ToFieldPath(FieldPath{"response", "data", "userName"})

Note: The field path elements must not contain the internal path separator character ('\x1F').

func (*FieldMapping) Equals

func (m *FieldMapping) Equals(o *FieldMapping) bool

func (*FieldMapping) FromNodeKey

func (m *FieldMapping) FromNodeKey() string

func (*FieldMapping) FromPath

func (m *FieldMapping) FromPath() FieldPath

func (*FieldMapping) String

func (m *FieldMapping) String() string

String returns the string representation of the FieldMapping.

func (*FieldMapping) ToPath

func (m *FieldMapping) ToPath() FieldPath

type FieldMappingOption

type FieldMappingOption func(*FieldMapping)

FieldMappingOption is a functional option for configuring a FieldMapping.

func WithCustomExtractor

func WithCustomExtractor(extractor func(input any) (any, error)) FieldMappingOption

WithCustomExtractor sets a custom extractor function for the FieldMapping. The extractor is used to extract a value from the source of the FieldMapping. Note: when an extractor is specified this way, Eino can only check the validity of the field mapping at request time.
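
Because the extractor receives its input as any, a type mismatch can only surface when the function actually runs, which is consistent with the request-time caveat. The sketch below shows a function with the documented extractor signature. The order type and the firstItem function are hypothetical and exist only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// order is a hypothetical predecessor output type.
type order struct {
	Items []string
}

// firstItem has the extractor signature func(input any) (any, error).
// Going by the signatures listed in this reference, it could be passed as
// ToField("first", WithCustomExtractor(firstItem)).
func firstItem(input any) (any, error) {
	o, ok := input.(order)
	if !ok {
		// The mismatch is only detectable here, at run time.
		return nil, fmt.Errorf("unexpected input type %T", input)
	}
	if len(o.Items) == 0 {
		return nil, errors.New("order has no items")
	}
	return o.Items[0], nil
}

func main() {
	v, err := firstItem(order{Items: []string{"book", "pen"}})
	fmt.Println(v, err) // book <nil>

	_, err = firstItem("not an order")
	fmt.Println(err != nil) // true
}
```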

type FieldPath

type FieldPath []string

FieldPath represents a path to a nested field in a struct or map. Each element in the path is either:

  • a struct field name
  • a map key

Example paths:

  • []string{"user"} // top-level field
  • []string{"user", "name"} // nested struct field
  • []string{"users", "admin"} // map key access
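
To make the path semantics concrete, the sketch below walks a FieldPath through nested maps. It is a simplified, map-only illustration: FieldPath is redeclared locally, lookup is a hypothetical helper, and resolving struct fields (which would need reflection) is left out.

```go
package main

import "fmt"

// FieldPath is a local stand-in for the documented type.
type FieldPath []string

// lookup is a hypothetical helper that follows path through nested
// map[string]any values and returns the value at the end of the path.
func lookup(root map[string]any, path FieldPath) (any, error) {
	var cur any = root
	for _, key := range path {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, fmt.Errorf("cannot descend into %q: parent is not a map", key)
		}
		next, ok := m[key]
		if !ok {
			return nil, fmt.Errorf("key %q not found", key)
		}
		cur = next
	}
	return cur, nil
}

func main() {
	data := map[string]any{
		"users": map[string]any{"admin": "alice"},
	}
	v, err := lookup(data, FieldPath{"users", "admin"})
	fmt.Println(v, err) // alice <nil>
}
```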

type Graph

type Graph[I, O any] struct {
	// contains filtered or unexported fields
}

Graph is the generic wrapper for the graph, following Eino's pattern.

func NewGraph

func NewGraph[I, O any]() *Graph[I, O]

NewGraph creates a new Temporal-based graph, following Eino's pattern.

func (*Graph[I, O]) AddBranch

func (g *Graph[I, O]) AddBranch(startNode string, branch *GraphBranch) error

AddBranch adds a branch to the typed graph.

func (Graph) AddEdge

func (g Graph) AddEdge(startNode, endNode string) error

AddEdge adds an edge to the Temporal graph, following the logic of Eino's addEdgeWithMappings.

func (*Graph[I, O]) AddGraphNode

func (g *Graph[I, O]) AddGraphNode(key string, subGraph AnyGraph, opts ...GraphAddNodeOpt) error

AddGraphNode adds a sub-graph to the typed graph.

func (Graph) AddNode

func (g Graph) AddNode(key string, activity any, input any, acopt workflow.ActivityOptions, opts ...GraphAddNodeOpt) (err error)

AddNode adds an activity node to the graph, similar to Eino's addNode.

func (*Graph[I, O]) Compile

func (g *Graph[I, O]) Compile(ctx workflow.Context, opts ...GraphCompileOption) (Runnable[I, O], error)

Compile compiles the Temporal graph into a runnable executor, following Eino's compile logic.

type GraphAddNodeOpt

type GraphAddNodeOpt func(o *graphAddNodeOpts)

func WithGraphCompileOptions

func WithGraphCompileOptions(opts ...GraphCompileOption) GraphAddNodeOpt

WithGraphCompileOptions sets compile options for sub-graph nodes.

func WithInputKey

func WithInputKey(key string) GraphAddNodeOpt

WithInputKey sets the input key for the node.

func WithNodeName

func WithNodeName(name string) GraphAddNodeOpt

WithNodeName sets the display name for the node.

func WithOutputKey

func WithOutputKey(key string) GraphAddNodeOpt

WithOutputKey sets the output key for the node.

type GraphBranch

type GraphBranch struct {
	// contains filtered or unexported fields
}

GraphBranch represents a conditional branch in the Temporal graph.

func NewGraphBranch

func NewGraphBranch[T any](condition GraphBranchCondition[T], endNodes map[string]bool) *GraphBranch

NewGraphBranch creates a new graph branch, following Eino's pattern. The condition determines which node runs next. For example:

condition := func(ctx workflow.Context, in string) (string, error) {
	// logic to determine the next node
	return "next_node_key", nil
}
endNodes := map[string]bool{"path01": true, "path02": true}
branch := compose.NewGraphBranch(condition, endNodes)

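// AddBranch returns an error, which should be checked.
if err := graph.AddBranch("key_of_node_before_branch", branch); err != nil {
	// handle the error
}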

func NewGraphMultiBranch

func NewGraphMultiBranch[T any](condition GraphMultiBranchCondition[T], endNodes map[string]bool) *GraphBranch

NewGraphMultiBranch creates a new graph branch that can route to multiple end nodes, following Eino's pattern.

type GraphBranchCondition

type GraphBranchCondition[T any] func(ctx workflow.Context, in T) (endNode string, err error)

GraphBranchCondition is the condition type for the branch.

type GraphCompileOption

type GraphCompileOption func(*graphCompileOptions)

GraphCompileOption is an option for compiling an AnyGraph.

func WithGraphName

func WithGraphName(graphName string) GraphCompileOption

WithGraphName sets a name for the graph. The name is used for debugging and logging purposes. If not set, a default name will be used.
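
The option follows the functional-options pattern: each option is a function that mutates an options struct, and unset fields keep their defaults. The sketch below reproduces that pattern with local stand-in types. The "unnamed-graph" default is a placeholder chosen for illustration, since the actual default name is not stated here, and resolveName is a hypothetical helper.

```go
package main

import "fmt"

// compileOptions and CompileOption are local stand-ins for the package's
// unexported options struct and its GraphCompileOption type.
type compileOptions struct {
	graphName string
}

type CompileOption func(*compileOptions)

// WithGraphName mirrors the documented option: it records the graph name.
func WithGraphName(name string) CompileOption {
	return func(o *compileOptions) { o.graphName = name }
}

// resolveName is a hypothetical helper that applies the options on top of
// a default. The default value used here is an assumption.
func resolveName(opts ...CompileOption) string {
	o := &compileOptions{graphName: "unnamed-graph"}
	for _, opt := range opts {
		opt(o)
	}
	return o.graphName
}

func main() {
	fmt.Println(resolveName())                         // unnamed-graph
	fmt.Println(resolveName(WithGraphName("billing"))) // billing
}
```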

type GraphInfo

type GraphInfo struct {
	CompileOptions        []GraphCompileOption
	Nodes                 map[string]GraphNodeInfo // node key -> node info
	Edges                 map[string][]string      // edge start node key -> edge end node key, control edges
	DataEdges             map[string][]string
	Branches              map[string][]GraphBranch // branch start node key -> branch
	InputType, OutputType reflect.Type
	Name                  string
}

GraphInfo holds the information that end users pass in when compiling a graph. It is passed to the compile callback so that users can access each node's info and instance, which is useful when the full details of the graph are needed for observation.

type GraphMultiBranchCondition

type GraphMultiBranchCondition[T any] func(ctx workflow.Context, in T) (endNode map[string]bool, err error)

GraphMultiBranchCondition is the condition type for the multi-choice branch.
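
A multi-branch condition returns a set of end-node keys rather than a single key. The sketch below shows that shape in isolation. wfContext is a placeholder for workflow.Context so the example compiles without the Temporal SDK, and route is a hypothetical helper. Rejecting keys that were not declared in endNodes is an assumption about sensible behavior, not a documented guarantee.

```go
package main

import (
	"fmt"
	"sort"
)

// wfContext is a placeholder for workflow.Context.
type wfContext struct{}

// multiCondition mirrors the shape of GraphMultiBranchCondition[T].
type multiCondition[T any] func(ctx wfContext, in T) (map[string]bool, error)

// route is a hypothetical helper: it evaluates cond and returns the selected
// end nodes, rejecting any key that was not declared in endNodes.
func route[T any](ctx wfContext, cond multiCondition[T], endNodes map[string]bool, in T) ([]string, error) {
	chosen, err := cond(ctx, in)
	if err != nil {
		return nil, err
	}
	var out []string
	for node, selected := range chosen {
		if !selected {
			continue
		}
		if !endNodes[node] {
			return nil, fmt.Errorf("branch returned undeclared end node %q", node)
		}
		out = append(out, node)
	}
	sort.Strings(out) // map iteration order is random, so sort for stable output
	return out, nil
}

func main() {
	cond := multiCondition[int](func(ctx wfContext, n int) (map[string]bool, error) {
		return map[string]bool{"audit": true, "notify": n > 10}, nil
	})
	declared := map[string]bool{"audit": true, "notify": true}
	nodes, err := route(wfContext{}, cond, declared, 42)
	fmt.Println(nodes, err) // [audit notify] <nil>
}
```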

type GraphNodeInfo

type GraphNodeInfo struct {
	Component             component
	Instance              any
	GraphAddNodeOpts      []GraphAddNodeOpt
	InputType, OutputType reflect.Type // mainly for lambda, whose input and output types cannot be inferred by component type
	Name                  string
	InputKey, OutputKey   string
	GraphInfo             *GraphInfo
	Mappings              []*FieldMapping
}

type GraphRunError

type GraphRunError struct {
	// contains filtered or unexported fields
}

GraphRunError wraps errors that occur during a temporal graph run. This mirrors eino's GraphRunError behavior to provide a consistent surface.

func (*GraphRunError) Error

func (e *GraphRunError) Error() string

func (*GraphRunError) Unwrap

func (e *GraphRunError) Unwrap() error
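
Since GraphRunError implements Unwrap, the standard errors.As and errors.Is helpers can be used to find both the wrapper and its root cause. The sketch below demonstrates this with a local stand-in type that has the same two methods. Its fields, the errTimeout value, and the classify helper are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// runError is a local stand-in with the same method set as the documented
// GraphRunError. Its fields are assumptions made for illustration.
type runError struct {
	node string
	err  error
}

func (e *runError) Error() string { return fmt.Sprintf("node %q failed: %v", e.node, e.err) }
func (e *runError) Unwrap() error { return e.err }

// errTimeout is a hypothetical root cause.
var errTimeout = errors.New("activity timed out")

// classify reports whether err contains a *runError and whether its chain
// contains errTimeout.
func classify(err error) (isRunErr, isTimeout bool) {
	var re *runError
	return errors.As(err, &re), errors.Is(err, errTimeout)
}

func main() {
	err := fmt.Errorf("invoke: %w", &runError{node: "fetch", err: errTimeout})
	fmt.Println(classify(err)) // true true
}
```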

type Invoke

type Invoke[I, O any] func(ctx workflow.Context, input I) (output O, err error)

Invoke is the type of the invokable lambda function for Temporal activities.

type Runnable

type Runnable[I, O any] interface {
	Invoke(ctx workflow.Context, input I) (output O, err error)
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task represents a Temporal activity task (similar to Eino's task).

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

TaskManager manages Temporal activity execution (similar to Eino's taskManager).

type Workflow

type Workflow[I, O any] struct {
	// contains filtered or unexported fields
}

Workflow is a wrapper around the graph that replaces AddEdge with declared dependencies and field mappings between nodes. Under the hood it uses NodeTriggerMode(AllPredecessor), so it does not support cycles.
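
The link between all-predecessor triggering and the lack of cycle support can be seen in a small simulation: a node becomes runnable only after every predecessor has finished, so nodes on a cycle wait on each other forever. The sketch below is a generic topological-order routine (Kahn's algorithm), not the package's scheduler, and runOrder is a hypothetical name.

```go
package main

import (
	"fmt"
	"sort"
)

// runOrder simulates all-predecessor triggering. deps maps each node to its
// predecessors; every predecessor is expected to appear as a key as well.
// It returns the execution order and whether every node was able to run.
func runOrder(deps map[string][]string) (order []string, complete bool) {
	nodes := make([]string, 0, len(deps))
	for n := range deps {
		nodes = append(nodes, n)
	}
	sort.Strings(nodes) // deterministic traversal

	pending := make(map[string]int, len(deps)) // node -> unfinished predecessors
	successors := make(map[string][]string)
	var ready []string
	for _, n := range nodes {
		pending[n] = len(deps[n])
		if pending[n] == 0 {
			ready = append(ready, n)
		}
		for _, p := range deps[n] {
			successors[p] = append(successors[p], n)
		}
	}

	for len(ready) > 0 {
		n := ready[0]
		ready = ready[1:]
		order = append(order, n)
		for _, s := range successors[n] {
			pending[s]--
			if pending[s] == 0 {
				ready = append(ready, s)
			}
		}
	}
	return order, len(order) == len(deps)
}

func main() {
	fmt.Println(runOrder(map[string][]string{"start": nil, "a": {"start"}, "b": {"a"}})) // [start a b] true
	fmt.Println(runOrder(map[string][]string{"a": {"b"}, "b": {"a"}}))                   // [] false
}
```

In the cyclic case no node ever has all of its predecessors finished, so nothing runs. That is the situation a DAG check such as the one behind DAGInvalidLoopErr would presumably guard against.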

func NewWorkflow

func NewWorkflow[I, O any]() *Workflow[I, O]

NewWorkflow creates a new Workflow.

func (*Workflow[I, O]) AddBranch

func (wf *Workflow[I, O]) AddBranch(fromNodeKey string, branch *GraphBranch) *WorkflowBranch

AddBranch adds a branch to the workflow. End nodes must define their own field mappings.

func (*Workflow[I, O]) AddEnd deprecated

func (wf *Workflow[I, O]) AddEnd(fromNodeKey string, inputs ...*FieldMapping) *Workflow[I, O]

Deprecated: use *Workflow[I,O].End() to obtain a WorkflowNode instance for END, then work with it just like a normal WorkflowNode.

func (*Workflow[I, O]) AddGraphNode

func (wf *Workflow[I, O]) AddGraphNode(key string, subGraph AnyGraph, opts ...GraphAddNodeOpt) *WorkflowNode

AddGraphNode adds a sub-graph node.

func (*Workflow[I, O]) AddNode

func (wf *Workflow[I, O]) AddNode(key string, activity any, input any, acopt workflow.ActivityOptions, opts ...GraphAddNodeOpt) *WorkflowNode

AddNode adds a temporal activity node.

func (*Workflow[I, O]) Compile

func (wf *Workflow[I, O]) Compile(ctx workflow.Context, opts ...GraphCompileOption) (Runnable[I, O], error)

func (*Workflow[I, O]) End

func (wf *Workflow[I, O]) End() *WorkflowNode

End returns the WorkflowNode representing the END node.

func (*Workflow[I, O]) Start

func (wf *Workflow[I, O]) Start() *WorkflowNode

Start returns the WorkflowNode representing the START node.

type WorkflowAddInputOpt

type WorkflowAddInputOpt func(*workflowAddInputOpts)

func WithNoDirectDependency

func WithNoDirectDependency() WorkflowAddInputOpt

WithNoDirectDependency creates a data mapping without establishing a direct execution dependency.

type WorkflowBranch

type WorkflowBranch struct {
	*GraphBranch
	// contains filtered or unexported fields
}

WorkflowBranch wraps GraphBranch with the from-node context.

type WorkflowNode

type WorkflowNode struct {
	// contains filtered or unexported fields
}

WorkflowNode is the node of the Workflow.

func (*WorkflowNode) AddDependency

func (n *WorkflowNode) AddDependency(fromNodeKey string) *WorkflowNode

AddDependency creates an execution-only dependency between nodes.

func (*WorkflowNode) AddInput

func (n *WorkflowNode) AddInput(fromNodeKey string, inputs ...*FieldMapping) *WorkflowNode

AddInput creates both data and execution dependencies between nodes. It configures how data flows from the predecessor node (fromNodeKey) to the current node, and ensures the current node only executes after the predecessor completes.

func (*WorkflowNode) AddInputWithOptions

func (n *WorkflowNode) AddInputWithOptions(fromNodeKey string, inputs []*FieldMapping, opts ...WorkflowAddInputOpt) *WorkflowNode

AddInputWithOptions creates a dependency between nodes with custom configuration options.

func (*WorkflowNode) SetStaticValue

func (n *WorkflowNode) SetStaticValue(path FieldPath, value any) *WorkflowNode

SetStaticValue sets a static value for a field path that will be merged into node input at runtime.
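
As an illustration of what merging a static value into the node input can look like, the sketch below writes a value at a field path inside a nested map and creates intermediate maps as needed. It is map-only and hypothetical: FieldPath is redeclared locally, setAt is not part of the documented API, and the package's actual merge rules (for example, around conflicts with mapped fields) are not described here.

```go
package main

import "fmt"

// FieldPath is a local stand-in for the documented type.
type FieldPath []string

// setAt is a hypothetical helper that stores value at path inside input,
// creating intermediate maps where they do not exist yet.
func setAt(input map[string]any, path FieldPath, value any) error {
	if len(path) == 0 {
		return fmt.Errorf("empty field path")
	}
	cur := input
	for _, key := range path[:len(path)-1] {
		next, exists := cur[key]
		if !exists {
			child := map[string]any{}
			cur[key] = child
			cur = child
			continue
		}
		child, ok := next.(map[string]any)
		if !ok {
			return fmt.Errorf("field %q is not a map", key)
		}
		cur = child
	}
	cur[path[len(path)-1]] = value
	return nil
}

func main() {
	in := map[string]any{"query": "hello"}
	err := setAt(in, FieldPath{"options", "limit"}, 10)
	fmt.Println(in, err) // map[options:map[limit:10] query:hello] <nil>
}
```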
