stream

package
v0.63.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2026 License: MIT Imports: 2 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Data added in v0.63.0

type Data[I, O any] struct {
	// contains filtered or unexported fields
}

func (*Data[I, O]) GetID added in v0.63.0

func (d *Data[I, O]) GetID() string

func (*Data[I, O]) GetInput added in v0.63.0

func (d *Data[I, O]) GetInput() I

func (*Data[I, O]) GetOutput added in v0.63.0

func (d *Data[I, O]) GetOutput() O

func (*Data[I, O]) SetOutput added in v0.63.0

func (d *Data[I, O]) SetOutput(output O)

func (*Data[I, O]) Wait added in v0.63.0

func (d *Data[I, O]) Wait()

type DataCh added in v0.63.0

type DataCh[I, O any] struct {
	// contains filtered or unexported fields
}

func NewDataCh added in v0.63.0

func NewDataCh[I, O any]() *DataCh[I, O]

func (*DataCh[I, O]) Close added in v0.63.0

func (dc *DataCh[I, O]) Close()

func (*DataCh[I, O]) Enq added in v0.63.0

func (dc *DataCh[I, O]) Enq(input I) *Data[I, O]

func (*DataCh[I, O]) GetCh added in v0.63.0

func (dc *DataCh[I, O]) GetCh() chan any

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline 流式管道,any 为当前流经的数据类型

func FanIn

func FanIn(quit_ch chan struct{}, pipelines ...*Pipeline) *Pipeline

FanIn 合并多个 Pipeline 为一个(与 Join 等价,但接收者不参与)

func NewPipeline

func NewPipeline(quit_ch <-chan struct{}, input_ch chan any) *Pipeline

func NewPipelineFromSlice

func NewPipelineFromSlice(quit_ch <-chan struct{}, items []any) *Pipeline

FromSlice 从 slice 创建管道(内部自动推送并关闭)

func (*Pipeline) Broadcast

func (p *Pipeline) Broadcast(n int) []*Pipeline

广播:每个 item copy 给所有下游

func (*Pipeline) Buffer

func (p *Pipeline) Buffer(size int) *Pipeline

Buffer 设置输出通道缓冲区大小

func (*Pipeline) Collect

func (p *Pipeline) Collect() []any

Collect 消费所有输出,返回 slice(阻塞直到管道结束)

func (*Pipeline) FanOut

func (p *Pipeline) FanOut(n int) []*Pipeline

FanOut 争抢模式:每个 item 只给一个下游(负载均衡)

适合:并行处理,N 个 worker 分担工作量

func (*Pipeline) Filter

func (p *Pipeline) Filter(pred func(any) bool) *Pipeline

Filter 过滤,仅保留满足条件的元素

func (*Pipeline) FlatMap

func (p *Pipeline) FlatMap(fn func(any) []any) *Pipeline

FlatMap 一对多展开 any → []any,展平为 Pipeline

func (*Pipeline) ForEach

func (p *Pipeline) ForEach(fn func(any))

ForEach 逐个消费输出,执行 fn(阻塞直到管道结束)

func (*Pipeline) Join

func (p *Pipeline) Join(others ...*Pipeline) *Pipeline

Join 合并多个同类型 Pipeline 为一个 (fan in)

func (*Pipeline) Parallel

func (p *Pipeline) Parallel(n int, fn func(any) any) *Pipeline

Parallel 并行处理:N 个 worker 同时争抢执行 fn,结果自动合并

N 个 goroutine 同时从输入 channel 争抢读取,执行 fn 后写入输出 channel

func (*Pipeline) Run

func (p *Pipeline) Run() <-chan any

Run 启动管道,返回输出通道(用 range 消费)

func (*Pipeline) Skip

func (p *Pipeline) Skip(n int) *Pipeline

Skip 跳过前 n 个元素

func (*Pipeline) Split

func (p *Pipeline) Split(pred func(any) bool) (trueBranch, falseBranch *Pipeline)

Split 按条件分流为两个独立 Pipeline,各自可继续链式构建

trueBranch 走 pred == true 的数据
falseBranch 走 pred == false 的数据

func (*Pipeline) Take

func (p *Pipeline) Take(n int) *Pipeline

Take 仅取前 n 个元素,之后自动终止

func (*Pipeline) Tap

func (p *Pipeline) Tap(fn func(any)) *Pipeline

Tap 副作用(如日志、metrics),不改变数据,原样传递

func (*Pipeline) Transform

func (p *Pipeline) Transform(fn func(any) any) *Pipeline

Transform 类型转换 any → any,返回 Pipeline

From[int](ch).
    Transform(func(x int) string { return strconv.Itoa(x) }).
    Transform(func(s string) int { n, _ := strconv.Atoi(s); return n })

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL