pool

package
v0.0.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2025 License: Apache-2.0 Imports: 8 Imported by: 1

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithQueueBacklogRate

func WithQueueBacklogRate(rate float64) option.Option[OnDemandBlockTaskPool]

Types

type OnDemandBlockTaskPool

type OnDemandBlockTaskPool struct {
	// contains filtered or unexported fields
}

OnDemandBlockTaskPool 按需创建goroutine的并发阻塞的任务池

func NewOnDemandBlockTaskPool

func NewOnDemandBlockTaskPool(initGo int, queueSize int, opts ...option.Option[OnDemandBlockTaskPool]) (*OnDemandBlockTaskPool, error)

NewOnDemandBlockTaskPool 创建一个新的 OnDemandBlockTaskPool initGo 是初始协程数 queueSize 是队列大小,即最多有多少个任务在等待调度 使用相应的Option选项可以动态扩展协程数

Example
p, _ := NewOnDemandBlockTaskPool(10, 100)
_ = p.Start()
// wg 只是用来确保任务执行的,你在实际使用过程中是不需要的
var wg sync.WaitGroup
wg.Add(1)
_ = p.Submit(context.Background(), TaskFunc(func(ctx context.Context) error {
	fmt.Println("hello, world")
	wg.Done()
	return nil
}))
wg.Wait()
Output:

hello, world

func (*OnDemandBlockTaskPool) Shutdown

func (b *OnDemandBlockTaskPool) Shutdown() (<-chan struct{}, error)

Shutdown 将会拒绝提交新的任务,但是会继续执行已提交任务 当执行完毕后,会往返回的 chan 中丢入信号 Shutdown 会负责关闭返回的 chan Shutdown 无法中断正在执行的任务

func (*OnDemandBlockTaskPool) ShutdownNow

func (b *OnDemandBlockTaskPool) ShutdownNow() ([]Task, error)

ShutdownNow 立刻关闭任务池,并且返回所有剩余未执行的任务(不包含正在执行的任务)

func (*OnDemandBlockTaskPool) Start

func (b *OnDemandBlockTaskPool) Start() error

Start 开始调度任务执行 Start 之后,调用者可以继续使用 Submit 提交任务

func (*OnDemandBlockTaskPool) States

func (b *OnDemandBlockTaskPool) States(ctx context.Context, interval time.Duration) (<-chan State, error)
Example
p, _ := NewOnDemandBlockTaskPool(10, 100)
var wg sync.WaitGroup
wg.Add(1)
_ = p.Submit(context.Background(), TaskFunc(func(ctx context.Context) error {
	wg.Done()
	return nil
}))
_ = p.Start()
ch, err := p.States(context.Background(), time.Second*10)
if err == nil {
	fmt.Println("get ch")
}
state := <-ch
fmt.Println(state.PoolState)
fmt.Println(state.RunningTasksCnt)
fmt.Println(state.WaitingTasksCnt)
fmt.Println(state.GoCnt)
fmt.Println(state.QueueSize)
wg.Wait()
Output:

get ch
2
0
0
10
100

func (*OnDemandBlockTaskPool) Submit

func (b *OnDemandBlockTaskPool) Submit(ctx context.Context, task Task) error

Submit 提交一个任务 如果此时队列已满,那么将会阻塞调用者。 如果因为 ctx 的原因返回,那么将会返回 ctx.Err() 在调用 Start 前后都可以调用 Submit

type State

type State struct {
	PoolState       int32
	GoCnt           int32
	WaitingTasksCnt int
	QueueSize       int
	RunningTasksCnt int32
	Timestamp       int64
}

type Task

type Task interface {
	// Run 执行任务
	// 如果 ctx 设置了超时时间,那么实现者需要自己决定是否进行超时控制
	Run(ctx context.Context) error
}

Task 代表一个任务

type TaskFunc

type TaskFunc func(ctx context.Context) error

TaskFunc 一个可执行的任务

func (TaskFunc) Run

func (t TaskFunc) Run(ctx context.Context) error

Run 执行任务 超时控制取决于衍生出 TaskFunc 的方法

type TaskPool

type TaskPool interface {
	// Submit 执行一个任务
	// 如果任务池提供了阻塞的功能,那么如果在 ctx 过期都没有提交成功,那么应该返回错误
	// 调用 Start 之后能否继续提交任务,则取决于具体的实现
	// 调用 Shutdown 或者 ShutdownNow 之后提交任务都会返回错误
	Submit(ctx context.Context, task Task) error

	// Start 开始调度任务执行。在调用 Start 之前,所有的任务都不会被调度执行。
	// Start 之后,能否继续调用 Submit 提交任务,取决于具体的实现
	Start() error

	// Shutdown 关闭任务池。如果此时尚未调用 Start 方法,那么将会立刻返回。
	// 任务池将会停止接收新的任务,但是会继续执行剩下的任务,
	// 在所有任务执行完毕之后,用户可以从返回的 chan 中得到通知
	// 任务池在发出通知之后会关闭 chan struct{}
	Shutdown() (<-chan struct{}, error)

	// ShutdownNow 立刻关闭线程池
	// 任务池能否中断当前正在执行的任务,取决于 TaskPool 的具体实现,以及 Task 的具体实现
	// 该方法会返回所有剩下的任务,剩下的任务是否包含正在执行的任务,也取决于具体的实现
	ShutdownNow() ([]Task, error)

	// States 暴露 TaskPool 生命周期内的运行状态
	// ctx 是让用户来控制什么时候退出采样。那么最基本的两个退出机制:一个是 ctx 被 cancel 了或者超时了,一个是TaskPool 被关闭了
	// error 仅仅表示创建 chan state 是否成功
	// interval 表示获取TaskPool运行期间内部状态的周期/时间间隔
	States(ctx context.Context, interval time.Duration) (<-chan State, error)
}

TaskPool 任务池

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL