fnx

package
v0.13.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2025 License: Apache-2.0 Imports: 11 Imported by: 15

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Converter

type Converter[T any, O any] func(context.Context, T) (O, error)

Converter is a function type that converts T objects int objects of type O.

func MakeConverter

func MakeConverter[T any, O any](op func(T) O) Converter[T, O]

MakeConverter builds a Transform function out of an equivalent function that doesn't take a context or return an error.

func MakeConverterErr

func MakeConverterErr[T any, O any](op func(T) (O, error)) Converter[T, O]

MakeConverterErr constructs a Transform function from an analogous function that does not take a context.

func MakeCovnerterOk

func MakeCovnerterOk[T any, O any](op func(T) (O, bool)) Converter[T, O]

MakeCovnerterOk builds a Transform function from a function that converts between types T and O, but that returns a boolean/check value. When the converter function returns false the transform function returns a ers.ErrCurrentOpSkip error.

func (Converter[T, O]) Convert

func (mpf Converter[T, O]) Convert(ctx context.Context, in T) (O, error)

Convert uses the converter function to transform a value from one type (T) to another (O).

func (Converter[T, O]) Lock

func (mpf Converter[T, O]) Lock() Converter[T, O]

Lock returns a Transform function that's executed the root function inside of the sope of a mutex.

func (Converter[T, O]) Wait

func (mpf Converter[T, O]) Wait(in T) (O, error)

Wait calls the transform function passing a context that cannot expire.

func (Converter[T, O]) WithLock

func (mpf Converter[T, O]) WithLock(mu *sync.Mutex) Converter[T, O]

WithLock returns a Transform function inside of the scope of the provided mutex.

func (Converter[T, O]) WithRecover

func (mpf Converter[T, O]) WithRecover() Converter[T, O]

WithRecover returns a Transform function that catches a panic, converts the panic object to an error if needed, and aggregates that with the Transform function's error.

type Future

type Future[T any] func(context.Context) (T, error)

Future is a function type that is a failrly common constructor. It's signature is used to create iterators/streams, as a future, and functions like a Future.

func CheckedFuture

func CheckedFuture[T any](op func() (T, bool)) Future[T]

CheckedFuture wraps a function object that uses the second ("OK") value to indicate that no more values will be produced. Errors returned from the resulting produce are always either the context cancellation error or io.EOF.

func MakeFuture

func MakeFuture[T any](fn func() (T, error)) Future[T]

MakeFuture constructs a future that wraps a similar function that does not take a context.

func NewFuture

func NewFuture[T any](fn func(ctx context.Context) (T, error)) Future[T]

NewFuture returns a future as a convenience function to avoid the extra cast when creating new function objects.

func PtrFuture

func PtrFuture[T any](fn func() *T) Future[T]

PtrFuture uses a function that returns a pointer to a value and converts that into a future that de-references and returns non-nil values of the pointer, and returns EOF for nil values of the pointer.

func StaticFuture

func StaticFuture[T any](val T, err error) Future[T]

StaticFuture returns a future function that always returns the provided values.

func ValueFuture

func ValueFuture[T any](val T) Future[T]

ValueFuture returns a future function that always returns the provided value, and a nill error.

func WrapFuture

func WrapFuture[T any](f fn.Future[T]) Future[T]

WrapFuture creates a future for the fn.Future function. The underlying Future's panics are converted to errors.

func (Future[T]) After

func (pf Future[T]) After(ts time.Time) Future[T]

After will return a Future that will block until the provided time is in the past, and then execute normally.

func (Future[T]) Check

func (pf Future[T]) Check(ctx context.Context) (T, bool)

Check converts the error into a boolean, with true indicating success and false indicating (but not propagating it.).

func (Future[T]) Delay

func (pf Future[T]) Delay(d time.Duration) Future[T]

Delay wraps a Future in a function that will always wait for the specified duration before running.

If the value is negative, then there is always zero delay.

func (Future[T]) Filter

func (pf Future[T]) Filter(fl func(T) bool) Future[T]

Filter creates a function that passes the output of the future to the filter function, which, if it returns true. is returned to the caller, otherwise the Future returns the zero value of type T and ers.ErrCurrentOpSkip error (e.g. continue), which streams and other future-consuming functions can respect.

func (Future[T]) Force

func (pf Future[T]) Force() fn.Future[T]

Force combines the semantics of Must and Wait as a future: when the future is resolved, the future executes with a context that never expires and panics in the case of an error.

func (Future[T]) Future

func (pf Future[T]) Future(ctx context.Context, ob fn.Handler[error]) fn.Future[T]

Future creates a future function using the context provided and error observer to collect the error.

func (Future[T]) If

func (pf Future[T]) If(cond bool) Future[T]

If returns a future that will execute the root future only if the cond value is true. Otherwise, If will return the zero value for T and a nil error.

func (Future[T]) Ignore

func (pf Future[T]) Ignore(ctx context.Context) fn.Future[T]

Ignore creates a future that runs the future and returns the value, ignoring the error.

func (Future[T]) Jitter

func (pf Future[T]) Jitter(jf func() time.Duration) Future[T]

Jitter wraps a Future that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the Future.

If the function produces a negative duration, there is no delay.

func (Future[T]) Join

func (pf Future[T]) Join(next Future[T]) Future[T]

Join runs the first future until it returns an io.EOF error, and then returns the results of the second future. A future will be re-run until it returns a terminating error (e.g. io.EOF, ers.ErrContainerClosed, or ers.ErrCurrentOpAbort). If the first future returns any other error, that error is returned to the caller and the second future never runs.

When the second function returns a terminating error, or any other error, all successive calls return io.EOF.

func (Future[T]) Limit

func (pf Future[T]) Limit(in int) Future[T]

Limit runs the future a specified number of times, and caches the result of the last execution and returns that value for any subsequent executions.

func (Future[T]) Lock

func (pf Future[T]) Lock() Future[T]

Lock creates a future that runs the root mutex as per normal, but under the protection of a mutex so that there's only one execution of the future at a time.

func (Future[T]) Must

func (pf Future[T]) Must(ctx context.Context) fn.Future[T]

Must returns a future that resolves the future returning the constructed value and panicing if the future errors.

func (Future[T]) Once

func (pf Future[T]) Once() Future[T]

Once returns a future that only executes ones, and caches the return values, so that subsequent calls to the output future will return the same values.

func (Future[T]) PostHook

func (pf Future[T]) PostHook(op func()) Future[T]

PostHook appends a function to the execution of the future. If the function panics it is converted to an error and aggregated with the error of the future.

Useful for calling context.CancelFunc, closers, or incrementing counters as necessary.

func (Future[T]) PreHook

func (pf Future[T]) PreHook(op Operation) Future[T]

PreHook configures an operation function to run before the returned future. If the pre-hook panics, it is converted to an error which is aggregated with the (potential) error from the future, and returned with the future's output.

func (Future[T]) Read

func (pf Future[T]) Read(ctx context.Context) (T, error)

Read executes the future and returns the result.

func (Future[T]) Resolve

func (pf Future[T]) Resolve() T

Resolve calls the function returned by Force resolving the future. This ignores all errors and provides a the underlying future with a context that will never be canceled.

func (Future[T]) Retry

func (pf Future[T]) Retry(n int) Future[T]

Retry constructs a worker function that takes runs the underlying future until the error value is nil, or it encounters a terminating error (io.EOF, ers.ErrAbortCurrentOp, or context cancellation.) In all cases, unless the error value is nil (e.g. the retry succeeds)

Context cancellation errors are returned to the caller, other terminating errors are not, with any other errors encountered during retries. ers.ErrCurrentOpSkip is always ignored and not aggregated. All errors are discarded if the retry operation succeeds in the provided number of retries.

Except for ers.ErrCurrentOpSkip, which is ignored, all other errors are aggregated and returned to the caller only if the retry fails. It's possible to return a nil error and a zero value, if the future only returned ers.ErrCurrentOpSkip values.

func (Future[T]) TTL

func (pf Future[T]) TTL(dur time.Duration) Future[T]

TTL runs the future only one time per specified interval. The interval must me greater than 0.

func (Future[T]) Wait

func (pf Future[T]) Wait() (T, error)

Wait runs the future with a context that will ever expire.

func (Future[T]) When

func (pf Future[T]) When(cond func() bool) Future[T]

When constructs a future that will call the cond upon every execution, and when true, will run and return the results of the root future. Otherwise When will return the zero value of T and a nil error.

func (Future[T]) WithCancel

func (pf Future[T]) WithCancel() (Future[T], context.CancelFunc)

WithCancel creates a Future and a cancel function which will terminate the context that the root Future is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Future is canceled.)

func (Future[T]) WithErrorCheck

func (pf Future[T]) WithErrorCheck(ef fn.Future[error]) Future[T]

WithErrorCheck takes an error future, and checks it before executing the future function. If the error future returns an error (any error), the future propagates that error, rather than running the underying future. Useful for injecting an abort into an existing pipleine or chain.

func (Future[T]) WithErrorFilter

func (pf Future[T]) WithErrorFilter(ef erc.Filter) Future[T]

WithErrorFilter passes the error of the root Future function with the erc.Filter.

func (Future[T]) WithLock

func (pf Future[T]) WithLock(mtx *sync.Mutex) Future[T]

WithLock uses the provided mutex to protect the execution of the future.

func (Future[T]) WithLocker

func (pf Future[T]) WithLocker(mtx sync.Locker) Future[T]

WithLocker uses the provided mutex to protect the execution of the future.

func (Future[T]) WithRecover

func (pf Future[T]) WithRecover() Future[T]

WithRecover returns a wrapped future with a panic handler that converts any panic to an error.

func (Future[T]) WithoutErrors

func (pf Future[T]) WithoutErrors(errs ...error) Future[T]

WithoutErrors returns a Future function that wraps the root future and, after running the root future, and makes the error value of the future nil if the error returned is in the error list. The produced value in these cases is almost always the zero value for the type.

type Handler

type Handler[T any] func(context.Context, T) error

Handler are generic functions that take an argument (and a context) and return an error. They're the type of function used in various stream methods to implement worker pools, service management, and stream processing.

func FromHandler

func FromHandler[T any](f fn.Handler[T]) Handler[T]

FromHandler up-converts a fn.Handler (e.g. a more simple handler function that doesn't return an error or take a context,) into a fun.Handler. the underlying function runs with a panic handler (so fn.Handler panics are converted to errors.)

func JoinHandlers

func JoinHandlers[T any](pfs ...Handler[T]) Handler[T]

JoinHandlers takes a collection of Handler functions and merges them into a single chain, eliding any nil processors.

func MakeHandler

func MakeHandler[T any](fn func(T) error) Handler[T]

MakeHandler converts a function with the Handler signature (minus the context) for easy conversion.

func NewHandler

func NewHandler[T any](fn func(context.Context, T) error) Handler[T]

NewHandler returns a Handler Function. This is a convenience function to avoid the extra cast when creating new function objects.

func (Handler[T]) After

func (pf Handler[T]) After(ts time.Time) Handler[T]

After produces a Handler that will execute after the provided timestamp.

func (Handler[T]) Capture

func (pf Handler[T]) Capture() fn.Handler[T]

Capture creates a handler function that like, Handler.Force, passes a background context and ignores the processors error.

func (Handler[T]) Check

func (pf Handler[T]) Check(ctx context.Context, in T) bool

Check processes the input and returns true when the error is nil, and false when there was an error.

func (Handler[T]) Delay

func (pf Handler[T]) Delay(dur time.Duration) Handler[T]

Delay wraps a Handler in a function that will always wait for the specified duration before running.

If the value is negative, then there is always zero delay.

func (Handler[T]) Filter

func (pf Handler[T]) Filter(fl func(T) bool) Handler[T]

Filter returns a wrapping processor that takes a function a function that only calls the processor when the filter function returns true, and returns ers.ErrCurrentOpSkip otherwise.

func (Handler[T]) Force

func (pf Handler[T]) Force(in T)

Force processes the input, but discards the error and uses a context that will not expire.

func (Handler[T]) Handler

func (pf Handler[T]) Handler(ctx context.Context, oe fn.Handler[error]) fn.Handler[T]

Handler converts a processor into an observer, handling the error with the error observer and using the provided context.

func (Handler[T]) If

func (pf Handler[T]) If(c bool) Handler[T]

If runs the processor function if, and only if the condition is true. Otherwise the function does not run and the processor returns nil.

The resulting processor can be used more than once.

func (Handler[T]) Ignore

func (pf Handler[T]) Ignore(ctx context.Context, in T)

Ignore runs the process function and discards the error.

func (Handler[T]) Jitter

func (pf Handler[T]) Jitter(jf func() time.Duration) Handler[T]

Jitter wraps a Handler that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the processor.

If the function produces a negative duration, there is no delay.

func (Handler[T]) Join

func (pf Handler[T]) Join(pfs ...Handler[T]) Handler[T]

Join combines a sequence of processors on the same input, calling each function in order, as long as there is no error and the context does not expire. Context expiration errors are not propagated.

func (Handler[T]) Limit

func (pf Handler[T]) Limit(n int) Handler[T]

Limit ensures that the processor is called at most n times.

func (Handler[T]) Lock

func (pf Handler[T]) Lock() Handler[T]

Lock wraps the Handler and protects its execution with a mutex.

func (Handler[T]) Must

func (pf Handler[T]) Must(ctx context.Context, in T)

Must calls the underlying handler, but converts all errors returned by the handler into panics.

func (Handler[T]) Once

func (pf Handler[T]) Once() Handler[T]

Once make a processor that can only run once. Subsequent calls to the processor return the cached error of the original run.

func (Handler[T]) PostHook

func (pf Handler[T]) PostHook(op func()) Handler[T]

PostHook produces an amalgamated processor that runs after the processor completes. Panics are caught, converted to errors, and aggregated with the processors error. The hook operation is unconditionally called after the processor function (except in the case of a processor panic.)

func (Handler[T]) PreHook

func (pf Handler[T]) PreHook(op Operation) Handler[T]

PreHook creates an amalgamated Handler that runs the operation before the root processor. If the operation panics that panic is converted to an error and merged with the processor's error. Use with Operation.Once() to create an "init" function that runs once before a processor is called the first time.

func (Handler[T]) Read

func (pf Handler[T]) Read(ctx context.Context, in T) error

Read executes the Handler once.

func (Handler[T]) TTL

func (pf Handler[T]) TTL(dur time.Duration) Handler[T]

TTL returns a Handler that runs once in the specified window, and returns the error from the last run in between this interval. While the executions of the underlying function happen in isolation, in between, the processor is concurrently accessible.

func (Handler[T]) Wait

func (pf Handler[T]) Wait(in T) error

Wait runs the Handler with a context that will never be canceled.

func (Handler[T]) When

func (pf Handler[T]) When(c func() bool) Handler[T]

When returns a processor function that runs if the conditional function returns true, and does not run otherwise. The conditional function is evaluated every time the returned processor is run.

func (Handler[T]) WithCancel

func (pf Handler[T]) WithCancel() (Handler[T], context.CancelFunc)

WithCancel creates a Handler and a cancel function which will terminate the context that the root proccessor is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Handler is canceled.)

func (Handler[T]) WithErrorFilter

func (pf Handler[T]) WithErrorFilter(ef erc.Filter) Handler[T]

WithErrorFilter uses an erc.Filter to process the error respose from the processor.

func (Handler[T]) WithLock

func (pf Handler[T]) WithLock(mtx *sync.Mutex) Handler[T]

WithLock wraps the Handler and ensures that the mutex is always held while the root Handler is called.

func (Handler[T]) WithLocker

func (pf Handler[T]) WithLocker(mtx sync.Locker) Handler[T]

WithLocker wraps the Handler and ensures that the sync.Locker instance is always held while the root Handler is called.

func (Handler[T]) WithRecover

func (pf Handler[T]) WithRecover() Handler[T]

WithRecover runs the producer, converted all panics into errors. WithRecover is itself a processor.

func (Handler[T]) WithoutErrors

func (pf Handler[T]) WithoutErrors(errs ...error) Handler[T]

WithoutErrors returns a producer that will convert a non-nil error of the provided types to a nil error.

type Operation

type Operation func(context.Context)

Operation is a type of function object that will block until an operation returns or the context is canceled.

func MakeOperation

func MakeOperation(in func()) Operation

MakeOperation converts a function that takes no arguments into an Operation.

func WaitChannel

func WaitChannel[T any](ch <-chan T) Operation

WaitChannel converts a channel (typically, a `chan struct{}`) to a Operation. The Operation blocks till it's context is canceled or the channel is either closed or returns one item.

func WaitContext

func WaitContext(ctx context.Context) Operation

WaitContext wait's for the context to be canceled before returning. The Operation that's return also respects it's own context. Use this Operation and it's own context to wait for a context to be cacneled with a timeout, for instance.

func (Operation) Add

func (wf Operation) Add(ctx context.Context, wg *WaitGroup)

Add starts a the operation in a goroutine incrementing and decrementing the WaitGroup as appropriate.

func (Operation) After

func (wf Operation) After(ts time.Time) Operation

After provides an operation that will only run if called after the specified clock time. When called after this time, the operation blocks until that time passes (or the context is canceled.)

func (Operation) Background

func (wf Operation) Background(ctx context.Context)

Background launches the operation in a go routine. There is no panic-safety provided.

func (Operation) Delay

func (wf Operation) Delay(dur time.Duration) Operation

Delay wraps a Operation in a function that will always wait for the specified duration before running.

If the value is negative, then there is always zero delay.

func (Operation) Go

func (wf Operation) Go() Operation

Go provides access to the Go method (e.g. starting this operation in a go routine.) as a method that can be used as an operation itself.

func (Operation) Group

func (wf Operation) Group(n int) Operation

Group makes an operation that runs n copies of the underlying worker, in different go routines. Work does not start until the resulting worker is called.

func (Operation) If

func (wf Operation) If(cond bool) Operation

If provides a static version of the When that only runs if the condition is true, and is otherwise a noop.

func (Operation) Interval

func (wf Operation) Interval(dur time.Duration) Operation

Interval runs the operation with a timer that resets to the provided duration. The operation runs immediately, and then the time is reset to the specified interval after the base operation is completed. Which is to say that the runtime of the operation itself is effectively added to the interval.

func (Operation) Jitter

func (wf Operation) Jitter(dur func() time.Duration) Operation

Jitter wraps a Operation that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the Operation operation.

If the function produces a negative duration, there is no delay.

func (Operation) Join

func (wf Operation) Join(ops ...Operation) Operation

Join combines a sequence of operations, calling the Operations in order as long as the context does not expire. If the context expires, the combined operation aborts early.

func (Operation) Launch

func (wf Operation) Launch(ctx context.Context) Operation

Launch starts the operation in a background go routine and returns an operation which blocks until it's context is canceled or the underlying operation returns.

func (Operation) Limit

func (wf Operation) Limit(in int) Operation

Limit returns an operation that will only run the specified number of times. The resulting operation is safe for concurrent use, but operations can run concurrently. All limits less that 1 are treated as 1.

func (Operation) Lock

func (wf Operation) Lock() Operation

Lock constructs a mutex that ensure that the underlying operation (when called through the output operation,) only runs within the scope of the lock.

func (Operation) Once

func (wf Operation) Once() Operation

Once produces an operation that will only execute the root operation once, no matter how many times it's called.

func (Operation) PostHook

func (wf Operation) PostHook(hook func()) Operation

PostHook unconditionally runs the post-hook operation after the operation returns, as a defer. Use the hook to run cleanup operations. The Operation returned from this method runs both the original hook, and the hook function.

func (Operation) PreHook

func (wf Operation) PreHook(hook Operation) Operation

PreHook unconditionally runs the hook operation before the underlying operation. Use Operaiton.Once() operations for the hook to initialize resources for use by the operation, or without Once to provide reset semantics. The Operation returned from this method runs both the original hook, and the hook function.

func (Operation) Run

func (wf Operation) Run(ctx context.Context)

Run is equivalent to calling the operation directly.

func (Operation) Signal

func (wf Operation) Signal(ctx context.Context) <-chan struct{}

Signal starts the operation in a go routine, and provides a signal channel which will be closed when the operation is complete.

func (Operation) StartGroup

func (wf Operation) StartGroup(ctx context.Context, n int) Operation

StartGroup runs n operations, incrementing the WaitGroup to account for the job. Callers must wait on the WaitGroup independently.

func (Operation) TTL

func (wf Operation) TTL(dur time.Duration) Operation

TTL runs an operation, and if the operation is called before the specified duration, the operation is a noop.

func (Operation) Wait

func (wf Operation) Wait()

Wait runs the operation with a background context.

func (Operation) When

func (wf Operation) When(cond func() bool) Operation

When runs the condition function, and if it returns true,.

func (Operation) While

func (wf Operation) While() Operation

While runs the operation in a tight loop, until the context expires.

func (Operation) WithCancel

func (wf Operation) WithCancel() (Operation, context.CancelFunc)

WithCancel creates a Operation and a cancel function which will terminate the context that the root Operation is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Operation is canceled.)

func (Operation) WithContextHook

func (wf Operation) WithContextHook(hook func(context.Context) context.Context) Operation

WithContextHook returns an wrapped operation where the provided context hook intercepts the context that the worker function is called with. This makes a kind of rudamentary middleware possible.

func (Operation) WithErrorHook

func (wf Operation) WithErrorHook(ef fn.Handler[error]) Worker

WithErrorHook prodces a Worker function that runs the operation--potentially catching a panic and converting it to an error--and then passes that error to the error handler, before returning the original error. These errors are either context cancilations or error that were converted from panics.

func (Operation) WithLock

func (wf Operation) WithLock(mtx *sync.Mutex) Operation

WithLock ensures that the underlying operation, when called through the output operation, will holed the mutex while running.

func (Operation) WithRecover

func (wf Operation) WithRecover() Worker

WithRecover converts the Operation into a Worker function that catchers panics and returns them as errors using fun.Check.

func (Operation) Worker

func (wf Operation) Worker() Worker

Worker converts a wait function into a fun.Worker. If the context is canceled, the worker function returns the context's error. Does not handle panics, use WithRecover() to convert panics to errors.

type WaitGroup

type WaitGroup struct {
	// contains filtered or unexported fields
}

WaitGroup works like sync.WaitGroup, except that the Wait method takes a context (and can be passed as a fnx.Operation). The implementation is exceptionally simple. The only constraint, like sync.WaitGroup, is that you can never modify the value of the internal counter such that it is negative, event transiently. The implementation does not require background resources aside from Wait, which creates a single goroutine that lives for the entire time that Wait is running, but no other background resources are created. Multiple copies of Wait can be safely called at once, and the WaitGroup is reusable more than once.

This implementation is about 50% slower than sync.WaitGroup after informal testing. It provides a little extra flexiblity and introspection, with similar semantics, that may be worth the additional performance hit.

func (*WaitGroup) Add

func (wg *WaitGroup) Add(num int)

Add modifies the internal counter. Raises an ErrInvariantViolation error if any modification causes the internal coutner to be less than 0.

func (*WaitGroup) Done

func (wg *WaitGroup) Done()

Done marks a single operation as done.

func (*WaitGroup) Group

func (wg *WaitGroup) Group(n int, op Operation) Operation

Group returns an operation that, when executed, starts <n> copies of the operation and blocks until all have finished.

func (*WaitGroup) Inc

func (wg *WaitGroup) Inc()

Inc adds one item to the wait group.

func (*WaitGroup) IsDone

func (wg *WaitGroup) IsDone() bool

IsDone returns true if there is pending work, and false otherwise.

func (*WaitGroup) Launch

func (wg *WaitGroup) Launch(ctx context.Context, op Operation)

Launch increments the WaitGroup and starts the operation in a go routine.

func (*WaitGroup) Num

func (wg *WaitGroup) Num() int

Num returns the number of pending workers.

func (*WaitGroup) Operation

func (wg *WaitGroup) Operation() Operation

Operation returns with WaitGroups Wait method as a Operation.

func (*WaitGroup) OperationHandler

func (wg *WaitGroup) OperationHandler(ctx context.Context) fn.Handler[Operation]

OperationHandler returns a handelr function that accepts new Operation functions, starts them in their own Go routine, with the provided context. These handler functions are reusable.

func (*WaitGroup) StartGroup

func (wg *WaitGroup) StartGroup(ctx context.Context, n int, op Operation) Operation

StartGroup starts <n> copies of the operation in separate threads and returns an operation that waits on the wait group.

func (*WaitGroup) Wait

func (wg *WaitGroup) Wait(ctx context.Context)

Wait blocks until either the context is canceled or all items have completed.

Wait is pasable or usable as a fnx.Operation.

In many cases, callers should not rely on the Wait operation returning after the context expires: If Done() calls are used in situations that respect a context cancellation, aborting the Wait on a context cancellation, particularly when Wait gets a context that has the same lifecycle as the operations its waiting on, the result is that worker routines will leak. Nevertheless, in some situations, when workers may take a long time to respond to a context cancellation, being able to set a second deadline on Waiting may be useful.

