runnable

package
v0.1.2 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 12, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package runnable provides composition primitives for building chains from Runnable components (the Go equivalent of LCEL).


Types

type Assign

type Assign[I any] struct {
	// contains filtered or unexported fields
}

Assign is a parallel runnable that passes the input through unchanged while also computing additional keys. Its result is a map[string]any containing the original input plus the computed values.

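Usage:

chain := NewAssign(
    map[string]func(ctx context.Context, input map[string]any, opts ...core.Option) (any, error){
        // retriever is assumed to be a core.Runnable[map[string]any, any].
        "context": retriever.Invoke,
    },
)

This adds a "context" key to the input map while preserving existing keys.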
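The pass-through-plus-computed-keys behavior can be sketched without the package itself. The following is a minimal illustration, not the package's implementation: the `addition` type and the `assign` and `runAssignExample` functions are hypothetical stand-ins, options are omitted, and the input is assumed to be a map[string]any.

```go
package main

import (
	"context"
	"fmt"
)

// addition computes one extra key from the input map.
// Hypothetical stand-in for the function type NewAssign accepts (options omitted).
type addition func(ctx context.Context, input map[string]any) (any, error)

// assign copies input into a new map, then adds one computed value per key.
// The original input map is left unmodified.
func assign(ctx context.Context, input map[string]any, additions map[string]addition) (map[string]any, error) {
	out := make(map[string]any, len(input)+len(additions))
	for k, v := range input {
		out[k] = v
	}
	for key, fn := range additions {
		v, err := fn(ctx, input)
		if err != nil {
			return nil, fmt.Errorf("assign %q: %w", key, err)
		}
		out[key] = v
	}
	return out, nil
}

// runAssignExample adds a "context" key next to the existing "question" key.
func runAssignExample() (map[string]any, error) {
	additions := map[string]addition{
		"context": func(_ context.Context, in map[string]any) (any, error) {
			return fmt.Sprintf("docs for %v", in["question"]), nil
		},
	}
	return assign(context.Background(), map[string]any{"question": "what is Go?"}, additions)
}

func main() {
	out, err := runAssignExample()
	if err != nil {
		panic(err)
	}
	fmt.Println(out["question"], "|", out["context"])
}
```

The sketch computes the additions one after another for brevity; the description above calls Assign a parallel runnable, so the real type may compute them concurrently.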

func NewAssign

func NewAssign[I any](additions map[string]func(ctx context.Context, input I, opts ...core.Option) (any, error)) *Assign[I]

NewAssign creates a new Assign runnable.

func (*Assign[I]) Batch

func (a *Assign[I]) Batch(ctx context.Context, inputs []I, opts ...core.Option) ([]map[string]any, error)

Batch runs Assign for multiple inputs.

func (*Assign[I]) GetName

func (a *Assign[I]) GetName() string

GetName returns the name.

func (*Assign[I]) Invoke

func (a *Assign[I]) Invoke(ctx context.Context, input I, opts ...core.Option) (map[string]any, error)

Invoke passes through the input and adds computed keys.

func (*Assign[I]) Stream

func (a *Assign[I]) Stream(ctx context.Context, input I, opts ...core.Option) (*core.StreamIterator[map[string]any], error)

Stream returns a single-chunk stream.

type Branch

type Branch[I, O any] struct {
	// contains filtered or unexported fields
}

Branch selects which runnable to execute based on conditions. It evaluates conditions in order and runs the first one that returns true. If no conditions match, the default runnable is used. It implements Runnable[I, O].

func NewBranch

func NewBranch[I, O any](
	conditions []BranchCondition[I, O],
	defaultBranch core.Runnable[I, O],
) *Branch[I, O]

NewBranch creates a new Branch runnable. conditions are evaluated in order; the first true condition's runnable is executed. defaultBranch is used when no conditions match.
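The first-match rule can be illustrated with a self-contained sketch. The `runner`, `branchCondition`, `selectBranch`, `reply`, and `route` names below are hypothetical stand-ins that mirror the documented shapes with options omitted; this is not the package's code.

```go
package main

import (
	"context"
	"fmt"
)

// runner is a hypothetical stand-in for core.Runnable[I, O], reduced to a function.
type runner[I, O any] func(ctx context.Context, input I) (O, error)

// branchCondition mirrors the documented BranchCondition fields.
type branchCondition[I, O any] struct {
	Condition func(input I) bool
	Runnable  runner[I, O]
}

// selectBranch returns the runnable of the first condition that reports true,
// or def when no condition matches.
func selectBranch[I, O any](conds []branchCondition[I, O], def runner[I, O], input I) runner[I, O] {
	for _, c := range conds {
		if c.Condition(input) {
			return c.Runnable
		}
	}
	return def
}

// reply builds a runner that prefixes its input with a fixed label.
func reply(label string) runner[string, string] {
	return func(_ context.Context, in string) (string, error) {
		return label + ": " + in, nil
	}
}

// route picks a branch by input length. A long input satisfies both
// conditions, but only the first one listed is used.
func route(input string) (string, error) {
	conds := []branchCondition[string, string]{
		{Condition: func(s string) bool { return len(s) > 20 }, Runnable: reply("long")},
		{Condition: func(s string) bool { return len(s) > 5 }, Runnable: reply("medium")},
	}
	chosen := selectBranch(conds, reply("short"), input)
	return chosen(context.Background(), input)
}

func main() {
	for _, in := range []string{"hi", "hello world", "this input is longer than twenty bytes"} {
		out, err := route(in)
		if err != nil {
			panic(err)
		}
		fmt.Println(out)
	}
}
```

Because evaluation stops at the first match, more specific conditions belong earlier in the slice.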

func (*Branch[I, O]) Batch

func (b *Branch[I, O]) Batch(ctx context.Context, inputs []I, opts ...core.Option) ([]O, error)

Batch runs the branch for multiple inputs.

func (*Branch[I, O]) GetName

func (b *Branch[I, O]) GetName() string

GetName returns the name.

func (*Branch[I, O]) Invoke

func (b *Branch[I, O]) Invoke(ctx context.Context, input I, opts ...core.Option) (O, error)

Invoke evaluates conditions and runs the matching branch.

func (*Branch[I, O]) Stream

func (b *Branch[I, O]) Stream(ctx context.Context, input I, opts ...core.Option) (*core.StreamIterator[O], error)

Stream evaluates conditions and streams from the matching branch.

func (*Branch[I, O]) WithName

func (b *Branch[I, O]) WithName(name string) *Branch[I, O]

WithName sets the name for tracing.

type BranchCondition

type BranchCondition[I, O any] struct {
	Condition func(input I) bool
	Runnable  core.Runnable[I, O]
}

BranchCondition pairs a condition function with a runnable.

type Lambda

type Lambda[I, O any] struct {
	// contains filtered or unexported fields
}

Lambda wraps a Go function as a Runnable. It implements Runnable[I, O].

func NewLambda

func NewLambda[I, O any](fn func(ctx context.Context, input I) (O, error)) *Lambda[I, O]

NewLambda creates a new Lambda runnable from a function.
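A rough sketch of the wrapping idea follows. The `lambda` type, `newLambda`, and `upperAll` are hypothetical stand-ins with options omitted, and the sequential, stop-at-first-error Batch is an assumption rather than documented behavior.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// lambda is a hypothetical stand-in for Lambda[I, O]: it stores a function
// and exposes it through Invoke and Batch.
type lambda[I, O any] struct {
	fn func(ctx context.Context, input I) (O, error)
}

// newLambda wraps fn so it can be used wherever an Invoke/Batch pair is expected.
func newLambda[I, O any](fn func(ctx context.Context, input I) (O, error)) *lambda[I, O] {
	return &lambda[I, O]{fn: fn}
}

// Invoke runs the wrapped function once.
func (l *lambda[I, O]) Invoke(ctx context.Context, input I) (O, error) {
	return l.fn(ctx, input)
}

// Batch runs the wrapped function for each input in order.
// Assumption: it stops at the first error and returns no partial results.
func (l *lambda[I, O]) Batch(ctx context.Context, inputs []I) ([]O, error) {
	outs := make([]O, 0, len(inputs))
	for _, in := range inputs {
		out, err := l.fn(ctx, in)
		if err != nil {
			return nil, err
		}
		outs = append(outs, out)
	}
	return outs, nil
}

// upperAll wraps strings.ToUpper in a lambda and applies it to every word.
func upperAll(words []string) ([]string, error) {
	upper := newLambda(func(_ context.Context, s string) (string, error) {
		return strings.ToUpper(s), nil
	})
	return upper.Batch(context.Background(), words)
}

func main() {
	out, err := upperAll([]string{"go", "lcel"})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```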

func (*Lambda[I, O]) Batch

func (l *Lambda[I, O]) Batch(ctx context.Context, inputs []I, opts ...core.Option) ([]O, error)

Batch runs the function for each input.

func (*Lambda[I, O]) GetName

func (l *Lambda[I, O]) GetName() string

GetName returns the name of this lambda.

func (*Lambda[I, O]) Invoke

func (l *Lambda[I, O]) Invoke(ctx context.Context, input I, opts ...core.Option) (O, error)

Invoke runs the wrapped function.

func (*Lambda[I, O]) Stream

func (l *Lambda[I, O]) Stream(ctx context.Context, input I, opts ...core.Option) (*core.StreamIterator[O], error)

Stream returns a single-chunk stream of the function result.

func (*Lambda[I, O]) WithName

func (l *Lambda[I, O]) WithName(name string) *Lambda[I, O]

WithName sets the name for tracing.

type Parallel

type Parallel[I any] struct {
	// contains filtered or unexported fields
}

Parallel runs multiple runnables in parallel with the same input, collecting their outputs into a map[string]any. It implements Runnable[I, map[string]any].

func NewParallel

func NewParallel[I, O any](branches map[string]core.Runnable[I, O]) *Parallel[I]

NewParallel creates a Parallel runnable from a map of named runnables that share the same output type O.

func NewParallelAny

func NewParallelAny[I any](branches map[string]func(ctx context.Context, input I, opts ...core.Option) (any, error)) *Parallel[I]

NewParallelAny creates a Parallel runnable from a map of named functions. Each function returns any, so the branches may produce values of different types.
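The fan-out-and-collect pattern behind Parallel might look roughly like the sketch below. The `branchFn` type and the `runParallel` and `describe` functions are hypothetical stand-ins with options omitted, and the error policy (report the first error recorded, discard results) is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
)

// branchFn is a hypothetical stand-in for the function type NewParallelAny accepts.
type branchFn[I any] func(ctx context.Context, input I) (any, error)

// runParallel starts one goroutine per branch, hands each the same input,
// and collects the outputs under the branch names.
func runParallel[I any](ctx context.Context, input I, branches map[string]branchFn[I]) (map[string]any, error) {
	var (
		mu       sync.Mutex // guards results and firstErr
		wg       sync.WaitGroup
		firstErr error
	)
	results := make(map[string]any, len(branches))
	for name, fn := range branches {
		wg.Add(1)
		go func(name string, fn branchFn[I]) {
			defer wg.Done()
			out, err := fn(ctx, input)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				if firstErr == nil {
					firstErr = fmt.Errorf("branch %q: %w", name, err)
				}
				return
			}
			results[name] = out
		}(name, fn)
	}
	wg.Wait()
	if firstErr != nil {
		return nil, firstErr
	}
	return results, nil
}

// describe runs two branches with different output types over the same text.
func describe(text string) (map[string]any, error) {
	branches := map[string]branchFn[string]{
		"length": func(_ context.Context, s string) (any, error) { return len(s), nil },
		"upper":  func(_ context.Context, s string) (any, error) { return strings.ToUpper(s), nil },
	}
	return runParallel(context.Background(), text, branches)
}

func main() {
	out, err := describe("hello")
	if err != nil {
		panic(err)
	}
	fmt.Println(out["length"], out["upper"])
}
```

Values come back as any, so callers need a type assertion (for example `out["length"].(int)`) before using them with their concrete type.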

func (*Parallel[I]) Batch

func (p *Parallel[I]) Batch(ctx context.Context, inputs []I, opts ...core.Option) ([]map[string]any, error)

Batch runs the parallel execution for multiple inputs.

func (*Parallel[I]) GetName

func (p *Parallel[I]) GetName() string

GetName returns the name of this parallel runnable.

func (*Parallel[I]) Invoke

func (p *Parallel[I]) Invoke(ctx context.Context, input I, opts ...core.Option) (map[string]any, error)

Invoke runs all branches in parallel and collects results into a map.

func (*Parallel[I]) Stream

func (p *Parallel[I]) Stream(ctx context.Context, input I, opts ...core.Option) (*core.StreamIterator[map[string]any], error)

Stream invokes and returns the result as a single-chunk stream.

func (*Parallel[I]) WithName

func (p *Parallel[I]) WithName(name string) *Parallel[I]

WithName sets the name for tracing.

type Passthrough

type Passthrough[T any] struct {
	// contains filtered or unexported fields
}

Passthrough passes its input through unchanged. It implements Runnable[T, T].

func NewPassthrough

func NewPassthrough[T any]() *Passthrough[T]

NewPassthrough creates a new Passthrough runnable.

func (*Passthrough[T]) Batch

func (p *Passthrough[T]) Batch(ctx context.Context, inputs []T, opts ...core.Option) ([]T, error)

Batch returns the inputs unchanged.

func (*Passthrough[T]) GetName

func (p *Passthrough[T]) GetName() string

GetName returns the name of this passthrough.

func (*Passthrough[T]) Invoke

func (p *Passthrough[T]) Invoke(ctx context.Context, input T, opts ...core.Option) (T, error)

Invoke returns the input unchanged.

func (*Passthrough[T]) Stream

func (p *Passthrough[T]) Stream(ctx context.Context, input T, opts ...core.Option) (*core.StreamIterator[T], error)

Stream returns a single-chunk stream of the input.

func (*Passthrough[T]) WithName

func (p *Passthrough[T]) WithName(name string) *Passthrough[T]

WithName sets the name for tracing.

type Sequence

type Sequence[I, O any] struct {
	// contains filtered or unexported fields
}

Sequence chains multiple runnables together: the output of each becomes the input of the next. Because Go generics do not support variadic type parameters, a chain of arbitrary length cannot name every intermediate type, so the intermediate types are erased to `any`. The overall Sequence is typed on the first input and the last output.

func Pipe

func Pipe(runnables ...core.Runnable[any, any]) *Sequence[any, any]

Pipe creates a type-erased sequence from an untyped series of steps. Each step must accept and produce `any`. This is the most flexible but least type-safe variant.
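The trade-off of type erasure can be seen in a small sketch. The `step` type and the `pipe`, `parse`, `double`, and `parseAndDouble` functions are hypothetical stand-ins for illustration, with options omitted; each step has to recover its concrete input type with a type assertion, so a mismatch shows up only at run time.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// step is a hypothetical stand-in for core.Runnable[any, any], reduced to a function.
type step func(ctx context.Context, input any) (any, error)

// pipe returns a step that runs the given steps in order,
// feeding each output into the next step.
func pipe(steps ...step) step {
	return func(ctx context.Context, input any) (any, error) {
		cur := input
		for i, s := range steps {
			out, err := s(ctx, cur)
			if err != nil {
				return nil, fmt.Errorf("step %d: %w", i, err)
			}
			cur = out
		}
		return cur, nil
	}
}

// parse expects a string and produces an int.
func parse(_ context.Context, in any) (any, error) {
	s, ok := in.(string)
	if !ok {
		return nil, fmt.Errorf("parse: want string, got %T", in)
	}
	n, err := strconv.Atoi(s)
	if err != nil {
		return nil, err
	}
	return n, nil
}

// double expects an int and produces an int.
func double(_ context.Context, in any) (any, error) {
	n, ok := in.(int)
	if !ok {
		return nil, fmt.Errorf("double: want int, got %T", in)
	}
	return n * 2, nil
}

// parseAndDouble runs the two-step erased chain on an arbitrary value.
func parseAndDouble(v any) (any, error) {
	return pipe(parse, double)(context.Background(), v)
}

func main() {
	out, err := parseAndDouble("21")
	fmt.Println(out, err)

	// Passing an int compiles, but the first step rejects it at run time.
	_, err = parseAndDouble(21)
	fmt.Println(err)
}
```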

func Pipe2

func Pipe2[A, B, C any](
	first core.Runnable[A, B],
	second core.Runnable[B, C],
) *Sequence[A, C]

Pipe2 chains two runnables into a Sequence.
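In contrast to the erased form, a fixed-arity helper lets the compiler check that adjacent steps agree on the intermediate type. The sketch below uses hypothetical stand-ins (`fn`, `pipe2`, `atoi`, `isEven`, `parsedIsEven`) with options omitted; it illustrates the typing idea only and is not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// fn is a hypothetical stand-in for core.Runnable[I, O], reduced to a function.
type fn[I, O any] func(ctx context.Context, input I) (O, error)

// pipe2 composes two typed steps. The shared type parameter B forces the
// first step's output type to match the second step's input type.
func pipe2[A, B, C any](first fn[A, B], second fn[B, C]) fn[A, C] {
	return func(ctx context.Context, input A) (C, error) {
		mid, err := first(ctx, input)
		if err != nil {
			var zero C
			return zero, err
		}
		return second(ctx, mid)
	}
}

// atoi converts a decimal string to an int.
func atoi(_ context.Context, s string) (int, error) { return strconv.Atoi(s) }

// isEven reports whether n is divisible by two.
func isEven(_ context.Context, n int) (bool, error) { return n%2 == 0, nil }

// parsedIsEven chains atoi and isEven into a string-to-bool pipeline.
func parsedIsEven(s string) (bool, error) {
	chain := pipe2[string, int, bool](atoi, isEven)
	return chain(context.Background(), s)
}

func main() {
	even, err := parsedIsEven("42")
	if err != nil {
		panic(err)
	}
	fmt.Println(even)
}
```

Swapping the arguments, as in `pipe2[string, int, bool](isEven, atoi)`, fails to compile, which is the safety the numbered Pipe variants offer over the untyped Pipe.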

func Pipe3

func Pipe3[A, B, C, D any](
	first core.Runnable[A, B],
	second core.Runnable[B, C],
	third core.Runnable[C, D],
) *Sequence[A, D]

Pipe3 chains three runnables into a Sequence.

func Pipe4

func Pipe4[A, B, C, D, E any](
	first core.Runnable[A, B],
	second core.Runnable[B, C],
	third core.Runnable[C, D],
	fourth core.Runnable[D, E],
) *Sequence[A, E]

Pipe4 chains four runnables into a Sequence.

func (*Sequence[I, O]) Batch

func (s *Sequence[I, O]) Batch(ctx context.Context, inputs []I, opts ...core.Option) ([]O, error)

Batch runs the sequence for multiple inputs.

func (*Sequence[I, O]) GetName

func (s *Sequence[I, O]) GetName() string

GetName returns the name of the sequence.

func (*Sequence[I, O]) Invoke

func (s *Sequence[I, O]) Invoke(ctx context.Context, input I, opts ...core.Option) (O, error)

Invoke runs all steps sequentially, passing each output as the next input.

func (*Sequence[I, O]) Stream

func (s *Sequence[I, O]) Stream(ctx context.Context, input I, opts ...core.Option) (*core.StreamIterator[O], error)

Stream runs all steps sequentially and returns the output of the last step as a stream. The current implementation is simple: it invokes every step to completion and then streams only the final result.
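The invoke-then-stream approach can be approximated as follows. The API of core.StreamIterator is not shown on this page, so the sketch substitutes a plain Go channel that carries exactly one chunk; `streamFinal` and `collect` are hypothetical names, and all steps are assumed to map string to string for simplicity.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// streamFinal runs every step to completion, then delivers only the final
// result on a channel. The channel stands in for a single-chunk stream.
func streamFinal(ctx context.Context, input string, steps ...func(context.Context, string) (string, error)) (<-chan string, error) {
	cur := input
	for _, s := range steps {
		out, err := s(ctx, cur)
		if err != nil {
			return nil, err
		}
		cur = out
	}
	ch := make(chan string, 1) // buffered so the send does not block
	ch <- cur
	close(ch)
	return ch, nil
}

// collect drains the stream into a slice so the number of chunks can be inspected.
func collect(input string) ([]string, error) {
	trim := func(_ context.Context, s string) (string, error) { return strings.TrimSpace(s), nil }
	upper := func(_ context.Context, s string) (string, error) { return strings.ToUpper(s), nil }

	ch, err := streamFinal(context.Background(), input, trim, upper)
	if err != nil {
		return nil, err
	}
	var chunks []string
	for c := range ch {
		chunks = append(chunks, c)
	}
	return chunks, nil
}

func main() {
	chunks, err := collect("  hi  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(chunks), chunks)
}
```

With this approach a consumer sees no output until the whole chain has finished, which matters when the last step is slow.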

func (*Sequence[I, O]) WithName

func (s *Sequence[I, O]) WithName(name string) *Sequence[I, O]

WithName sets the sequence name for tracing.
