tasks

package
v1.9.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package tasks is a generated GoMock package.

Package tasks is a generated GoMock package.

Index

Constants

View Source
const (
	WeightedChannelDefaultSize = 1000
)

Variables

View Source
var (
	PriorityHigh        = getPriority(highPriorityClass, mediumPrioritySubclass)
	PriorityLow         = getPriority(highPriorityClass, lowPrioritySubclass)
	PriorityPreemptable = getPriority(lowPriorityClass, mediumPrioritySubclass)
)

Functions

This section is empty.

Types

type ChannelWeightFn

type ChannelWeightFn[K comparable] func(K) int

ChannelWeightFn is the function for mapping a task channel (key) to its weight

type DynamicWorkerPoolLimiter

type DynamicWorkerPoolLimiter interface {
	// Dynamic concurrency limiter. Evaluated at submit time.
	Concurrency() int
	// Dynamic buffer size limiter. Evaluated at submit time.
	BufferSize() int
}

DynamicWorkerPoolLimiter provides dynamic limiters for DynamicWorkerPoolScheduler.

type DynamicWorkerPoolScheduler

type DynamicWorkerPoolScheduler struct {
	// contains filtered or unexported fields
}

DynamicWorkerPoolScheduler manages a pool of worker goroutines to execute Runnable instances. It limits the number of concurrently running workers and buffers tasks when that limit is reached. It limits the buffer size and rejects tasks when that limit is reached. New workers are created on-demand. Workers check for more tasks in the buffer after completing a task. If no tasks are available, the worker stops. The pool can be stopped, which aborts all buffered tasks.

func NewDynamicWorkerPoolScheduler

func NewDynamicWorkerPoolScheduler(
	limiter DynamicWorkerPoolLimiter,
	metricsHandler metrics.Handler,
) *DynamicWorkerPoolScheduler

NewDynamicWorkerPoolScheduler creates a DynamicWorkerPoolScheduler with the given limiter.

func (*DynamicWorkerPoolScheduler) InitiateShutdown

func (pool *DynamicWorkerPoolScheduler) InitiateShutdown()

InitiateShutdown aborts all buffered tasks and empties the buffer.

func (*DynamicWorkerPoolScheduler) TrySubmit

func (pool *DynamicWorkerPoolScheduler) TrySubmit(task Runnable) bool

func (*DynamicWorkerPoolScheduler) WaitShutdown

func (pool *DynamicWorkerPoolScheduler) WaitShutdown()

WaitShutdown waits for all worker goroutines to complete.

type ExecutionAwareScheduler

type ExecutionAwareScheduler[T Task] struct {
	// contains filtered or unexported fields
}

ExecutionAwareScheduler is a scheduler that wraps a base scheduler and adds an executionQueueScheduler for handling execution contention.

By default, tasks are processed by the base scheduler. When an execution experiences contention (e.g., busy workflow error), it gets routed to the executionQueueScheduler which ensures tasks are processed sequentially per execution.

func NewExecutionAwareScheduler

func NewExecutionAwareScheduler[T Task](
	baseScheduler Scheduler[T],
	options ExecutionAwareSchedulerOptions,
	queueKeyFn QueueKeyFn[T],
	logger log.Logger,
	metricsHandler metrics.Handler,
	timeSource clock.TimeSource,
) *ExecutionAwareScheduler[T]

NewExecutionAwareScheduler creates a new ExecutionAwareScheduler.

func (*ExecutionAwareScheduler[T]) HandleBusyWorkflow

func (s *ExecutionAwareScheduler[T]) HandleBusyWorkflow(task T) bool

HandleBusyWorkflow routes a task to the executionQueueScheduler when it encounters a contention error. Returns true if the task was handled (submitted to EQS), false if the caller should handle it (e.g., feature disabled or EQS at max capacity).

func (*ExecutionAwareScheduler[T]) HasExecutionQueue

func (s *ExecutionAwareScheduler[T]) HasExecutionQueue(task T) bool

HasExecutionQueue returns true if the task's execution has an active queue in the executionQueueScheduler.

func (*ExecutionAwareScheduler[T]) Start

func (s *ExecutionAwareScheduler[T]) Start()

func (*ExecutionAwareScheduler[T]) Stop

func (s *ExecutionAwareScheduler[T]) Stop()

func (*ExecutionAwareScheduler[T]) Submit

func (s *ExecutionAwareScheduler[T]) Submit(task T)

func (*ExecutionAwareScheduler[T]) TrySubmit

func (s *ExecutionAwareScheduler[T]) TrySubmit(task T) bool

type ExecutionAwareSchedulerOptions

type ExecutionAwareSchedulerOptions struct {
	// Enabled controls whether the executionQueueScheduler is active.
	Enabled func() bool
	// MaxQueues is the maximum number of concurrent execution queues.
	// When this limit is reached, new queues are rejected and tasks fall back to the base scheduler.
	MaxQueues func() int
	// QueueTTL is how long an idle queue stays in the map before being swept.
	QueueTTL func() time.Duration
	// QueueConcurrency is the max number of worker goroutines per queue.
	// Values <= 0 are capped to 1 (strictly sequential).
	QueueConcurrency func() int
}

ExecutionAwareSchedulerOptions contains configuration for the ExecutionAwareScheduler.

type FIFOScheduler

type FIFOScheduler[T Task] struct {
	// contains filtered or unexported fields
}

func NewFIFOScheduler

func NewFIFOScheduler[T Task](
	options *FIFOSchedulerOptions,
	logger log.Logger,
) *FIFOScheduler[T]

NewFIFOScheduler creates a new FIFOScheduler

func (*FIFOScheduler[T]) Start

func (f *FIFOScheduler[T]) Start()

func (*FIFOScheduler[T]) Stop

func (f *FIFOScheduler[T]) Stop()

func (*FIFOScheduler[T]) Submit

func (f *FIFOScheduler[T]) Submit(task T)

func (*FIFOScheduler[T]) TrySubmit

func (f *FIFOScheduler[T]) TrySubmit(task T) bool

type FIFOSchedulerOptions

type FIFOSchedulerOptions struct {
	QueueSize   int
	WorkerCount dynamicconfig.TypedSubscribable[int]
}

FIFOSchedulerOptions is the configs for FIFOScheduler

type GroupByScheduler

type GroupByScheduler[K comparable, T Task] struct {
	// contains filtered or unexported fields
}

GroupByScheduler groups tasks based on a provided key function and submits that task for processing on a dedicated scheduler for that group.

func NewGroupByScheduler

func NewGroupByScheduler[K comparable, T Task](options GroupBySchedulerOptions[K, T]) *GroupByScheduler[K, T]

NewGroupByScheduler creates a new GroupByScheduler from given options.

func (*GroupByScheduler[K, T]) Start

func (*GroupByScheduler[K, T]) Start()

func (*GroupByScheduler[K, T]) Stop

func (s *GroupByScheduler[K, T]) Stop()

Stop signals running tasks to stop, aborts any pending tasks and waits up to a minute for all running tasks to complete.

func (*GroupByScheduler[K, T]) Submit

func (s *GroupByScheduler[K, T]) Submit(task T)

func (*GroupByScheduler[K, T]) TrySubmit

func (s *GroupByScheduler[K, T]) TrySubmit(task T) bool

TrySubmit submits a task for processing. If called after the scheduler is shut down, the task will be accepted and aborted.

type GroupBySchedulerOptions

type GroupBySchedulerOptions[K comparable, T Task] struct {
	Logger log.Logger
	// A function to determine the group of a task.
	KeyFn func(T) K
	// Factory for creating a runnable from a task.
	RunnableFactory func(T) Runnable
	// When a new group is encountered, use this function to create a scheduler for that group.
	SchedulerFactory func(K) RunnableScheduler
}

GroupBySchedulerOptions are options for creating a GroupByScheduler.

type InterleavedWeightedRoundRobinScheduler

type InterleavedWeightedRoundRobinScheduler[T Task, K comparable] struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

InterleavedWeightedRoundRobinScheduler is a round robin scheduler implementation ref: https://en.wikipedia.org/wiki/Weighted_round_robin#Interleaved_WRR

func NewInterleavedWeightedRoundRobinScheduler

func NewInterleavedWeightedRoundRobinScheduler[T Task, K comparable](
	options InterleavedWeightedRoundRobinSchedulerOptions[T, K],
	fifoScheduler Scheduler[T],
	logger log.Logger,
) *InterleavedWeightedRoundRobinScheduler[T, K]

func (*InterleavedWeightedRoundRobinScheduler[T, K]) Start

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) Start()

func (*InterleavedWeightedRoundRobinScheduler[T, K]) Stop

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) Stop()

func (*InterleavedWeightedRoundRobinScheduler[T, K]) Submit

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) Submit(
	task T,
)

func (*InterleavedWeightedRoundRobinScheduler[T, K]) TrySubmit

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) TrySubmit(
	task T,
) bool

type InterleavedWeightedRoundRobinSchedulerOptions

type InterleavedWeightedRoundRobinSchedulerOptions[T Task, K comparable] struct {
	// Required for mapping a task to it's corresponding task channel
	TaskChannelKeyFn TaskChannelKeyFn[T, K]
	// Required for getting the weight for a task channel
	ChannelWeightFn ChannelWeightFn[K]
	// Optional, if specified, re-evaluate task channel weight when channel is not empty
	ChannelWeightUpdateCh chan struct{}
	// Optional, if specified, delete inactive channels after this duration
	InactiveChannelDeletionDelay dynamicconfig.DurationPropertyFn
}

InterleavedWeightedRoundRobinSchedulerOptions is the config for interleaved weighted round robin scheduler

type MetricTagsFn

type MetricTagsFn[T Task] func(T) []metrics.Tag

type MockRunnable

type MockRunnable struct {
	// contains filtered or unexported fields
}

MockRunnable is a mock of Runnable interface.

func NewMockRunnable

func NewMockRunnable(ctrl *gomock.Controller) *MockRunnable

NewMockRunnable creates a new mock instance.

func (*MockRunnable) Abort

func (m *MockRunnable) Abort()

Abort mocks base method.

func (*MockRunnable) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockRunnable) Run

func (m *MockRunnable) Run(arg0 context.Context)

Run mocks base method.

type MockRunnableMockRecorder

type MockRunnableMockRecorder struct {
	// contains filtered or unexported fields
}

MockRunnableMockRecorder is the mock recorder for MockRunnable.

func (*MockRunnableMockRecorder) Abort

func (mr *MockRunnableMockRecorder) Abort() *gomock.Call

Abort indicates an expected call of Abort.

func (*MockRunnableMockRecorder) Run

func (mr *MockRunnableMockRecorder) Run(arg0 any) *gomock.Call

Run indicates an expected call of Run.

type MockScheduler

type MockScheduler[T Task] struct {
	// contains filtered or unexported fields
}

MockScheduler is a mock of Scheduler interface.

func NewMockScheduler

func NewMockScheduler[T Task](ctrl *gomock.Controller) *MockScheduler[T]

NewMockScheduler creates a new mock instance.

func (*MockScheduler[T]) EXPECT

func (m *MockScheduler[T]) EXPECT() *MockSchedulerMockRecorder[T]

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockScheduler[T]) Start

func (m *MockScheduler[T]) Start()

Start mocks base method.

func (*MockScheduler[T]) Stop

func (m *MockScheduler[T]) Stop()

Stop mocks base method.

func (*MockScheduler[T]) Submit

func (m *MockScheduler[T]) Submit(task T)

Submit mocks base method.

func (*MockScheduler[T]) TrySubmit

func (m *MockScheduler[T]) TrySubmit(task T) bool

TrySubmit mocks base method.

type MockSchedulerMockRecorder

type MockSchedulerMockRecorder[T Task] struct {
	// contains filtered or unexported fields
}

MockSchedulerMockRecorder is the mock recorder for MockScheduler.

func (*MockSchedulerMockRecorder[T]) Start

func (mr *MockSchedulerMockRecorder[T]) Start() *gomock.Call

Start indicates an expected call of Start.

func (*MockSchedulerMockRecorder[T]) Stop

func (mr *MockSchedulerMockRecorder[T]) Stop() *gomock.Call

Stop indicates an expected call of Stop.

func (*MockSchedulerMockRecorder[T]) Submit

func (mr *MockSchedulerMockRecorder[T]) Submit(task any) *gomock.Call

Submit indicates an expected call of Submit.

func (*MockSchedulerMockRecorder[T]) TrySubmit

func (mr *MockSchedulerMockRecorder[T]) TrySubmit(task any) *gomock.Call

TrySubmit indicates an expected call of TrySubmit.

type MockTask

type MockTask struct {
	// contains filtered or unexported fields
}

MockTask is a mock of Task interface.

func NewMockTask

func NewMockTask(ctrl *gomock.Controller) *MockTask

NewMockTask creates a new mock instance.

func (*MockTask) Abort

func (m *MockTask) Abort()

Abort mocks base method.

func (*MockTask) Ack

func (m *MockTask) Ack()

Ack mocks base method.

func (*MockTask) Cancel

func (m *MockTask) Cancel()

Cancel mocks base method.

func (*MockTask) EXPECT

func (m *MockTask) EXPECT() *MockTaskMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockTask) Execute

func (m *MockTask) Execute() error

Execute mocks base method.

func (*MockTask) HandleErr

func (m *MockTask) HandleErr(err error) error

HandleErr mocks base method.

func (*MockTask) IsRetryableError

func (m *MockTask) IsRetryableError(err error) bool

IsRetryableError mocks base method.

func (*MockTask) Nack

func (m *MockTask) Nack(err error)

Nack mocks base method.

func (*MockTask) Reschedule

func (m *MockTask) Reschedule()

Reschedule mocks base method.

func (*MockTask) RetryPolicy

func (m *MockTask) RetryPolicy() backoff.RetryPolicy

RetryPolicy mocks base method.

func (*MockTask) State

func (m *MockTask) State() State

State mocks base method.

type MockTaskMockRecorder

type MockTaskMockRecorder struct {
	// contains filtered or unexported fields
}

MockTaskMockRecorder is the mock recorder for MockTask.

func (*MockTaskMockRecorder) Abort

func (mr *MockTaskMockRecorder) Abort() *gomock.Call

Abort indicates an expected call of Abort.

func (*MockTaskMockRecorder) Ack

func (mr *MockTaskMockRecorder) Ack() *gomock.Call

Ack indicates an expected call of Ack.

func (*MockTaskMockRecorder) Cancel

func (mr *MockTaskMockRecorder) Cancel() *gomock.Call

Cancel indicates an expected call of Cancel.

func (*MockTaskMockRecorder) Execute

func (mr *MockTaskMockRecorder) Execute() *gomock.Call

Execute indicates an expected call of Execute.

func (*MockTaskMockRecorder) HandleErr

func (mr *MockTaskMockRecorder) HandleErr(err any) *gomock.Call

HandleErr indicates an expected call of HandleErr.

func (*MockTaskMockRecorder) IsRetryableError

func (mr *MockTaskMockRecorder) IsRetryableError(err any) *gomock.Call

IsRetryableError indicates an expected call of IsRetryableError.

func (*MockTaskMockRecorder) Nack

func (mr *MockTaskMockRecorder) Nack(err any) *gomock.Call

Nack indicates an expected call of Nack.

func (*MockTaskMockRecorder) Reschedule

func (mr *MockTaskMockRecorder) Reschedule() *gomock.Call

Reschedule indicates an expected call of Reschedule.

func (*MockTaskMockRecorder) RetryPolicy

func (mr *MockTaskMockRecorder) RetryPolicy() *gomock.Call

RetryPolicy indicates an expected call of RetryPolicy.

func (*MockTaskMockRecorder) State

func (mr *MockTaskMockRecorder) State() *gomock.Call

State indicates an expected call of State.

type Priority

type Priority int

func (Priority) CallerType

func (p Priority) CallerType() string

func (Priority) String

func (p Priority) String() string

type QueueKeyFn

type QueueKeyFn[T Task] func(T) any

QueueKeyFn extracts a queue key from a task for queue routing.

type QuotaRequestFn

type QuotaRequestFn[T Task] func(T) quotas.Request

type RateLimitedScheduler

type RateLimitedScheduler[T Task] struct {
	// contains filtered or unexported fields
}

func NewRateLimitedScheduler

func NewRateLimitedScheduler[T Task](
	scheduler Scheduler[T],
	rateLimiter quotas.RequestRateLimiter,
	timeSource clock.TimeSource,
	quotaRequestFn QuotaRequestFn[T],
	metricTagsFn MetricTagsFn[T],
	options RateLimitedSchedulerOptions,
	logger log.Logger,
	metricsHandler metrics.Handler,
) *RateLimitedScheduler[T]

func (*RateLimitedScheduler[T]) Start

func (s *RateLimitedScheduler[T]) Start()

func (*RateLimitedScheduler[T]) Stop

func (s *RateLimitedScheduler[T]) Stop()

func (*RateLimitedScheduler[T]) Submit

func (s *RateLimitedScheduler[T]) Submit(task T)

func (*RateLimitedScheduler[T]) TrySubmit

func (s *RateLimitedScheduler[T]) TrySubmit(task T) bool

type RateLimitedSchedulerOptions

type RateLimitedSchedulerOptions struct {
	Enabled          dynamicconfig.BoolPropertyFn
	EnableShadowMode dynamicconfig.BoolPropertyFn
}

type RateLimitedTaskRunnable

type RateLimitedTaskRunnable struct {
	Runnable
	Limiter quotas.RateLimiter
	// contains filtered or unexported fields
}

RateLimitedTaskRunnable wraps a Runnable with a rate limiter.

func NewRateLimitedTaskRunnableFromTask

func NewRateLimitedTaskRunnableFromTask(
	task Task,
	limiter quotas.RateLimiter,
	metricsHandler metrics.Handler,
) RateLimitedTaskRunnable

NewRateLimitedTaskRunnableFromTask creates a [NewRateLimitedTaskRunnable] from a Task and a [rate.Limiter].

func (RateLimitedTaskRunnable) Run

Run the embedded Runnable, applying the rate limiter.

type Runnable

type Runnable interface {
	// Run and handle errors, abort on context error.
	Run(context.Context)
	// Abort marks the task as aborted, usually means task scheduler shutdown.
	Abort()
}

type RunnableScheduler

type RunnableScheduler interface {
	// InitiateShutdown signals the scheduler to stop without waiting for shutdown to complete.
	InitiateShutdown()
	// WaitShutdown waits for the scheduler to complete shutdown. Must be called after InitiateShutdown().
	WaitShutdown()
	// Submit a Runnable for scheduling, if the scheduler is already stopped, the runnable will be aborted.
	// Returns a boolean indicating whether the task was accepted.
	TrySubmit(Runnable) bool
}

RunnableScheduler is scheduler for Runnable tasks.

type RunnableTask

type RunnableTask struct {
	Task
}

RunnableTask turns a Task into a Runnable. Does **not** retry tasks.

func (RunnableTask) Run

func (a RunnableTask) Run(ctx context.Context)

Run the embedded task, handling errors and aborting on context errors.

type Scheduler

type Scheduler[T Task] interface {
	Submit(task T)
	TrySubmit(task T) bool
	Start()
	Stop()
}

Scheduler is the generic interface for scheduling & processing tasks

type SequentialScheduler

type SequentialScheduler[T Task] struct {
	// contains filtered or unexported fields
}

func NewSequentialScheduler

func NewSequentialScheduler[T Task](
	options *SequentialSchedulerOptions,
	taskQueueHashFn collection.HashFunc,
	taskQueueFactory SequentialTaskQueueFactory[T],
	logger log.Logger,
) *SequentialScheduler[T]

func (*SequentialScheduler[T]) Start

func (s *SequentialScheduler[T]) Start()

func (*SequentialScheduler[T]) Stop

func (s *SequentialScheduler[T]) Stop()

func (*SequentialScheduler[T]) Submit

func (s *SequentialScheduler[T]) Submit(task T)

func (*SequentialScheduler[T]) TrySubmit

func (s *SequentialScheduler[T]) TrySubmit(task T) bool

TrySubmit use mu locking to make it thread safe which has higher latency and not suitable for high throughput

type SequentialSchedulerOptions

type SequentialSchedulerOptions struct {
	QueueSize   int
	WorkerCount dynamicconfig.TypedSubscribable[int]
}

type SequentialTaskQueue

type SequentialTaskQueue[T Task] interface {
	// ID return the ID of the queue, as well as the tasks inside (same)
	ID() any
	// Add push a task to the task set
	Add(T)
	// Remove pop a task from the task set
	Remove() T
	// IsEmpty indicate if the task set is empty
	IsEmpty() bool
	// Len return the size of the queue
	Len() int
}

type SequentialTaskQueueFactory

type SequentialTaskQueueFactory[T Task] func(task T) SequentialTaskQueue[T]

type State

type State int

State represents the current state of a task

const (
	// TaskStatePending is the state for a task when it's waiting to be processed or currently being processed
	TaskStatePending State = iota + 1
	// TaskStateAborted is the state for a task when its executor shuts down
	TaskStateAborted
	// TaskStateCancelled is the state for a task when its execution has request to be cancelled
	TaskStateCancelled
	// TaskStateAcked is the state for a task if it has been successfully completed
	TaskStateAcked
	// TaskStateNacked is the state for a task if it can not be processed
	TaskStateNacked
)

type Task

type Task interface {
	// Execute process this task
	Execute() error
	// HandleErr handle the error returned by Execute
	HandleErr(err error) error
	// IsRetryableError check whether to retry after HandleErr(Execute())
	IsRetryableError(err error) bool
	// RetryPolicy returns the retry policy for task processing
	RetryPolicy() backoff.RetryPolicy
	// Abort marks the task as aborted, usually means task executor shutdown
	Abort()
	// Cancel marks the task as cancelled, usually by the task submitter
	Cancel()
	// Ack marks the task as successful completed
	Ack()
	// Nack marks the task as unsuccessful completed
	Nack(err error)
	// Reschedule marks the task for retry
	Reschedule()
	// State returns the current task state
	State() State
}

Task is the interface for tasks which should be executed sequentially

type TaskChannelKeyFn

type TaskChannelKeyFn[T Task, K comparable] func(T) K

TaskChannelKeyFn is the function for mapping a task to its task channel (key)

type WeightedChannel

type WeightedChannel[T Task] struct {
	// contains filtered or unexported fields
}

func NewWeightedChannel

func NewWeightedChannel[T Task](
	weight int,
	size int,
	now time.Time,
) *WeightedChannel[T]

func (*WeightedChannel[T]) Cap

func (c *WeightedChannel[T]) Cap() int

func (*WeightedChannel[T]) Chan

func (c *WeightedChannel[T]) Chan() chan T

func (*WeightedChannel[T]) DecrementRefCount

func (c *WeightedChannel[T]) DecrementRefCount()

func (*WeightedChannel[T]) IncrementRefCount

func (c *WeightedChannel[T]) IncrementRefCount()

func (*WeightedChannel[T]) LastActiveTime

func (c *WeightedChannel[T]) LastActiveTime() time.Time

func (*WeightedChannel[T]) Len

func (c *WeightedChannel[T]) Len() int

func (*WeightedChannel[T]) RefCount

func (c *WeightedChannel[T]) RefCount() int32

func (*WeightedChannel[T]) SetWeight

func (c *WeightedChannel[T]) SetWeight(newWeight int)

func (*WeightedChannel[T]) UpdateLastActiveTime

func (c *WeightedChannel[T]) UpdateLastActiveTime(now time.Time)

func (*WeightedChannel[T]) Weight

func (c *WeightedChannel[T]) Weight() int

type WeightedChannels

type WeightedChannels[T Task] []*WeightedChannel[T]

func (WeightedChannels[T]) Len

func (c WeightedChannels[T]) Len() int

func (WeightedChannels[T]) Less

func (c WeightedChannels[T]) Less(i, j int) bool

func (WeightedChannels[T]) Swap

func (c WeightedChannels[T]) Swap(i, j int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL