pipeline

package
v1.4.4-alpha1202-loadi... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2025 License: AGPL-3.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Pipe

type Pipe[T, U any] struct {
	// contains filtered or unexported fields
}

func NewPipe

func NewPipe[T, U any](
	ctx context.Context,
	initBufSize int,
	handler func(item T) (U, error),
	concurrency ...int,
) *Pipe[T, U]

NewPipe 创建一个基本的 Pipe,兼容不需要 store 的老代码

func NewPipeWithInit

func NewPipeWithInit[T, U any](
	ctx context.Context,
	initBufSize int,
	handler func(item T, store *utils.SafeMap[any]) (U, error),
	initWorker func() *utils.SafeMap[any],
	concurrency ...int,
) *Pipe[T, U]

NewPipeWithInit 创建一个带有 worker 初始化函数的 Pipe(内部使用) initWorker 会在每个 worker 协程启动时执行一次,用于初始化协程本地存储

func NewPipeWithStore

func NewPipeWithStore[T, U any](
	ctx context.Context,
	initBufSize int,
	handler func(item T, store *utils.SafeMap[any]) (U, error),
	initWorker func() *utils.SafeMap[any],
	concurrency ...int,
) *Pipe[T, U]

NewPipeWithStore 创建一个带有 worker 初始化函数的 Pipe initWorker 会在每个 worker 协程启动时执行一次,用于初始化协程本地存储 handler 的第二个参数会接收到 initWorker 返回的 store

func NewSimplePipe

func NewSimplePipe[T, U any](ctx context.Context, in <-chan T, handler func(item T) (U, error)) *Pipe[T, U]

func (*Pipe[T, U]) Close

func (p *Pipe[T, U]) Close()

func (*Pipe[T, U]) Error

func (p *Pipe[T, U]) Error() error

func (*Pipe[T, U]) Feed

func (p *Pipe[T, U]) Feed(item T)

func (*Pipe[T, U]) FeedChannel

func (p *Pipe[T, U]) FeedChannel(ch <-chan T)

func (*Pipe[T, U]) FeedSlice

func (p *Pipe[T, U]) FeedSlice(items []T)

func (*Pipe[T, U]) Out

func (p *Pipe[T, U]) Out() <-chan U

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL