worker

package
v1.0.5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2025 License: GPL-3.0 Imports: 10 Imported by: 1

Documentation

Index

Constants

View Source
const (
	DefaultCounterCheck = 10 * time.Millisecond
)

Variables

View Source
var ErrFunctionTimeout = errors.New("function timeout")
View Source
var ErrNilFunction = errors.New("function to execute is nil")
View Source
var ErrPanic = errors.New("error panic")

Functions

func FanInOut

func FanInOut(
	ctx context.Context,
	goroutines int,
	buffer int,
	in chan interface{},
	fn func(context.Context, interface{}),
)

func GetGoroutinesCount

func GetGoroutinesCount(variables, maxGoroutines int) int

func GetMessageOrTimeout

func GetMessageOrTimeout(timeout time.Duration, msg chan []byte, def []byte) []byte

GetMessageOrTimeout gets a message from the channel or times out.

func MergeChannels

func MergeChannels(input ...<-chan interface{}) <-chan interface{}

MergeChannels merges multiple channels into one.

func RecoverWrapper

func RecoverWrapper(ctx context.Context, next ExecuteFunc) (err error)

func RunAsyncMultipleWorkers

func RunAsyncMultipleWorkers(
	ctx context.Context,
	goroutines int,
	buffer int,
	fn func(context.Context, chan<- interface{}),
) <-chan interface{}

RunAsyncMultipleWorkers runs multiple workers asynchronously and returns a channel to control them.

func RunSyncMultipleWorkers

func RunSyncMultipleWorkers(ctx context.Context, goroutines int, fn func(ctx context.Context))

RunSyncMultipleWorkers runs multiple workers synchronously.

Types

type Aggregator

type Aggregator struct {
	// contains filtered or unexported fields
}

func NewAggregator

func NewAggregator(ctx context.Context, count int, ticker time.Duration, processor func([]any) error) *Aggregator

func (*Aggregator) Add

func (w *Aggregator) Add(req any)

func (*Aggregator) Close

func (w *Aggregator) Close()

func (*Aggregator) Count

func (w *Aggregator) Count() int

func (*Aggregator) Flusher

func (w *Aggregator) Flusher() error

func (*Aggregator) Process

func (w *Aggregator) Process() error

type ExecuteFunc

type ExecuteFunc func(ctx context.Context) error

func FunctionWithTimeout

func FunctionWithTimeout(
	timeout time.Duration,
	fn ExecuteFunc,
) ExecuteFunc

FunctionWithTimeout wraps a function so that it returns a result after the timeout (be careful: resources are not released unless the function uses the context).

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a pool of workers.

func NewPool

func NewPool(
	ctx context.Context,
	wrappers ...func(ctx context.Context, next ExecuteFunc) error,
) *Pool

NewPool returns a new pool.

func (*Pool) Close

func (p *Pool) Close()

func (*Pool) Execute

func (p *Pool) Execute(fn ExecuteFunc)

Execute runs the function in a goroutine.

func (*Pool) Wait

func (p *Pool) Wait() error

Wait waits for all functions.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(ctx context.Context, fn func(ctx context.Context) error) *Worker

func (*Worker) Cancel

func (w *Worker) Cancel()

func (*Worker) Context

func (w *Worker) Context() context.Context

func (*Worker) Handle

func (w *Worker) Handle() error

func (*Worker) ID

func (w *Worker) ID() string

func (*Worker) IsStopped

func (w *Worker) IsStopped() bool

type WorkersPool

type WorkersPool[K any] struct {
	// contains filtered or unexported fields
}

func NewWorkersPool

func NewWorkersPool[K any](
	ctx context.Context,
) *WorkersPool[K]

func (*WorkersPool[K]) Execute

func (w *WorkersPool[K]) Execute(fn func(ctx context.Context) error)

func (*WorkersPool[K]) Get

func (w *WorkersPool[K]) Get(id string) (*Worker, bool)

func (*WorkersPool[K]) PersistentWorker

func (w *WorkersPool[K]) PersistentWorker(
	ctx context.Context,
	ch chan K,
	handler func(context.Context, K) error,
) error

func (*WorkersPool[K]) Remove

func (w *WorkersPool[K]) Remove(id string)

func (*WorkersPool[K]) Size

func (w *WorkersPool[K]) Size() uint64

func (*WorkersPool[K]) Stop

func (w *WorkersPool[K]) Stop()

func (*WorkersPool[K]) TemporalWorker

func (w *WorkersPool[K]) TemporalWorker(
	ctx context.Context,
	idleTimeout time.Duration,
	timeout func(),
	ch chan K,
	handler func(context.Context, K) error,
) error

func (*WorkersPool[K]) Wait

func (w *WorkersPool[K]) Wait() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL