channel

package
v0.1.17 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2025 License: MIT Imports: 2 Imported by: 0

README

channel/unlimited:无界通道(Unbounded Channel)

简单、泛型、安全的“无界缓冲”通道实现。通过内部 goroutine + 链式队列将生产者与消费者解耦:写入侧永远向 In 写入,读取侧从 Out 读取。内部使用 link.DLink[T] 作为队列,保证 FIFO 顺序,支持在关闭后排空并优雅关闭 Out

特性

  • FIFO 顺序保证:读出顺序与写入顺序一致。
  • 无界缓冲:不需要预估容量,突发流量也不会阻塞写入(但请注意内存占用风险)。
  • 优雅关闭Close() 只会关闭 In,待内部队列数据全部发送完毕后自动关闭 Out;多次调用安全。
  • 简洁 APINew/Len/Cap/Close + In/Out 两个方向清晰易读。

安装与导入

import "github.com/Rehtt/Kit/channel"

快速开始

package main

import (
    "fmt"
    "github.com/Rehtt/Kit/channel"
)

func main() {
    c := channel.New[int]()
    defer c.Close()

    // 生产者
    go func() {
        for i := 0; i < 10; i++ {
            c.In <- i
        }
        // 关闭 In,触发排空后自动关闭 Out
        c.Close()
    }()

    // 消费者(单消费可严格保证顺序)
    for v := range c.Out {
        fmt.Println(v)
    }
}

API

// New 创建无界通道。
func New[T any]() *Chan[T]

type Chan[T any] struct {
    In  chan<- T // 写入端(只写)
    Out <-chan T // 读取端(只读)
}

// 当前排队中的元素个数(可能滞后于最新写入片刻,取决于内部 goroutine 调度)。
func (c *Chan[T]) Len() int64

// 历史上曾达到的最大容量上界(由内部队列报告,便于观测峰值)。
func (c *Chan[T]) Cap() int64

// 关闭写入端 In,并在排空后自动关闭 Out;可安全重复调用。
func (c *Chan[T]) Close()

行为与注意事项

  • 多生产者:可同时向同一个 In 并发写入,遵循 Go 原生 chan 的并发安全语义。
  • 多消费者:可以并发从 Out 读,但数据会在消费者之间分摊;若需要广播/一条消息被多个消费者同时处理,请使用上层的广播组件(例如本仓库中的 util/chan_broadcaster)。
  • 内存占用:该通道为“无界”,若消费者处理不及时,队列会增长并占用更多内存。建议:
    • 监控 Len()/Cap() 指标;
    • 对上游做背压/限速;
    • 在关闭时确保消费者尽快排空。
  • 关闭语义Close() 后不再接受写入;内部会继续将队列中剩余数据发送到 Out,随后自动关闭 Out。这避免了消费者端的“读到一半通道即被关闭”的问题。

make(chan T, n) 的对比

  • 优点
    • 无需容量估算;
    • 突发写入不会因缓冲满而阻塞;
    • 关闭语义更友好(排空再关 Out)。
  • 取舍
    • 无上限意味着需要自行约束上游速率或保障消费能力,以免内存增长。

基准测试

项目内提供了两个基准:

  • BenchmarkSerial:单写单读的吞吐性能;
  • BenchmarkParallel:并行写入,观测内部队列与调度的争用情况。

运行:

go test -run ^$ -bench . -benchtime=2s ./channel

适用场景

  • 削峰填谷:应对偶发的写入洪峰;
  • 解耦生产/消费速率:消费者较慢但不希望阻塞生产者;
  • 简化容量管理:无需纠结缓冲区大小选择。

许可证

遵循仓库根目录的 LICENSE

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Chan

type Chan[T any] struct {
	In  chan<- T
	Out <-chan T
	// contains filtered or unexported fields
}

func New

func New[T any]() (c *Chan[T])

func (*Chan[T]) Cap

func (c *Chan[T]) Cap() int64

func (*Chan[T]) Close

func (c *Chan[T]) Close()

func (*Chan[T]) Len

func (c *Chan[T]) Len() int64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL