fanout

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2020 License: MIT Imports: 8 Imported by: 0

README

pkg/sync/pipeline/fanout

功能:

  • 支持定义Worker 数量的goroutine,进行消费
  • 内部支持的元数据传递(pkg/net/metadata)

示例:

//名称为cache 执行线程为1 buffer长度为1024
cache := fanout.New("cache", fanout.Worker(1), fanout.Buffer(1024))
cache.Do(c, func(c context.Context) { SomeFunc(c, args...) })
cache.Close()

Documentation

Overview

Example
package main

import "context"

// addCache 加缓存的例子
func addCache(c context.Context, id, value int) {
	// some thing...
}

func main() {
	// 这里只是举个例子 真正使用的时候 应该用bm/rpc 传过来的context
	var c = context.Background()
	// 新建一个fanout 对象 名称为cache 名称主要用来上报监控和打日志使用 最好不要重复
	// (可选参数) worker数量为1 表示后台只有1个线程在工作
	// (可选参数) buffer 为1024 表示缓存chan长度为1024 如果chan慢了 再调用Do方法就会报错 设定长度主要为了防止OOM
	cache := New("cache", Worker(1), Buffer(1024))
	// 需要异步执行的方法
	// 这里传进来的c里面的meta信息会被复制 超时会忽略 addCache拿到的context已经没有超时信息了
	cache.Do(c, func(c context.Context) { addCache(c, 0, 0) })
	// 程序结束的时候关闭fanout 会等待后台线程完成后返回
	cache.Close()
}

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrFull chan full.
	ErrFull = errors.New("fanout: chan full")
)

Functions

This section is empty.

Types

type Fanout

type Fanout struct {
	// contains filtered or unexported fields
}

Fanout async consume data from chan.

func New

func New(name string, opts ...Option) *Fanout

New new a fanout struct.

func (*Fanout) Close

func (c *Fanout) Close() error

Close close fanout

func (*Fanout) Do

func (c *Fanout) Do(ctx context.Context, f func(ctx context.Context)) (err error)

Do save a callback func.

type Option

type Option func(*options)

Option fanout option

func Buffer

func Buffer(n int) Option

Buffer specifies the buffer of fanout

func Worker

func Worker(n int) Option

Worker specifies the worker of fanout

Source Files

  • fanout.go
  • metrics.go

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL