Documentation
¶
Overview ¶
Package helpers is a set of useful functions when working with async state machines.
Index ¶
- Constants
- Variables
- func Activations(u uint64) int
- func Add1Async(ctx context.Context, mach am.Api, waitState string, addState string, args am.A) am.Result
- func Add1Block(ctx context.Context, mach am.Api, state string, args am.A) am.Result
- func Add1Sync(ctx context.Context, mach am.Api, state string, args am.A) am.Result
- func ArgsFromMap[T any](args am.A, dest T) T
- func ArgsToArgs[T any](src interface{}, dest T) T
- func ArgsToLogMap(args interface{}, maxLen int) map[string]string
- func AskAdd(mach am.Api, states am.S, args am.A) am.Result
- func AskAdd1(mach am.Api, state string, args am.A) am.Result
- func AskEvAdd(e *am.Event, mach am.Api, states am.S, args am.A) am.Result
- func AskEvAdd1(e *am.Event, mach am.Api, state string, args am.A) am.Result
- func AskEvRemove(e *am.Event, mach am.Api, states am.S, args am.A) am.Result
- func AskEvRemove1(e *am.Event, mach am.Api, state string, args am.A) am.Result
- func AskRemove(mach am.Api, states am.S, args am.A) am.Result
- func AskRemove1(mach am.Api, state string, args am.A) am.Result
- func CantAdd(mach am.Api, states am.S, args am.A) bool
- func CantAdd1(mach am.Api, state string, args am.A) bool
- func CantRemove(mach am.Api, states am.S, args am.A) bool
- func CantRemove1(mach am.Api, state string, args am.A) bool
- func CopySchema(source am.Schema, target *am.Machine, states am.S) error
- func CountRelations(state *am.State) int
- func Dispose(mach am.Api)
- func DisposeBind(mach am.Api, handler am.HandlerDispose)
- func EnableDebugging(stdout bool)
- func EvalGetter[T any](ctx context.Context, source string, maxTries int, mach *am.Machine, ...) (T, error)
- func ExecAndClose(fn func()) <-chan struct{}
- func GetTransitionStates(tx *am.Transition, index am.S) (added am.S, removed am.S, touched am.S)
- func GroupWhen1(machs []am.Api, state string, ctx context.Context) ([]<-chan struct{}, error)
- func Hash(in string, l int) string
- func Healthcheck(mach am.Api)
- func Implements(statesChecked, statesNeeded am.S) error
- func IndexesToStates(allStates am.S, indexes []int) am.S
- func Interval(ctx context.Context, length time.Duration, interval time.Duration, ...) error
- func IsDebug() bool
- func IsMulti(mach am.Api, state string) bool
- func IsTelemetry() bool
- func IsTestRunner() bool
- func MachDebug(mach am.Api, amDbgAddr string, logLvl am.LogLevel, stdout bool, ...) error
- func MachDebugEnv(mach am.Api) error
- func NewMirror(id string, flat bool, source *am.Machine, handlers any, states am.S) (*am.Machine, error)
- func Pool(limit int) *errgroup.Group
- func PrefixStates(schema am.Schema, prefix string, removeDups bool, optWhitelist, optBlacklist S) am.Schema
- func RemoveMulti(mach am.Api, state string) am.HandlerFinal
- func ResultToErr(result am.Result) error
- func SchemaHash(schema am.Schema) string
- func SchemaImplements(schema am.Schema, states am.S) error
- func SchemaStates(schema am.Schema) am.S
- func SemConfigEnv(forceFull bool) *am.SemConfig
- func SetEnvLogLevel(level am.LogLevel)
- func StatesToIndexes(allStates am.S, states am.S) []int
- func TagValue(tags []string, key string) string
- func TagValueInt(tags []string, key string) int
- func Wait(ctx context.Context, length time.Duration) bool
- func WaitForAll(ctx context.Context, timeout time.Duration, chans ...<-chan struct{}) error
- func WaitForAny(ctx context.Context, timeout time.Duration, chans ...<-chan struct{}) error
- func WaitForErrAll(ctx context.Context, timeout time.Duration, mach am.Api, ...) error
- func WaitForErrAny(ctx context.Context, timeout time.Duration, mach *am.Machine, ...) error
- type A
- type Cond
- type MachGroup
- type MutRequest
- func NewMutRequest(mach am.Api, mutType am.MutationType, states am.S, args am.A) *MutRequest
- func NewReqAdd(mach am.Api, states am.S, args am.A) *MutRequest
- func NewReqAdd1(mach am.Api, state string, args am.A) *MutRequest
- func NewReqRemove(mach am.Api, states am.S, args am.A) *MutRequest
- func NewReqRemove1(mach am.Api, state string, args am.A) *MutRequest
- func (r *MutRequest) Backoff(backoff time.Duration) *MutRequest
- func (r *MutRequest) Clone(mach am.Api, mutType am.MutationType, states am.S, args am.A) *MutRequest
- func (r *MutRequest) Delay(delay time.Duration) *MutRequest
- func (r *MutRequest) MaxDuration(maxDuration time.Duration) *MutRequest
- func (r *MutRequest) Retries(retries int) *MutRequest
- func (r *MutRequest) Run(ctx context.Context) (am.Result, error)
- type S
- type Schema
- type SlogToMachLog
- type StateLoop
Constants ¶
const ( // EnvAmLogPrint prints machine log to stdout. EnvAmLogPrint = "AM_LOG_PRINT" // EnvAmHealthcheck enables a healthcheck ticker for every debugged machine. EnvAmHealthcheck = "AM_HEALTHCHECK" // EnvAmTestRunner indicates the main test runner, disables any telemetry. EnvAmTestRunner = "AM_TEST_RUNNER" // EnvAmLogFull enables all the features of [am.SemLogger]. EnvAmLogFull = "AM_LOG_FULL" // EnvAmLogSteps logs transition steps. // See [am.SemLogger.EnableSteps]. // "1" | "" (default) EnvAmLogSteps = "AM_LOG_STEPS" // EnvAmLogGraph logs the graph structure (mutation traces, pipes, RPC, etc). // See [am.SemLogger.EnableGraph]. // "1" | "" (default) EnvAmLogGraph = "AM_LOG_GRAPH" // EnvAmLogChecks logs Can methods. See [am.SemLogger.EnableCan]. // "1" | "" (default) EnvAmLogChecks = "AM_LOG_CHECKS" // EnvAmLogQueued logs queued mutations. See [am.SemLogger.EnableQueued]. // "1" | "" (default) EnvAmLogQueued = "AM_LOG_QUEUED" // EnvAmLogArgs logs mutation args. See [am.SemLogger.EnableArgs]. // "1" | "" (default) EnvAmLogArgs = "AM_LOG_ARGS" // EnvAmLogWhen logs When methods. See [am.SemLogger.EnableWhen]. // "1" | "" (default) EnvAmLogWhen = "AM_LOG_WHEN" // EnvAmLogStateCtx logs state ctxs. See [am.SemLogger.EnableStateCtx]. // "1" | "" (default) EnvAmLogStateCtx = "AM_LOG_STATE_CTX" // EnvAmLogFile enables file logging (using machine ID as the name). // "1" | "" (default) EnvAmLogFile = "AM_LOG_FILE" )
Variables ¶
var ErrTestAutoDisable = errors.New("feature disabled when AM_TEST_RUNNER")
var SlogToMachLogOpts = &slog.HandlerOptions{ ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { if a.Key == slog.TimeKey || a.Key == slog.LevelKey { return slog.Attr{} } return a }, }
SlogToMachLogOpts complements SlogToMachLog.
Functions ¶
func Activations ¶ added in v0.8.0
Activations returns the number of state activations from the number of ticks passed.
func Add1Async ¶ added in v0.15.0
func Add1Async( ctx context.Context, mach am.Api, waitState string, addState string, args am.A, ) am.Result
Add1Async adds a state from an async op and waits for another one from the op to become active. Theoretically, it should work with any state pair, including Multi states (assuming they remove themselves). Not compatible with queued negotiation at the moment.
func Add1Block ¶
Add1Block activates a state and waits until it becomes active. If it's a multi-state, it also waits for it to deactivate. Returns early if a non-multi state is already active. Useful to avoid the queue but can't handle a rejected negotiation. Deprecated: use Add1Sync instead.
func Add1Sync ¶ added in v0.15.0
Add1Sync activates a state and waits until it becomes active, or is canceled. Add1Sync is a newer version of Add1Block that supports queued rejections, but at the moment it is not compatible with RPC. This method checks expiration ctx and returns as am.Canceled.
func ArgsFromMap ¶ added in v0.17.1
ArgsFromMap takes an arguments map am.A and copies the fields into a typed argument struct value. Useful for typed args via RPC.
func ArgsToArgs ¶ added in v0.8.0
func ArgsToArgs[T any](src interface{}, dest T) T
ArgsToArgs converts a typed arguments struct into an overlapping typed arguments struct. Useful for removing fields which can't be passed over RPC, and back. Both params should be pointers to a struct and share at least one field.
func ArgsToLogMap ¶ added in v0.8.0
ArgsToLogMap converts an A (arguments) struct to a map of strings using `log` tags as keys, and their cased string values.
func AskAdd ¶ added in v0.15.0
AskAdd will first check if a mutation isn't impossible and only then try to mutate the state machine. Causes the negotiation phase to execute twice. AskAdd BLOCKS. Useful to avoid canceled transitions.
See also am.Machine.CanAdd and CantAdd.
func AskEvRemove ¶ added in v0.15.0
AskEvRemove is a traced version of AskRemove.
func AskEvRemove1 ¶ added in v0.15.0
AskEvRemove1 is a traced version of AskRemove for a single state.
func AskRemove ¶ added in v0.15.0
AskRemove will first check if a mutation isn't impossible and only then try to mutate the state machine. Causes the negotiation phase to execute twice. AskRemove BLOCKS. Useful to avoid canceled transitions.
See also am.Machine.CanRemove and CantRemove.
func AskRemove1 ¶ added in v0.15.0
AskRemove1 is a single-state version of AskRemove.
func CantRemove ¶ added in v0.15.0
CantRemove will confirm that the mutation is impossible. Blocks.
func CantRemove1 ¶ added in v0.15.0
CantRemove1 is a single-state version of CantRemove.
func CopySchema ¶ added in v0.13.0
CopySchema copies states from the source to the target schema, from the passed list of states. Returns an error on failure. CopySchema verifies states.
func CountRelations ¶ added in v0.12.0
CountRelations will count all referenced states in all relations of the given state.
func Dispose ¶ added in v0.17.0
Dispose triggers a machine disposal, using either the dedicated state set or the machine directly.
func DisposeBind ¶ added in v0.17.0
func DisposeBind(mach am.Api, handler am.HandlerDispose)
DisposeBind registers a disposal handler, using either the dedicated state set or the machine directly.
func EnableDebugging ¶ added in v0.8.0
func EnableDebugging(stdout bool)
EnableDebugging sets env vars for debugging tested machines with am-dbg on port 6831.
func EvalGetter ¶ added in v0.15.0
func EvalGetter[T any]( ctx context.Context, source string, maxTries int, mach *am.Machine, eval func() (T, error), ) (T, error)
EvalGetter is syntactic sugar for creating getters via Eval functions. Like any eval, it can end with ErrEvalTimeout. Getting values via channels passed to mutations is recommended and allows for a custom timeout.
func ExecAndClose ¶ added in v0.8.0
func ExecAndClose(fn func()) <-chan struct{}
ExecAndClose closes the chan when the function ends.
func GetTransitionStates ¶ added in v0.8.0
GetTransitionStates will extract added, removed, and touched states from transition's clock values and steps. Requires a state names index. Collecting touched states requires transition steps.
func GroupWhen1 ¶ added in v0.8.0
GroupWhen1 will create wait channels for the same state in a group of machines, or return an am.ErrStateMissing error.
func Healthcheck ¶ added in v0.8.0
Healthcheck adds a state to a machine every 5 seconds, until the context is done. This makes sure all the logs are pushed to the telemetry server. TODO use machine scheduler when ready
func Implements ¶ added in v0.8.0
Implements checks if statesChecked implements statesNeeded. It's an equivalent of Machine.Has(), but for slices of state names, and with better error msgs.
func IndexesToStates ¶
IndexesToStates converts a list of state indexes to a list of state names, for a given machine.
func Interval ¶ added in v0.8.0
func Interval( ctx context.Context, length time.Duration, interval time.Duration, fn func() bool, ) error
Interval runs a function at a given interval, for a given duration, or until the context is done. Returns nil if the duration has passed, or an error if ctx is done. The function should return false to stop the interval.
func IsDebug ¶ added in v0.8.0
func IsDebug() bool
IsDebug returns true if the process is in a "simple debug mode" via AM_DEBUG.
func IsTelemetry ¶ added in v0.8.0
func IsTelemetry() bool
IsTelemetry returns true if the process is in telemetry debug mode.
func IsTestRunner ¶ added in v0.8.0
func IsTestRunner() bool
func MachDebug ¶
func MachDebug( mach am.Api, amDbgAddr string, logLvl am.LogLevel, stdout bool, semConfig *am.SemConfig, ) error
MachDebug exports transition telemetry to an am-dbg instance listening at [amDbgAddr].
func MachDebugEnv ¶
MachDebugEnv sets up a machine for debugging, based on env vars only: AM_DBG_ADDR, AM_LOG, AM_LOG_*, and AM_DEBUG. This function should be called right after the machine is created (to catch all the log entries).
func NewMirror ¶ added in v0.13.0
func NewMirror( id string, flat bool, source *am.Machine, handlers any, states am.S, ) (*am.Machine, error)
NewMirror creates a submachine which mirrors the given source machine. If [flat] is true, only mutations changing the state will be propagated, along with the currently active states.
At this point, the handlers' struct needs to be defined manually with fields of type `am.HandlerFinal`.
[id] is optional.
func PrefixStates ¶ added in v0.12.0
func PrefixStates( schema am.Schema, prefix string, removeDups bool, optWhitelist, optBlacklist S, ) am.Schema
PrefixStates will prefix all state names with [prefix]. removeDups will skip overlaps eg "FooFooName" will be "Foo".
func RemoveMulti ¶ added in v0.8.0
func RemoveMulti(mach am.Api, state string) am.HandlerFinal
RemoveMulti creates a final handler which removes a multi state from a machine. Useful to avoid FooState-Remove1-Foo repetition.
func ResultToErr ¶ added in v0.10.2
func SchemaHash ¶ added in v0.13.0
SchemaHash computes an MD5 hash of the passed schema. The order of states is not important.
func SchemaImplements ¶ added in v0.17.0
SchemaImplements checks if a given schema implements a certain set of states.
func SchemaStates ¶ added in v0.17.0
SchemaStates returns state names from a schema struct in a random order.
func SemConfigEnv ¶ added in v0.17.0
SemConfigEnv returns an am.SemConfig based on env vars, or the [forceFull] flag.
func SetEnvLogLevel ¶ added in v0.15.0
SetEnvLogLevel sets AM_LOG env var to the passed log level. It will affect all future state machines using MachDebugEnv.
func StatesToIndexes ¶
StatesToIndexes converts a list of state names to a list of state indexes, for the given machine. It returns -1 for unknown states.
func TagValue ¶ added in v0.12.0
TagValue returns the value part from a text tag "key:value". For tag without value, it returns the tag name.
func TagValueInt ¶ added in v0.17.1
TagValueInt is like TagValue, but returns a formatted int.
func Wait ¶ added in v0.8.0
Wait waits for a duration, or until the context is done. Returns true if the duration has passed, or false if ctx is done.
func WaitForAll ¶ added in v0.8.0
WaitForAll waits for a list of channels to close, or until the context is done, or until the timeout is reached. Returns nil if all channels are closed, or ErrTimeout, or ctx.Err().
It's advised to check the state ctx after this call, as it usually means expiration and not a timeout.
func WaitForAny ¶ added in v0.8.0
WaitForAny waits for any of the channels to close, or until the context is done, or until the timeout is reached. Returns nil if any channel is closed, or ErrTimeout, or ctx.Err().
It's advised to check the state ctx after this call, as it usually means expiration and not a timeout.
This function uses reflection to wait for multiple channels at once.
func WaitForErrAll ¶ added in v0.8.0
func WaitForErrAll( ctx context.Context, timeout time.Duration, mach am.Api, chans ...<-chan struct{}, ) error
WaitForErrAll is like WaitForAll, but also waits on WhenErr of a passed machine. For state machines with error handling (like retry) it's recommended to measure machine time of am.StateException instead.
func WaitForErrAny ¶ added in v0.8.0
func WaitForErrAny( ctx context.Context, timeout time.Duration, mach *am.Machine, chans ...<-chan struct{}, ) error
WaitForErrAny is like WaitForAny, but also waits on WhenErr of a passed machine. For state machines with error handling (like retry) it's recommended to measure machine time of am.StateException instead.
Types ¶
type Cond ¶ added in v0.12.0
type Cond struct {
// Only if all these states are active.
Is S
// TODO implement
// Only if any of these groups of states are active.
Any []S
// Only if any of these states is active.
Any1 S
// Only if none of these states are active.
Not S
// Only if the clock is equal to or higher than this one.
Clock am.Clock
}
Cond is a set of state conditions, which when all met make the condition true.
func (Cond) Check ¶ added in v0.12.0
Check compares the specified conditions against the passed machine. When mach is nil, Check returns false.
type MutRequest ¶ added in v0.8.0
type MutRequest struct {
Mach am.Api
MutType am.MutationType
States am.S
Args am.A
// PolicyRetries is the max number of retries.
PolicyRetries int
// PolicyDelay is the delay before the first retry, then doubles.
PolicyDelay time.Duration
// PolicyBackoff is the max time to wait between retries.
PolicyBackoff time.Duration
// PolicyMaxDuration is the max time to wait for the mutation to be accepted.
PolicyMaxDuration time.Duration
}
MutRequest is a failsafe request for a machine mutation. It supports retries, backoff, max duration, delay, and timeout policies. It will try to mutate the machine until the context is done, or the max duration is reached. Queued mutations are considered a success.
func NewMutRequest ¶ added in v0.8.0
func NewMutRequest( mach am.Api, mutType am.MutationType, states am.S, args am.A, ) *MutRequest
NewMutRequest creates a new MutRequest with defaults - 10 retries, 100ms delay, 5s backoff, and 5s max duration.
func NewReqAdd ¶ added in v0.8.0
NewReqAdd creates a new failsafe request to add states to a machine. See MutRequest for more info and NewMutRequest for the defaults.
func NewReqAdd1 ¶ added in v0.8.0
NewReqAdd1 creates a new failsafe request to add a single state to a machine. See MutRequest for more info and NewMutRequest for the defaults.
func NewReqRemove ¶ added in v0.8.0
NewReqRemove creates a new failsafe request to remove states from a machine. See MutRequest for more info and NewMutRequest for the defaults.
func NewReqRemove1 ¶ added in v0.8.0
NewReqRemove1 creates a new failsafe request to remove a single state from a machine. See MutRequest for more info and NewMutRequest for the defaults.
func (*MutRequest) Backoff ¶ added in v0.8.0
func (r *MutRequest) Backoff(backoff time.Duration) *MutRequest
func (*MutRequest) Clone ¶ added in v0.8.0
func (r *MutRequest) Clone( mach am.Api, mutType am.MutationType, states am.S, args am.A, ) *MutRequest
func (*MutRequest) Delay ¶ added in v0.8.0
func (r *MutRequest) Delay(delay time.Duration) *MutRequest
func (*MutRequest) MaxDuration ¶ added in v0.8.0
func (r *MutRequest) MaxDuration(maxDuration time.Duration) *MutRequest
func (*MutRequest) Retries ¶ added in v0.8.0
func (r *MutRequest) Retries(retries int) *MutRequest
type SlogToMachLog ¶ added in v0.12.0
SlogToMachLog allows using the machine logger as a slog sink.
a.loggerMach = slog.New(slog.NewTextHandler(
amhelp.SlogToMachLog{Mach: mach}, amhelp.SlogToMachLogOpts))
type StateLoop ¶ added in v0.12.0
type StateLoop struct {
ResetInterval time.Duration
Threshold int
// contains filtered or unexported fields
}
TODO thread safety via atomics
func NewStateLoop ¶ added in v0.12.0
NewStateLoop helper creates a state loop guard bound to a specific state (eg Heartbeat), preventing infinite loops. It monitors context, off states, ticks of related "context states", and an optional check function. We can adjust Threshold and ResetInterval. Not thread safe ATM.
func (*StateLoop) Ended ¶ added in v0.12.0
Ended returns the ended flag, but does not check any context. Useful for negotiation handlers which don't have state context yet.