pool

package
v0.0.1-dev.12 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchQueueOptions

type BatchQueueOptions struct {
	// Capacity is presumably the maximum number of tasks the queue can
	// hold — TODO confirm against the queue implementation.
	Capacity  int
	// BatchSize is presumably the number of tasks grouped into one
	// batch — TODO confirm.
	BatchSize int
}

BatchQueueOptions defines options for a batch queue

type DelayedQueueOptions

type DelayedQueueOptions struct {
	// Capacity is presumably the maximum number of tasks the queue can
	// hold — TODO confirm against the queue implementation.
	Capacity int
}

DelayedQueueOptions defines options for a delayed queue

type PriorityQueueOptions

type PriorityQueueOptions struct {
	// Capacity is presumably the maximum number of tasks the queue can
	// hold — TODO confirm against the queue implementation.
	Capacity int
}

PriorityQueueOptions defines options for a priority queue

type QueueOptions

type QueueOptions struct {
	// Capacity is presumably the maximum number of tasks the queue can
	// hold — TODO confirm against the queue implementation.
	//
	// NOTE(review): the other *QueueOptions types in this package declare
	// their own Capacity field instead of embedding QueueOptions.
	Capacity int
}

QueueOptions defines the base options for all queue types

type RateLimitQueueOptions

type RateLimitQueueOptions struct {
	// Capacity is presumably the maximum number of tasks the queue can
	// hold — TODO confirm against the queue implementation.
	Capacity      int
	// RatePerSecond is the rate limit expressed per second; presumably the
	// number of tasks released per second — TODO confirm.
	RatePerSecond float64
}

RateLimitQueueOptions defines options for a rate-limited queue

type WorkerPool

type WorkerPool struct {
	// All fields are unexported. Obtain a *WorkerPool from NewWorkerPool,
	// DefaultWorkerPool, or DeletionWorkerPool.
	// NOTE(review): the zero value is presumably not usable — confirm.
	// contains filtered or unexported fields
}

WorkerPool manages a pool of workers that process tasks

func DefaultWorkerPool

func DefaultWorkerPool(name string, numWorkers int) (*WorkerPool, error)

DefaultWorkerPool creates a worker pool with sensible defaults

func DeletionWorkerPool

func DeletionWorkerPool(numWorkers int) (*WorkerPool, error)

DeletionWorkerPool creates a worker pool optimized for deletion tasks

func NewWorkerPool

func NewWorkerPool(config WorkerPoolConfig) (*WorkerPool, error)

NewWorkerPool creates a new worker pool with the given configuration

func (*WorkerPool) CheckCircularDependencies

func (p *WorkerPool) CheckCircularDependencies(ctx context.Context, task worker.Task) error

CheckCircularDependencies detects circular dependencies in a task

func (*WorkerPool) Clear

func (p *WorkerPool) Clear(ctx context.Context) error

Clear removes all tasks from the queue

func (*WorkerPool) ClearDeadLetterQueue

func (p *WorkerPool) ClearDeadLetterQueue(ctx context.Context) error

ClearDeadLetterQueue removes all tasks from the dead letter queue

func (*WorkerPool) GetDeadLetterQueue

func (p *WorkerPool) GetDeadLetterQueue() queue.Queue

GetDeadLetterQueue returns the dead letter queue for inspection

func (*WorkerPool) GetMetrics

func (p *WorkerPool) GetMetrics() *metrics.Metrics

GetMetrics returns the current metrics

func (*WorkerPool) GetTaskStore

func (p *WorkerPool) GetTaskStore() store.TaskStore

GetTaskStore returns the task store for external access

func (*WorkerPool) ListDeadLetterTasks

func (p *WorkerPool) ListDeadLetterTasks(ctx context.Context) ([]worker.Task, error)

ListDeadLetterTasks returns all tasks in the dead letter queue

func (*WorkerPool) RecoverTasks

func (p *WorkerPool) RecoverTasks(ctx context.Context) error

RecoverTasks recovers tasks from the store on startup

func (*WorkerPool) RetryDeadLetterTask

func (p *WorkerPool) RetryDeadLetterTask(ctx context.Context, taskID string) error

RetryDeadLetterTask moves a task from the dead letter queue back to the main queue so that it can be retried.

func (*WorkerPool) Size

func (p *WorkerPool) Size(ctx context.Context) (int, error)

Size returns the number of tasks in the queue

func (*WorkerPool) Start

func (p *WorkerPool) Start()

Start starts the worker pool

func (*WorkerPool) Stop

func (p *WorkerPool) Stop()

Stop stops the worker pool

func (*WorkerPool) Submit

func (p *WorkerPool) Submit(ctx context.Context, task worker.Task) error

Submit submits a task to the worker pool

func (*WorkerPool) SubmitAfter

func (p *WorkerPool) SubmitAfter(ctx context.Context, task worker.Task, delay time.Duration) error

SubmitAfter submits a task to be executed after a delay

func (*WorkerPool) ValidateTaskDependencies

func (p *WorkerPool) ValidateTaskDependencies(ctx context.Context, task worker.Task) error

ValidateTaskDependencies checks that all dependencies of a task are satisfied.

type WorkerPoolConfig

type WorkerPoolConfig struct {
	// Name identifies the worker pool.
	Name string

	// NumWorkers is the number of workers in the pool.
	NumWorkers int

	// Queue configuration: the type of queue backing the pool and its
	// sizing parameters.
	// NOTE(review): QueueRateLimit is a time.Duration, whereas
	// RateLimitQueueOptions expresses its limit as RatePerSecond (float64);
	// presumably this is an interval between tasks — confirm the unit.
	// QueueRateLimit and QueueBatchSize presumably apply only to the
	// rate-limited and batch queue types respectively — TODO confirm.
	QueueType      queue.QueueType
	QueueCapacity  int
	QueueRateLimit time.Duration
	QueueBatchSize int

	// DeadLetterQueueCapacity configures the capacity of the dead letter
	// queue.
	DeadLetterQueueCapacity int

	// DefaultTimeout is the default timeout for task execution.
	DefaultTimeout time.Duration

	// DefaultRetryPolicy is the default retry policy.
	DefaultRetryPolicy worker.RetryPolicy

	// ErrorHandler is the handler for task execution errors.
	ErrorHandler func(error)

	// Metrics configuration.
	// NOTE(review): presumably Metrics supplies an existing collector to
	// use when EnableMetrics is true — confirm what happens when it is nil.
	EnableMetrics bool
	Metrics       *metrics.Metrics

	// TaskStore is the task store used for persistence; it also holds
	// execution results.
	TaskStore store.TaskStore

	// EnablePersistence enables task persistence.
	EnablePersistence bool
}

WorkerPoolConfig holds the configuration for a worker pool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL