worker

package
v0.1.9 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2025 License: Apache-2.0 Imports: 5 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrQueueFull = errors.New("task queue is full")

Functions

This section is empty.

Types

type Config

type Config struct {
	MaxWorkers  int           // maximum number of workers
	QueueSize   int           // task queue size
	TaskTimeout time.Duration // timeout for single task
}

Config represents the worker pool configuration.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns the default configuration.

func (*Config) Validate

func (cfg *Config) Validate() error

Validate validates configuration

type Metrics

type Metrics struct {
	ActiveWorkers  atomic.Int64
	PendingTasks   atomic.Int64
	CompletedTasks atomic.Int64
	FailedTasks    atomic.Int64
	ProcessingTime atomic.Int64 // nanoseconds
}

Metrics tracks the pool's operational metrics.

func (*Metrics) Reset

func (m *Metrics) Reset()

Reset resets all metrics to zero

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool represents a worker pool

func NewPool

func NewPool(cfg *Config, processors ...Processor) *Pool

NewPool creates a new worker pool

Usage:

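// Create a task processor. NewPool accepts Processor values, so the
// processor must be a type with a Process(task any) error method;
// a bare func(task any) error does not satisfy the interface.
type taskProcessor struct{}

func (taskProcessor) Process(task any) error {
    // Process the task here
    // ...
    return nil
}

processor := taskProcessor{}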

// Create a worker pool configuration
cfg := &worker.Config{
    MaxWorkers:  10,   // Maximum number of worker goroutines
    QueueSize:   100,  // Maximum number of tasks that can be queued
    TaskTimeout: time.Minute, // Timeout for a single task execution
}

// Create a new worker pool
pool := worker.NewPool(cfg, processor)

// Start the worker pool
pool.Start()

// Submit tasks to the pool
err := pool.Submit("task1")
if err != nil {
    log.Printf("Failed to submit task: %v", err)
}

err = pool.Submit("task2")
if err != nil {
    log.Printf("Failed to submit task: %v", err)
}

// Get pool metrics
metrics := pool.GetMetrics()
log.Printf("Pool metrics: %+v", metrics)

// Check if the pool is busy
if pool.IsBusy() {
    log.Println("Pool is busy")
}

// Stop the worker pool
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
pool.Stop(ctx)

func (*Pool) GetMetrics

func (p *Pool) GetMetrics() map[string]int64

GetMetrics returns the current metrics

func (*Pool) IsBusy

func (p *Pool) IsBusy() bool

IsBusy returns whether the pool is busy

func (*Pool) IsEmpty

func (p *Pool) IsEmpty() bool

IsEmpty returns whether the pool is empty

func (*Pool) IsIdle

func (p *Pool) IsIdle() bool

IsIdle returns whether the pool is idle

func (*Pool) Start

func (p *Pool) Start()

Start starts the worker pool

func (*Pool) Stop

func (p *Pool) Stop(ctx context.Context)

Stop stops the worker pool

func (*Pool) Submit

func (p *Pool) Submit(task any) error

Submit submits a task to the pool

type Processor

type Processor interface {
	Process(task any) error
}

Processor represents a task processor

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL