pipeline

package
v0.1.9 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 19, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package pipeline provides a flexible framework for composing and executing sequences of operations (Steps) for LLM workflows like RAG and data processing.

Constants

This section is empty.

Variables

var (
	ErrBreak  = errors.New("pipeline loop break")
	ErrReturn = errors.New("pipeline execution return")
)

Sentinel errors that define pipeline control flow.
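As a minimal sketch of how a caller might tell these sentinels apart from genuine failures, the example below uses the standard errors.Is check. ErrBreak and ErrReturn are re-declared locally with the same messages so the block compiles on its own, and classify is a hypothetical helper, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins carrying the same messages as the package's sentinels.
var (
	ErrBreak  = errors.New("pipeline loop break")
	ErrReturn = errors.New("pipeline execution return")
)

// classify is a hypothetical helper that reports how a caller might treat err.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrBreak):
		return "break"
	case errors.Is(err, ErrReturn):
		return "return"
	default:
		return "failure"
	}
}

func main() {
	// Wrapping with %w keeps the sentinel detectable through errors.Is.
	wrapped := fmt.Errorf("step %q: %w", "check", ErrBreak)
	fmt.Println(classify(wrapped))            // break
	fmt.Println(classify(ErrReturn))          // return
	fmt.Println(classify(errors.New("boom"))) // failure
}
```

Comparing with errors.Is rather than == keeps the check working if an intermediate layer wraps the sentinel.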

Functions

This section is empty.

Types

type BreakStep added in v0.1.8

type BreakStep[T any] struct {
	StepName string
}

BreakStep breaks out of the current Loop.

func (*BreakStep[T]) Execute added in v0.1.8

func (s *BreakStep[T]) Execute(ctx context.Context, state T) error

func (*BreakStep[T]) Name added in v0.1.8

func (s *BreakStep[T]) Name() string

type Hook

type Hook[T any] interface {
	// OnStepStart is called before a step executes.
	OnStepStart(ctx context.Context, step Step[T], state T)

	// OnStepError is called if a step returns an error.
	OnStepError(ctx context.Context, step Step[T], state T, err error)

	// OnStepComplete is called after a step successfully executes.
	OnStepComplete(ctx context.Context, step Step[T], state T)
}

Hook allows observing pipeline execution.
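The following sketch shows one way a Hook could be implemented to record events. Step and Hook are re-declared locally to mirror the interfaces above; recorder, fixedStep, and runStep are hypothetical, and the call order inside runStep is an assumption based on the method comments rather than on the package source.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// Step and Hook mirror the documented interfaces.
type Step[T any] interface {
	Name() string
	Execute(ctx context.Context, state T) error
}

type Hook[T any] interface {
	OnStepStart(ctx context.Context, step Step[T], state T)
	OnStepError(ctx context.Context, step Step[T], state T, err error)
	OnStepComplete(ctx context.Context, step Step[T], state T)
}

// recorder is a hypothetical Hook that keeps an in-memory event log.
type recorder[T any] struct{ events []string }

func (r *recorder[T]) OnStepStart(_ context.Context, s Step[T], _ T) {
	r.events = append(r.events, "start:"+s.Name())
}

func (r *recorder[T]) OnStepError(_ context.Context, s Step[T], _ T, err error) {
	r.events = append(r.events, "error:"+s.Name()+":"+err.Error())
}

func (r *recorder[T]) OnStepComplete(_ context.Context, s Step[T], _ T) {
	r.events = append(r.events, "done:"+s.Name())
}

// fixedStep is a hypothetical step that returns a preset error (or nil).
type fixedStep struct {
	name string
	err  error
}

func (s fixedStep) Name() string                       { return s.name }
func (s fixedStep) Execute(context.Context, int) error { return s.err }

// runStep is an assumed driver: start, then either error or complete.
func runStep[T any](ctx context.Context, h Hook[T], s Step[T], state T) error {
	h.OnStepStart(ctx, s, state)
	if err := s.Execute(ctx, state); err != nil {
		h.OnStepError(ctx, s, state, err)
		return err
	}
	h.OnStepComplete(ctx, s, state)
	return nil
}

func hookDemo() string {
	rec := &recorder[int]{}
	ctx := context.Background()
	_ = runStep[int](ctx, rec, fixedStep{name: "embed"}, 0)
	_ = runStep[int](ctx, rec, fixedStep{name: "search", err: errors.New("timeout")}, 0)
	return strings.Join(rec.events, " ")
}

func main() {
	fmt.Println(hookDemo())
}
```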

type IfStep added in v0.1.8

type IfStep[T any] struct {
	StepName  string
	Condition func(ctx context.Context, state T) bool
	Then      Step[T]
	Else      Step[T]
}

IfStep provides conditional branching logic within a pipeline.

func NewIf added in v0.1.8

func NewIf[T any](name string, cond func(context.Context, T) bool, thenStep, elseStep Step[T]) *IfStep[T]

func (*IfStep[T]) Execute added in v0.1.8

func (s *IfStep[T]) Execute(ctx context.Context, state T) error

func (*IfStep[T]) Name added in v0.1.8

func (s *IfStep[T]) Name() string
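To illustrate the branching described above, here is a self-contained sketch. The IfStep fields and the NewIf signature follow the declarations on this page, but the Execute body is an assumed implementation (in particular, treating a nil branch as a no-op is a guess), and ticket, setRoute, and route are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

type Step[T any] interface {
	Name() string
	Execute(ctx context.Context, state T) error
}

// IfStep mirrors the documented struct.
type IfStep[T any] struct {
	StepName  string
	Condition func(ctx context.Context, state T) bool
	Then      Step[T]
	Else      Step[T]
}

func NewIf[T any](name string, cond func(context.Context, T) bool, thenStep, elseStep Step[T]) *IfStep[T] {
	return &IfStep[T]{StepName: name, Condition: cond, Then: thenStep, Else: elseStep}
}

func (s *IfStep[T]) Name() string { return s.StepName }

// Execute is an assumed implementation: run Then when Condition holds,
// otherwise run Else; a nil branch is treated as a no-op.
func (s *IfStep[T]) Execute(ctx context.Context, state T) error {
	branch := s.Else
	if s.Condition(ctx, state) {
		branch = s.Then
	}
	if branch == nil {
		return nil
	}
	return branch.Execute(ctx, state)
}

// ticket is a hypothetical state type for this sketch.
type ticket struct {
	Score int
	Route string
}

// setRoute is a hypothetical step that records which branch ran.
type setRoute struct{ route string }

func (s setRoute) Name() string { return "route-" + s.route }

func (s setRoute) Execute(_ context.Context, t *ticket) error {
	t.Route = s.route
	return nil
}

func route(score int) string {
	t := &ticket{Score: score}
	step := NewIf[*ticket]("relevance-gate",
		func(_ context.Context, t *ticket) bool { return t.Score >= 50 },
		setRoute{"answer"}, setRoute{"retrieve-more"})
	if err := step.Execute(context.Background(), t); err != nil {
		return "error"
	}
	return t.Route
}

func main() {
	fmt.Println(route(80)) // answer
	fmt.Println(route(10)) // retrieve-more
}
```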

type LoopStep added in v0.1.8

type LoopStep[T any] struct {
	StepName  string
	Condition func(context.Context, T) bool
	Body      Step[T]
	MaxLoops  int
}

LoopStep provides repeated-execution logic within a pipeline and supports ErrBreak.

func NewLoop added in v0.1.8

func NewLoop[T any](name string, cond func(context.Context, T) bool, body Step[T], maxLoops int) *LoopStep[T]

func (*LoopStep[T]) Execute added in v0.1.8

func (s *LoopStep[T]) Execute(ctx context.Context, state T) error

func (*LoopStep[T]) Name added in v0.1.8

func (s *LoopStep[T]) Name() string
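The sketch below shows how a loop with a condition, an iteration cap, and ErrBreak support could fit together. The struct and constructor follow the declarations on this page; the Execute body is an assumption. In particular, this sketch treats MaxLoops <= 0 as "no cap" and stops silently when the cap is reached, which may differ from the package's actual behavior. stepFunc and countTo are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var ErrBreak = errors.New("pipeline loop break") // local stand-in

type Step[T any] interface {
	Name() string
	Execute(ctx context.Context, state T) error
}

// LoopStep mirrors the documented struct.
type LoopStep[T any] struct {
	StepName  string
	Condition func(context.Context, T) bool
	Body      Step[T]
	MaxLoops  int
}

func NewLoop[T any](name string, cond func(context.Context, T) bool, body Step[T], maxLoops int) *LoopStep[T] {
	return &LoopStep[T]{StepName: name, Condition: cond, Body: body, MaxLoops: maxLoops}
}

func (s *LoopStep[T]) Name() string { return s.StepName }

// Execute is an assumed implementation, not the package's source.
func (s *LoopStep[T]) Execute(ctx context.Context, state T) error {
	for i := 0; s.MaxLoops <= 0 || i < s.MaxLoops; i++ {
		if err := ctx.Err(); err != nil {
			return err
		}
		if !s.Condition(ctx, state) {
			return nil
		}
		if err := s.Body.Execute(ctx, state); err != nil {
			if errors.Is(err, ErrBreak) {
				return nil // ErrBreak ends the loop without failing it
			}
			return err
		}
	}
	return nil
}

// stepFunc is a hypothetical adapter from a function to a Step.
type stepFunc[T any] struct {
	name string
	fn   func(context.Context, T) error
}

func (s stepFunc[T]) Name() string                            { return s.name }
func (s stepFunc[T]) Execute(ctx context.Context, st T) error { return s.fn(ctx, st) }

// countTo increments a counter until it hits breakAt or the loop cap.
func countTo(breakAt, maxLoops int) (int, error) {
	n := 0
	body := stepFunc[*int]{name: "inc", fn: func(_ context.Context, p *int) error {
		*p++
		if *p == breakAt {
			return ErrBreak
		}
		return nil
	}}
	always := func(context.Context, *int) bool { return true }
	err := NewLoop[*int]("count", always, body, maxLoops).Execute(context.Background(), &n)
	return n, err
}

func main() {
	fmt.Println(countTo(3, 10))  // 3 <nil>
	fmt.Println(countTo(100, 5)) // 5 <nil>
}
```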

type Pipeline

type Pipeline[T any] struct {
	// contains filtered or unexported fields
}

Pipeline manages a sequence of steps.

func New

func New[T any]() *Pipeline[T]

New creates a new, empty pipeline.

func (*Pipeline[T]) AddHook

func (p *Pipeline[T]) AddHook(hook Hook[T]) *Pipeline[T]

AddHook appends an observer hook to the pipeline.

func (*Pipeline[T]) AddStep

func (p *Pipeline[T]) AddStep(step Step[T]) *Pipeline[T]

AddStep appends a step to the pipeline.

func (*Pipeline[T]) AddSteps added in v0.1.3

func (p *Pipeline[T]) AddSteps(steps ...Step[T]) *Pipeline[T]

func (*Pipeline[T]) Execute

func (p *Pipeline[T]) Execute(ctx context.Context, state T) error

Execute runs all steps sequentially. If any step fails or the context is canceled, execution stops and the error is returned.
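The stop-on-first-error behavior described above can be sketched as follows. Because Pipeline's fields are unexported, miniPipeline is a hypothetical stand-in written for this example; stepFunc and runDemo are likewise illustrative names, and the real implementation may differ (for example, in how it invokes hooks).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

type Step[T any] interface {
	Name() string
	Execute(ctx context.Context, state T) error
}

// stepFunc is a hypothetical adapter from a function to a Step.
type stepFunc[T any] struct {
	name string
	fn   func(context.Context, T) error
}

func (s stepFunc[T]) Name() string                            { return s.name }
func (s stepFunc[T]) Execute(ctx context.Context, st T) error { return s.fn(ctx, st) }

// miniPipeline is a stand-in for Pipeline, holding only an ordered step list.
type miniPipeline[T any] struct{ steps []Step[T] }

func (p *miniPipeline[T]) AddStep(s Step[T]) *miniPipeline[T] {
	p.steps = append(p.steps, s)
	return p
}

// Execute runs steps in order and stops at the first error or cancellation.
func (p *miniPipeline[T]) Execute(ctx context.Context, state T) error {
	for _, s := range p.steps {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := s.Execute(ctx, state); err != nil {
			return err
		}
	}
	return nil
}

// runDemo reports which steps ran; cancelFirst cancels the context up front.
func runDemo(cancelFirst bool) (string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelFirst {
		cancel()
	}
	var visited []string
	record := func(name string, err error) Step[*[]string] {
		return stepFunc[*[]string]{name: name, fn: func(_ context.Context, v *[]string) error {
			*v = append(*v, name)
			return err
		}}
	}
	p := (&miniPipeline[*[]string]{}).
		AddStep(record("load", nil)).
		AddStep(record("parse", errors.New("bad input"))).
		AddStep(record("store", nil))
	err := p.Execute(ctx, &visited)
	return strings.Join(visited, ","), err
}

func main() {
	visited, err := runDemo(false)
	fmt.Println(visited, err) // load,parse bad input
	visited, err = runDemo(true)
	fmt.Printf("%q %v\n", visited, err) // "" context canceled
}
```

Returning the receiver from AddStep mirrors the chaining style suggested by the documented method signatures.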

type ReturnStep added in v0.1.8

type ReturnStep[T any] struct {
	StepName string
}

ReturnStep terminates execution of the entire Pipeline.

func (*ReturnStep[T]) Execute added in v0.1.8

func (s *ReturnStep[T]) Execute(ctx context.Context, state T) error

func (*ReturnStep[T]) Name added in v0.1.8

func (s *ReturnStep[T]) Name() string
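As a rough illustration of an early exit, the sketch below assumes that ReturnStep signals termination by returning ErrReturn and that the runner treats that sentinel as a clean stop. Neither detail is confirmed by this page; runAll, mark, and earlyExit are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

var ErrReturn = errors.New("pipeline execution return") // local stand-in

type Step[T any] interface {
	Name() string
	Execute(ctx context.Context, state T) error
}

// ReturnStep mirrors the documented struct; its Execute body is assumed.
type ReturnStep[T any] struct{ StepName string }

func (s *ReturnStep[T]) Name() string                     { return s.StepName }
func (s *ReturnStep[T]) Execute(context.Context, T) error { return ErrReturn }

// mark is a hypothetical step that appends its own name to the state.
type mark string

func (m mark) Name() string { return string(m) }

func (m mark) Execute(_ context.Context, v *[]string) error {
	*v = append(*v, string(m))
	return nil
}

// runAll is an assumed runner: ErrReturn ends execution without an error.
func runAll[T any](ctx context.Context, state T, steps ...Step[T]) error {
	for _, s := range steps {
		if err := s.Execute(ctx, state); err != nil {
			if errors.Is(err, ErrReturn) {
				return nil
			}
			return err
		}
	}
	return nil
}

func earlyExit() (string, error) {
	var visited []string
	err := runAll[*[]string](context.Background(), &visited,
		mark("retrieve"),
		&ReturnStep[*[]string]{StepName: "cache-hit"},
		mark("generate"), // skipped under the assumptions above
	)
	return strings.Join(visited, ","), err
}

func main() {
	fmt.Println(earlyExit()) // retrieve <nil>
}
```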

type State

type State struct {
	// contains filtered or unexported fields
}

State is a thread-safe container for data passed between pipeline steps.

func NewState

func NewState() *State

NewState creates a new, empty pipeline state.

func (*State) Clone

func (s *State) Clone() *State

Clone creates a shallow copy of the current state.

func (*State) Delete

func (s *State) Delete(key string)

Delete removes a key from the state.

func (*State) Get

func (s *State) Get(key string) (interface{}, bool)

Get retrieves a value from the state. Returns nil and false if the key does not exist.

func (*State) GetString

func (s *State) GetString(key string) string

GetString is a helper to retrieve a string value. Returns empty string if missing or wrong type.

func (*State) Set

func (s *State) Set(key string, value interface{})

Set stores a value in the state.
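Since State's fields are unexported, the following is only a sketch of how a thread-safe container with this method set could be built, here using a map guarded by sync.RWMutex. The internals are an assumption; only the method names and signatures come from this page. stateDemo is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// State is a stand-in with the documented method set; internals are assumed.
type State struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func NewState() *State { return &State{data: make(map[string]interface{})} }

func (s *State) Set(key string, value interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

func (s *State) Get(key string) (interface{}, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// GetString returns "" when the key is missing or the value is not a string.
func (s *State) GetString(key string) string {
	v, _ := s.Get(key)
	str, _ := v.(string)
	return str
}

func (s *State) Delete(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, key)
}

// Clone copies the key set; values themselves are shared (shallow copy).
func (s *State) Clone() *State {
	s.mu.RLock()
	defer s.mu.RUnlock()
	c := NewState()
	for k, v := range s.data {
		c.data[k] = v
	}
	return c
}

func stateDemo() (cloneQuery, origQuery, wrongType string) {
	s := NewState()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) { // concurrent writers are safe behind the mutex
			defer wg.Done()
			s.Set(fmt.Sprintf("doc-%d", i), i)
		}(i)
	}
	wg.Wait()
	s.Set("query", "what is RAG?")
	c := s.Clone()
	s.Delete("query") // does not affect the clone's key set
	return c.GetString("query"), s.GetString("query"), s.GetString("doc-0")
}

func main() {
	cq, oq, wt := stateDemo()
	fmt.Printf("%q %q %q\n", cq, oq, wt) // "what is RAG?" "" ""
}
```

Because Clone is shallow, pointer, slice, or map values stored in the state remain shared between the original and the copy.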

type Step

type Step[T any] interface {
	// Name returns the name of the step for logging and debugging.
	Name() string

	// Execute performs the step's operation, reading from and writing to the State.
	Execute(ctx context.Context, state T) error
}

Step defines an individual operation in the pipeline.
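A custom step only needs the two methods above. The sketch below re-declares the Step interface locally so it compiles on its own; doc, trimStep, and runTrim are hypothetical names chosen for illustration.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Step mirrors the documented interface.
type Step[T any] interface {
	Name() string
	Execute(ctx context.Context, state T) error
}

// doc is a hypothetical state type for this sketch.
type doc struct{ Text string }

// trimStep is a hypothetical step that collapses whitespace in place.
type trimStep struct{}

func (trimStep) Name() string { return "trim" }

func (trimStep) Execute(ctx context.Context, d *doc) error {
	// Honor cancellation before doing any work.
	if err := ctx.Err(); err != nil {
		return err
	}
	d.Text = strings.Join(strings.Fields(d.Text), " ")
	return nil
}

// Compile-time check that trimStep satisfies Step[*doc].
var _ Step[*doc] = trimStep{}

func runTrim(in string) (string, error) {
	d := &doc{Text: in}
	err := trimStep{}.Execute(context.Background(), d)
	return d.Text, err
}

func main() {
	out, err := runTrim("  hello   pipeline ")
	fmt.Printf("%q %v\n", out, err) // "hello pipeline" <nil>
}
```

Using a pointer type for T lets a step mutate the shared state, since Execute receives the state by value.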
