worker

package
v0.5.0 Latest
Published: Jan 4, 2026 | License: MIT | Imports: 5 | Imported by: 0

Documentation

Overview

Package worker provides a reusable background worker pool for async job processing.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchConfig

type BatchConfig[T any] struct {
	// Name is used for logging.
	Name string
	// BatchSize is the maximum items per batch. Default: 100.
	BatchSize int
	// FlushInterval is the maximum time to wait before flushing. Default: 1s.
	FlushInterval time.Duration
	// ProcessFn is called to process a batch of items.
	ProcessFn func(ctx context.Context, items []T) error
}

BatchConfig configures a batch processor.

type BatchProcessor

type BatchProcessor[T any] struct {
	// contains filtered or unexported fields
}

BatchProcessor collects items and processes them in batches. It flushes when either the batch size is reached or the flush interval expires.

func NewBatchProcessor

func NewBatchProcessor[T any](config BatchConfig[T]) *BatchProcessor[T]

NewBatchProcessor creates a new batch processor.

func (*BatchProcessor[T]) Add

func (b *BatchProcessor[T]) Add(item T)

Add adds an item to the batch. If the batch is full, it triggers a flush.

func (*BatchProcessor[T]) AddBatch

func (b *BatchProcessor[T]) AddBatch(items []T)

AddBatch adds multiple items at once.

func (*BatchProcessor[T]) BufferLength

func (b *BatchProcessor[T]) BufferLength() int

BufferLength returns the current number of items in the buffer.

func (*BatchProcessor[T]) Flush

func (b *BatchProcessor[T]) Flush()

Flush forces an immediate flush of the current batch.

func (*BatchProcessor[T]) Start

func (b *BatchProcessor[T]) Start(ctx context.Context)

Start begins the batch processor.

func (*BatchProcessor[T]) Stop

func (b *BatchProcessor[T]) Stop()

Stop gracefully shuts down the batch processor, flushing any remaining items.

type Job

type Job interface {
	// Execute performs the job's work. Returns an error if the job failed.
	Execute(ctx context.Context) error
	// ID returns a unique identifier for logging purposes.
	ID() string
}

Job represents a unit of work to be processed by the worker pool.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool manages a pool of workers that process jobs from a queue.

func NewPool

func NewPool(config PoolConfig) *Pool

NewPool creates a new worker pool with the given configuration.

func (*Pool) ActiveWorkers

func (p *Pool) ActiveWorkers() int

ActiveWorkers returns the number of currently active workers.

func (*Pool) QueueLength

func (p *Pool) QueueLength() int

QueueLength returns the current number of jobs in the queue.

func (*Pool) Start

func (p *Pool) Start(ctx context.Context)

Start begins the worker pool. It spawns a dispatcher goroutine that reads from the job queue and assigns work to available workers.

func (*Pool) Stop

func (p *Pool) Stop()

Stop gracefully shuts down the worker pool. It stops accepting new jobs, waits for in-flight jobs to complete, and then returns.

func (*Pool) Submit

func (p *Pool) Submit(job Job) bool

Submit adds a job to the queue. Returns true if the job was queued, false if the queue is full (non-blocking).
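A non-blocking submit of this shape is commonly written as a `select` with a `default` branch on a buffered channel. The sketch below shows that general technique under the assumption that the queue is a channel; `trySend` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// trySend reports whether v was queued without blocking.
// It returns false immediately when the channel buffer is full.
func trySend[T any](queue chan<- T, v T) bool {
	select {
	case queue <- v:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan string, 2)
	for _, id := range []string{"a", "b", "c"} {
		if !trySend(queue, id) {
			fmt.Println("queue full, dropping", id) // queue full, dropping c
		}
	}
}
```

Callers of a non-blocking submit typically need a policy for the false case, such as dropping the job, retrying later, or falling back to a blocking submit.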

func (*Pool) SubmitWait

func (p *Pool) SubmitWait(ctx context.Context, job Job) error

SubmitWait adds a job to the queue, blocking until space is available or the context is cancelled.

type PoolConfig

type PoolConfig struct {
	// Name is used for logging.
	Name string
	// MaxWorkers is the maximum number of concurrent workers. Default: 10.
	MaxWorkers int
	// QueueSize is the buffer size for the job queue. Default: 100.
	QueueSize int
	// OnJobStart is called when a job starts processing.
	OnJobStart func(job Job)
	// OnJobComplete is called when a job finishes (successfully or with error).
	OnJobComplete func(job Job, err error, duration time.Duration)
	// OnJobPanic is called when a job panics.
	OnJobPanic func(job Job, recovered interface{})
}

PoolConfig configures a worker pool.
