asyncmachine-go is a minimal implementation of AsyncMachine
in Golang using channels and context. It aims at simplicity and speed.
It can be used as a lightweight in-memory Temporal
alternative, worker for Asynq, or to create simple consensus engines, stateful
firewalls, telemetry, bots, etc.
asyncmachine-go is a general-purpose state machine for managing complex asynchronous workflows in a safe and
structured way.
Comparison
Common differences from other state machines:
many states can be active at the same time
transitions between all the states are allowed
states are connected by relations
every transition can be rejected
error is a state
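The differences listed above can be illustrated with a toy model. This is a minimal sketch using only the Go standard library. The names `Machine`, `Relation`, `Guard`, `Add`, and `Is` are hypothetical and invented for this example; they are not asyncmachine-go's API or implementation. The sketch also assumes relations are applied one level deep, which the real library may handle differently.

```go
package main

import "fmt"

// Relation lists the states that a given state activates or deactivates.
// Hypothetical type for illustration only.
type Relation struct {
	Add    []string
	Remove []string
}

// Machine is a toy multi-state machine: any number of states may be active.
type Machine struct {
	relations map[string]Relation
	guards    map[string]func() bool // stand-ins for negotiation handlers
	active    map[string]bool
}

func NewMachine(relations map[string]Relation) *Machine {
	return &Machine{
		relations: relations,
		guards:    map[string]func() bool{},
		active:    map[string]bool{},
	}
}

// Guard registers a function that may veto the activation of a state.
func (m *Machine) Guard(state string, fn func() bool) { m.guards[state] = fn }

// Add tries to activate a state and reports whether it was accepted.
// A rejected transition leaves the active set untouched.
func (m *Machine) Add(state string) bool {
	rel, known := m.relations[state]
	if !known {
		return false
	}
	if guard, ok := m.guards[state]; ok && !guard() {
		return false
	}
	for _, s := range rel.Remove {
		delete(m.active, s)
	}
	m.active[state] = true
	for _, s := range rel.Add {
		m.active[s] = true
	}
	return true
}

// Is reports whether a state is currently active.
func (m *Machine) Is(state string) bool { return m.active[state] }

func main() {
	m := NewMachine(map[string]Relation{
		"ProcessingFile": {Add: []string{"InProgress"}, Remove: []string{"FileProcessed"}},
		"FileProcessed":  {Remove: []string{"ProcessingFile", "InProgress"}},
		"InProgress":     {},
	})

	// Two states become active from a single mutation.
	m.Add("ProcessingFile")
	fmt.Println(m.Is("ProcessingFile"), m.Is("InProgress"))

	// A guard rejects the next transition, so nothing changes.
	m.Guard("FileProcessed", func() bool { return false })
	fmt.Println(m.Add("FileProcessed"), m.Is("ProcessingFile"))
}
```

The guard runs before any state is touched, which is what lets a rejection be side-effect free in this sketch.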
Buzzwords
AM technically is: event emitter, queue, dependency graph, AOP, logical clocks, ~2.5k LoC, no deps
Flow constraints are: state mutations, negotiation, relations, "when" methods, state contexts, external contexts
Usage
Basics
// ProcessingFile -> FileProcessed (1 async and 1 sync state)
package main

import (
	"context"
	"time"

	am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

func main() {
	// init the machine
	mach := am.New(nil, am.Struct{
		"ProcessingFile": {
			Add:    am.S{"InProgress"},
			Remove: am.S{"FileProcessed"},
		},
		"FileProcessed": {
			Remove: am.S{"ProcessingFile", "InProgress"},
		},
		"InProgress": {},
	}, nil)
	mach.BindHandlers(&Handlers{
		Filename: "README.md",
	})
	// change the state
	mach.Add1("ProcessingFile", nil)
	// wait for completed
	select {
	case <-time.After(5 * time.Second):
		println("timeout")
	case <-mach.WhenErr(nil):
		println("err:", mach.Err)
	case <-mach.When1("FileProcessed", nil):
		println("done")
	}
}

type Handlers struct {
	Filename string
}

// negotiation handler
func (h *Handlers) ProcessingFileEnter(e *am.Event) bool {
	// read-only ops
	// decide if moving fwd is ok
	// no blocking
	// lock-free critical zone
	return true
}

// final handler
func (h *Handlers) ProcessingFileState(e *am.Event) {
	// read & write ops
	// no blocking
	// lock-free critical zone
	mach := e.Machine
	// tick-based context
	stateCtx := mach.NewStateCtx("ProcessingFile")
	go func() {
		// block in the background, locks needed
		if stateCtx.Err() != nil {
			return // expired
		}
		// blocking call
		err := processFile(h.Filename, stateCtx)
		if err != nil {
			mach.AddErr(err)
			return
		}
		// re-check the tick ctx after a blocking call
		if stateCtx.Err() != nil {
			return // expired
		}
		// move to the next state in the flow
		mach.Add1("FileProcessed", nil)
	}()
}

// processFile is a placeholder for the blocking work (not part of the library).
func processFile(filename string, ctx context.Context) error {
	// ...
	return nil
}
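The handler above checks its state context before and after the blocking call. The following is a minimal stdlib-only sketch of why that pattern works, assuming a state context is simply a context that gets cancelled when its state is deactivated. `StateContexts`, `New`, `Deactivate`, and `work` are hypothetical names for this example and do not describe how `NewStateCtx` is implemented in the library.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// StateContexts hands out contexts that expire when a state is deactivated.
// Hypothetical helper for illustration only.
type StateContexts struct {
	mu      sync.Mutex
	cancels map[string][]context.CancelFunc
}

func NewStateContexts() *StateContexts {
	return &StateContexts{cancels: map[string][]context.CancelFunc{}}
}

// New returns a context bound to the current activation of the state.
func (s *StateContexts) New(state string) context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	s.mu.Lock()
	s.cancels[state] = append(s.cancels[state], cancel)
	s.mu.Unlock()
	return ctx
}

// Deactivate cancels every context created for the state.
func (s *StateContexts) Deactivate(state string) {
	s.mu.Lock()
	cancels := s.cancels[state]
	delete(s.cancels, state)
	s.mu.Unlock()
	for _, cancel := range cancels {
		cancel()
	}
}

// work mirrors the handler's goroutine: check the context, run the blocking
// step, check again, and only then report that the result may be committed.
func work(ctx context.Context, blocking func()) bool {
	if ctx.Err() != nil {
		return false // expired before starting
	}
	blocking()
	if ctx.Err() != nil {
		return false // expired while blocked, drop the result
	}
	return true
}

func main() {
	sc := NewStateContexts()

	// The state stays active for the whole call, so the result is committed.
	ctx := sc.New("ProcessingFile")
	fmt.Println("committed:", work(ctx, func() {}))

	// The state is deactivated during the blocking step, so the result is dropped.
	ctx = sc.New("ProcessingFile")
	fmt.Println("committed:", work(ctx, func() { sc.Deactivate("ProcessingFile") }))
}
```

The second check matters because the state may have been deactivated while the goroutine was blocked; without it, a stale result could move the flow forward.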
Waiting
// wait until FileDownloaded becomes active
<-mach.When1("FileDownloaded", nil)
// wait until DownloadingFile becomes inactive
<-mach.WhenNot1("DownloadingFile", nil)
// wait for EventConnected to be activated with an arg ID=123
<-mach.WhenArgs("EventConnected", am.A{"ID": 123}, nil)
// wait for Foo to have a tick >= 6 and Bar tick >= 10
<-mach.WhenTime(am.S{"Foo", "Bar"}, am.T{6, 10}, nil)
// wait for DownloadingFile to have a tick increased by 2 since now
<-mach.WhenTick("DownloadingFile", 2, nil)
// wait for an error
<-mach.WhenErr(nil)
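The waiting methods above all return channels. As a rough illustration of the idea behind tick-based waiting, here is a stdlib-only sketch in which each state has a counter and a waiting channel is closed once the counter has grown by a requested amount. `Clock`, `Tick`, `Time`, and this `WhenTick` are hypothetical and written for the example; the library's own implementation and tick semantics may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// waiter is released once the state's tick reaches at least min.
type waiter struct {
	min uint64
	ch  chan struct{}
}

// Clock is a toy per-state logical clock. Hypothetical, for illustration only.
type Clock struct {
	mu      sync.Mutex
	ticks   map[string]uint64
	waiters map[string][]waiter
}

func NewClock() *Clock {
	return &Clock{
		ticks:   map[string]uint64{},
		waiters: map[string][]waiter{},
	}
}

// Tick increments the state's counter and releases satisfied waiters.
func (c *Clock) Tick(state string) uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.ticks[state]++
	now := c.ticks[state]
	var kept []waiter
	for _, w := range c.waiters[state] {
		if now >= w.min {
			close(w.ch)
		} else {
			kept = append(kept, w)
		}
	}
	c.waiters[state] = kept
	return now
}

// Time returns the current tick of the state.
func (c *Clock) Time(state string) uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.ticks[state]
}

// WhenTick returns a channel that is closed after the state's tick has
// increased by delta, counted from the moment of the call.
func (c *Clock) WhenTick(state string, delta uint64) <-chan struct{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	ch := make(chan struct{})
	if delta == 0 {
		close(ch)
		return ch
	}
	c.waiters[state] = append(c.waiters[state], waiter{min: c.ticks[state] + delta, ch: ch})
	return ch
}

func main() {
	c := NewClock()
	done := c.WhenTick("DownloadingFile", 2)

	c.Tick("DownloadingFile")
	select {
	case <-done:
		fmt.Println("released after one tick")
	default:
		fmt.Println("still waiting after one tick")
	}

	c.Tick("DownloadingFile")
	<-done
	fmt.Println("released at tick", c.Time("DownloadingFile"))
}
```

Closing the channel, rather than sending on it, lets any number of goroutines wait on the same condition and combine it with other cases in a `select`.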
am-gen will quickly bootstrap a typesafe states file for you.
$ am-gen states-file Foo,Bar
See the result for Foo and Bar:
package states

import am "github.com/pancsta/asyncmachine-go/pkg/machine"

// S is a type alias for a list of state names.
type S = am.S

// States map defines relations and properties of states.
var States = am.Struct{
	Foo: {},
	Bar: {},
}

// Groups of mutually exclusive states.

//var (
//	GroupPlaying = S{Playing, Paused}
//)

//#region boilerplate defs

// Names of all the states (pkg enum).
const (
	Foo = "Foo"
	Bar = "Bar"
)

// Names is an ordered list of all the state names.
var Names = S{
	Foo,
	Bar,
	am.Exception,
}

//#endregion
am-dbg is a lightweight, multi-client debugger for AM. It easily handles >100
client machines simultaneously streaming telemetry data (and potentially many more).
pkg/telemetry/prometheus binds to a machine's transactions, averages the
values within an interval window, and exposes various metrics. Combined with Grafana, it can be
used to monitor the metrics of your machines.
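Averaging values within an interval window can be sketched as follows. This is only an illustration of the general idea, assuming samples are accumulated and then reduced to a mean once per interval; `Window`, `Observe`, and `Flush` are hypothetical names and do not reflect the actual code or metric names in pkg/telemetry/prometheus.

```go
package main

import "fmt"

// Window accumulates samples and reports their mean when flushed.
// Hypothetical type for illustration only.
type Window struct {
	sum   float64
	count int
}

// Observe records one sample, for example the number of steps in a transition.
func (w *Window) Observe(v float64) {
	w.sum += v
	w.count++
}

// Flush returns the mean of the samples seen since the last flush and resets
// the window. An empty window yields 0.
func (w *Window) Flush() float64 {
	if w.count == 0 {
		return 0
	}
	avg := w.sum / float64(w.count)
	w.sum, w.count = 0, 0
	return avg
}

func main() {
	var w Window
	for _, v := range []float64{2, 4, 6} {
		w.Observe(v)
	}
	// In a real exporter a ticker would call Flush once per interval
	// and publish the result as a gauge.
	fmt.Println("average:", w.Flush())
	fmt.Println("after reset:", w.Flush())
}
```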
Several case studies are available to show how to implement various types of machines, measure performance and produce
a lot of inspectable data.
libp2p-pubsub benchmark
pubsub host - eg ps-17 (20 states)
PubSub machine is a simple event loop with Multi states which get responses via arg channels. Heavy use of Machine.Eval().
discovery - eg ps-17-disc (10 states)
Discovery machine is a simple event loop with Multi states and a periodic refresh state.
discovery bootstrap - eg ps-17-disc-bf3 (5 states)
BootstrapFlow is a non-linear flow for topic bootstrapping with some retry logic.
simulator - sim (14 states)
Root simulator machine, initializes the network and manipulates it during heartbeats according to frequency
definitions. Heavily dependent on state negotiation.
simulator's peer - eg sim-p17 (17 states)
Handles peer's connections, topics and messages. This machine has a decent amount of relations. Each sim peer has its
own pubsub host.
topics - eg sim-t-se7ev (5 states)
State-only machine (no handlers, no goroutine). States represent correlations with peer machines.
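The PubSub entry above mentions an event loop whose states receive responses via arg channels. A minimal stdlib-only sketch of that pattern follows, assuming a request is queued together with named arguments and the caller reads the answer from a channel passed among them. `Args`, `request`, `loop`, and the state names `PeerJoined` and `ListPeers` are hypothetical and not taken from the benchmark's code.

```go
package main

import "fmt"

// Args carries arbitrary named arguments along with a request.
type Args map[string]any

// request is one queued activation of a state, with its arguments.
type request struct {
	state string
	args  Args
}

// loop is a toy event loop. It handles requests one at a time, so the peers
// slice needs no locking, and it replies through a channel found in the args.
func loop(queue <-chan request) {
	peers := []string{}
	for req := range queue {
		switch req.state {
		case "PeerJoined":
			peers = append(peers, req.args["id"].(string))
		case "ListPeers":
			resp := req.args["resp"].(chan []string)
			// Send a copy so the caller cannot mutate the loop's state.
			resp <- append([]string(nil), peers...)
		}
	}
}

func main() {
	queue := make(chan request)
	go loop(queue)

	queue <- request{"PeerJoined", Args{"id": "p1"}}
	queue <- request{"PeerJoined", Args{"id": "p2"}}

	// The buffered channel keeps the loop from blocking on the reply.
	resp := make(chan []string, 1)
	queue <- request{"ListPeers", Args{"resp": resp}}
	fmt.Println(<-resp)

	close(queue)
}
```

Because the same request type can be queued any number of times, each call gets its own reply channel, which is one way to read "Multi" states in this context.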
am-dbg is a cview TUI app with a single machine consisting of:
input events (7 states)
external state (11 states)
actions (14 states)
This machine features a decent amount of relations within a large number of states and 4 state groups. It's also a good
example of how easily an AM-based program can be controlled with a script in tools/cmd/am-dbg-demo.