Documentation ¶
Index ¶
- Variables
- func FallbackCallerRun() asyncPoolOption
- func FallbackDropTask() asyncPoolOption
- func NewSubmitAsyncFunc[T any](pool AsyncPool) func(task func() (T, error)) Future[T]
- func PanicSafeErrFunc(op func() error) func() error
- func PanicSafeFunc(op func()) func()
- func PanicSafeRun(op func())
- func PanicSafeRunErr(op func() error) error
- func RunCancellable(f func()) (cancel func())
- func RunCancellableChan[T any](ch <-chan T, f func(t T) (stop bool)) (cancel func())
- func RunUntil[T any](wait time.Duration, f func() (stop bool, t T, e error)) (T, error)
- type AntsAsyncPool
- type AsyncPool
- type AwaitFutures
- type BatchTask
- type BatchTaskResult
- type BoundedAsyncPool
- type Future
- type SignalOnce
Constants ¶
This section is empty.
Variables ¶
var (
ErrGetTimeout = errors.New("future.TimedGet timeout")
)
Functions ¶
func FallbackCallerRun ¶
func FallbackCallerRun() asyncPoolOption
Fall back to running the task on the caller's goroutine when the pool is full.
func NewSubmitAsyncFunc ¶
func NewSubmitAsyncFunc[T any](pool AsyncPool) func(task func() (T, error)) Future[T]
Create a func that calls SubmitAsync(...) with the given pool.
func PanicSafeErrFunc ¶
func PanicSafeErrFunc(op func() error) func() error
func PanicSafeFunc ¶
func PanicSafeFunc(op func()) func()
func PanicSafeRun ¶
func PanicSafeRun(op func())
func PanicSafeRunErr ¶
func PanicSafeRunErr(op func() error) error
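The PanicSafe* functions carry no description here, so the following is only a sketch of what the names suggest: running op while recovering from any panic, with the error-returning variant reporting the panic as an error. The lowercase names `panicSafeRunErr` and `panicSafeFunc` and the error message format are hypothetical stand-ins, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// panicSafeRunErr is a hypothetical stand-in: it runs op and, if op panics,
// recovers and reports the panic as an error instead of crashing.
// (Assumption: the real PanicSafeRunErr may log or format the panic differently.)
func panicSafeRunErr(op func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered from panic: %v", r)
		}
	}()
	return op()
}

// panicSafeFunc wraps op so that a panic inside it is recovered and discarded.
func panicSafeFunc(op func()) func() {
	return func() {
		defer func() { _ = recover() }()
		op()
	}
}

func main() {
	// A panicking task is turned into an ordinary error.
	fmt.Println(panicSafeRunErr(func() error { panic("boom") }))

	// A normal error passes through unchanged.
	fmt.Println(panicSafeRunErr(func() error { return errors.New("plain failure") }))

	// The wrapped func swallows the panic, so execution continues.
	panicSafeFunc(func() { panic("ignored") })()
	fmt.Println("still running")
}
```

Wrappers of this kind are typically applied to tasks handed to a goroutine pool, where an unrecovered panic in one task would otherwise terminate the whole process.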
func RunCancellable ¶
func RunCancellable(f func()) (cancel func())
func RunCancellableChan ¶
func RunCancellableChan[T any](ch <-chan T, f func(t T) (stop bool)) (cancel func())
Types ¶
type AntsAsyncPool ¶
type AntsAsyncPool struct {
// contains filtered or unexported fields
}
func (*AntsAsyncPool) Go ¶
func (a *AntsAsyncPool) Go(f func())
func (*AntsAsyncPool) Run ¶
func (p *AntsAsyncPool) Run(f func() error) Future[struct{}]
func (*AntsAsyncPool) Stop ¶
func (a *AntsAsyncPool) Stop()
func (*AntsAsyncPool) StopAndWait ¶
func (a *AntsAsyncPool) StopAndWait()
type AsyncPool ¶
Async Pool Interface
func NewAntsAsyncPool ¶
Create a bounded pool of goroutines.
The maxTasks determines the capacity of the task queue.
The maxWorkers determines the max number of workers.
By default, if the task queue is full and all workers are busy, the caller of AsyncPool.Go() is blocked indefinitely until the task can be processed. You can use FallbackDropTask or FallbackCallerRun to change this behaviour.
func NewCpuAsyncPool ¶
func NewCpuAsyncPool() AsyncPool
Create an AsyncPool whose number of workers equals 4 * GOMAXPROCS.
func NewIOAsyncPool ¶
func NewIOAsyncPool() AsyncPool
Create an AsyncPool whose number of workers equals 8 * GOMAXPROCS, with a task queue of size 100.
type AwaitFutures ¶
type AwaitFutures[T any] struct {
// contains filtered or unexported fields
}
AwaitFutures represents tasks that are submitted to the pool asynchronously and whose results are awaited together.
AwaitFutures should only be used once for the same group of tasks.
Use miso.NewAwaitFutures() to create one.
func NewAwaitFutures ¶
func NewAwaitFutures[T any](pool AsyncPool) *AwaitFutures[T]
Create new AwaitFutures for a group of tasks.
The pool is optional; pass nil if it is not needed.
func (*AwaitFutures[T]) Await ¶
func (a *AwaitFutures[T]) Await() []Future[T]
Await results of all tasks.
func (*AwaitFutures[T]) AwaitAnyErr ¶
func (a *AwaitFutures[T]) AwaitAnyErr() error
Await results of all tasks and return any error that is found in the task Futures.
func (*AwaitFutures[T]) AwaitResultAnyErr ¶
func (a *AwaitFutures[T]) AwaitResultAnyErr() ([]T, error)
Await results of all tasks and return them together with any error that is found in the task Futures.
func (*AwaitFutures[T]) SubmitAsync ¶
func (a *AwaitFutures[T]) SubmitAsync(task func() (T, error))
Submit task to AwaitFutures.
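The submit-then-await pattern described for AwaitFutures can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: `awaitGroup`, `submitAsync`, `awaitResultAnyErr` and `errSketch` are hypothetical names, every task gets its own goroutine instead of going through a pool, and returning the first error in submission order is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errSketch is a hypothetical error used by the demo below.
var errSketch = errors.New("task failed")

// result holds one task's outcome, similar in spirit to a completed Future.
type result[T any] struct {
	val T
	err error
}

// awaitGroup is a hypothetical stand-in for AwaitFutures.
// It is meant to be filled from a single goroutine and awaited once.
type awaitGroup[T any] struct {
	wg      sync.WaitGroup
	results []*result[T]
}

// submitAsync starts task on its own goroutine and reserves a slot for its result.
func (a *awaitGroup[T]) submitAsync(task func() (T, error)) {
	r := &result[T]{}
	a.results = append(a.results, r)
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		r.val, r.err = task()
	}()
}

// awaitResultAnyErr waits for every task, then returns the values in
// submission order, or the first error found (an assumption of this sketch).
func (a *awaitGroup[T]) awaitResultAnyErr() ([]T, error) {
	a.wg.Wait()
	vals := make([]T, 0, len(a.results))
	for _, r := range a.results {
		if r.err != nil {
			return nil, r.err
		}
		vals = append(vals, r.val)
	}
	return vals, nil
}

func main() {
	g := &awaitGroup[int]{}
	for i := 1; i <= 3; i++ {
		n := i // copy for the closure (needed before Go 1.22)
		g.submitAsync(func() (int, error) { return n * n, nil })
	}
	fmt.Println(g.awaitResultAnyErr()) // prints "[1 4 9] <nil>"

	failing := &awaitGroup[int]{}
	failing.submitAsync(func() (int, error) { return 0, errSketch })
	_, err := failing.awaitResultAnyErr()
	fmt.Println(err) // prints "task failed"
}
```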
type BatchTask ¶
func NewBatchTask ¶
func NewBatchTask[T any, V any](parallel int, bufferSize int, consumer func(T) (V, error)) *BatchTask[T, V]
Create a batch of concurrent tasks for one-time use.
func (*BatchTask[T, V]) Close ¶
func (b *BatchTask[T, V]) Close()
Close underlying pipeline channel without waiting.
func (*BatchTask[T, V]) Wait ¶
func (b *BatchTask[T, V]) Wait() []BatchTaskResult[V]
Wait until all generated tasks are completed and close pipeline channel.
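The roles of parallel, bufferSize and consumer can be illustrated with a small standalone sketch: a buffered pipeline channel feeds a fixed number of workers, each of which applies the consumer and records a result. `runBatch`, `batchJob` and `batchResult` (including its field names) are hypothetical; the listing does not show how tasks are generated or what BatchTaskResult contains, so this sketch takes its inputs as a slice.

```go
package main

import (
	"fmt"
	"sync"
)

// batchJob carries one input together with its position in the batch.
type batchJob[T any] struct {
	idx int
	in  T
}

// batchResult is a hypothetical result shape; the real BatchTaskResult
// fields are not shown in this listing.
type batchResult[V any] struct {
	Result V
	Err    error
}

// runBatch feeds inputs through a buffered pipeline channel to `parallel`
// workers and waits until every input has been consumed.
func runBatch[T any, V any](parallel, bufferSize int, inputs []T, consumer func(T) (V, error)) []batchResult[V] {
	pipeline := make(chan batchJob[T], bufferSize)
	results := make([]batchResult[V], len(inputs))

	var wg sync.WaitGroup
	for w := 0; w < parallel; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range pipeline {
				v, err := consumer(j.in)
				// Each job owns a distinct index, so no lock is needed here.
				results[j.idx] = batchResult[V]{Result: v, Err: err}
			}
		}()
	}

	for i, in := range inputs {
		pipeline <- batchJob[T]{idx: i, in: in}
	}
	close(pipeline) // no more tasks: workers exit once the channel drains
	wg.Wait()
	return results
}

func main() {
	out := runBatch(2, 4, []int{1, 2, 3}, func(n int) (int, error) {
		return n * 10, nil
	})
	for _, r := range out {
		fmt.Println(r.Result, r.Err)
	}
}
```

Closing the channel before waiting mirrors the documented behaviour of Wait, which waits for completion and closes the pipeline channel.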
type BatchTaskResult ¶
type BoundedAsyncPool ¶
type BoundedAsyncPool struct {
// contains filtered or unexported fields
}
A long-lived, bounded pool of goroutines.
Use [miso.NewBoundedAsyncPool] to create a new pool.
BoundedAsyncPool internally maintains a task queue with limited size and limited number of workers.
By default, if the task queue is full and all workers are busy, the caller of *BoundedAsyncPool.Go is blocked indefinitely until the task can be processed. You can use FallbackDropTask or FallbackCallerRun to change this behaviour.
func NewBoundedAsyncPool ¶
func NewBoundedAsyncPool(maxTasks int, maxWorkers int, opts ...asyncPoolOption) *BoundedAsyncPool
Create a bounded pool of goroutines.
The maxTasks determines the capacity of the task queue.
The maxWorkers determines the max number of workers.
By default, if the task queue is full and all workers are busy, the caller of *BoundedAsyncPool.Go is blocked indefinitely until the task can be processed. You can use FallbackDropTask or FallbackCallerRun to change this behaviour.
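The three behaviours for a full queue (block, drop the task, or run it on the caller) can be sketched with a buffered channel and a non-blocking send. This is an illustration under assumptions, not the package's code: `sketchPool`, the `fallback` constants, `submit` and `demo` are hypothetical names, and the workers are started eagerly for simplicity.

```go
package main

import (
	"fmt"
	"sync"
)

type fallback int

const (
	fallbackBlock     fallback = iota // default: block until the queue has room
	fallbackDropTask                  // discard the task when the queue is full
	fallbackCallerRun                 // run the task on the caller's goroutine
)

// sketchPool is a hypothetical bounded pool: maxTasks queue slots, maxWorkers workers.
type sketchPool struct {
	tasks    chan func()
	wg       sync.WaitGroup
	fallback fallback
}

func newSketchPool(maxTasks, maxWorkers int, fb fallback) *sketchPool {
	p := &sketchPool{tasks: make(chan func(), maxTasks), fallback: fb}
	for i := 0; i < maxWorkers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for f := range p.tasks {
				f()
			}
		}()
	}
	return p
}

// submit reports how the task was handled: "queued", "dropped" or "caller".
func (p *sketchPool) submit(f func()) string {
	if p.fallback == fallbackBlock {
		p.tasks <- f // blocks while the queue is full
		return "queued"
	}
	select {
	case p.tasks <- f:
		return "queued"
	default: // queue full and every worker busy
		if p.fallback == fallbackCallerRun {
			f()
			return "caller"
		}
		return "dropped"
	}
}

func (p *sketchPool) stopAndWait() {
	close(p.tasks)
	p.wg.Wait()
}

// demo saturates a pool with one worker and one queue slot, then submits a
// third task. It must not be called with fallbackBlock, which would deadlock here.
func demo(fb fallback) []string {
	p := newSketchPool(1, 1, fb)
	started, gate := make(chan struct{}), make(chan struct{})
	out := []string{p.submit(func() { close(started); <-gate })}
	<-started                              // the only worker is now busy
	out = append(out, p.submit(func() {})) // fills the single queue slot
	out = append(out, p.submit(func() {})) // queue full: the fallback applies
	close(gate)
	p.stopAndWait()
	return out
}

func main() {
	fmt.Println(demo(fallbackDropTask))  // prints "[queued queued dropped]"
	fmt.Println(demo(fallbackCallerRun)) // prints "[queued queued caller]"
}
```

Running the task on the caller slows producers down when the pool is saturated, while dropping trades completeness for a caller that never blocks.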
func (*BoundedAsyncPool) Go ¶
func (p *BoundedAsyncPool) Go(f func())
Submit task to the pool.
If the pool is closed, the caller executes the submitted task directly.
func (*BoundedAsyncPool) Run ¶
func (p *BoundedAsyncPool) Run(f func() error) Future[struct{}]
func (*BoundedAsyncPool) Stop ¶
func (p *BoundedAsyncPool) Stop()
Stop the pool.
Once the pool is stopped, new tasks submitted are executed directly by the caller.
func (*BoundedAsyncPool) StopAndWait ¶
func (p *BoundedAsyncPool) StopAndWait()
Stop the pool and wait until existing workers drain all the remaining tasks.
Once the pool is stopped, new tasks submitted are executed directly by the caller.
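The stop semantics described above (drain what is already queued, then let callers run new tasks themselves) can be sketched as follows. `drainPool`, `submit`, `stopAndWait` and `drainDemo` are hypothetical names, and the locking scheme is one possible way to make stopping safe against concurrent submissions rather than the package's actual approach.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// drainPool is a hypothetical pool that runs tasks inline once stopped.
type drainPool struct {
	mu      sync.RWMutex
	stopped bool
	tasks   chan func()
	wg      sync.WaitGroup
}

func newDrainPool(maxTasks, maxWorkers int) *drainPool {
	p := &drainPool{tasks: make(chan func(), maxTasks)}
	for i := 0; i < maxWorkers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for f := range p.tasks {
				f()
			}
		}()
	}
	return p
}

// submit queues f for a worker, or runs it on the caller if the pool is stopped.
// It reports whether the task was queued.
func (p *drainPool) submit(f func()) (queued bool) {
	p.mu.RLock()
	if p.stopped {
		p.mu.RUnlock()
		f() // pool stopped: the caller executes the task directly
		return false
	}
	defer p.mu.RUnlock()
	p.tasks <- f // the read lock keeps the channel open during the send
	return true
}

// stopAndWait closes the queue and waits for workers to drain remaining tasks.
func (p *drainPool) stopAndWait() {
	p.mu.Lock()
	if !p.stopped {
		p.stopped = true
		close(p.tasks)
	}
	p.mu.Unlock()
	p.wg.Wait()
}

// drainDemo submits five tasks, stops the pool, then submits one more.
func drainDemo() (completed int64, queuedAfterStop bool, ranInline bool) {
	p := newDrainPool(2, 2)
	var done int64
	for i := 0; i < 5; i++ {
		p.submit(func() { atomic.AddInt64(&done, 1) })
	}
	p.stopAndWait() // returns only after all five queued tasks have run
	completed = atomic.LoadInt64(&done)
	queuedAfterStop = p.submit(func() { ranInline = true })
	return completed, queuedAfterStop, ranInline
}

func main() {
	fmt.Println(drainDemo()) // prints "5 false true"
}
```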
type Future ¶
type Future[T any] interface {
// Get result without timeout.
Get() (T, error)
// Get result with timeout, returns ErrGetTimeout if timeout exceeded.
TimedGet(timeout int) (T, error)
// Then callback to be invoked when the Future is completed.
//
// Then callback should only be set once for every Future.
Then(tf func(T, error))
// Then callback to be invoked when the Future is completed.
//
// Then callback should only be set once for every Future.
ThenErr(tf func(error))
}
Result of an asynchronous task.
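The Get and TimedGet contract can be illustrated with a channel-backed future. This is a sketch under assumptions: `sketchFuture`, `submit`, `get`, `timedGet` and `errGetTimeout` are hypothetical names, the task runs on a plain goroutine instead of a pool, and the integer timeout is interpreted as milliseconds, which the interface above does not state.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errGetTimeout mirrors the message of the package's ErrGetTimeout.
var errGetTimeout = errors.New("future.TimedGet timeout")

// sketchFuture is a hypothetical future backed by a channel that is closed
// once the task has finished.
type sketchFuture[T any] struct {
	done chan struct{}
	val  T
	err  error
}

// submit runs task on a new goroutine and returns a future for its result.
func submit[T any](task func() (T, error)) *sketchFuture[T] {
	f := &sketchFuture[T]{done: make(chan struct{})}
	go func() {
		defer close(f.done) // publishes val and err to waiters
		f.val, f.err = task()
	}()
	return f
}

// get blocks until the task has completed.
func (f *sketchFuture[T]) get() (T, error) {
	<-f.done
	return f.val, f.err
}

// timedGet waits at most timeoutMs milliseconds (the unit is an assumption).
func (f *sketchFuture[T]) timedGet(timeoutMs int) (T, error) {
	select {
	case <-f.done:
		return f.val, f.err
	case <-time.After(time.Duration(timeoutMs) * time.Millisecond):
		var zero T
		return zero, errGetTimeout
	}
}

func main() {
	fast := submit(func() (int, error) { return 42, nil })
	fmt.Println(fast.get()) // prints "42 <nil>"

	block := make(chan struct{})
	slow := submit(func() (int, error) { <-block; return 1, nil })
	_, err := slow.timedGet(10)
	fmt.Println(errors.Is(err, errGetTimeout)) // prints "true"

	close(block)            // let the slow task finish
	fmt.Println(slow.get()) // prints "1 <nil>"
}
```

A timed-out call leaves the future usable: the task keeps running, and a later get still returns its result.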
func NewCompletedFuture ¶
type SignalOnce ¶
type SignalOnce struct {
// contains filtered or unexported fields
}
func NewSignalOnce ¶
func NewSignalOnce() *SignalOnce
func (*SignalOnce) Closed ¶
func (s *SignalOnce) Closed() bool
func (*SignalOnce) Notify ¶
func (s *SignalOnce) Notify()
func (*SignalOnce) TimedWait ¶
func (s *SignalOnce) TimedWait(timeout time.Duration) (isTimeout bool)
func (*SignalOnce) Wait ¶
func (s *SignalOnce) Wait()
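SignalOnce is listed without a description, so the following is only a guess at the behaviour its method names suggest: a one-shot signal that any number of goroutines can wait on, with repeated Notify calls being harmless. The type `signalOnce` and its lowercase methods are hypothetical and built from sync.Once and a closed channel.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// signalOnce is a hypothetical one-shot signal: notify closes the channel
// exactly once, which releases every current and future waiter.
type signalOnce struct {
	once sync.Once
	ch   chan struct{}
}

func newSignalOnce() *signalOnce {
	return &signalOnce{ch: make(chan struct{})}
}

// notify fires the signal; further calls are no-ops.
func (s *signalOnce) notify() {
	s.once.Do(func() { close(s.ch) })
}

// closed reports whether the signal has already fired, without blocking.
func (s *signalOnce) closed() bool {
	select {
	case <-s.ch:
		return true
	default:
		return false
	}
}

// wait blocks until the signal fires.
func (s *signalOnce) wait() {
	<-s.ch
}

// timedWait blocks until the signal fires or the timeout elapses,
// and reports whether it timed out.
func (s *signalOnce) timedWait(timeout time.Duration) (isTimeout bool) {
	select {
	case <-s.ch:
		return false
	case <-time.After(timeout):
		return true
	}
}

func main() {
	s := newSignalOnce()
	fmt.Println(s.closed())                          // prints "false"
	fmt.Println(s.timedWait(10 * time.Millisecond)) // prints "true" (timed out)

	go s.notify()
	s.wait() // returns once notify has run
	s.notify() // a second notify is safe
	fmt.Println(s.closed())             // prints "true"
	fmt.Println(s.timedWait(time.Second)) // prints "false"
}
```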