Documentation
¶
Index ¶
- func PostDelayedTaskAndReplyWithResult[T any](targetRunner TaskRunner, task TaskWithResult[T], delay time.Duration, ...)
- func PostDelayedTaskAndReplyWithResultAndTraits[T any](targetRunner TaskRunner, task TaskWithResult[T], delay time.Duration, ...)
- func PostTaskAndReplyWithResult[T any](targetRunner TaskRunner, task TaskWithResult[T], reply ReplyWithResult[T], ...)
- func PostTaskAndReplyWithResultAndTraits[T any](targetRunner TaskRunner, task TaskWithResult[T], taskTraits TaskTraits, ...)
- type DelayManager
- type DelayedTask
- type DelayedTaskHeap
- type FIFOTaskQueue
- func (q *FIFOTaskQueue) Clear()
- func (q *FIFOTaskQueue) IsEmpty() bool
- func (q *FIFOTaskQueue) Len() int
- func (q *FIFOTaskQueue) MaybeCompact()
- func (q *FIFOTaskQueue) PeekTraits() (TaskTraits, bool)
- func (q *FIFOTaskQueue) Pop() (TaskItem, bool)
- func (q *FIFOTaskQueue) PopUpTo(max int) []TaskItem
- func (q *FIFOTaskQueue) Push(t Task, traits TaskTraits)
- type PriorityTaskQueue
- func (q *PriorityTaskQueue) Clear()
- func (q *PriorityTaskQueue) IsEmpty() bool
- func (q *PriorityTaskQueue) Len() int
- func (q *PriorityTaskQueue) MaybeCompact()
- func (q *PriorityTaskQueue) PeekTraits() (TaskTraits, bool)
- func (q *PriorityTaskQueue) Pop() (TaskItem, bool)
- func (q *PriorityTaskQueue) PopUpTo(max int) []TaskItem
- func (q *PriorityTaskQueue) Push(t Task, traits TaskTraits)
- type RepeatingTaskHandle
- type ReplyWithResult
- type SequencedTaskRunner
- func (r *SequencedTaskRunner) FlushAsync(callback func())
- func (r *SequencedTaskRunner) GetThreadPool() ThreadPool
- func (r *SequencedTaskRunner) IsClosed() bool
- func (r *SequencedTaskRunner) Metadata() map[string]any
- func (r *SequencedTaskRunner) Name() string
- func (r *SequencedTaskRunner) PostDelayedTask(task Task, delay time.Duration)
- func (r *SequencedTaskRunner) PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)
- func (r *SequencedTaskRunner) PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle
- func (r *SequencedTaskRunner) PostRepeatingTaskWithInitialDelay(task Task, initialDelay, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
- func (r *SequencedTaskRunner) PostRepeatingTaskWithTraits(task Task, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
- func (r *SequencedTaskRunner) PostTask(task Task)
- func (r *SequencedTaskRunner) PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)
- func (r *SequencedTaskRunner) PostTaskAndReplyWithTraits(task Task, taskTraits TaskTraits, reply Task, replyTraits TaskTraits, ...)
- func (r *SequencedTaskRunner) PostTaskWithTraits(task Task, traits TaskTraits)
- func (r *SequencedTaskRunner) SetMetadata(key string, value interface{})
- func (r *SequencedTaskRunner) SetName(name string)
- func (r *SequencedTaskRunner) Shutdown()
- func (r *SequencedTaskRunner) WaitIdle(ctx context.Context) error
- func (r *SequencedTaskRunner) WaitShutdown(ctx context.Context) error
- type SingleThreadTaskRunner
- func (r *SingleThreadTaskRunner) FlushAsync(callback func())
- func (r *SingleThreadTaskRunner) GetThreadPool() ThreadPool
- func (r *SingleThreadTaskRunner) IsClosed() bool
- func (r *SingleThreadTaskRunner) Metadata() map[string]interface{}
- func (r *SingleThreadTaskRunner) Name() string
- func (r *SingleThreadTaskRunner) PostDelayedTask(task Task, delay time.Duration)
- func (r *SingleThreadTaskRunner) PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)
- func (r *SingleThreadTaskRunner) PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle
- func (r *SingleThreadTaskRunner) PostRepeatingTaskWithInitialDelay(task Task, initialDelay, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
- func (r *SingleThreadTaskRunner) PostRepeatingTaskWithTraits(task Task, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
- func (r *SingleThreadTaskRunner) PostTask(task Task)
- func (r *SingleThreadTaskRunner) PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)
- func (r *SingleThreadTaskRunner) PostTaskAndReplyWithTraits(task Task, taskTraits TaskTraits, reply Task, replyTraits TaskTraits, ...)
- func (r *SingleThreadTaskRunner) PostTaskWithTraits(task Task, traits TaskTraits)
- func (r *SingleThreadTaskRunner) SetMetadata(key string, value interface{})
- func (r *SingleThreadTaskRunner) SetName(name string)
- func (r *SingleThreadTaskRunner) Shutdown()
- func (r *SingleThreadTaskRunner) Stop()
- func (r *SingleThreadTaskRunner) WaitIdle(ctx context.Context) error
- func (r *SingleThreadTaskRunner) WaitShutdown(ctx context.Context) error
- type Task
- type TaskItem
- type TaskPriority
- type TaskQueue
- type TaskRunner
- type TaskScheduler
- func (s *TaskScheduler) ActiveTaskCount() int
- func (s *TaskScheduler) DelayedTaskCount() int
- func (s *TaskScheduler) GetWork(stopCh <-chan struct{}) (Task, bool)
- func (s *TaskScheduler) OnTaskEnd()
- func (s *TaskScheduler) OnTaskStart()
- func (s *TaskScheduler) PostDelayedInternal(task Task, delay time.Duration, traits TaskTraits, target TaskRunner)
- func (s *TaskScheduler) PostInternal(task Task, traits TaskTraits)
- func (s *TaskScheduler) QueuedTaskCount() int
- func (s *TaskScheduler) Shutdown()
- func (s *TaskScheduler) WorkerCount() int
- type TaskTraits
- type TaskWithResult
- type ThreadPool
- type WorkSource
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func PostDelayedTaskAndReplyWithResult ¶
func PostDelayedTaskAndReplyWithResult[T any]( targetRunner TaskRunner, task TaskWithResult[T], delay time.Duration, reply ReplyWithResult[T], replyRunner TaskRunner, )
PostDelayedTaskAndReplyWithResult is similar to PostTaskAndReplyWithResult, but delays the execution of the task.
The reply is NOT delayed - it executes immediately after the task completes. Only the initial task execution is delayed by the specified duration.
Example:
PostDelayedTaskAndReplyWithResult(
runner,
func(ctx context.Context) (string, error) {
return "delayed result", nil
},
2*time.Second, // Wait 2 seconds before starting task
func(ctx context.Context, result string, err error) {
fmt.Println(result) // Executes immediately after task completes
},
replyRunner,
)
func PostDelayedTaskAndReplyWithResultAndTraits ¶
func PostDelayedTaskAndReplyWithResultAndTraits[T any]( targetRunner TaskRunner, task TaskWithResult[T], delay time.Duration, taskTraits TaskTraits, reply ReplyWithResult[T], replyTraits TaskTraits, replyRunner TaskRunner, )
PostDelayedTaskAndReplyWithResultAndTraits is the full-featured delayed version with separate traits for task and reply.
func PostTaskAndReplyWithResult ¶
func PostTaskAndReplyWithResult[T any]( targetRunner TaskRunner, task TaskWithResult[T], reply ReplyWithResult[T], replyRunner TaskRunner, )
PostTaskAndReplyWithResult executes a task that returns a result of type T and an error, then passes that result to a reply callback on the replyRunner.
This function uses closure capture to safely pass the result across goroutines. The captured variables (result and err) will escape to the heap, ensuring thread safety.
Execution guarantee (Happens-Before): - The task ALWAYS completes before the reply starts - The reply ALWAYS sees the final values written by the task - This is guaranteed by the sequential execution in wrappedTask
Example:
PostTaskAndReplyWithResult(
backgroundRunner,
func(ctx context.Context) (int, error) {
return len("Hello"), nil
},
func(ctx context.Context, length int, err error) {
fmt.Printf("Length: %d\n", length)
},
uiRunner,
)
func PostTaskAndReplyWithResultAndTraits ¶
func PostTaskAndReplyWithResultAndTraits[T any]( targetRunner TaskRunner, task TaskWithResult[T], taskTraits TaskTraits, reply ReplyWithResult[T], replyTraits TaskTraits, replyRunner TaskRunner, )
PostTaskAndReplyWithResultAndTraits is the full-featured version that allows specifying different traits for the task and reply separately.
This is useful when: - Task is background work (BestEffort) but reply is UI update (UserVisible/UserBlocking) - Task has different priority requirements than the reply
Example:
PostTaskAndReplyWithResultAndTraits(
backgroundRunner,
func(ctx context.Context) (*UserData, error) {
return fetchUserFromDB(ctx)
},
TraitsBestEffort(), // Background work, low priority
func(ctx context.Context, user *UserData, err error) {
updateUI(user)
},
TraitsUserVisible(), // UI update, higher priority
uiRunner,
)
Types ¶
type DelayManager ¶
type DelayManager struct {
// contains filtered or unexported fields
}
func NewDelayManager ¶
func NewDelayManager() *DelayManager
func (*DelayManager) AddDelayedTask ¶
func (dm *DelayManager) AddDelayedTask(task Task, delay time.Duration, traits TaskTraits, target TaskRunner)
func (*DelayManager) Stop ¶
func (dm *DelayManager) Stop()
func (*DelayManager) TaskCount ¶
func (dm *DelayManager) TaskCount() int
type DelayedTask ¶
type DelayedTask struct {
RunAt time.Time
Task Task
Traits TaskTraits
Target TaskRunner
// contains filtered or unexported fields
}
DelayedTask represents a task scheduled for the future
type DelayedTaskHeap ¶
type DelayedTaskHeap []*DelayedTask
DelayedTaskHeap implements heap.Interface
func (DelayedTaskHeap) Len ¶
func (h DelayedTaskHeap) Len() int
func (DelayedTaskHeap) Less ¶
func (h DelayedTaskHeap) Less(i, j int) bool
func (*DelayedTaskHeap) Peek ¶
func (h *DelayedTaskHeap) Peek() *DelayedTask
func (*DelayedTaskHeap) Pop ¶
func (h *DelayedTaskHeap) Pop() any
func (*DelayedTaskHeap) Push ¶
func (h *DelayedTaskHeap) Push(x any)
func (DelayedTaskHeap) Swap ¶
func (h DelayedTaskHeap) Swap(i, j int)
type FIFOTaskQueue ¶
type FIFOTaskQueue struct {
// contains filtered or unexported fields
}
func NewFIFOTaskQueue ¶
func NewFIFOTaskQueue() *FIFOTaskQueue
func (*FIFOTaskQueue) Clear ¶
func (q *FIFOTaskQueue) Clear()
Clear removes all tasks from the queue and releases references
func (*FIFOTaskQueue) IsEmpty ¶
func (q *FIFOTaskQueue) IsEmpty() bool
func (*FIFOTaskQueue) Len ¶
func (q *FIFOTaskQueue) Len() int
func (*FIFOTaskQueue) MaybeCompact ¶
func (q *FIFOTaskQueue) MaybeCompact()
func (*FIFOTaskQueue) PeekTraits ¶
func (q *FIFOTaskQueue) PeekTraits() (TaskTraits, bool)
func (*FIFOTaskQueue) Pop ¶
func (q *FIFOTaskQueue) Pop() (TaskItem, bool)
func (*FIFOTaskQueue) PopUpTo ¶
func (q *FIFOTaskQueue) PopUpTo(max int) []TaskItem
func (*FIFOTaskQueue) Push ¶
func (q *FIFOTaskQueue) Push(t Task, traits TaskTraits)
type PriorityTaskQueue ¶
type PriorityTaskQueue struct {
// contains filtered or unexported fields
}
func NewPriorityTaskQueue ¶
func NewPriorityTaskQueue() *PriorityTaskQueue
func (*PriorityTaskQueue) Clear ¶
func (q *PriorityTaskQueue) Clear()
Clear removes all tasks from the queue and releases references
func (*PriorityTaskQueue) IsEmpty ¶
func (q *PriorityTaskQueue) IsEmpty() bool
func (*PriorityTaskQueue) Len ¶
func (q *PriorityTaskQueue) Len() int
func (*PriorityTaskQueue) MaybeCompact ¶
func (q *PriorityTaskQueue) MaybeCompact()
func (*PriorityTaskQueue) PeekTraits ¶
func (q *PriorityTaskQueue) PeekTraits() (TaskTraits, bool)
func (*PriorityTaskQueue) Pop ¶
func (q *PriorityTaskQueue) Pop() (TaskItem, bool)
func (*PriorityTaskQueue) PopUpTo ¶
func (q *PriorityTaskQueue) PopUpTo(max int) []TaskItem
func (*PriorityTaskQueue) Push ¶
func (q *PriorityTaskQueue) Push(t Task, traits TaskTraits)
type RepeatingTaskHandle ¶
type RepeatingTaskHandle interface {
// Stop stops the repeating task. It will not interrupt a currently executing task,
// but will prevent future executions from being scheduled.
Stop()
// IsStopped returns true if the task has been stopped.
IsStopped() bool
}
RepeatingTaskHandle controls the lifecycle of a repeating task.
type ReplyWithResult ¶
ReplyWithResult defines a reply callback that receives a result of type T and an error. This is the counterpart to TaskWithResult, receiving the values returned by the task.
type SequencedTaskRunner ¶
type SequencedTaskRunner struct {
// contains filtered or unexported fields
}
func NewSequencedTaskRunner ¶
func NewSequencedTaskRunner(threadPool ThreadPool) *SequencedTaskRunner
func (*SequencedTaskRunner) FlushAsync ¶
func (r *SequencedTaskRunner) FlushAsync(callback func())
FlushAsync posts a barrier task that executes the callback when all prior tasks complete. This is a non-blocking alternative to WaitIdle.
The callback will be executed on this runner's thread, after all tasks posted before FlushAsync have completed.
Example:
runner.PostTask(task1)
runner.PostTask(task2)
runner.FlushAsync(func() {
fmt.Println("task1 and task2 completed!")
})
func (*SequencedTaskRunner) GetThreadPool ¶
func (r *SequencedTaskRunner) GetThreadPool() ThreadPool
GetThreadPool returns the underlying ThreadPool used by this runner
func (*SequencedTaskRunner) IsClosed ¶
func (r *SequencedTaskRunner) IsClosed() bool
IsClosed returns true if the runner has been shut down.
func (*SequencedTaskRunner) Metadata ¶
func (r *SequencedTaskRunner) Metadata() map[string]any
Metadata returns the metadata associated with the task runner
func (*SequencedTaskRunner) Name ¶
func (r *SequencedTaskRunner) Name() string
Name returns the name of the task runner
func (*SequencedTaskRunner) PostDelayedTask ¶
func (r *SequencedTaskRunner) PostDelayedTask(task Task, delay time.Duration)
PostDelayedTask submits a task to execute after a delay
func (*SequencedTaskRunner) PostDelayedTaskWithTraits ¶
func (r *SequencedTaskRunner) PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)
PostDelayedTaskWithTraits submits a delayed task with specified traits
func (*SequencedTaskRunner) PostRepeatingTask ¶
func (r *SequencedTaskRunner) PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle
PostRepeatingTask submits a task that repeats at a fixed interval
func (*SequencedTaskRunner) PostRepeatingTaskWithInitialDelay ¶
func (r *SequencedTaskRunner) PostRepeatingTaskWithInitialDelay( task Task, initialDelay, interval time.Duration, traits TaskTraits, ) RepeatingTaskHandle
PostRepeatingTaskWithInitialDelay submits a repeating task with an initial delay The task will first execute after initialDelay, then repeat every interval.
func (*SequencedTaskRunner) PostRepeatingTaskWithTraits ¶
func (r *SequencedTaskRunner) PostRepeatingTaskWithTraits( task Task, interval time.Duration, traits TaskTraits, ) RepeatingTaskHandle
PostRepeatingTaskWithTraits submits a repeating task with specific traits
func (*SequencedTaskRunner) PostTask ¶
func (r *SequencedTaskRunner) PostTask(task Task)
PostTask submits a task with default traits
func (*SequencedTaskRunner) PostTaskAndReply ¶
func (r *SequencedTaskRunner) PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)
PostTaskAndReply executes task on this runner, then posts reply to replyRunner. If task panics, reply will not be executed.
func (*SequencedTaskRunner) PostTaskAndReplyWithTraits ¶
func (r *SequencedTaskRunner) PostTaskAndReplyWithTraits( task Task, taskTraits TaskTraits, reply Task, replyTraits TaskTraits, replyRunner TaskRunner, )
PostTaskAndReplyWithTraits allows specifying different traits for task and reply. This is useful when task is background work (BestEffort) but reply is UI update (UserVisible).
func (*SequencedTaskRunner) PostTaskWithTraits ¶
func (r *SequencedTaskRunner) PostTaskWithTraits(task Task, traits TaskTraits)
PostTaskWithTraits submits a task with specified traits
func (*SequencedTaskRunner) SetMetadata ¶
func (r *SequencedTaskRunner) SetMetadata(key string, value interface{})
SetMetadata sets a metadata key-value pair
func (*SequencedTaskRunner) SetName ¶
func (r *SequencedTaskRunner) SetName(name string)
SetName sets the name of the task runner
func (*SequencedTaskRunner) Shutdown ¶
func (r *SequencedTaskRunner) Shutdown()
Shutdown gracefully stops the runner by: 1. Marking it as closed (stops accepting new tasks) 2. Clearing all pending tasks in the queue 3. All repeating tasks will automatically stop on their next execution 4. Signaling all WaitShutdown() waiters
Note: This method is non-blocking and can be safely called from within a task. Note: This will not interrupt currently executing tasks.
func (*SequencedTaskRunner) WaitIdle ¶
func (r *SequencedTaskRunner) WaitIdle(ctx context.Context) error
WaitIdle blocks until all currently queued tasks have completed execution. This is implemented by posting a barrier task and waiting for it to execute.
Due to the sequential nature of SequencedTaskRunner, when the barrier task executes, all tasks posted before WaitIdle are guaranteed to have completed.
Returns error if: - Context is cancelled or deadline exceeded - Runner is closed when WaitIdle is called
Note: Tasks posted after WaitIdle is called are not waited for. Note: Repeating tasks will continue to repeat and are not waited for.
func (*SequencedTaskRunner) WaitShutdown ¶
func (r *SequencedTaskRunner) WaitShutdown(ctx context.Context) error
WaitShutdown blocks until Shutdown() is called on this runner.
This is useful for waiting for the runner to be shut down, either by an external caller or by a task running on the runner itself.
Returns error if context is cancelled or deadline exceeded.
Example:
// Task shuts down the runner when condition is met
runner.PostTask(func(ctx context.Context) {
if conditionMet() {
me := GetCurrentTaskRunner(ctx)
me.Shutdown()
}
})
// Main thread waits for shutdown
runner.WaitShutdown(context.Background())
type SingleThreadTaskRunner ¶
type SingleThreadTaskRunner struct {
// contains filtered or unexported fields
}
SingleThreadTaskRunner binds a dedicated Goroutine to execute tasks sequentially. It guarantees that all tasks submitted to it run on the same Goroutine (Thread Affinity).
Use cases: 1. Blocking IO operations (e.g., NetworkReceiver) 2. CGO calls that require Thread Local Storage 3. Simulating Main Thread / UI Thread behavior
Key differences from SequencedTaskRunner: - SequencedTaskRunner: Tasks execute sequentially but may run on different worker goroutines - SingleThreadTaskRunner: Tasks execute sequentially AND always on the same dedicated goroutine
func NewSingleThreadTaskRunner ¶
func NewSingleThreadTaskRunner() *SingleThreadTaskRunner
NewSingleThreadTaskRunner creates and starts a new SingleThreadTaskRunner. It immediately spawns a dedicated goroutine for task execution.
func (*SingleThreadTaskRunner) FlushAsync ¶
func (r *SingleThreadTaskRunner) FlushAsync(callback func())
FlushAsync posts a barrier task that executes the callback when all prior tasks complete. This is a non-blocking alternative to WaitIdle.
The callback will be executed on this runner's dedicated goroutine, after all tasks posted before FlushAsync have completed.
Example:
runner.PostTask(task1)
runner.PostTask(task2)
runner.FlushAsync(func() {
fmt.Println("task1 and task2 completed!")
})
func (*SingleThreadTaskRunner) GetThreadPool ¶
func (r *SingleThreadTaskRunner) GetThreadPool() ThreadPool
GetThreadPool returns nil because SingleThreadTaskRunner doesn't use a thread pool
func (*SingleThreadTaskRunner) IsClosed ¶
func (r *SingleThreadTaskRunner) IsClosed() bool
IsClosed returns true if the runner has been stopped
func (*SingleThreadTaskRunner) Metadata ¶
func (r *SingleThreadTaskRunner) Metadata() map[string]interface{}
Metadata returns the metadata associated with the task runner
func (*SingleThreadTaskRunner) Name ¶
func (r *SingleThreadTaskRunner) Name() string
Name returns the name of the task runner
func (*SingleThreadTaskRunner) PostDelayedTask ¶
func (r *SingleThreadTaskRunner) PostDelayedTask(task Task, delay time.Duration)
PostDelayedTask submits a delayed task
func (*SingleThreadTaskRunner) PostDelayedTaskWithTraits ¶
func (r *SingleThreadTaskRunner) PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)
PostDelayedTaskWithTraits submits a delayed task with traits. Uses time.AfterFunc which is independent of the global TaskScheduler, ensuring IO-related timers are not affected by scheduler load.
func (*SingleThreadTaskRunner) PostRepeatingTask ¶
func (r *SingleThreadTaskRunner) PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle
PostRepeatingTask submits a task that repeats at a fixed interval
func (*SingleThreadTaskRunner) PostRepeatingTaskWithInitialDelay ¶
func (r *SingleThreadTaskRunner) PostRepeatingTaskWithInitialDelay( task Task, initialDelay, interval time.Duration, traits TaskTraits, ) RepeatingTaskHandle
PostRepeatingTaskWithInitialDelay submits a repeating task with an initial delay
func (*SingleThreadTaskRunner) PostRepeatingTaskWithTraits ¶
func (r *SingleThreadTaskRunner) PostRepeatingTaskWithTraits( task Task, interval time.Duration, traits TaskTraits, ) RepeatingTaskHandle
PostRepeatingTaskWithTraits submits a repeating task with traits
func (*SingleThreadTaskRunner) PostTask ¶
func (r *SingleThreadTaskRunner) PostTask(task Task)
PostTask submits a task for execution
func (*SingleThreadTaskRunner) PostTaskAndReply ¶
func (r *SingleThreadTaskRunner) PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)
PostTaskAndReply executes task on this runner, then posts reply to replyRunner. If task panics, reply will not be executed. Both task and reply will execute on the same dedicated goroutine if replyRunner is this runner.
func (*SingleThreadTaskRunner) PostTaskAndReplyWithTraits ¶
func (r *SingleThreadTaskRunner) PostTaskAndReplyWithTraits( task Task, taskTraits TaskTraits, reply Task, replyTraits TaskTraits, replyRunner TaskRunner, )
PostTaskAndReplyWithTraits allows specifying different traits for task and reply. This is useful when task is background work (BestEffort) but reply is UI update (UserVisible). Note: For SingleThreadTaskRunner, traits don't affect execution order since all tasks run sequentially on the same goroutine, but they may be used for logging or metrics.
func (*SingleThreadTaskRunner) PostTaskWithTraits ¶
func (r *SingleThreadTaskRunner) PostTaskWithTraits(task Task, traits TaskTraits)
PostTaskWithTraits submits a task with traits (traits are ignored for single-threaded execution)
func (*SingleThreadTaskRunner) SetMetadata ¶
func (r *SingleThreadTaskRunner) SetMetadata(key string, value interface{})
SetMetadata sets a metadata key-value pair
func (*SingleThreadTaskRunner) SetName ¶
func (r *SingleThreadTaskRunner) SetName(name string)
SetName sets the name of the task runner
func (*SingleThreadTaskRunner) Shutdown ¶
func (r *SingleThreadTaskRunner) Shutdown()
Shutdown marks the runner as closed and signals shutdown waiters. Unlike Stop(), this method does NOT immediately terminate the runLoop. This allows tasks to call Shutdown() from within themselves.
After calling Shutdown(): - WaitShutdown() will return - IsClosed() will return true - New tasks posted will be ignored - Existing queued tasks will still execute - Call Stop() to actually terminate the runLoop
func (*SingleThreadTaskRunner) Stop ¶
func (r *SingleThreadTaskRunner) Stop()
Stop stops the runner and releases resources
func (*SingleThreadTaskRunner) WaitIdle ¶
func (r *SingleThreadTaskRunner) WaitIdle(ctx context.Context) error
WaitIdle blocks until all currently queued tasks have completed execution. This is implemented by posting a barrier task and waiting for it to execute.
Since SingleThreadTaskRunner executes tasks sequentially on a dedicated goroutine, when the barrier task executes, all tasks posted before WaitIdle are guaranteed to have completed.
Returns error if: - Context is cancelled or deadline exceeded - Runner is closed when WaitIdle is called
Note: Tasks posted after WaitIdle is called are not waited for. Note: Repeating tasks will continue to repeat and are not waited for.
func (*SingleThreadTaskRunner) WaitShutdown ¶
func (r *SingleThreadTaskRunner) WaitShutdown(ctx context.Context) error
WaitShutdown blocks until Shutdown() is called on this runner.
This is useful for waiting for the runner to be shut down, either by an external caller or by a task running on the runner itself.
Returns error if context is cancelled or deadline exceeded.
Example:
// IO thread: receives messages and posts shutdown when condition met
ioRunner.PostTask(func(ctx context.Context) {
for {
msg := receiveMessage()
mainRunner.PostTask(func(ctx context.Context) {
if shouldShutdown(msg) {
me := GetCurrentTaskRunner(ctx)
me.Shutdown()
}
})
}
})
// Main thread waits for shutdown
mainRunner.WaitShutdown(context.Background())
type TaskItem ¶
type TaskItem struct {
Task Task
Traits TaskTraits
}
type TaskPriority ¶
type TaskPriority int
const ( // TaskPriorityBestEffort: Lowest priority TaskPriorityBestEffort TaskPriority = iota // TaskPriorityUserVisible: Default priority TaskPriorityUserVisible // TaskPriorityUserBlocking: Highest priority // `UserBlocking` means the task may block the main thread. // If main thread is blocked, the UI will be unresponsive. // The user experience will be affected if the task blocks the main thread. TaskPriorityUserBlocking )
type TaskQueue ¶
type TaskQueue interface {
Push(t Task, traits TaskTraits)
Pop() (TaskItem, bool)
PopUpTo(max int) []TaskItem
PeekTraits() (TaskTraits, bool)
Len() int
IsEmpty() bool
MaybeCompact()
Clear() // Clear all tasks from the queue
}
TaskQueue defines the interface for different queue implementations
type TaskRunner ¶
type TaskRunner interface {
PostTask(task Task)
PostTaskWithTraits(task Task, traits TaskTraits)
PostDelayedTask(task Task, delay time.Duration)
// [v2.1 New] Support delayed tasks with specific traits
PostDelayedTaskWithTraits(task Task, delay time.Duration, traits TaskTraits)
// [v2.2 New] Support repeating tasks
PostRepeatingTask(task Task, interval time.Duration) RepeatingTaskHandle
PostRepeatingTaskWithTraits(task Task, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
PostRepeatingTaskWithInitialDelay(task Task, initialDelay, interval time.Duration, traits TaskTraits) RepeatingTaskHandle
// [v2.3 New] Support task and reply pattern
// PostTaskAndReply executes task on this runner, then posts reply to replyRunner
PostTaskAndReply(task Task, reply Task, replyRunner TaskRunner)
// PostTaskAndReplyWithTraits allows specifying traits for both task and reply
PostTaskAndReplyWithTraits(task Task, taskTraits TaskTraits, reply Task, replyTraits TaskTraits, replyRunner TaskRunner)
// [v2.4 New] Synchronization and lifecycle management
// WaitIdle blocks until all currently queued tasks have completed execution
// Tasks posted after WaitIdle is called are not waited for
// Returns error if context is cancelled or runner is closed
WaitIdle(ctx context.Context) error
// FlushAsync posts a barrier task that executes callback when all prior tasks complete
// This is a non-blocking alternative to WaitIdle
FlushAsync(callback func())
// WaitShutdown blocks until Shutdown() is called on this runner
// Returns error if context is cancelled
WaitShutdown(ctx context.Context) error
// Shutdown marks the runner as closed and clears all pending tasks
// This method is non-blocking and can be safely called from within a task
Shutdown()
// IsClosed returns true if the runner has been shut down
IsClosed() bool
// [v2.5 New] Identification and Metadata
// Name returns the name of the task runner
Name() string
// Metadata returns the metadata associated with the task runner
Metadata() map[string]any
// [v2.6 New] Thread Pool Access
// GetThreadPool returns the underlying ThreadPool used by this runner
// Returns nil for runners that don't use a thread pool (e.g., SingleThreadTaskRunner)
GetThreadPool() ThreadPool
}
func GetCurrentTaskRunner ¶
func GetCurrentTaskRunner(ctx context.Context) TaskRunner
type TaskScheduler ¶
type TaskScheduler struct {
// contains filtered or unexported fields
}
func NewFIFOTaskScheduler ¶
func NewFIFOTaskScheduler(workerCount int) *TaskScheduler
func NewPriorityTaskScheduler ¶
func NewPriorityTaskScheduler(workerCount int) *TaskScheduler
func (*TaskScheduler) ActiveTaskCount ¶
func (s *TaskScheduler) ActiveTaskCount() int
func (*TaskScheduler) DelayedTaskCount ¶
func (s *TaskScheduler) DelayedTaskCount() int
func (*TaskScheduler) GetWork ¶
func (s *TaskScheduler) GetWork(stopCh <-chan struct{}) (Task, bool)
GetWork (Called by Worker)
func (*TaskScheduler) OnTaskEnd ¶
func (s *TaskScheduler) OnTaskEnd()
func (*TaskScheduler) OnTaskStart ¶
func (s *TaskScheduler) OnTaskStart()
func (*TaskScheduler) PostDelayedInternal ¶
func (s *TaskScheduler) PostDelayedInternal(task Task, delay time.Duration, traits TaskTraits, target TaskRunner)
PostDelayedInternal
func (*TaskScheduler) PostInternal ¶
func (s *TaskScheduler) PostInternal(task Task, traits TaskTraits)
PostInternal
func (*TaskScheduler) QueuedTaskCount ¶
func (s *TaskScheduler) QueuedTaskCount() int
func (*TaskScheduler) Shutdown ¶
func (s *TaskScheduler) Shutdown()
type TaskTraits ¶
type TaskTraits struct {
Priority TaskPriority
MayBlock bool
Category string
}
func DefaultTaskTraits ¶
func DefaultTaskTraits() TaskTraits
func TraitsBestEffort ¶
func TraitsBestEffort() TaskTraits
func TraitsUserBlocking ¶
func TraitsUserBlocking() TaskTraits
func TraitsUserVisible ¶
func TraitsUserVisible() TaskTraits
type TaskWithResult ¶
TaskWithResult defines a task that returns a result of type T and an error. This is used with PostTaskAndReplyWithResult to pass data from task to reply.
type ThreadPool ¶
type ThreadPool interface {
PostInternal(task Task, traits TaskTraits)
PostDelayedInternal(task Task, delay time.Duration, traits TaskTraits, target TaskRunner)
Start(ctx context.Context)
Stop()
ID() string
IsRunning() bool
WorkerCount() int
QueuedTaskCount() int // In queue
ActiveTaskCount() int // Executing
DelayedTaskCount() int // Delayed
}
============================================================================= ThreadPool: Define task execution interface =============================================================================