pipeline

package
v0.2.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrBreak  = errors.New("pipeline loop break")
	ErrReturn = errors.New("pipeline execution return")
)

定义控制流哨兵错误

Functions

This section is empty.

Types

type BreakStep

type BreakStep[T any] struct {
	StepName string
}

BreakStep 跳出当前 Loop。

func (*BreakStep[T]) Execute

func (s *BreakStep[T]) Execute(ctx context.Context, state T) error

func (*BreakStep[T]) Name

func (s *BreakStep[T]) Name() string

type Hook

type Hook[T any] interface {
	// OnStepStart is called before a step executes.
	OnStepStart(ctx context.Context, step Step[T], state T)

	// OnStepError is called if a step returns an error.
	OnStepError(ctx context.Context, step Step[T], state T, err error)

	// OnStepComplete is called after a step successfully executes.
	OnStepComplete(ctx context.Context, step Step[T], state T)
}

Hook allows observing pipeline execution.

type IfStep

type IfStep[T any] struct {
	StepName  string
	Condition func(ctx context.Context, state T) bool
	Then      Step[T]
	Else      Step[T]
}

IfStep 提供管线中的条件分支逻辑。

func NewIf

func NewIf[T any](name string, cond func(context.Context, T) bool, thenStep, elseStep Step[T]) *IfStep[T]

func (*IfStep[T]) Execute

func (s *IfStep[T]) Execute(ctx context.Context, state T) error

func (*IfStep[T]) Name

func (s *IfStep[T]) Name() string

type LoopStep

type LoopStep[T any] struct {
	StepName  string
	Condition func(context.Context, T) bool
	Body      Step[T]
	MaxLoops  int
}

LoopStep 提供管线中的重复执行逻辑,支持 ErrBreak。

func NewLoop

func NewLoop[T any](name string, cond func(context.Context, T) bool, body Step[T], maxLoops int) *LoopStep[T]

func (*LoopStep[T]) Execute

func (s *LoopStep[T]) Execute(ctx context.Context, state T) error

func (*LoopStep[T]) Name

func (s *LoopStep[T]) Name() string

type Pipeline

type Pipeline[T any] struct {
	// contains filtered or unexported fields
}

Pipeline manages a sequence of steps that execute in order. It provides hooks for observing execution and supports graceful early exit.

Pipeline is generic over the state type, allowing type-safe data sharing between steps.

Example:

pipeline := pipeline.New[pipeline.State]().
    AddStep(step1).
    AddStep(step2).
    AddHook(observer)

if err := pipeline.Execute(ctx, state); err != nil {
    log.Fatal(err)
}

func New

func New[T any]() *Pipeline[T]

New creates a new, empty Pipeline ready to have steps added.

func (*Pipeline[T]) AddHook

func (p *Pipeline[T]) AddHook(hook Hook[T]) *Pipeline[T]

AddHook appends an observer hook to the pipeline and returns the pipeline for method chaining. Hooks are called at various points during execution to allow monitoring or logging.

Parameters:

  • hook: The hook to add

Returns the pipeline for chaining

func (*Pipeline[T]) AddStep

func (p *Pipeline[T]) AddStep(step Step[T]) *Pipeline[T]

AddStep appends a step to the pipeline and returns the pipeline for method chaining.

Parameters:

  • step: The step to add

Returns the pipeline for chaining

func (*Pipeline[T]) AddSteps

func (p *Pipeline[T]) AddSteps(steps ...Step[T]) *Pipeline[T]

AddSteps appends multiple steps to the pipeline and returns the pipeline for method chaining.

Parameters:

  • steps: The steps to add

Returns the pipeline for chaining

func (*Pipeline[T]) Execute

func (p *Pipeline[T]) Execute(ctx context.Context, state T) error

Execute runs all steps sequentially. If any step fails, execution stops and the error is returned. If the context is canceled, execution stops immediately with the context error.

Hooks are called before and after each step execution to allow monitoring.

Parameters:

  • ctx: Context for cancellation
  • state: The state object passed to each step

Returns an error if any step fails or context is canceled

type ReturnStep

type ReturnStep[T any] struct {
	StepName string
}

ReturnStep 终止整个 Pipeline 执行。

func (*ReturnStep[T]) Execute

func (s *ReturnStep[T]) Execute(ctx context.Context, state T) error

func (*ReturnStep[T]) Name

func (s *ReturnStep[T]) Name() string

type State

type State struct {
	// contains filtered or unexported fields
}

State provides a thread-safe key-value store for sharing data between pipeline steps. It supports type-safe retrieval through generic getters.

State is safe for concurrent use from multiple goroutines. All read and write operations are protected by a RWMutex.

Example:

state := pipeline.NewState()
state.Set("user_id", 12345)
state.Set("name", "Alice")

// Type-safe retrieval
if id, ok := state.GetInt("user_id"); ok {
    fmt.Println(id) // 12345
}

func NewState

func NewState() *State

NewState creates a new, empty State for use in a pipeline execution.

func (*State) Clone

func (s *State) Clone() *State

Clone creates a shallow copy of the state. The returned State has a copy of the data map but the values themselves are shared references.

Returns a new State with the same data

func (*State) Delete

func (s *State) Delete(key string)

Delete removes a key-value pair from the state.

Parameters:

  • key: The identifier to remove

func (*State) Get

func (s *State) Get(key string) (interface{}, bool)

Get retrieves a raw value by key. Returns (value, true) if found, or (nil, false) if the key doesn't exist.

For type-safe retrieval, use one of the typed getters: GetString, GetInt, GetFloat, GetBool, GetStringSlice, GetMap.

Parameters:

  • key: The identifier to look up

Returns the value and whether it was found

func (*State) GetBool

func (s *State) GetBool(key string) (bool, bool)

GetBool retrieves a bool value by key. Returns (value, true) if the key exists and holds a bool, or (false, false) otherwise.

Parameters:

  • key: The identifier to look up

Returns the bool value and whether retrieval succeeded

func (*State) GetFloat

func (s *State) GetFloat(key string) (float64, bool)

GetFloat retrieves a float64 value by key. Returns (value, true) if the key exists and holds a float64, or (0.0, false) otherwise.

Parameters:

  • key: The identifier to look up

Returns the float64 value and whether retrieval succeeded

func (*State) GetInt

func (s *State) GetInt(key string) (int, bool)

GetInt retrieves an int value by key. Returns (value, true) if the key exists and holds an int, or (0, false) otherwise.

Parameters:

  • key: The identifier to look up

Returns the int value and whether retrieval succeeded

func (*State) GetMap

func (s *State) GetMap(key string) (map[string]interface{}, bool)

GetMap retrieves a map[string]interface{} value by key. Returns (value, true) if the key exists and holds a map, or (nil, false) otherwise.

Parameters:

  • key: The identifier to look up

Returns the map value and whether retrieval succeeded

func (*State) GetString

func (s *State) GetString(key string) (string, bool)

GetString retrieves a string value by key. Returns (value, true) if the key exists and holds a string, or ("", false) otherwise.

Parameters:

  • key: The identifier to look up

Returns the string value and whether retrieval succeeded

func (*State) GetStringSlice

func (s *State) GetStringSlice(key string) ([]string, bool)

GetStringSlice retrieves a []string value by key. Returns (value, true) if the key exists and holds a []string, or (nil, false) otherwise.

Parameters:

  • key: The identifier to look up

Returns the []string value and whether retrieval succeeded

func (*State) Set

func (s *State) Set(key string, value interface{})

Set stores a value in the state with the given key. Any type can be stored; use the type-safe getters to retrieve values.

Parameters:

  • key: The identifier for this value
  • value: The value to store (can be any type)

type Step

type Step[T any] interface {
	// Name returns the name of the step for logging and debugging.
	Name() string

	// Execute performs the step's operation, reading from and writing to the State.
	Execute(ctx context.Context, state T) error
}

Step defines an individual operation in the pipeline.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL