rpc

package
v0.17.2 Latest
This package is not in the latest version of its module.

Published: Feb 20, 2026 License: MIT Imports: 26 Imported by: 1

README

/pkg/rpc

Note: asyncmachine-go is a batteries-included graph control flow library (AOP, actor model, state-machine).

aRPC is a transparent RPC for state machines implemented using asyncmachine-go. It's clock-based and features many optimizations, e.g. having most of the API methods executed locally (as state changes are regularly pushed to the client). It's built on top of cenkalti/rpc2, net/rpc, and soheilhy/cmux. Check out a dedicated example, gRPC benchmark, and an integration tests tutorial.

Features

  • mutation methods
  • wait methods
  • clock pushes (from source mutations)
  • remote contexts
  • multiplexing
  • reconnect / fail-safety
  • network machine sending payloads to the client
  • REPL
  • queue ticks support
  • initial optimizations

Not implemented (yet):

  • WhenArgs, Err()
  • PushAllTicks
  • chunked payloads
  • TLS
  • compression
  • msgpack encoding

Each RPC server can handle 1 RPC client at a time, but 1 state source (asyncmachine) can have many RPC servers attached to itself (via Tracer API). Additionally, remote RPC network-machines can also have RPC servers attached to themselves, creating a tree structure (see /examples/benchmark_state_source).

Components

Network Machine

Any state machine can be exposed as an RPC network machine, as long as it implements the states of NetMachStatesDef from /pkg/rpc/states. This can be done manually, by using state helpers (SchemaMerge, SAdd), or by generating a schema file with am-gen. The states also have to be verified with Machine.VerifyStates. A network machine can send data to the client via the SendPayload state.

import (
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)

// ...

// inherit from the RPC network machine schema
// (the NetMach* identifiers are inferred from NetMachStatesDef, check
// /pkg/rpc/states for the exact exported names)
schema := am.SchemaMerge(ssrpc.NetMachStruct, am.Schema{
    "Foo": {Require: am.S{"Bar"}},
    "Bar": {},
})
names := am.SAdd(ssrpc.NetMachStates.Names(), am.S{"Foo", "Bar"})

// init
netMach := am.New(ctx, schema, nil)
netMach.VerifyStates(names)

// ...

// send data to the client
netMach.Add1(ssrpc.NetMachStates.SendPayload, arpc.Pass(&arpc.A{
    Name: "mypayload",
    Payload: &arpc.MsgSrvPayload{
        Name: "mypayload",
        Source: "netmach1",
        Data: []byte{1,2,3},
    },
}))
Network Machine Schema

State schema from /pkg/rpc/states/ss_rpc.go.

type NetMachStatesDef struct {
    ErrProviding string
    ErrSendPayload string
    SendPayload string
}
Server

Each RPC server can handle 1 client at a time. Both the client and the server need the same network machine states definition (structure map and ordered list of states). After the initial handshake, the server pushes local state changes every PushInterval, while state changes made by an RPC client are delivered synchronously. The server starts listening on either Addr, Listener, or Conn. Basic ACL is possible via AllowId.

import (
    "time"

    amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)

// ...

var addr string
var netMach *am.Machine

// init
s, err := arpc.NewServer(ctx, addr, netMach.Id(), netMach, nil)
if err != nil {
    panic(err)
}

// start
s.Start()
err = amhelp.WaitForAll(ctx, 2*time.Second,
    s.Mach.When1(ssrpc.ServerStates.RpcReady, ctx))
if ctx.Err() != nil {
    return
}
if err != nil {
    return err
}

// react to the client
<-netMach.When1("Foo", nil)
print("Client added Foo")
netMach.Add1("Bar", nil)
Server Schema

State schema from /pkg/rpc/states/ss_rpc.go.

Client

Each RPC client can connect to 1 server and needs to know the network machine's schema and state order. Data sent by a network machine via SendPayload is received by a Consumer machine (passed via ClientOpts.Consumer) as an Add mutation of the WorkerPayload state (see a detailed diagram). The client supports fail-safety for both connections (e.g. ConnRetries, ConnRetryBackoff) and calls (e.g. CallRetries, CallRetryBackoff).

After the client's Ready state becomes active, it exposes a remote network machine at Client.NetMach. The remote network machine implements most of Machine's methods, many of which are evaluated locally (like Is, When, NewStateCtx). See machine.Api for a full list.
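The local evaluation described above can be pictured with a small stdlib-only sketch. It is not aRPC code: the `mirror` type, its fields, and `push` are hypothetical names, and it assumes that an odd tick means an active state. Checks and waits read a local copy of the clock, which only changes when the server pushes an update.

```go
package main

import (
	"fmt"
	"sync"
)

// mirror is a hypothetical client-side copy of a remote machine's clock.
type mirror struct {
	mu      sync.Mutex
	names   []string
	clock   []uint64
	waiters map[string][]chan struct{}
}

func newMirror(names []string) *mirror {
	return &mirror{
		names:   names,
		clock:   make([]uint64, len(names)),
		waiters: map[string][]chan struct{}{},
	}
}

// index returns the position of a state, or -1 when unknown.
func (m *mirror) index(state string) int {
	for i, n := range m.names {
		if n == state {
			return i
		}
	}
	return -1
}

// Is1 is answered from the local clock, without a network call.
// Assumption: an odd tick marks an active state.
func (m *mirror) Is1(state string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	i := m.index(state)
	return i >= 0 && m.clock[i]%2 == 1
}

// When1 returns a channel that is closed once the state is active locally.
func (m *mirror) When1(state string) <-chan struct{} {
	m.mu.Lock()
	defer m.mu.Unlock()
	ch := make(chan struct{})
	if i := m.index(state); i >= 0 && m.clock[i]%2 == 1 {
		close(ch)
		return ch
	}
	m.waiters[state] = append(m.waiters[state], ch)
	return ch
}

// push stores a clock received from the server and releases waiters
// of states which became active.
func (m *mirror) push(clock []uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	copy(m.clock, clock)
	for state, chans := range m.waiters {
		i := m.index(state)
		if i < 0 || m.clock[i]%2 == 0 {
			continue
		}
		for _, ch := range chans {
			close(ch)
		}
		delete(m.waiters, state)
	}
}

func main() {
	m := newMirror([]string{"Foo", "Bar"})
	wait := m.When1("Bar")
	fmt.Println(m.Is1("Bar")) // false
	m.push([]uint64{0, 1})    // simulated clock push from the server
	<-wait
	fmt.Println(m.Is1("Bar")) // true
}
```

Mutations are the opposite case: they have to reach the source machine, so they cannot be answered from the local copy.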

import (
    "time"

    amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)

// ...

var addr string
// network machine state schema
var schema am.Schema
// network machine state names
var names am.S

// consumer
consumer := am.New(ctx, ssrpc.ConsumerSchema, nil)

// init
c, err := arpc.NewClient(ctx, addr, "clientid", schema, &arpc.ClientOpts{
    Consumer: consumer,
})
if err != nil {
    panic(err)
}

// start
c.Start()
err = amhelp.WaitForAll(ctx, 2*time.Second,
    c.Mach.When1(ssrpc.ClientStates.Ready, ctx))
if ctx.Err() != nil {
    return
}
if err != nil {
    panic(err)
}

// use the remote network machine
c.NetMach.Add1("Foo", nil)
<-c.NetMach.When1("Bar", nil)
print("Server added Bar")
Client Schema

State schema from /pkg/rpc/states/ss_rpc.go.

Multiplexer

Because 1 server can serve only 1 client (for simplicity), it's often required to use a port multiplexer. It's very simple to create one using NewMux and a callback function, which returns a new server instance.

import (
    "context"
    "fmt"
    "net"
    "time"

    amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
    arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
    ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)

// ...

var ctx context.Context

// new server per each new client (optional)
// t is a *testing.T, netMach is the exposed state machine
var newServer arpc.MuxNewServerFn = func(num int64, _ net.Conn) (*arpc.Server, error) {
    name := fmt.Sprintf("%s-%d", t.Name(), num)
    s, err := arpc.NewServer(ctx, "", name, netMach, nil)
    if err != nil {
        return nil, err
    }

    return s, nil
}

// start cmux
mux, err := arpc.NewMux(ctx, t.Name(), newServer, nil)
if err != nil {
    t.Fatal(err)
}
mux.Listener = listener // or mux.Addr = ":1234"
mux.Start()
err = amhelp.WaitForAll(ctx, 2*time.Second,
    mux.Mach.When1(ssrpc.MuxStates.Ready, ctx))
Multiplexer Schema

State schema from /pkg/rpc/states/ss_rpc.go.

Selective Distribution

Every state machine can be partially distributed over the network and updated at different levels of granularity.

Synchronization scenarios:

  • full sync (state names, schema, machine time)
  • no schema (state names, machine time)
  • selected states (state names, schema, selective ticks)
  • shallow clocks (state names, schema, binary ticks)

Synchronization granularity:

  • 1 clock diff per N mutations
  • N clock diffs per N mutations

Scenarios can be mixed with each other to a certain degree (e.g. shallow clocks for selected states). Selective Distribution is like state piping, but it works over the network and uses all other aRPC optimizations.
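As a rough illustration of mixing two scenarios, the sketch below first narrows a clock to selected states and then reduces it to shallow (binary) ticks. The helper names are hypothetical and the code does not use aRPC itself. It assumes that an odd tick means an active state and that an empty allow-list means "all states".

```go
package main

import "fmt"

// selectStates returns the indexes of states which should be synced.
// An empty allowed list is treated as "all states" (an assumption).
func selectStates(names, allowed, skipped []string) []int {
	in := func(list []string, s string) bool {
		for _, v := range list {
			if v == s {
				return true
			}
		}
		return false
	}

	var idx []int
	for i, n := range names {
		if len(allowed) > 0 && !in(allowed, n) {
			continue
		}
		if in(skipped, n) {
			continue
		}
		idx = append(idx, i)
	}
	return idx
}

// shallow reduces full ticks to binary ticks: 1 for active, 0 for inactive.
func shallow(mtime []uint64, idx []int) []uint64 {
	out := make([]uint64, len(idx))
	for i, src := range idx {
		out[i] = mtime[src] % 2
	}
	return out
}

func main() {
	names := []string{"Start", "Ready", "Foo", "Bar"}
	mtime := []uint64{7, 2, 13, 0}

	idx := selectStates(names, []string{"Ready", "Foo", "Bar"}, []string{"Bar"})
	fmt.Println(idx)                 // [1 2]
	fmt.Println(shallow(mtime, idx)) // [0 1]
}
```

In the package itself these choices are made through ClientOpts fields such as AllowedStates, SkippedStates, and SyncShallowClocks.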

Network Handlers

A locally piped Network Machine can then run local handlers (via a local /pkg/machine.Machine instance) and mutate itself, which effectively mutates the Network Source. Combining network handlers with selective distribution can lead to large network coverage (number of hosts) while consuming very little bandwidth. The structure of the network has to be fixed, unlike with /pkg/pubsub.

By combining Network Handlers with Selective Distribution received by a redundant number of clients (per state), we can create multidimensional graphs. Those could be developed further with "voting receivers" (vote-based firewalls) to create more organic systems.

Implementation details

  • schema is always full-or-nothing
  • state indexes and time slices are always source-bound
    • unless there's no schema sync, then client-bound
  • time sum is always client-bound for checksums
    • from a binary time slice for shallow clocks
  • aRPC mutations don't have arguments - hash am.Time and machine IDs to create a remote address for payloads

aRPC implements several optimization strategies to reduce the amount of transferred data:

  • net/rpc method names as runes
  • binary format of encoding/gob
  • index-based clock
    • [0, 100, 0, 120]
  • diff-based clock updates
    • [0, 1, 0, 1]
  • debounced server-mutation clock pushes
    • [0, 5, 2, 1]
  • partial clock updates
    • [[1, 1], [3, 1]]
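The clock examples in the list above can be reproduced with a few lines of plain Go. This is a sketch of the idea only, with hypothetical helper names. The wire format used by aRPC is defined by its message types (see MsgSrvUpdate), not by this code.

```go
package main

import "fmt"

// diff returns per-index tick increments between two clocks of equal length.
func diff(prev, next []uint64) []uint64 {
	out := make([]uint64, len(next))
	for i := range next {
		out[i] = next[i] - prev[i]
	}
	return out
}

// partial keeps only the non-zero increments as [index, increment] pairs.
func partial(d []uint64) [][2]uint64 {
	var out [][2]uint64
	for i, v := range d {
		if v != 0 {
			out = append(out, [2]uint64{uint64(i), v})
		}
	}
	return out
}

// apply adds a partial update to a local clock in place.
func apply(clock []uint64, p [][2]uint64) {
	for _, pair := range p {
		clock[pair[0]] += pair[1]
	}
}

func main() {
	prev := []uint64{0, 100, 0, 120} // index-based clock
	next := []uint64{0, 101, 0, 121}

	d := diff(prev, next)
	p := partial(d)
	fmt.Println(d) // [0 1 0 1]
	fmt.Println(p) // [[1 1] [3 1]]

	// the receiving side only needs the partial update
	local := append([]uint64(nil), prev...)
	apply(local, p)
	fmt.Println(local) // [0 101 0 121]
}
```

Because states are addressed by index, both sides have to agree on the order of state names, which is why the state list is part of the handshake.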

Documentation

Benchmark: aRPC vs gRPC

A simple and opinionated benchmark showing a subscribe-get-process scenario, implemented in both gRPC and aRPC. See /examples/benchmark_grpc for details and source code.

> task benchmark-grpc
...
BenchmarkClientArpc
    client_arpc_test.go:136: Transferred: 609 bytes
    client_arpc_test.go:137: Calls: 4
    client_arpc_test.go:138: Errors: 0
    client_arpc_test.go:136: Transferred: 1,149,424 bytes
    client_arpc_test.go:137: Calls: 10,003
    client_arpc_test.go:138: Errors: 0
BenchmarkClientArpc-8              10000            248913 ns/op           28405 B/op        766 allocs/op
BenchmarkClientGrpc
    client_grpc_test.go:117: Transferred: 1,113 bytes
    client_grpc_test.go:118: Calls: 9
    client_grpc_test.go:119: Errors: 0
    client_grpc_test.go:117: Transferred: 3,400,812 bytes
    client_grpc_test.go:118: Calls: 30,006
    client_grpc_test.go:119: Errors: 0
BenchmarkClientGrpc-8              10000            262693 ns/op           19593 B/op        391 allocs/op
BenchmarkClientLocal
BenchmarkClientLocal-8             10000               434.4 ns/op            16 B/op          1 allocs/op
PASS
ok      github.com/pancsta/asyncmachine-go/examples/benchmark_grpc      5.187s

API

aRPC implements /pkg/machine#Api, which is a large subset of /pkg/machine#Machine methods. Below is the full list, showing which methods run locally and which run on the remote machine.

// TODO update
// A (arguments) is a map of named arguments for a Mutation.
type A map[string]any
// S (state names) is a string list of state names.
type S []string
type Time []uint64
type Clock map[string]uint64
type Result int
type Schema = map[string]State

// Api is a subset of Machine for alternative implementations.
type Api interface {
    // ///// REMOTE

    // Mutations (remote)

    Add1(state string, args A) Result
    Add(states S, args A) Result
    Remove1(state string, args A) Result
    Remove(states S, args A) Result
    AddErr(err error, args A) Result
    AddErrState(state string, err error, args A) Result
    Toggle(states S, args A) Result
    Toggle1(state string, args A) Result
    Set(states S, args A) Result

    // Traced mutations (remote)

    EvAdd1(event *Event, state string, args A) Result
    EvAdd(event *Event, states S, args A) Result
    EvRemove1(event *Event, state string, args A) Result
    EvRemove(event *Event, states S, args A) Result
    EvAddErr(event *Event, err error, args A) Result
    EvAddErrState(event *Event, state string, err error, args A) Result
    EvToggle(event *Event, states S, args A) Result
    EvToggle1(event *Event, state string, args A) Result

    // Waiting (remote)

    WhenArgs(state string, args A, ctx context.Context) <-chan struct{}

    // Getters (remote)

    Err() error

    // ///// LOCAL

    // Checking (local)

    IsErr() bool
    Is(states S) bool
    Is1(state string) bool
    Any(states ...S) bool
    Any1(state ...string) bool
    Not(states S) bool
    Not1(state string) bool
    IsTime(time Time, states S) bool
    WasTime(time Time, states S) bool
    IsClock(clock Clock) bool
    WasClock(clock Clock) bool
    Has(states S) bool
    Has1(state string) bool
    CanAdd(states S, args A) Result
    CanAdd1(state string, args A) Result
    CanRemove(states S, args A) Result
    CanRemove1(state string, args A) Result
    CountActive(states S) int

    // Waiting (local)

    When(states S, ctx context.Context) <-chan struct{}
    When1(state string, ctx context.Context) <-chan struct{}
    WhenNot(states S, ctx context.Context) <-chan struct{}
    WhenNot1(state string, ctx context.Context) <-chan struct{}
    WhenTime(states S, times Time, ctx context.Context) <-chan struct{}
    WhenTime1(state string, tick uint64, ctx context.Context) <-chan struct{}
    WhenTicks(state string, ticks int, ctx context.Context) <-chan struct{}
    WhenQuery(query func(clock Clock) bool, ctx context.Context) <-chan struct{}
    WhenErr(ctx context.Context) <-chan struct{}
    WhenQueue(tick Result) <-chan struct{}

    // Getters (local)

    StateNames() S
    StateNamesMatch(re *regexp.Regexp) S
    ActiveStates() S
    Tick(state string) uint64
    Clock(states S) Clock
    Time(states S) Time
    TimeSum(states S) uint64
    QueueTick() uint64
    NewStateCtx(state string) context.Context
    Export() *Serialized
    Schema() Schema
    Switch(groups ...S) string
    Groups() (map[string][]int, []string)
    Index(states S) []int
    Index1(state string) int

    // Misc (local)

    Id() string
    ParentId() string
    Tags() []string
    Ctx() context.Context
    String() string
    StringAll() string
    Log(msg string, args ...any)
    SemLogger() SemLogger
    Inspect(states S) string
    BindHandlers(handlers any) error
    DetachHandlers(handlers any) error
    HasHandlers() bool
    StatesVerified() bool
    Tracers() []Tracer
    DetachTracer(tracer Tracer) error
    BindTracer(tracer Tracer) error
    AddBreakpoint1(added string, removed string, strict bool)
    AddBreakpoint(added S, removed S, strict bool)
    Dispose()
    WhenDisposed() <-chan struct{}
    IsDisposed() bool
}

Tests

aRPC passes the whole test suite of /pkg/machine for the exposed methods and provides a couple of optimization-focused tests (on top of tests for basic RPC).

Status

Testing, not semantically versioned.

monorepo

Go back to the monorepo root to continue reading.

Documentation

Overview

Package rpc is a transparent RPC for state machines.

Index

Constants

const (
	// EnvAmRpcLogServer enables machine logging for RPC server.
	EnvAmRpcLogServer = "AM_RPC_LOG_SERVER"
	// EnvAmRpcLogClient enables machine logging for RPC client.
	EnvAmRpcLogClient = "AM_RPC_LOG_CLIENT"
	// EnvAmRpcLogMux enables machine logging for RPC multiplexers.
	EnvAmRpcLogMux = "AM_RPC_LOG_MUX"
	// EnvAmRpcDbg enables env-based debugging for RPC components.
	EnvAmRpcDbg = "AM_RPC_DBG"
	// EnvAmReplAddr is a REPL address to listen on. "1" expands to 127.0.0.1:0.
	EnvAmReplAddr = "AM_REPL_ADDR"
	// EnvAmReplDir is a dir path to save the address file as
	// $AM_REPL_DIR/mach-id.addr. Optional.
	EnvAmReplDir = "AM_REPL_DIR"

	PrefixNetMach = "rnm-"
)
const APrefix = "am_rpc"

Variables

var (
	ServerAdd       = ServerMethod{"Add"}
	ServerAddNS     = ServerMethod{"AddNS"}
	ServerRemove    = ServerMethod{"Remove"}
	ServerSet       = ServerMethod{"Set"}
	ServerHello     = ServerMethod{"Hello"}
	ServerHandshake = ServerMethod{"Handshake"}
	ServerLog       = ServerMethod{"Log"}
	ServerSync      = ServerMethod{"Sync"}
	ServerArgs      = ServerMethod{"Args"}
	ServerBye       = ServerMethod{"Close"}

	ServerMethods = enum.New(ServerAdd, ServerAddNS, ServerRemove, ServerSet,
		ServerHello, ServerHandshake, ServerLog, ServerSync, ServerArgs, ServerBye)

	ClientUpdate          = ClientMethod{"ClientSetClock"}
	ClientUpdateMutations = ClientMethod{"ClientSetClockMany"}
	ClientPushAllTicks    = ClientMethod{"ClientPushAllTicks"}
	ClientSendPayload     = ClientMethod{"ClientSendPayload"}
	ClientBye             = ClientMethod{"ClientBye"}
	ClientSchemaChange    = ClientMethod{"SchemaChange"}

	ClientMethods = enum.New(ClientUpdate, ClientPushAllTicks,
		ClientSendPayload, ClientBye, ClientSchemaChange)
)
var (
	ErrInvalidParams = errors.New("invalid params")
	ErrInvalidResp   = errors.New("invalid response")
	ErrRpc           = errors.New("rpc")
	ErrNoAccess      = errors.New("no access")
	ErrNoConn        = errors.New("not connected")
	ErrDestination   = errors.New("wrong destination")

	ErrNetwork        = errors.New("network error")
	ErrNetworkTimeout = errors.New("network timeout")
)

Functions

func AddErr added in v0.8.0

func AddErr(e *am.Event, mach *am.Machine, msg string, err error)

AddErr detects sentinels from error msgs and calls the proper error setter. TODO also return error for compat
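How AddErr matches sentinels is not shown on this page. One plausible approach, sketched here as an assumption, is a substring match on the error message, since errors that crossed the network arrive as plain text. The sentinel values are copied from the variables above. `classifyMsg` and the ErrRpc fallback are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Sentinels copied from the package's exported error variables.
var (
	ErrInvalidParams  = errors.New("invalid params")
	ErrNoAccess       = errors.New("no access")
	ErrNoConn         = errors.New("not connected")
	ErrNetwork        = errors.New("network error")
	ErrNetworkTimeout = errors.New("network timeout")
	ErrRpc            = errors.New("rpc")
)

// classifyMsg maps an error message to a sentinel by substring match.
// Unmatched messages fall back to ErrRpc (an assumption of this sketch).
func classifyMsg(msg string) error {
	candidates := []error{
		ErrInvalidParams, ErrNoAccess, ErrNoConn, ErrNetworkTimeout, ErrNetwork,
	}
	for _, s := range candidates {
		if strings.Contains(msg, s.Error()) {
			return s
		}
	}
	return ErrRpc
}

func main() {
	err := classifyMsg("call Add: not connected")
	fmt.Println(errors.Is(err, ErrNoConn)) // true
	fmt.Println(classifyMsg("boom"))       // rpc
}
```

In the package, the matched sentinel then selects one of the setters listed below (AddErrNetwork, AddErrNoConn, AddErrParams, and so on).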

func AddErrNetwork added in v0.8.0

func AddErrNetwork(e *am.Event, mach *am.Machine, err error)

func AddErrNoConn added in v0.8.0

func AddErrNoConn(e *am.Event, mach *am.Machine, err error)

func AddErrParams added in v0.8.0

func AddErrParams(e *am.Event, mach *am.Machine, err error)

func AddErrResp added in v0.8.0

func AddErrResp(e *am.Event, mach *am.Machine, err error)

func AddErrRpcStr added in v0.8.0

func AddErrRpcStr(e *am.Event, mach *am.Machine, msg string)

func BindServer added in v0.8.0

func BindServer(source, target *am.Machine, rpcReady, clientConn string) error

BindServer binds RpcReady and ClientConnected with Add/Remove, to custom states.

func BindServerMulti added in v0.8.0

func BindServerMulti(
	source, target *am.Machine, rpcReady, clientConn, clientDisconn string,
) error

BindServerMulti binds RpcReady, ClientConnected, and ClientDisconnected. RpcReady is Add/Remove, the other two are Add-only to passed multi states.

func BindServerRpcReady added in v0.8.0

func BindServerRpcReady(source, target *am.Machine, rpcReady string) error

BindServerRpcReady binds RpcReady using Add to a custom multi state.

func Checksum added in v0.15.1

func Checksum(mTime uint64, qTick uint64, machTick uint32) uint8

Checksum calculates a short checksum of current machine time and ticks.
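The MsgSrvUpdate documentation further down describes the checksum as the last digit of (TimeSum + QueueTick + MachTick). A minimal sketch under that description follows. `checksum` and `timeSum` are stand-ins written for this example and may differ from the real implementation.

```go
package main

import "fmt"

// timeSum adds up all ticks of a machine time slice.
func timeSum(mtime []uint64) uint64 {
	var sum uint64
	for _, t := range mtime {
		sum += t
	}
	return sum
}

// checksum is a hypothetical stand-in for rpc.Checksum: the last decimal
// digit of the sum of machine time, queue tick, and machine tick.
func checksum(mTime uint64, qTick uint64, machTick uint32) uint8 {
	return uint8((mTime + qTick + uint64(machTick)) % 10)
}

func main() {
	// local clock after applying an update: time sum is 222
	local := []uint64{0, 101, 0, 121}

	// 222 + 14 + 1 = 237, so the last digit is 7
	fmt.Println(checksum(timeSum(local), 14, 1)) // 7
}
```

A client can compare this value with the one sent by the server and request a full sync (see Client.Sync) when they differ.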

func GetClientId added in v0.8.0

func GetClientId(name string) string

GetClientId returns an RPC Client machine ID from a name. This ID will be used to handshake the server.

func LogArgs added in v0.8.0

func LogArgs(args am.A) map[string]string

LogArgs is an args logger for A.

func MachRepl added in v0.10.2

func MachRepl(mach am.Api, addr string, opts *ReplOpts) error

MachRepl sets up a machine for a REPL connection, which allows for mutations, like any other RPC connection. See [/tools/cmd/arpc] for usage. This function is considered a debugging helper and can panic.

addr: address to listen on, defaults to 127.0.0.1:0
addrDir: optional dir path to save the address file as addrDir/mach-id.addr
addrCh: optional channel to send the address to, once ready
errCh: optional channel for errors

func MachReplEnv added in v0.10.2

func MachReplEnv(mach am.Api) (error, <-chan error)

MachReplEnv sets up a machine for a REPL connection in case AM_REPL_ADDR env var is set. See MachRepl.
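The environment handling can be summarized with a stdlib-only sketch based on the constant docs above: "1" expands to 127.0.0.1:0, and the address file is saved as $AM_REPL_DIR/mach-id.addr. `replAddr` and `addrFile` are hypothetical helpers, and treating an empty value as "disabled" is an assumption.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// replAddr resolves the listen address from the AM_REPL_ADDR value.
// An empty value is assumed to disable the REPL.
func replAddr(env string) (addr string, enabled bool) {
	switch env {
	case "":
		return "", false
	case "1":
		return "127.0.0.1:0", true
	default:
		return env, true
	}
}

// addrFile builds the path of the address file for a machine ID.
func addrFile(dir, machID string) string {
	return filepath.Join(dir, machID+".addr")
}

func main() {
	addr, ok := replAddr("1")
	fmt.Println(addr, ok) // 127.0.0.1:0 true
	fmt.Println(addrFile("repl", "mach1"))
}
```

Port 0 lets the operating system pick a free port, which is why the resolved address is written to a file for the REPL tool to read.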

func NewMsgpackCodec added in v0.17.0

func NewMsgpackCodec(conn io.ReadWriteCloser) rpc2.Codec

TODO optimize with msgpack

func NewNetworkMachine added in v0.17.0

func NewNetworkMachine(
	ctx context.Context, id string, conn NetMachConn, schema am.Schema,
	stateNames am.S, parent *am.Machine, tags []string, filterMutations bool,
) (*NetworkMachine, *NetMachInternal, error)

NewNetworkMachine creates a new instance of a NetworkMachine.

func Pass added in v0.8.0

func Pass(args *A) am.A

Pass prepares am.A from A to pass to further mutations.

func PassRpc added in v0.9.0

func PassRpc(args *A) am.A

PassRpc prepares am.A from A to pass over RPC.

func TrafficMeter

func TrafficMeter(
	listener net.Listener, fwdTo string, counter chan<- int64,
	end <-chan struct{},
)

TrafficMeter measures the traffic of a listener and forwards it to a destination. Results are sent to the [counter] channel. Useful for testing and benchmarking.

Types

type A added in v0.8.0

type A struct {
	Id        string `log:"id"`
	Name      string `log:"name"`
	MachTime  am.Time
	QueueTick uint64
	MachTick  uint32
	Payload   *MsgSrvPayload
	Addr      string `log:"addr"`
	Err       error
	Method    string `log:"addr"`
	StartedAt time.Time
	Dispose   bool

	Client *rpc2.Client
}

A represents typed arguments of the RPC package. It's a typesafe alternative to am.A.

func ParseArgs added in v0.8.0

func ParseArgs(args am.A) *A

ParseArgs extracts A from am.Event.Args[APrefix].

type ARpc added in v0.9.0

type ARpc struct {
	Id        string `log:"id"`
	Name      string `log:"name"`
	MachTime  am.Time
	QueueTick uint64
	MachTick  uint32
	Payload   *MsgSrvPayload
	Addr      string `log:"addr"`
	Err       error
	Method    string `log:"addr"`
	StartedAt time.Time
	Dispose   bool
}

ARpc is a subset of A, that can be passed over RPC.

type Client

type Client struct {
	*ExceptionHandler

	Mach *am.Machine
	Name string

	// Addr is the address the Client will connect to.
	Addr string
	// NetMach is a remote am.Machine instance
	NetMach *NetworkMachine
	// Consumer is the optional consumer for deliveries.
	Consumer   *am.Machine
	CallCount  uint64
	LogEnabled bool
	// DisconnCooldown is the time to wait after notifying the server about
	// disconnecting before actually disconnecting. Default 10ms.
	DisconnCooldown time.Duration
	// LastMsgAt is the last received msg from the worker TODO
	LastMsgAt time.Time
	// HelloDelay between Connected and Handshaking. Default 0, useful for
	// rpc/Mux.
	HelloDelay time.Duration
	// ReconnectOn decides if the client will try to [RetryingConn] after a
	// clean [Disconnect].
	ReconnectOn bool

	// Skip schema synchronization / fetching.
	SyncNoSchema bool
	// Synchronize machine times for every mutation (within a single sync msg).
	SyncAllMutations bool
	// Only sync selected states.
	SyncAllowedStates am.S
	// Skip syncing of these states.
	SyncSkippedStates am.S
	// Only activate/deactivate (0-1) clock values will be sent.
	SyncShallowClocks     bool
	SyncMutationFiltering bool

	// ConnTimeout is the maximum time to wait for a connection to be established.
	// Default 3s.
	ConnTimeout time.Duration
	// ConnRetries is the number of retries for a connection. Default 15.
	ConnRetries int
	// ConnRetryTimeout is the maximum time to retry a connection. Default 1m.
	ConnRetryTimeout time.Duration
	// ConnRetryDelay is the time to wait between retries. Default 100ms. If
	// ConnRetryBackoff is set, this is the initial delay, and doubles on each
	// retry.
	ConnRetryDelay time.Duration
	// ConnRetryBackoff is the maximum time to wait between retries. Default 3s.
	ConnRetryBackoff time.Duration

	// CallTimeout is the maximum time to wait for a call to complete. Default 3s.
	CallTimeout time.Duration
	// CallRetries is the number of retries for a call. Default 15.
	CallRetries int
	// CallRetryTimeout is the maximum time to retry a call. Default 1m.
	CallRetryTimeout time.Duration
	// CallRetryDelay is the time to wait between retries. Default 100ms. If
	// CallRetryBackoff is set, this is the initial delay, and doubles on each
	// retry.
	CallRetryDelay time.Duration
	// CallRetryBackoff is the maximum time to wait between retries. Default 3s.
	CallRetryBackoff time.Duration
	DisconnTimeout   time.Duration
	// contains filtered or unexported fields
}

Client is a type representing an RPC client that interacts with a remote am.Machine instance.
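The retry fields above describe an initial delay that doubles on each retry up to a maximum. The schedule this implies can be sketched as follows. `retryDelays` is a hypothetical helper, and the total limits (ConnRetryTimeout, CallRetryTimeout) are not modeled.

```go
package main

import (
	"fmt"
	"time"
)

// retryDelays lists the wait before each retry. With a positive maxDelay the
// delay starts at initial and doubles on each retry, capped at maxDelay.
// Without it, every retry waits for the initial delay.
func retryDelays(initial, maxDelay time.Duration, retries int) []time.Duration {
	out := make([]time.Duration, 0, retries)
	d := initial
	for i := 0; i < retries; i++ {
		if maxDelay > 0 && d > maxDelay {
			d = maxDelay
		}
		out = append(out, d)
		if maxDelay > 0 {
			d *= 2
		}
	}
	return out
}

func main() {
	// documented defaults: 100ms initial delay, 3s backoff limit
	fmt.Println(retryDelays(100*time.Millisecond, 3*time.Second, 8))
	// [100ms 200ms 400ms 800ms 1.6s 3s 3s 3s]
}
```

With the documented defaults the cap is reached on the sixth retry, so most of the default 15 retries wait the full 3s.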

func NewClient

func NewClient(
	ctx context.Context, netSrcAddr string, name string, netSrcSchema am.Schema,
	opts *ClientOpts,
) (*Client, error)

NewClient creates a new RPC client and exposes a remote state machine as a remote worker, with a subset of the API under Client.NetMach. Optionally takes a consumer, which is a state machine with a WorkerPayload state. See states.ConsumerStates.

func (*Client) Args added in v0.17.1

func (c *Client) Args() []string

Args returns a list of registered typed args for a given machine.

func (*Client) CallRetryFailedState added in v0.8.0

func (c *Client) CallRetryFailedState(e *am.Event)

func (*Client) ConnectedState

func (c *Client) ConnectedState(e *am.Event)

func (*Client) ConnectingState

func (c *Client) ConnectingState(e *am.Event)

func (*Client) DisconnectedEnter

func (c *Client) DisconnectedEnter(e *am.Event) bool

func (*Client) DisconnectedState

func (c *Client) DisconnectedState(e *am.Event)

func (*Client) DisconnectingEnter

func (c *Client) DisconnectingEnter(e *am.Event) bool

func (*Client) DisconnectingState

func (c *Client) DisconnectingState(e *am.Event)

func (*Client) ExceptionState added in v0.8.0

func (c *Client) ExceptionState(e *am.Event)

ExceptionState handles network errors and retries the connection.

func (*Client) GetKind

func (c *Client) GetKind() Kind

GetKind returns a kind of the RPC component (server / client).

func (*Client) HandshakeDoneEnter added in v0.8.0

func (c *Client) HandshakeDoneEnter(e *am.Event) bool

func (*Client) HandshakeDoneState

func (c *Client) HandshakeDoneState(e *am.Event)

func (*Client) HandshakingState

func (c *Client) HandshakingState(e *am.Event)

func (*Client) HealthcheckState added in v0.9.0

func (c *Client) HealthcheckState(e *am.Event)

func (*Client) IsPartial added in v0.17.0

func (c *Client) IsPartial() bool

IsPartial is true for NetMachs syncing only a subset of the Net Source's states.

func (*Client) RemoteBye added in v0.10.1

func (c *Client) RemoteBye(
	_ *rpc2.Client, _ *MsgEmpty, _ *MsgEmpty,
) error

RemoteBye is called by the server on a planned disconnect. TODO take a reason / source event?

func (*Client) RemoteSchemaChange added in v0.12.0

func (c *Client) RemoteSchemaChange(
	_ *rpc2.Client, msg *MsgSrvHello, _ *MsgEmpty,
) error

RemoteSchemaChange is called by the server on a source machine schema change.

func (*Client) RemoteSendPayload

func (c *Client) RemoteSendPayload(
	_ *rpc2.Client, payload *MsgSrvPayload, _ *MsgEmpty,
) error

RemoteSendPayload receives a payload from the server and triggers WorkerPayload. The Consumer should bind its handlers and handle this state to receive the data.

func (*Client) RemoteSendingPayload added in v0.8.0

func (c *Client) RemoteSendingPayload(
	_ *rpc2.Client, payload *MsgSrvPayload, _ *MsgEmpty,
) error

RemoteSendingPayload triggers the WorkerDelivering state, which is an optional indication that the server has started a data transmission to the Client. This payload shouldn't contain the data itself, only the name and token.

func (*Client) RemoteUpdate added in v0.17.0

func (c *Client) RemoteUpdate(
	_ *rpc2.Client, update *MsgSrvUpdate, _ *MsgEmpty,
) error

RemoteUpdate updates the clock of NetMach from a cumulative diff. Only called by the server.

func (*Client) RemoteUpdateMutations added in v0.17.0

func (c *Client) RemoteUpdateMutations(
	_ *rpc2.Client, updates *MsgSrvUpdateMuts, _ *MsgEmpty,
) error

RemoteUpdateMutations updates the clock of NetMach from a list of mutations. Only called by the server.

func (*Client) RetryingCallEnter added in v0.8.0

func (c *Client) RetryingCallEnter(e *am.Event) bool

func (*Client) RetryingConnState added in v0.8.0

func (c *Client) RetryingConnState(e *am.Event)

RetryingConnState should be set without Connecting in the same tx

func (*Client) Start

func (c *Client) Start() am.Result

Start connects the client to the server and initializes the worker. Results in the Ready state.

func (*Client) StartEnd

func (c *Client) StartEnd(e *am.Event)

func (*Client) StartState added in v0.8.0

func (c *Client) StartState(e *am.Event)

func (*Client) Stop

func (c *Client) Stop(waitTillExit context.Context, dispose bool) am.Result

Stop disconnects the client from the server and disposes the worker.

waitTillExit: if passed, waits for the client to disconnect using the context.

func (*Client) Sync added in v0.17.0

func (c *Client) Sync() am.Time

Sync requests non-diff clock values from the remote machine. Useful to call after a batch of no-sync methods, eg NetworkMachine.AddNS. Sync doesn't honor [ClientOpts.SyncMutations] and only returns clock values (so can be used to skip mutation syncing within a period).

func (*Client) WorkerPayloadEnter added in v0.8.0

func (c *Client) WorkerPayloadEnter(e *am.Event) bool

func (*Client) WorkerPayloadState added in v0.8.0

func (c *Client) WorkerPayloadState(e *am.Event)

type ClientMethod added in v0.15.1

type ClientMethod enum.Member[string]

type ClientOpts added in v0.8.0

type ClientOpts struct {
	// Consumer is an optional target for the [states.SendPayload] state.
	Consumer *am.Machine
	// Parent is a parent state machine for a new Client state machine. See
	// [am.Opts].
	Parent am.Api
	// Make this client schema-less (infer an empty one for tracked states).
	NoSchema bool
	// Only sync selected states.
	AllowedStates am.S
	// Skip syncing of these states.
	SkippedStates am.S
	// Sync machine time for every mutation. Disables
	// [ClientOpts.SyncShallowClocks].
	SyncMutations bool
	// Only activate/deactivate (0-1) clock values will be sent.
	SyncShallowClocks bool
	// Enable client-side mutation filtering by performing relations resolution
	// based on locally active states. Doesn't work with [ClientOpts.NoSchema].
	// TODO not implemented yet
	MutationFiltering bool
}

type ClockUpdateFunc added in v0.17.0

type ClockUpdateFunc func(now am.Time, qTick uint64, machTick uint32)

type ExceptionHandler

type ExceptionHandler struct {
	*am.ExceptionHandler
}

ExceptionHandler is a shared exception handler for RPC server and client.

func (*ExceptionHandler) ExceptionEnter

func (h *ExceptionHandler) ExceptionEnter(e *am.Event) bool

type Kind

type Kind string

Kind of the RPC component.

const (
	KindClient Kind = "client"
	KindServer Kind = "server"
)

type MsgCliHello added in v0.17.0

type MsgCliHello struct {
	// ID of the client saying Hello.
	Id string
	// Client wants to synchronize the schema.
	SyncSchema bool
	// Hash of the current schema, or "". Schema is always full and not affected
	// by [MsgCliHello.AllowedStates] or [MsgCliHello.SkippedStates].
	SchemaHash    string
	SyncMutations bool
	AllowedStates am.S
	SkippedStates am.S
	ShallowClocks bool
}

MsgCliHello is the client saying hello to the server.

type MsgCliMutation added in v0.17.0

type MsgCliMutation struct {
	States []int
	Args   am.A
	Event  *am.Event
}

MsgCliMutation is the client requesting a mutation from the server.

type MsgEmpty added in v0.17.0

type MsgEmpty struct{}

MsgEmpty is an empty message of either the server or client.

type MsgSrvArgs added in v0.17.1

type MsgSrvArgs struct {
	Args []string
}

type MsgSrvHello added in v0.17.0

type MsgSrvHello struct {
	Schema     am.Schema
	Serialized *am.Serialized
	// total source states count
	StatesCount uint32
}

MsgSrvHello is the server saying hello to the client.

type MsgSrvMutation added in v0.17.0

type MsgSrvMutation struct {
	Update    *MsgSrvUpdate
	Mutations *MsgSrvUpdateMuts
	Result    am.Result
}

MsgSrvMutation is the server replying to a mutation request for the client.

type MsgSrvPayload added in v0.17.0

type MsgSrvPayload struct {
	// Name is used to distinguish different payload types at the destination.
	Name string
	// Source is the machine ID that sent the payload.
	Source string
	// SourceTx is transition ID.
	SourceTx string
	// Destination is an optional machine ID that is supposed to receive the
	// payload. Useful when using rpc.Mux.
	Destination string
	// Data is the payload data. The Consumer has to know the type.
	Data any

	// Token is a unique random ID for the payload. Autofilled by the server.
	Token string
}

MsgSrvPayload is the server sending a payload to the client.
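Since Data is untyped and "the Consumer has to know the type", a consumer typically dispatches on Name and asserts the expected type. The sketch below mirrors three of the fields in a local `payload` struct so it runs without the package. `handlePayload` is a hypothetical function, not the handler signature used by asyncmachine-go.

```go
package main

import "fmt"

// payload mirrors the MsgSrvPayload fields used in this sketch.
type payload struct {
	Name   string
	Source string
	Data   any
}

// handlePayload dispatches on the payload name and asserts the data type
// which the consumer expects for that name.
func handlePayload(p *payload) (string, error) {
	switch p.Name {
	case "mypayload":
		data, ok := p.Data.([]byte)
		if !ok {
			return "", fmt.Errorf("payload %q: want []byte, got %T", p.Name, p.Data)
		}
		return fmt.Sprintf("%d bytes from %s", len(data), p.Source), nil
	default:
		return "", fmt.Errorf("unknown payload %q", p.Name)
	}
}

func main() {
	msg, err := handlePayload(&payload{
		Name:   "mypayload",
		Source: "netmach1",
		Data:   []byte{1, 2, 3},
	})
	fmt.Println(msg, err) // 3 bytes from netmach1 <nil>
}
```

In a real consumer this logic would live in the handler for the WorkerPayload state, with the payload taken from the parsed arguments (see ParseArgs).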

type MsgSrvSync added in v0.17.0

type MsgSrvSync struct {
	Time      am.Time
	QueueTick uint64
	MachTick  uint32
}

MsgSrvSync is the server replying to a full sync request from the client.

type MsgSrvUpdate added in v0.17.0

type MsgSrvUpdate struct {
	// Indexes of incremented states.
	Indexes []uint16
	// Clock diffs of incremented states.
	// TODO optimize: []uint16 and send 2 updates when needed
	Ticks []uint32
	// TODO optimize: for shallow clocks
	// Active []bool
	// QueueTick is an incremental diff for the queue tick.
	QueueTick uint16
	// MachTick is an incremental diff for the machine tick.
	MachTick uint8
	// Checksum is the last digit of (TimeSum + QueueTick + MachTick)
	Checksum uint8
}

MsgSrvUpdate is the server telling the client about a net source's update.
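The diff-based fields above can be illustrated with a short sketch. It assumes that Ticks[i] is the increment for the state at Indexes[i], that TimeSum is the sum of all state ticks after the diffs are applied, and that "last digit" means the last decimal digit. None of this is confirmed by the package source; the update and clockState types are local stand-ins.

```go
package main

import "fmt"

// update mirrors the exported fields of MsgSrvUpdate (illustration only).
type update struct {
	Indexes   []uint16
	Ticks     []uint32
	QueueTick uint16
	MachTick  uint8
	Checksum  uint8
}

// clockState is a hypothetical client-side view of the source's clocks.
type clockState struct {
	Time      []uint64
	QueueTick uint64
	MachTick  uint32
}

// checksum returns the last decimal digit of (sum of Time + QueueTick +
// MachTick). Using absolute values here is an assumption.
func checksum(c clockState) uint8 {
	var sum uint64
	for _, t := range c.Time {
		sum += t
	}
	sum += c.QueueTick + uint64(c.MachTick)
	return uint8(sum % 10)
}

// apply adds the diffs from u to a copy of c and reports whether the
// resulting checksum matches the one carried by the update.
func apply(c clockState, u update) (clockState, bool) {
	if len(u.Indexes) != len(u.Ticks) {
		return c, false
	}
	next := clockState{
		Time:      append([]uint64(nil), c.Time...),
		QueueTick: c.QueueTick + uint64(u.QueueTick),
		MachTick:  c.MachTick + uint32(u.MachTick),
	}
	for i, idx := range u.Indexes {
		if int(idx) >= len(next.Time) {
			return c, false
		}
		next.Time[idx] += uint64(u.Ticks[i])
	}
	return next, checksum(next) == u.Checksum
}

func main() {
	c := clockState{Time: []uint64{1, 0, 4}, QueueTick: 10, MachTick: 1}
	u := update{
		Indexes: []uint16{1, 2}, Ticks: []uint32{1, 2},
		QueueTick: 3, Checksum: 2,
	}
	next, ok := apply(c, u)
	fmt.Println(next.Time, next.QueueTick, next.MachTick, ok)
}
```

In this sketch a failed checksum leaves the previous state untouched, which would be the point where a client falls back to a full sync.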

type MsgSrvUpdateMuts added in v0.17.0

type MsgSrvUpdateMuts struct {
	// TODO mind partially accepted auto states (fake called states).
	// Auto         bool
	MutationType []am.MutationType
	CalledStates [][]uint16
	Updates      []MsgSrvUpdate
}

MsgSrvUpdateMuts is like MsgSrvUpdate but contains several clock updates (one for each mutation), as well as extra mutation info.

type Mux added in v0.8.0

type Mux struct {
	*am.ExceptionHandler

	Mach *am.Machine
	// Source is the state source to expose via RPC. Required if NewServerFn
	// isn't provided.
	Source am.Api
	// NewServerFn creates a new instance of Server and is called for every new
	// connection.
	NewServerFn MuxNewServerFn
	// Typed arguments struct pointer
	Args       any
	ArgsPrefix string

	Name string
	Addr string
	// The listener used by this Mux, can be set manually before Start().
	Listener   net.Listener
	LogEnabled bool
	// The last error returned by NewServerFn.
	NewServerErr error
	// contains filtered or unexported fields
}

Mux creates a new RPC server for each incoming connection.

func NewMux added in v0.8.0

func NewMux(
	ctx context.Context, name string, newServerFn MuxNewServerFn, opts *MuxOpts,
) (*Mux, error)

NewMux initializes a Mux instance to handle RPC server creation for incoming connections with the given parameters.

newServerFn: when nil, [Mux.Source] needs to be set manually before calling Mux.Start.

func (*Mux) ClientConnectedState added in v0.8.0

func (m *Mux) ClientConnectedState(e *am.Event)

func (*Mux) ExceptionState added in v0.8.0

func (m *Mux) ExceptionState(e *am.Event)

func (*Mux) HasClientsEnd added in v0.8.0

func (m *Mux) HasClientsEnd(e *am.Event) bool

func (*Mux) HealthcheckState added in v0.8.0

func (m *Mux) HealthcheckState(e *am.Event)

func (*Mux) NewServerErrEnter added in v0.8.0

func (m *Mux) NewServerErrEnter(e *am.Event) bool

func (*Mux) NewServerErrState added in v0.8.0

func (m *Mux) NewServerErrState(e *am.Event)

func (*Mux) Start added in v0.8.0

func (m *Mux) Start() am.Result

func (*Mux) StartEnd added in v0.8.0

func (m *Mux) StartEnd(e *am.Event)

func (*Mux) StartEnter added in v0.10.2

func (m *Mux) StartEnter(e *am.Event) bool

func (*Mux) StartState added in v0.8.0

func (m *Mux) StartState(e *am.Event)

func (*Mux) Stop added in v0.8.0

func (m *Mux) Stop(dispose bool) am.Result

type MuxNewServerFn added in v0.10.2

type MuxNewServerFn func(num int, conn net.Conn) (*Server, error)

MuxNewServerFn is a function to create a new RPC server for each incoming connection.

type MuxOpts added in v0.8.0

type MuxOpts struct {
	// Parent is a parent state machine for a new Mux state machine. See
	// [am.Opts].
	Parent am.Api
	// Typed arguments struct pointer
	Args       any
	ArgsPrefix string
}

type NetMachConn added in v0.17.0

type NetMachConn interface {
	Call(ctx context.Context, method ServerMethod, args any, resp any) bool
	Notify(ctx context.Context, method ServerMethod, args any) bool
}

NetMachConn is a mutation interface for NetworkMachine instances. It's meant to be (optionally) injected by whatever creates network machines, so they can communicate with the server (or another source).

type NetMachInternal added in v0.17.0

type NetMachInternal struct {
	// contains filtered or unexported fields
}

NetMachInternal are internal methods of a NetworkMachine instance returned by the constructor.

func (*NetMachInternal) Lock added in v0.17.0

func (i *NetMachInternal) Lock()

func (*NetMachInternal) Unlock added in v0.17.0

func (i *NetMachInternal) Unlock()

func (*NetMachInternal) UpdateClock added in v0.17.0

func (i *NetMachInternal) UpdateClock(
	now am.Time, qTick uint64, machTick uint32,
)

type NetworkMachine added in v0.17.0

type NetworkMachine struct {
	// If true, the machine will print all exceptions to stdout. Default: true.
	// Requires an ExceptionHandler binding and Machine.PanicToException set.
	LogStackTrace bool
	// contains filtered or unexported fields
}

NetworkMachine is a subset of `pkg/machine#Machine` for RPC. Lacks the queue and other local methods. Most methods are clock-based, thus executed locally. NetworkMachine implements am.Api.

func (*NetworkMachine) ActiveStates added in v0.17.0

func (m *NetworkMachine) ActiveStates(states am.S) am.S

ActiveStates returns a copy of the currently active states.

func (*NetworkMachine) Add added in v0.17.0

func (m *NetworkMachine) Add(states am.S, args am.A) am.Result

Add is am.Api.Add.

func (*NetworkMachine) Add1 added in v0.17.0

func (m *NetworkMachine) Add1(state string, args am.A) am.Result

Add1 is am.Api.Add1.

func (*NetworkMachine) Add1NS added in v0.17.0

func (m *NetworkMachine) Add1NS(state string, args am.A) am.Result

Add1NS is a single state version of AddNS.

func (*NetworkMachine) AddBreakpoint added in v0.17.0

func (m *NetworkMachine) AddBreakpoint(
	added am.S, removed am.S, strict bool,
)

AddBreakpoint is am.Api.AddBreakpoint.

func (*NetworkMachine) AddBreakpoint1 added in v0.17.0

func (m *NetworkMachine) AddBreakpoint1(
	added string, removed string, strict bool,
)

AddBreakpoint1 is am.Api.AddBreakpoint1.

func (*NetworkMachine) AddErr added in v0.17.0

func (m *NetworkMachine) AddErr(err error, args am.A) am.Result

AddErr is am.Api.AddErr.

func (*NetworkMachine) AddErrState added in v0.17.0

func (m *NetworkMachine) AddErrState(
	state string, err error, args am.A,
) am.Result

AddErrState is am.Api.AddErrState.

func (*NetworkMachine) AddNS added in v0.17.0

func (m *NetworkMachine) AddNS(states am.S, args am.A) am.Result

AddNS is a NoSync method: an efficient way of adding states, as it neither waits for nor transfers a response. Because of that, it doesn't update the clock. Use Sync() to update the clock after a batch of AddNS calls.

func (*NetworkMachine) Any added in v0.17.0

func (m *NetworkMachine) Any(states ...am.S) bool

Any is am.Api.Any.

func (*NetworkMachine) Any1 added in v0.17.0

func (m *NetworkMachine) Any1(states ...string) bool

Any1 is am.Api.Any1.

func (*NetworkMachine) BindHandlers added in v0.17.0

func (m *NetworkMachine) BindHandlers(handlers any) error

BindHandlers is am.Api.BindHandlers.

NetworkMachine supports only pipe handlers (final ones, without negotiation).

func (*NetworkMachine) BindTracer added in v0.17.0

func (m *NetworkMachine) BindTracer(tracer am.Tracer) error

BindTracer is am.Machine.BindTracer.

NetworkMachine tracers cannot mutate synchronously, as network machines don't have a queue and WILL deadlock when nested.

func (*NetworkMachine) CanAdd added in v0.17.0

func (m *NetworkMachine) CanAdd(states am.S, args am.A) am.Result

CanAdd is am.Api.CanAdd.

func (*NetworkMachine) CanAdd1 added in v0.17.0

func (m *NetworkMachine) CanAdd1(state string, args am.A) am.Result

CanAdd1 is am.Api.CanAdd1.

func (*NetworkMachine) CanRemove added in v0.17.0

func (m *NetworkMachine) CanRemove(states am.S, args am.A) am.Result

CanRemove is am.Api.CanRemove.

func (*NetworkMachine) CanRemove1 added in v0.17.0

func (m *NetworkMachine) CanRemove1(state string, args am.A) am.Result

CanRemove1 is am.Api.CanRemove1.

func (*NetworkMachine) Clock added in v0.17.0

func (m *NetworkMachine) Clock(states am.S) am.Clock

Clock returns current machine's clock, a state-keyed map of ticks. If states are passed, only the ticks of the passed states are returned.

func (*NetworkMachine) Ctx added in v0.17.0

func (m *NetworkMachine) Ctx() context.Context

Ctx returns the worker's root context.

func (*NetworkMachine) DetachHandlers added in v0.17.0

func (m *NetworkMachine) DetachHandlers(handlers any) error

DetachHandlers is am.Api.DetachHandlers.

func (*NetworkMachine) DetachTracer added in v0.17.0

func (m *NetworkMachine) DetachTracer(tracer am.Tracer) error

DetachTracer is am.Api.DetachTracer.

func (*NetworkMachine) Dispose added in v0.17.0

func (m *NetworkMachine) Dispose()

Dispose disposes the machine and all its emitters. You can wait for the completion of the disposal with `<-mach.WhenDisposed()`.

func (*NetworkMachine) Err added in v0.17.0

func (m *NetworkMachine) Err() error

Err returns the last error.

func (*NetworkMachine) EvAdd added in v0.17.0

func (m *NetworkMachine) EvAdd(
	event *am.Event, states am.S, args am.A,
) am.Result

EvAdd is am.Api.EvAdd.

func (*NetworkMachine) EvAdd1 added in v0.17.0

func (m *NetworkMachine) EvAdd1(
	event *am.Event, state string, args am.A,
) am.Result

EvAdd1 is am.Api.EvAdd1.

func (*NetworkMachine) EvAddErr added in v0.17.0

func (m *NetworkMachine) EvAddErr(
	event *am.Event, err error, args am.A,
) am.Result

EvAddErr is am.Api.EvAddErr.

func (*NetworkMachine) EvAddErrState added in v0.17.0

func (m *NetworkMachine) EvAddErrState(
	event *am.Event, state string, err error, args am.A,
) am.Result

EvAddErrState is am.Api.EvAddErrState.

func (*NetworkMachine) EvRemove added in v0.17.0

func (m *NetworkMachine) EvRemove(
	event *am.Event, states am.S, args am.A,
) am.Result

EvRemove is am.Api.EvRemove.

func (*NetworkMachine) EvRemove1 added in v0.17.0

func (m *NetworkMachine) EvRemove1(
	event *am.Event, state string, args am.A,
) am.Result

EvRemove1 is am.Api.EvRemove1.

func (*NetworkMachine) EvToggle added in v0.17.0

func (m *NetworkMachine) EvToggle(
	e *am.Event, states am.S, args am.A,
) am.Result

EvToggle is am.Api.EvToggle.

func (*NetworkMachine) EvToggle1 added in v0.17.0

func (m *NetworkMachine) EvToggle1(
	e *am.Event, state string, args am.A,
) am.Result

EvToggle1 is am.Api.EvToggle1.

func (*NetworkMachine) Export added in v0.17.0

func (m *NetworkMachine) Export() (*am.Serialized, am.Schema, error)

Export exports the machine state: id, time and state names.

func (*NetworkMachine) Groups added in v0.17.0

func (m *NetworkMachine) Groups() (map[string][]int, []string)

Groups is am.Api.Groups.

func (*NetworkMachine) Has added in v0.17.0

func (m *NetworkMachine) Has(states am.S) bool

Has is am.Api.Has.

func (*NetworkMachine) Has1 added in v0.17.0

func (m *NetworkMachine) Has1(state string) bool

Has1 is am.Api.Has1.

func (*NetworkMachine) HasHandlers added in v0.17.0

func (m *NetworkMachine) HasHandlers() bool

HasHandlers is am.Api.HasHandlers.

func (*NetworkMachine) Id added in v0.17.0

func (m *NetworkMachine) Id() string

Id returns the machine's id.

func (*NetworkMachine) Index added in v0.17.0

func (m *NetworkMachine) Index(states am.S) []int

func (*NetworkMachine) Index1 added in v0.17.0

func (m *NetworkMachine) Index1(state string) int

Index1 returns the index of a state in the machine's StateNames() list.

func (*NetworkMachine) Inspect added in v0.17.0

func (m *NetworkMachine) Inspect(states am.S) string

Inspect returns a multi-line string representation of the machine (states, relations, clock). states: param for ordered or partial results.

func (*NetworkMachine) Is added in v0.17.0

func (m *NetworkMachine) Is(states am.S) bool

Is is am.Api.Is.

func (*NetworkMachine) Is1 added in v0.17.0

func (m *NetworkMachine) Is1(state string) bool

Is1 is am.Api.Is1.

func (*NetworkMachine) IsClock added in v0.17.0

func (m *NetworkMachine) IsClock(clock am.Clock) bool

IsClock is am.Api.IsClock.

func (*NetworkMachine) IsDisposed added in v0.17.0

func (m *NetworkMachine) IsDisposed() bool

IsDisposed returns true if the machine has been disposed.

func (*NetworkMachine) IsErr added in v0.17.0

func (m *NetworkMachine) IsErr() bool

IsErr is am.Api.IsErr.

func (*NetworkMachine) IsTime added in v0.17.0

func (m *NetworkMachine) IsTime(t am.Time, states am.S) bool

IsTime is am.Api.IsTime.

func (*NetworkMachine) Log added in v0.17.0

func (m *NetworkMachine) Log(msg string, args ...any)

Log is a local logger.

func (*NetworkMachine) MachineTick added in v0.17.0

func (m *NetworkMachine) MachineTick() uint32

MachineTick is am.Api.MachineTick.

func (*NetworkMachine) MustParseStates added in v0.17.0

func (m *NetworkMachine) MustParseStates(states am.S) am.S

MustParseStates parses the states and returns them as a list. Panics when a state is not defined. It's an unsafe equivalent of VerifyStates.

func (*NetworkMachine) NewStateCtx added in v0.17.0

func (m *NetworkMachine) NewStateCtx(state string) context.Context

NewStateCtx returns a new sub-context, bound to the current clock's tick of the passed state.

Context cancels when the state has been de-activated, or right away, if it isn't currently active.

State contexts are used to check state expirations and should be checked often inside goroutines. TODO log reader
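As a rough illustration of checking a state context often, the sketch below uses a plain cancellable context as a stand-in for the one returned by NewStateCtx. The processChunks and demo helpers are hypothetical and only show the checking pattern.

```go
package main

import (
	"context"
	"fmt"
)

// processChunks handles chunks one by one and checks the context before each
// step, so the work stops soon after the state is de-activated.
func processChunks(ctx context.Context, chunks []string) int {
	done := 0
	for range chunks {
		if ctx.Err() != nil {
			return done // expired
		}
		done++
	}
	return done
}

// demo simulates a state context: an already cancelled context stands in for
// a state that is not currently active.
func demo(stateActive bool) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if !stateActive {
		cancel()
	}
	return processChunks(ctx, []string{"a", "b", "c"})
}

func main() {
	fmt.Println(demo(true), demo(false))
}
```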

func (*NetworkMachine) Not added in v0.17.0

func (m *NetworkMachine) Not(states am.S) bool

Not is am.Api.Not.

func (*NetworkMachine) Not1 added in v0.17.0

func (m *NetworkMachine) Not1(state string) bool

Not1 is am.Api.Not1.

func (*NetworkMachine) OnDispose added in v0.17.0

func (m *NetworkMachine) OnDispose(fn am.HandlerDispose)

OnDispose is am.Api.OnDispose.

func (*NetworkMachine) ParentId added in v0.17.0

func (m *NetworkMachine) ParentId() string

ParentId returns the id of the parent machine (if any).

func (*NetworkMachine) ParseStates added in v0.17.0

func (m *NetworkMachine) ParseStates(states am.S) am.S

ParseStates is am.Api.ParseStates.

func (*NetworkMachine) QueueLen added in v0.17.0

func (m *NetworkMachine) QueueLen() uint16

QueueLen is am.Api.QueueLen.

func (*NetworkMachine) QueueTick added in v0.17.0

func (m *NetworkMachine) QueueTick() uint64

QueueTick is am.Api.QueueTick.

func (*NetworkMachine) RemoteId added in v0.17.0

func (m *NetworkMachine) RemoteId() string

RemoteId returns the ID of the remote state machine.

func (*NetworkMachine) Remove added in v0.17.0

func (m *NetworkMachine) Remove(states am.S, args am.A) am.Result

Remove is am.Api.Remove.

func (*NetworkMachine) Remove1 added in v0.17.0

func (m *NetworkMachine) Remove1(state string, args am.A) am.Result

Remove1 is am.Api.Remove1.

func (*NetworkMachine) Schema added in v0.17.0

func (m *NetworkMachine) Schema() am.Schema

Schema returns a copy of machine's state structure.

func (*NetworkMachine) SemLogger added in v0.17.0

func (m *NetworkMachine) SemLogger() am.SemLogger

func (*NetworkMachine) Set added in v0.17.0

func (m *NetworkMachine) Set(states am.S, args am.A) am.Result

Set is am.Api.Set.

func (*NetworkMachine) StateNames added in v0.17.0

func (m *NetworkMachine) StateNames() am.S

StateNames returns a copy of all the state names.

func (*NetworkMachine) StateNamesMatch added in v0.17.0

func (m *NetworkMachine) StateNamesMatch(re *regexp.Regexp) am.S

func (*NetworkMachine) StatesVerified added in v0.17.0

func (m *NetworkMachine) StatesVerified() bool

StatesVerified returns true if the state names have been ordered using VerifyStates.

func (*NetworkMachine) String added in v0.17.0

func (m *NetworkMachine) String() string

String returns a one line representation of the currently active states, with their clock values. Inactive states are omitted. Eg: (Foo:1 Bar:3)

func (*NetworkMachine) StringAll added in v0.17.0

func (m *NetworkMachine) StringAll() string

StringAll returns a one line representation of all the states, with their clock values. Inactive states are in square brackets. Eg: (Foo:1 Bar:3)[Baz:2]

func (*NetworkMachine) Switch added in v0.17.0

func (m *NetworkMachine) Switch(groups ...am.S) string

Switch is am.Api.Switch.

func (*NetworkMachine) Tags added in v0.17.0

func (m *NetworkMachine) Tags() []string

Tags returns machine's tags, a list of unstructured strings without spaces.

func (*NetworkMachine) Tick added in v0.17.0

func (m *NetworkMachine) Tick(state string) uint64

Tick returns the current tick for a given state.

func (*NetworkMachine) Time added in v0.17.0

func (m *NetworkMachine) Time(states am.S) am.Time

Time returns machine's time, a list of ticks per state. Returned value includes the specified states, or all the states if nil.

func (*NetworkMachine) Toggle added in v0.17.0

func (m *NetworkMachine) Toggle(states am.S, args am.A) am.Result

Toggle is am.Api.Toggle.

func (*NetworkMachine) Toggle1 added in v0.17.0

func (m *NetworkMachine) Toggle1(state string, args am.A) am.Result

Toggle1 is am.Api.Toggle1.

func (*NetworkMachine) Tracers added in v0.17.0

func (m *NetworkMachine) Tracers() []am.Tracer

Tracers is am.Api.Tracers.

func (*NetworkMachine) Transition added in v0.17.0

func (m *NetworkMachine) Transition() *am.Transition

Transition is am.Machine.Transition.

func (*NetworkMachine) WasClock added in v0.17.0

func (m *NetworkMachine) WasClock(clock am.Clock) bool

WasClock is am.Api.WasClock.

func (*NetworkMachine) WasTime added in v0.17.0

func (m *NetworkMachine) WasTime(t am.Time, states am.S) bool

WasTime is am.Api.WasTime.

func (*NetworkMachine) When added in v0.17.0

func (m *NetworkMachine) When(
	states am.S, ctx context.Context,
) <-chan struct{}

When is am.Api.When.

func (*NetworkMachine) When1 added in v0.17.0

func (m *NetworkMachine) When1(
	state string, ctx context.Context,
) <-chan struct{}

When1 is an alias to When() for a single state. See When.

func (*NetworkMachine) WhenArgs added in v0.17.0

func (m *NetworkMachine) WhenArgs(
	state string, args am.A, ctx context.Context,
) <-chan struct{}

WhenArgs returns a channel that will be closed when the passed state becomes active with all the passed args. Args are compared using the native '=='. It's meant to be used with async Multi states, to filter out a specific completion.

ctx: optional context that will close the channel when done.

func (*NetworkMachine) WhenDisposed added in v0.17.0

func (m *NetworkMachine) WhenDisposed() <-chan struct{}

WhenDisposed returns a channel that will be closed when the machine is disposed. Requires bound handlers. Use Machine.Disposed in case no handlers have been bound.

func (*NetworkMachine) WhenErr added in v0.17.0

func (m *NetworkMachine) WhenErr(disposeCtx context.Context) <-chan struct{}

WhenErr is am.Api.WhenErr.

func (*NetworkMachine) WhenNot added in v0.17.0

func (m *NetworkMachine) WhenNot(
	states am.S, ctx context.Context,
) <-chan struct{}

WhenNot returns a channel that will be closed when all the passed states become inactive or the machine gets disposed.

ctx: optional context that will close the channel early.
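Since the returned channel also closes when ctx is done, a caller that needs to tell the two cases apart can check ctx.Err() after the receive. The sketch below shows that pattern with whenStub, a hypothetical stand-in that only imitates the documented channel behavior. It is not the library's implementation.

```go
package main

import (
	"context"
	"fmt"
)

// whenStub is a hypothetical stand-in for a wait method: the returned channel
// closes when signal closes, or early when ctx is done.
func whenStub(signal <-chan struct{}, ctx context.Context) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		select {
		case <-signal:
		case <-ctx.Done():
		}
	}()
	return ch
}

// waitSignal blocks on the wait channel and reports whether it closed because
// of the awaited condition (true) or because ctx expired first (false).
func waitSignal(signal <-chan struct{}, ctx context.Context) bool {
	<-whenStub(signal, ctx)
	return ctx.Err() == nil
}

// demo runs one wait, either with the condition met or with ctx cancelled.
func demo(cancelFirst bool) bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	signal := make(chan struct{})
	if cancelFirst {
		cancel()
	} else {
		close(signal)
	}
	return waitSignal(signal, ctx)
}

func main() {
	fmt.Println(demo(false), demo(true))
}
```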

func (*NetworkMachine) WhenNot1 added in v0.17.0

func (m *NetworkMachine) WhenNot1(
	state string, ctx context.Context,
) <-chan struct{}

WhenNot1 is an alias to WhenNot() for a single state. See WhenNot.

func (*NetworkMachine) WhenQuery added in v0.17.0

func (m *NetworkMachine) WhenQuery(
	clockCheck func(clock am.Clock) bool, ctx context.Context,
) <-chan struct{}

WhenQuery returns a channel that will be closed when the passed [clockCheck] function returns true. [clockCheck] should be a pure function and non-blocking.

ctx: optional context that will close the channel early.
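A clockCheck function only reads the clock it receives and returns a bool. The sketch below uses a local clock type as a stand-in for am.Clock (a state-keyed map of ticks), hypothetical state names, and the assumption that an odd tick means the state is currently active.

```go
package main

import "fmt"

// clock is a local stand-in for am.Clock, a state-keyed map of ticks.
type clock map[string]uint64

// readyAfterRetries is a pure, non-blocking check: "Ready" is active (odd
// tick, an assumption) and "Retrying" has ticked at least 4 times.
// Both state names are hypothetical.
func readyAfterRetries(c clock) bool {
	return c["Ready"]%2 == 1 && c["Retrying"] >= 4
}

func main() {
	fmt.Println(readyAfterRetries(clock{"Ready": 1, "Retrying": 4}))
	fmt.Println(readyAfterRetries(clock{"Ready": 2, "Retrying": 6}))
}
```

A function of this shape has no side effects and returns immediately, which is what the description above asks of clockCheck.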

func (*NetworkMachine) WhenQueue added in v0.17.0

func (m *NetworkMachine) WhenQueue(tick am.Result) <-chan struct{}

func (*NetworkMachine) WhenTicks added in v0.17.0

func (m *NetworkMachine) WhenTicks(
	state string, ticks int, ctx context.Context,
) <-chan struct{}

WhenTicks waits N ticks of a single state (relative to now). Uses WhenTime underneath.

ctx: optional context that will close the channel early.

func (*NetworkMachine) WhenTime added in v0.17.0

func (m *NetworkMachine) WhenTime(
	states am.S, times am.Time, ctx context.Context,
) <-chan struct{}

WhenTime returns a channel that will be closed when all the passed states have passed the specified time. The time is a logical clock of the state. Machine time can be sourced from Machine.Time() or Machine.Clock().

ctx: optional context that will close the channel early.

func (*NetworkMachine) WhenTime1 added in v0.17.0

func (m *NetworkMachine) WhenTime1(
	state string, ticks uint64, ctx context.Context,
) <-chan struct{}

WhenTime1 waits until the tick of a single state reaches the given value (or more).

ctx: optional context that will close the channel early.

type ReplOpts added in v0.17.1

type ReplOpts struct {
	// optional dir path to save the address file as addrDir/mach-id.addr
	AddrDir string
	// optional channel to send err to, once ready
	ErrCh chan<- error
	// optional channel to send the address to, once ready
	AddrCh chan<- string
	// optional prefix for typesafe args. Requires Args.
	ArgsPrefix string
	// optional typed args instance. Requires ArgsPrefix
	Args any
}

type SendPayloadHandlers added in v0.8.0

type SendPayloadHandlers struct {
	SendPayloadState am.HandlerFinal
}

type Server

type Server struct {
	*ExceptionHandler
	Mach *am.Machine

	// Source is a state Source, either a local or remote RPC worker.
	Source am.Api
	// Addr is the address of the server on the network.
	Addr string
	// DeliveryTimeout is a timeout for SendPayload to the client.
	DeliveryTimeout time.Duration
	// Listener can be set manually before starting the server.
	Listener atomic.Pointer[net.Listener]
	// Conn can be set manually before starting the server.
	Conn net.Conn
	// NoNewListener will prevent the server from creating a new listener if
	// one is not provided or has been closed. Useful for cmux.
	NoNewListener bool
	LogEnabled    bool
	CallCount     uint64
	// Typed arguments struct value with defaults
	Args any
	// Typed arguments prefix in a resulting [am.A] map.
	ArgsPrefix string

	// PushInterval is the interval for clock updates, effectively throttling
	// the number of updates sent to the client within the interval window.
	// 0 means pushes are disabled. Setting to a very small value will make
	// pushes instant.
	PushInterval atomic.Pointer[time.Duration]

	// AllowId will limit clients to a specific ID, if set.
	AllowId string
	// contains filtered or unexported fields
}

Server is an RPC server that can be bound to a worker machine and provide remote access to its states and methods.
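PushInterval is an atomic.Pointer[time.Duration], so it is written with Store and a pointer to a value rather than by plain assignment. The sketch below shows this on serverStub, a hypothetical struct with the same field type. Reading an unset pointer as 0 is an assumption of the sketch, not documented behavior.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// serverStub is a hypothetical struct that only mirrors the type of the
// Server.PushInterval field.
type serverStub struct {
	PushInterval atomic.Pointer[time.Duration]
}

// setPushInterval stores a new interval; safe for concurrent use.
func setPushInterval(s *serverStub, ms int) {
	d := time.Duration(ms) * time.Millisecond
	s.PushInterval.Store(&d)
}

// pushIntervalMs loads the interval in milliseconds, treating an unset
// pointer as 0 (an assumption of this sketch).
func pushIntervalMs(s *serverStub) int {
	d := s.PushInterval.Load()
	if d == nil {
		return 0
	}
	return int(d.Milliseconds())
}

func main() {
	s := &serverStub{}
	fmt.Println(pushIntervalMs(s))
	setPushInterval(s, 50)
	fmt.Println(pushIntervalMs(s))
}
```

The Listener field has the same shape, atomic.Pointer[net.Listener], so the same Store and Load calls apply there.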

func NewServer

func NewServer(
	ctx context.Context, addr string, name string, netSrcMach am.Api,
	opts *ServerOpts,
) (*Server, error)

NewServer creates a new RPC server, bound to a worker machine. The source machine has to implement the states.NetSourceStatesDef interface.

func (*Server) ClientId added in v0.9.0

func (s *Server) ClientId() string

func (*Server) GetKind

func (s *Server) GetKind() Kind

GetKind returns a kind of RPC component (server / client).

func (*Server) HandshakeDoneEnd

func (s *Server) HandshakeDoneEnd(e *am.Event)

func (*Server) RemoteAdd

func (s *Server) RemoteAdd(
	_ *rpc2.Client, req *MsgCliMutation, resp *MsgSrvMutation,
) error

func (*Server) RemoteAddNS

func (s *Server) RemoteAddNS(
	_ *rpc2.Client, req *MsgCliMutation, _ *MsgEmpty,
) error

func (*Server) RemoteArgs added in v0.17.1

func (s *Server) RemoteArgs(
	_ *rpc2.Client, _ *MsgEmpty, resp *MsgSrvArgs,
) error

func (*Server) RemoteBye

func (s *Server) RemoteBye(
	_ *rpc2.Client, _ *MsgEmpty, _ *MsgEmpty,
) error

RemoteBye means the client says goodbye and will disconnect shortly.

func (*Server) RemoteHandshake

func (s *Server) RemoteHandshake(
	client *rpc2.Client, _ *MsgEmpty, _ *MsgEmpty,
) error

func (*Server) RemoteHello added in v0.8.0

func (s *Server) RemoteHello(
	client *rpc2.Client, req *MsgCliHello, resp *MsgSrvHello,
) error

func (*Server) RemoteRemove

func (s *Server) RemoteRemove(
	_ *rpc2.Client, req *MsgCliMutation, resp *MsgSrvMutation,
) error

func (*Server) RemoteSet

func (s *Server) RemoteSet(
	_ *rpc2.Client, req *MsgCliMutation, resp *MsgSrvMutation,
) error

func (*Server) RemoteSync

func (s *Server) RemoteSync(
	_ *rpc2.Client, _ *MsgEmpty, resp *MsgSrvSync,
) error

func (*Server) RpcReadyEnter added in v0.8.0

func (s *Server) RpcReadyEnter(e *am.Event) bool

func (*Server) RpcReadyState

func (s *Server) RpcReadyState(e *am.Event)

RpcReadyState starts a ticker to compensate for clock push debounces.

func (*Server) RpcStartingEnter added in v0.8.0

func (s *Server) RpcStartingEnter(e *am.Event) bool

func (*Server) RpcStartingState

func (s *Server) RpcStartingState(e *am.Event)

func (*Server) SendPayload

func (s *Server) SendPayload(
	ctx context.Context, event *am.Event, payload *MsgSrvPayload,
) error

SendPayload sends a payload to the client. It's usually called by a handler for SendPayload. [event] is optional.

func (*Server) Start

func (s *Server) Start() am.Result

Start starts the server, optionally creating a Listener (if Addr provided). Results in either RpcReady or Exception.

func (*Server) StartEnd added in v0.8.0

func (s *Server) StartEnd(e *am.Event)

func (*Server) Stop

func (s *Server) Stop(dispose bool) am.Result

Stop stops the server, and optionally disposes resources.

type ServerMethod added in v0.15.1

type ServerMethod enum.Member[string]

type ServerOpts added in v0.8.0

type ServerOpts struct {
	// PayloadState is a state for the server to listen on, to deliver payloads
	// to the client. The client activates this state to request a payload from
	// the worker. Default: am/rpc/states/WorkerStates.SendPayload.
	PayloadState string
	// Parent is a parent state machine for a new Server state machine. See
	// [am.Opts].
	Parent am.Api
	// Typed arguments struct pointer
	Args       any
	ArgsPrefix string
}
