Documentation
¶
Index ¶
- type Context
- type None
- type Option
- type TaskMayPanic
- type Tuner
- type Worker
- type WorkerPool
- func (p *WorkerPool[T, R]) AddTask(task T)
- func (p *WorkerPool[T, R]) Cap() int32
- func (p *WorkerPool[T, R]) CloseAndWait()
- func (p *WorkerPool[T, R]) GetOriginConcurrency() int32
- func (p *WorkerPool[T, R]) GetResultChan() <-chan R
- func (p *WorkerPool[T, R]) LastTunerTs() time.Time
- func (p *WorkerPool[T, R]) Name() string
- func (p *WorkerPool[T, R]) Release()
- func (p *WorkerPool[T, R]) Running() int32
- func (p *WorkerPool[T, R]) SetResultSender(sender chan R)
- func (p *WorkerPool[T, R]) SetTaskReceiver(recv chan T)
- func (p *WorkerPool[T, R]) Start(ctx *Context)
- func (p *WorkerPool[T, R]) Tune(numWorkers int32, wait bool)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Context ¶
Context is the context used for worker pool
func (*Context) OnError ¶
OnError store the error and cancel the context. If the error is already set, it will not overwrite it.
func (*Context) OperatorErr ¶
OperatorErr returns the error caused by business logic.
type None ¶
type None struct{}
None is a type placeholder for the worker pool that does not have a result receiver.
type Option ¶
type Option[T TaskMayPanic, R any] interface { Apply(pool *WorkerPool[T, R]) }
Option is the config option for WorkerPool.
type TaskMayPanic ¶
type TaskMayPanic interface {
// RecoverArgs returns the argument for pkg/util.Recover function of this task.
// The error returned is which will be passed to upper level, if not provided,
// we will use the default error.
RecoverArgs() (metricsLabel string, funcInfo string, err error)
}
TaskMayPanic is a type to remind the developer that need to handle panic in the task.
type Tuner ¶
Tuner is an interface that provides capacity for tuning the worker pools. It's used to pass worker pool without import cycle caused by generic type.
type Worker ¶
type Worker[T TaskMayPanic, R any] interface { // HandleTask consumes a task(T), either produces a result(R) or return an error. // The result is sent to the result channel by calling `send` function, and the // error returned will be catched, log, and broadcasted it to other operators // by worker pool. // TODO(joechenrh): we can pass the context to HandleTask, so we don't need to // store the context in each worker implementation. HandleTask(task T, send func(R)) error Close() error }
Worker is worker interface.
type WorkerPool ¶
type WorkerPool[T TaskMayPanic, R any] struct { // contains filtered or unexported fields }
WorkerPool is a pool of workers.
func NewWorkerPool ¶
func NewWorkerPool[T TaskMayPanic, R any]( name string, _ util.Component, numWorkers int, createWorker func() Worker[T, R], opts ...Option[T, R], ) *WorkerPool[T, R]
NewWorkerPool creates a new worker pool.
func (*WorkerPool[T, R]) AddTask ¶
func (p *WorkerPool[T, R]) AddTask(task T)
AddTask adds a task to the pool, only used in test.
func (*WorkerPool[T, R]) Cap ¶
func (p *WorkerPool[T, R]) Cap() int32
Cap returns the capacity of the pool.
func (*WorkerPool[T, R]) CloseAndWait ¶
func (p *WorkerPool[T, R]) CloseAndWait()
CloseAndWait manually closes the pool and wait for complete, only used in test
func (*WorkerPool[T, R]) GetOriginConcurrency ¶
func (p *WorkerPool[T, R]) GetOriginConcurrency() int32
GetOriginConcurrency return the concurrency of the pool at the init.
func (*WorkerPool[T, R]) GetResultChan ¶
func (p *WorkerPool[T, R]) GetResultChan() <-chan R
GetResultChan gets the result channel from the pool.
func (*WorkerPool[T, R]) LastTunerTs ¶
func (p *WorkerPool[T, R]) LastTunerTs() time.Time
LastTunerTs returns the last time when the pool was tuned.
func (*WorkerPool[T, R]) Name ¶
func (p *WorkerPool[T, R]) Name() string
Name returns the name of the pool.
func (*WorkerPool[T, R]) Release ¶
func (p *WorkerPool[T, R]) Release()
Release waits the pool to be released. It will wait the input channel to be closed, or the context being cancelled by business error.
func (*WorkerPool[T, R]) Running ¶
func (p *WorkerPool[T, R]) Running() int32
Running returns the number of running workers.
func (*WorkerPool[T, R]) SetResultSender ¶
func (p *WorkerPool[T, R]) SetResultSender(sender chan R)
SetResultSender sets the result sender for the pool.
func (*WorkerPool[T, R]) SetTaskReceiver ¶
func (p *WorkerPool[T, R]) SetTaskReceiver(recv chan T)
SetTaskReceiver sets the task receiver for the pool.
func (*WorkerPool[T, R]) Start ¶
func (p *WorkerPool[T, R]) Start(ctx *Context)
Start starts default count of workers.
func (*WorkerPool[T, R]) Tune ¶
func (p *WorkerPool[T, R]) Tune(numWorkers int32, wait bool)
Tune tunes the pool to the specified number of workers. wait: whether to wait for all workers to close when reducing workers count. this method can only be called after Start.