Consider using `fnx.Operation(wg.Wait).Block()` if you want blocking semantics with the other features of this WaitGroup implementation.

func (*WaitGroup) Worker

func (wg *WaitGroup) Worker() Worker

Worker returns a worker that will block on the wait group returning and return the conbext's error if one exits.

func (*WaitGroup) WorkerHandler

func (wg *WaitGroup) WorkerHandler(ctx context.Context, eh fn.Handler[error]) fn.Handler[Worker]

WorkerHandler returns a handelr function that accepts new Worker functions, starts them in their own Go routine, with the provided context. Errors are collected by the error handler, eventually. These handler functions are reusable.

type Worker

type Worker func(context.Context) error

Worker represents a basic function used in worker pools and other similar situations.

func MakeWorker

func MakeWorker(fn func() error) Worker

MakeWorker converts a non-context worker function into a worker for compatibility with tooling.

func (Worker) After

func (wf Worker) After(ts time.Time) Worker

After returns a Worker that blocks until the timestamp provided is in the past. Additional calls to this worker will run immediately. If the timestamp is in the past the resulting worker will run immediately.

func (Worker) Background

func (wf Worker) Background(ctx context.Context, ob fn.Handler[error]) Operation

Background starts the worker function in a go routine, passing the error to the provided observer function.

func (Worker) Check

func (wf Worker) Check(ctx context.Context) bool

Check runs the worker and returns true (ok) if there was no error, and false otherwise.

func (Worker) Delay

func (wf Worker) Delay(dur time.Duration) Worker

Delay wraps a Worker in a function that will always wait for the specified duration before running.

If the value is negative, then there is always zero delay.

func (Worker) Group

func (wf Worker) Group(n int) Worker

Group makes a worker that runs n copies of the underlying worker, in different go routines and aggregates their output. Work does not start until the resulting worker is called.

func (Worker) If

func (wf Worker) If(cond bool) Worker

If returns a Worker function that runs only if the condition is true. The error is always nil if the condition is false. If-ed functions may be called more than once, and will run multiple times potentiall.y.

func (Worker) Ignore

func (wf Worker) Ignore() Operation

Ignore converts the worker into a Operation that discards the error produced by the worker.

func (Worker) Interval

func (wf Worker) Interval(dur time.Duration) Worker

Interval runs the worker with a timer that resets to the provided duration. The worker runs immediately, and then the time is reset to the specified interval after the base worker has. Which is to say that the runtime of the worker's operation is effectively added to the interval.

The interval worker will run until the context is canceled or the worker returns an error.

func (Worker) Jitter

func (wf Worker) Jitter(jf func() time.Duration) Worker

Jitter wraps a Worker that runs the jitter function (jf) once before every execution of the resulting function, and waits for the resulting duration before running the Worker.

If the function produces a negative duration, there is no delay.

func (Worker) Join

func (wf Worker) Join(wfs ...Worker) Worker

Join combines a sequence of workers, calling the workers in order, as long as there is no error and the context does not expire. Context expiration errors are not propagated. Does not skip nil workers.

func (Worker) Launch

func (wf Worker) Launch(ctx context.Context) Worker

Launch runs the worker function in a go routine and returns a new fun.Worker which will block for the context to expire or the background worker to completes, which returns the error from the background request.

The underlying worker begins executing before future returns.

func (Worker) Limit

func (wf Worker) Limit(n int) Worker

Limit produces a worker than runs exactly n times. Each execution is isolated from every other, but once the limit is exceeded, the result of the *last* worker to execute is cached concurrent access to that value is possible.

func (Worker) Lock

func (wf Worker) Lock() Worker

Lock produces a Worker that will be executed within the scope of a (managed) mutex.

func (Worker) Must

func (wf Worker) Must() Operation

Must converts a Worker function into a wait function; however, if the worker produces an error Must converts the error into a panic.

func (Worker) Once

func (wf Worker) Once() Worker

Once wraps the Worker in a function that will execute only once. The return value (error) is cached, and can be accessed many times without re-running the worker.

func (Worker) Operation

func (wf Worker) Operation(ob fn.Handler[error]) Operation

Operation converts a worker function into a wait function, passing any error to the handler function.

func (Worker) PostHook

func (wf Worker) PostHook(post func()) Worker

PostHook runs hook operation after the worker function completes. The Worker function's panics are converted to errors and the hook will continue to run.

func (Worker) PreHook

func (wf Worker) PreHook(pre Operation) Worker

PreHook returns a Worker that runs an operatio unconditionally before running the underlying worker. If the hook function panics it is converted to an error and aggregated with the worker's error.

func (Worker) Retry

func (wf Worker) Retry(n int) Worker

Retry constructs a worker function that takes runs the underlying worker until the return value is nil, or it encounters a terminating error (io.EOF, ers.ErrAbortCurrentOp, or context cancellation.) Context cancellation errors are returned to the caller with any other errors encountered in previous retries, other terminating errors are not. All errors are discarded if the retry operation succeeds in the provided number of retries.

Except for ErrStreamContinue, which is ignored, all other errors are aggregated and returned to the caller only if the retry fails.

func (Worker) Run

func (wf Worker) Run(ctx context.Context) error

Run is equivalent to calling the worker function directly.

func (Worker) Signal

func (wf Worker) Signal(ctx context.Context) <-chan error

Signal runs the worker function in a background goroutine and returns the error in an error channel, that returns when the worker function returns. If Signal is called with a canceled context the worker is still executed (with that context.)

A value, possibly nil, is always sent through the channel. Panics are not caught or handled.

func (Worker) StartGroup

func (wf Worker) StartGroup(ctx context.Context, n int) Worker

StartGroup starts n copies of the worker operation and returns a future/worker that returns the aggregated errors from all workers

The operation is fundamentally continue-on-error. To get abort-on-error semantics, use the Filter() method on the input worker, that cancels the context on when it sees an error.

func (Worker) TTL

func (wf Worker) TTL(dur time.Duration) Worker

TTL produces a worker that will only run once during every specified duration, when called more than once. During the interval between calls, the previous error is returned. While each execution of the root worker is protected by a mutex, the resulting worker can be used in parallel during the intervals between calls.

func (Worker) Wait

func (wf Worker) Wait() error

Wait runs the worker with a background context and returns its error.

func (Worker) When

func (wf Worker) When(cond func() bool) Worker

When wraps a Worker function that will only run if the condition function returns true. If the condition is false the worker does not execute. The condition function is called in between every operation.

When worker functions may be called more than once, and will run multiple times potentially.

func (Worker) While

func (wf Worker) While() Worker

While runs the Worker in a continuous while loop, returning only if the underlying worker returns an error or if the context is cancled.

func (Worker) WithCancel

func (wf Worker) WithCancel() (Worker, context.CancelFunc)

WithCancel creates a Worker and a cancel function which will terminate the context that the root Worker is running with. This context isn't canceled *unless* the cancel function is called (or the context passed to the Worker is canceled.)

func (Worker) WithContextHook

func (wf Worker) WithContextHook(hook func(context.Context) context.Context) Worker

WithContextHook wraps the worker function, and passes the context received by that function through the hook function before calling the underlying worker.

func (Worker) WithErrorFilter

func (wf Worker) WithErrorFilter(ef erc.Filter) Worker

WithErrorFilter wraps the worker with a Worker that passes the output of the root Worker's error and returns the output of the filter.

The ers package provides a number of filter implementations but any function in the following form works:

func(error) error

func (Worker) WithErrorHook

func (wf Worker) WithErrorHook(eh fn.Handler[error]) Worker

WithErrorHook runs the worker, potentially catching a panic and converting that to an error. The output of the worker function is passed to the error hook (regardless of the test's error status) before returning the original error. Panics in the error handler are not handled.

func (Worker) WithLock

func (wf Worker) WithLock(mtx sync.Locker) Worker

WithLock produces a Worker that will be executed within the scope of the provided mutex.

func (Worker) WithRecover

func (wf Worker) WithRecover() Worker

WithRecover produces a worker function that converts the worker function's panics to errors.

func (Worker) WithoutErrors

func (wf Worker) WithoutErrors(errs ...error) Worker

WithoutErrors returns a worker that will return nil if the error returned by the worker is one of the errors passed to WithoutErrors.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL