core

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func PostDelayedTaskAndReplyWithResult

func PostDelayedTaskAndReplyWithResult[T any](
	targetRunner TaskRunner,
	task TaskWithResult[T],
	delay time.Duration,
	reply ReplyWithResult[T],
	replyRunner TaskRunner,
)

PostDelayedTaskAndReplyWithResult is similar to PostTaskAndReplyWithResult, but delays the execution of the task.

The reply is NOT delayed - it executes immediately after the task completes. Only the initial task execution is delayed by the specified duration.

Example:

PostDelayedTaskAndReplyWithResult(
    runner,
    func(ctx context.Context) (string, error) {
        return "delayed result", nil
    },
    2*time.Second,  // Wait 2 seconds before starting task
    func(ctx context.Context, result string, err error) {
        fmt.Println(result)  // Executes immediately after task completes
    },
    replyRunner,
)

func PostDelayedTaskAndReplyWithResultAndTraits

func PostDelayedTaskAndReplyWithResultAndTraits[T any](
	targetRunner TaskRunner,
	task TaskWithResult[T],
	delay time.Duration,
	taskTraits TaskTraits,
	reply ReplyWithResult[T],
	replyTraits TaskTraits,
	replyRunner TaskRunner,
)

PostDelayedTaskAndReplyWithResultAndTraits is the full-featured delayed version with separate traits for task and reply.

func PostTaskAndReplyWithResult

func PostTaskAndReplyWithResult[T any](
	targetRunner TaskRunner,
	task TaskWithResult[T],
	reply ReplyWithResult[T],
	replyRunner TaskRunner,
)

PostTaskAndReplyWithResult executes a task that returns a result of type T and an error, then passes that result to a reply callback on the replyRunner.

This function uses closure capture to safely pass the result across goroutines. The captured variables (result and err) will escape to the heap, ensuring thread safety.

Execution guarantee (Happens-Before): - The task ALWAYS completes before the reply starts - The reply ALWAYS sees the final values written by the task - This is guaranteed by the sequential execution in wrappedTask

Example:

PostTaskAndReplyWithResult(
    backgroundRunner,
    func(ctx context.Context) (int, error) {
        return len("Hello"), nil
    },
    func(ctx context.Context, length int, err error) {
        fmt.Printf("Length: %d\n", length)
    },
    uiRunner,
)

func PostTaskAndReplyWithResultAndTraits

func PostTaskAndReplyWithResultAndTraits[T any](
	targetRunner TaskRunner,
	task TaskWithResult[T],
	taskTraits TaskTraits,
	reply ReplyWithResult[T],
	replyTraits TaskTraits,
	replyRunner TaskRunner,
)

PostTaskAndReplyWithResultAndTraits is the full-featured version that allows specifying different traits for the task and reply separately.

This is useful when: - Task is background work (BestEffort) but reply is UI update (UserVisible/UserBlocking) - Task has different priority requirements than the reply

Example:

PostTaskAndReplyWithResultAndTraits(
    backgroundRunner,
    func(ctx context.Context) (*UserData, error) {
        return fetchUserFromDB(ctx)
    },
    TraitsBestEffort(),        // Background work, low priority
    func(ctx context.Context, user *UserData, err error) {
        updateUI(user)
    },
    TraitsUserVisible(),       // UI update, higher priority
    uiRunner,
)

Types

type DefaultPanicHandler added in v0.3.0

type DefaultPanicHandler struct{}

DefaultPanicHandler provides a basic panic handler that logs to stdout.

func (*DefaultPanicHandler) HandlePanic added in v0.3.0

func (h *DefaultPanicHandler) HandlePanic(ctx context.Context, runnerName string, workerID int, panicInfo any, stackTrace []byte)

HandlePanic prints panic information to stdout.

type DefaultRejectedTaskHandler added in v0.3.0

type DefaultRejectedTaskHandler struct{}

DefaultRejectedTaskHandler provides a basic handler that logs rejected tasks.

func (*DefaultRejectedTaskHandler) HandleRejectedTask added in v0.3.0

func (h *DefaultRejectedTaskHandler) HandleRejectedTask(runnerName string, reason string)

HandleRejectedTask logs the rejected task.

type DelayManager

type DelayManager struct {
	// contains filtered or unexported fields
}

func NewDelayManager

func NewDelayManager() *DelayManager

func (*DelayManager) AddDelayedTask

func (dm *DelayManager) AddDelayedTask(task Task, delay time.Duration, traits TaskTraits, target TaskRunner)

func (*DelayManager) Stop

func (dm *DelayManager) Stop()

func (*DelayManager) TaskCount

func (dm *DelayManager) TaskCount() int

type DelayedTask

type DelayedTask struct {
	RunAt  time.Time
	Task   Task
	Traits TaskTraits
	Target TaskRunner
	// contains filtered or unexported fields
}

DelayedTask represents a task scheduled for the future

type DelayedTaskHeap

type DelayedTaskHeap []*DelayedTask

DelayedTaskHeap implements heap.Interface

func (DelayedTaskHeap) Len

func (h DelayedTaskHeap) Len() int

func (DelayedTaskHeap) Less

func (h DelayedTaskHeap) Less(i, j int) bool

func (*DelayedTaskHeap) Peek

func (h *DelayedTaskHeap) Peek() *DelayedTask

func (*DelayedTaskHeap) Pop

func (h *DelayedTaskHeap) Pop() any

func (*DelayedTaskHeap) Push

func (h *DelayedTaskHeap) Push(x any)

func (DelayedTaskHeap) Swap

func (h DelayedTaskHeap) Swap(i, j int)

type FIFOTaskQueue

type FIFOTaskQueue struct {
	// contains filtered or unexported fields
}

func NewFIFOTaskQueue

func NewFIFOTaskQueue() *FIFOTaskQueue

func (*FIFOTaskQueue) Clear

func (q *FIFOTaskQueue) Clear()

Clear removes all tasks from the queue and releases references

func (*FIFOTaskQueue) IsEmpty

func (q *FIFOTaskQueue) IsEmpty() bool

func (*FIFOTaskQueue) Len

func (q *FIFOTaskQueue) Len() int

func (*FIFOTaskQueue) MaybeCompact

func (q *FIFOTaskQueue) MaybeCompact()

func (*FIFOTaskQueue) PeekTraits

func (q *FIFOTaskQueue) PeekTraits() (TaskTraits, bool)

func (*FIFOTaskQueue) Pop

func (q *FIFOTaskQueue) Pop() (TaskItem, bool)

func (*FIFOTaskQueue) PopUpTo

func (q *FIFOTaskQueue) PopUpTo(max int) []TaskItem

func (*FIFOTaskQueue) Push

func (q *FIFOTaskQueue) Push(t Task, traits TaskTraits)

func (*FIFOTaskQueue) PushWithID added in v0.3.0

func (q *FIFOTaskQueue) PushWithID(t Task, traits TaskTraits) TaskID

type Metrics added in v0.3.0

type Metrics interface {
	// RecordTaskDuration records how long a task took to execute.
	//
	// Parameters:
	// - runnerName: The name of the task runner
	// - priority: The task priority
	// - duration: How long the task took to execute
	RecordTaskDuration(runnerName string, priority TaskPriority, duration time.Duration)

	// RecordTaskPanic records that a task panicked during execution.
	//
	// Parameters:
	// - runnerName: The name of the task runner
	// - panicInfo: The panic value recovered from the task
	RecordTaskPanic(runnerName string, panicInfo any)

	// RecordQueueDepth records the current queue depth.
	// This can be called periodically to track queue growth/shrinkage.
	//
	// Parameters:
	// - runnerName: The name of the task runner
	// - depth: The current number of tasks in the queue
	RecordQueueDepth(runnerName string, depth int)

	// RecordTaskRejected records that a task was rejected (e.g., during shutdown).
	//
	// Parameters:
	// - runnerName: The name of the task runner
	// - reason: Why the task was rejected
	RecordTaskRejected(runnerName string, reason string)
}

Metrics defines the interface for collecting task execution metrics. Implementations can send metrics to monitoring systems (Prometheus, StatsD, etc.).

All methods are optional; implementations should handle nil receivers gracefully. Methods should be non-blocking and fast to avoid impacting task execution performance.

type NilMetrics added in v0.3.0

type NilMetrics struct{}

NilMetrics provides a no-op metrics implementation that does nothing. This is the default when no metrics interface is provided.

func (*NilMetrics) RecordQueueDepth added in v0.3.0

func (m *NilMetrics) RecordQueueDepth(runnerName string, depth int)

RecordQueueDepth is a no-op.

func (*NilMetrics) RecordTaskDuration added in v0.3.0

func (m *NilMetrics) RecordTaskDuration(runnerName string, priority TaskPriority, duration time.Duration)

RecordTaskDuration is a no-op.

func (*NilMetrics) RecordTaskPanic added in v0.3.0

func (m *NilMetrics) RecordTaskPanic(runnerName string, panicInfo any)

RecordTaskPanic is a no-op.

func (*NilMetrics) RecordTaskRejected added in v0.3.0

func (m *NilMetrics) RecordTaskRejected(runnerName string, reason string)

RecordTaskRejected is a no-op.

type PanicHandler added in v0.3.0

type PanicHandler interface {
	// HandlePanic is called when a task panics.
	//
	// Parameters:
	// - ctx: The context from the panicked task (may contain task runner info)
	// - runnerName: The name of the task runner where the panic occurred
	// - workerID: The ID of the worker (for thread pool workers, -1 for single-threaded runners)
	// - panicInfo: The panic value recovered from the task
	// - stackTrace: The stack trace at the time of panic
	HandlePanic(ctx context.Context, runnerName string, workerID int, panicInfo any, stackTrace []byte)
}

PanicHandler is called when a task panics during execution. This allows custom panic handling, logging, and recovery strategies.

Implementations should be thread-safe as they may be called concurrently.

type ParallelTaskRunner added in v0.3.0

type ParallelTaskRunner struct {
	// contains filtered or unexported fields
}

ParallelTaskRunner executes up to maxConcurrency tasks simultaneously. Tasks are queued with priority support and executed as slots become available.

func NewParallelTaskRunner added in v0.3.0

func NewParallelTaskRunner(threadPool ThreadPool, maxConcurrency int) *ParallelTaskRunner

NewParallelTaskRunner creates a new ParallelTaskRunner with the specified concurrency limit. Panics if threadPool is nil or maxConcurrency is out of valid range [1, 10000].

func (*ParallelTaskRunner) FlushAsync added in v0.3.0

func (r *ParallelTaskRunner) FlushAsync(callback func())

FlushAsync posts a barrier task that executes callback when all prior tasks complete. This is a non-blocking alternative to WaitIdle.

The callback will execute after all tasks posted before FlushAsync() have completed. Tasks posted after FlushAsync() will not run before the callback.

Implementation note: This posts a special barrier task to the queue. When the scheduler encounters the barrier, it waits for all currently running tasks to complete before executing the callback. This provides true barrier semantics.

Example:

runner.PostTask(task1)
runner.PostTask(task2)
runner.FlushAsync(func() {
    // This runs after task1 and task2 complete
    fmt.Println("task1 and task2 completed!")
})
runner.PostTask(task3)  // Will NOT run before the callback

func (*ParallelTaskRunner) GetThreadPool added in v0.3.0

func (r *ParallelTaskRunner) GetThreadPool() ThreadPool

GetThreadPool returns the underlying ThreadPool used by this runner

func (*ParallelTaskRunner) IsClosed added in v0.3.0

func (r *ParallelTaskRunner) IsClosed() bool

IsClosed returns true if the runner has been shut down.

func (*ParallelTaskRunner) MaxConcurrency added in v0.3.0

func (r *ParallelTaskRunner) MaxConcurrency() int

MaxConcurrency returns the maximum number of concurrent tasks.

func (*ParallelTaskRunner) Metadata added in v0.3.0

func (r *ParallelTaskRunner) Metadata() map[string]any

Metadata returns the metadata associated with the task runner

func (*ParallelTaskRunner) Name added in v0.3.0

func (r *ParallelTaskRunner) Name() string

Name returns the name of the task runner

func (*ParallelTaskRunner) PendingTaskCount added in v0.3.0

func (r *ParallelTaskRunner) PendingTaskCount() int

PendingTaskCount returns the number of queued tasks waiting to run.

func (*ParallelTaskRunner) PostDelayedTask added in v0.3.0

func (r *ParallelTaskRunner) PostDelayedTask(task Task, delay time.Duration)

PostDelayedTask submits a task to execute after a delay.

func (*ParallelTaskRunner) PostDelayedTaskNamed added in v0.3.0

func (r *ParallelTaskRunner) PostDelayedTaskNamed(name string, task Task, delay time.Duration)

PostDelayedTaskNamed submits a delayed named task.

func (*ParallelTaskRunner) PostDelayedTaskWithTraits added in v0.3.0

func (r *ParallelTaskRunner) PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)

PostDelayedTaskWithTraits submits a delayed task with specified traits.

func (*ParallelTaskRunner) PostDelayedTaskWithTraitsNamed added in v0.3.0

func (r *ParallelTaskRunner) PostDelayedTaskWithTraitsNamed(name string, task Task, delay time.Duration, traits TaskTraits)

PostDelayedTaskWithTraitsNamed submits a delayed named task with specified traits.

func (*ParallelTaskRunner) PostRepeatingTask added in v0.3.0

func (r *ParallelTaskRunner) PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle

PostRepeatingTask submits a repeating task

func (*ParallelTaskRunner) PostRepeatingTaskWithInitialDelay added in v0.3.0

func (r *ParallelTaskRunner) PostRepeatingTaskWithInitialDelay(
	task Task,
	initialDelay, interval time.Duration,
	traits TaskTraits,
) RepeatingTaskHandle

PostRepeatingTaskWithInitialDelay submits a repeating task with an initial delay

func (*ParallelTaskRunner) PostRepeatingTaskWithTraits added in v0.3.0

func (r *ParallelTaskRunner) PostRepeatingTaskWithTraits(
	task Task,
	interval time.Duration,
	traits TaskTraits,
) RepeatingTaskHandle

PostRepeatingTaskWithTraits submits a repeating task with specific traits

func (*ParallelTaskRunner) PostTask added in v0.3.0

func (r *ParallelTaskRunner) PostTask(task Task)

PostTask submits a task with default traits.

func (*ParallelTaskRunner) PostTaskAndReply added in v0.3.0

func (r *ParallelTaskRunner) PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)

PostTaskAndReply executes task on this runner, then posts reply to replyRunner.

func (*ParallelTaskRunner) PostTaskAndReplyWithTraits added in v0.3.0

func (r *ParallelTaskRunner) PostTaskAndReplyWithTraits(
	task Task,
	taskTraits TaskTraits,
	reply Task,
	replyTraits TaskTraits,
	replyRunner TaskRunner,
)

PostTaskAndReplyWithTraits allows specifying different traits for task and reply.

func (*ParallelTaskRunner) PostTaskNamed added in v0.3.0

func (r *ParallelTaskRunner) PostTaskNamed(name string, task Task)

PostTaskNamed submits a task with a caller-provided display name.

func (*ParallelTaskRunner) PostTaskWithTraits added in v0.3.0

func (r *ParallelTaskRunner) PostTaskWithTraits(task Task, traits TaskTraits)

PostTaskWithTraits submits a task with specified traits.

func (*ParallelTaskRunner) PostTaskWithTraitsNamed added in v0.3.0

func (r *ParallelTaskRunner) PostTaskWithTraitsNamed(name string, task Task, traits TaskTraits)

PostTaskWithTraitsNamed submits a named task with specified traits.

func (*ParallelTaskRunner) RunningTaskCount added in v0.3.0

func (r *ParallelTaskRunner) RunningTaskCount() int

RunningTaskCount returns the number of currently executing tasks.

func (*ParallelTaskRunner) SetMetadata added in v0.3.0

func (r *ParallelTaskRunner) SetMetadata(key string, value any)

SetMetadata sets a metadata key-value pair

func (*ParallelTaskRunner) SetName added in v0.3.0

func (r *ParallelTaskRunner) SetName(name string)

SetName sets the name of the task runner

func (*ParallelTaskRunner) Shutdown added in v0.3.0

func (r *ParallelTaskRunner) Shutdown()

Shutdown marks the runner as closed and clears all pending tasks. This method is non-blocking and can be safely called from within a task.

Shutdown does NOT interrupt currently executing tasks - they will run to completion. However, no new tasks will be started from the queue after Shutdown is called.

func (*ParallelTaskRunner) Stats added in v0.3.0

func (r *ParallelTaskRunner) Stats() RunnerStats

Stats returns current observability data for this runner.

func (*ParallelTaskRunner) WaitIdle added in v0.3.0

func (r *ParallelTaskRunner) WaitIdle(ctx context.Context) error

WaitIdle blocks until all currently queued tasks have completed execution.

This method waits until both the queue is empty AND no tasks are currently executing (runningCount == 0).

Returns error if: - Context is cancelled or deadline exceeded - Runner is closed when WaitIdle is called

Note: Tasks posted after WaitIdle is called are not waited for.

func (*ParallelTaskRunner) WaitShutdown added in v0.3.0

func (r *ParallelTaskRunner) WaitShutdown(ctx context.Context) error

WaitShutdown blocks until Shutdown() is called on this runner. Returns error if context is cancelled.

type PoolStats added in v0.3.0

type PoolStats struct {
	ID      string
	Workers int
	Queued  int
	Active  int
	Delayed int
	Running bool
}

PoolStats represents runtime observability state for a thread pool.

type PriorityTaskQueue

type PriorityTaskQueue struct {
	// contains filtered or unexported fields
}

func NewPriorityTaskQueue

func NewPriorityTaskQueue() *PriorityTaskQueue

func (*PriorityTaskQueue) Clear

func (q *PriorityTaskQueue) Clear()

Clear removes all tasks from the queue and releases references

func (*PriorityTaskQueue) IsEmpty

func (q *PriorityTaskQueue) IsEmpty() bool

func (*PriorityTaskQueue) Len

func (q *PriorityTaskQueue) Len() int

func (*PriorityTaskQueue) MaybeCompact

func (q *PriorityTaskQueue) MaybeCompact()

func (*PriorityTaskQueue) PeekTraits

func (q *PriorityTaskQueue) PeekTraits() (TaskTraits, bool)

func (*PriorityTaskQueue) Pop

func (q *PriorityTaskQueue) Pop() (TaskItem, bool)

func (*PriorityTaskQueue) PopUpTo

func (q *PriorityTaskQueue) PopUpTo(max int) []TaskItem

func (*PriorityTaskQueue) Push

func (q *PriorityTaskQueue) Push(t Task, traits TaskTraits)

func (*PriorityTaskQueue) PushWithID added in v0.3.0

func (q *PriorityTaskQueue) PushWithID(t Task, traits TaskTraits) TaskID

type QueuePolicy added in v0.2.0

type QueuePolicy int

QueuePolicy defines how to handle full queue situations

const (
	// QueuePolicyDrop: Silently drop tasks when queue is full (current behavior)
	QueuePolicyDrop QueuePolicy = iota

	// QueuePolicyReject: Call rejection callback when queue is full
	QueuePolicyReject

	// QueuePolicyWait: Block until queue has space or context is done
	QueuePolicyWait
)

type RejectedTaskHandler added in v0.3.0

type RejectedTaskHandler interface {
	// HandleRejectedTask is called when a task is rejected.
	//
	// Parameters:
	// - runnerName: The name of the task runner
	// - reason: Why the task was rejected (e.g., "shutdown", "backpressure")
	HandleRejectedTask(runnerName string, reason string)
}

RejectedTaskHandler is called when a task is rejected by the scheduler. This can happen when: - The scheduler is shutting down - The signal channel is full (backpressure) - The task queue is full (if bounded queues are implemented in the future)

Implementations should be thread-safe as they may be called concurrently.

type RejectionCallback added in v0.2.0

type RejectionCallback func(task Task, traits TaskTraits)

RejectionCallback is called when a task is rejected (QueuePolicyReject mode)

type RepeatingTaskHandle

type RepeatingTaskHandle interface {
	// Stop stops the repeating task. It will not interrupt a currently executing task,
	// but will prevent future executions from being scheduled.
	Stop()

	// IsStopped returns true if the task has been stopped.
	IsStopped() bool
}

RepeatingTaskHandle controls the lifecycle of a repeating task.

type ReplyWithResult

type ReplyWithResult[T any] func(ctx context.Context, result T, err error)

ReplyWithResult defines a reply callback that receives a result of type T and an error. This is the counterpart to TaskWithResult, receiving the values returned by the task.

type RunnerStats added in v0.3.0

type RunnerStats struct {
	Name           string
	Type           string
	Pending        int
	Running        int
	Rejected       int64
	Closed         bool
	BarrierPending bool
}

RunnerStats represents runtime observability state for a task runner.

type SequencedTaskRunner

type SequencedTaskRunner struct {
	// contains filtered or unexported fields
}

func NewSequencedTaskRunner

func NewSequencedTaskRunner(threadPool ThreadPool) *SequencedTaskRunner

func (*SequencedTaskRunner) FlushAsync

func (r *SequencedTaskRunner) FlushAsync(callback func())

FlushAsync posts a barrier task that executes the callback when all prior tasks complete. This is a non-blocking alternative to WaitIdle.

The callback will be executed on this runner's thread, after all tasks posted before FlushAsync have completed.

Example:

runner.PostTask(task1)
runner.PostTask(task2)
runner.FlushAsync(func() {
    fmt.Println("task1 and task2 completed!")
})

func (*SequencedTaskRunner) GetThreadPool

func (r *SequencedTaskRunner) GetThreadPool() ThreadPool

GetThreadPool returns the underlying ThreadPool used by this runner

func (*SequencedTaskRunner) IsClosed

func (r *SequencedTaskRunner) IsClosed() bool

IsClosed returns true if the runner has been shut down.

func (*SequencedTaskRunner) Metadata

func (r *SequencedTaskRunner) Metadata() map[string]any

Metadata returns the metadata associated with the task runner

func (*SequencedTaskRunner) Name

func (r *SequencedTaskRunner) Name() string

Name returns the name of the task runner

func (*SequencedTaskRunner) PendingTaskCount added in v0.3.0

func (r *SequencedTaskRunner) PendingTaskCount() int

PendingTaskCount returns the number of queued tasks waiting to run.

func (*SequencedTaskRunner) PostDelayedTask

func (r *SequencedTaskRunner) PostDelayedTask(task Task, delay time.Duration)

PostDelayedTask submits a task to execute after a delay

func (*SequencedTaskRunner) PostDelayedTaskNamed added in v0.3.0

func (r *SequencedTaskRunner) PostDelayedTaskNamed(name string, task Task, delay time.Duration)

PostDelayedTaskNamed submits a delayed named task.

func (*SequencedTaskRunner) PostDelayedTaskWithTraits

func (r *SequencedTaskRunner) PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)

PostDelayedTaskWithTraits submits a delayed task with specified traits

func (*SequencedTaskRunner) PostDelayedTaskWithTraitsNamed added in v0.3.0

func (r *SequencedTaskRunner) PostDelayedTaskWithTraitsNamed(name string, task Task, delay time.Duration, traits TaskTraits)

PostDelayedTaskWithTraitsNamed submits a delayed named task with specified traits.

func (*SequencedTaskRunner) PostRepeatingTask

func (r *SequencedTaskRunner) PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle

PostRepeatingTask submits a task that repeats at a fixed interval

func (*SequencedTaskRunner) PostRepeatingTaskWithInitialDelay

func (r *SequencedTaskRunner) PostRepeatingTaskWithInitialDelay(
	task Task,
	initialDelay, interval time.Duration,
	traits TaskTraits,
) RepeatingTaskHandle

PostRepeatingTaskWithInitialDelay submits a repeating task with an initial delay The task will first execute after initialDelay, then repeat every interval.

func (*SequencedTaskRunner) PostRepeatingTaskWithTraits

func (r *SequencedTaskRunner) PostRepeatingTaskWithTraits(
	task Task,
	interval time.Duration,
	traits TaskTraits,
) RepeatingTaskHandle

PostRepeatingTaskWithTraits submits a repeating task with specific traits

func (*SequencedTaskRunner) PostTask

func (r *SequencedTaskRunner) PostTask(task Task)

PostTask submits a task with default traits

func (*SequencedTaskRunner) PostTaskAndReply

func (r *SequencedTaskRunner) PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)

PostTaskAndReply executes task on this runner, then posts reply to replyRunner. If task panics, reply will not be executed.

func (*SequencedTaskRunner) PostTaskAndReplyWithTraits

func (r *SequencedTaskRunner) PostTaskAndReplyWithTraits(
	task Task,
	taskTraits TaskTraits,
	reply Task,
	replyTraits TaskTraits,
	replyRunner TaskRunner,
)

PostTaskAndReplyWithTraits allows specifying different traits for task and reply. This is useful when task is background work (BestEffort) but reply is UI update (UserVisible).

func (*SequencedTaskRunner) PostTaskNamed added in v0.3.0

func (r *SequencedTaskRunner) PostTaskNamed(name string, task Task)

PostTaskNamed submits a task with a caller-provided display name.

func (*SequencedTaskRunner) PostTaskWithTraits

func (r *SequencedTaskRunner) PostTaskWithTraits(task Task, traits TaskTraits)

PostTaskWithTraits submits a task with specified traits

func (*SequencedTaskRunner) PostTaskWithTraitsNamed added in v0.3.0

func (r *SequencedTaskRunner) PostTaskWithTraitsNamed(name string, task Task, traits TaskTraits)

PostTaskWithTraitsNamed submits a named task with specified traits.

func (*SequencedTaskRunner) RunningTaskCount added in v0.3.0

func (r *SequencedTaskRunner) RunningTaskCount() int

RunningTaskCount returns 1 if runLoop is running/scheduled, otherwise 0.

func (*SequencedTaskRunner) SetMetadata

func (r *SequencedTaskRunner) SetMetadata(key string, value any)

SetMetadata sets a metadata key-value pair

func (*SequencedTaskRunner) SetName

func (r *SequencedTaskRunner) SetName(name string)

SetName sets the name of the task runner

func (*SequencedTaskRunner) Shutdown

func (r *SequencedTaskRunner) Shutdown()

Shutdown gracefully stops the runner by: 1. Marking it as closed (stops accepting new tasks) 2. Clearing all pending tasks in the queue 3. All repeating tasks will automatically stop on their next execution 4. Signaling all WaitShutdown() waiters

Note: This method is non-blocking and can be safely called from within a task. Note: This will not interrupt currently executing tasks.

func (*SequencedTaskRunner) Stats added in v0.3.0

func (r *SequencedTaskRunner) Stats() RunnerStats

Stats returns current observability data for this runner.

func (*SequencedTaskRunner) WaitIdle

func (r *SequencedTaskRunner) WaitIdle(ctx context.Context) error

WaitIdle blocks until all currently queued tasks have completed execution. This is implemented by posting a barrier task and waiting for it to execute.

Due to the sequential nature of SequencedTaskRunner, when the barrier task executes, all tasks posted before WaitIdle are guaranteed to have completed.

Returns error if: - Context is cancelled or deadline exceeded - Runner is closed when WaitIdle is called

Note: Tasks posted after WaitIdle is called are not waited for. Note: Repeating tasks will continue to repeat and are not waited for.

func (*SequencedTaskRunner) WaitShutdown

func (r *SequencedTaskRunner) WaitShutdown(ctx context.Context) error

WaitShutdown blocks until Shutdown() is called on this runner.

This is useful for waiting for the runner to be shut down, either by an external caller or by a task running on the runner itself.

Returns error if context is cancelled or deadline exceeded.

Example:

// Task shuts down the runner when condition is met
runner.PostTask(func(ctx context.Context) {
    if conditionMet() {
        me := GetCurrentTaskRunner(ctx)
        me.Shutdown()
    }
})

// Main thread waits for shutdown
runner.WaitShutdown(context.Background())

type SingleThreadTaskRunner

type SingleThreadTaskRunner struct {
	// contains filtered or unexported fields
}

SingleThreadTaskRunner binds a dedicated Goroutine to execute tasks sequentially. It guarantees that all tasks submitted to it run on the same Goroutine (Thread Affinity).

Use cases: 1. Blocking IO operations (e.g., NetworkReceiver) 2. CGO calls that require Thread Local Storage 3. Simulating Main Thread / UI Thread behavior

Key differences from SequencedTaskRunner: - SequencedTaskRunner: Tasks execute sequentially but may run on different worker goroutines - SingleThreadTaskRunner: Tasks execute sequentially AND always on the same dedicated goroutine

func NewSingleThreadTaskRunner

func NewSingleThreadTaskRunner() *SingleThreadTaskRunner

NewSingleThreadTaskRunner creates and starts a new SingleThreadTaskRunner. It immediately spawns a dedicated goroutine for task execution.

func (*SingleThreadTaskRunner) FlushAsync

func (r *SingleThreadTaskRunner) FlushAsync(callback func())

FlushAsync posts a barrier task that executes the callback when all prior tasks complete. This is a non-blocking alternative to WaitIdle.

The callback will be executed on this runner's dedicated goroutine, after all tasks posted before FlushAsync have completed.

Example:

runner.PostTask(task1)
runner.PostTask(task2)
runner.FlushAsync(func() {
    fmt.Println("task1 and task2 completed!")
})

func (*SingleThreadTaskRunner) GetQueuePolicy added in v0.2.0

func (r *SingleThreadTaskRunner) GetQueuePolicy() QueuePolicy

GetQueuePolicy returns the current queue policy

func (*SingleThreadTaskRunner) GetThreadPool

func (r *SingleThreadTaskRunner) GetThreadPool() ThreadPool

GetThreadPool returns nil because SingleThreadTaskRunner doesn't use a thread pool

func (*SingleThreadTaskRunner) IsClosed

func (r *SingleThreadTaskRunner) IsClosed() bool

IsClosed returns true if the runner has been stopped

func (*SingleThreadTaskRunner) Metadata

func (r *SingleThreadTaskRunner) Metadata() map[string]any

Metadata returns the metadata associated with the task runner

func (*SingleThreadTaskRunner) Name

func (r *SingleThreadTaskRunner) Name() string

Name returns the name of the task runner

func (*SingleThreadTaskRunner) PendingTaskCount added in v0.3.0

func (r *SingleThreadTaskRunner) PendingTaskCount() int

PendingTaskCount returns the number of queued tasks waiting to run.

func (*SingleThreadTaskRunner) PostDelayedTask

func (r *SingleThreadTaskRunner) PostDelayedTask(task Task, delay time.Duration)

PostDelayedTask submits a delayed task

func (*SingleThreadTaskRunner) PostDelayedTaskWithTraits

func (r *SingleThreadTaskRunner) PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)

PostDelayedTaskWithTraits submits a delayed task with traits. Uses time.AfterFunc which is independent of the global TaskScheduler, ensuring IO-related timers are not affected by scheduler load.

func (*SingleThreadTaskRunner) PostRepeatingTask

func (r *SingleThreadTaskRunner) PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle

PostRepeatingTask submits a task that repeats at a fixed interval

func (*SingleThreadTaskRunner) PostRepeatingTaskWithInitialDelay

func (r *SingleThreadTaskRunner) PostRepeatingTaskWithInitialDelay(
	task Task,
	initialDelay, interval time.Duration,
	traits TaskTraits,
) RepeatingTaskHandle

PostRepeatingTaskWithInitialDelay submits a repeating task with an initial delay

func (*SingleThreadTaskRunner) PostRepeatingTaskWithTraits

func (r *SingleThreadTaskRunner) PostRepeatingTaskWithTraits(
	task Task,
	interval time.Duration,
	traits TaskTraits,
) RepeatingTaskHandle

PostRepeatingTaskWithTraits submits a repeating task with traits

func (*SingleThreadTaskRunner) PostTask

func (r *SingleThreadTaskRunner) PostTask(task Task)

PostTask submits a task for execution

func (*SingleThreadTaskRunner) PostTaskAndReply

func (r *SingleThreadTaskRunner) PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)

PostTaskAndReply executes task on this runner, then posts reply to replyRunner. If task panics, reply will not be executed. Both task and reply will execute on the same dedicated goroutine if replyRunner is this runner.

func (*SingleThreadTaskRunner) PostTaskAndReplyWithTraits

func (r *SingleThreadTaskRunner) PostTaskAndReplyWithTraits(
	task Task,
	taskTraits TaskTraits,
	reply Task,
	replyTraits TaskTraits,
	replyRunner TaskRunner,
)

PostTaskAndReplyWithTraits allows specifying different traits for task and reply. This is useful when task is background work (BestEffort) but reply is UI update (UserVisible). Note: For SingleThreadTaskRunner, traits don't affect execution order since all tasks run sequentially on the same goroutine, but they may be used for logging or metrics.

func (*SingleThreadTaskRunner) PostTaskNamed added in v0.3.0

func (r *SingleThreadTaskRunner) PostTaskNamed(name string, task Task)

PostTaskNamed submits a task with a caller-provided display name.

func (*SingleThreadTaskRunner) PostTaskWithTraits

func (r *SingleThreadTaskRunner) PostTaskWithTraits(task Task, traits TaskTraits)

PostTaskWithTraits submits a task with traits (traits are ignored for single-threaded execution) The behavior when the queue is full depends on the configured QueuePolicy: - QueuePolicyDrop: Silently drops the task (default) - QueuePolicyReject: Calls the rejection callback if set - QueuePolicyWait: Blocks until queue has space or context is done

func (*SingleThreadTaskRunner) PostTaskWithTraitsNamed added in v0.3.0

func (r *SingleThreadTaskRunner) PostTaskWithTraitsNamed(name string, task Task, traits TaskTraits)

PostTaskWithTraitsNamed submits a named task with traits.

func (*SingleThreadTaskRunner) RejectedCount added in v0.2.0

func (r *SingleThreadTaskRunner) RejectedCount() int64

RejectedCount returns the number of tasks that have been rejected due to full queue Only incremented when QueuePolicy is QueuePolicyReject

func (*SingleThreadTaskRunner) RunningTaskCount added in v0.3.0

func (r *SingleThreadTaskRunner) RunningTaskCount() int

RunningTaskCount returns the number of tasks currently executing.

func (*SingleThreadTaskRunner) SetMetadata

func (r *SingleThreadTaskRunner) SetMetadata(key string, value any)

SetMetadata sets a metadata key-value pair

func (*SingleThreadTaskRunner) SetName

func (r *SingleThreadTaskRunner) SetName(name string)

SetName sets the name of the task runner

func (*SingleThreadTaskRunner) SetQueuePolicy added in v0.2.0

func (r *SingleThreadTaskRunner) SetQueuePolicy(policy QueuePolicy)

SetQueuePolicy sets the policy for handling full queue situations

func (*SingleThreadTaskRunner) SetRejectionCallback added in v0.2.0

func (r *SingleThreadTaskRunner) SetRejectionCallback(callback RejectionCallback)

SetRejectionCallback sets the callback to be called when a task is rejected Only used when QueuePolicy is set to QueuePolicyReject

func (*SingleThreadTaskRunner) Shutdown

func (r *SingleThreadTaskRunner) Shutdown()

Shutdown marks the runner as closed and signals shutdown waiters. Unlike Stop(), this method does NOT immediately terminate the runLoop. This allows tasks to call Shutdown() from within themselves.

After calling Shutdown(): - WaitShutdown() will return - IsClosed() will return true - New tasks posted will be ignored - Existing queued tasks will still execute - Call Stop() to actually terminate the runLoop

func (*SingleThreadTaskRunner) Stats added in v0.3.0

Stats returns current observability data for this runner.

func (*SingleThreadTaskRunner) Stop

func (r *SingleThreadTaskRunner) Stop()

Stop stops the runner and releases resources

func (*SingleThreadTaskRunner) WaitIdle

func (r *SingleThreadTaskRunner) WaitIdle(ctx context.Context) error

WaitIdle blocks until all currently queued tasks have completed execution. This is implemented by posting a barrier task and waiting for it to execute.

Since SingleThreadTaskRunner executes tasks sequentially on a dedicated goroutine, when the barrier task executes, all tasks posted before WaitIdle are guaranteed to have completed.

Returns error if: - Context is cancelled or deadline exceeded - Runner is closed when WaitIdle is called

Note: Tasks posted after WaitIdle is called are not waited for. Note: Repeating tasks will continue to repeat and are not waited for.

func (*SingleThreadTaskRunner) WaitShutdown

func (r *SingleThreadTaskRunner) WaitShutdown(ctx context.Context) error

WaitShutdown blocks until Shutdown() is called on this runner.

This is useful for waiting for the runner to be shut down, either by an external caller or by a task running on the runner itself.

Returns error if context is cancelled or deadline exceeded.

Example:

// IO thread: receives messages and posts shutdown when condition met
ioRunner.PostTask(func(ctx context.Context) {
    for {
        msg := receiveMessage()
        mainRunner.PostTask(func(ctx context.Context) {
            if shouldShutdown(msg) {
                me := GetCurrentTaskRunner(ctx)
                me.Shutdown()
            }
        })
    }
})

// Main thread waits for shutdown
mainRunner.WaitShutdown(context.Background())

type Task

type Task func(ctx context.Context)

Task is the unit of work (Closure)

type TaskID added in v0.3.0

type TaskID uuid.UUID

TaskID is a unique identifier for tasks, using UUID for guaranteed uniqueness.

func GenerateTaskID added in v0.3.0

func GenerateTaskID() TaskID

GenerateTaskID creates a new unique TaskID.

func (TaskID) IsZero added in v0.3.0

func (id TaskID) IsZero() bool

IsZero returns true if the TaskID is the zero UUID.

func (TaskID) String added in v0.3.0

func (id TaskID) String() string

String returns the string representation of the TaskID.

type TaskItem

type TaskItem struct {
	Task   Task
	Traits TaskTraits
	ID     TaskID // Unique identifier for the task
}

type TaskPriority

type TaskPriority int
const (
	// TaskPriorityBestEffort: Lowest priority
	TaskPriorityBestEffort TaskPriority = iota

	// TaskPriorityUserVisible: Default priority
	TaskPriorityUserVisible

	// TaskPriorityUserBlocking: Highest priority
	// `UserBlocking` means the task may block the main thread.
	// If main thread is blocked, the UI will be unresponsive.
	// The user experience will be affected if the task blocks the main thread.
	TaskPriorityUserBlocking
)

type TaskQueue

type TaskQueue interface {
	Push(t Task, traits TaskTraits)
	PushWithID(t Task, traits TaskTraits) TaskID // Push with specific TaskID
	Pop() (TaskItem, bool)
	PopUpTo(max int) []TaskItem
	PeekTraits() (TaskTraits, bool)
	Len() int
	IsEmpty() bool
	MaybeCompact()
	Clear() // Clear all tasks from the queue
}

TaskQueue defines the interface for different queue implementations

type TaskRunner

type TaskRunner interface {
	PostTask(task Task)
	PostTaskWithTraits(task Task, traits TaskTraits)
	PostDelayedTask(task Task, delay time.Duration)

	// [v2.1 New] Support delayed tasks with specific traits
	PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)

	// PostRepeatingTask submits a task that repeats at a fixed interval.
	// The interval is measured from the end of one execution to the start of the next
	// (fixed-delay semantics), not from start-to-start (fixed-rate).
	PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle
	PostRepeatingTaskWithTraits(task Task, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
	PostRepeatingTaskWithInitialDelay(task Task, initialDelay, interval time.Duration, traits TaskTraits) RepeatingTaskHandle

	// [v2.3 New] Support task and reply pattern
	// PostTaskAndReply executes task on this runner, then posts reply to replyRunner
	PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)
	// PostTaskAndReplyWithTraits allows specifying traits for both task and reply
	PostTaskAndReplyWithTraits(task Task, taskTraits TaskTraits, reply Task, replyTraits TaskTraits, replyRunner TaskRunner)

	// [v2.4 New] Synchronization and lifecycle management
	// WaitIdle blocks until all currently queued tasks have completed execution
	// Tasks posted after WaitIdle is called are not waited for
	// Returns error if context is cancelled or runner is closed
	WaitIdle(ctx context.Context) error

	// FlushAsync posts a barrier task that executes callback when all prior tasks complete
	// This is a non-blocking alternative to WaitIdle
	FlushAsync(callback func())

	// WaitShutdown blocks until Shutdown() is called on this runner
	// Returns error if context is cancelled
	WaitShutdown(ctx context.Context) error

	// Shutdown marks the runner as closed and clears all pending tasks
	// This method is non-blocking and can be safely called from within a task
	Shutdown()

	// IsClosed returns true if the runner has been shut down
	IsClosed() bool

	// [v2.5 New] Identification and Metadata
	// Name returns the name of the task runner
	Name() string
	// Metadata returns the metadata associated with the task runner
	Metadata() map[string]any

	// [v2.6 New] Thread Pool Access
	// GetThreadPool returns the underlying ThreadPool used by this runner
	// Returns nil for runners that don't use a thread pool (e.g., SingleThreadTaskRunner)
	GetThreadPool() ThreadPool
}

func GetCurrentTaskRunner

func GetCurrentTaskRunner(ctx context.Context) TaskRunner

type TaskScheduler

type TaskScheduler struct {
	// contains filtered or unexported fields
}

func NewFIFOTaskScheduler

func NewFIFOTaskScheduler(workerCount int) *TaskScheduler

func NewFIFOTaskSchedulerWithConfig added in v0.3.0

func NewFIFOTaskSchedulerWithConfig(workerCount int, config *TaskSchedulerConfig) *TaskScheduler

func NewPriorityTaskScheduler

func NewPriorityTaskScheduler(workerCount int) *TaskScheduler

func NewPriorityTaskSchedulerWithConfig added in v0.3.0

func NewPriorityTaskSchedulerWithConfig(workerCount int, config *TaskSchedulerConfig) *TaskScheduler

func (*TaskScheduler) ActiveTaskCount

func (s *TaskScheduler) ActiveTaskCount() int

func (*TaskScheduler) DelayedTaskCount

func (s *TaskScheduler) DelayedTaskCount() int

func (*TaskScheduler) GetMetrics added in v0.3.0

func (s *TaskScheduler) GetMetrics() Metrics

GetMetrics returns the metrics collector for this scheduler

func (*TaskScheduler) GetPanicHandler added in v0.3.0

func (s *TaskScheduler) GetPanicHandler() PanicHandler

GetPanicHandler returns the panic handler for this scheduler

func (*TaskScheduler) GetWork

func (s *TaskScheduler) GetWork(stopCh <-chan struct{}) (TaskItem, bool)

GetWork (Called by Worker)

func (*TaskScheduler) OnTaskEnd

func (s *TaskScheduler) OnTaskEnd()

func (*TaskScheduler) OnTaskStart

func (s *TaskScheduler) OnTaskStart()

func (*TaskScheduler) PostDelayedInternal

func (s *TaskScheduler) PostDelayedInternal(task Task, delay time.Duration, traits TaskTraits, target TaskRunner)

PostDelayedInternal

func (*TaskScheduler) PostInternal

func (s *TaskScheduler) PostInternal(task Task, traits TaskTraits)

PostInternal

func (*TaskScheduler) QueuedTaskCount

func (s *TaskScheduler) QueuedTaskCount() int

func (*TaskScheduler) Shutdown

func (s *TaskScheduler) Shutdown()

func (*TaskScheduler) ShutdownGraceful added in v0.2.0

func (s *TaskScheduler) ShutdownGraceful(timeout time.Duration) error

ShutdownGraceful waits for all queued and active tasks to complete Returns error if timeout is exceeded before tasks complete

func (*TaskScheduler) WorkerCount

func (s *TaskScheduler) WorkerCount() int

Metrics

type TaskSchedulerConfig added in v0.3.0

type TaskSchedulerConfig struct {
	// PanicHandler is called when a task panics. Defaults to DefaultPanicHandler.
	PanicHandler PanicHandler

	// Metrics is called to record task execution metrics. Defaults to NilMetrics.
	Metrics Metrics

	// RejectedTaskHandler is called when a task is rejected. Defaults to DefaultRejectedTaskHandler.
	RejectedTaskHandler RejectedTaskHandler
}

TaskSchedulerConfig holds configuration options for TaskScheduler. All handlers are optional; if not provided, default implementations will be used.

Example
// Create custom handlers
panicHandler := &DefaultPanicHandler{}
metrics := &NilMetrics{}
rejectedHandler := &DefaultRejectedTaskHandler{}

// Create config
config := &TaskSchedulerConfig{
	PanicHandler:        panicHandler,
	Metrics:             metrics,
	RejectedTaskHandler: rejectedHandler,
}

// Create scheduler with config
_ = NewFIFOTaskSchedulerWithConfig(4, config)

func DefaultTaskSchedulerConfig added in v0.3.0

func DefaultTaskSchedulerConfig() *TaskSchedulerConfig

DefaultTaskSchedulerConfig returns a config with default handlers.

Example
// Use default config
config := DefaultTaskSchedulerConfig()

// Create scheduler with default config
_ = NewFIFOTaskSchedulerWithConfig(4, config)

type TaskTraits

type TaskTraits struct {
	Priority TaskPriority
}

func DefaultTaskTraits

func DefaultTaskTraits() TaskTraits

func TraitsBestEffort

func TraitsBestEffort() TaskTraits

func TraitsUserBlocking

func TraitsUserBlocking() TaskTraits

func TraitsUserVisible

func TraitsUserVisible() TaskTraits

type TaskWithResult

type TaskWithResult[T any] func(ctx context.Context) (T, error)

TaskWithResult defines a task that returns a result of type T and an error. This is used with PostTaskAndReplyWithResult to pass data from task to reply.

type ThreadPool

type ThreadPool interface {
	PostInternal(task Task, traits TaskTraits)
	PostDelayedInternal(task Task, delay time.Duration, traits TaskTraits, target TaskRunner)

	Start(ctx context.Context)
	Stop()

	ID() string
	IsRunning() bool

	WorkerCount() int
	QueuedTaskCount() int  // In queue
	ActiveTaskCount() int  // Executing
	DelayedTaskCount() int // Delayed
}

============================================================================= ThreadPool: Define task execution interface =============================================================================

type WorkSource

type WorkSource interface {
	GetWork(stopCh <-chan struct{}) (TaskItem, bool)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL