rpc

package
v0.16.2
Published: Dec 5, 2025 License: MIT Imports: 26 Imported by: 1

README

/pkg/rpc

Note: asyncmachine-go is a batteries-included graph control flow library (AOP, actor model, state-machine).

aRPC is a transparent RPC for state machines implemented using asyncmachine-go. It's clock-based and features many optimizations, e.g. having most of the API methods executed locally (as state changes are regularly pushed to the client). It's built on top of cenkalti/rpc2, net/rpc, and soheilhy/cmux. Check out a dedicated example, gRPC benchmark, and integration tests tutorial.

Features

  • mutation methods
  • wait methods
  • clock pushes (from source mutations)
  • remote contexts
  • multiplexing
  • reconnect / fail-safety
  • worker sending payloads to the client
  • REPL
  • queue ticks support
  • initial optimizations

Not implemented (yet):

  • WhenArgs, Err()
  • PushAllTicks
  • chunked payloads
  • TLS
  • compression
  • msgpack encoding

Each RPC server can handle 1 RPC client at a time, but 1 state source (asyncmachine) can have many RPC servers attached to itself (via Tracer API). Additionally, remote RPC workers can also have RPC servers attached to themselves, creating a tree structure (see /examples/benchmark_state_source).

Components

Worker

Any state machine can be exposed as an RPC worker, as long as it implements /pkg/rpc/states/WorkerStructDef. This can be done manually, with state helpers (SchemaMerge, SAdd), or by generating a states file with am-gen. The states also have to be verified with Machine.VerifyStates. A worker can send data to the client via the SendPayload state.

import (
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)

// ...

// inherit from RPC worker
ssStruct := am.SchemaMerge(ssrpc.WorkerStruct, am.Schema{
    "Foo": {Require: am.S{"Bar"}},
    "Bar": {},
})
ssNames := am.SAdd(ssrpc.WorkerStates.Names(), am.S{"Foo", "Bar"})

// init
worker := am.New(ctx, ssStruct, nil)
// verify the state names and their order
if err := worker.VerifyStates(ssNames); err != nil {
    panic(err)
}

// ...

// send data to the client
worker.Add1(ssrpc.WorkerStates.SendPayload, arpc.Pass(&arpc.A{
    Name: "mypayload",
    Payload: &arpc.ArgsPayload{
        Name: "mypayload",
        Source: "worker1",
        Data: []byte{1,2,3},
    },
}))

Worker Schema

State schema from /pkg/rpc/states/ss_rpc_worker.go.

type WorkerStatesDef struct {
    ErrProviding string
    ErrSendPayload string
    SendPayload string
}

Server

Each RPC server can handle 1 client at a time. Both client and server need the same worker states definition (structure map and ordered list of states). After the initial handshake, the server pushes local state changes every PushInterval, while state changes made by an RPC client are delivered synchronously. The server starts listening on either Addr, Listener, or Conn. Basic ACL is possible via AllowId.

import (
    "time"

    amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)

// ...

// var addr string
// var worker *am.Machine

// init
s, err := arpc.NewServer(ctx, addr, worker.Id(), worker, nil)
if err != nil {
    panic(err)
}

// start
s.Start()
err = amhelp.WaitForAll(ctx, 2*time.Second,
    s.Mach.When1(ssrpc.ServerStates.RpcReady, ctx))
if ctx.Err() != nil {
    return
}
if err != nil {
    return err
}

// react to the client
<-worker.When1("Foo", nil)
print("Client added Foo")
worker.Add1("Bar", nil)

Server Schema

State schema from /pkg/rpc/states/ss_rpc_server.go.

Client

Each RPC client can connect to 1 server and needs to know the worker's machine schema and state order. Data sent by a worker via SendPayload will be received by a Consumer machine (passed via ClientOpts.Consumer) as an Add mutation of the WorkerPayload state (see a detailed diagram). The client supports fail-safety for both the connection (e.g. ConnRetries, ConnRetryBackoff) and calls (e.g. CallRetries, CallRetryBackoff).

After the client's Ready state becomes active, it exposes a remote worker at client.Worker. Remote worker implements most of Machine's methods, many of which are evaluated locally (like Is, When, NewStateCtx). See machine.Api for a full list.

import (
    "time"

    amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)

// ...

var addr string
// worker state structure
var ssStruct am.Schema
// worker state names
var ssNames am.S

// consumer
consumer := am.New(ctx, ssrpc.ConsumerStruct, nil)

// init
c, err := arpc.NewClient(ctx, addr, "clientid", ssStruct, ssNames, &arpc.ClientOpts{
    Consumer: consumer,
})
if err != nil {
    panic(err)
}

// start
c.Start()
// err is already declared above, so assign instead of redeclaring
err = amhelp.WaitForAll(ctx, 2*time.Second,
    c.Mach.When1(ssrpc.ClientStates.Ready, ctx))
if ctx.Err() != nil {
    return
}
if err != nil {
    return err
}

// use the remote worker
c.Worker.Add1("Foo", nil)
<-c.Worker.When1("Bar", nil)
print("Server added Bar")

Client Schema

State schema from /pkg/rpc/states/ss_rpc_client.go.

Multiplexer

Because 1 server can serve only 1 client (for simplicity), it's often required to use a port multiplexer. It's very simple to create one using NewMux and a callback function, which returns a new server instance.

import (
    "fmt"
    "net"
    "time"

    amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)

// ...

// new server per each new client (optional)
// w is the worker machine exposed by every server
var newServer arpc.MuxNewServerFn = func(num int, _ net.Conn) (*arpc.Server, error) {
    name := fmt.Sprintf("%s-%d", t.Name(), num)
    s, err := arpc.NewServer(ctx, "", name, w, nil)
    if err != nil {
        // the mux keeps the last error in Mux.NewServerErr
        return nil, err
    }

    return s, nil
}

// start cmux
mux, err := arpc.NewMux(ctx, t.Name(), newServer, nil)
if err != nil {
    t.Fatal(err)
}
mux.Listener = listener // or mux.Addr = ":1234"
mux.Start()
// err is already declared above, so assign instead of redeclaring
err = amhelp.WaitForAll(ctx, 2*time.Second,
    mux.Mach.When1(ssrpc.MuxStates.Ready, ctx))
if ctx.Err() != nil {
    return
}
if err != nil {
    return err
}

Multiplexer Schema

State schema from /pkg/rpc/states/ss_mux.go.

Documentation

Benchmark: aRPC vs gRPC

A simple and opinionated benchmark showing a subscribe-get-process scenario, implemented in both gRPC and aRPC. See /examples/benchmark_grpc for details and source code.

> task benchmark-grpc
...
BenchmarkClientArpc
    client_arpc_test.go:136: Transferred: 609 bytes
    client_arpc_test.go:137: Calls: 4
    client_arpc_test.go:138: Errors: 0
    client_arpc_test.go:136: Transferred: 1,149,424 bytes
    client_arpc_test.go:137: Calls: 10,003
    client_arpc_test.go:138: Errors: 0
BenchmarkClientArpc-8              10000            248913 ns/op           28405 B/op        766 allocs/op
BenchmarkClientGrpc
    client_grpc_test.go:117: Transferred: 1,113 bytes
    client_grpc_test.go:118: Calls: 9
    client_grpc_test.go:119: Errors: 0
    client_grpc_test.go:117: Transferred: 3,400,812 bytes
    client_grpc_test.go:118: Calls: 30,006
    client_grpc_test.go:119: Errors: 0
BenchmarkClientGrpc-8              10000            262693 ns/op           19593 B/op        391 allocs/op
BenchmarkClientLocal
BenchmarkClientLocal-8             10000               434.4 ns/op            16 B/op          1 allocs/op
PASS
ok      github.com/pancsta/asyncmachine-go/examples/benchmark_grpc      5.187s

API

aRPC implements /pkg/machine#Api, which is a large subset of /pkg/machine#Machine methods. Below is the full list, marking which methods run on the remote machine and which are evaluated locally.

// TODO update
// A (arguments) is a map of named arguments for a Mutation.
type A map[string]any
// S (state names) is a string list of state names.
type S []string
type Time []uint64
type Clock map[string]uint64
type Result int
type Schema = map[string]State

// Api is a subset of Machine for alternative implementations.
type Api interface {
    // ///// REMOTE

    // Mutations (remote)

    Add1(state string, args A) Result
    Add(states S, args A) Result
    Remove1(state string, args A) Result
    Remove(states S, args A) Result
    AddErr(err error, args A) Result
    AddErrState(state string, err error, args A) Result
    Toggle(states S, args A) Result
    Toggle1(state string, args A) Result
    Set(states S, args A) Result

    // Traced mutations (remote)

    EvAdd1(event *Event, state string, args A) Result
    EvAdd(event *Event, states S, args A) Result
    EvRemove1(event *Event, state string, args A) Result
    EvRemove(event *Event, states S, args A) Result
    EvAddErr(event *Event, err error, args A) Result
    EvAddErrState(event *Event, state string, err error, args A) Result
    EvToggle(event *Event, states S, args A) Result
    EvToggle1(event *Event, state string, args A) Result

    // Waiting (remote)

    WhenArgs(state string, args A, ctx context.Context) <-chan struct{}

    // Getters (remote)

    Err() error

    // ///// LOCAL

    // Checking (local)

    IsErr() bool
    Is(states S) bool
    Is1(state string) bool
    Any(states ...S) bool
    Any1(state ...string) bool
    Not(states S) bool
    Not1(state string) bool
    IsTime(time Time, states S) bool
    WasTime(time Time, states S) bool
    IsClock(clock Clock) bool
    WasClock(clock Clock) bool
    Has(states S) bool
    Has1(state string) bool
    CanAdd(states S, args A) Result
    CanAdd1(state string, args A) Result
    CanRemove(states S, args A) Result
    CanRemove1(state string, args A) Result
    CountActive(states S) int

    // Waiting (local)

    When(states S, ctx context.Context) <-chan struct{}
    When1(state string, ctx context.Context) <-chan struct{}
    WhenNot(states S, ctx context.Context) <-chan struct{}
    WhenNot1(state string, ctx context.Context) <-chan struct{}
    WhenTime(states S, times Time, ctx context.Context) <-chan struct{}
    WhenTime1(state string, tick uint64, ctx context.Context) <-chan struct{}
    WhenTicks(state string, ticks int, ctx context.Context) <-chan struct{}
    WhenQuery(query func(clock Clock) bool, ctx context.Context) <-chan struct{}
    WhenErr(ctx context.Context) <-chan struct{}
    WhenQueue(tick Result) <-chan struct{}

    // Getters (local)

    StateNames() S
    StateNamesMatch(re *regexp.Regexp) S
    ActiveStates() S
    Tick(state string) uint64
    Clock(states S) Clock
    Time(states S) Time
    TimeSum(states S) uint64
    QueueTick() uint64
    NewStateCtx(state string) context.Context
    Export() *Serialized
    Schema() Schema
    Switch(groups ...S) string
    Groups() (map[string][]int, []string)
    Index(states S) []int
    Index1(state string) int

    // Misc (local)

    Id() string
    ParentId() string
    Tags() []string
    Ctx() context.Context
    String() string
    StringAll() string
    Log(msg string, args ...any)
    SemLogger() SemLogger
    Inspect(states S) string
    BindHandlers(handlers any) error
    DetachHandlers(handlers any) error
    HasHandlers() bool
    StatesVerified() bool
    Tracers() []Tracer
    DetachTracer(tracer Tracer) error
    BindTracer(tracer Tracer) error
    AddBreakpoint1(added string, removed string, strict bool)
    AddBreakpoint(added S, removed S, strict bool)
    Dispose()
    WhenDisposed() <-chan struct{}
    IsDisposed() bool
}
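
The split between remote and local methods can be illustrated with a toy model. This is only a sketch under assumptions: `remoteWorker`, `update`, and the `call` transport stub are hypothetical names, not part of the package, and the fake server simply activates every requested inactive state. It relies on the asyncmachine convention that a state is active when its clock tick is odd.

```go
package main

import "fmt"

// update is a partial clock update: [stateIndex, tickDiff].
type update [2]int

// remoteWorker is a hypothetical, stripped-down stand-in for a remote worker.
type remoteWorker struct {
	names []string                                   // ordered state names, shared with the server
	clock []uint64                                   // local copy of the machine time
	call  func(method string, states []int) []update // transport stub
	calls int                                        // number of network calls made
}

// index returns the position of a state name, or -1 when unknown.
func (w *remoteWorker) index(state string) int {
	for i, n := range w.names {
		if n == state {
			return i
		}
	}
	return -1
}

// Is1 is answered from the local clock: an odd tick means the state is active.
func (w *remoteWorker) Is1(state string) bool {
	i := w.index(state)
	return i >= 0 && w.clock[i]%2 == 1
}

// Add1 goes over the transport and applies the returned clock diff locally.
func (w *remoteWorker) Add1(state string) {
	i := w.index(state)
	if i < 0 {
		return
	}
	w.calls++
	for _, u := range w.call("Add", []int{i}) {
		w.clock[u[0]] += uint64(u[1])
	}
}

// newDemoWorker wires the worker to a fake in-memory server.
func newDemoWorker() *remoteWorker {
	server := []uint64{0, 0}
	w := &remoteWorker{names: []string{"Foo", "Bar"}, clock: []uint64{0, 0}}
	w.call = func(method string, states []int) []update {
		var out []update
		for _, i := range states {
			if method == "Add" && server[i]%2 == 0 {
				server[i]++
				out = append(out, update{i, 1})
			}
		}
		return out
	}
	return w
}

func main() {
	w := newDemoWorker()
	fmt.Println(w.Is1("Foo"), w.calls) // false 0
	w.Add1("Foo")
	fmt.Println(w.Is1("Foo"), w.calls) // true 1
}
```

Only the mutation costs a round trip here; every check afterwards reads the locally stored clock.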

Tests

aRPC passes the whole test suite of /pkg/machine for the exposed methods and provides a couple of optimization-focused tests (on top of tests for basic RPC).

Optimizations

aRPC implements several optimization strategies to achieve the results.

  • net/rpc method names as runes
  • binary format of encoding/gob
  • index-based clock
    • [0, 100, 0, 120]
  • diff-based clock updates
    • [0, 1, 0, 1]
  • debounced server-mutation clock pushes
    • [0, 5, 2, 1]
  • partial clock updates
    • [[1, 1], [3, 1]]
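
One way to read the bracketed examples above: the clock is a slice indexed by state position, an update carries only tick differences, and unchanged indexes are dropped. The sketch below follows that reading; `diffClock` and `partial` are illustrative names, not functions of this package, and both slices are assumed to have the same length.

```go
package main

import "fmt"

// diffClock returns per-index tick differences between two machine times.
// Both slices are assumed to have the same length.
func diffClock(before, after []uint64) []int {
	diff := make([]int, len(after))
	for i := range after {
		diff[i] = int(after[i] - before[i])
	}
	return diff
}

// partial keeps only the changed indexes as [index, diff] pairs.
func partial(diff []int) [][2]int {
	var out [][2]int
	for i, d := range diff {
		if d != 0 {
			out = append(out, [2]int{i, d})
		}
	}
	return out
}

func main() {
	before := []uint64{0, 100, 0, 120}
	after := []uint64{0, 101, 0, 121}

	d := diffClock(before, after)
	fmt.Println(d)          // [0 1 0 1]
	fmt.Println(partial(d)) // [[1 1] [3 1]]
}
```

A debounced push would then carry a single diff covering several mutations, which is how larger values such as 5 or 2 can appear in one update.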

Status

Testing, not semantically versioned.

Documentation

Overview

Package rpc is a transparent RPC for state machines.

Index

Constants

const (
	// EnvAmRpcLogServer enables machine logging for RPC server.
	EnvAmRpcLogServer = "AM_RPC_LOG_SERVER"
	// EnvAmRpcLogClient enables machine logging for RPC client.
	EnvAmRpcLogClient = "AM_RPC_LOG_CLIENT"
	// EnvAmRpcLogMux enables machine logging for RPC multiplexers.
	EnvAmRpcLogMux = "AM_RPC_LOG_MUX"
	// EnvAmRpcDbg enables env-based debugging for RPC components.
	EnvAmRpcDbg = "AM_RPC_DBG"
	// EnvAmReplAddr is a REPL address to listen on. "1" expands to 127.0.0.1:0.
	EnvAmReplAddr = "AM_REPL_ADDR"
	// EnvAmReplDir is a dir path to save the address file as
	// $AM_REPL_DIR/mach-id.addr. Optional.
	EnvAmReplDir = "AM_REPL_DIR"
)
const APrefix = "am_rpc"

Variables

var (
	ServerAdd       = ServerMethod{"Add"}
	ServerAddNS     = ServerMethod{"AddNS"}
	ServerRemove    = ServerMethod{"Remove"}
	ServerSet       = ServerMethod{"Set"}
	ServerHello     = ServerMethod{"Hello"}
	ServerHandshake = ServerMethod{"Handshake"}
	ServerLog       = ServerMethod{"Log"}
	ServerSync      = ServerMethod{"Sync"}
	ServerBye       = ServerMethod{"Close"}

	ServerMethods = enum.New(ServerAdd, ServerAddNS, ServerRemove, ServerSet,
		ServerHello, ServerHandshake, ServerLog, ServerSync, ServerBye)

	ClientSetClock     = ClientMethod{"ClientSetClock"}
	ClientPushAllTicks = ClientMethod{"ClientPushAllTicks"}
	ClientSendPayload  = ClientMethod{"ClientSendPayload"}
	ClientBye          = ClientMethod{"ClientBye"}
	ClientSchemaChange = ClientMethod{"SchemaChange"}

	ClientMethods = enum.New(ClientSetClock, ClientPushAllTicks,
		ClientSendPayload, ClientBye, ClientSchemaChange)
)
var (
	ErrInvalidParams = errors.New("invalid params")
	ErrInvalidResp   = errors.New("invalid response")
	ErrRpc           = errors.New("rpc")
	ErrNoAccess      = errors.New("no access")
	ErrNoConn        = errors.New("not connected")
	ErrDestination   = errors.New("wrong destination")

	ErrNetwork        = errors.New("network error")
	ErrNetworkTimeout = errors.New("network timeout")
)

Functions

func AddErr added in v0.8.0

func AddErr(e *am.Event, mach *am.Machine, msg string, err error)

AddErr detects sentinels from error msgs and calls the proper error setter. TODO also return error for compat

func AddErrNetwork added in v0.8.0

func AddErrNetwork(e *am.Event, mach *am.Machine, err error)

func AddErrNoConn added in v0.8.0

func AddErrNoConn(e *am.Event, mach *am.Machine, err error)

func AddErrParams added in v0.8.0

func AddErrParams(e *am.Event, mach *am.Machine, err error)

func AddErrResp added in v0.8.0

func AddErrResp(e *am.Event, mach *am.Machine, err error)

func AddErrRpcStr added in v0.8.0

func AddErrRpcStr(e *am.Event, mach *am.Machine, msg string)

func BindServer added in v0.8.0

func BindServer(source, target *am.Machine, rpcReady, clientConn string) error

BindServer binds RpcReady and ClientConnected with Add/Remove, to custom states.

func BindServerMulti added in v0.8.0

func BindServerMulti(
	source, target *am.Machine, rpcReady, clientConn, clientDisconn string,
) error

BindServerMulti binds RpcReady, ClientConnected, and ClientDisconnected. RpcReady is Add/Remove, other two are Add-only to passed multi states.

func BindServerRpcReady added in v0.8.0

func BindServerRpcReady(source, target *am.Machine, rpcReady string) error

BindServerRpcReady binds RpcReady using Add to a custom multi state.

func Checksum added in v0.15.1

func Checksum(mTime uint64, qTick uint64) uint8
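
Checksum has no description of its own, but the ClockMsg.Checksum field is documented as the last digit of (TimeSum + QueueTick). A sketch consistent with that comment is shown below; `checksum` is a local function and the real implementation may differ.

```go
package main

import "fmt"

// checksum returns the last decimal digit of the machine time sum plus the
// queue tick, following the ClockMsg.Checksum field comment.
func checksum(timeSum, queueTick uint64) uint8 {
	return uint8((timeSum + queueTick) % 10)
}

func main() {
	fmt.Println(checksum(221, 7)) // 8
}
```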

func ClockFromMsg

func ClockFromMsg(
	timeBefore am.Time, qTickBefore uint64, msg *ClockMsg,
) (am.Time, uint64)

func GetClientId added in v0.8.0

func GetClientId(name string) string

GetClientId returns an RPC Client machine ID from a name. This ID will be used to handshake with the server.

func LogArgs added in v0.8.0

func LogArgs(args am.A) map[string]string

LogArgs is an args logger for A.

func MachRepl added in v0.10.2

func MachRepl(
	mach am.Api, addr, addrDir string, addrCh chan<- string, errCh chan<- error,
)

MachRepl sets up a machine for a REPL connection, which allows for mutations, like any other RPC connection. See [/tools/cmd/arpc] for usage. This function is considered a debugging helper and can panic.

  • addr: address to listen on, defaults to 127.0.0.1:0
  • addrDir: optional dir path to save the address file as addrDir/mach-id.addr
  • addrCh: optional channel to send the address to, once ready
  • errCh: optional channel to send err to, once ready

func MachReplEnv added in v0.10.2

func MachReplEnv(mach am.Api) <-chan error

MachReplEnv sets up a machine for a REPL connection in case AM_REPL_ADDR env var is set. See MachRepl.

func Pass added in v0.8.0

func Pass(args *A) am.A

Pass prepares am.A from A to pass to further mutations.

func PassRpc added in v0.9.0

func PassRpc(args *A) am.A

PassRpc prepares am.A from A to pass over RPC.

func TrafficMeter

func TrafficMeter(
	listener net.Listener, fwdTo string, counter chan<- int64,
	end <-chan struct{},
)

Types

type A added in v0.8.0

type A struct {
	Id        string `log:"id"`
	Name      string `log:"name"`
	MachTime  am.Time
	QueueTick uint64
	Payload   *ArgsPayload
	Addr      string `log:"addr"`
	Err       error
	Method    string `log:"addr"`
	StartedAt time.Time
	Dispose   bool

	Client *rpc2.Client
}

A represents typed arguments of the RPC package. It's a typesafe alternative to am.A.

func ParseArgs added in v0.8.0

func ParseArgs(args am.A) *A

ParseArgs extracts A from am.Event.Args[APrefix].
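
The Pass / ParseArgs pair follows a common pattern: a typed struct travels inside a generic argument map under one package-specific key. Below is a stdlib-only sketch of that pattern; `args`, `pass`, and `parse` are local stand-ins, and returning an empty struct for missing arguments is a choice made for this example, not a documented behavior.

```go
package main

import "fmt"

// prefix has the same value as the documented APrefix constant.
const prefix = "am_rpc"

// args is a hypothetical typed-arguments struct.
type args struct {
	Name string
	Addr string
}

// pass wraps typed args into a generic map under the package key.
func pass(a *args) map[string]any {
	return map[string]any{prefix: a}
}

// parse extracts typed args from a generic map. It returns an empty struct
// when the key is missing, so callers can read fields without nil checks.
func parse(m map[string]any) *args {
	if a, ok := m[prefix].(*args); ok && a != nil {
		return a
	}
	return &args{}
}

func main() {
	m := pass(&args{Name: "mypayload"})
	fmt.Println(parse(m).Name)         // mypayload
	fmt.Println(parse(nil).Name == "") // true
}
```

Keeping everything under one key lets several packages share the same argument map without field name collisions.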

type ARpc added in v0.9.0

type ARpc struct {
	Id        string `log:"id"`
	Name      string `log:"name"`
	MachTime  am.Time
	Payload   *ArgsPayload
	Addr      string `log:"addr"`
	Err       error
	Method    string `log:"addr"`
	StartedAt time.Time
	Dispose   bool
}

ARpc is a subset of A, that can be passed over RPC.

type ArgsGet

type ArgsGet struct {
	Name string
}

type ArgsHello added in v0.10.2

type ArgsHello struct {
	ReqSchema bool
}

type ArgsLog

type ArgsLog struct {
	Msg  string
	Args []any
}

type ArgsMut

type ArgsMut struct {
	States []int
	Args   am.A
	Event  *am.Event
}

ArgsMut is args for mutation methods.

type ArgsPayload

type ArgsPayload struct {
	Name string
	// Source is the machine ID that sent the payload.
	Source string
	// SourceTx is transition ID.
	SourceTx string
	// Destination is an optional machine ID that is supposed to receive the
	// payload. Useful when using rpc.Mux.
	Destination string
	// Data is the payload data. The Consumer has to know the type.
	Data any

	// Token is a unique random ID for the payload. Autofilled by the server.
	Token string
}

type Client

type Client struct {
	*ExceptionHandler

	Mach *am.Machine
	Name string

	// Addr is the address the Client will connect to.
	Addr string
	// Request the state schema from the server.
	RequestSchema bool
	// Worker is a remote am.Machine instance
	Worker *Worker
	// Consumer is the optional consumer for deliveries.
	Consumer   *am.Machine
	CallCount  uint64
	LogEnabled bool
	// DisconnCooldown is the time to wait after notifying the server about
	// disconnecting before actually disconnecting. Default 10ms.
	DisconnCooldown time.Duration
	// LastMsgAt is the last received msg from the worker TODO
	LastMsgAt time.Time
	// HelloDelay between Connected and Handshaking. Default 0, useful for
	// rpc/Mux.
	HelloDelay time.Duration
	// ReconnectOn decides if the client will try to [RetryingConn] after a
	// clean [Disconnect].
	ReconnectOn bool

	// ConnTimeout is the maximum time to wait for a connection to be established.
	// Default 3s.
	ConnTimeout time.Duration
	// ConnRetries is the number of retries for a connection. Default 15.
	ConnRetries int
	// ConnRetryTimeout is the maximum time to retry a connection. Default 1m.
	ConnRetryTimeout time.Duration
	// ConnRetryDelay is the time to wait between retries. Default 100ms. If
	// ConnRetryBackoff is set, this is the initial delay, and doubles on each
	// retry.
	ConnRetryDelay time.Duration
	// ConnRetryBackoff is the maximum time to wait between retries. Default 3s.
	ConnRetryBackoff time.Duration

	// CallTimeout is the maximum time to wait for a call to complete. Default 3s.
	CallTimeout time.Duration
	// CallRetries is the number of retries for a call. Default 15.
	CallRetries int
	// CallRetryTimeout is the maximum time to retry a call. Default 1m.
	CallRetryTimeout time.Duration
	// CallRetryDelay is the time to wait between retries. Default 100ms. If
	// CallRetryBackoff is set, this is the initial delay, and doubles on each
	// retry.
	CallRetryDelay time.Duration
	// CallRetryBackoff is the maximum time to wait between retries. Default 3s.
	CallRetryBackoff time.Duration

	DisconnTimeout time.Duration
	// contains filtered or unexported fields
}

Client is a type representing an RPC client that interacts with a remote am.Machine instance.

func NewClient

func NewClient(
	ctx context.Context, workerAddr string, name string, stateStruct am.Schema,
	stateNames am.S, opts *ClientOpts,
) (*Client, error)

NewClient creates a new RPC client and exposes a remote state machine as a remote worker, with a subset of the API under Client.Worker. Optionally takes a consumer, which is a state machine with a WorkerPayload state. See states.ConsumerStates.

func (*Client) CallRetryFailedState added in v0.8.0

func (c *Client) CallRetryFailedState(e *am.Event)

func (*Client) ConnectedState

func (c *Client) ConnectedState(e *am.Event)

func (*Client) ConnectingState

func (c *Client) ConnectingState(e *am.Event)

func (*Client) DisconnectedEnter

func (c *Client) DisconnectedEnter(e *am.Event) bool

func (*Client) DisconnectedState

func (c *Client) DisconnectedState(e *am.Event)

func (*Client) DisconnectingEnter

func (c *Client) DisconnectingEnter(e *am.Event) bool

func (*Client) DisconnectingState

func (c *Client) DisconnectingState(e *am.Event)

func (*Client) ExceptionState added in v0.8.0

func (c *Client) ExceptionState(e *am.Event)

ExceptionState handles network errors and retries the connection.

func (*Client) GetKind

func (c *Client) GetKind() Kind

GetKind returns a kind of the RPC component (server / client).

func (*Client) HandshakeDoneEnter added in v0.8.0

func (c *Client) HandshakeDoneEnter(e *am.Event) bool

func (*Client) HandshakeDoneState

func (c *Client) HandshakeDoneState(e *am.Event)

func (*Client) HandshakingState

func (c *Client) HandshakingState(e *am.Event)

func (*Client) HealthcheckState added in v0.9.0

func (c *Client) HealthcheckState(e *am.Event)

func (*Client) RemoteBye added in v0.10.1

func (c *Client) RemoteBye(
	_ *rpc2.Client, _ *Empty, _ *Empty,
) error

RemoteBye is called by the server on a planned disconnect. TODO take a reason / source event?

func (*Client) RemotePushAllTicks added in v0.8.0

func (c *Client) RemotePushAllTicks(
	_ *rpc2.Client, clocks []PushAllTicks, _ *Empty,
) error

RemotePushAllTicks logs all the machine clock's ticks, so all final handlers can be executed in order. Only called by the server. TODO

func (*Client) RemoteSchemaChange added in v0.12.0

func (c *Client) RemoteSchemaChange(
	_ *rpc2.Client, msg *RespHandshake, _ *Empty,
) error

RemoteSchemaChange is called by the server on a source machine schema change.

func (*Client) RemoteSendPayload

func (c *Client) RemoteSendPayload(
	_ *rpc2.Client, payload *ArgsPayload, _ *Empty,
) error

RemoteSendPayload receives a payload from the server and triggers WorkerPayload. The Consumer should bind its handlers and handle this state to receive the data.

func (*Client) RemoteSendingPayload added in v0.8.0

func (c *Client) RemoteSendingPayload(
	_ *rpc2.Client, payload *ArgsPayload, _ *Empty,
) error

RemoteSendingPayload triggers the WorkerDelivering state, which is an optional indication that the server has started a data transmission to the Client. This payload shouldn't contain the data itself, only the name and token.

func (*Client) RemoteSetClock

func (c *Client) RemoteSetClock(
	_ *rpc2.Client, clock *ClockMsg, _ *Empty,
) error

RemoteSetClock updates the client's clock. Only called by the server.

func (*Client) RetryingCallEnter added in v0.8.0

func (c *Client) RetryingCallEnter(e *am.Event) bool

func (*Client) RetryingConnState added in v0.8.0

func (c *Client) RetryingConnState(e *am.Event)

RetryingConnState should be set without Connecting in the same tx

func (*Client) Start

func (c *Client) Start() am.Result

Start connects the client to the server and initializes the worker. Results in the Ready state.

func (*Client) StartEnd

func (c *Client) StartEnd(e *am.Event)

func (*Client) StartState added in v0.8.0

func (c *Client) StartState(e *am.Event)

func (*Client) Stop

func (c *Client) Stop(waitTillExit context.Context, dispose bool) am.Result

Stop disconnects the client from the server and disposes the worker.

waitTillExit: if passed, waits for the client to disconnect using the context.

func (*Client) WorkerPayloadEnter added in v0.8.0

func (c *Client) WorkerPayloadEnter(e *am.Event) bool

func (*Client) WorkerPayloadState added in v0.8.0

func (c *Client) WorkerPayloadState(e *am.Event)

type ClientMethod added in v0.15.1

type ClientMethod enum.Member[string]

type ClientOpts added in v0.8.0

type ClientOpts struct {
	// PayloadState is a state for the server to listen on, to deliver payloads
	// to the client. The client adds this state to request a payload from the
	// worker. Default: am/rpc/states/WorkerStates.SendPayload.
	Consumer *am.Machine
	// Parent is a parent state machine for a new Client state machine. See
	// [am.Opts].
	Parent am.Api
}

type ClockMsg

type ClockMsg struct {
	// Updates contains incremental diffs of [stateIdx, diff].
	Updates [][2]int
	// QueueTick is an incremental diff for the queue tick.
	QueueTick int
	// Checksum is the last digit of (TimeSum + QueueTick)
	Checksum uint8
}

func NewClockMsg

func NewClockMsg(
	tSum uint64, tBefore, tAfter am.Time, qBefore, qAfter uint64,
) *ClockMsg

NewClockMsg creates a new diff update based on the last and current machine time, and the last and current queue tick.

type Empty

type Empty struct{}

type ExceptionHandler

type ExceptionHandler struct {
	*am.ExceptionHandler
}

ExceptionHandler is a shared exception handler for RPC server and client.

func (*ExceptionHandler) ExceptionEnter

func (h *ExceptionHandler) ExceptionEnter(e *am.Event) bool

type Kind

type Kind string
const (
	KindClient Kind = "client"
	KindServer Kind = "server"
)

type Mux added in v0.8.0

type Mux struct {
	*am.ExceptionHandler
	Mach *am.Machine
	// Source is the state source to expose via RPC. Required if NewServerFn
	// isn't provided.
	Source am.Api
	// NewServerFn creates a new instance of Server and is called for every new
	// connection.
	NewServerFn MuxNewServerFn

	Name string
	Addr string
	// The listener used by this Mux, can be set manually before Start().
	Listener   net.Listener
	LogEnabled bool
	// The last error returned by NewServerFn.
	NewServerErr error
	// contains filtered or unexported fields
}

Mux creates a new RPC server for each incoming connection.

func NewMux added in v0.8.0

func NewMux(
	ctx context.Context, name string, newServerFn MuxNewServerFn, opts *MuxOpts,
) (*Mux, error)

NewMux initializes a Mux instance to handle RPC server creation for incoming connections with the given parameters.

newServerFn: when nil, [Mux.Source] needs to be set manually before calling Mux.Start.

func (*Mux) ClientConnectedState added in v0.8.0

func (m *Mux) ClientConnectedState(e *am.Event)

func (*Mux) ExceptionState added in v0.8.0

func (m *Mux) ExceptionState(e *am.Event)

func (*Mux) HasClientsEnd added in v0.8.0

func (m *Mux) HasClientsEnd(e *am.Event) bool

func (*Mux) HealthcheckState added in v0.8.0

func (m *Mux) HealthcheckState(e *am.Event)

func (*Mux) NewServerErrEnter added in v0.8.0

func (m *Mux) NewServerErrEnter(e *am.Event) bool

func (*Mux) NewServerErrState added in v0.8.0

func (m *Mux) NewServerErrState(e *am.Event)

func (*Mux) Start added in v0.8.0

func (m *Mux) Start() am.Result

func (*Mux) StartEnd added in v0.8.0

func (m *Mux) StartEnd(e *am.Event)

func (*Mux) StartEnter added in v0.10.2

func (m *Mux) StartEnter(e *am.Event) bool

func (*Mux) StartState added in v0.8.0

func (m *Mux) StartState(e *am.Event)

func (*Mux) Stop added in v0.8.0

func (m *Mux) Stop(dispose bool) am.Result

type MuxNewServerFn added in v0.10.2

type MuxNewServerFn func(num int, conn net.Conn) (*Server, error)

MuxNewServerFn is a function to create a new RPC server for each incoming connection.

type MuxOpts added in v0.8.0

type MuxOpts struct {
	// Parent is a parent state machine for a new Mux state machine. See
	// [am.Opts].
	Parent am.Api
}

type PushAllTicks added in v0.8.0

type PushAllTicks struct {
	MutationType []am.MutationType
	CalledStates [][]int
	ClockMsg     []*ClockMsg
}

PushAllTicks (TODO: implement) should list all the mutations, with clocks and queue ticks, which will then be processed as a queue. The client reconstructs the tx on its side. TODO: mind partially accepted auto states (fake called states).

type RespGet

type RespGet struct {
	Value any
}

type RespHandshake

type RespHandshake struct {
	Schema     am.Schema
	Serialized *am.Serialized
}

type RespResult

type RespResult struct {
	Clock  *ClockMsg
	Result am.Result
}

type RespSync

type RespSync struct {
	Time      am.Time
	QueueTick uint64
}

type SendPayloadHandlers added in v0.8.0

type SendPayloadHandlers struct {
	SendPayloadState am.HandlerFinal
}

type Server

type Server struct {
	*ExceptionHandler
	Mach *am.Machine

	// Source is a state Source, either a local or remote RPC worker.
	Source am.Api
	// Addr is the address of the server on the network.
	Addr string
	// DeliveryTimeout is a timeout for SendPayload to the client.
	DeliveryTimeout time.Duration
	// PushInterval is the interval for clock updates, effectively throttling
	// the number of updates sent to the client within the interval window.
	// 0 means pushes are disabled. Setting to a very small value will make
	// pushes instant.
	PushInterval time.Duration
	// PushAllTicks will push all ticks to the client, enabling client-side
	// final handlers. TODO more info, implement via a queue
	PushAllTicks bool
	// Listener can be set manually before starting the server.
	Listener atomic.Pointer[net.Listener]
	// Conn can be set manually before starting the server.
	Conn net.Conn
	// NoNewListener will prevent the server from creating a new listener if
	// one is not provided, or has been closed. Useful for cmux.
	NoNewListener bool
	LogEnabled    bool
	CallCount     uint64

	// AllowId will limit clients to a specific ID, if set.
	AllowId string
	// contains filtered or unexported fields
}

Server is an RPC server that can be bound to a worker machine and provide remote access to its states and methods.

func NewServer

func NewServer(
	ctx context.Context, addr string, name string, sourceMach am.Api,
	opts *ServerOpts,
) (*Server, error)

NewServer creates a new RPC server, bound to a worker machine. The source machine has to implement am/rpc/states/WorkerStatesDef interface.

func (*Server) ClientId added in v0.9.0

func (s *Server) ClientId() string

func (*Server) GetKind

func (s *Server) GetKind() Kind

GetKind returns a kind of RPC component (server / client).

func (*Server) HandshakeDoneEnd

func (s *Server) HandshakeDoneEnd(e *am.Event)

func (*Server) RemoteAdd

func (s *Server) RemoteAdd(
	_ *rpc2.Client, args *ArgsMut, resp *RespResult,
) error

func (*Server) RemoteAddNS

func (s *Server) RemoteAddNS(
	_ *rpc2.Client, args *ArgsMut, _ *Empty,
) error

func (*Server) RemoteBye

func (s *Server) RemoteBye(
	_ *rpc2.Client, _ *Empty, _ *Empty,
) error

RemoteBye means the client says goodbye and will disconnect shortly.

func (*Server) RemoteHandshake

func (s *Server) RemoteHandshake(
	client *rpc2.Client, id *string, _ *Empty,
) error

func (*Server) RemoteHello added in v0.8.0

func (s *Server) RemoteHello(
	client *rpc2.Client, req *ArgsHello, resp *RespHandshake,
) error

func (*Server) RemoteRemove

func (s *Server) RemoteRemove(
	_ *rpc2.Client, args *ArgsMut, resp *RespResult,
) error

func (*Server) RemoteSet

func (s *Server) RemoteSet(
	_ *rpc2.Client, args *ArgsMut, resp *RespResult,
) error

func (*Server) RemoteSetPushAllTicks added in v0.8.0

func (s *Server) RemoteSetPushAllTicks(
	_ *rpc2.Client, val bool, _ *Empty,
) error

func (*Server) RemoteSync

func (s *Server) RemoteSync(
	_ *rpc2.Client, sum uint64, resp *RespSync,
) error

func (*Server) RpcReadyEnter added in v0.8.0

func (s *Server) RpcReadyEnter(e *am.Event) bool

func (*Server) RpcReadyState

func (s *Server) RpcReadyState(e *am.Event)

RpcReadyState starts a ticker to compensate for clock push debounces.

func (*Server) RpcStartingEnter added in v0.8.0

func (s *Server) RpcStartingEnter(e *am.Event) bool

func (*Server) RpcStartingState

func (s *Server) RpcStartingState(e *am.Event)

func (*Server) SendPayload

func (s *Server) SendPayload(
	ctx context.Context, event *am.Event, payload *ArgsPayload,
) error

SendPayload sends a payload to the client. It's usually called by a handler for SendPayload.

func (*Server) Start

func (s *Server) Start() am.Result

Start starts the server, optionally creating a Listener (if Addr provided). Results in either RpcReady or Exception.

func (*Server) StartEnd added in v0.8.0

func (s *Server) StartEnd(e *am.Event)

func (*Server) Stop

func (s *Server) Stop(dispose bool) am.Result

Stop stops the server, and optionally disposes resources.

type ServerMethod added in v0.15.1

type ServerMethod enum.Member[string]

type ServerOpts added in v0.8.0

type ServerOpts struct {
	// PayloadState is a state for the server to listen on, to deliver payloads
	// to the client. The client activates this state to request a payload from
	// the worker. Default: am/rpc/states/WorkerStates.SendPayload.
	PayloadState string
	// Parent is a parent state machine for a new Server state machine. See
	// [am.Opts].
	Parent am.Api
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is a subset of `pkg/machine#Machine` for RPC. Lacks the queue and other local methods. Most methods are clock-based, thus executed locally. TODO rename to NetworkMachine (netMach)

func NewWorker added in v0.10.1

func NewWorker(
	ctx context.Context, id string, c *Client, schema am.Schema, stateNames am.S,
	parent *am.Machine, tags []string,
) (*Worker, error)

NewWorker creates a new instance of a Worker. TODO link godoc to pkg/machine

func (*Worker) ActiveStates

func (w *Worker) ActiveStates(states am.S) am.S

ActiveStates returns a copy of the currently active states.

func (*Worker) Add

func (w *Worker) Add(states am.S, args am.A) am.Result

Add activates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.

func (*Worker) Add1

func (w *Worker) Add1(state string, args am.A) am.Result

Add1 is a shorthand method to add a single state with the passed args.

func (*Worker) Add1NS

func (w *Worker) Add1NS(state string, args am.A) am.Result

Add1NS is a single state version of AddNS.

func (*Worker) AddBreakpoint added in v0.12.0

func (w *Worker) AddBreakpoint(added am.S, removed am.S, strict bool)

func (*Worker) AddBreakpoint1 added in v0.15.1

func (w *Worker) AddBreakpoint1(added string, removed string, strict bool)

func (*Worker) AddErr

func (w *Worker) AddErr(err error, args am.A) am.Result

AddErr is a dedicated method to add the Exception state with the passed error and optional arguments. Like every mutation method, it will resolve relations and trigger handlers. AddErr produces a stack trace of the error, if LogStackTrace is enabled.

func (*Worker) AddErrState

func (w *Worker) AddErrState(state string, err error, args am.A) am.Result

AddErrState adds a dedicated error state, along with the built-in Exception state. Like every mutation method, it will resolve relations and trigger handlers. AddErrState produces a stack trace of the error, if LogStackTrace is enabled.

func (*Worker) AddNS

func (w *Worker) AddNS(states am.S, args am.A) am.Result

AddNS is a NoSync method: an efficient way of adding states, as it neither waits for nor transfers a response. As a result, it doesn't update the clock. Use Sync() to update the clock after a batch of AddNS calls.
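The effect of skipping the response can be illustrated with a toy model. This is only a sketch with hypothetical stand-in types (`remote`, `proxy`), not the package's implementation, and it assumes that an odd tick marks an active state.

```go
package main

import "fmt"

// remote is a toy stand-in for the remote state machine (hypothetical type).
type remote struct {
	ticks map[string]uint64
}

// proxy is a toy stand-in for an RPC worker with a locally cached clock.
type proxy struct {
	src   *remote
	cache map[string]uint64
}

// addNS activates a state remotely without reading back a response,
// so the cached clock is left untouched.
// Assumption: an odd tick means "active", an even tick means "inactive".
func (p *proxy) addNS(state string) {
	if p.src.ticks[state]%2 == 0 {
		p.src.ticks[state]++
	}
}

// sync copies the remote clock into the local cache.
func (p *proxy) sync() {
	for name, tick := range p.src.ticks {
		p.cache[name] = tick
	}
}

func main() {
	p := &proxy{
		src:   &remote{ticks: map[string]uint64{"Foo": 0, "Bar": 0}},
		cache: map[string]uint64{"Foo": 0, "Bar": 0},
	}
	p.addNS("Foo")
	p.addNS("Bar")
	fmt.Println("before sync:", p.cache["Foo"], p.cache["Bar"])
	p.sync()
	fmt.Println("after sync:", p.cache["Foo"], p.cache["Bar"])
}
```

In this model the cached ticks stay stale until `sync` runs, which is why a single Sync() call after the whole batch is enough.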

func (*Worker) Any

func (w *Worker) Any(states ...am.S) bool

Any is a group call to Is. It returns true if any of the params returns true from Is.

machine.StringAll() // ()[Foo:0 Bar:0 Baz:0]
machine.Add(S{"Foo"})
// is(Foo, Bar) or is(Bar)
machine.Any(S{"Foo", "Bar"}, S{"Bar"}) // false
// is(Foo) or is(Bar)
machine.Any(S{"Foo"}, S{"Bar"}) // true

func (*Worker) Any1

func (w *Worker) Any1(states ...string) bool

Any1 is a group call to Is1(). It returns true if any of the params returns true from Is1().

func (*Worker) BindHandlers added in v0.8.0

func (w *Worker) BindHandlers(handlers any) error

BindHandlers binds a struct of handler methods to machine's states, based on the naming convention, eg `FooState(e *Event)`. Negotiation handlers can optionally return bool.

RPC worker will bind handlers locally, not to the remote machine. RPC worker handlers are still TODO.

func (*Worker) BindTracer added in v0.8.0

func (w *Worker) BindTracer(tracer am.Tracer) error

BindTracer binds a Tracer to the machine.

func (*Worker) CanAdd added in v0.15.0

func (w *Worker) CanAdd(states am.S, args am.A) am.Result

func (*Worker) CanAdd1 added in v0.15.0

func (w *Worker) CanAdd1(state string, args am.A) am.Result

func (*Worker) CanRemove added in v0.15.0

func (w *Worker) CanRemove(states am.S, args am.A) am.Result

func (*Worker) CanRemove1 added in v0.15.0

func (w *Worker) CanRemove1(state string, args am.A) am.Result

func (*Worker) Clock

func (w *Worker) Clock(states am.S) am.Clock

Clock returns the machine's current clock, a state-keyed map of ticks. If states are passed, only the ticks of the passed states are returned.
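As a rough illustration of the "state-keyed map of ticks" shape, the sketch below builds such a map from a list of names and a parallel list of ticks. `clockFor` is a hypothetical helper using plain Go types, not part of the package.

```go
package main

import "fmt"

// clockFor builds a state-keyed map of ticks from a list of state names and
// a parallel list of ticks. A nil filter selects every state.
func clockFor(names []string, ticks []uint64, filter []string) map[string]uint64 {
	want := map[string]bool{}
	for _, name := range filter {
		want[name] = true
	}
	out := map[string]uint64{}
	for i, name := range names {
		if filter == nil || want[name] {
			out[name] = ticks[i]
		}
	}
	return out
}

func main() {
	names := []string{"Foo", "Bar", "Baz"}
	ticks := []uint64{1, 3, 2}
	fmt.Println(clockFor(names, ticks, nil))
	fmt.Println(clockFor(names, ticks, []string{"Bar"}))
}
```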

func (*Worker) CountActive added in v0.15.0

func (w *Worker) CountActive(states am.S) int

CountActive returns the number of active states from a passed list. Useful for state groups.

func (*Worker) Ctx

func (w *Worker) Ctx() context.Context

Ctx returns the worker's root context.

func (*Worker) DetachHandlers added in v0.9.0

func (w *Worker) DetachHandlers(handlers any) error

DetachHandlers detaches previously bound machine handlers.

func (*Worker) DetachTracer added in v0.8.0

func (w *Worker) DetachTracer(tracer am.Tracer) error

DetachTracer tries to remove a tracer from the machine. Returns an error if the tracer could not be removed.

func (*Worker) Dispose

func (w *Worker) Dispose()

Dispose disposes the machine and all its emitters. You can wait for the completion of the disposal with `<-mach.WhenDisposed()`.

func (*Worker) Err

func (w *Worker) Err() error

Err returns the last error.

func (*Worker) EvAdd added in v0.9.0

func (w *Worker) EvAdd(event *am.Event, states am.S, args am.A) am.Result

func (*Worker) EvAdd1 added in v0.9.0

func (w *Worker) EvAdd1(event *am.Event, state string, args am.A) am.Result

EvAdd1 is a shorthand method to add a single state with the passed args.

func (*Worker) EvAddErr added in v0.9.0

func (w *Worker) EvAddErr(event *am.Event, err error, args am.A) am.Result

func (*Worker) EvAddErrState added in v0.9.0

func (w *Worker) EvAddErrState(
	event *am.Event, state string, err error, args am.A,
) am.Result

func (*Worker) EvRemove added in v0.9.0

func (w *Worker) EvRemove(event *am.Event, states am.S, args am.A) am.Result

func (*Worker) EvRemove1 added in v0.9.0

func (w *Worker) EvRemove1(event *am.Event, state string, args am.A) am.Result

func (*Worker) EvToggle added in v0.15.1

func (w *Worker) EvToggle(e *am.Event, states am.S, args am.A) am.Result

EvToggle is a traced version of [Machine.Toggle].

func (*Worker) EvToggle1 added in v0.15.1

func (w *Worker) EvToggle1(e *am.Event, state string, args am.A) am.Result

EvToggle1 is a traced version of [Machine.Toggle1].

func (*Worker) Export

func (w *Worker) Export() *am.Serialized

Export exports the machine state: id, time and state names.

func (*Worker) Groups added in v0.15.0

func (w *Worker) Groups() (map[string][]int, []string)

func (*Worker) Has

func (w *Worker) Has(states am.S) bool

Has returns true if the passed states are registered in the machine.

func (*Worker) Has1

func (w *Worker) Has1(state string) bool

Has1 is a shorthand for Has. It returns true if the passed state is registered in the machine.

func (*Worker) HasHandlers added in v0.12.0

func (w *Worker) HasHandlers() bool

HasHandlers returns true if this machine has bound handlers, and thus an allocated goroutine. It also makes it nondeterministic.

func (*Worker) Id added in v0.8.0

func (w *Worker) Id() string

Id returns the machine's id.

func (*Worker) Index

func (w *Worker) Index(states am.S) []int

func (*Worker) Index1 added in v0.12.0

func (w *Worker) Index1(state string) int

Index1 returns the index of a state in the machine's StateNames() list.

func (*Worker) Inspect

func (w *Worker) Inspect(states am.S) string

Inspect returns a multi-line string representation of the machine (states, relations, clock). states: param for ordered or partial results.

func (*Worker) InternalUpdateClock added in v0.15.1

func (w *Worker) InternalUpdateClock(now am.Time, qTick uint64, lock bool)

InternalUpdateClock is an internal method to update the clock of this Worker. It should NOT be called by anything other than a synchronization source (eg RPC client, pubsub, etc).

func (*Worker) Is

func (w *Worker) Is(states am.S) bool

Is checks if all the passed states are currently active.

machine.StringAll() // ()[Foo:0 Bar:0 Baz:0]
machine.Add(S{"Foo"})
machine.Is(S{"Foo"}) // true
machine.Is(S{"Foo", "Bar"}) // false

func (*Worker) Is1

func (w *Worker) Is1(state string) bool

Is1 is a shorthand method to check if a single state is currently active. See Is().

func (*Worker) IsClock

func (w *Worker) IsClock(clock am.Clock) bool

IsClock checks if the machine has changed since the passed clock. Returns true if at least one state has changed.

func (*Worker) IsDisposed added in v0.8.0

func (w *Worker) IsDisposed() bool

IsDisposed returns true if the machine has been disposed.

func (*Worker) IsErr

func (w *Worker) IsErr() bool

IsErr checks if the machine has the Exception state currently active.

func (*Worker) IsTime

func (w *Worker) IsTime(t am.Time, states am.S) bool

IsTime checks if the machine has changed since the passed time (list of ticks). Returns true if at least one state has changed. The states param is optional and can be used to check only a subset of states.

func (*Worker) Log

func (w *Worker) Log(msg string, args ...any)

Log is a remote logger.

func (*Worker) MachineTick added in v0.16.0

func (w *Worker) MachineTick() uint32

func (*Worker) MustParseStates

func (w *Worker) MustParseStates(states am.S) am.S

MustParseStates parses the states and returns them as a list. Panics when a state is not defined. It's an unsafe equivalent of VerifyStates.

func (*Worker) NewStateCtx

func (w *Worker) NewStateCtx(state string) context.Context

NewStateCtx returns a new sub-context, bound to the current clock's tick of the passed state.

Context cancels when the state has been de-activated, or right away, if it isn't currently active.

State contexts are used to check state expirations and should be checked often inside goroutines. TODO log reader

func (*Worker) Not

func (w *Worker) Not(states am.S) bool

Not checks if **none** of the passed states are currently active.

machine.StringAll()
// -> ()[A:0 B:0 C:0 D:0]
machine.Add(S{"A", "B"})

// not(A) and not(C)
machine.Not(S{"A", "C"})
// -> false

// not(C) and not(D)
machine.Not(S{"C", "D"})
// -> true

func (*Worker) Not1

func (w *Worker) Not1(state string) bool

Not1 is a shorthand method to check if a single state is currently inactive. See Not().

func (*Worker) OnDispose added in v0.16.0

func (w *Worker) OnDispose(fn am.HandlerDispose)

func (*Worker) ParentId added in v0.8.0

func (w *Worker) ParentId() string

ParentId returns the id of the parent machine (if any).

func (*Worker) ParseStates added in v0.16.0

func (w *Worker) ParseStates(states am.S) am.S

func (*Worker) QueueLen added in v0.16.0

func (w *Worker) QueueLen() uint16

func (*Worker) QueueTick added in v0.15.0

func (w *Worker) QueueTick() uint64

func (*Worker) RemoteId added in v0.10.2

func (w *Worker) RemoteId() string

RemoteId returns the ID of the remote state machine.

func (*Worker) Remove

func (w *Worker) Remove(states am.S, args am.A) am.Result

Remove deactivates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.

func (*Worker) Remove1

func (w *Worker) Remove1(state string, args am.A) am.Result

Remove1 is a shorthand method to remove a single state with the passed args. See Remove().

func (*Worker) Schema added in v0.11.0

func (w *Worker) Schema() am.Schema

Schema returns a copy of machine's state structure.

func (*Worker) SemLogger added in v0.15.0

func (w *Worker) SemLogger() am.SemLogger

func (*Worker) Set

func (w *Worker) Set(states am.S, args am.A) am.Result

Set activates a list of states in the machine and deactivates all the others, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.

func (*Worker) StateNames

func (w *Worker) StateNames() am.S

StateNames returns a copy of all the state names.

func (*Worker) StateNamesMatch added in v0.12.0

func (w *Worker) StateNamesMatch(re *regexp.Regexp) am.S

func (*Worker) StatesVerified added in v0.8.0

func (w *Worker) StatesVerified() bool

StatesVerified returns true if the state names have been ordered using VerifyStates.

func (*Worker) String

func (w *Worker) String() string

String returns a one line representation of the currently active states, with their clock values. Inactive states are omitted. Eg: (Foo:1 Bar:3)

func (*Worker) StringAll

func (w *Worker) StringAll() string

StringAll returns a one line representation of all the states, with their clock values. Inactive states are in square brackets. Eg: (Foo:1 Bar:3)[Baz:2]
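The output format can be reproduced with a few lines of plain Go. This is a sketch only: `stringAll` is a hypothetical helper, and it assumes that an odd tick marks an active state, which agrees with the examples shown on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// stringAll renders all states in one line: active ones in parentheses,
// inactive ones in square brackets.
// Assumption: an odd tick marks an active state.
func stringAll(names []string, ticks []uint64) string {
	var active, inactive []string
	for i, name := range names {
		item := fmt.Sprintf("%s:%d", name, ticks[i])
		if ticks[i]%2 == 1 {
			active = append(active, item)
		} else {
			inactive = append(inactive, item)
		}
	}
	return "(" + strings.Join(active, " ") + ")[" +
		strings.Join(inactive, " ") + "]"
}

func main() {
	names := []string{"Foo", "Bar", "Baz"}
	fmt.Println(stringAll(names, []uint64{1, 3, 2})) // (Foo:1 Bar:3)[Baz:2]
	fmt.Println(stringAll(names, []uint64{0, 0, 0})) // ()[Foo:0 Bar:0 Baz:0]
}
```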

func (*Worker) Switch

func (w *Worker) Switch(groups ...am.S) string

Switch returns the first state from the passed list that is currently active, making it useful for switch statements.

switch mach.Switch(ss.GroupPlaying) {
case "Playing":
case "Paused":
case "Stopped":
}

func (*Worker) Sync

func (w *Worker) Sync() am.Time

Sync requests fresh clock values from the remote machine. Useful to call after a batch of no-sync methods, eg AddNS.

func (*Worker) Tags added in v0.9.0

func (w *Worker) Tags() []string

Tags returns machine's tags, a list of unstructured strings without spaces.

func (*Worker) Tick

func (w *Worker) Tick(state string) uint64

Tick returns the current tick for a given state.

func (*Worker) Time

func (w *Worker) Time(states am.S) am.Time

Time returns machine's time, a list of ticks per state. Returned value includes the specified states, or all the states if nil.

func (*Worker) Toggle added in v0.15.1

func (w *Worker) Toggle(states am.S, args am.A) am.Result

Toggle deactivates a list of states in case all are active, or activates them otherwise. Returns the result of the transition (Executed, Queued, Canceled).
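The decision rule described above can be sketched as a pure function. `toggleOp` is a hypothetical helper over a plain map of active states, shown only to make the "all active, otherwise" condition concrete.

```go
package main

import "fmt"

// toggleOp picks the mutation a toggle would perform: "remove" when every
// passed state is active, "add" otherwise.
func toggleOp(active map[string]bool, states []string) string {
	for _, name := range states {
		if !active[name] {
			return "add"
		}
	}
	return "remove"
}

func main() {
	active := map[string]bool{"Foo": true, "Bar": true}
	fmt.Println(toggleOp(active, []string{"Foo", "Bar"})) // remove
	fmt.Println(toggleOp(active, []string{"Foo", "Baz"})) // add
}
```

Note that a partially active list results in an add, not a per-state flip.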

func (*Worker) Toggle1 added in v0.15.1

func (w *Worker) Toggle1(state string, args am.A) am.Result

Toggle1 activates or deactivates a single state, depending on its current state. Returns the result of the transition (Executed, Queued, Canceled).

func (*Worker) Tracers added in v0.8.0

func (w *Worker) Tracers() []am.Tracer

Tracers returns a copy of the currently attached tracers.

func (*Worker) WasClock added in v0.12.0

func (w *Worker) WasClock(clock am.Clock) bool

WasClock checks if the passed time has happened (or is happening right now). Returns false if at least one state is too early.

func (*Worker) WasTime added in v0.12.0

func (w *Worker) WasTime(t am.Time, states am.S) bool

WasTime checks if the passed time has happened (or is happening right now). Returns false if at least one state is too early. The states param is optional and can be used to check only a subset of states.
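One way to read "too early" is a per-state comparison of ticks. The sketch below is an interpretation under that assumption, with a hypothetical `wasTime` helper over plain slices. Treating a length mismatch as false is also an assumption.

```go
package main

import "fmt"

// wasTime reports whether every tick in now has reached the matching tick
// in t. It returns false as soon as one state is too early.
func wasTime(now, t []uint64) bool {
	if len(now) != len(t) {
		return false // assumption: mismatched lists never match
	}
	for i := range t {
		if now[i] < t[i] {
			return false
		}
	}
	return true
}

func main() {
	now := []uint64{3, 2}
	fmt.Println(wasTime(now, []uint64{1, 2})) // true
	fmt.Println(wasTime(now, []uint64{5, 2})) // false
}
```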

func (*Worker) When

func (w *Worker) When(states am.S, ctx context.Context) <-chan struct{}

When returns a channel that will be closed when all the passed states become active or the machine gets disposed.

ctx: optional context that will close the channel early.

func (*Worker) When1

func (w *Worker) When1(state string, ctx context.Context) <-chan struct{}

When1 is an alias to When() for a single state. See When.

func (*Worker) WhenArgs

func (w *Worker) WhenArgs(
	state string, args am.A, ctx context.Context,
) <-chan struct{}

WhenArgs returns a channel that will be closed when the passed state becomes active with all the passed args. Args are compared using the native '=='. It's meant to be used with async Multi states, to filter out a specific completion.

ctx: optional context that will close the channel when done.

func (*Worker) WhenDisposed

func (w *Worker) WhenDisposed() <-chan struct{}

WhenDisposed returns a channel that will be closed when the machine is disposed. Requires bound handlers. Use Machine.Disposed in case no handlers have been bound.

func (*Worker) WhenErr

func (w *Worker) WhenErr(disposeCtx context.Context) <-chan struct{}

WhenErr returns a channel that will be closed when the machine is in the StateException state.

ctx: optional context that will close the channel early.

func (*Worker) WhenNot

func (w *Worker) WhenNot(states am.S, ctx context.Context) <-chan struct{}

WhenNot returns a channel that will be closed when all the passed states become inactive or the machine gets disposed.

ctx: optional context that will close the channel early.

func (*Worker) WhenNot1

func (w *Worker) WhenNot1(state string, ctx context.Context) <-chan struct{}

WhenNot1 is an alias to WhenNot() for a single state. See WhenNot.

func (*Worker) WhenQuery added in v0.16.0

func (w *Worker) WhenQuery(
	clockCheck func(clock am.Clock) bool, ctx context.Context,
) <-chan struct{}

WhenQuery returns a channel that will be closed when the passed [clockCheck] function returns true. [clockCheck] should be a pure function and non-blocking.

ctx: optional context that will close the channel early.
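A check function that satisfies the "pure and non-blocking" requirement only reads the clock it receives. The sketch below uses a local `clock` type as a stand-in for a state-keyed map of ticks. The state names and the condition are made up for illustration.

```go
package main

import "fmt"

// clock is a local stand-in for a state-keyed map of ticks.
type clock map[string]uint64

// fooAheadOfBar is a pure, non-blocking check: it only reads the passed
// clock and touches no locks, channels, or external state.
func fooAheadOfBar(c clock) bool {
	return c["Foo"] > c["Bar"]
}

func main() {
	fmt.Println(fooAheadOfBar(clock{"Foo": 3, "Bar": 1})) // true
	fmt.Println(fooAheadOfBar(clock{"Foo": 1, "Bar": 4})) // false
}
```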

func (*Worker) WhenQueue added in v0.15.0

func (w *Worker) WhenQueue(tick am.Result) <-chan struct{}

func (*Worker) WhenTicks

func (w *Worker) WhenTicks(
	state string, ticks int, ctx context.Context,
) <-chan struct{}

WhenTicks waits N ticks of a single state (relative to now). Uses WhenTime underneath.

ctx: optional context that will close the channel early.
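The relation between a relative wait and an absolute tick can be sketched as simple arithmetic. Both helpers below are hypothetical, and the parity rule (an odd tick marks an active state) is an assumption of this sketch.

```go
package main

import "fmt"

// targetTick converts a relative wait of n ticks into an absolute tick.
// n is expected to be non-negative.
func targetTick(now uint64, n int) uint64 {
	return now + uint64(n)
}

// isActive assumes an odd tick marks an active state.
func isActive(tick uint64) bool {
	return tick%2 == 1
}

func main() {
	now := uint64(3) // the state is currently active
	for _, n := range []int{1, 2} {
		t := targetTick(now, n)
		fmt.Printf("wait %d -> tick %d, active=%v\n", n, t, isActive(t))
	}
}
```

Under that parity assumption, waiting 2 ticks on an active state means waiting for it to be de-activated and activated again.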

func (*Worker) WhenTime

func (w *Worker) WhenTime(
	states am.S, times am.Time, ctx context.Context,
) <-chan struct{}

WhenTime returns a channel that will be closed when all the passed states have passed the specified time. The time is a logical clock of the state. Machine time can be sourced from Machine.Time() or Machine.Clock().

ctx: optional context that will close the channel early.

func (*Worker) WhenTime1 added in v0.11.0

func (w *Worker) WhenTime1(
	state string, ticks uint64, ctx context.Context,
) <-chan struct{}

WhenTime1 waits till ticks for a single state equal the given value (or more).

ctx: optional context that will close the channel early.

type WorkerTracer

type WorkerTracer struct {
	*am.NoOpTracer
	// contains filtered or unexported fields
}

WorkerTracer is a tracer for local worker machines (event source).

func (*WorkerTracer) SchemaChange added in v0.12.0

func (t *WorkerTracer) SchemaChange(mach am.Api, oldSchema am.Schema)

func (*WorkerTracer) TransitionEnd

func (t *WorkerTracer) TransitionEnd(tx *am.Transition)
