example

package
v0.18.4
Published: Apr 1, 2026 License: MIT Imports: 14 Imported by: 0

README


Note: asyncmachine-go is a pathless control-flow graph with a consensus (AOP, actor model, state-machine).

WASM Workflow

An example workflow with 4 browser threads and 1 external Orchestrator, performing DAG-shaped hashing computations. To increase unpredictability, the workflow produces random errors: corrupted hashes and plain failures. The Orchestrator then either retries a specific node or restarts the whole workflow.

Machines:

  • orchestrator (server)
  • browser1 (browser UI thread)
  • browser2 (browser WebWorker, Dispatcher)
  • browser3 (browser WebWorker)
  • browser4 (browser WebWorker)
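
The retry behaviour described above can be sketched in plain Go, without asyncmachine-go. This is a minimal illustration under stated assumptions, not the example's actual code: the `node` type, `runNode`, `runWorkflow`, `demoDAG`, the failure rate, and the retry limit are all hypothetical, and only plain failures are simulated (corrupted hashes are left out).

```go
package main

import (
	"crypto/sha512"
	"errors"
	"fmt"
	"math/rand"
)

// node is a hypothetical DAG node: it hashes its own ID together with the
// outputs of the nodes it depends on.
type node struct {
	id   string
	deps []string
}

var errFailed = errors.New("simulated failure")

// runNode fails with probability failRate; otherwise it returns the SHA-512
// digest of the node ID followed by the digests of its dependencies.
func runNode(n node, results map[string][64]byte, rng *rand.Rand, failRate float64) ([64]byte, error) {
	if rng.Float64() < failRate {
		return [64]byte{}, errFailed
	}
	h := sha512.New()
	h.Write([]byte(n.id))
	for _, d := range n.deps {
		sum := results[d]
		h.Write(sum[:])
	}
	var out [64]byte
	copy(out[:], h.Sum(nil))
	return out, nil
}

// runWorkflow walks the nodes in topological order. A failed node is retried
// up to maxRetries times; after that the whole run is reported as failed, and
// the caller may decide to restart it. It returns the results, the number of
// failed attempts, and the final error (if any).
func runWorkflow(order []node, seed int64, failRate float64, maxRetries int) (map[string][64]byte, int, error) {
	rng := rand.New(rand.NewSource(seed))
	results := make(map[string][64]byte, len(order))
	failures := 0
	for _, n := range order {
		var err error
		for attempt := 0; attempt <= maxRetries; attempt++ {
			var sum [64]byte
			sum, err = runNode(n, results, rng, failRate)
			if err == nil {
				results[n.id] = sum
				break
			}
			failures++
		}
		if err != nil {
			return nil, failures, fmt.Errorf("node %s: %w", n.id, err)
		}
	}
	return results, failures, nil
}

// demoDAG returns a diamond-shaped graph: a feeds b and c, which feed d.
func demoDAG() []node {
	return []node{
		{id: "a"},
		{id: "b", deps: []string{"a"}},
		{id: "c", deps: []string{"a"}},
		{id: "d", deps: []string{"b", "c"}},
	}
}

func main() {
	results, failures, err := runWorkflow(demoDAG(), 1, 0.3, 5)
	if err != nil {
		fmt.Println("workflow failed, a restart would go here:", err)
		return
	}
	sum := results["d"]
	fmt.Printf("failed attempts: %d, d = %x...\n", failures, sum[:8])
}
```

Because each node's digest depends only on its ID and its inputs, a retried node produces the same result as a node that succeeded on the first attempt.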

Implementation

All aRPC connections are single-direction and start from the Orchestrator, which mutates the workers. The Dispatcher has prefixed states piped from the leaf workers. WebWorkers talk to each other over MessageChannel ports and transferred ArrayBuffers (copy-less). The data comes back to the Orchestrator via the SendPayload()-ServerPayloadState() flow. To keep the example simple, the data returned by each node always passes through the Orchestrator. In real-world workflows, the connections would be full-duplex and the data would flow without detours.

All the browser threads run the same Go binary.

   - .
   - ├── cmd
   - │   ├── browser
9.0k │   │   ├── browser.go
5.2k │   │   ├── browser_machs.go
2.2k │   │   └── browser_test.go
   - │   └── orchestrator
 12k │       └── orchestrator.go
 780 ├── config.env
1.0k ├── conn_nowasm.go
2.4k ├── conn_wasm.go
3.7k ├── example_workflow.go
4.1k ├── README.md
   - ├── states
9.5k │   ├── ss_workflow.go
 851 │   └── states_utils.go
1.5k ├── Taskfile.yml
   - ├── tmp
  15 │   ├── orchestrator.addr
  15 │   ├── repl-browser1.addr
  15 │   ├── repl-browser3.addr
  15 │   └── repl-browser4.addr
   - └── web
 914     ├── index.html
 32M     ├── main.wasm
 17k     ├── wasm_exec.js
 291     └── worker.js

Traces

OpenTelemetry traces are generated automatically for each state-machine, with unimportant ones filtered out. You can browse the results in Jaeger.

Debugger

Each node connects directly to the debugger and tunnels a TCP port for incoming REPL connections.

REPL

Interactively use the TUI debugger with data pre-generated by this example:

go run github.com/pancsta/asyncmachine-go/tools/cmd/am-dbg@latest \
  --import-data https://assets.asyncmachine.dev/am-dbg-exports/wasm-workflow.gob.br \
  mach://orchestrator

Configuration

# CONFIG

RELAY_ADDR=localhost:14050
REPL_DIR=tmp

ORCHESTRATOR_TCP_ADDR=localhost:14000
ORCHESTRATOR_REPL_ADDR=localhost:14001

BROWSER_1_TCP_ADDR=localhost:14010
BROWSER_1_REPL_ADDR=localhost:14011

BROWSER_2_TCP_ADDR=localhost:14020
BROWSER_2_REPL_ADDR=localhost:14021

BROWSER_3_TCP_ADDR=localhost:14030
BROWSER_3_REPL_ADDR=localhost:14031

BROWSER_4_TCP_ADDR=localhost:14040
BROWSER_4_REPL_ADDR=localhost:14041

HASH_ITERATIONS=10000
EXIT_ON_COMPLETED=1
SUCCESS_RATE=5
#SUCCESS_RATE=500

# DEBUG

AM_DEBUG=1
AM_DBG_ADDR=1
AM_LOG=3
AM_LOG_FULL=1
AM_HEALTHCHECK=1
AM_SERVICE=orchestrator
AM_OTEL_TRACE=1
AM_OTEL_TRACE_TXS=1
AM_OTEL_TRACE_SKIP_STATES_RE=(^rm-|relay-|rc-)|(RegisterDisposal$)
#
AM_RPC_LOG_SERVER=1
AM_RPC_LOG_CLIENT=1
AM_RPC_LOG_MUX=1
AM_RPC_DBG=1
#AM_RELAY_DBG=1
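
To see what the `AM_OTEL_TRACE_SKIP_STATES_RE` pattern from the configuration above would match, it can be tried out with Go's `regexp` package. This is only a sketch: the state names are made up for illustration, and applying the pattern with `MatchString` is an assumption about how the filter works.

```go
package main

import (
	"fmt"
	"regexp"
)

// skipStatesRE is copied verbatim from AM_OTEL_TRACE_SKIP_STATES_RE.
var skipStatesRE = regexp.MustCompile(`(^rm-|relay-|rc-)|(RegisterDisposal$)`)

// skipped reports whether a state name matches the pattern. Using
// MatchString is an assumption about how the filter is applied.
func skipped(state string) bool {
	return skipStatesRE.MatchString(state)
}

func main() {
	// Hypothetical state names, chosen only to exercise each alternative.
	names := []string{
		"rm-worker",            // matches ^rm-
		"x-relay-y",            // matches relay- anywhere in the name
		"arc-x",                // matches rc- anywhere in the name
		"FooRegisterDisposal",  // matches RegisterDisposal$
		"RegisterDisposalDone", // no match: the suffix is not at the end
		"worker-rm-",           // no match: rm- is anchored to the start
		"Ready",                // no match
	}
	for _, s := range names {
		fmt.Printf("%-22s skipped=%v\n", s, skipped(s))
	}
}
```

Note that `^` binds only to the first alternative, so `relay-` and `rc-` match anywhere in a name, while `rm-` must be a prefix.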

Machine Diagram

This diagram was generated with am-vis and contains minor errors (e.g. missing piping for browser2). Refer to the debugger as the source of truth until fixed.

am-vis --bird --render-detailed-pipes --render-pipes render-dump tmp/am-dbg-dump.gob.br

$ task --list-all
task: Available tasks for this project:
* build:
* build-browser:
* build-browser-prod:
* build-orchestrator:
* deps:                     List dependencies in deps.md
* deps-graph:               Render dependencies graph in deps-graph.svg
* pprof:
* repl:                     Start REPL
* start:                    Build and start server

monorepo

Go back to the monorepo root to continue reading.

import _ "github.com/stealthrocket/net/wasip1"


Documentation

Index

Constants

const APrefix = "wasm"

Variables

var (
	EnvReplDir              string
	EnvRelayHttpAddr        string
	EnvOrchestratorTcpAddr  string
	EnvOrchestratorReplAddr string
	EnvBrowser1TcpAddr      string
	EnvBrowser1ReplAddr     string
	EnvBrowser2TcpAddr      string
	EnvBrowser2ReplAddr     string
	EnvBrowser3ReplAddr     string
	EnvBrowser4ReplAddr     string
	EnvHashIterations       int
	EnvExitOnCompleted      bool
	EnvSuccessRate          int
)
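
Variables like these are typically filled from the environment at startup. The sketch below shows one way that could look for three of them. The lowercase variable names, the `loadEnv` helper, and the parsing rules are assumptions, not the package's actual loading code.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// Hypothetical mirrors of three of the package variables listed above.
var (
	envReplDir         string
	envHashIterations  int
	envExitOnCompleted bool
)

// loadEnv reads REPL_DIR, HASH_ITERATIONS and EXIT_ON_COMPLETED through the
// supplied lookup function, so tests can pass a stub instead of os.Getenv.
func loadEnv(getenv func(string) string) error {
	envReplDir = getenv("REPL_DIR")

	n, err := strconv.Atoi(getenv("HASH_ITERATIONS"))
	if err != nil {
		return fmt.Errorf("HASH_ITERATIONS: %w", err)
	}
	envHashIterations = n

	// An unset flag means "disabled"; otherwise accept 1/0, true/false, etc.
	envExitOnCompleted = false
	if v := getenv("EXIT_ON_COMPLETED"); v != "" {
		b, err := strconv.ParseBool(v)
		if err != nil {
			return fmt.Errorf("EXIT_ON_COMPLETED: %w", err)
		}
		envExitOnCompleted = b
	}
	return nil
}

func main() {
	if err := loadEnv(os.Getenv); err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println(envReplDir, envHashIterations, envExitOnCompleted)
}
```

Passing the lookup function as a parameter keeps the parsing logic testable without touching the process environment.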

Functions

func ComputeHash

func ComputeHash(payloads ...[]byte) [64]byte
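
ComputeHash has no doc comment, so its behaviour can only be guessed from the signature. The `[64]byte` return type is the size of a SHA-512 digest; the sketch below assumes SHA-512 over the payloads in order, which is an assumption and may differ from the real implementation. The name `computeHash` is lowercase to mark it as a stand-in.

```go
package main

import (
	"crypto/sha512"
	"fmt"
)

// computeHash is a hypothetical stand-in for ComputeHash: it feeds every
// payload, in order, into one SHA-512 hasher and returns the digest.
func computeHash(payloads ...[]byte) [64]byte {
	h := sha512.New()
	for _, p := range payloads {
		h.Write(p) // hash.Hash.Write never returns an error
	}
	var out [64]byte
	copy(out[:], h.Sum(nil))
	return out
}

func main() {
	// A DAG node could combine the digests of its parents this way.
	left := computeHash([]byte("left input"))
	right := computeHash([]byte("right input"))
	joined := computeHash(left[:], right[:])
	fmt.Printf("%x\n", joined[:8])
}
```

With this design, splitting the same bytes across several payloads does not change the digest, because the hasher only sees the concatenated stream.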

func LogArgs

func LogArgs(args am.A) map[string]string

LogArgs is an args logger for A.

func ParseRpc

func ParseRpc(args am.A) am.A

ParseRpc parses am.A to *ARpc wrapped in am.A. Useful for REPLs.

func Pass

func Pass(args *A) am.A

Pass prepares am.A from A to pass to further mutations.

func PassRpc

func PassRpc(args *A) am.A

PassRpc prepares am.A from A to pass over RPC.

func SimulateLoad

func SimulateLoad(iterations int)

SimulateLoad hashes a payload thousands of times to burn CPU cycles.
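
A CPU-burning loop of this kind can be sketched as a chain of hashes, where each round hashes the previous digest. The version below is an assumption-laden illustration: the real SimulateLoad returns nothing, while this hypothetical `simulateLoad` returns the final digest so the work is observable, and SHA-256 is chosen arbitrarily.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// simulateLoad is a hypothetical stand-in for SimulateLoad. It starts from
// an all-zero block and re-hashes the previous digest `iterations` times.
func simulateLoad(iterations int) [32]byte {
	var sum [32]byte
	for i := 0; i < iterations; i++ {
		sum = sha256.Sum256(sum[:])
	}
	return sum
}

func main() {
	// 10000 is borrowed from HASH_ITERATIONS in the sample configuration;
	// whether the real function receives that value is an assumption.
	sum := simulateLoad(10000)
	fmt.Printf("%x\n", sum[:8])
}
```

Chaining the digests makes every round depend on the previous one, so the iterations cannot be skipped or run in parallel.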

Types

type A

type A struct {
	Id      string `log:"id"`
	Boot    *Boot
	Payload []byte
	Failed  bool `log:"failed"`
	Retry   bool `log:"retry"`

	ReturnCh chan<- []string
}

A is a struct for node arguments. It's a typesafe alternative to am.A.

func ParseArgs

func ParseArgs(args am.A) *A

ParseArgs extracts A or ARpc from am.Event.Args[APrefix].

type ARpc

type ARpc struct {
	Id      string `log:"id"`
	Boot    *Boot
	Payload []byte
	Failed  bool `log:"failed"`
	Retry   bool `log:"retry"`
}

ARpc is a subset of A that can be passed over RPC.
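
The reason for a separate RPC-safe struct can be illustrated with any serializer: a channel such as ReturnCh has no wire representation. The sketch below copies the two struct shapes from above (without the `log` tags) and uses `encoding/json` purely for demonstration. The `toRpc` and `canMarshal` helpers are hypothetical, and the encoding actually used by aRPC is not stated here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types, minus the log tags.
type Boot struct {
	TraceId string
	SpanId  string
}

type A struct {
	Id       string
	Boot     *Boot
	Payload  []byte
	Failed   bool
	Retry    bool
	ReturnCh chan<- []string // local-only: cannot cross a process boundary
}

type ARpc struct {
	Id      string
	Boot    *Boot
	Payload []byte
	Failed  bool
	Retry   bool
}

// toRpc is a hypothetical helper that keeps only the serializable fields.
func toRpc(a *A) *ARpc {
	return &ARpc{Id: a.Id, Boot: a.Boot, Payload: a.Payload, Failed: a.Failed, Retry: a.Retry}
}

// canMarshal reports whether v survives JSON encoding.
func canMarshal(v any) bool {
	_, err := json.Marshal(v)
	return err == nil
}

func main() {
	a := &A{Id: "node-1", Payload: []byte("data"), ReturnCh: make(chan []string)}
	fmt.Println("A serializable:   ", canMarshal(a))
	fmt.Println("ARpc serializable:", canMarshal(toRpc(a)))
}
```

Keeping the channel out of the RPC struct makes the local-only part of the arguments explicit in the type system rather than relying on the serializer to skip or reject it.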

type Boot

type Boot struct {
	TraceId string
	SpanId  string
}

type PortConn

type PortConn struct {
	// contains filtered or unexported fields
}

PortConn adapts a JS MessagePort to net.Conn
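
The core of such an adapter is turning a message-oriented port into a byte stream: a Read may ask for fewer bytes than one message holds, so the remainder has to be buffered. The sketch below shows that idea with Go channels standing in for the JS MessagePort, so it runs outside the browser. The `chanConn` type and `newPipe` are hypothetical, the address and deadline methods of net.Conn are omitted, and messages are copied here rather than transferred.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// chanConn is a hypothetical stand-in for PortConn: each Write sends one
// message, and Read drains messages into the caller's buffer.
type chanConn struct {
	in        <-chan []byte // messages from the peer
	out       chan<- []byte // messages to the peer
	rest      []byte        // unread tail of the last received message
	done      chan struct{}
	closeOnce sync.Once
}

var _ io.ReadWriteCloser = (*chanConn)(nil)

// newPipe returns two connected ends, like the two ports of a MessageChannel.
func newPipe() (*chanConn, *chanConn) {
	ab := make(chan []byte, 16)
	ba := make(chan []byte, 16)
	a := &chanConn{in: ba, out: ab, done: make(chan struct{})}
	b := &chanConn{in: ab, out: ba, done: make(chan struct{})}
	return a, b
}

// Read blocks until a message arrives, then hands out as much of it as fits.
func (c *chanConn) Read(b []byte) (int, error) {
	if len(c.rest) == 0 {
		select {
		case msg := <-c.in:
			c.rest = msg
		case <-c.done:
			return 0, io.EOF
		}
	}
	n := copy(b, c.rest)
	c.rest = c.rest[n:]
	return n, nil
}

// Write sends a copy of b as a single message.
func (c *chanConn) Write(b []byte) (int, error) {
	select {
	case <-c.done:
		return 0, io.ErrClosedPipe
	default:
	}
	msg := append([]byte(nil), b...)
	select {
	case c.out <- msg:
		return len(b), nil
	case <-c.done:
		return 0, io.ErrClosedPipe
	}
}

// Close marks this end as closed; it is safe to call more than once.
func (c *chanConn) Close() error {
	c.closeOnce.Do(func() { close(c.done) })
	return nil
}

func main() {
	a, b := newPipe()
	a.Write([]byte("hello"))
	buf := make([]byte, 3)
	n, _ := b.Read(buf)
	fmt.Printf("%q\n", buf[:n])
	n, _ = b.Read(buf)
	fmt.Printf("%q\n", buf[:n])
}
```

Exposing a stream interface lets code written against ordinary connections run unchanged over a message port.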

func (*PortConn) Close

func (c *PortConn) Close() error

func (*PortConn) LocalAddr

func (c *PortConn) LocalAddr() net.Addr

Boilerplate stubs...

func (*PortConn) Read

func (c *PortConn) Read(b []byte) (int, error)

func (*PortConn) RemoteAddr

func (c *PortConn) RemoteAddr() net.Addr

func (*PortConn) SetDeadline

func (c *PortConn) SetDeadline(t time.Time) error

func (*PortConn) SetReadDeadline

func (c *PortConn) SetReadDeadline(t time.Time) error

func (*PortConn) SetWriteDeadline

func (c *PortConn) SetWriteDeadline(t time.Time) error

func (*PortConn) Write

func (c *PortConn) Write(b []byte) (int, error)

Directories

  • cmd/browser (command)
  • cmd/orchestrator (command)
