Documentation ¶
Overview ¶
Package runnable provides composition primitives for building chains from Runnable components (the Go equivalent of LCEL).
Index ¶
- type Assign
- func (a *Assign[I]) Batch(ctx context.Context, inputs []I, opts ...core.Option) ([]map[string]any, error)
- func (a *Assign[I]) GetName() string
- func (a *Assign[I]) Invoke(ctx context.Context, input I, opts ...core.Option) (map[string]any, error)
- func (a *Assign[I]) Stream(ctx context.Context, input I, opts ...core.Option) (*core.StreamIterator[map[string]any], error)
- type Branch
- func (b *Branch[I, O]) Batch(ctx context.Context, inputs []I, opts ...core.Option) ([]O, error)
- func (b *Branch[I, O]) GetName() string
- func (b *Branch[I, O]) Invoke(ctx context.Context, input I, opts ...core.Option) (O, error)
- func (b *Branch[I, O]) Stream(ctx context.Context, input I, opts ...core.Option) (*core.StreamIterator[O], error)
- func (b *Branch[I, O]) WithName(name string) *Branch[I, O]
- type BranchCondition
- type Lambda
- func (l *Lambda[I, O]) Batch(ctx context.Context, inputs []I, opts ...core.Option) ([]O, error)
- func (l *Lambda[I, O]) GetName() string
- func (l *Lambda[I, O]) Invoke(ctx context.Context, input I, opts ...core.Option) (O, error)
- func (l *Lambda[I, O]) Stream(ctx context.Context, input I, opts ...core.Option) (*core.StreamIterator[O], error)
- func (l *Lambda[I, O]) WithName(name string) *Lambda[I, O]
- type Parallel
- func (p *Parallel[I]) Batch(ctx context.Context, inputs []I, opts ...core.Option) ([]map[string]any, error)
- func (p *Parallel[I]) GetName() string
- func (p *Parallel[I]) Invoke(ctx context.Context, input I, opts ...core.Option) (map[string]any, error)
- func (p *Parallel[I]) Stream(ctx context.Context, input I, opts ...core.Option) (*core.StreamIterator[map[string]any], error)
- func (p *Parallel[I]) WithName(name string) *Parallel[I]
- type Passthrough
- func (p *Passthrough[T]) Batch(ctx context.Context, inputs []T, opts ...core.Option) ([]T, error)
- func (p *Passthrough[T]) GetName() string
- func (p *Passthrough[T]) Invoke(ctx context.Context, input T, opts ...core.Option) (T, error)
- func (p *Passthrough[T]) Stream(ctx context.Context, input T, opts ...core.Option) (*core.StreamIterator[T], error)
- func (p *Passthrough[T]) WithName(name string) *Passthrough[T]
- type Sequence
- func Pipe(runnables ...core.Runnable[any, any]) *Sequence[any, any]
- func Pipe2[A, B, C any](first core.Runnable[A, B], second core.Runnable[B, C]) *Sequence[A, C]
- func Pipe3[A, B, C, D any](first core.Runnable[A, B], second core.Runnable[B, C], ...) *Sequence[A, D]
- func Pipe4[A, B, C, D, E any](first core.Runnable[A, B], second core.Runnable[B, C], ...) *Sequence[A, E]
- func (s *Sequence[I, O]) Batch(ctx context.Context, inputs []I, opts ...core.Option) ([]O, error)
- func (s *Sequence[I, O]) GetName() string
- func (s *Sequence[I, O]) Invoke(ctx context.Context, input I, opts ...core.Option) (O, error)
- func (s *Sequence[I, O]) Stream(ctx context.Context, input I, opts ...core.Option) (*core.StreamIterator[O], error)
- func (s *Sequence[I, O]) WithName(name string) *Sequence[I, O]
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Assign ¶
type Assign[I any] struct { // contains filtered or unexported fields }
Assign creates a parallel runnable that passes the input through unchanged while also computing additional keys. The result is a map[string]any containing the original input plus the computed values.
Usage:
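    // retriever is assumed to be a core.Runnable[map[string]any, any];
    // its Invoke method value matches the function type NewAssign expects.
    chain := NewAssign(
        map[string]func(ctx context.Context, input map[string]any, opts ...core.Option) (any, error){
            "context": retriever.Invoke,
        },
    )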
This adds a "context" key to the input map while preserving existing keys.
func NewAssign ¶
func NewAssign[I any](additions map[string]func(ctx context.Context, input I, opts ...core.Option) (any, error)) *Assign[I]
NewAssign creates a new Assign runnable.
func (*Assign[I]) Batch ¶
func (a *Assign[I]) Batch(ctx context.Context, inputs []I, opts ...core.Option) ([]map[string]any, error)
Batch runs Assign once for each input.
type Branch ¶
type Branch[I, O any] struct { // contains filtered or unexported fields }
Branch selects which runnable to execute based on conditions. It evaluates conditions in order and runs the first one that returns true. If no conditions match, the default runnable is used. It implements Runnable[I, O].
func NewBranch ¶
func NewBranch[I, O any]( conditions []BranchCondition[I, O], defaultBranch core.Runnable[I, O], ) *Branch[I, O]
NewBranch creates a new Branch runnable. conditions are evaluated in order; the first true condition's runnable is executed. defaultBranch is used when no conditions match.
type BranchCondition ¶
BranchCondition pairs a condition function with a runnable.
type Lambda ¶
type Lambda[I, O any] struct { // contains filtered or unexported fields }
Lambda wraps a Go function as a Runnable. It implements Runnable[I, O].
type Parallel ¶
type Parallel[I any] struct { // contains filtered or unexported fields }
Parallel runs multiple runnables in parallel with the same input, collecting their outputs into a map[string]any. It implements Runnable[I, map[string]any].
func NewParallel ¶
NewParallel creates a Parallel runnable from a map of named runnables.
func NewParallelAny ¶
func NewParallelAny[I any](branches map[string]func(ctx context.Context, input I, opts ...core.Option) (any, error)) *Parallel[I]
NewParallelAny creates a Parallel runnable from a map of named branch functions. Each function returns any, so the branches may produce heterogeneous output types.
func (*Parallel[I]) Batch ¶
func (p *Parallel[I]) Batch(ctx context.Context, inputs []I, opts ...core.Option) ([]map[string]any, error)
Batch runs the parallel execution for multiple inputs.
func (*Parallel[I]) Invoke ¶
func (p *Parallel[I]) Invoke(ctx context.Context, input I, opts ...core.Option) (map[string]any, error)
Invoke runs all branches in parallel and collects results into a map.
type Passthrough ¶
type Passthrough[T any] struct { // contains filtered or unexported fields }
Passthrough passes its input through unchanged. It implements Runnable[T, T].
func NewPassthrough ¶
func NewPassthrough[T any]() *Passthrough[T]
NewPassthrough creates a new Passthrough runnable.
func (*Passthrough[T]) GetName ¶
func (p *Passthrough[T]) GetName() string
GetName returns the name of this passthrough.
func (*Passthrough[T]) Stream ¶
func (p *Passthrough[T]) Stream(ctx context.Context, input T, opts ...core.Option) (*core.StreamIterator[T], error)
Stream returns a single-chunk stream of the input.
func (*Passthrough[T]) WithName ¶
func (p *Passthrough[T]) WithName(name string) *Passthrough[T]
WithName sets the name for tracing.
type Sequence ¶
type Sequence[I, O any] struct { // contains filtered or unexported fields }
Sequence chains multiple runnables together: the output of each becomes the input of the next. Because Go generics don't support heterogeneous type lists, the intermediate types are erased to `any`. The overall Sequence is typed on the first input and last output.
func Pipe ¶
Pipe creates a type-erased sequence from an untyped series of steps. Each step must accept and produce `any`. This is the most flexible but least type-safe variant.
func Pipe3 ¶
func Pipe3[A, B, C, D any]( first core.Runnable[A, B], second core.Runnable[B, C], third core.Runnable[C, D], ) *Sequence[A, D]
Pipe3 chains three runnables into a Sequence.
func Pipe4 ¶
func Pipe4[A, B, C, D, E any]( first core.Runnable[A, B], second core.Runnable[B, C], third core.Runnable[C, D], fourth core.Runnable[D, E], ) *Sequence[A, E]
Pipe4 chains four runnables into a Sequence.
func (*Sequence[I, O]) Invoke ¶
Invoke runs all steps sequentially, passing each output as the next input.
func (*Sequence[I, O]) Stream ¶
func (s *Sequence[I, O]) Stream(ctx context.Context, input I, opts ...core.Option) (*core.StreamIterator[O], error)
Stream runs all steps sequentially and returns the output of the last step as a stream. The current implementation is not incremental: it invokes every step to completion and then streams only the final result.