Documentation
¶
Index ¶
- type Pipe
- func NewPipe[T, U any](ctx context.Context, initBufSize int, handler func(item T) (U, error), ...) *Pipe[T, U]
- func NewPipeWithInit[T, U any](ctx context.Context, initBufSize int, ...) *Pipe[T, U]
- func NewPipeWithStore[T, U any](ctx context.Context, initBufSize int, ...) *Pipe[T, U]
- func NewSimplePipe[T, U any](ctx context.Context, in <-chan T, handler func(item T) (U, error)) *Pipe[T, U]
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Pipe ¶
type Pipe[T, U any] struct { // contains filtered or unexported fields }
func NewPipe ¶
func NewPipe[T, U any]( ctx context.Context, initBufSize int, handler func(item T) (U, error), concurrency ...int, ) *Pipe[T, U]
NewPipe 创建一个基本的 Pipe,兼容不需要 store 的老代码
func NewPipeWithInit ¶
func NewPipeWithInit[T, U any]( ctx context.Context, initBufSize int, handler func(item T, store *utils.SafeMap[any]) (U, error), initWorker func() *utils.SafeMap[any], concurrency ...int, ) *Pipe[T, U]
NewPipeWithInit 创建一个带有 worker 初始化函数的 Pipe(内部使用) initWorker 会在每个 worker 协程启动时执行一次,用于初始化协程本地存储
func NewPipeWithStore ¶
func NewPipeWithStore[T, U any]( ctx context.Context, initBufSize int, handler func(item T, store *utils.SafeMap[any]) (U, error), initWorker func() *utils.SafeMap[any], concurrency ...int, ) *Pipe[T, U]
NewPipeWithStore 创建一个带有 worker 初始化函数的 Pipe initWorker 会在每个 worker 协程启动时执行一次,用于初始化协程本地存储 handler 的第二个参数会接收到 initWorker 返回的 store
func NewSimplePipe ¶
func (*Pipe[T, U]) FeedChannel ¶
func (p *Pipe[T, U]) FeedChannel(ch <-chan T)
Click to show internal directories.
Click to hide internal directories.