pipeline

package
v0.46.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: MIT Imports: 3 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddOnPipe

func AddOnPipe[X, Y any](q <-chan struct{}, f func(X) Y, in <-chan X) chan Y

AddOnPipe 通用管道节点:接收X类型输入,通过函数f转换为Y类型,输出到Y通道;支持通过quit通道终止 参数:

q: 终止信号通道(关闭时组件退出)
f: 数据转换函数(业务逻辑由外部注入,组件不关心具体实现)
in: 输入通道(接收X类型数据)

返回:

输出通道(发送Y类型数据)

func FanIn

func FanIn[X any](q <-chan struct{}, inputs ...<-chan X) chan X

FanIn 多通道合并:将多个<-chan X类型的输入通道,合并到一个chan X输出通道

func FanOut

func FanOut[X any](q <-chan struct{}, in <-chan X, num int) []chan X

FanOut 数据分发:将单一输入通道的数据均匀分配到多个输出通道 参数:

q: 终止信号通道(关闭时组件退出)
in: 输入通道(单一数据源)
num: 输出通道数量

返回:

输出通道切片(每个通道都会收到输入的完整数据副本)

func Take

func Take[X any](q <-chan struct{}, n int, in <-chan X) chan X

Take 从输入通道in中截取前n个数据,输出到out通道;截取完成后关闭out 参数:

q: 终止信号通道(关闭时提前退出)
n: 需截取的数据量
in: 输入通道(承接FanOut/FanIn的输出)

Types

type Broadcast

type Broadcast[X any] struct {
	// contains filtered or unexported fields
}

Broadcast 广播组件:将输入通道in的消息广播到所有订阅的输出通道

func NewBroadcast

func NewBroadcast[X any](q <-chan struct{}, in <-chan X) *Broadcast[X]

NewBroadcast 初始化广播组件

func (*Broadcast[X]) Run

func (b *Broadcast[X]) Run()

Run 核心广播逻辑:从输入通道读消息,发送到所有订阅者

func (*Broadcast[X]) Subscribe

func (b *Broadcast[X]) Subscribe() <-chan X

Subscribe 订阅广播(返回一个接收广播消息的通道)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL