Documentation ¶
Overview ¶
Package worker provides a reusable background worker pool for async job processing.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchConfig ¶
type BatchConfig[T any] struct {
// Name is used for logging.
Name string
// BatchSize is the maximum items per batch. Default: 100.
BatchSize int
// FlushInterval is the maximum time to wait before flushing. Default: 1s.
FlushInterval time.Duration
// ProcessFn is called to process a batch of items.
ProcessFn func(ctx context.Context, items []T) error
}
BatchConfig configures a batch processor.
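The documented defaults (BatchSize 100, FlushInterval 1s) could be applied along the following lines. This is a sketch only: withDefaults is a hypothetical helper that is not part of this package, and it assumes that zero or negative values mean "use the default". The struct is copied locally so the example compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// BatchConfig mirrors the documented struct so the example is self-contained.
type BatchConfig[T any] struct {
	Name          string
	BatchSize     int
	FlushInterval time.Duration
	ProcessFn     func(ctx context.Context, items []T) error
}

// withDefaults is a hypothetical helper. It assumes that zero or negative
// values mean "use the documented default"; the package may behave differently.
func withDefaults[T any](cfg BatchConfig[T]) BatchConfig[T] {
	if cfg.BatchSize <= 0 {
		cfg.BatchSize = 100
	}
	if cfg.FlushInterval <= 0 {
		cfg.FlushInterval = time.Second
	}
	return cfg
}

func main() {
	cfg := withDefaults(BatchConfig[string]{
		Name: "audit-log",
		ProcessFn: func(ctx context.Context, items []string) error {
			fmt.Println("processing", len(items), "items")
			return nil
		},
	})
	fmt.Println(cfg.Name, cfg.BatchSize, cfg.FlushInterval)
}
```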
type BatchProcessor ¶
type BatchProcessor[T any] struct {
// contains filtered or unexported fields
}
BatchProcessor collects items and processes them in batches. It flushes when either the batch size is reached or the flush interval expires.
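The size-or-interval behaviour described above can be illustrated with a small stand-in. This is a sketch of the general pattern, not this package's implementation: miniBatcher and all of its fields and methods are hypothetical names. It flushes when the buffer reaches batchSize, on every ticker tick, and once more when the context is cancelled.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// miniBatcher is a hypothetical stand-in, not the package's BatchProcessor.
type miniBatcher[T any] struct {
	mu        sync.Mutex
	buf       []T
	batchSize int
	interval  time.Duration
	process   func(ctx context.Context, items []T) error
}

// add appends an item; a full buffer is detached under the lock and processed.
func (b *miniBatcher[T]) add(ctx context.Context, item T) {
	b.mu.Lock()
	b.buf = append(b.buf, item)
	var full []T
	if len(b.buf) >= b.batchSize {
		full, b.buf = b.buf, nil
	}
	b.mu.Unlock()
	b.deliver(ctx, full)
}

// flush detaches whatever is buffered and processes it.
func (b *miniBatcher[T]) flush(ctx context.Context) {
	b.mu.Lock()
	items := b.buf
	b.buf = nil
	b.mu.Unlock()
	b.deliver(ctx, items)
}

// deliver calls process outside the lock and skips empty batches.
func (b *miniBatcher[T]) deliver(ctx context.Context, items []T) {
	if len(items) == 0 {
		return
	}
	if err := b.process(ctx, items); err != nil {
		fmt.Println("batch failed:", err)
	}
}

// run flushes on every tick and performs a final flush on cancellation.
func (b *miniBatcher[T]) run(ctx context.Context) {
	ticker := time.NewTicker(b.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			b.flush(ctx)
		case <-ctx.Done():
			b.flush(context.Background())
			return
		}
	}
}

// demoBatchSizes feeds seven items through a batcher of size three and
// returns the size of every processed batch.
func demoBatchSizes() []int {
	var sizes []int
	b := &miniBatcher[int]{
		batchSize: 3,
		interval:  time.Hour, // long enough that the ticker never fires here
		process: func(ctx context.Context, items []int) error {
			sizes = append(sizes, len(items))
			return nil
		},
	}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		b.run(ctx)
	}()
	for i := 0; i < 7; i++ {
		b.add(ctx, i)
	}
	cancel() // triggers the final flush of the one leftover item
	<-done
	return sizes
}

func main() {
	fmt.Println(demoBatchSizes()) // two full batches, then the remainder
}
```

Detaching the buffer while holding the lock and calling the processing function after releasing it keeps slow batches from blocking producers. Whether BatchProcessor makes the same choice is not stated in this documentation.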
func NewBatchProcessor ¶
func NewBatchProcessor[T any](config BatchConfig[T]) *BatchProcessor[T]
NewBatchProcessor creates a new batch processor.
func (*BatchProcessor[T]) Add ¶
func (b *BatchProcessor[T]) Add(item T)
Add adds an item to the batch. If the batch is full, it triggers a flush.
func (*BatchProcessor[T]) AddBatch ¶
func (b *BatchProcessor[T]) AddBatch(items []T)
AddBatch adds multiple items at once.
func (*BatchProcessor[T]) BufferLength ¶
func (b *BatchProcessor[T]) BufferLength() int
BufferLength returns the current number of items in the buffer.
func (*BatchProcessor[T]) Flush ¶
func (b *BatchProcessor[T]) Flush()
Flush forces an immediate flush of the current batch.
func (*BatchProcessor[T]) Start ¶
func (b *BatchProcessor[T]) Start(ctx context.Context)
Start begins the batch processor.
func (*BatchProcessor[T]) Stop ¶
func (b *BatchProcessor[T]) Stop()
Stop gracefully shuts down the batch processor, flushing any remaining items.
type Job ¶
type Job interface {
// Execute performs the job's work. Returns an error if the job failed.
Execute(ctx context.Context) error
// ID returns a unique identifier for logging purposes.
ID() string
}
Job represents a unit of work to be processed by the worker pool.
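A type satisfies Job by providing both methods. The sketch below uses a hypothetical emailJob type and a hypothetical runJob helper; the interface is copied locally so the example compiles without importing this package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Job mirrors the documented interface so the example is self-contained.
type Job interface {
	Execute(ctx context.Context) error
	ID() string
}

// emailJob is a hypothetical job type used only for illustration.
type emailJob struct {
	id string
	to string
}

// Execute checks for cancellation first, then validates its input.
func (j emailJob) Execute(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if j.to == "" {
		return errors.New("missing recipient")
	}
	// A real job would send the message here.
	return nil
}

// ID returns the identifier used in log output.
func (j emailJob) ID() string { return j.id }

// Compile-time check that emailJob implements Job.
var _ Job = emailJob{}

// runJob is a hypothetical helper that executes a job and summarises the result.
func runJob(j Job) string {
	if err := j.Execute(context.Background()); err != nil {
		return j.ID() + ": failed: " + err.Error()
	}
	return j.ID() + ": ok"
}

func main() {
	fmt.Println(runJob(emailJob{id: "job-1", to: "user@example.com"}))
	fmt.Println(runJob(emailJob{id: "job-2"}))
}
```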
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool manages a pool of workers that process jobs from a queue.
func NewPool ¶
func NewPool(config PoolConfig) *Pool
NewPool creates a new worker pool with the given configuration.
func (*Pool) ActiveWorkers ¶
ActiveWorkers returns the number of currently active workers.
func (*Pool) QueueLength ¶
QueueLength returns the current number of jobs in the queue.
func (*Pool) Start ¶
Start begins the worker pool. It spawns a dispatcher goroutine that reads from the job queue and assigns work to available workers.
func (*Pool) Stop ¶
func (p *Pool) Stop()
Stop gracefully shuts down the worker pool. It stops accepting new jobs, waits for in-flight jobs to complete, and then returns.
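The dispatcher and graceful-shutdown behaviour described for Start and Stop can be sketched as follows. This is an illustration of the pattern under stated assumptions, not this package's code: miniPool, newMiniPool, start, submit, and stop are hypothetical names, and jobs are plain functions rather than Job values to keep the example short.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// miniPool is a hypothetical stand-in, not the package's Pool.
type miniPool struct {
	queue chan func(ctx context.Context)
	slots chan struct{} // one token per busy worker
	wg    sync.WaitGroup
	done  chan struct{} // closed when the dispatcher exits
}

func newMiniPool(maxWorkers, queueSize int) *miniPool {
	return &miniPool{
		queue: make(chan func(ctx context.Context), queueSize),
		slots: make(chan struct{}, maxWorkers),
		done:  make(chan struct{}),
	}
}

// start launches a dispatcher that hands each queued job to a worker
// goroutine, blocking while maxWorkers jobs are already running.
func (p *miniPool) start(ctx context.Context) {
	go func() {
		defer close(p.done)
		for job := range p.queue {
			p.slots <- struct{}{}
			p.wg.Add(1)
			go func(job func(ctx context.Context)) {
				defer p.wg.Done()
				defer func() { <-p.slots }()
				job(ctx)
			}(job)
		}
	}()
}

// submit enqueues a job. Calling it after stop would panic in this sketch.
func (p *miniPool) submit(job func(ctx context.Context)) {
	p.queue <- job
}

// stop closes the queue, waits for the dispatcher to drain it, and then
// waits for all in-flight jobs to finish.
func (p *miniPool) stop() {
	close(p.queue)
	<-p.done
	p.wg.Wait()
}

// runJobs submits n counting jobs and returns how many completed before
// stop returned.
func runJobs(n int) int64 {
	var completed int64
	p := newMiniPool(4, 8)
	p.start(context.Background())
	for i := 0; i < n; i++ {
		p.submit(func(ctx context.Context) {
			atomic.AddInt64(&completed, 1)
		})
	}
	p.stop()
	return atomic.LoadInt64(&completed)
}

func main() {
	fmt.Println("completed:", runJobs(20))
}
```

Waiting for the dispatcher before calling wg.Wait ensures every wg.Add has already happened, which is what makes the final wait safe.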
type PoolConfig ¶
type PoolConfig struct {
// Name is used for logging.
Name string
// MaxWorkers is the maximum number of concurrent workers. Default: 10.
MaxWorkers int
// QueueSize is the buffer size for the job queue. Default: 100.
QueueSize int
// OnJobStart is called when a job starts processing.
OnJobStart func(job Job)
// OnJobComplete is called when a job finishes (successfully or with error).
OnJobComplete func(job Job, err error, duration time.Duration)
// OnJobPanic is called when a job panics.
OnJobPanic func(job Job, recovered interface{})
}
PoolConfig configures a worker pool.
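One way a worker might invoke the three hooks around a job is sketched below. The order of calls, the nil checks, and the choice not to call OnJobComplete after a panic are assumptions, not documented behaviour. runWithHooks, funcJob, and traceHooks are hypothetical names, and PoolConfig is trimmed to its hook fields so the example compiles on its own.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Job and PoolConfig mirror the documented types (hook fields only).
type Job interface {
	Execute(ctx context.Context) error
	ID() string
}

type PoolConfig struct {
	OnJobStart    func(job Job)
	OnJobComplete func(job Job, err error, duration time.Duration)
	OnJobPanic    func(job Job, recovered interface{})
}

// funcJob is a hypothetical adapter that turns a function into a Job.
type funcJob struct {
	id string
	fn func() error
}

func (j funcJob) Execute(ctx context.Context) error { return j.fn() }
func (j funcJob) ID() string                        { return j.id }

// runWithHooks is a hypothetical wrapper: it reports start, then either
// completion or a recovered panic. Unset hooks are skipped.
func runWithHooks(ctx context.Context, cfg PoolConfig, job Job) {
	if cfg.OnJobStart != nil {
		cfg.OnJobStart(job)
	}
	start := time.Now()
	defer func() {
		if r := recover(); r != nil && cfg.OnJobPanic != nil {
			cfg.OnJobPanic(job, r)
		}
	}()
	err := job.Execute(ctx)
	if cfg.OnJobComplete != nil {
		cfg.OnJobComplete(job, err, time.Since(start))
	}
}

// traceHooks runs a succeeding, a failing, and a panicking job and
// records which hooks fired, in order.
func traceHooks() []string {
	var events []string
	cfg := PoolConfig{
		OnJobStart: func(job Job) {
			events = append(events, "start "+job.ID())
		},
		OnJobComplete: func(job Job, err error, d time.Duration) {
			status := "ok"
			if err != nil {
				status = "error"
			}
			events = append(events, "complete "+job.ID()+" "+status)
		},
		OnJobPanic: func(job Job, recovered interface{}) {
			events = append(events, "panic "+job.ID())
		},
	}
	jobs := []Job{
		funcJob{id: "a", fn: func() error { return nil }},
		funcJob{id: "b", fn: func() error { return errors.New("boom") }},
		funcJob{id: "c", fn: func() error { panic("unexpected") }},
	}
	for _, j := range jobs {
		runWithHooks(context.Background(), cfg, j)
	}
	return events
}

func main() {
	for _, e := range traceHooks() {
		fmt.Println(e)
	}
}
```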