async

package
v0.3.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 12, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrGetTimeout = errors.New("future.TimedGet timeout")
)

Functions

func CapturePanic added in v0.3.9

func CapturePanic[T any](op func() (T, error)) (T, error)

func CapturePanicErr added in v0.3.9

func CapturePanicErr(op func()) error

func FallbackCallerRun

func FallbackCallerRun() asyncPoolOption

Fallback to caller runs when pool is full.

func FallbackDropTask

func FallbackDropTask() asyncPoolOption

Drop task when pool is full.

func NewSubmitAsyncFunc

func NewSubmitAsyncFunc[T any](pool AsyncPool) func(task func() (T, error)) Future[T]

Create func that calls SubmitAsync(...) with the given pool.

func PanicSafeErrFunc

func PanicSafeErrFunc(op func() error) func() error

func PanicSafeFunc

func PanicSafeFunc(op func()) func()

func PanicSafeRun

func PanicSafeRun(op func())

func PanicSafeRunErr

func PanicSafeRunErr(op func() error) error

func RunCancellable

func RunCancellable(f func()) (cancel func())

func RunCancellableChan

func RunCancellableChan[T any](ch <-chan T, f func(t T) (stop bool)) (cancel func())

func RunUntil

func RunUntil[T any](wait time.Duration, f func() (stop bool, t T, e error)) (T, error)

Types

type AntsAsyncPool

type AntsAsyncPool struct {
	// contains filtered or unexported fields
}

func (*AntsAsyncPool) Go

func (a *AntsAsyncPool) Go(f func())

func (*AntsAsyncPool) Run

func (p *AntsAsyncPool) Run(f func() error) Future[struct{}]

func (*AntsAsyncPool) Stop

func (a *AntsAsyncPool) Stop()

func (*AntsAsyncPool) StopAndWait

func (a *AntsAsyncPool) StopAndWait()

type AsyncPool

type AsyncPool interface {
	Go(f func())
	Stop()
	StopAndWait()
	Run(f func() error) Future[struct{}]
}

Async Pool Interface

func NewAntsAsyncPool

func NewAntsAsyncPool(maxWorkers int, opts ...asyncPoolOption) AsyncPool

Create a bounded pool of goroutines.

The maxTasks determines the capacity of the task queues.

The maxWorkers determines the max number of workers.

By default, if the task queue is full and all workers are busy, the caller of AsyncPool.Go() is blocked indefinitively until the task can be processed. You can use FallbackDropTask or FallbackCallerRun to change this behaviour.

func NewCpuAsyncPool

func NewCpuAsyncPool() AsyncPool

Create AsyncPool with number of workers equals to 4 * GOMAXPROCS.

func NewIOAsyncPool

func NewIOAsyncPool() AsyncPool

Create AsyncPool with number of workers equals to 8 * GOMAXPROCS and a task queue of size 100.

type AwaitFutures

type AwaitFutures[T any] struct {
	// contains filtered or unexported fields
}

AwaitFutures represent tasks that are submitted to the pool asynchronously whose results are awaited together.

AwaitFutures should only be used once for the same group of tasks.

Use miso.NewAwaitFutures() to create one.

func NewAwaitFutures

func NewAwaitFutures[T any](pool AsyncPool) *AwaitFutures[T]

Create new AwaitFutures for a group of tasks.

*AsyncPool is optional, provide nil if not needed.

func (*AwaitFutures[T]) Await

func (a *AwaitFutures[T]) Await() []Future[T]

Await results of all tasks.

func (*AwaitFutures[T]) AwaitAnyErr

func (a *AwaitFutures[T]) AwaitAnyErr() error

Await results of all tasks and return any error that is found in the task Futures.

func (*AwaitFutures[T]) AwaitResultAnyErr

func (a *AwaitFutures[T]) AwaitResultAnyErr() ([]T, error)

Await results of all tasks and return any error that is found in the task Futures.

func (*AwaitFutures[T]) SubmitAsync

func (a *AwaitFutures[T]) SubmitAsync(task func() (T, error))

Submit task to AwaitFutures.

type BatchTask

type BatchTask[T any, V any] struct {
	// contains filtered or unexported fields
}

func NewBatchTask

func NewBatchTask[T any, V any](parallel int, bufferSize int, consumer func(T) (V, error)) *BatchTask[T, V]

Create a batch of concurrent task for one time use.

func (*BatchTask[T, V]) Close

func (b *BatchTask[T, V]) Close()

Close underlying pipeline channel without waiting.

func (*BatchTask[T, V]) Generate

func (b *BatchTask[T, V]) Generate(task T)

Generate task.

func (*BatchTask[T, V]) Wait

func (b *BatchTask[T, V]) Wait() []BatchTaskResult[V]

Wait until all generated tasks are completed and close pipeline channel.

type BatchTaskResult

type BatchTaskResult[V any] struct {
	Result V
	Err    error
}

type BoundedAsyncPool

type BoundedAsyncPool struct {
	// contains filtered or unexported fields
}

A long live, bounded pool of goroutines.

Use [miso.NewBoundedAsyncPool] to create a new pool.

BoundedAsyncPool internally maintains a task queue with limited size and limited number of workers.

By default, if the task queue is full and all workers are busy, the caller of *BoundedAsyncPool.Go is blocked indefinitively until the task can be processed. You can use FallbackDropTask or FallbackCallerRun to change this behaviour.

func NewBoundedAsyncPool

func NewBoundedAsyncPool(maxTasks int, maxWorkers int, opts ...asyncPoolOption) *BoundedAsyncPool

Create a bounded pool of goroutines.

The maxTasks determines the capacity of the task queues.

The maxWorkers determines the max number of workers.

By default, if the task queue is full and all workers are busy, the caller of *AsyncPool.Go is blocked indefinitively until the task can be processed. You can use FallbackDropTask or FallbackCallerRun to change this behaviour.

func (*BoundedAsyncPool) Go

func (p *BoundedAsyncPool) Go(f func())

Submit task to the pool.

If the pool is closed, caller will execute the submitted task directly.

func (*BoundedAsyncPool) Run

func (p *BoundedAsyncPool) Run(f func() error) Future[struct{}]

func (*BoundedAsyncPool) Stop

func (p *BoundedAsyncPool) Stop()

Stop the pool.

Once the pool is stopped, new tasks submitted are executed directly by the caller.

func (*BoundedAsyncPool) StopAndWait

func (p *BoundedAsyncPool) StopAndWait()

Stop the pool and wait until existing workers drain all the remaining tasks.

Once the pool is stopped, new tasks submitted are executed directly by the caller.

type Future

type Future[T any] interface {

	// Get result without timeout.
	Get() (T, error)

	// Get result with timeout, returns ErrGetTimeout if timeout exceeded.
	TimedGet(timeout int) (T, error)

	// Then callback to be invoked when the Future is completed.
	//
	// Then callback should only be set once for every Future.
	Then(tf func(T, error))

	// Then callback to be invoked when the Future is completed.
	//
	// Then callback should only be set once for every Future.
	ThenErr(tf func(error))
}

Result of a asynchronous task.

func NewCompletedFuture

func NewCompletedFuture[T any](t T, err error) Future[T]

func Run

func Run[T any](task func() (T, error)) Future[T]

Create Future, once the future is created, it starts running on a new goroutine.

func Submit

func Submit[T any](pool AsyncPool, task func() (T, error)) Future[T]

Create Future, once the future is created, it starts running on a saperate goroutine from the pool.

type SignalOnce

type SignalOnce struct {
	// contains filtered or unexported fields
}

func NewSignalOnce

func NewSignalOnce() *SignalOnce

func (*SignalOnce) Closed

func (s *SignalOnce) Closed() bool

func (*SignalOnce) Notify

func (s *SignalOnce) Notify()

func (*SignalOnce) TimedWait

func (s *SignalOnce) TimedWait(timeout time.Duration) (isTimeout bool)

func (*SignalOnce) Wait

func (s *SignalOnce) Wait()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL