async

package
v0.0.0-...-32c7374 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2025 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AwaitAll

func AwaitAll[F FutureDyn](ctx context.Context, futures ...F) status.Status

AwaitAll waits for the completion of all futures. The method the context status if the context is cancelled.

func AwaitAny

func AwaitAny[F Future[T], T any](ctx context.Context, futures ...F) (T, int, status.Status)

AwaitAny waits for the completion of any future, and returns its result. The method returns -1 and the context status if the context is cancelled.

func AwaitAnyDyn

func AwaitAnyDyn[F FutureDyn](ctx context.Context, futures ...F) (int, status.Status)

AwaitAnyDyn awaits completion of any future, and returns its result. The method returns -1 and the context status if the context is cancelled.

func AwaitError

func AwaitError[F FutureDyn](ctx context.Context, futures ...F) (int, status.Status)

AwaitError awaits failure of any future, and returns its error. The method returns -1 and the context status if the context is cancelled.

func StdContext

func StdContext(ctx Context) context_.Context

StdContext returns a standard library context from an async one.

func StopAll

func StopAll[R Stopper](routines ...R)

StopAll stops all routines, but does not await their stop.

func StopWait

func StopWait[R Stopper](r R)

StopWait stops a routine and awaits it stop.

Usually used with defer:

w := runWorker()
defer StopWait(w)

func StopWaitAll

func StopWaitAll[R Stopper](routines ...R)

StopWaitAll stops and awaits all routines.

Usually used with defer:

w0 := runWorker()
w1 := runWorker()
defer StopWaitAll(w0, w1)

Types

type CancelContext

type CancelContext = context.CancelContext

CancelContext is a cancellable async context.

func CancelledContext

func CancelledContext() CancelContext

CancelledContext returns a cancelled context.

func NewContext

func NewContext() CancelContext

NewContext returns a new cancellable context.

func NextContext

func NextContext(parent Context) CancelContext

NextContext returns a child context.

type Context

type Context = context.Context

Context is an async cancellation context.

func DeadlineContext

func DeadlineContext(deadline time.Time) Context

DeadlineContext returns a context with a deadline.

func NextDeadlineContext

func NextDeadlineContext(parent Context, deadline time.Time) Context

NextDeadlineContext returns a child context with a deadline.

func NextTimeoutContext

func NextTimeoutContext(parent Context, timeout time.Duration) Context

NextTimeoutContext returns a child context with a timeout.

func NoContext

func NoContext() Context

NoContext returns a non-cancellable background context.

func TimeoutContext

func TimeoutContext(timeout time.Duration) Context

TimeoutContext returns a context with a timeout.

type ContextCallback

type ContextCallback = context.Callback

ContextCallback is called when the context is cancelled.

type Flag

type Flag = flag.Flag

Flag is a read-only boolean flag that can be waited on until set.

Example:

serving := async.UnsetFlag()

func handle(ctx Context, req *request) {
	if !serving.Get() { // just to show Get in example
		select {
		case <-ctx.Wait():
			return ctx.Status()
		case <-serving.Wait():
		}
	}

	// ... handle request
}

func ReverseFlag

func ReverseFlag(f Flag) Flag

ReverseFlag returns a new flag which reverses the original one.

type Func

type Func[T any] func(ctx Context) (T, status.Status)

Func is a function which returns the result.

type Func1

type Func1[T any, A any] func(ctx Context, arg A) (T, status.Status)

Func1 is a single argument function which returns the result.

type FuncVoid

type FuncVoid = func(ctx Context) status.Status

FuncVoid is a function which returns no result.

type FuncVoid1

type FuncVoid1[A any] func(ctx Context, arg A) status.Status

FuncVoid1 is a single argument function which returns no result

type Future

type Future[T any] interface {
	// Done returns true if the future is complete.
	Done() bool

	// Wait returns a channel which is closed when the future is complete.
	Wait() <-chan struct{}

	// Result returns a value and a status.
	Result() (T, status.Status)

	// Status returns a status or none.
	Status() status.Status
}

Future represents a result available in the future.

func Completed

func Completed[T any](result T, st status.Status) Future[T]

Completed returns a completed future.

func Rejected

func Rejected[T any](st status.Status) Future[T]

Rejected returns a rejected future.

func Resolved

func Resolved[T any](result T) Future[T]

Resolved returns a successful future.

type FutureDyn

type FutureDyn interface {
	// Done returns true if the future is complete.
	Done() bool

	// Wait returns a channel which is closed when the future is complete.
	Wait() <-chan struct{}

	// Status returns a status or none.
	Status() status.Status
}

FutureDyn is a future interface without generics, i.e. Future[?].

type FutureGroup

type FutureGroup[T any] []Future[T]

FutureGroup is a group of futures of the same type. Use FutureGroupDyn for a group of futures of different types.

func (FutureGroup[T]) Await

func (g FutureGroup[T]) Await(ctx context.Context) status.Status

Await waits for the completion of all futures in the group. The method returns the context status if the context is cancelled.

func (FutureGroup[T]) AwaitAny

func (g FutureGroup[T]) AwaitAny(ctx context.Context) (T, int, status.Status)

AwaitAny waits for the completion of any future in the group, and returns its result. The method returns -1 and the context status if the context is cancelled.

func (FutureGroup[T]) AwaitError

func (g FutureGroup[T]) AwaitError(ctx context.Context) (int, status.Status)

AwaitError waits for any failure of the futures in the group, and returns the error. The method returns ok if all futures are successful.

func (FutureGroup[T]) AwaitResults

func (g FutureGroup[T]) AwaitResults(ctx context.Context) ([]Result[T], status.Status)

AwaitResults waits for the completion of all futures in the group, and returns the results. The method returns nil and the context status if the context is cancelled.

func (FutureGroup[T]) Results

func (g FutureGroup[T]) Results() []Result[T]

Results returns the results of all futures in the group.

func (FutureGroup[T]) Statuses

func (g FutureGroup[T]) Statuses() []status.Status

Statuses returns the statuses of all futures in the group.

type FutureGroupDyn

type FutureGroupDyn []FutureDyn

FutureGroupDyn is a group of futures of different types. Use FutureGroup for a group of futures of the same type.

func (FutureGroupDyn) Await

Await waits for the completion of all futures in the group. The method returns nil and the context status if the context is cancelled.

func (FutureGroupDyn) AwaitAny

func (g FutureGroupDyn) AwaitAny(ctx context.Context) (int, status.Status)

AwaitAny waits for the completion of any future in the group, and returns its status. The method returns -1 and the context status if the context is cancelled.

func (FutureGroupDyn) AwaitError

func (g FutureGroupDyn) AwaitError(ctx context.Context) (int, status.Status)

AwaitError waits for any failure of the futures in the group, and returns the error. The method returns ok if all futures are successful.

func (FutureGroupDyn) Statuses

func (g FutureGroupDyn) Statuses() []status.Status

Statuses returns the statuses of all futures in the group.

type Lock

type Lock = lock.Lock

Lock is a channel-based lock, which be used directly as a channel, or via the Lock/Unlock methods. To lock a lock receive an empty struct from it, to unlock a lock send an empty struct to it.

Example:

lock := async.NewLock()
select {
case <-lock:
case <-cancel:
	return status.Cancelled
}
defer lock.Unlock()

func NewLock

func NewLock() Lock

NewLock returns a new unlocked lock.

type MutFlag

type MutFlag = flag.MutFlag

MutFlag is a routine-safe boolean flag that can be set, reset, and waited on until set.

Example:

serving := async.UnsetFlag()

func serve() {
	s.serving.Set()
	defer s.serving.Unset()

	// ... start server ...
}

func handle(ctx Context, req *request) {
	select {
	case <-ctx.Wait():
		return ctx.Status()
	case <-serving.Wait():
	}

	// ... handle request
}

func SetFlag

func SetFlag() MutFlag

SetFlag returns a new set flag.

func UnsetFlag

func UnsetFlag() MutFlag

UnsetFlag returns a new unset flag.

type NextFunc

type NextFunc[T any] func(ctx context.Context) (T, bool, status.Status)

NextFunc is a function that returns the next value from a stream.

type Pool

type Pool = pool.Pool

Pool allows to reuse goroutines with preallocated big stacks.

func NewPool

func NewPool() Pool

NewPool returns a new goroutine pool.

type Promise

type Promise[T any] interface {
	Future[T]

	// Reject rejects the promise with a status.
	Reject(st status.Status) bool

	// Resolve completes the promise with a result and ok.
	Resolve(result T) bool

	// Complete completes the promise with a status and a result.
	Complete(result T, st status.Status) bool
}

Promise is a future which can be completed.

func NewPromise

func NewPromise[T any]() Promise[T]

NewPromise returns a pending promise.

type Queue

type Queue[T any] interface {
	// Len returns the number of elements in the queue.
	Len() int

	// Clear clears the queue.
	Clear()

	// Push adds an element to the queue.
	Push(v T)

	// Poll removes an element from the queue, returns false if the queue is empty.
	Poll() (T, bool)

	// Wait returns a channel which is notified on new elements.
	Wait() <-chan struct{}
}

Queue is an unbounded FIFO queue guarded by a mutex.

func NewQueue

func NewQueue[T any](items ...T) Queue[T]

NewQueue returns a new queue.

type Result

type Result[T any] struct {
	Value  T
	Status status.Status
}

Result is an async result which combines a value and a status.

func AwaitResults

func AwaitResults[F Future[T], T any](ctx context.Context, futures ...F) ([]Result[T], status.Status)

AwaitResults waits for the completion of all futures, and returns the results. The method returns nil and the context status if the context is cancelled.

func (Result[T]) Unwrap

func (r Result[T]) Unwrap() (T, status.Status)

Unwrap returns the value and the status.

type Routine

type Routine[T any] interface {
	Future[T]

	// Start start the routine, if not started or stopped yet.
	Start()

	// Stop requests the routine to stop and returns a wait channel.
	// The method does not call the on-stop callbacks if the routine has not started.
	Stop() <-chan struct{}

	// OnStop adds a callback which is called when the routine stops.
	// The callback is called by the routine goroutine, and is not called
	// if the routine has not started.
	OnStop(fn func(Routine[T])) bool
}

Routine is an async routine which returns the result as a future, recovers on panics, and can be cancelled.

func NewRoutine

func NewRoutine[T any](fn Func[T]) Routine[T]

NewRoutine returns a new routine, but does not start it.

func Run

func Run[T any](fn Func[T]) Routine[T]

Run runs a function in a new routine, and returns the result, recovers on panics.

func Run1

func Run1[T any, A any](fn Func1[T, A], arg A) Routine[T]

Run1 runs a function in a new routine, and returns the result, recovers on panics.

func Stopped

func Stopped[T any](result T, st status.Status) Routine[T]

Stopped returns a routine which has stopped with the given result and status.

type RoutineDyn

type RoutineDyn interface {
	FutureDyn

	// Start starts the routine, if not stopped already.
	Start()

	// Stop requests the routine to stop and returns a wait channel.
	Stop() <-chan struct{}
}

RoutineDyn is a routine interface without generics, i.e. Routine[?].

type RoutineGroup

type RoutineGroup[T any] []Routine[T]

RoutineGroup is a group of routines of the same type. Use RoutineGroupDyn for a group of routines of different types.

func (RoutineGroup[T]) Await

func (g RoutineGroup[T]) Await(ctx Context) status.Status

Await waits for the completion of all routines in the group. The method returns the context status if the context is cancelled.

func (RoutineGroup[T]) AwaitAny

func (g RoutineGroup[T]) AwaitAny(ctx Context) (T, int, status.Status)

AwaitAny waits for the completion of any routine in the group, and returns its result. The method returns -1 and the context status if the context is cancelled.

func (RoutineGroup[T]) AwaitError

func (g RoutineGroup[T]) AwaitError(ctx Context) (int, status.Status)

AwaitError waits for any failure of the routines in the group, and returns the error. The method returns ok if all routines are successful.

func (RoutineGroup[T]) AwaitResults

func (g RoutineGroup[T]) AwaitResults(ctx Context) ([]Result[T], status.Status)

AwaitResults waits for the completion of all routines in the group, and returns the results. The method returns nil and the context status if the context is cancelled.

func (RoutineGroup[T]) Results

func (g RoutineGroup[T]) Results() []Result[T]

Results returns the results of all routines in the group.

func (RoutineGroup[T]) Statuses

func (g RoutineGroup[T]) Statuses() []status.Status

Statuses returns the statuses of all routines in the group.

func (RoutineGroup[T]) Stop

func (g RoutineGroup[T]) Stop()

Stop stops all routines in the group.

func (RoutineGroup[T]) StopWait

func (g RoutineGroup[T]) StopWait()

StopWait stops and awaits all routines in the group.

func (RoutineGroup[T]) Values

func (g RoutineGroup[T]) Values() []T

Values returns the values of all results in the group.

type RoutineGroupDyn

type RoutineGroupDyn []RoutineDyn

RoutineGroupDyn is a group of routines of different types. Use RoutineGroup for a group of routines of the same type.

func (RoutineGroupDyn) Await

func (g RoutineGroupDyn) Await(ctx Context) status.Status

Await waits for the completion of all routines in the group. The method returns the context status if the context is cancelled.

func (RoutineGroupDyn) AwaitAny

func (g RoutineGroupDyn) AwaitAny(ctx Context) (int, status.Status)

AwaitAny waits for the completion of any routine in the group, and returns its result. The method returns -1 and the context status if the context is cancelled.

func (RoutineGroupDyn) AwaitError

func (g RoutineGroupDyn) AwaitError(ctx Context) (int, status.Status)

AwaitError waits for any failure of the routines in the group, and returns the error. The method returns ok if all routines are successful.

func (RoutineGroupDyn) Statuses

func (g RoutineGroupDyn) Statuses() []status.Status

Statuses returns the statuses of all routines in the group.

func (RoutineGroupDyn) Stop

func (g RoutineGroupDyn) Stop()

Stop stops all routines in the group.

func (RoutineGroupDyn) StopWait

func (g RoutineGroupDyn) StopWait()

StopWait stops and awaits all routines in the group.

type RoutineVoid

type RoutineVoid = Routine[struct{}]

RoutineVoid is a routine which has no result.

func NewRoutineVoid

func NewRoutineVoid(fn func(ctx Context) status.Status) RoutineVoid

NewRoutineVoid returns a new routine without a result, but does not start it.

func RunVoid

func RunVoid(fn FuncVoid) RoutineVoid

RunVoid runs a procedure in a new routine, recovers on panics.

func RunVoid1

func RunVoid1[A any](fn FuncVoid1[A], arg A) RoutineVoid

RunVoid1 runs a procedure in a new routine, recovers on panics.

func StoppedVoid

func StoppedVoid(st status.Status) RoutineVoid

StoppedVoid returns a routine without a result which has stopped with the given status.

type Runner

type Runner interface {
	// Run runs the function.
	Run()
}

Runner is an interface that provides a run method.

func RunnerFunc

func RunnerFunc(fn func()) Runner

RunnerFunc returns a new runner from a function.

type Service

type Service interface {
	// Status returns the stop status or none.
	Status() status.Status

	// Running indicates that the service routine is running.
	Running() Flag

	// Stopped indicates that the service is stopped.
	Stopped() Flag

	// Start starts the service if not running.
	Start() status.Status

	// Stop requests the service to stop and returns its routine or a stopped routine.
	Stop() <-chan struct{}

	// Wait returns a channel which is closed when the service is stopped.
	Wait() <-chan struct{}
}

Service is a service which can be started and stopped.

func NewService

func NewService(fn func(ctx Context) status.Status) Service

NewService returns a new stopped service.

type StopGroup

type StopGroup struct {
	// contains filtered or unexported fields
}

StopGroup stops all operations in the group and awaits their completion.

func NewStopGroup

func NewStopGroup() *StopGroup

NewStopGroup creates a new stop group.

func (*StopGroup) Add

func (g *StopGroup) Add(s Stopper)

Add adds a stopper to the group, or immediately stops it if the group is stopped.

func (*StopGroup) AddMany

func (g *StopGroup) AddMany(stoppers ...Stopper)

AddMany adds multiple stoppers to the group, or immediatelly stops them if the group is stopped.

func (*StopGroup) AddService

func (g *StopGroup) AddService(s Service)

AddService adds a service to the group, or immediately stops it if the group is stopped.

func (*StopGroup) AddTicker

func (g *StopGroup) AddTicker(t *time.Ticker)

AddTicker adds a ticker to the group, or immediately stops it if the group is stopped.

func (*StopGroup) AddTimer

func (g *StopGroup) AddTimer(t *time.Timer)

AddTimer adds a timer to the group, or immediately stops it if the group is stopped.

func (*StopGroup) Stop

func (g *StopGroup) Stop()

Stop stops all operations in the group.

func (*StopGroup) StopWait

func (g *StopGroup) StopWait()

StopWait stops all operations in the group and awaits them.

func (*StopGroup) Wait

func (g *StopGroup) Wait()

Wait awaits all operations in the group.

type Stopper

type Stopper interface {
	// Wait returns a channel which is closed when the future is complete.
	Wait() <-chan struct{}

	// Stop requests the routine to stop and returns a wait channel.
	Stop() <-chan struct{}
}

Stopper stops a routine and awaits its stop.

type Stream

type Stream[T any] interface {
	// Next returns the next value from the stream, or false if the stream has ended.
	Next(ctx context.Context) (T, bool, status.Status)

	// Free frees the stream.
	Free()
}

Stream is an async stream of values.

func NewStream

func NewStream[T any](next NextFunc[T]) Stream[T]

NewStream returns a new stream.

func NewStreamFree

func NewStreamFree[T any](next NextFunc[T], free func()) Stream[T]

NewStreamFree returns a new stream with a free function.

func NewStreamFreer

func NewStreamFreer[T any](next NextFunc[T], freer ref.Freer) Stream[T]

NewStreamFreer returns a new stream with a freer.

type Variable

type Variable[T any] interface {
	// Get returns the current value/error, or false if pending.
	Get() (T, bool, status.Status)

	// GetWait returns the current value or waits for it or an error.
	GetWait(ctx context.Context) (T, status.Status)

	// Clear clears the variable.
	Clear()

	// Complete sets the value and status.
	Complete(value T, st status.Status)

	// Fail sets the error.
	Fail(st status.Status)

	// Set sets the value.
	Set(value T)

	// Wait returns a channel that is closed when the variable is set or failed.
	Wait() <-chan struct{}
}

[Experimental] Variable is an asynchronous variable which can be set, cleared, or failed.

func NewVariable

func NewVariable[T any]() Variable[T]

NewVariable returns a new pending async variable.

type WaitLock

type WaitLock = lock.WaitLock

WaitLock is a lock which allows others to wait until it is unlocked.

WaitLock does not guarantee that the lock is not acquired by another writer after its waiters are notified.

WaitLock can be used, for example, to execute a single operation by one writer, while other writers wait until the operation is completed.

Example:

lock := async.NewWaitLock()

func flush(cancel <-chan struct{}) {
	select {
	case <-lock.Lock():
		// Acquired lock
	default:
		// Await flushing end
		select {
		case <-lock.Wait():
		case <-cancel:
			return status.Cancelled
		}
	}
	defer lock.Unlock()

	// ... Do work ...
}

func NewWaitLock

func NewWaitLock() WaitLock

NewWaitLock returns a new unlocked lock.

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL