workerpool

package
v1.1.0-beta.0...-69a1b2c
Warning

This package is not in the latest version of its module.
Published: Jan 9, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Context

type Context struct {
	context.Context
	// contains filtered or unexported fields
}

Context is the context used by the worker pool.

func NewContext

func NewContext(
	ctx context.Context,
) *Context

NewContext creates a new Context.

func (*Context) Cancel

func (ctx *Context) Cancel()

Cancel cancels the context of the operator.

func (*Context) OnError

func (ctx *Context) OnError(err error)

OnError stores the error and cancels the context. If an error is already set, it is not overwritten.

func (*Context) OperatorErr

func (ctx *Context) OperatorErr() error

OperatorErr returns the error caused by business logic.
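The documented contract of OnError and OperatorErr (the first error wins, and recording it cancels the context) can be sketched as follows. This is a minimal stand-in, not the package's implementation: `firstErrContext`, `newFirstErrContext`, and `firstErrorWins` are hypothetical names, and the use of an atomic pointer is an assumption made for the sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

// firstErrContext is a hypothetical stand-in for workerpool.Context.
// It only mirrors the documented behavior; the real fields are unexported.
type firstErrContext struct {
	context.Context
	cancel context.CancelFunc
	err    atomic.Pointer[error]
}

func newFirstErrContext(parent context.Context) *firstErrContext {
	ctx, cancel := context.WithCancel(parent)
	return &firstErrContext{Context: ctx, cancel: cancel}
}

// OnError stores err only if no error has been stored yet, then cancels.
func (c *firstErrContext) OnError(err error) {
	c.err.CompareAndSwap(nil, &err)
	c.cancel()
}

// OperatorErr returns the first stored error, or nil if none was stored.
func (c *firstErrContext) OperatorErr() error {
	if p := c.err.Load(); p != nil {
		return *p
	}
	return nil
}

// firstErrorWins reports the stored error message and whether the
// context is already cancelled after two OnError calls.
func firstErrorWins() (msg string, cancelled bool) {
	ctx := newFirstErrContext(context.Background())
	ctx.OnError(errors.New("first"))
	ctx.OnError(errors.New("second")) // ignored: an error is already set
	select {
	case <-ctx.Done():
		cancelled = true
	default:
	}
	return ctx.OperatorErr().Error(), cancelled
}

func main() {
	msg, cancelled := firstErrorWins()
	fmt.Println(msg, cancelled) // prints "first true"
}
```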

type None

type None struct{}

None is a placeholder result type for worker pools that do not have a result receiver.

type Option

type Option[T TaskMayPanic, R any] interface {
	Apply(pool *WorkerPool[T, R])
}

Option is the config option for WorkerPool.
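An interface with a single Apply method is usually paired with a function adapter so that options can be written as closures. The sketch below shows that pattern in isolation. Everything in it is hypothetical: `pool` stands in for WorkerPool (whose fields are unexported), the TaskMayPanic constraint is dropped for brevity, and `withResultChanSize` is an invented option, not one this package is known to provide.

```go
package main

import "fmt"

// pool is a hypothetical stand-in for WorkerPool.
type pool[T any, R any] struct {
	name           string
	resultChanSize int
}

// option mirrors the shape of the documented Option interface.
type option[T any, R any] interface {
	Apply(p *pool[T, R])
}

// optionFunc adapts a plain function to the option interface.
type optionFunc[T any, R any] func(p *pool[T, R])

func (f optionFunc[T, R]) Apply(p *pool[T, R]) { f(p) }

// withResultChanSize is an invented option used only for illustration.
func withResultChanSize[T any, R any](n int) option[T, R] {
	return optionFunc[T, R](func(p *pool[T, R]) { p.resultChanSize = n })
}

// newPool applies the options in order on top of an assumed default.
func newPool[T any, R any](name string, opts ...option[T, R]) *pool[T, R] {
	p := &pool[T, R]{name: name, resultChanSize: 1}
	for _, o := range opts {
		o.Apply(p)
	}
	return p
}

// configuredSize builds a pool with one option and returns the resulting size.
func configuredSize() int {
	p := newPool[string, int]("demo", withResultChanSize[string, int](8))
	return p.resultChanSize
}

func main() {
	fmt.Println(configuredSize()) // prints "8"
}
```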

type TaskMayPanic

type TaskMayPanic interface {
	// RecoverArgs returns the arguments for the pkg/util.Recover function of this task.
	// The returned error is the one passed to the upper level; if it is not provided,
	// the default error is used.
	RecoverArgs() (metricsLabel string, funcInfo string, err error)
}

TaskMayPanic is a type that reminds the developer to handle panics in the task.
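As an illustration, a task type satisfies TaskMayPanic by supplying the label, function info, and error to report if it panics. The sketch below copies the interface so that it compiles on its own. `importTask`, `errImportPanicked`, `runGuarded`, and the fallback error text are hypothetical, and `runGuarded` only approximates what a recovery path might do with these values; it is not the behavior of pkg/util.Recover.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskMayPanic is copied from the documented interface.
type TaskMayPanic interface {
	RecoverArgs() (metricsLabel string, funcInfo string, err error)
}

// importTask is a hypothetical task type.
type importTask struct{ file string }

var errImportPanicked = errors.New("import task panicked")

// RecoverArgs tells the recovery path how to label and report a panic.
func (t importTask) RecoverArgs() (string, string, error) {
	return "import", "importTask(" + t.file + ")", errImportPanicked
}

// runGuarded is a hypothetical recovery wrapper: it runs fn and converts
// a panic into an error built from the task's RecoverArgs.
func runGuarded(task TaskMayPanic, fn func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			label, info, taskErr := task.RecoverArgs()
			if taskErr == nil {
				taskErr = errors.New("task panicked") // assumed default error
			}
			err = fmt.Errorf("[%s] %s: %v: %w", label, info, r, taskErr)
		}
	}()
	fn()
	return nil
}

// guardedResult runs a panicking function and reports the resulting
// message and whether the task's own error is wrapped inside it.
func guardedResult() (string, bool) {
	err := runGuarded(importTask{file: "a.csv"}, func() { panic("boom") })
	return err.Error(), errors.Is(err, errImportPanicked)
}

func main() {
	msg, wrapped := guardedResult()
	fmt.Println(msg)     // prints "[import] importTask(a.csv): boom: import task panicked"
	fmt.Println(wrapped) // prints "true"
}
```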

type Tuner

type Tuner interface {
	Tune(numWorkers int32, wait bool)
}

Tuner is an interface that provides the ability to tune a worker pool. It is used to pass a worker pool around without the import cycle that its generic type would otherwise cause.
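A non-generic interface like this lets code hold pools with different type parameters in one collection without naming their task or result types. The sketch below shows the idea; `fakePool`, `tuneAll`, and `tunedSizes` are hypothetical, and the `Tune` body only records the requested size rather than starting or stopping workers.

```go
package main

import "fmt"

// Tuner is copied from the documented interface.
type Tuner interface {
	Tune(numWorkers int32, wait bool)
}

// fakePool is a hypothetical generic pool. Its type parameter stands in
// for the task type that callers of Tuner do not need to know about.
type fakePool[T any] struct {
	workers int32
}

// Tune only records the requested size in this sketch.
func (p *fakePool[T]) Tune(numWorkers int32, wait bool) {
	p.workers = numWorkers
}

// tuneAll resizes every pool through the non-generic interface.
func tuneAll(tuners []Tuner, numWorkers int32) {
	for _, t := range tuners {
		t.Tune(numWorkers, false)
	}
}

// tunedSizes tunes two pools with different type arguments and returns
// the size each one recorded.
func tunedSizes() (int32, int32) {
	a := &fakePool[string]{}
	b := &fakePool[int]{}
	tuneAll([]Tuner{a, b}, 4)
	return a.workers, b.workers
}

func main() {
	fmt.Println(tunedSizes()) // prints "4 4"
}
```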

type Worker

type Worker[T TaskMayPanic, R any] interface {
	// HandleTask consumes a task (T) and either produces a result (R) or returns an error.
	// The result is sent to the result channel by calling the `send` function. A returned
	// error is caught, logged, and broadcast to other operators by the worker pool.
	// TODO(joechenrh): we can pass the context to HandleTask, so we don't need to
	// store the context in each worker implementation.
	HandleTask(task T, send func(R)) error
	Close() error
}

Worker is the interface implemented by the workers of a pool.
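A worker implementation might look like the sketch below, which copies the two documented interfaces so that it compiles on its own. `squareTask`, `squareWorker`, and `runOne` are hypothetical. `runOne` calls HandleTask directly with a closure in place of `send`, whereas in real use the pool supplies that function.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskMayPanic and Worker are copied from the documented interfaces.
type TaskMayPanic interface {
	RecoverArgs() (metricsLabel string, funcInfo string, err error)
}

type Worker[T TaskMayPanic, R any] interface {
	HandleTask(task T, send func(R)) error
	Close() error
}

// squareTask is a hypothetical task carrying one integer.
type squareTask struct{ n int }

func (squareTask) RecoverArgs() (string, string, error) {
	return "square", "squareTask", nil
}

// squareWorker is a hypothetical worker that squares its input.
type squareWorker struct{}

// HandleTask either sends one result or returns an error.
func (squareWorker) HandleTask(task squareTask, send func(int)) error {
	if task.n < 0 {
		return errors.New("negative input")
	}
	send(task.n * task.n)
	return nil
}

func (squareWorker) Close() error { return nil }

// runOne drives a single task through the worker and captures the result.
func runOne(n int) (int, error) {
	var w Worker[squareTask, int] = squareWorker{}
	defer w.Close()
	var result int
	err := w.HandleTask(squareTask{n: n}, func(r int) { result = r })
	return result, err
}

func main() {
	fmt.Println(runOne(7)) // prints "49 <nil>"
}
```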

type WorkerPool

type WorkerPool[T TaskMayPanic, R any] struct {
	// contains filtered or unexported fields
}

WorkerPool is a pool of workers.

func NewWorkerPool

func NewWorkerPool[T TaskMayPanic, R any](
	name string,
	_ util.Component,
	numWorkers int,
	createWorker func() Worker[T, R],
	opts ...Option[T, R],
) *WorkerPool[T, R]

NewWorkerPool creates a new worker pool.

func (*WorkerPool[T, R]) AddTask

func (p *WorkerPool[T, R]) AddTask(task T)

AddTask adds a task to the pool. It is only used in tests.

func (*WorkerPool[T, R]) Cap

func (p *WorkerPool[T, R]) Cap() int32

Cap returns the capacity of the pool.

func (*WorkerPool[T, R]) CloseAndWait

func (p *WorkerPool[T, R]) CloseAndWait()

CloseAndWait manually closes the pool and waits for it to complete. It is only used in tests.

func (*WorkerPool[T, R]) GetOriginConcurrency

func (p *WorkerPool[T, R]) GetOriginConcurrency() int32

GetOriginConcurrency returns the concurrency the pool was initialized with.

func (*WorkerPool[T, R]) GetResultChan

func (p *WorkerPool[T, R]) GetResultChan() <-chan R

GetResultChan gets the result channel from the pool.

func (*WorkerPool[T, R]) LastTunerTs

func (p *WorkerPool[T, R]) LastTunerTs() time.Time

LastTunerTs returns the last time when the pool was tuned.

func (*WorkerPool[T, R]) Name

func (p *WorkerPool[T, R]) Name() string

Name returns the name of the pool.

func (*WorkerPool[T, R]) Release

func (p *WorkerPool[T, R]) Release()

Release waits for the pool to be released. It waits until the input channel is closed or the context is cancelled by a business error.
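The lifecycle this implies (feed tasks through a channel, close it, then wait for the workers to drain) can be sketched with a heavily simplified stand-in. `miniPool` and `squares` are hypothetical and do not use this package. Closing the result channel in `release` and ignoring context cancellation are choices made for the sketch, not documented behavior.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// miniPool is a hypothetical, heavily simplified stand-in for WorkerPool.
type miniPool struct {
	tasks   chan int
	results chan int
	wg      sync.WaitGroup
}

// start launches numWorkers goroutines that square each incoming task.
func (p *miniPool) start(numWorkers int) {
	for i := 0; i < numWorkers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for t := range p.tasks { // exits once the task channel is closed
				p.results <- t * t
			}
		}()
	}
}

// release waits for every worker to exit, then closes the result channel.
func (p *miniPool) release() {
	p.wg.Wait()
	close(p.results)
}

// squares runs the inputs through the pool and returns the sorted results.
func squares(inputs []int) []int {
	p := &miniPool{
		tasks:   make(chan int),
		results: make(chan int, len(inputs)), // buffered so workers never block
	}
	p.start(3)
	for _, v := range inputs {
		p.tasks <- v
	}
	close(p.tasks) // signals that no more tasks will arrive
	p.release()
	var out []int
	for r := range p.results {
		out = append(out, r)
	}
	sort.Ints(out) // workers finish in no particular order
	return out
}

func main() {
	fmt.Println(squares([]int{3, 1, 2})) // prints "[1 4 9]"
}
```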

func (*WorkerPool[T, R]) Running

func (p *WorkerPool[T, R]) Running() int32

Running returns the number of running workers.

func (*WorkerPool[T, R]) SetResultSender

func (p *WorkerPool[T, R]) SetResultSender(sender chan R)

SetResultSender sets the result sender for the pool.

func (*WorkerPool[T, R]) SetTaskReceiver

func (p *WorkerPool[T, R]) SetTaskReceiver(recv chan T)

SetTaskReceiver sets the task receiver for the pool.

func (*WorkerPool[T, R]) Start

func (p *WorkerPool[T, R]) Start(ctx *Context)

Start starts the default number of workers.

func (*WorkerPool[T, R]) Tune

func (p *WorkerPool[T, R]) Tune(numWorkers int32, wait bool)

Tune tunes the pool to the specified number of workers. The wait parameter controls whether to wait for all workers to close when reducing the worker count. This method can only be called after Start.
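One way to picture the effect of the wait flag is a pool in which every worker owns a quit channel and a done channel. The sketch below is an assumption about how such resizing could work, not this package's implementation. `tunablePool`, `worker`, and `tuneUpThenDown` are hypothetical, and the workers here only wait to be stopped instead of handling tasks.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// worker holds the two signals used to stop one goroutine.
type worker struct {
	quit chan struct{} // closed to ask the worker to stop
	done chan struct{} // closed by the worker once it has stopped
}

// tunablePool is a hypothetical resizable pool.
type tunablePool struct {
	mu      sync.Mutex
	workers []worker
	running atomic.Int32
}

// Tune grows or shrinks the pool. When shrinking with wait set, it
// returns only after every removed worker has exited.
func (p *tunablePool) Tune(numWorkers int32, wait bool) {
	if numWorkers < 0 {
		numWorkers = 0
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	for int32(len(p.workers)) < numWorkers {
		w := worker{quit: make(chan struct{}), done: make(chan struct{})}
		p.workers = append(p.workers, w)
		p.running.Add(1)
		go func() {
			<-w.quit // a real worker would also handle tasks here
			p.running.Add(-1)
			close(w.done)
		}()
	}
	var stopped []worker
	for int32(len(p.workers)) > numWorkers {
		last := len(p.workers) - 1
		close(p.workers[last].quit)
		stopped = append(stopped, p.workers[last])
		p.workers = p.workers[:last]
	}
	if wait {
		for _, w := range stopped {
			<-w.done
		}
	}
}

// Running returns the number of workers that have not exited yet.
func (p *tunablePool) Running() int32 { return p.running.Load() }

// tuneUpThenDown grows the pool to 4 workers, shrinks it to 1 while
// waiting, and returns the running count observed after each step.
func tuneUpThenDown() (int32, int32) {
	p := &tunablePool{}
	p.Tune(4, true)
	up := p.Running()
	p.Tune(1, true)
	return up, p.Running()
}

func main() {
	fmt.Println(tuneUpThenDown()) // prints "4 1"
}
```

With wait set to false, the second count could briefly still read higher than 1, because the removed workers may not have exited when Tune returns.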
