Documentation
¶
Index ¶
- Constants
- Variables
- func AppendAddressSegment(ctx context.Context, segType AddressSegmentType, segID string) context.Context
- func BatchResumeWithData(ctx context.Context, resumeData map[string]any) context.Context
- func CompositeInterrupt(ctx context.Context, info any, state any, errs ...error) error
- func GetInterruptState[T any](ctx context.Context) (wasInterrupted bool, hasState bool, state T)
- func GetResumeContext[T any](ctx context.Context) (isResumeFlow bool, hasData bool, data T)
- func GetToolCallID(ctx context.Context) string
- func InitGraphCompileCallbacks(cbs []GraphCompileCallback)
- func Interrupt(ctx context.Context, info any) error
- func IsInterruptRerunError(err error) (any, bool)
- func NewInterruptAndRerunErr(extra any) errordeprecated
- func ProcessState[S any](ctx context.Context, handler func(context.Context, S) error) error
- func RegisterSerializableType[T any](name string) (err error)
- func RegisterStreamChunkConcatFunc[T any](fn func([]T) (T, error))
- func RegisterValuesMergeFunc[T any](fn func([]T) (T, error))
- func Resume(ctx context.Context, interruptIDs ...string) context.Context
- func ResumeWithData(ctx context.Context, interruptID string, data any) context.Context
- func StatefulInterrupt(ctx context.Context, info any, state any) error
- func WithGraphInterrupt(parent context.Context) (ctx context.Context, interrupt func(opts ...GraphInterruptOption))
- func WrapInterruptAndRerunIfNeeded(ctx context.Context, step AddressSegment, err error) error
- type Address
- type AddressSegment
- type AddressSegmentType
- type AnyGraph
- type Chain
- func (c *Chain[I, O]) AppendBranch(b *ChainBranch) *Chain[I, O]
- func (c *Chain[I, O]) AppendChatModel(node model.BaseChatModel, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendChatTemplate(node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendDocumentTransformer(node document.Transformer, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendEmbedding(node embedding.Embedder, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendGraph(node AnyGraph, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendIndexer(node indexer.Indexer, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendLambda(node *Lambda, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendLoader(node document.Loader, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendParallel(p *Parallel) *Chain[I, O]
- func (c *Chain[I, O]) AppendPassthrough(opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendRetriever(node retriever.Retriever, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendToolsNode(node *ToolsNode, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
- type ChainBranch
- func NewChainBranch[T any](cond GraphBranchCondition[T]) *ChainBranch
- func NewChainMultiBranch[T any](cond GraphMultiBranchCondition[T]) *ChainBranch
- func NewStreamChainBranch[T any](cond StreamGraphBranchCondition[T]) *ChainBranch
- func NewStreamChainMultiBranch[T any](cond StreamGraphMultiBranchCondition[T]) *ChainBranch
- func (cb *ChainBranch) AddChatModel(key string, node model.BaseChatModel, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddChatTemplate(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddDocumentTransformer(key string, node document.Transformer, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddEmbedding(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddGraph(key string, node AnyGraph, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddIndexer(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddLambda(key string, node *Lambda, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddLoader(key string, node document.Loader, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddPassthrough(key string, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddRetriever(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) *ChainBranch
- type CheckPointStore
- type Collect
- type CollectWOOpt
- type FanInMergeConfig
- type FieldMapping
- func FromField(from string) *FieldMapping
- func FromFieldPath(fromFieldPath FieldPath) *FieldMapping
- func MapFieldPaths(fromFieldPath, toFieldPath FieldPath) *FieldMapping
- func MapFields(from, to string) *FieldMapping
- func ToField(to string, opts ...FieldMappingOption) *FieldMapping
- func ToFieldPath(toFieldPath FieldPath, opts ...FieldMappingOption) *FieldMapping
- type FieldMappingOption
- type FieldPath
- type GenLocalState
- type Graph
- func (g Graph) AddBranch(startNode string, branch *GraphBranch) (err error)
- func (g Graph) AddChatModelNode(key string, node model.BaseChatModel, opts ...GraphAddNodeOpt) error
- func (g Graph) AddChatTemplateNode(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) error
- func (g Graph) AddDocumentTransformerNode(key string, node document.Transformer, opts ...GraphAddNodeOpt) error
- func (g *Graph[I, O]) AddEdge(startNode, endNode string) (err error)
- func (g Graph) AddEmbeddingNode(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) error
- func (g Graph) AddGraphNode(key string, node AnyGraph, opts ...GraphAddNodeOpt) error
- func (g Graph) AddIndexerNode(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) error
- func (g Graph) AddLambdaNode(key string, node *Lambda, opts ...GraphAddNodeOpt) error
- func (g Graph) AddLoaderNode(key string, node document.Loader, opts ...GraphAddNodeOpt) error
- func (g Graph) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) error
- func (g Graph) AddRetrieverNode(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) error
- func (g Graph) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) error
- func (g *Graph[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
- func (g Graph) GetType() string
- type GraphAddNodeOpt
- func WithGraphCompileOptions(opts ...GraphCompileOption) GraphAddNodeOpt
- func WithInputKey(k string) GraphAddNodeOpt
- func WithNodeKey(key string) GraphAddNodeOpt
- func WithNodeName(n string) GraphAddNodeOpt
- func WithOutputKey(k string) GraphAddNodeOpt
- func WithStatePostHandler[O, S any](post StatePostHandler[O, S]) GraphAddNodeOpt
- func WithStatePreHandler[I, S any](pre StatePreHandler[I, S]) GraphAddNodeOpt
- func WithStreamStatePostHandler[O, S any](post StreamStatePostHandler[O, S]) GraphAddNodeOpt
- func WithStreamStatePreHandler[I, S any](pre StreamStatePreHandler[I, S]) GraphAddNodeOpt
- type GraphBranch
- func NewGraphBranch[T any](condition GraphBranchCondition[T], endNodes map[string]bool) *GraphBranch
- func NewGraphMultiBranch[T any](condition GraphMultiBranchCondition[T], endNodes map[string]bool) *GraphBranch
- func NewStreamGraphBranch[T any](condition StreamGraphBranchCondition[T], endNodes map[string]bool) *GraphBranch
- func NewStreamGraphMultiBranch[T any](condition StreamGraphMultiBranchCondition[T], endNodes map[string]bool) *GraphBranch
- type GraphBranchCondition
- type GraphCompileCallback
- type GraphCompileOption
- func WithCheckPointStore(store CheckPointStore) GraphCompileOption
- func WithEagerExecution() GraphCompileOption
- func WithEagerExecutionDisabled() GraphCompileOption
- func WithFanInMergeConfig(confs map[string]FanInMergeConfig) GraphCompileOption
- func WithGraphCompileCallbacks(cbs ...GraphCompileCallback) GraphCompileOption
- func WithGraphName(graphName string) GraphCompileOption
- func WithInterruptAfterNodes(nodes []string) GraphCompileOption
- func WithInterruptBeforeNodes(nodes []string) GraphCompileOption
- func WithMaxRunSteps(maxSteps int) GraphCompileOption
- func WithNodeTriggerMode(triggerMode NodeTriggerMode) GraphCompileOption
- func WithSerializer(serializer Serializer) GraphCompileOption
- type GraphInfo
- type GraphInterruptOption
- type GraphMultiBranchCondition
- type GraphNodeInfo
- type InterruptCtx
- type InterruptInfo
- type InvokableToolEndpoint
- type InvokableToolMiddleware
- type Invoke
- type InvokeWOOpt
- type Lambda
- func AnyLambda[I, O, TOption any](i Invoke[I, O, TOption], s Stream[I, O, TOption], c Collect[I, O, TOption], ...) (*Lambda, error)
- func CollectableLambda[I, O any](c CollectWOOpt[I, O], opts ...LambdaOpt) *Lambda
- func CollectableLambdaWithOption[I, O, TOption any](c Collect[I, O, TOption], opts ...LambdaOpt) *Lambda
- func InvokableLambda[I, O any](i InvokeWOOpt[I, O], opts ...LambdaOpt) *Lambda
- func InvokableLambdaWithOption[I, O, TOption any](i Invoke[I, O, TOption], opts ...LambdaOpt) *Lambda
- func MessageParser[T any](p schema.MessageParser[T], opts ...LambdaOpt) *Lambda
- func StreamableLambda[I, O any](s StreamWOOpt[I, O], opts ...LambdaOpt) *Lambda
- func StreamableLambdaWithOption[I, O, TOption any](s Stream[I, O, TOption], opts ...LambdaOpt) *Lambda
- func ToList[I any](opts ...LambdaOpt) *Lambda
- func TransformableLambda[I, O any](t TransformWOOpts[I, O], opts ...LambdaOpt) *Lambda
- func TransformableLambdaWithOption[I, O, TOption any](t Transform[I, O, TOption], opts ...LambdaOpt) *Lambda
- type LambdaOpt
- type NewGraphOption
- type NodePath
- type NodeTriggerMode
- type Option
- func WithCallbacks(cbs ...callbacks.Handler) Option
- func WithChatModelOption(opts ...model.Option) Option
- func WithChatTemplateOption(opts ...prompt.Option) Option
- func WithCheckPointID(checkPointID string) Option
- func WithDocumentTransformerOption(opts ...document.TransformerOption) Option
- func WithEmbeddingOption(opts ...embedding.Option) Option
- func WithForceNewRun() Option
- func WithIndexerOption(opts ...indexer.Option) Option
- func WithLambdaOption(opts ...any) Option
- func WithLoaderOption(opts ...document.LoaderOption) Option
- func WithRetrieverOption(opts ...retriever.Option) Option
- func WithRuntimeMaxSteps(maxSteps int) Option
- func WithStateModifier(sm StateModifier) Option
- func WithToolsNodeOption(opts ...ToolsNodeOption) Option
- func WithWriteToCheckPointID(checkPointID string) Option
- type Parallel
- func (p *Parallel) AddChatModel(outputKey string, node model.BaseChatModel, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddChatTemplate(outputKey string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddDocumentTransformer(outputKey string, node document.Transformer, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddEmbedding(outputKey string, node embedding.Embedder, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddGraph(outputKey string, node AnyGraph, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddIndexer(outputKey string, node indexer.Indexer, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddLambda(outputKey string, node *Lambda, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddLoader(outputKey string, node document.Loader, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddPassthrough(outputKey string, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddRetriever(outputKey string, node retriever.Retriever, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddToolsNode(outputKey string, node *ToolsNode, opts ...GraphAddNodeOpt) *Parallel
- type Runnable
- type Serializer
- type StateModifier
- type StatePostHandler
- type StatePreHandler
- type Stream
- type StreamGraphBranchCondition
- type StreamGraphMultiBranchCondition
- type StreamStatePostHandler
- type StreamStatePreHandler
- type StreamToolOutput
- type StreamWOOpt
- type StreamableToolEndpoint
- type StreamableToolMiddleware
- type ToolInput
- type ToolMiddleware
- type ToolOutput
- type ToolsInterruptAndRerunExtra
- type ToolsNode
- func (tn *ToolsNode) GetType() string
- func (tn *ToolsNode) Invoke(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) ([]*schema.Message, error)
- func (tn *ToolsNode) Stream(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) (*schema.StreamReader[[]*schema.Message], error)
- type ToolsNodeConfig
- type ToolsNodeOption
- type Transform
- type TransformWOOpts
- type Workflow
- func (wf *Workflow[I, O]) AddBranch(fromNodeKey string, branch *GraphBranch) *WorkflowBranch
- func (wf *Workflow[I, O]) AddChatModelNode(key string, chatModel model.BaseChatModel, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddChatTemplateNode(key string, chatTemplate prompt.ChatTemplate, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddDocumentTransformerNode(key string, transformer document.Transformer, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddEmbeddingNode(key string, embedding embedding.Embedder, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddEnd(fromNodeKey string, inputs ...*FieldMapping) *Workflow[I, O]deprecated
- func (wf *Workflow[I, O]) AddGraphNode(key string, graph AnyGraph, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddIndexerNode(key string, indexer indexer.Indexer, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddLambdaNode(key string, lambda *Lambda, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddLoaderNode(key string, loader document.Loader, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddRetrieverNode(key string, retriever retriever.Retriever, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddToolsNode(key string, tools *ToolsNode, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
- func (wf *Workflow[I, O]) End() *WorkflowNode
- type WorkflowAddInputOpt
- type WorkflowBranch
- type WorkflowNode
- func (n *WorkflowNode) AddDependency(fromNodeKey string) *WorkflowNode
- func (n *WorkflowNode) AddInput(fromNodeKey string, inputs ...*FieldMapping) *WorkflowNode
- func (n *WorkflowNode) AddInputWithOptions(fromNodeKey string, inputs []*FieldMapping, opts ...WorkflowAddInputOpt) *WorkflowNode
- func (n *WorkflowNode) SetStaticValue(path FieldPath, value any) *WorkflowNode
Constants ¶
const ( ComponentOfUnknown component = "Unknown" ComponentOfGraph component = "Graph" ComponentOfWorkflow component = "Workflow" ComponentOfChain component = "Chain" ComponentOfPassthrough component = "Passthrough" ComponentOfToolsNode component = "ToolsNode" ComponentOfLambda component = "Lambda" )
built-in component types in graph node. it represents the type of the most primitive executable object provided by the user.
const END = "end"
END is the end node of the graph. You can add your last edge with END.
const START = "start"
START is the start node of the graph. You can add your first edge with START.
Variables ¶
var DAGInvalidLoopErr = errors.New("DAG is invalid, has loop")
var ErrChainCompiled = errors.New("chain has been compiled, cannot be modified")
ErrChainCompiled is returned when attempting to modify a chain after it has been compiled
var ErrExceedMaxSteps = errors.New("exceeds max steps")
ErrExceedMaxSteps graph will throw this error when the number of steps exceeds the maximum number of steps.
var ErrGraphCompiled = errors.New("graph has been compiled, cannot be modified")
ErrGraphCompiled is returned when attempting to modify a graph after it has been compiled
var InterruptAndRerun = deprecatedInterruptAndRerun
Deprecated: use Interrupt(ctx context.Context, info any) error instead. If you really needs to use this error as a sub-error for a CompositeInterrupt call, wrap it using WrapInterruptAndRerunIfNeeded first.
Functions ¶
func AppendAddressSegment ¶ added in v0.7.0
func AppendAddressSegment(ctx context.Context, segType AddressSegmentType, segID string) context.Context
AppendAddressSegment creates a new execution context for a sub-component (e.g., a graph node or a tool call).
It extends the current context's address with a new segment and populates the new context with the appropriate interrupt state and resume data for that specific sub-address.
- ctx: The parent context, typically the one passed into the component's Invoke/Stream method.
- segType: The type of the new address segment (e.g., "node", "tool").
- segID: The unique ID for the new address segment.
func BatchResumeWithData ¶ added in v0.7.0
BatchResumeWithData is the core function for preparing a resume context. It injects a map of resume targets and their corresponding data into the context.
The `resumeData` map should contain the interrupt IDs (which are the string form of addresses) of the components to be resumed as keys. The value can be the resume data for that component, or `nil` if no data is needed (equivalent to using `Resume`).
This function is the foundation for the "Explicit Targeted Resume" strategy. Components whose interrupt IDs are present as keys in the map will receive `isResumeFlow = true` when they call `GetResumeContext`.
func CompositeInterrupt ¶ added in v0.7.0
CompositeInterrupt creates a special error that signals a composite interruption. It is designed for "composite" nodes (like ToolsNode) that manage multiple, independent, interruptible sub-processes. It bundles multiple sub-interrupt errors into a single error that the engine can deconstruct into a flat list of resumable points.
This function is robust and can handle several types of errors from sub-processes:
A `Interrupt` or `StatefulInterrupt` error from a simple component.
A nested `CompositeInterrupt` error from another composite component.
An error containing `InterruptInfo` returned by a `Runnable` (e.g., a Graph within a lambda node).
An error returned by \'WrapInterruptAndRerunIfNeeded\' for the legacy old interrupt and rerun error, and for the error returned by the deprecated old interrupt errors.
Parameters:
ctx: The context of the running composite node.
info: User-facing information for the composite node itself. Can be nil. This info will be attached to InterruptInfo.RerunNodeExtra. Provided mainly for compatibility purpose as the composite node itself is not an interrupt point with interrupt ID, which means it lacks enough reason to give a user-facing info.
state: The state for the composite node itself. Can be nil. This could be useful when the composite node needs to restore state, such as its input (e.g. ToolsNode).
errs: a list of errors emitted by sub-processes.
NOTE: if the error you passed in is the deprecated old interrupt and rerun err, or an error returned by the deprecated old interrupt function, you must wrap it using WrapInterruptAndRerunIfNeeded first before passing them into this function.
func GetInterruptState ¶ added in v0.7.0
GetInterruptState provides a type-safe way to check for and retrieve the persisted state from a previous interruption. It is the primary function a component should use to understand its past state.
It returns three values:
- wasInterrupted (bool): True if the node was part of a previous interruption, regardless of whether state was provided.
- state (T): The typed state object, if it was provided and matches type `T`.
- hasState (bool): True if state was provided during the original interrupt and successfully cast to type `T`.
func GetResumeContext ¶ added in v0.7.0
GetResumeContext checks if the current component is the target of a resume operation and retrieves any data provided by the user for that resumption.
This function is typically called *after* a component has already determined it is in a resumed state by calling GetInterruptState.
It returns three values:
- isResumeFlow: A boolean that is true if the current component's address was explicitly targeted by a call to Resume() or ResumeWithData().
- hasData: A boolean that is true if data was provided for this component (i.e., not nil).
- data: The typed data provided by the user.
### How to Use This Function: A Decision Framework
The correct usage pattern depends on the application's desired resume strategy.
#### Strategy 1: Implicit "Resume All" In some use cases, any resume operation implies that *all* interrupted points should proceed. For example, if an application's UI only provides a single "Continue" button for a set of interruptions. In this model, a component can often just use `GetInterruptState` to see if `wasInterrupted` is true and then proceed with its logic, as it can assume it is an intended target. It may still call `GetResumeContext` to check for optional data, but the `isResumeFlow` flag is less critical.
#### Strategy 2: Explicit "Targeted Resume" (Most Common) For applications with multiple, distinct interrupt points that must be resumed independently, it is crucial to differentiate which point is being resumed. This is the primary use case for the `isResumeFlow` flag.
- If `isResumeFlow` is `true`: Your component is the explicit target. You should consume the `data` (if any) and complete your work.
- If `isResumeFlow` is `false`: Another component is the target. You MUST re-interrupt (e.g., by returning `StatefulInterrupt(...)`) to preserve your state and allow the resume signal to propagate.
### Guidance for Composite Components
Composite components (like `Graph` or other `Runnable`s that contain sub-processes) have a dual role:
- Check for Self-Targeting: A composite component can itself be the target of a resume operation, for instance, to modify its internal state. It may call `GetResumeContext` to check for data targeted at its own address.
- Act as a Conduit: After checking for itself, its primary role is to re-execute its children, allowing the resume context to flow down to them. It must not consume a resume signal intended for one of its descendants.
func GetToolCallID ¶ added in v0.3.18
GetToolCallID gets the current tool call id from the context.
func InitGraphCompileCallbacks ¶
func InitGraphCompileCallbacks(cbs []GraphCompileCallback)
InitGraphCompileCallbacks set global graph compile callbacks, which ONLY will be added to top level graph compile options
func Interrupt ¶ added in v0.7.0
Interrupt creates a special error that signals the execution engine to interrupt the current run at the component's specific address and save a checkpoint.
This is the standard way for a single, non-composite component to signal a resumable interruption.
- ctx: The context of the running component, used to retrieve the current execution address.
- info: User-facing information about the interrupt. This is not persisted but is exposed to the calling application via the InterruptCtx to provide context (e.g., a reason for the pause).
func IsInterruptRerunError ¶ added in v0.3.38
func NewInterruptAndRerunErr
deprecated
added in
v0.3.38
func ProcessState ¶ added in v0.3.10
ProcessState processes the state from the context in a concurrency-safe way. This is the recommended way to access and modify state in custom nodes. The provided function handler will be executed with exclusive access to the state (protected by mutex).
State Lookup Behavior: - If the requested state type exists in the current graph, it will be returned - If not found in current graph, ProcessState will search in parent graph states (for nested graphs) - This enables nested graphs to access state from their parent graphs - Follows lexical scoping: inner state of the same type shadows outer state
Concurrency Safety: - ProcessState automatically locks the mutex of the state being accessed (current or parent level) - Each state level has its own mutex, allowing concurrent access to different levels - The lock is held for the entire duration of the handler function
Note: This method will report an error if the state type doesn't match or state is not found in the context chain.
Example - Basic usage in a single graph:
lambdaFunc := func(ctx context.Context, in string, opts ...any) (string, error) {
err := compose.ProcessState[*MyState](ctx, func(ctx context.Context, state *MyState) error {
// Safely modify state
state.Count++
return nil
})
if err != nil {
return "", err
}
return in, nil
}
Example - Nested graph accessing parent state:
// In an inner graph node
innerNode := func(ctx context.Context, input string) (string, error) {
// Access parent graph's state
err := compose.ProcessState[*OuterState](ctx, func(ctx context.Context, s *OuterState) error {
s.Counter++ // Safely modify parent state
return nil
})
if err != nil {
return "", err
}
// Also access inner graph's own state
err = compose.ProcessState[*InnerState](ctx, func(ctx context.Context, s *InnerState) error {
s.Data = "processed"
return nil
})
return input, nil
}
func RegisterSerializableType ¶ added in v0.3.18
RegisterSerializableType registers a custom type for eino serialization. This allows eino to properly serialize and deserialize custom types. Both custom interfaces and structs need to be registered using this function. Types only need to be registered once - pointers and other references will be handled automatically. All built-in eino types are already registered. Parameters: - name: A unique identifier for the type being registered (should not start with "_eino") - T: The generic type parameter representing the type to register Returns: - error: An error if registration fails (e.g., if the type is already registered) Deprecated: RegisterSerializableType is deprecated. Use schema.RegisterName[T](name) instead.
func RegisterStreamChunkConcatFunc ¶
RegisterStreamChunkConcatFunc registers a function to concat stream chunks. It's required when you want to concat stream chunks of a specific type. for example you call Invoke() but node only implements Stream(). call at process init not thread safe eg.
type testStruct struct {
field1 string
field2 int
}
compose.RegisterStreamChunkConcatFunc(func(items []testStruct) (testStruct, error) {
return testStruct{
field1: items[1].field1, // may implement inplace logic by your scenario
field2: items[0].field2 + items[1].field2,
}, nil
})
func RegisterValuesMergeFunc ¶ added in v0.3.26
RegisterValuesMergeFunc registers a function to merge outputs from multiple nodes when fan-in. It's used to define how to merge for a specific type. For maps that already have a default merge function, you don't need to register a new one unless you want to customize the merge logic.
func Resume ¶ added in v0.7.0
Resume prepares a context for an "Explicit Targeted Resume" operation by targeting one or more components without providing data. It is a convenience wrapper around BatchResumeWithData.
This is useful when the act of resuming is itself the signal, and no extra data is needed. The components at the provided addresses (interrupt IDs) will receive `isResumeFlow = true` when they call `GetResumeContext`.
func ResumeWithData ¶ added in v0.7.0
ResumeWithData prepares a context to resume a single, specific component with data. It is the primary function for the "Explicit Targeted Resume" strategy when data is required. It is a convenience wrapper around BatchResumeWithData. The `interruptID` parameter is the unique interrupt ID of the target component.
func StatefulInterrupt ¶ added in v0.7.0
StatefulInterrupt creates a special error that signals the execution engine to interrupt the current run at the component's specific address and save a checkpoint.
This is the standard way for a single, non-composite component to signal a resumable interruption.
- ctx: The context of the running component, used to retrieve the current execution address.
- info: User-facing information about the interrupt. This is not persisted but is exposed to the calling application via the InterruptCtx to provide context (e.g., a reason for the pause).
- state: The internal state that the interrupting component needs to persist to be able to resume its work later. This state is saved in the checkpoint and will be provided back to the component upon resumption via GetInterruptState.
func WithGraphInterrupt ¶ added in v0.5.4
func WithGraphInterrupt(parent context.Context) (ctx context.Context, interrupt func(opts ...GraphInterruptOption))
WithGraphInterrupt creates a context with graph cancellation support. When the returned context is used to invoke a graph or workflow, calling the interrupt function will trigger an interrupt. The graph will wait for current tasks to complete by default.
func WrapInterruptAndRerunIfNeeded ¶ added in v0.7.0
func WrapInterruptAndRerunIfNeeded(ctx context.Context, step AddressSegment, err error) error
WrapInterruptAndRerunIfNeeded wraps the deprecated old interrupt errors, with the current execution address. If the error is returned by either Interrupt, StatefulInterrupt or CompositeInterrupt, it will be returned as-is without wrapping
Types ¶
type Address ¶ added in v0.7.0
Address represents a full, hierarchical address to a point in the execution structure.
func GetCurrentAddress ¶ added in v0.7.0
GetCurrentAddress returns the hierarchical address of the currently executing component. The address is a sequence of segments, each identifying a structural part of the execution like an agent, a graph node, or a tool call. This can be useful for logging or debugging.
type AddressSegment ¶ added in v0.7.0
type AddressSegment = core.AddressSegment
AddressSegment represents a single segment in the hierarchical address of an execution point. A sequence of AddressSegments uniquely identifies a location within a potentially nested structure.
type AddressSegmentType ¶ added in v0.7.0
type AddressSegmentType = core.AddressSegmentType
AddressSegmentType defines the type of a segment in an execution address.
const ( // AddressSegmentNode represents a segment of an address that corresponds to a graph node. AddressSegmentNode AddressSegmentType = "node" // AddressSegmentTool represents a segment of an address that corresponds to a specific tool call within a ToolsNode. AddressSegmentTool AddressSegmentType = "tool" // AddressSegmentRunnable represents a segment of an address that corresponds to an instance of the Runnable interface. // Currently the possible Runnable types are: Graph, Workflow and Chain. // Note that for sub-graphs added through AddGraphNode to another graph is not a Runnable. // So a AddressSegmentRunnable indicates a standalone Root level Graph, // or a Root level Graph inside a node such as Lambda node. AddressSegmentRunnable AddressSegmentType = "runnable" )
type AnyGraph ¶
type AnyGraph interface {
// contains filtered or unexported methods
}
AnyGraph the identifiers for composable and compilable Graph[I, O]、Chain[I, O] in Eino.
type Chain ¶
type Chain[I, O any] struct { // contains filtered or unexported fields }
Chain is a chain of components. Chain nodes can be parallel / branch / sequence components. Chain is designed to be used in a builder pattern (should Compile() before use). And the interface is `Chain style`, you can use it like: `chain.AppendXX(...).AppendXX(...)`
Normal usage:
- create a chain with input/output type: `chain := NewChain[inputType, outputType]()`
- add components to chainable list: 2.1 add components: `chain.AppendChatTemplate(...).AppendChatModel(...).AppendToolsNode(...)` 2.2 add parallel or branch node if needed: `chain.AppendParallel()`, `chain.AppendBranch()`
- compile: `r, err := c.Compile()`
- run: 4.1 `one input & one output` use `r.Invoke(ctx, input)` 4.2 `one input & multi output chunk` use `r.Stream(ctx, input)` 4.3 `multi input chunk & one output` use `r.Collect(ctx, inputReader)` 4.4 `multi input chunk & multi output chunk` use `r.Transform(ctx, inputReader)`
Using in graph or other chain: chain1 := NewChain[inputType, outputType]() graph := NewGraph[](runTypePregel) graph.AddGraph("key", chain1) // chain is an AnyGraph implementation
// or in another chain: chain2 := NewChain[inputType, outputType]() chain2.AppendGraph(chain1)
func NewChain ¶
func NewChain[I, O any](opts ...NewGraphOption) *Chain[I, O]
NewChain create a chain with input/output type.
func (*Chain[I, O]) AppendBranch ¶
func (c *Chain[I, O]) AppendBranch(b *ChainBranch) *Chain[I, O]
AppendBranch add a conditional branch to chain. Each branch within the ChainBranch can be an AnyGraph. All branches should either lead to END, or converge to another node within the Chain. e.g.
cb := compose.NewChainBranch(conditionFunc)
cb.AddChatTemplate("chat_template_key_01", chatTemplate)
cb.AddChatTemplate("chat_template_key_02", chatTemplate2)
chain.AppendBranch(cb)
func (*Chain[I, O]) AppendChatModel ¶
func (c *Chain[I, O]) AppendChatModel(node model.BaseChatModel, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendChatModel add a ChatModel node to the chain. e.g.
model, err := openai.NewChatModel(ctx, config)
if err != nil {...}
chain.AppendChatModel(model)
func (*Chain[I, O]) AppendChatTemplate ¶
func (c *Chain[I, O]) AppendChatTemplate(node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendChatTemplate add a ChatTemplate node to the chain. eg.
chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{
Role: schema.System,
Content: "You are acting as a {role}.",
})
chain.AppendChatTemplate(chatTemplate)
func (*Chain[I, O]) AppendDocumentTransformer ¶
func (c *Chain[I, O]) AppendDocumentTransformer(node document.Transformer, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendDocumentTransformer add a DocumentTransformer node to the chain. e.g.
markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{})
chain.AppendDocumentTransformer(markdownSplitter)
func (*Chain[I, O]) AppendEmbedding ¶
func (c *Chain[I, O]) AppendEmbedding(node embedding.Embedder, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendEmbedding add a Embedding node to the chain. e.g.
embedder, err := openai.NewEmbedder(ctx, config)
if err != nil {...}
chain.AppendEmbedding(embedder)
func (*Chain[I, O]) AppendGraph ¶
func (c *Chain[I, O]) AppendGraph(node AnyGraph, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendGraph add a AnyGraph node to the chain. AnyGraph can be a chain or a graph. e.g.
graph := compose.NewGraph[string, string]() chain.AppendGraph(graph)
func (*Chain[I, O]) AppendIndexer ¶
func (c *Chain[I, O]) AppendIndexer(node indexer.Indexer, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendIndexer add an Indexer node to the chain. Indexer is a node that can store documents. e.g.
vectorStoreImpl, err := vikingdb.NewVectorStorer(ctx, vikingdbConfig) // in components/vectorstore/vikingdb/vectorstore.go
if err != nil {...}
config := vectorstore.IndexerConfig{VectorStore: vectorStoreImpl}
indexer, err := vectorstore.NewIndexer(ctx, config)
if err != nil {...}
chain.AppendIndexer(indexer)
func (*Chain[I, O]) AppendLambda ¶
func (c *Chain[I, O]) AppendLambda(node *Lambda, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendLambda add a Lambda node to the chain. Lambda is a node that can be used to implement custom logic. e.g.
lambdaNode := compose.InvokableLambda(func(ctx context.Context, docs []*schema.Document) (string, error) {...})
chain.AppendLambda(lambdaNode)
Note: to create a Lambda node, you need to use `compose.AnyLambda` or `compose.InvokableLambda` or `compose.StreamableLambda` or `compose.TransformableLambda`. if you want this node has real stream output, you need to use `compose.StreamableLambda` or `compose.TransformableLambda`, for example.
func (*Chain[I, O]) AppendLoader ¶
func (c *Chain[I, O]) AppendLoader(node document.Loader, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendLoader adds a Loader node to the chain. e.g.
loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{})
if err != nil {...}
chain.AppendLoader(loader)
func (*Chain[I, O]) AppendParallel ¶
AppendParallel add a Parallel structure (multiple concurrent nodes) to the chain. e.g.
parallel := compose.NewParallel()
parallel.AddChatModel("openai", model1) // => "openai": *schema.Message{}
parallel.AddChatModel("maas", model2) // => "maas": *schema.Message{}
chain.AppendParallel(parallel) // => multiple concurrent nodes are added to the Chain
The next node in the chain is either an END, or a node which accepts a map[string]any, where keys are `openai` `maas` as specified above.
func (*Chain[I, O]) AppendPassthrough ¶
func (c *Chain[I, O]) AppendPassthrough(opts ...GraphAddNodeOpt) *Chain[I, O]
AppendPassthrough add a Passthrough node to the chain. Could be used to connect multiple ChainBranch or Parallel. e.g.
chain.AppendPassthrough()
func (*Chain[I, O]) AppendRetriever ¶
func (c *Chain[I, O]) AppendRetriever(node retriever.Retriever, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendRetriever add a Retriever node to the chain. e.g.
retriever, err := vectorstore.NewRetriever(ctx, config)
if err != nil {...}
chain.AppendRetriever(retriever)
or using fornax knowledge as retriever:
config := fornaxknowledge.Config{...}
retriever, err := fornaxknowledge.NewKnowledgeRetriever(ctx, config)
if err != nil {...}
chain.AppendRetriever(retriever)
func (*Chain[I, O]) AppendToolsNode ¶
func (c *Chain[I, O]) AppendToolsNode(node *ToolsNode, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendToolsNode add a ToolsNode node to the chain. e.g.
toolsNode, err := tools.NewToolNode(ctx, &tools.ToolsNodeConfig{
Tools: []tools.Tool{...},
})
chain.AppendToolsNode(toolsNode)
func (*Chain[I, O]) Compile ¶
func (c *Chain[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
Compile to a Runnable. Runnable can be used directly. e.g.
chain := NewChain[string, string]()
r, err := chain.Compile()
if err != nil {}
r.Invoke(ctx, input) // ping => pong
r.Stream(ctx, input) // ping => stream out
r.Collect(ctx, inputReader) // stream in => pong
r.Transform(ctx, inputReader) // stream in => stream out
type ChainBranch ¶
type ChainBranch struct {
// contains filtered or unexported fields
}
ChainBranch represents a conditional branch in a chain of operations. It allows for dynamic routing of execution based on a condition. All branches within ChainBranch are expected to either end the Chain, or converge to another node in the Chain.
func NewChainBranch ¶
func NewChainBranch[T any](cond GraphBranchCondition[T]) *ChainBranch
NewChainBranch creates a new ChainBranch instance based on a given condition. It takes a generic type T and a GraphBranchCondition function for that type. The returned ChainBranch will have an empty key2BranchNode map and a condition function that wraps the provided cond to handle type assertions and error checking. eg.
condition := func(ctx context.Context, in string, opts ...any) (endNode string, err error) {
// logic to determine the next node
return "some_next_node_key", nil
}
cb := NewChainBranch[string](condition)
cb.AddPassthrough("next_node_key_01", xxx) // node in branch, represent one path of branch
cb.AddPassthrough("next_node_key_02", xxx) // node in branch
func NewChainMultiBranch ¶ added in v0.3.18
func NewChainMultiBranch[T any](cond GraphMultiBranchCondition[T]) *ChainBranch
func NewStreamChainBranch ¶
func NewStreamChainBranch[T any](cond StreamGraphBranchCondition[T]) *ChainBranch
NewStreamChainBranch creates a new ChainBranch instance based on a given stream condition. It takes a generic type T and a StreamGraphBranchCondition function for that type. The returned ChainBranch will have an empty key2BranchNode map and a condition function that wraps the provided cond to handle type assertions and error checking. eg.
condition := func(ctx context.Context, in *schema.StreamReader[string], opts ...any) (endNode string, err error) {
// logic to determine the next node, you can read the stream and make a decision.
// to save time, usually read the first chunk of stream, then make a decision which path to go.
return "some_next_node_key", nil
}
cb := NewStreamChainBranch[string](condition)
func NewStreamChainMultiBranch ¶ added in v0.3.18
func NewStreamChainMultiBranch[T any](cond StreamGraphMultiBranchCondition[T]) *ChainBranch
func (*ChainBranch) AddChatModel ¶
func (cb *ChainBranch) AddChatModel(key string, node model.BaseChatModel, opts ...GraphAddNodeOpt) *ChainBranch
AddChatModel adds a ChatModel node to the branch. eg.
chatModel01, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
Model: "gpt-4o",
})
chatModel02, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
Model: "gpt-4o-mini",
})
cb.AddChatModel("chat_model_key_01", chatModel01)
cb.AddChatModel("chat_model_key_02", chatModel02)
func (*ChainBranch) AddChatTemplate ¶
func (cb *ChainBranch) AddChatTemplate(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *ChainBranch
AddChatTemplate adds a ChatTemplate node to the branch. eg.
chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{
Role: schema.System,
Content: "You are acting as a {role}.",
})
cb.AddChatTemplate("chat_template_key_01", chatTemplate)
chatTemplate2, err := prompt.FromMessages(schema.FString, &schema.Message{
Role: schema.System,
Content: "You are acting as a {role}, you are not allowed to chat in other topics.",
})
cb.AddChatTemplate("chat_template_key_02", chatTemplate2)
func (*ChainBranch) AddDocumentTransformer ¶
func (cb *ChainBranch) AddDocumentTransformer(key string, node document.Transformer, opts ...GraphAddNodeOpt) *ChainBranch
AddDocumentTransformer adds an Document Transformer node to the branch. eg.
markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{})
cb.AddDocumentTransformer("document_transformer_node_key", markdownSplitter)
func (*ChainBranch) AddEmbedding ¶
func (cb *ChainBranch) AddEmbedding(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) *ChainBranch
AddEmbedding adds an Embedding node to the branch. eg.
embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{
Model: "text-embedding-3-small",
})
cb.AddEmbedding("embedding_node_key", embeddingNode)
func (*ChainBranch) AddGraph ¶
func (cb *ChainBranch) AddGraph(key string, node AnyGraph, opts ...GraphAddNodeOpt) *ChainBranch
AddGraph adds a generic Graph node to the branch. eg.
graph, err := compose.NewGraph[string, string]()
cb.AddGraph("graph_node_key", graph)
func (*ChainBranch) AddIndexer ¶
func (cb *ChainBranch) AddIndexer(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) *ChainBranch
AddIndexer adds an Indexer node to the branch. eg.
indexer, err := volc_vikingdb.NewIndexer(ctx, &volc_vikingdb.IndexerConfig{
Collection: "my_collection",
})
cb.AddIndexer("indexer_node_key", indexer)
func (*ChainBranch) AddLambda ¶
func (cb *ChainBranch) AddLambda(key string, node *Lambda, opts ...GraphAddNodeOpt) *ChainBranch
AddLambda adds a Lambda node to the branch. eg.
lambdaFunc := func(ctx context.Context, in string, opts ...any) (out string, err error) {
// logic to process the input
return "processed_output", nil
}
cb.AddLambda("lambda_node_key", compose.InvokeLambda(lambdaFunc))
func (*ChainBranch) AddLoader ¶
func (cb *ChainBranch) AddLoader(key string, node document.Loader, opts ...GraphAddNodeOpt) *ChainBranch
AddLoader adds a Loader node to the branch. eg.
pdfParser, err := pdf.NewPDFParser()
loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{
Parser: pdfParser,
})
cb.AddLoader("loader_node_key", loader)
func (*ChainBranch) AddPassthrough ¶
func (cb *ChainBranch) AddPassthrough(key string, opts ...GraphAddNodeOpt) *ChainBranch
AddPassthrough adds a Passthrough node to the branch. eg.
cb.AddPassthrough("passthrough_node_key")
func (*ChainBranch) AddRetriever ¶
func (cb *ChainBranch) AddRetriever(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) *ChainBranch
AddRetriever adds a Retriever node to the branch. eg.
retriever, err := volc_vikingdb.NewRetriever(ctx, &volc_vikingdb.RetrieverConfig{
Collection: "my_collection",
})
cb.AddRetriever("retriever_node_key", retriever)
func (*ChainBranch) AddToolsNode ¶
func (cb *ChainBranch) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) *ChainBranch
AddToolsNode adds a ToolsNode to the branch. eg.
toolsNode, err := tools.NewToolNode(ctx, &tools.ToolsNodeConfig{
Tools: []tools.Tool{...},
})
cb.AddToolsNode("tools_node_key", toolsNode)
type CheckPointStore ¶ added in v0.3.18
type CheckPointStore = core.CheckPointStore
type Collect ¶
type Collect[I, O, TOption any] func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (output O, err error)
Collect is the type of the collectable lambda function.
type CollectWOOpt ¶
type CollectWOOpt[I, O any] func(ctx context.Context, input *schema.StreamReader[I]) (output O, err error)
CollectWOOpt is the type of the collectable lambda function without options.
type FanInMergeConfig ¶ added in v0.3.44
type FanInMergeConfig struct {
StreamMergeWithSourceEOF bool //indicates whether to emit a SourceEOF error for each stream
}
FanInMergeConfig defines the configuration for fan-in merge operations. It allows specifying how multiple inputs are merged into a single input. StreamMergeWithSourceEOF indicates whether to emit a SourceEOF error for each stream when it ends, before the final merged output is produced. This is useful for tracking the completion of individual input streams in a named stream merge.
type FieldMapping ¶ added in v0.3.8
type FieldMapping struct {
// contains filtered or unexported fields
}
func FromField ¶ added in v0.3.8
func FromField(from string) *FieldMapping
FromField creates a FieldMapping that maps a single predecessor field to the entire successor input. This is an exclusive mapping - once set, no other field mappings can be added since the successor input has already been fully mapped. Field: either the field of a struct, or the key of a map.
func FromFieldPath ¶ added in v0.3.17
func FromFieldPath(fromFieldPath FieldPath) *FieldMapping
FromFieldPath creates a FieldMapping that maps a single predecessor field path to the entire successor input. This is an exclusive mapping - once set, no other field mappings can be added since the successor input has already been fully mapped.
Example:
// Maps the 'name' field from nested 'user.profile' to the entire successor input
FromFieldPath(FieldPath{"user", "profile", "name"})
Note: The field path elements must not contain the internal path separator character ('\x1F').
func MapFieldPaths ¶ added in v0.3.17
func MapFieldPaths(fromFieldPath, toFieldPath FieldPath) *FieldMapping
MapFieldPaths creates a FieldMapping that maps a single predecessor field path to a single successor field path.
Example:
// Maps user.profile.name to response.userName
MapFieldPaths(
FieldPath{"user", "profile", "name"},
FieldPath{"response", "userName"},
)
Note: The field path elements must not contain the internal path separator character ('\x1F').
func MapFields ¶ added in v0.3.8
func MapFields(from, to string) *FieldMapping
MapFields creates a FieldMapping that maps a single predecessor field to a single successor field. Field: either the field of a struct, or the key of a map.
func ToField ¶ added in v0.3.8
func ToField(to string, opts ...FieldMappingOption) *FieldMapping
ToField creates a FieldMapping that maps the entire predecessor output to a single successor field. Field: either the field of a struct, or the key of a map.
func ToFieldPath ¶ added in v0.3.17
func ToFieldPath(toFieldPath FieldPath, opts ...FieldMappingOption) *FieldMapping
ToFieldPath creates a FieldMapping that maps the entire predecessor output to a single successor field path.
Example:
// Maps the entire predecessor output to response.data.userName
ToFieldPath(FieldPath{"response", "data", "userName"})
Note: The field path elements must not contain the internal path separator character ('\x1F').
func (*FieldMapping) Equals ¶ added in v0.3.48
func (m *FieldMapping) Equals(o *FieldMapping) bool
func (*FieldMapping) FromNodeKey ¶ added in v0.3.48
func (m *FieldMapping) FromNodeKey() string
func (*FieldMapping) FromPath ¶ added in v0.3.48
func (m *FieldMapping) FromPath() FieldPath
func (*FieldMapping) String ¶ added in v0.3.8
func (m *FieldMapping) String() string
String returns the string representation of the FieldMapping.
func (*FieldMapping) ToPath ¶ added in v0.3.48
func (m *FieldMapping) ToPath() FieldPath
type FieldMappingOption ¶ added in v0.3.48
type FieldMappingOption func(*FieldMapping)
FieldMappingOption is a functional option for configuring a FieldMapping.
func WithCustomExtractor ¶ added in v0.3.48
func WithCustomExtractor(extractor func(input any) (any, error)) FieldMappingOption
WithCustomExtractor sets a custom extractor function for the FieldMapping. The extractor function is used to extract a value from the 'source' of the FieldMapping. NOTE: if specified in this way, Eino can only check the validity of the field mapping at request time..
type FieldPath ¶ added in v0.3.17
type FieldPath []string
FieldPath represents a path to a nested field in a struct or map. Each element in the path is either: - a struct field name - a map key
Example paths:
- []string{"user"} // top-level field
- []string{"user", "name"} // nested struct field
- []string{"users", "admin"} // map key access
type GenLocalState ¶
GenLocalState is a function that generates the state.
type Graph ¶
type Graph[I, O any] struct { // contains filtered or unexported fields }
Graph is a generic graph that can be used to compose components. I: the input type of graph compiled product O: the output type of graph compiled product
func NewGraph ¶
func NewGraph[I, O any](opts ...NewGraphOption) *Graph[I, O]
NewGraph create a directed graph that can compose components, lambda, chain, parallel etc. simultaneously provide flexible and multi-granular aspect governance capabilities. I: the input type of graph compiled product O: the output type of graph compiled product
To share state between nodes, use WithGenLocalState option:
type testState struct {
UserInfo *UserInfo
KVs map[string]any
}
genStateFunc := func(ctx context.Context) *testState {
return &testState{}
}
graph := compose.NewGraph[string, string](WithGenLocalState(genStateFunc))
// you can use WithStatePreHandler and WithStatePostHandler to do something with state
graph.AddNode("node1", someNode, compose.WithPreHandler(func(ctx context.Context, in string, state *testState) (string, error) {
// do something with state
return in, nil
}), compose.WithPostHandler(func(ctx context.Context, out string, state *testState) (string, error) {
// do something with state
return out, nil
}))
func (Graph) AddBranch ¶
func (g Graph) AddBranch(startNode string, branch *GraphBranch) (err error)
AddBranch adds a branch to the graph. e.g.
condition := func(ctx context.Context, in string) (string, error) {
return "next_node_key", nil
}
endNodes := map[string]bool{"path01": true, "path02": true}
branch := compose.NewGraphBranch(condition, endNodes)
graph.AddBranch("start_node_key", branch)
func (Graph) AddChatModelNode ¶
func (g Graph) AddChatModelNode(key string, node model.BaseChatModel, opts ...GraphAddNodeOpt) error
AddChatModelNode add node that implements model.BaseChatModel. e.g.
chatModel, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
Model: "gpt-4o",
})
graph.AddChatModelNode("chat_model_node_key", chatModel)
func (Graph) AddChatTemplateNode ¶
func (g Graph) AddChatTemplateNode(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) error
AddChatTemplateNode add node that implements prompt.ChatTemplate. e.g.
chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{
Role: schema.System,
Content: "You are acting as a {role}.",
})
graph.AddChatTemplateNode("chat_template_node_key", chatTemplate)
func (Graph) AddDocumentTransformerNode ¶
func (g Graph) AddDocumentTransformerNode(key string, node document.Transformer, opts ...GraphAddNodeOpt) error
AddDocumentTransformerNode adds a node that implements document.Transformer. e.g.
markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{})
graph.AddDocumentTransformerNode("document_transformer_node_key", markdownSplitter)
func (*Graph[I, O]) AddEdge ¶
AddEdge adds an edge to the graph, edge means a data flow from startNode to endNode. the previous node's output type must be set to the next node's input type. NOTE: startNode and endNode must have been added to the graph before adding edge. e.g.
graph.AddNode("start_node_key", compose.NewPassthroughNode())
graph.AddNode("end_node_key", compose.NewPassthroughNode())
err := graph.AddEdge("start_node_key", "end_node_key")
func (Graph) AddEmbeddingNode ¶
func (g Graph) AddEmbeddingNode(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) error
AddEmbeddingNode adds a node that implements embedding.Embedder. e.g.
embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{
Model: "text-embedding-3-small",
})
graph.AddEmbeddingNode("embedding_node_key", embeddingNode)
func (Graph) AddGraphNode ¶
func (g Graph) AddGraphNode(key string, node AnyGraph, opts ...GraphAddNodeOpt) error
AddGraphNode add one kind of Graph[I, O]、Chain[I, O]、StateChain[I, O, S] as a node. for Graph[I, O], comes from NewGraph[I, O]() for Chain[I, O], comes from NewChain[I, O]()
func (Graph) AddIndexerNode ¶
func (g Graph) AddIndexerNode(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) error
AddIndexerNode adds a node that implements indexer.Indexer. e.g.
indexer, err := vikingdb.NewIndexer(ctx, &vikingdb.IndexerConfig{})
graph.AddIndexerNode("indexer_node_key", indexer)
func (Graph) AddLambdaNode ¶
func (g Graph) AddLambdaNode(key string, node *Lambda, opts ...GraphAddNodeOpt) error
AddLambdaNode add node that implements at least one of Invoke[I, O], Stream[I, O], Collect[I, O], Transform[I, O]. due to the lack of supporting method generics, we need to use function generics to generate Lambda run as Runnable[I, O]. for Invoke[I, O], use compose.InvokableLambda() for Stream[I, O], use compose.StreamableLambda() for Collect[I, O], use compose.CollectableLambda() for Transform[I, O], use compose.TransformableLambda() for arbitrary combinations of 4 kinds of lambda, use compose.AnyLambda()
func (Graph) AddLoaderNode ¶
func (g Graph) AddLoaderNode(key string, node document.Loader, opts ...GraphAddNodeOpt) error
AddLoaderNode adds a node that implements document.Loader. e.g.
loader, err := file.NewLoader(ctx, &file.LoaderConfig{})
graph.AddLoaderNode("loader_node_key", loader)
func (Graph) AddPassthroughNode ¶
func (g Graph) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) error
AddPassthroughNode adds a passthrough node to the graph. mostly used in pregel mode of graph. e.g.
graph.AddPassthroughNode("passthrough_node_key")
func (Graph) AddRetrieverNode ¶
func (g Graph) AddRetrieverNode(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) error
AddRetrieverNode adds a node that implements retriever.Retriever. e.g.
retriever, err := vikingdb.NewRetriever(ctx, &vikingdb.RetrieverConfig{})
graph.AddRetrieverNode("retriever_node_key", retrieverNode)
func (Graph) AddToolsNode ¶
func (g Graph) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) error
AddToolsNode adds a node that implements tools.ToolsNode. e.g.
toolsNode, err := tools.NewToolNode(ctx, &tools.ToolsNodeConfig{})
graph.AddToolsNode("tools_node_key", toolsNode)
func (*Graph[I, O]) Compile ¶
func (g *Graph[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
Compile take the raw graph and compile it into a form ready to be run. e.g.
graph, err := compose.NewGraph[string, string]()
if err != nil {...}
runnable, err := graph.Compile(ctx, compose.WithGraphName("my_graph"))
if err != nil {...}
runnable.Invoke(ctx, "input") // invoke
runnable.Stream(ctx, "input") // stream
runnable.Collect(ctx, inputReader) // collect
runnable.Transform(ctx, inputReader) // transform
type GraphAddNodeOpt ¶
type GraphAddNodeOpt func(o *graphAddNodeOpts)
GraphAddNodeOpt is a functional option type for adding a node to a graph. e.g.
graph.AddNode("node_name", node, compose.WithInputKey("input_key"), compose.WithOutputKey("output_key"))
func WithGraphCompileOptions ¶
func WithGraphCompileOptions(opts ...GraphCompileOption) GraphAddNodeOpt
WithGraphCompileOptions when the node is an AnyGraph, use this option to set compile option for the node. e.g.
graph.AddNode("node_name", node, compose.WithGraphCompileOptions(compose.WithGraphName("my_sub_graph")))
func WithInputKey ¶
func WithInputKey(k string) GraphAddNodeOpt
WithInputKey sets the input key of the node. this will change the input value of the node, for example, if the pre node's output is map[string]any{"key01": "value01"}, and the current node's input key is "key01", then the current node's input value will be "value01".
func WithNodeKey ¶
func WithNodeKey(key string) GraphAddNodeOpt
WithNodeKey set the node key, which is used to identify the node in the chain. only for use in Chain/StateChain.
func WithNodeName ¶
func WithNodeName(n string) GraphAddNodeOpt
WithNodeName sets the name of the node.
func WithOutputKey ¶
func WithOutputKey(k string) GraphAddNodeOpt
WithOutputKey sets the output key of the node. this will change the output value of the node, for example, if the current node's output key is "key01", then the node's output value will be map[string]any{"key01": value}.
func WithStatePostHandler ¶
func WithStatePostHandler[O, S any](post StatePostHandler[O, S]) GraphAddNodeOpt
WithStatePostHandler modify node's output of O according to state S and output or store output information into state, and it's thread-safe. notice: this option requires Graph to be created with WithGenLocalState option. O: output type of the Node like ChatModel, Lambda, Retriever etc. S: state type defined in WithGenLocalState
func WithStatePreHandler ¶
func WithStatePreHandler[I, S any](pre StatePreHandler[I, S]) GraphAddNodeOpt
WithStatePreHandler modify node's input of I according to state S and input or store input information into state, and it's thread-safe. notice: this option requires Graph to be created with WithGenLocalState option. I: input type of the Node like ChatModel, Lambda, Retriever etc. S: state type defined in WithGenLocalState
func WithStreamStatePostHandler ¶
func WithStreamStatePostHandler[O, S any](post StreamStatePostHandler[O, S]) GraphAddNodeOpt
WithStreamStatePostHandler modify node's streaming output of O according to state S and output or store output information into state, and it's thread-safe. notice: this option requires Graph to be created with WithGenLocalState option. when to use: when current node's output is an actual stream, and you want the downstream node's input to remain an actual stream after state post handler. caution: while StreamStatePostHandler is thread safe, modifying state within your own goroutine is NOT. O: output type of the Node like ChatModel, Lambda, Retriever etc. S: state type defined in WithGenLocalState
func WithStreamStatePreHandler ¶
func WithStreamStatePreHandler[I, S any](pre StreamStatePreHandler[I, S]) GraphAddNodeOpt
WithStreamStatePreHandler modify node's streaming input of I according to state S and input or store input information into state, and it's thread-safe. notice: this option requires Graph to be created with WithGenLocalState option. when to use: when upstream node's output is an actual stream, and you want the current node's input to remain an actual stream after state pre handler. caution: while StreamStatePreHandler is thread safe, modifying state within your own goroutine is NOT. I: input type of the Node like ChatModel, Lambda, Retriever etc. S: state type defined in WithGenLocalState
type GraphBranch ¶
type GraphBranch struct {
// contains filtered or unexported fields
}
GraphBranch is the branch type for the graph. It is used to determine the next node based on the condition.
func NewGraphBranch ¶
func NewGraphBranch[T any](condition GraphBranchCondition[T], endNodes map[string]bool) *GraphBranch
NewGraphBranch creates a new graph branch. It is used to determine the next node based on the condition. e.g.
condition := func(ctx context.Context, in string) (string, error) {
// logic to determine the next node
return "next_node_key", nil
}
endNodes := map[string]bool{"path01": true, "path02": true}
branch := compose.NewGraphBranch(condition, endNodes)
graph.AddBranch("key_of_node_before_branch", branch)
func NewGraphMultiBranch ¶ added in v0.3.18
func NewGraphMultiBranch[T any](condition GraphMultiBranchCondition[T], endNodes map[string]bool) *GraphBranch
func NewStreamGraphBranch ¶
func NewStreamGraphBranch[T any](condition StreamGraphBranchCondition[T], endNodes map[string]bool) *GraphBranch
NewStreamGraphBranch creates a new stream graph branch. It is used to determine the next node based on the condition of stream input. e.g.
condition := func(ctx context.Context, in *schema.StreamReader[T]) (string, error) {
// logic to determine the next node.
// to use the feature of stream, you can use the first chunk to determine the next node.
return "next_node_key", nil
}
endNodes := map[string]bool{"path01": true, "path02": true}
branch := compose.NewStreamGraphBranch(condition, endNodes)
graph.AddBranch("key_of_node_before_branch", branch)
func NewStreamGraphMultiBranch ¶ added in v0.3.18
func NewStreamGraphMultiBranch[T any](condition StreamGraphMultiBranchCondition[T], endNodes map[string]bool) *GraphBranch
func (*GraphBranch) GetEndNode ¶
func (gb *GraphBranch) GetEndNode() map[string]bool
GetEndNode returns the all end nodes of the branch.
type GraphBranchCondition ¶
GraphBranchCondition is the condition type for the branch.
type GraphCompileCallback ¶
GraphCompileCallback is the callback which will be called when graph compilation finishes.
type GraphCompileOption ¶
type GraphCompileOption func(*graphCompileOptions)
GraphCompileOption options for compiling AnyGraph.
func WithCheckPointStore ¶ added in v0.3.18
func WithCheckPointStore(store CheckPointStore) GraphCompileOption
func WithEagerExecution ¶ added in v0.3.34
func WithEagerExecution() GraphCompileOption
WithEagerExecution enables the eager execution mode for the graph. In eager mode, nodes will be executed immediately once they are ready to run, without waiting for the completion of a super step, ref: https://www.cloudwego.io/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles/#runtime-engine Note: Eager mode is not allowed when the graph's trigger mode is set to AnyPredecessor. Workflow uses eager mode by default. Deprecated: Eager execution is automatically enabled by default when a node's trigger mode is set to AllPredecessor. If you were using this option previously, it can be safely removed without changing behavior.
func WithEagerExecutionDisabled ¶ added in v0.4.0
func WithEagerExecutionDisabled() GraphCompileOption
WithEagerExecutionDisabled disables the eager execution mode for the graph. By default, eager execution is enabled for Workflow and Graph with the AllPredecessor trigger mode. After using this option, nodes will wait for the completion of a super step instead of execute immediately once they are ready to run. ref: https://www.cloudwego.io/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles/#runtime-engine
func WithFanInMergeConfig ¶ added in v0.3.44
func WithFanInMergeConfig(confs map[string]FanInMergeConfig) GraphCompileOption
WithFanInMergeConfig sets the fan-in merge configurations for the graph nodes that receive inputs from multiple sources.
func WithGraphCompileCallbacks ¶
func WithGraphCompileCallbacks(cbs ...GraphCompileCallback) GraphCompileOption
WithGraphCompileCallbacks sets callbacks for graph compilation.
func WithGraphName ¶
func WithGraphName(graphName string) GraphCompileOption
WithGraphName sets a name for the graph. The name is used for debugging and logging purposes. If not set, a default name will be used.
func WithInterruptAfterNodes ¶ added in v0.3.18
func WithInterruptAfterNodes(nodes []string) GraphCompileOption
func WithInterruptBeforeNodes ¶ added in v0.3.18
func WithInterruptBeforeNodes(nodes []string) GraphCompileOption
func WithMaxRunSteps ¶
func WithMaxRunSteps(maxSteps int) GraphCompileOption
WithMaxRunSteps sets the maximum number of steps that a graph can run. This is useful to prevent infinite loops in graphs with cycles. If the number of steps exceeds maxSteps, the graph execution will be terminated with an error.
func WithNodeTriggerMode ¶
func WithNodeTriggerMode(triggerMode NodeTriggerMode) GraphCompileOption
WithNodeTriggerMode sets the trigger mode for nodes in the graph. The trigger mode determines when a node is triggered during graph execution, ref: https://www.cloudwego.io/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles/#runtime-engine AnyPredecessor by default.
func WithSerializer ¶ added in v0.3.50
func WithSerializer(serializer Serializer) GraphCompileOption
type GraphInfo ¶
type GraphInfo struct {
CompileOptions []GraphCompileOption
Nodes map[string]GraphNodeInfo // node key -> node info
Edges map[string][]string // edge start node key -> edge end node key, control edges
DataEdges map[string][]string
Branches map[string][]GraphBranch // branch start node key -> branch
InputType, OutputType reflect.Type
Name string
NewGraphOptions []NewGraphOption
GenStateFn func(context.Context) any
}
GraphInfo the info which end users pass in when they are compiling a graph. it is used in compile callback for user to get the node info and instance. you may need all details info of the graph for observation.
type GraphInterruptOption ¶ added in v0.5.4
type GraphInterruptOption func(o *graphInterruptOptions)
func WithGraphInterruptTimeout ¶ added in v0.5.4
func WithGraphInterruptTimeout(timeout time.Duration) GraphInterruptOption
WithGraphInterruptTimeout specifies the max waiting time before generating an interrupt. After the max waiting time, the graph will force an interrupt. Any unfinished tasks will be re-run when the graph is resumed.
type GraphMultiBranchCondition ¶ added in v0.3.18
type GraphMultiBranchCondition[T any] func(ctx context.Context, in T) (endNode map[string]bool, err error)
GraphMultiBranchCondition is the condition type for the multi choice branch.
type GraphNodeInfo ¶
type GraphNodeInfo struct {
Component components.Component
Instance any
GraphAddNodeOpts []GraphAddNodeOpt
InputType, OutputType reflect.Type // mainly for lambda, whose input and output types cannot be inferred by component type
Name string
InputKey, OutputKey string
GraphInfo *GraphInfo
Mappings []*FieldMapping
}
GraphNodeInfo the info which end users pass in when they are adding nodes to graph.
type InterruptCtx ¶ added in v0.7.0
type InterruptCtx = core.InterruptCtx
InterruptCtx provides a complete, user-facing context for a single, resumable interrupt point.
type InterruptInfo ¶ added in v0.3.18
type InterruptInfo struct {
State any
BeforeNodes []string
AfterNodes []string
RerunNodes []string
RerunNodesExtra map[string]any
SubGraphs map[string]*InterruptInfo
InterruptContexts []*InterruptCtx
}
func ExtractInterruptInfo ¶ added in v0.3.18
func ExtractInterruptInfo(err error) (info *InterruptInfo, existed bool)
type InvokableToolEndpoint ¶ added in v0.5.14
type InvokableToolEndpoint func(ctx context.Context, input *ToolInput) (*ToolOutput, error)
type InvokableToolMiddleware ¶ added in v0.5.14
type InvokableToolMiddleware func(InvokableToolEndpoint) InvokableToolEndpoint
InvokableToolMiddleware is a function that wraps InvokableToolEndpoint to add custom processing logic. It can be used to intercept, modify, or enhance tool call execution for non-streaming tools.
type Invoke ¶
type Invoke[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output O, err error)
Invoke is the type of the invokable lambda function.
type InvokeWOOpt ¶
InvokeWOOpt is the type of the invokable lambda function without options.
type Lambda ¶
type Lambda struct {
// contains filtered or unexported fields
}
Lambda is the node that wraps the user provided lambda function. It can be used as a node in Graph or Chain (include Parallel and Branch). Create a Lambda by using AnyLambda/InvokableLambda/StreamableLambda/CollectableLambda/TransformableLambda. eg.
lambda := compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
return input, nil
})
func AnyLambda ¶
func AnyLambda[I, O, TOption any](i Invoke[I, O, TOption], s Stream[I, O, TOption], c Collect[I, O, TOption], t Transform[I, O, TOption], opts ...LambdaOpt) (*Lambda, error)
AnyLambda creates a Lambda with any lambda function. you can only implement one or more of the four lambda functions, and the rest use nil. eg.
invokeFunc := func(ctx context.Context, input string, opts ...myOption) (output string, err error) {
// ...
}
streamFunc := func(ctx context.Context, input string, opts ...myOption) (output *schema.StreamReader[string], err error) {
// ...
}
lambda := compose.AnyLambda(invokeFunc, streamFunc, nil, nil)
func CollectableLambda ¶
func CollectableLambda[I, O any](c CollectWOOpt[I, O], opts ...LambdaOpt) *Lambda
CollectableLambda creates a Lambda with collectable lambda function without options.
func CollectableLambdaWithOption ¶
func CollectableLambdaWithOption[I, O, TOption any](c Collect[I, O, TOption], opts ...LambdaOpt) *Lambda
CollectableLambdaWithOption creates a Lambda with collectable lambda function and options.
func InvokableLambda ¶
func InvokableLambda[I, O any](i InvokeWOOpt[I, O], opts ...LambdaOpt) *Lambda
InvokableLambda creates a Lambda with invokable lambda function without options.
func InvokableLambdaWithOption ¶
func InvokableLambdaWithOption[I, O, TOption any](i Invoke[I, O, TOption], opts ...LambdaOpt) *Lambda
InvokableLambdaWithOption creates a Lambda with invokable lambda function and options.
func MessageParser ¶
func MessageParser[T any](p schema.MessageParser[T], opts ...LambdaOpt) *Lambda
MessageParser creates a lambda that parses a message into an object T, usually used after a chatmodel. usage:
parser := schema.NewMessageJSONParser[MyStruct](&schema.MessageJSONParseConfig{
ParseFrom: schema.MessageParseFromContent,
})
parserLambda := MessageParser(parser)
chain := NewChain[*schema.Message, MyStruct]()
chain.AppendChatModel(chatModel)
chain.AppendLambda(parserLambda)
r, err := chain.Compile(context.Background())
// parsed is a MyStruct object
parsed, err := r.Invoke(context.Background(), &schema.Message{
Role: schema.MessageRoleUser,
Content: "return a json string for my struct",
})
func StreamableLambda ¶
func StreamableLambda[I, O any](s StreamWOOpt[I, O], opts ...LambdaOpt) *Lambda
StreamableLambda creates a Lambda with streamable lambda function without options.
func StreamableLambdaWithOption ¶
func StreamableLambdaWithOption[I, O, TOption any](s Stream[I, O, TOption], opts ...LambdaOpt) *Lambda
StreamableLambdaWithOption creates a Lambda with streamable lambda function and options.
func ToList ¶
ToList creates a Lambda that converts input I to a []I. It's useful when you want to convert a single input to a list of inputs. eg.
lambda := compose.ToList[*schema.Message]() chain := compose.NewChain[[]*schema.Message, []*schema.Message]() chain.AddChatModel(chatModel) // chatModel returns *schema.Message, but we need []*schema.Message chain.AddLambda(lambda) // convert *schema.Message to []*schema.Message
func TransformableLambda ¶
func TransformableLambda[I, O any](t TransformWOOpts[I, O], opts ...LambdaOpt) *Lambda
TransformableLambda creates a Lambda with transformable lambda function without options.
type LambdaOpt ¶
type LambdaOpt func(o *lambdaOpts)
LambdaOpt is the option for creating a Lambda.
func WithLambdaCallbackEnable ¶
WithLambdaCallbackEnable enables the callback aspect of the lambda function.
func WithLambdaType ¶
WithLambdaType sets the type of the lambda function.
type NewGraphOption ¶ added in v0.3.1
type NewGraphOption func(ngo *newGraphOptions)
func WithGenLocalState ¶ added in v0.3.1
func WithGenLocalState[S any](gls GenLocalState[S]) NewGraphOption
type NodePath ¶ added in v0.3.4
type NodePath struct {
// contains filtered or unexported fields
}
func NewNodePath ¶ added in v0.3.4
NewNodePath specifies a path to a node in the graph, which is composed of node keys. Starting from the top graph, following this set of node keys can lead to a specific node in the top graph or a subgraph.
e.g. NewNodePath("sub_graph_node_key", "node_key_within_sub_graph")
type NodeTriggerMode ¶
type NodeTriggerMode string
NodeTriggerMode controls the triggering mode of graph nodes.
const ( // AnyPredecessor means that the node will be triggered when any of its predecessors is included in the previous completed super step. // Ref:https://www.cloudwego.io/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles/#runtime-engine AnyPredecessor NodeTriggerMode = "any_predecessor" // AllPredecessor means that the current node will only be triggered when all of its predecessor nodes have finished running. AllPredecessor NodeTriggerMode = "all_predecessor" )
type Option ¶
type Option struct {
// contains filtered or unexported fields
}
Option is a functional option type for calling a graph.
func WithCallbacks ¶
WithCallbacks set callback handlers for all components in a single call. e.g.
runnable.Invoke(ctx, "input", compose.WithCallbacks(&myCallbacks{}))
func WithChatModelOption ¶
WithChatModelOption is a functional option type for chat model component. e.g.
chatModelOption := compose.WithChatModelOption(model.WithTemperature(0.7)) runnable.Invoke(ctx, "input", chatModelOption)
func WithChatTemplateOption ¶
WithChatTemplateOption is a functional option type for chat template component.
func WithCheckPointID ¶ added in v0.3.18
func WithDocumentTransformerOption ¶
func WithDocumentTransformerOption(opts ...document.TransformerOption) Option
WithDocumentTransformerOption is a functional option type for document transformer component.
func WithEmbeddingOption ¶
WithEmbeddingOption is a functional option type for embedding component. e.g.
embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small"))
runnable.Invoke(ctx, "input", embeddingOption)
func WithForceNewRun ¶ added in v0.3.45
func WithForceNewRun() Option
WithForceNewRun forces the graph to run from the beginning, ignoring any checkpoints.
func WithIndexerOption ¶
WithIndexerOption is a functional option type for indexer component. e.g.
indexerOption := compose.WithIndexerOption(indexer.WithSubIndexes([]string{"my_sub_index"}))
runnable.Invoke(ctx, "input", indexerOption)
func WithLambdaOption ¶
WithLambdaOption is a functional option type for lambda component.
func WithLoaderOption ¶
func WithLoaderOption(opts ...document.LoaderOption) Option
WithLoaderOption is a functional option type for loader component. e.g.
loaderOption := compose.WithLoaderOption(document.WithCollection("my_collection"))
runnable.Invoke(ctx, "input", loaderOption)
func WithRetrieverOption ¶
WithRetrieverOption is a functional option type for retriever component. e.g.
retrieverOption := compose.WithRetrieverOption(retriever.WithIndex("my_index"))
runnable.Invoke(ctx, "input", retrieverOption)
func WithRuntimeMaxSteps ¶
WithRuntimeMaxSteps sets the maximum number of steps for the graph runtime. e.g.
runnable.Invoke(ctx, "input", compose.WithRuntimeMaxSteps(20))
func WithStateModifier ¶ added in v0.3.18
func WithStateModifier(sm StateModifier) Option
func WithToolsNodeOption ¶
func WithToolsNodeOption(opts ...ToolsNodeOption) Option
WithToolsNodeOption is a functional option type for tools node component.
func WithWriteToCheckPointID ¶ added in v0.3.53
WithWriteToCheckPointID specifies a different checkpoint ID to write to. If not provided, the checkpoint ID from WithCheckPointID will be used for writing. This is useful for scenarios where you want to load from an existed checkpoint but save the progress to a new, separate checkpoint.
func (Option) DesignateNode ¶
DesignateNode sets the key of the node to which the option will be applied. notice: only effective at the top graph. e.g.
embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small")) runnable.Invoke(ctx, "input", embeddingOption.DesignateNode("embedding_node_key"))
func (Option) DesignateNodeWithPath ¶ added in v0.3.4
DesignateNodeWithPath sets the path of the node(s) to which the option will be applied. You can specify a node in the subgraph through `NodePath` to make the option only take effect at this node.
e.g. nodePath := NewNodePath("sub_graph_node_key", "node_key_within_sub_graph") DesignateNodeWithPath(nodePath)
type Parallel ¶
type Parallel struct {
// contains filtered or unexported fields
}
Parallel run multiple nodes in parallel
use `NewParallel()` to create a new parallel type Example:
parallel := NewParallel()
parallel.AddChatModel("output_key01", chat01)
parallel.AddChatModel("output_key01", chat02)
chain := NewChain[any,any]()
chain.AppendParallel(parallel)
func NewParallel ¶
func NewParallel() *Parallel
NewParallel creates a new parallel type. it is useful when you want to run multiple nodes in parallel in a chain.
func (*Parallel) AddChatModel ¶
func (p *Parallel) AddChatModel(outputKey string, node model.BaseChatModel, opts ...GraphAddNodeOpt) *Parallel
AddChatModel adds a chat model to the parallel. eg.
chatModel01, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
Model: "gpt-4o",
})
chatModel02, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
Model: "gpt-4o",
})
p.AddChatModel("output_key01", chatModel01)
p.AddChatModel("output_key02", chatModel02)
func (*Parallel) AddChatTemplate ¶
func (p *Parallel) AddChatTemplate(outputKey string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Parallel
AddChatTemplate adds a chat template to the parallel. eg.
chatTemplate01, err := prompt.FromMessages(schema.FString, &schema.Message{
Role: schema.System,
Content: "You are acting as a {role}.",
})
p.AddChatTemplate("output_key01", chatTemplate01)
func (*Parallel) AddDocumentTransformer ¶
func (p *Parallel) AddDocumentTransformer(outputKey string, node document.Transformer, opts ...GraphAddNodeOpt) *Parallel
AddDocumentTransformer adds an Document Transformer node to the parallel. eg.
markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{})
p.AddDocumentTransformer("output_key01", markdownSplitter)
func (*Parallel) AddEmbedding ¶
func (p *Parallel) AddEmbedding(outputKey string, node embedding.Embedder, opts ...GraphAddNodeOpt) *Parallel
AddEmbedding adds an embedding node to the parallel. eg.
embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{
Model: "text-embedding-3-small",
})
p.AddEmbedding("output_key01", embeddingNode)
func (*Parallel) AddGraph ¶
func (p *Parallel) AddGraph(outputKey string, node AnyGraph, opts ...GraphAddNodeOpt) *Parallel
AddGraph adds a graph node to the parallel. It is useful when you want to use a graph or a chain as a node in the parallel. eg.
graph, err := compose.NewChain[any,any]()
p.AddGraph("output_key01", graph)
func (*Parallel) AddIndexer ¶
func (p *Parallel) AddIndexer(outputKey string, node indexer.Indexer, opts ...GraphAddNodeOpt) *Parallel
AddIndexer adds an indexer node to the parallel. eg.
indexer, err := volc_vikingdb.NewIndexer(ctx, &volc_vikingdb.IndexerConfig{
Collection: "my_collection",
})
p.AddIndexer("output_key01", indexer)
func (*Parallel) AddLambda ¶
func (p *Parallel) AddLambda(outputKey string, node *Lambda, opts ...GraphAddNodeOpt) *Parallel
AddLambda adds a lambda node to the parallel. eg.
lambdaFunc := func(ctx context.Context, input *schema.Message) ([]*schema.Message, error) {
return []*schema.Message{input}, nil
}
p.AddLambda("output_key01", compose.InvokeLambda(lambdaFunc))
func (*Parallel) AddLoader ¶
func (p *Parallel) AddLoader(outputKey string, node document.Loader, opts ...GraphAddNodeOpt) *Parallel
AddLoader adds a loader node to the parallel. eg.
loader, err := file.NewLoader(ctx, &file.LoaderConfig{})
p.AddLoader("output_key01", loader)
func (*Parallel) AddPassthrough ¶
func (p *Parallel) AddPassthrough(outputKey string, opts ...GraphAddNodeOpt) *Parallel
AddPassthrough adds a passthrough node to the parallel. eg.
p.AddPassthrough("output_key01")
func (*Parallel) AddRetriever ¶
func (p *Parallel) AddRetriever(outputKey string, node retriever.Retriever, opts ...GraphAddNodeOpt) *Parallel
AddRetriever adds a retriever node to the parallel. eg.
retriever, err := vikingdb.NewRetriever(ctx, &vikingdb.RetrieverConfig{})
p.AddRetriever("output_key01", retriever)
func (*Parallel) AddToolsNode ¶
func (p *Parallel) AddToolsNode(outputKey string, node *ToolsNode, opts ...GraphAddNodeOpt) *Parallel
AddToolsNode adds a tools node to the parallel. eg.
toolsNode, err := compose.NewToolNode(ctx, &compose.ToolsNodeConfig{
Tools: []tool.BaseTool{...},
})
p.AddToolsNode("output_key01", toolsNode)
type Runnable ¶
type Runnable[I, O any] interface { Invoke(ctx context.Context, input I, opts ...Option) (output O, err error) Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], err error) Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output O, err error) Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output *schema.StreamReader[O], err error) }
Runnable is the interface for an executable object. Graph, Chain can be compiled into Runnable. runnable is the core conception of eino, we do downgrade compatibility for four data flow patterns, and can automatically connect components that only implement one or more methods. eg, if a component only implements Stream() method, you can still call Invoke() to convert stream output to invoke output.
type Serializer ¶ added in v0.3.50
type StateModifier ¶ added in v0.3.18
type StatePostHandler ¶
StatePostHandler is a function called after the node is executed. Notice: if user called Stream but with StatePostHandler, the StatePostHandler will read all stream chunks and merge them into a single object.
type StatePreHandler ¶
StatePreHandler is a function called before the node is executed. Notice: if user called Stream but with StatePreHandler, the StatePreHandler will read all stream chunks and merge them into a single object.
type Stream ¶
type Stream[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output *schema.StreamReader[O], err error)
Stream is the type of the streamable lambda function.
type StreamGraphBranchCondition ¶
type StreamGraphBranchCondition[T any] func(ctx context.Context, in *schema.StreamReader[T]) (endNode string, err error)
StreamGraphBranchCondition is the condition type for the stream branch.
type StreamGraphMultiBranchCondition ¶ added in v0.3.18
type StreamGraphMultiBranchCondition[T any] func(ctx context.Context, in *schema.StreamReader[T]) (endNodes map[string]bool, err error)
StreamGraphMultiBranchCondition is the condition type for the stream multi choice branch.
type StreamStatePostHandler ¶
type StreamStatePostHandler[O, S any] func(ctx context.Context, out *schema.StreamReader[O], state S) (*schema.StreamReader[O], error)
StreamStatePostHandler is a function that is called after the node is executed with stream input and output.
type StreamStatePreHandler ¶
type StreamStatePreHandler[I, S any] func(ctx context.Context, in *schema.StreamReader[I], state S) (*schema.StreamReader[I], error)
StreamStatePreHandler is a function that is called before the node is executed with stream input and output.
type StreamToolOutput ¶ added in v0.5.14
type StreamToolOutput struct {
// Result is a stream reader that provides access to the tool's streaming output.
Result *schema.StreamReader[string]
}
StreamToolOutput represents the result of a streaming tool call execution.
type StreamWOOpt ¶
type StreamWOOpt[I, O any] func(ctx context.Context, input I) (output *schema.StreamReader[O], err error)
StreamWOOpt is the type of the streamable lambda function without options.
type StreamableToolEndpoint ¶ added in v0.5.14
type StreamableToolEndpoint func(ctx context.Context, input *ToolInput) (*StreamToolOutput, error)
type StreamableToolMiddleware ¶ added in v0.5.14
type StreamableToolMiddleware func(StreamableToolEndpoint) StreamableToolEndpoint
StreamableToolMiddleware is a function that wraps StreamableToolEndpoint to add custom processing logic. It can be used to intercept, modify, or enhance tool call execution for streaming tools.
type ToolInput ¶ added in v0.5.14
type ToolInput struct {
// Name is the name of the tool to be executed.
Name string
// Arguments contains the arguments for the tool call.
Arguments string
// CallID is the unique identifier for this tool call.
CallID string
// CallOptions contains tool options for the execution.
CallOptions []tool.Option
}
ToolInput represents the input parameters for a tool call execution.
type ToolMiddleware ¶ added in v0.5.14
type ToolMiddleware struct {
// Invokable contains middleware function for non-streaming tool calls.
// Note: This middleware only applies to tools that implement the InvokableTool interface.
Invokable InvokableToolMiddleware
// Streamable contains middleware function for streaming tool calls.
// Note: This middleware only applies to tools that implement the StreamableTool interface.
Streamable StreamableToolMiddleware
}
type ToolOutput ¶ added in v0.5.14
type ToolOutput struct {
// Result contains the string output from the tool execution.
Result string
}
ToolOutput represents the result of a non-streaming tool call execution.
type ToolsInterruptAndRerunExtra ¶ added in v0.3.38
type ToolsNode ¶
type ToolsNode struct {
// contains filtered or unexported fields
}
ToolsNode represents a node capable of executing tools within a graph. The Graph Node interface is defined as follows:
Invoke(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) ([]*schema.Message, error) Stream(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) (*schema.StreamReader[[]*schema.Message], error)
Input: An AssistantMessage containing ToolCalls Output: An array of ToolMessage where the order of elements corresponds to the order of ToolCalls in the input
func NewToolNode ¶
func NewToolNode(ctx context.Context, conf *ToolsNodeConfig) (*ToolsNode, error)
NewToolNode creates a new ToolsNode. e.g.
conf := &ToolsNodeConfig{
Tools: []tool.BaseTool{invokableTool1, streamableTool2},
}
toolsNode, err := NewToolNode(ctx, conf)
func (*ToolsNode) Invoke ¶
func (tn *ToolsNode) Invoke(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) ([]*schema.Message, error)
Invoke calls the tools and collects the results of invokable tools. it's parallel if there are multiple tool calls in the input message.
func (*ToolsNode) Stream ¶
func (tn *ToolsNode) Stream(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) (*schema.StreamReader[[]*schema.Message], error)
Stream calls the tools and collects the results of stream readers. it's parallel if there are multiple tool calls in the input message.
type ToolsNodeConfig ¶
type ToolsNodeConfig struct {
// Tools specify the list of tools can be called which are BaseTool but must implement InvokableTool or StreamableTool.
Tools []tool.BaseTool
// UnknownToolsHandler handles tool calls for non-existent tools when LLM hallucinates.
// This field is optional. When not set, calling a non-existent tool will result in an error.
// When provided, if the LLM attempts to call a tool that doesn't exist in the Tools list,
// this handler will be invoked instead of returning an error, allowing graceful handling of hallucinated tools.
// Parameters:
// - ctx: The context for the tool call
// - name: The name of the non-existent tool
// - input: The tool call input generated by llm
// Returns:
// - string: The response to be returned as if the tool was executed
// - error: Any error that occurred during handling
UnknownToolsHandler func(ctx context.Context, name, input string) (string, error)
// ExecuteSequentially determines whether tool calls should be executed sequentially (in order) or in parallel.
// When set to true, tool calls will be executed one after another in the order they appear in the input message.
// When set to false (default), tool calls will be executed in parallel.
ExecuteSequentially bool
// ToolArgumentsHandler allows handling of tool arguments before execution.
// When provided, this function will be called for each tool call to process the arguments.
// Parameters:
// - ctx: The context for the tool call
// - name: The name of the tool being called
// - arguments: The original arguments string for the tool
// Returns:
// - string: The processed arguments string to be used for tool execution
// - error: Any error that occurred during preprocessing
ToolArgumentsHandler func(ctx context.Context, name, arguments string) (string, error)
// ToolCallMiddlewares configures middleware for tool calls.
// Each element can contain Invokable and/or Streamable middleware.
// Invokable middleware only applies to tools implementing InvokableTool interface.
// Streamable middleware only applies to tools implementing StreamableTool interface.
ToolCallMiddlewares []ToolMiddleware
}
ToolsNodeConfig is the config for ToolsNode.
type ToolsNodeOption ¶
type ToolsNodeOption func(o *toolsNodeOptions)
ToolsNodeOption is the option func type for ToolsNode.
func WithToolList ¶ added in v0.3.15
func WithToolList(tool ...tool.BaseTool) ToolsNodeOption
WithToolList sets the tool list for the ToolsNode.
func WithToolOption ¶
func WithToolOption(opts ...tool.Option) ToolsNodeOption
WithToolOption adds tool options to the ToolsNode.
type Transform ¶
type Transform[I, O, TOption any] func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (output *schema.StreamReader[O], err error)
Transform is the type of the transformable lambda function.
type TransformWOOpts ¶
type TransformWOOpts[I, O any] func(ctx context.Context, input *schema.StreamReader[I]) (output *schema.StreamReader[O], err error)
TransformWOOpts is the type of the transformable lambda function without options.
type Workflow ¶ added in v0.3.8
type Workflow[I, O any] struct { // contains filtered or unexported fields }
Workflow is wrapper of graph, replacing AddEdge with declaring dependencies and field mappings between nodes. Under the hood it uses NodeTriggerMode(AllPredecessor), so does not support cycles.
func NewWorkflow ¶ added in v0.3.8
func NewWorkflow[I, O any](opts ...NewGraphOption) *Workflow[I, O]
NewWorkflow creates a new Workflow.
func (*Workflow[I, O]) AddBranch ¶ added in v0.3.18
func (wf *Workflow[I, O]) AddBranch(fromNodeKey string, branch *GraphBranch) *WorkflowBranch
AddBranch adds a branch to the workflow.
End Nodes Field Mappings: End nodes of the branch are required to define their own field mappings. This is a key distinction between Graph's Branch and Workflow's Branch: - Graph's Branch: Automatically passes its input to the selected node. - Workflow's Branch: Does not pass its input to the selected node.
func (*Workflow[I, O]) AddChatModelNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddChatModelNode(key string, chatModel model.BaseChatModel, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddChatTemplateNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddChatTemplateNode(key string, chatTemplate prompt.ChatTemplate, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddDocumentTransformerNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddDocumentTransformerNode(key string, transformer document.Transformer, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddEmbeddingNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddEmbeddingNode(key string, embedding embedding.Embedder, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddEnd
deprecated
added in
v0.3.8
func (wf *Workflow[I, O]) AddEnd(fromNodeKey string, inputs ...*FieldMapping) *Workflow[I, O]
Deprecated: use *Workflow[I,O].End() to obtain a WorkflowNode instance for END, then work with it just like a normal WorkflowNode.
func (*Workflow[I, O]) AddGraphNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddGraphNode(key string, graph AnyGraph, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddIndexerNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddIndexerNode(key string, indexer indexer.Indexer, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddLambdaNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddLambdaNode(key string, lambda *Lambda, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddLoaderNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddLoaderNode(key string, loader document.Loader, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddPassthroughNode ¶ added in v0.3.18
func (wf *Workflow[I, O]) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddRetrieverNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddRetrieverNode(key string, retriever retriever.Retriever, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddToolsNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddToolsNode(key string, tools *ToolsNode, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) End ¶ added in v0.3.18
func (wf *Workflow[I, O]) End() *WorkflowNode
End returns the WorkflowNode representing END node.
type WorkflowAddInputOpt ¶ added in v0.3.18
type WorkflowAddInputOpt func(*workflowAddInputOpts)
func WithNoDirectDependency ¶ added in v0.3.18
func WithNoDirectDependency() WorkflowAddInputOpt
WithNoDirectDependency creates a data mapping without establishing a direct execution dependency. The predecessor node will still complete before the current node executes, but through indirect execution paths rather than a direct dependency.
In a workflow graph, node dependencies typically serve two purposes: 1. Execution order: determining when nodes should run 2. Data flow: specifying how data passes between nodes
This option separates these concerns by:
- Creating data mapping from the predecessor to the current node
- Relying on the predecessor's path to reach the current node through other nodes that have direct execution dependencies
Example:
node.AddInputWithOptions("dataNode", mappings, WithNoDirectDependency())
Important:
Branch scenarios: When connecting nodes on different sides of a branch, WithNoDirectDependency MUST be used to let the branch itself handle the execution order, preventing incorrect dependencies that could bypass the branch.
Execution guarantee: The predecessor will still complete before the current node executes because the predecessor must have a path (through other nodes) that eventually reaches the current node.
Graph validity: There MUST be a path from the predecessor that eventually reaches the current node through other nodes with direct dependencies. This ensures the execution order while avoiding redundant direct dependencies.
Common use cases: - Cross-branch data access where the branch handles execution order - Avoiding redundant dependencies when a path already exists
type WorkflowBranch ¶ added in v0.3.18
type WorkflowBranch struct {
*GraphBranch
// contains filtered or unexported fields
}
type WorkflowNode ¶ added in v0.3.8
type WorkflowNode struct {
// contains filtered or unexported fields
}
WorkflowNode is the node of the Workflow.
func (*WorkflowNode) AddDependency ¶ added in v0.3.18
func (n *WorkflowNode) AddDependency(fromNodeKey string) *WorkflowNode
AddDependency creates an execution-only dependency between nodes. The current node will wait for the predecessor node to complete before executing, but no data will be passed between them.
Parameters:
- fromNodeKey: the key of the predecessor node that must complete before this node starts
Example:
// Wait for "setupNode" to complete before executing
node.AddDependency("setupNode")
This is useful when: - You need to ensure execution order without data transfer - The predecessor performs setup or initialization that must complete first - You want to explicitly separate execution dependencies from data flow
Returns the current node for method chaining.
func (*WorkflowNode) AddInput ¶ added in v0.3.8
func (n *WorkflowNode) AddInput(fromNodeKey string, inputs ...*FieldMapping) *WorkflowNode
AddInput creates both data and execution dependencies between nodes. It configures how data flows from the predecessor node (fromNodeKey) to the current node, and ensures the current node only executes after the predecessor completes.
Parameters:
- fromNodeKey: the key of the predecessor node
- inputs: field mappings that specify how data should flow from the predecessor to the current node. If no mappings are provided, the entire output of the predecessor will be used as input.
Example:
// Map between specific field
node.AddInput("userNode", MapFields("user.name", "displayName"))
// Use entire output
node.AddInput("dataNode")
Returns the current node for method chaining.
func (*WorkflowNode) AddInputWithOptions ¶ added in v0.3.18
func (n *WorkflowNode) AddInputWithOptions(fromNodeKey string, inputs []*FieldMapping, opts ...WorkflowAddInputOpt) *WorkflowNode
AddInputWithOptions creates a dependency between nodes with custom configuration options. It allows fine-grained control over both data flow and execution dependencies.
Parameters:
- fromNodeKey: the key of the predecessor node
- inputs: field mappings that specify how data flows from the predecessor to the current node. If no mappings are provided, the entire output of the predecessor will be used as input.
- opts: configuration options that control how the dependency is established
Example:
// Create data mapping without direct execution dependency
node.AddInputWithOptions("dataNode", mappings, WithNoDirectDependency())
Returns the current node for method chaining.
func (*WorkflowNode) SetStaticValue ¶ added in v0.3.18
func (n *WorkflowNode) SetStaticValue(path FieldPath, value any) *WorkflowNode
SetStaticValue sets a static value for a field path that will be available during workflow execution. These values are determined at compile time and remain constant throughout the workflow's lifecycle.
Example:
node.SetStaticValue(FieldPath{"query"}, "static query")
Source Files
¶
- branch.go
- chain.go
- chain_branch.go
- chain_parallel.go
- checkpoint.go
- component_to_graph_node.go
- dag.go
- doc.go
- error.go
- field_mapping.go
- generic_graph.go
- generic_helper.go
- graph.go
- graph_add_node_options.go
- graph_call_options.go
- graph_compile_options.go
- graph_manager.go
- graph_node.go
- graph_run.go
- interrupt.go
- introspect.go
- pregel.go
- resume.go
- runnable.go
- state.go
- stream_concat.go
- stream_reader.go
- tool_node.go
- types.go
- types_composable.go
- types_lambda.go
- utils.go
- values_merge.go
- workflow.go