taskpool

package
v0.16.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 31, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultMinWorkers          = 1
	DefaultMaxWorkers          = 0 // 0 = runtime.NumCPU() * 2
	DefaultIdleTimeout         = 30 * time.Second
	DefaultTaskQueueSize       = 100
	DefaultTaskTimeout         = 5 * time.Second
	DefaultMaxTaskTimeout      = 30 * time.Second
	DefaultHealthCheckInterval = 1 * time.Minute
)

Variables

View Source
var (
	ErrQueueFull          = errors.New("taskpool: queue is full")
	ErrWorkerStopped      = errors.New("taskpool: worker is stopped")
	ErrPoolShutdown       = errors.New("taskpool: pool is shutdown")
	ErrInvalidPriority    = errors.New("taskpool: invalid priority")
	ErrInvalidConfig      = errors.New("taskpool: invalid configuration")
	ErrDelegateInitFailed = errors.New("taskpool: delegate initialization failed")
	ErrMaxWorkersReached  = errors.New("taskpool: max workers reached")
)

Functions

This section is empty.

Types

type Config

type Config[TIn, TOut any] struct {
	MinWorkers          int
	MaxWorkers          int           // 0 = runtime.NumCPU() * 2
	IdleTimeout         time.Duration // Workers stop if idle longer than this
	TaskQueueSize       int
	TaskTimeout         time.Duration
	MaxTaskTimeout      time.Duration
	DelegateFactory     DelegateFactory[TIn, TOut]
	DelegateConfig      any
	HealthCheckInterval time.Duration
	Logger              log.Logger
}

func (*Config[TIn, TOut]) Validate

func (c *Config[TIn, TOut]) Validate() error

Validate validates the configuration and applies defaults.

type DefaultScheduler

type DefaultScheduler[TIn, TOut any] struct {
	// contains filtered or unexported fields
}

func (*DefaultScheduler[TIn, TOut]) Shutdown

func (s *DefaultScheduler[TIn, TOut]) Shutdown(ctx context.Context) error

func (*DefaultScheduler[TIn, TOut]) Stats

func (s *DefaultScheduler[TIn, TOut]) Stats() SchedulerStats

func (*DefaultScheduler[TIn, TOut]) Submit

func (s *DefaultScheduler[TIn, TOut]) Submit(ctx context.Context, payload TIn, opts ...SubmitOption) (Result[TOut], error)

func (*DefaultScheduler[TIn, TOut]) SubmitAsync

func (s *DefaultScheduler[TIn, TOut]) SubmitAsync(ctx context.Context, payload TIn, opts ...SubmitOption) (<-chan Result[TOut], error)

type DelegateFactory

type DelegateFactory[TIn, TOut any] func() WorkerDelegate[TIn, TOut]

type Priority

type Priority int
const (
	PriorityLow    Priority = 0
	PriorityMedium Priority = 1
	PriorityHigh   Priority = 2
)

type Result

type Result[TOut any] struct {
	TaskId   string
	Data     TOut
	Error    error
	Duration time.Duration
}

Result contains the outcome of task execution.

type Scheduler

type Scheduler[TIn, TOut any] interface {
	// Submit blocks until task completes or context is canceled.
	Submit(ctx context.Context, payload TIn, opts ...SubmitOption) (Result[TOut], error)

	// SubmitAsync returns immediately with a result channel.
	SubmitAsync(ctx context.Context, payload TIn, opts ...SubmitOption) (<-chan Result[TOut], error)

	Stats() SchedulerStats

	// Shutdown waits for running tasks to complete.
	Shutdown(ctx context.Context) error
}

Scheduler manages task submission and execution.

func New

func New[TIn, TOut any](config Config[TIn, TOut]) (Scheduler[TIn, TOut], error)

type SchedulerStats

type SchedulerStats struct {
	TotalSubmitted uint64
	TotalCompleted uint64
	TotalFailed    uint64
	ActiveWorkers  int
	IdleWorkers    int
	TotalWorkers   int
	QueuedTasks    int
}

type SubmitOption

type SubmitOption func(*submitOptions)

func WithPriority

func WithPriority(p Priority) SubmitOption

type Task

type Task[TIn, TOut any] struct {
	Id          string
	Context     context.Context
	Priority    Priority
	Payload     TIn
	Result      chan<- Result[TOut]
	Done        chan struct{} // Closed by worker after execution
	SubmittedAt time.Time
}

Task is a unit of work executed by a worker.

type Worker

type Worker[TIn, TOut any] struct {
	// contains filtered or unexported fields
}

Worker executes tasks in a dedicated OS thread.

type WorkerDelegate

type WorkerDelegate[TIn, TOut any] interface {
	// Init is called once when worker starts.
	Init(ctx context.Context, config any) error

	// Execute runs for each task. Must respect context cancellation.
	Execute(ctx context.Context, payload TIn) (TOut, error)

	// Destroy is called once when worker stops.
	Destroy() error

	// HealthCheck is called periodically.
	HealthCheck() error
}

WorkerDelegate defines pluggable task execution logic. Each worker owns its delegate instance, all methods run in worker's OS thread.

type WorkerPool

type WorkerPool[TIn, TOut any] struct {
	// contains filtered or unexported fields
}

WorkerPool manages elastic worker scaling with priority queues.

func (*WorkerPool[TIn, TOut]) Shutdown

func (p *WorkerPool[TIn, TOut]) Shutdown(ctx context.Context) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL