threadpool

package
v0.8.0
Published: Sep 12, 2025 | License: Apache-2.0 | Imports: 5 | Imported by: 0

Documentation

Constants

const (
	ErrInvalidWorkerCount = utils.Error("Invalid workerCount value")
	ErrInvalidQueueSize   = utils.Error("Invalid queueSize value")
	ErrPoolNotStarted     = utils.Error("ThreadPool not started")
	ErrPoolAlreadyStarted = utils.Error("ThreadPool already started")
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job interface {
	Run(ctx context.Context)
}

func FuncRunner added in v0.7.0

func FuncRunner(job func(ctx context.Context)) Job

type OptionsFn added in v0.4.1

type OptionsFn func(*ThreadPool)

func WithLogger added in v0.4.1

func WithLogger(logger *log.Logger) OptionsFn

WithLogger sets the logger used by the ThreadPool.

type Pool

type Pool interface {
	Stop() error
	Dispatch(j Job)
	TryDispatch(j Job) bool
	DispatchWithTimeout(j Job, timeout time.Duration) bool
	DispatchWithContext(ctx context.Context, j Job) error
	Start(ctx context.Context) error
}

type ThreadPool

type ThreadPool struct {
	// contains filtered or unexported fields
}

func NewThreadPool

func NewThreadPool(workerCount int, queueSize int, opts ...OptionsFn) (*ThreadPool, error)

NewThreadPool is a constructor function that creates a new ThreadPool instance. It takes in parameters:

  - workerCount: the number of workers to be created in the ThreadPool. Must be greater than 0. If it's less than 1, it returns ErrInvalidWorkerCount.
  - queueSize: the size of the job queue in the ThreadPool. Must be greater than 0. If it's less than 1, it returns ErrInvalidQueueSize.
  - opts: optional functional options like WithLogger.

It returns a pointer to the created ThreadPool and an error.

Example usage:

pool, err := NewThreadPool(5, 10)
if err != nil {
  // handle error
}

// Or with options:
logger := log.NewLogger()
pool, err := NewThreadPool(5, 10, WithLogger(logger))

pool.Dispatch(job)
...

func (*ThreadPool) Dispatch

func (t *ThreadPool) Dispatch(j Job)

Dispatch adds a new job to the jobQueue of the ThreadPool. The job will be executed by one of the worker goroutines in the ThreadPool. The job must implement the Job interface with a Run method that takes a context.Context parameter.

Example usage:

job := MyJob{}
threadPool.Dispatch(job)

Note: Dispatch blocks while the jobQueue is full.
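The blocking behavior matches that of a send on a full buffered channel. The sketch below assumes Dispatch is essentially a channel send; dispatch and blocksWhenFull are hypothetical helpers written for illustration, with int values standing in for jobs.

```go
package main

import (
	"fmt"
	"time"
)

// dispatch is a hypothetical stand-in for Dispatch: a plain channel send,
// which blocks while the buffer is full.
func dispatch(queue chan<- int, job int) { queue <- job }

// blocksWhenFull reports whether a dispatch on a full queue is still
// pending after a short wait.
func blocksWhenFull() bool {
	queue := make(chan int, 1)
	dispatch(queue, 1) // fills the one-slot buffer

	done := make(chan struct{})
	go func() {
		dispatch(queue, 2) // cannot proceed until a slot is freed
		close(done)
	}()

	select {
	case <-done:
		return false
	case <-time.After(50 * time.Millisecond):
		<-queue // free a slot so the pending send can complete
		<-done
		return true
	}
}

func main() {
	fmt.Println(blocksWhenFull())
}
```

Callers that cannot afford to wait can use TryDispatch, DispatchWithTimeout, or DispatchWithContext instead.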

func (*ThreadPool) DispatchWithContext added in v0.4.1

func (t *ThreadPool) DispatchWithContext(ctx context.Context, j Job) error

DispatchWithContext attempts to dispatch a job respecting context cancellation. It returns nil if the job was successfully dispatched, or an error if the context was canceled.

Example usage:

job := MyJob{}
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

if err := threadPool.DispatchWithContext(ctx, job); err != nil {
  // Handle dispatch error (context canceled or deadline exceeded)
}

func (*ThreadPool) DispatchWithTimeout added in v0.4.1

func (t *ThreadPool) DispatchWithTimeout(j Job, timeout time.Duration) bool

DispatchWithTimeout attempts to dispatch a job with a specified timeout. It returns true if the job was successfully dispatched, false if the timeout was reached.

Example usage:

job := MyJob{}
if !threadPool.DispatchWithTimeout(job, 100*time.Millisecond) {
  // Handle job timeout
}

func (*ThreadPool) GetQueueCapacity

func (t *ThreadPool) GetQueueCapacity() int

GetQueueCapacity returns the capacity of the job queue in the ThreadPool.

func (*ThreadPool) GetQueueLen

func (t *ThreadPool) GetQueueLen() int

GetQueueLen returns the number of jobs currently in the jobQueue of the ThreadPool. It calculates the size of the jobQueue by using the len() function on the jobQueue channel. The returned value represents the number of pending jobs waiting to be processed by the workers.
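For a buffered channel, the built-in len reports the number of queued elements and cap reports the buffer size, which is presumably what GetQueueLen and GetQueueCapacity expose. A small illustration with a hypothetical queueStats helper:

```go
package main

import "fmt"

// queueStats returns the number of queued items and the buffer capacity
// of a buffered channel.
func queueStats(queue chan int) (length, capacity int) {
	return len(queue), cap(queue)
}

func main() {
	queue := make(chan int, 3)
	queue <- 1
	queue <- 2
	length, capacity := queueStats(queue)
	fmt.Println(length, capacity)
}
```

The length is a snapshot: with workers running concurrently, it may already be stale by the time the caller reads it.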

func (*ThreadPool) GetRequestCount

func (t *ThreadPool) GetRequestCount() uint64

GetRequestCount returns the total number of requests handled by the ThreadPool. If the ThreadPool has not been started, it returns 0. It internally calls the RequestCount method of the WorkerGroup to calculate the total number of requests handled by its workers.
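One way to aggregate such a count is to keep an atomic counter per worker and sum the counters on demand. The sketch below assumes that design; the worker and workerGroup types here are hypothetical stand-ins and require Go 1.19 or later for atomic.Uint64.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// worker is a hypothetical stand-in with a per-worker atomic counter.
type worker struct {
	handled atomic.Uint64
}

func (w *worker) requestCounter() uint64 { return w.handled.Load() }

// workerGroup is a hypothetical stand-in that owns the workers.
type workerGroup struct {
	workers []*worker
}

// requestCount sums the per-worker counters.
func (g *workerGroup) requestCount() uint64 {
	var total uint64
	for _, w := range g.workers {
		total += w.requestCounter()
	}
	return total
}

func main() {
	g := &workerGroup{workers: []*worker{{}, {}}}
	var wg sync.WaitGroup
	for _, w := range g.workers {
		wg.Add(1)
		go func(w *worker) {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				w.handled.Add(1) // one increment per handled job
			}
		}(w)
	}
	wg.Wait()
	fmt.Println(g.requestCount())
}
```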

func (*ThreadPool) GetWorkerCount

func (t *ThreadPool) GetWorkerCount() int

GetWorkerCount returns the current number of workers in the ThreadPool. It retrieves the value of workerCount from the ThreadPool and returns it. This count represents the number of workers that are actively processing jobs. Note that this count does not include idle or terminated workers.

func (*ThreadPool) Start

func (t *ThreadPool) Start(ctx context.Context) error

Start starts the execution of the ThreadPool. It returns an error if the ThreadPool has already been started. If the given context is nil, it defaults to the background context. It creates a new WorkerGroup with the specified workerCount.

func (*ThreadPool) Stop

func (t *ThreadPool) Stop() error

Stop stops the execution of the ThreadPool. It returns an error if the ThreadPool has not been started yet. It cancels the context and waits for all workers to finish their current jobs. After that, it cleans the worker list and sets the started flag to false. Note: Stop blocks until the workers have finished.

func (*ThreadPool) TryDispatch added in v0.4.1

func (t *ThreadPool) TryDispatch(j Job) bool

TryDispatch attempts to dispatch a job to the ThreadPool without blocking. It returns true if the job was successfully dispatched, false if the queue is full.

Example usage:

job := MyJob{}
if !threadPool.TryDispatch(job) {
  // Handle job rejection (queue full)
}
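A non-blocking enqueue is typically written as a select with a default case, which runs immediately when the send cannot proceed. The tryDispatch helper below is a hypothetical sketch of that idiom, using strings as jobs.

```go
package main

import "fmt"

// tryDispatch sketches a non-blocking enqueue: the default case is taken
// when the buffer is full, so the call never waits.
func tryDispatch(queue chan<- string, job string) bool {
	select {
	case queue <- job:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan string, 1)
	fmt.Println(tryDispatch(queue, "first"))
	fmt.Println(tryDispatch(queue, "second"))
}
```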

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(jobQueue chan Job, ctx context.Context) *Worker

func (*Worker) RequestCounter

func (w *Worker) RequestCounter() uint64

func (*Worker) Start

func (w *Worker) Start(wg *sync.WaitGroup, logger *log.Logger)

type WorkerGroup

type WorkerGroup struct {
	// contains filtered or unexported fields
}

func NewWorkerGroup

func NewWorkerGroup(workerCount int, jobQueue chan Job, parentCtx context.Context, logger *log.Logger) (*WorkerGroup, error)

NewWorkerGroup creates a new group of workers. If logger is nil, panics will be recovered silently.

func (*WorkerGroup) RequestCount

func (w *WorkerGroup) RequestCount() uint64

func (*WorkerGroup) Stop

func (w *WorkerGroup) Stop()
