executors

package
v0.6.0
Warning: This package is not in the latest version of its module.
Published: Sep 9, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Constants

const ErrCodeTaskPanic = "TASK_PANIC"

Variables

var ErrExecutorCanceled = errors.Newf("EXECUTOR_CANCELED", "executor is canceled")
var ErrExecutorQueueFull = errors.Newf("EXECUTOR_QUEUE_FULL", "executor queue is full")

Functions

This section is empty.

Types

type DefaultExecutor

type DefaultExecutor struct {
	// contains filtered or unexported fields
}

func NewDefaultExecutor

func NewDefaultExecutor(ctx context.Context, rootConf config.RootConfig, concurrency int, queueCapacity int) *DefaultExecutor

NewDefaultExecutor creates a new executor with the given concurrency and queue capacity. Either value can be negative, which means there is no limit on the number of concurrent tasks or on the queue capacity, respectively.

The executor will run until the given context is canceled or the Executor.Cancel/Executor.CancelNow methods are called.

When the capacity of the queue is reached, the executor will reject new tasks by returning ErrExecutorQueueFull.
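The rejection rule can be illustrated with a small stand-alone sketch. It does not use the package itself: boundedQueue, offer, accepted and errQueueFull are hypothetical names, and treating a capacity of zero as "reject everything" is an assumption, not documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull is a local stand-in for the package's ErrExecutorQueueFull.
var errQueueFull = errors.New("executor queue is full")

// boundedQueue is a hypothetical queue; a negative capacity means "no limit".
type boundedQueue struct {
	capacity int
	items    []string
}

// offer appends the item, or rejects it when the queue is at capacity.
func (q *boundedQueue) offer(item string) error {
	if q.capacity >= 0 && len(q.items) >= q.capacity {
		return errQueueFull
	}
	q.items = append(q.items, item)
	return nil
}

// accepted offers n items to a queue of the given capacity and counts
// how many of them were taken.
func accepted(capacity, n int) int {
	q := &boundedQueue{capacity: capacity}
	count := 0
	for i := 0; i < n; i++ {
		if err := q.offer(fmt.Sprintf("task-%d", i)); err == nil {
			count++
		}
	}
	return count
}

func main() {
	fmt.Println(accepted(2, 5))  // bounded: only the first two fit
	fmt.Println(accepted(-1, 5)) // negative capacity: all five fit
}
```

Callers of the real executor would compare the returned error against ErrExecutorQueueFull and decide whether to retry, drop the task, or run it inline.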

When the executor is canceled, it will reject new tasks by returning ErrExecutorCanceled.
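A common way to combine the lifecycle methods is a graceful shutdown with a hard fallback. The sketch below is an assumption about usage, not code from the package: future, lifecycle, shutdown, fakeExecutor and tryShutdown are hypothetical, and the local interfaces only mirror a subset of the documented method sets. Real code would call the same methods on an Executor directly.

```go
package main

import (
	"fmt"
	"time"
)

// future and lifecycle are local stand-ins that mirror a subset of the
// documented Future and Executor method sets.
type future interface{ Cancel() bool }

type lifecycle interface {
	Cancel() bool
	CancelNow() []future
	AwaitTermination(timeout time.Duration) bool
}

// shutdown asks for an orderly stop, waits up to grace, and falls back to
// CancelNow. It reports whether the stop was clean and how many queued
// commands were returned unexecuted.
func shutdown(e lifecycle, grace time.Duration) (clean bool, dropped int) {
	e.Cancel()
	if e.AwaitTermination(grace) {
		return true, 0
	}
	return false, len(e.CancelNow())
}

// fakeExecutor is a hypothetical test double, not the real DefaultExecutor.
type fakeExecutor struct {
	finishesInTime bool
	queued         int
	canceled       bool
}

func (f *fakeExecutor) Cancel() bool {
	first := !f.canceled
	f.canceled = true
	return first
}

func (f *fakeExecutor) AwaitTermination(time.Duration) bool { return f.finishesInTime }

func (f *fakeExecutor) CancelNow() []future {
	f.canceled = true
	pending := make([]future, f.queued)
	f.queued = 0
	return pending
}

// tryShutdown runs shutdown against a fake with the given behaviour.
func tryShutdown(finishesInTime bool, queued int) (bool, int) {
	fake := &fakeExecutor{finishesInTime: finishesInTime, queued: queued}
	return shutdown(fake, 10*time.Millisecond)
}

func main() {
	fmt.Println(tryShutdown(true, 3))  // drained within the grace period
	fmt.Println(tryShutdown(false, 3)) // timed out, queued commands returned
}
```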

Implementation notes:

- The executor uses a priority queue to schedule tasks based on how soon they need to be executed.
- The priority queue is implemented using a heap.
- The executor uses a timer to schedule the next task to run by inspecting the top element of the priority queue.
- The executor uses a linked list to store tasks that are ready to run but there is no room to execute them.
- Tasks are removed from the queue when they are started or canceled.
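The heap-based ordering described in these notes can be sketched with the standard container/heap package. This is an illustration of the technique only, not the package's internal code; task, taskHeap and scheduleOrder are hypothetical names.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// task is a hypothetical queue entry: a name plus the instant it becomes ready.
type task struct {
	name    string
	readyAt time.Time
}

// taskHeap implements heap.Interface as a min-heap ordered by readyAt,
// so the element at index 0 is always the next task due.
type taskHeap []task

func (h taskHeap) Len() int           { return len(h) }
func (h taskHeap) Less(i, j int) bool { return h[i].readyAt.Before(h[j].readyAt) }
func (h taskHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x any)        { *h = append(*h, x.(task)) }
func (h *taskHeap) Pop() any {
	old := *h
	n := len(old)
	t := old[n-1]
	*h = old[:n-1]
	return t
}

// scheduleOrder enqueues one task per name with the matching delay in
// milliseconds and returns the names in the order they become due.
func scheduleOrder(names []string, delaysMs []int) []string {
	base := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	h := &taskHeap{}
	for i, name := range names {
		delay := time.Duration(delaysMs[i]) * time.Millisecond
		heap.Push(h, task{name: name, readyAt: base.Add(delay)})
	}
	order := make([]string, 0, h.Len())
	for h.Len() > 0 {
		order = append(order, heap.Pop(h).(task).name)
	}
	return order
}

func main() {
	// Prints [fast mid slow] regardless of insertion order.
	fmt.Println(scheduleOrder([]string{"slow", "fast", "mid"}, []int{300, 100, 200}))
}
```

A scheduler built this way only needs one timer, armed for the readyAt of the element at index 0 and re-armed whenever that element changes.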

func (*DefaultExecutor) AwaitTermination

func (e *DefaultExecutor) AwaitTermination(timeout time.Duration) bool

func (*DefaultExecutor) Cancel

func (e *DefaultExecutor) Cancel() bool

func (*DefaultExecutor) CancelNow

func (e *DefaultExecutor) CancelNow() []Future

func (*DefaultExecutor) Canceled

func (e *DefaultExecutor) Canceled() bool

func (*DefaultExecutor) Concurrency

func (e *DefaultExecutor) Concurrency() int

func (*DefaultExecutor) QueueCapacity

func (e *DefaultExecutor) QueueCapacity() int

func (*DefaultExecutor) QueueLength

func (e *DefaultExecutor) QueueLength() int

func (*DefaultExecutor) Schedule

func (e *DefaultExecutor) Schedule(runnable Runnable, delay time.Duration) (ScheduledFuture, error)

func (*DefaultExecutor) ScheduleWithFixedDelay

func (e *DefaultExecutor) ScheduleWithFixedDelay(
	runnable Runnable,
	initialDelay time.Duration,
	delay time.Duration,
) (ScheduledFuture, error)

func (*DefaultExecutor) ScheduleWithFixedRate

func (e *DefaultExecutor) ScheduleWithFixedRate(
	runnable Runnable,
	initialDelay time.Duration,
	period time.Duration,
) (ScheduledFuture, error)

func (*DefaultExecutor) SetConcurrency

func (e *DefaultExecutor) SetConcurrency(concurrency int)

func (*DefaultExecutor) SetQueueCapacity

func (e *DefaultExecutor) SetQueueCapacity(capacity int)

func (*DefaultExecutor) Submit

func (e *DefaultExecutor) Submit(runnable Runnable) (Future, error)

func (*DefaultExecutor) SubmitProducer

func (e *DefaultExecutor) SubmitProducer(callable Producer) (ProducerFuture, error)

func (*DefaultExecutor) Terminated

func (e *DefaultExecutor) Terminated() bool

type Executor

type Executor interface {

	// Concurrency returns the number of commands that can run concurrently.
	Concurrency() int

	// QueueLength returns the number of commands in the queue, it does not include running commands.
	QueueLength() int

	// Submit runs the command as soon as possible.
	// The command can be rejected with ErrExecutorQueueFull or ErrExecutorCanceled.
	Submit(runnable Runnable) (Future, error)

	// SubmitProducer runs the command as soon as possible. The produced value can be retrieved by calling ProducerFuture.Value().
	// The command can be rejected with ErrExecutorQueueFull or ErrExecutorCanceled.
	SubmitProducer(callable Producer) (ProducerFuture, error)

	// Schedule runs the command after the given delay.
	// The command can be rejected with ErrExecutorQueueFull or ErrExecutorCanceled.
	Schedule(runnable Runnable, delay time.Duration) (ScheduledFuture, error)

	// ScheduleWithFixedDelay runs the command first after the given initial delay, and then repeatedly with the given
	// delay between the termination of one execution and the commencement of the next.
	// The command can be rejected with ErrExecutorQueueFull or ErrExecutorCanceled.
	ScheduleWithFixedDelay(runnable Runnable, initialDelay time.Duration, delay time.Duration) (ScheduledFuture, error)

	// ScheduleWithFixedRate runs the command first after the given initial delay, and then repeatedly with the given
	// period between the commencement of subsequent executions. A slow command will affect the start time of the next
	// execution if it takes longer than its period, in which case the next execution will start immediately after the
	// slow command finishes. Executions are always sequential, so no two executions will be running at the same time.
	// The command can be rejected with ErrExecutorQueueFull or ErrExecutorCanceled.
	ScheduleWithFixedRate(runnable Runnable, initialDelay time.Duration, period time.Duration) (ScheduledFuture, error)

	// Cancel initiates an orderly shutdown in which previously submitted commands are executed and new commands will
	// be rejected with ErrExecutorCanceled. This method does not wait for previously submitted commands to complete, see
	// AwaitTermination for that. It will return true if the executor successfully started the shutdown process.
	// Successive calls to Cancel do not have any effect and will always return false.
	Cancel() bool

	// Canceled returns true if the executor has been canceled.
	Canceled() bool

	// CancelNow initiates a shutdown as described in Cancel, but it will also cancel all running commands and return
	// all the enqueued commands. Cancellation is done by calling Future.Cancel() on each running command; this is
	// effective only if the command checks for cancellation by reading its context's Done() channel.
	// This method does not wait for running commands to complete, see AwaitTermination for that.
	CancelNow() []Future

	// AwaitTermination blocks until the queue is empty and there are no more running commands, or until the given timeout.
	// It returns true on successful termination, false if the timeout was reached.
	AwaitTermination(timeout time.Duration) bool

	// Terminated returns true if the executor has been canceled and the queue is empty and all commands have completed.
	Terminated() bool
}

Executor can schedule commands to run after a given delay, or to execute periodically. Executor also provides methods to manage its lifecycle.
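The difference between ScheduleWithFixedDelay and ScheduleWithFixedRate comes down to how the next start time is derived. The following sketch restates the rules from the interface comments as arithmetic. nextFixedDelay, nextFixedRate and offsets are hypothetical helpers, and whether the real executor later catches up after an overrun is not stated in the documentation.

```go
package main

import (
	"fmt"
	"time"
)

// nextFixedDelay: the delay is measured from the end of the previous execution.
func nextFixedDelay(finish time.Time, delay time.Duration) time.Time {
	return finish.Add(delay)
}

// nextFixedRate: the period is measured from the start of the previous
// execution, but an execution that overruns its period pushes the next
// start to the moment it finishes, so runs never overlap.
func nextFixedRate(start, finish time.Time, period time.Duration) time.Time {
	next := start.Add(period)
	if finish.After(next) {
		return finish
	}
	return next
}

// offsets reports, in milliseconds after the first start, when the second
// execution would begin under each policy.
func offsets(runMs, intervalMs int) (fixedDelay, fixedRate int) {
	start := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	finish := start.Add(time.Duration(runMs) * time.Millisecond)
	interval := time.Duration(intervalMs) * time.Millisecond
	fixedDelay = int(nextFixedDelay(finish, interval).Sub(start) / time.Millisecond)
	fixedRate = int(nextFixedRate(start, finish, interval).Sub(start) / time.Millisecond)
	return fixedDelay, fixedRate
}

func main() {
	fmt.Println(offsets(30, 100))  // fast command: 130 100
	fmt.Println(offsets(150, 100)) // command slower than its period: 250 150
}
```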

type Future

type Future interface {
	// Cancel returns true if the future execution was canceled, after which Err returns context.Canceled.
	// A Future can only be canceled if it is not Done. Subsequent calls to Cancel always return false.
	Cancel() bool

	// Canceled returns true if the future was canceled.
	Canceled() bool

	// Await returns a channel that should be used to block until the computation is Done. It is safe to call Await
	// multiple times, it will always return the same channel.
	Await() <-chan struct{}

	// Done returns true if the computation is done, false otherwise.
	Done() bool

	// Err returns the error that caused the Future to be Done, or nil if it completed successfully.
	// An error with Code ErrCodeTaskPanic will be returned if the task panicked.
	Err() error
}

Future represents the result of an asynchronous computation.

Example:

future, err := executor.Submit(func(ctx context.Context) error {
	// Do something
	return nil
})
if err != nil {
	// Handle error
}
// Do something else

// Wait for the computation to complete
<-future.Await()

// Check if the computation was successful
if future.Err() != nil {
	// Handle error
}
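Because Await returns a plain channel, it composes with select. The sketch below shows one way to bound the wait. awaitWithTimeout is a hypothetical helper, and the channels in main stand in for the value returned by Future.Await.

```go
package main

import (
	"fmt"
	"time"
)

// awaitWithTimeout blocks until done is closed or the timeout elapses.
// It returns true if the computation finished in time.
func awaitWithTimeout(done <-chan struct{}, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer if done wins the race
	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	finished := make(chan struct{})
	close(finished) // stands in for a Future whose computation is done
	fmt.Println(awaitWithTimeout(finished, time.Second))

	pending := make(chan struct{}) // never closed: still running
	fmt.Println(awaitWithTimeout(pending, 20*time.Millisecond))
}
```

With a real Future the call would look like awaitWithTimeout(future.Await(), time.Second), followed by a check of future.Err() when it returns true.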

type Producer

type Producer func(ctx context.Context) (any, error)

type ProducerFuture

type ProducerFuture interface {
	Future

	// Value returns the computed value. It should only be called after the computation is done according to Future.Done
	// and Future.Err returns no error.
	Value() any
}

ProducerFuture represents the result of an asynchronous computation that produces a Value.
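The ordering constraint on Value (wait, check Err, then read) can be wrapped in a helper. This is a sketch under assumptions: producerFuture is a local interface that mirrors only the methods used here, and result, stubFuture, completed and failed are hypothetical names used to make the example self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// producerFuture mirrors the subset of ProducerFuture used by result.
type producerFuture interface {
	Await() <-chan struct{}
	Err() error
	Value() any
}

// result waits for completion and reads Value only when Err is nil.
func result(f producerFuture) (any, error) {
	<-f.Await()
	if err := f.Err(); err != nil {
		return nil, err
	}
	return f.Value(), nil
}

// stubFuture is a hypothetical, already-completed future for illustration.
type stubFuture struct {
	done  chan struct{}
	value any
	err   error
}

func (s *stubFuture) Await() <-chan struct{} { return s.done }
func (s *stubFuture) Err() error             { return s.err }
func (s *stubFuture) Value() any             { return s.value }

// completed returns a stub that finished successfully with value.
func completed(value any) *stubFuture {
	s := &stubFuture{done: make(chan struct{}), value: value}
	close(s.done)
	return s
}

// failed returns a stub that finished with an error carrying msg.
func failed(msg string) *stubFuture {
	s := &stubFuture{done: make(chan struct{}), err: errors.New(msg)}
	close(s.done)
	return s
}

func main() {
	fmt.Println(result(completed(42)))
	fmt.Println(result(failed("boom")))
}
```

Since Value returns any, callers typically follow result with a type assertion on the returned value.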

type Runnable

type Runnable func(ctx context.Context) error

type ScheduledFuture

type ScheduledFuture interface {
	Future
	Delay() time.Duration
	Periodic() bool
}

ScheduledFuture represents the result of an asynchronous computation that was scheduled to run after a given delay. Delay returns how much time is left before the computation is ready to execute. A zero or negative delay means it is ready to execute.

Periodic returns true if the computation is scheduled to run periodically, false if it is scheduled to run only once.

A ScheduledFuture will not be Done until it is Canceled.
