Documentation
¶
Overview ¶
Package for async task processing.
Core types in this package are: AsyncPool, Future, AwaitFutures.
AsyncPool internally maintain a pool of goroutines. Use NewAsyncPool create a new AsyncPool, customize the pool with options like WithTaskQueue, FallbackCallerRun, FallbackDropTask and FallbackNewGoroutine.
Use CalcPoolSize to estimate optimal worker count in the pool.
Future represents the result of an async task, similar to Future in Java and Promise in Javascript.
Use [AsyncPool.Run], Submit or Run to create an async task, and obtain task result through the returned Future, e.g., [Future.Get] and [Future.TimedGet].
For cases where you need to await for a group of Futures, use NewAwaitFutures. AwaitFutures represent a group of tasks that are triggered at the same time and awaited together.
See BatchTask if you need to manage a group of task generator and consumers without using a goroutine pool.
See SignalOnce for one-time signal based communication.
async package also provides various convenience funcs to run async task until certain condition match, or help you capture potential panic in async task. E.g., RunCancellable, RunCancellableChan, RunUntil, CapturePanic, CapturePanicErr, PanicSafeFunc and so on.
Index ¶
- Variables
- func CalcPoolSize(multi int, min int, max int) int
- func CapturePanic[T any](op func() (T, error)) (T, error)
- func CapturePanicErr(op func()) error
- func FallbackCallerRun() asyncPoolOption
- func FallbackDropTask() asyncPoolOption
- func FallbackNewGoroutine() asyncPoolOption
- func Fire(rail interface{ ... }, task func() error, runner ...func(func()))
- func MaxProcs() int
- func NewSubmitAsyncFunc[T any](pool AsyncPool) func(task func() (T, error)) Future[T]
- func PanicSafeErrFunc(op func() error) func() error
- func PanicSafeFunc(op func()) func()
- func PanicSafeRun(op func())
- func PanicSafeRunErr(op func() error) error
- func RunCancellable(f func()) (cancel func())
- func RunCancellableChan[T any](ch <-chan T, f func(t T) (stop bool)) (cancel func())
- func RunUntil[T any](dur time.Duration, f func() (stop bool, t T, e error)) (T, error)
- func WithTaskQueue(queueSize int) asyncPoolOption
- type AntsAsyncPool
- type AsyncPool
- type AwaitFutures
- type BatchTask
- type BatchTaskResult
- type BoundedAsyncPool
- type DoneWatcher
- type Future
- type SignalOnce
- type Synced
- type TickRunner
Constants ¶
This section is empty.
Variables ¶
var (
ErrGetTimeout = errors.New("future.TimedGet timeout")
)
Functions ¶
func CalcPoolSize ¶ added in v0.3.11
Return multi * GOMAXPROCS or min whichever is greater, if multi * GOMAXPROCS > max, max is returned instead.
If max is unlimited, use -1 as placeholder.
func CapturePanic ¶ added in v0.3.9
func CapturePanicErr ¶ added in v0.3.9
func CapturePanicErr(op func()) error
func FallbackCallerRun ¶
func FallbackCallerRun() asyncPoolOption
Fallback to caller runs when pool is full.
func FallbackNewGoroutine ¶ added in v0.4.0
func FallbackNewGoroutine() asyncPoolOption
Create new Goroutine to run the task when pool is full.
func NewSubmitAsyncFunc ¶
Create func that calls SubmitAsync(...) with the given pool.
func PanicSafeErrFunc ¶
func PanicSafeFunc ¶
func PanicSafeFunc(op func()) func()
func PanicSafeRun ¶
func PanicSafeRun(op func())
func PanicSafeRunErr ¶
func RunCancellableChan ¶
Keep consuming from ch until cancelled.
func WithTaskQueue ¶ added in v0.3.11
func WithTaskQueue(queueSize int) asyncPoolOption
With task queue
Types ¶
type AntsAsyncPool ¶
type AntsAsyncPool struct {
// contains filtered or unexported fields
}
func NewAntsAsyncPool
deprecated
func NewAntsAsyncPool(maxWorkers int, opts ...asyncPoolOption) *AntsAsyncPool
Create a bounded pool of goroutines without extra task queue.
The maxWorkers determines the max number of workers.
AntsAsyncPool does not maintain task queue. By default, if all workers are busy, the caller of [AsyncPool.Go] is blocked indefinitively until the task can be processed.
You can use FallbackDropTask or FallbackCallerRun to change this behaviour.
AntsAsyncPool is good for cases where you want back pressure, i.e., stop producing tasks when all workers are busy.
In cases where you want to have an extra queue of tasks that do not always block task producers, use NewBoundedAsyncPool intead.
Deprecated: Since v0.3.10, migrate to NewAsyncPool if possible.
func (*AntsAsyncPool) Go ¶
func (a *AntsAsyncPool) Go(f func())
func (*AntsAsyncPool) Run ¶
func (p *AntsAsyncPool) Run(f func() error) Future[struct{}]
func (*AntsAsyncPool) Stop ¶
func (a *AntsAsyncPool) Stop()
func (*AntsAsyncPool) StopAndWait ¶
func (a *AntsAsyncPool) StopAndWait()
type AsyncPool ¶
Async Pool Interface
func NewAsyncPool ¶ added in v0.3.11
Create a bounded pool of goroutines.
By default, the created AsyncPool does not maintain an extra task queue. If all workers are busy, the caller of [AsyncPool.Go] is blocked indefinitively until a worker is free.
This is good for cases where you want back pressure, i.e., stop producing tasks when all workers are busy.
In cases where you want an extra task queue, e.g., so that the task producers won't block so frequently when pool is exhausted, use WithTaskQueue to specify the task queue size. Decide whether you need a task queue based on your use case.
You can also use FallbackDropTask or FallbackCallerRun to customize fallback behaviour.
Find proper worker pool size based on N * GOMAXPROCS, e.g., in Redis connection pool, N might be 10; in web server connection pool, N can be 258 and so on.
When the tasks are CPU intensive, N should be relatively small, e.g., N=1 or N=2.
See CalcPoolSize.
func NewCpuAsyncPool
deprecated
func NewCpuAsyncPool() AsyncPool
Create AsyncPool with number of workers equals to 4 * GOMAXPROCS.
Deprecated: Since v0.3.10. use NewAsyncPool instead.
func NewIOAsyncPool
deprecated
func NewIOAsyncPool() AsyncPool
Create AsyncPool with number of workers equals to 8 * GOMAXPROCS and a task queue of size 100.
Deprecated: Since v0.3.10. use NewAsyncPool instead.
type AwaitFutures ¶
type AwaitFutures[T any] struct { // contains filtered or unexported fields }
AwaitFutures represent tasks that are submitted to the pool asynchronously whose results are awaited together.
AwaitFutures should only be used once for the same group of tasks.
Use NewAwaitFutures to create one.
func NewAwaitFutures ¶
func NewAwaitFutures[T any](pool AsyncPool) *AwaitFutures[T]
Create new AwaitFutures for a group of tasks.
*AsyncPool is optional, provide nil if not needed.
func (*AwaitFutures[T]) Await ¶
func (a *AwaitFutures[T]) Await() []Future[T]
Await results of all tasks.
func (*AwaitFutures[T]) AwaitAnyErr ¶
func (a *AwaitFutures[T]) AwaitAnyErr() error
Await results of all tasks and return any error that is found in the task Futures.
func (*AwaitFutures[T]) AwaitResultAll ¶ added in v0.4.0
func (a *AwaitFutures[T]) AwaitResultAll() []pair.Pair[T, error]
Await results of all tasks.
func (*AwaitFutures[T]) AwaitResultAnyErr ¶
func (a *AwaitFutures[T]) AwaitResultAnyErr() ([]T, error)
Await results of all tasks and return any error that is found in the task Futures.
func (*AwaitFutures[T]) SubmitAsync ¶
func (a *AwaitFutures[T]) SubmitAsync(task func() (T, error))
Submit task to AwaitFutures.
type BatchTask ¶
func NewBatchTask ¶
func NewBatchTask[T any, V any](parallel int, bufferSize int, consumer func(T) (V, error)) *BatchTask[T, V]
Create a batch of concurrent task for one time use.
func (*BatchTask[T, V]) Close ¶
func (b *BatchTask[T, V]) Close()
Close underlying pipeline channel without waiting.
func (*BatchTask[T, V]) Wait ¶
func (b *BatchTask[T, V]) Wait() []BatchTaskResult[V]
Wait until all generated tasks are completed and close pipeline channel.
type BatchTaskResult ¶
type BoundedAsyncPool ¶
type BoundedAsyncPool struct {
// contains filtered or unexported fields
}
A long live, bounded pool of goroutines.
Use NewBoundedAsyncPool to create a new pool.
BoundedAsyncPool internally maintains a task queue with limited size and limited number of workers.
By default, if the task queue is full and all workers are busy, the caller of *BoundedAsyncPool.Go is blocked indefinitively until the task can be processed. You can use FallbackDropTask or FallbackCallerRun to change this behaviour.
func NewBoundedAsyncPool
deprecated
func NewBoundedAsyncPool(maxTasks int, maxWorkers int, opts ...asyncPoolOption) *BoundedAsyncPool
Create a bounded pool of goroutines.
The maxTasks determines the capacity of the task queues.
The maxWorkers determines the max number of workers.
By default, if the task queue is full and all workers are busy, the caller of *AsyncPool.Go is blocked indefinitively until the task can be processed. You can use FallbackDropTask or FallbackCallerRun to change this behaviour.
Deprecated: Since v0.3.10, migrate to NewAsyncPool if possible.
func (*BoundedAsyncPool) Go ¶
func (p *BoundedAsyncPool) Go(f func())
Submit task to the pool.
If the pool is closed, caller will execute the submitted task directly.
func (*BoundedAsyncPool) Run ¶
func (p *BoundedAsyncPool) Run(f func() error) Future[struct{}]
func (*BoundedAsyncPool) Stop ¶
func (p *BoundedAsyncPool) Stop()
Stop the pool.
Once the pool is stopped, new tasks submitted are executed directly by the caller.
func (*BoundedAsyncPool) StopAndWait ¶
func (p *BoundedAsyncPool) StopAndWait()
Stop the pool and wait until existing workers drain all the remaining tasks.
Once the pool is stopped, new tasks submitted are executed directly by the caller.
type DoneWatcher ¶ added in v0.4.2
type DoneWatcher struct {
// contains filtered or unexported fields
}
func NewDoneWatcher ¶ added in v0.4.2
func NewDoneWatcher(interval time.Duration, onEveryCheck func(), onFinished func()) *DoneWatcher
func (*DoneWatcher) Done ¶ added in v0.4.2
func (w *DoneWatcher) Done()
type Future ¶
type Future[T any] interface { // Get result without timeout. Get() (T, error) // Get result with timeout, returns ErrGetTimeout if timeout exceeded. TimedGet(timeout int) (T, error) // Then callback to be invoked when the Future is completed. // // Then callback should only be set once for every Future. Then(tf func(T, error)) // Then callback to be invoked when the Future is completed. // // Then callback should only be set once for every Future. ThenErr(tf func(error)) }
Result of a asynchronous task.
func NewCompletedFuture ¶
type SignalOnce ¶
type SignalOnce struct {
// contains filtered or unexported fields
}
func NewSignalOnce ¶
func NewSignalOnce() *SignalOnce
func (*SignalOnce) Closed ¶
func (s *SignalOnce) Closed() bool
func (*SignalOnce) Notify ¶
func (s *SignalOnce) Notify()
func (*SignalOnce) TimedWait ¶
func (s *SignalOnce) TimedWait(timeout time.Duration) (isTimeout bool)
func (*SignalOnce) Wait ¶
func (s *SignalOnce) Wait()
type TickRunner ¶ added in v0.4.2
type TickRunner struct {
// contains filtered or unexported fields
}
Runner that triggers run on every tick.
Create TickRunner using func NewTickRunner(...).
func NewTickRuner ¶ added in v0.4.2
func NewTickRuner(freq time.Duration, run func()) *TickRunner
func (*TickRunner) Start ¶ added in v0.4.2
func (t *TickRunner) Start()
func (*TickRunner) Stop ¶ added in v0.4.2
func (t *TickRunner) Stop()