Documentation
¶
Overview ¶
Package rpc is a transparent RPC for state machines.
Index ¶
- Constants
- Variables
- func AddErr(e *am.Event, mach *am.Machine, msg string, err error)
- func AddErrNetwork(e *am.Event, mach *am.Machine, err error)
- func AddErrNoConn(e *am.Event, mach *am.Machine, err error)
- func AddErrParams(e *am.Event, mach *am.Machine, err error)
- func AddErrResp(e *am.Event, mach *am.Machine, err error)
- func AddErrRpcStr(e *am.Event, mach *am.Machine, msg string)
- func BindServer(source, target *am.Machine, rpcReady, clientConn string) error
- func BindServerMulti(source, target *am.Machine, rpcReady, clientConn, clientDisconn string) error
- func BindServerRpcReady(source, target *am.Machine, rpcReady string) error
- func Checksum(mTime uint64, qTick uint64) uint8
- func ClockFromMsg(timeBefore am.Time, qTickBefore uint64, msg *ClockMsg) (am.Time, uint64)
- func GetClientId(name string) string
- func LogArgs(args am.A) map[string]string
- func MachRepl(mach am.Api, addr, addrDir string, addrCh chan<- string, errCh chan<- error)
- func MachReplEnv(mach am.Api) <-chan error
- func Pass(args *A) am.A
- func PassRpc(args *A) am.A
- func TrafficMeter(listener net.Listener, fwdTo string, counter chan<- int64, end <-chan struct{})
- type A
- type ARpc
- type ArgsGet
- type ArgsHello
- type ArgsLog
- type ArgsMut
- type ArgsPayload
- type Client
- func (c *Client) CallRetryFailedState(e *am.Event)
- func (c *Client) ConnectedState(e *am.Event)
- func (c *Client) ConnectingState(e *am.Event)
- func (c *Client) DisconnectedEnter(e *am.Event) bool
- func (c *Client) DisconnectedState(e *am.Event)
- func (c *Client) DisconnectingEnter(e *am.Event) bool
- func (c *Client) DisconnectingState(e *am.Event)
- func (c *Client) ExceptionState(e *am.Event)
- func (c *Client) GetKind() Kind
- func (c *Client) HandshakeDoneEnter(e *am.Event) bool
- func (c *Client) HandshakeDoneState(e *am.Event)
- func (c *Client) HandshakingState(e *am.Event)
- func (c *Client) HealthcheckState(e *am.Event)
- func (c *Client) RemoteBye(_ *rpc2.Client, _ *Empty, _ *Empty) error
- func (c *Client) RemotePushAllTicks(_ *rpc2.Client, clocks []PushAllTicks, _ *Empty) error
- func (c *Client) RemoteSchemaChange(_ *rpc2.Client, msg *RespHandshake, _ *Empty) error
- func (c *Client) RemoteSendPayload(_ *rpc2.Client, payload *ArgsPayload, _ *Empty) error
- func (c *Client) RemoteSendingPayload(_ *rpc2.Client, payload *ArgsPayload, _ *Empty) error
- func (c *Client) RemoteSetClock(_ *rpc2.Client, clock *ClockMsg, _ *Empty) error
- func (c *Client) RetryingCallEnter(e *am.Event) bool
- func (c *Client) RetryingConnState(e *am.Event)
- func (c *Client) Start() am.Result
- func (c *Client) StartEnd(e *am.Event)
- func (c *Client) StartState(e *am.Event)
- func (c *Client) Stop(waitTillExit context.Context, dispose bool) am.Result
- func (c *Client) WorkerPayloadEnter(e *am.Event) bool
- func (c *Client) WorkerPayloadState(e *am.Event)
- type ClientMethod
- type ClientOpts
- type ClockMsg
- type Empty
- type ExceptionHandler
- type Kind
- type Mux
- func (m *Mux) ClientConnectedState(e *am.Event)
- func (m *Mux) ExceptionState(e *am.Event)
- func (m *Mux) HasClientsEnd(e *am.Event) bool
- func (m *Mux) HealthcheckState(e *am.Event)
- func (m *Mux) NewServerErrEnter(e *am.Event) bool
- func (m *Mux) NewServerErrState(e *am.Event)
- func (m *Mux) Start() am.Result
- func (m *Mux) StartEnd(e *am.Event)
- func (m *Mux) StartEnter(e *am.Event) bool
- func (m *Mux) StartState(e *am.Event)
- func (m *Mux) Stop(dispose bool) am.Result
- type MuxNewServerFn
- type MuxOpts
- type PushAllTicks
- type RespGet
- type RespHandshake
- type RespResult
- type RespSync
- type SendPayloadHandlers
- type Server
- func (s *Server) ClientId() string
- func (s *Server) GetKind() Kind
- func (s *Server) HandshakeDoneEnd(e *am.Event)
- func (s *Server) RemoteAdd(_ *rpc2.Client, args *ArgsMut, resp *RespResult) error
- func (s *Server) RemoteAddNS(_ *rpc2.Client, args *ArgsMut, _ *Empty) error
- func (s *Server) RemoteBye(_ *rpc2.Client, _ *Empty, _ *Empty) error
- func (s *Server) RemoteHandshake(client *rpc2.Client, id *string, _ *Empty) error
- func (s *Server) RemoteHello(client *rpc2.Client, req *ArgsHello, resp *RespHandshake) error
- func (s *Server) RemoteRemove(_ *rpc2.Client, args *ArgsMut, resp *RespResult) error
- func (s *Server) RemoteSet(_ *rpc2.Client, args *ArgsMut, resp *RespResult) error
- func (s *Server) RemoteSetPushAllTicks(_ *rpc2.Client, val bool, _ *Empty) error
- func (s *Server) RemoteSync(_ *rpc2.Client, sum uint64, resp *RespSync) error
- func (s *Server) RpcReadyEnter(e *am.Event) bool
- func (s *Server) RpcReadyState(e *am.Event)
- func (s *Server) RpcStartingEnter(e *am.Event) bool
- func (s *Server) RpcStartingState(e *am.Event)
- func (s *Server) SendPayload(ctx context.Context, event *am.Event, payload *ArgsPayload) error
- func (s *Server) Start() am.Result
- func (s *Server) StartEnd(e *am.Event)
- func (s *Server) Stop(dispose bool) am.Result
- type ServerMethod
- type ServerOpts
- type Worker
- func (w *Worker) ActiveStates(states am.S) am.S
- func (w *Worker) Add(states am.S, args am.A) am.Result
- func (w *Worker) Add1(state string, args am.A) am.Result
- func (w *Worker) Add1NS(state string, args am.A) am.Result
- func (w *Worker) AddBreakpoint(added am.S, removed am.S, strict bool)
- func (w *Worker) AddBreakpoint1(added string, removed string, strict bool)
- func (w *Worker) AddErr(err error, args am.A) am.Result
- func (w *Worker) AddErrState(state string, err error, args am.A) am.Result
- func (w *Worker) AddNS(states am.S, args am.A) am.Result
- func (w *Worker) Any(states ...am.S) bool
- func (w *Worker) Any1(states ...string) bool
- func (w *Worker) BindHandlers(handlers any) error
- func (w *Worker) BindTracer(tracer am.Tracer) error
- func (w *Worker) CanAdd(states am.S, args am.A) am.Result
- func (w *Worker) CanAdd1(state string, args am.A) am.Result
- func (w *Worker) CanRemove(states am.S, args am.A) am.Result
- func (w *Worker) CanRemove1(state string, args am.A) am.Result
- func (w *Worker) Clock(states am.S) am.Clock
- func (w *Worker) CountActive(states am.S) int
- func (w *Worker) Ctx() context.Context
- func (w *Worker) DetachHandlers(handlers any) error
- func (w *Worker) DetachTracer(tracer am.Tracer) error
- func (w *Worker) Dispose()
- func (w *Worker) Err() error
- func (w *Worker) EvAdd(event *am.Event, states am.S, args am.A) am.Result
- func (w *Worker) EvAdd1(event *am.Event, state string, args am.A) am.Result
- func (w *Worker) EvAddErr(event *am.Event, err error, args am.A) am.Result
- func (w *Worker) EvAddErrState(event *am.Event, state string, err error, args am.A) am.Result
- func (w *Worker) EvRemove(event *am.Event, states am.S, args am.A) am.Result
- func (w *Worker) EvRemove1(event *am.Event, state string, args am.A) am.Result
- func (w *Worker) EvToggle(e *am.Event, states am.S, args am.A) am.Result
- func (w *Worker) EvToggle1(e *am.Event, state string, args am.A) am.Result
- func (w *Worker) Export() *am.Serialized
- func (w *Worker) Groups() (map[string][]int, []string)
- func (w *Worker) Has(states am.S) bool
- func (w *Worker) Has1(state string) bool
- func (w *Worker) HasHandlers() bool
- func (w *Worker) Id() string
- func (w *Worker) Index(states am.S) []int
- func (w *Worker) Index1(state string) int
- func (w *Worker) Inspect(states am.S) string
- func (w *Worker) InternalUpdateClock(now am.Time, qTick uint64, lock bool)
- func (w *Worker) Is(states am.S) bool
- func (w *Worker) Is1(state string) bool
- func (w *Worker) IsClock(clock am.Clock) bool
- func (w *Worker) IsDisposed() bool
- func (w *Worker) IsErr() bool
- func (w *Worker) IsTime(t am.Time, states am.S) bool
- func (w *Worker) Log(msg string, args ...any)
- func (w *Worker) MachineTick() uint32
- func (w *Worker) MustParseStates(states am.S) am.S
- func (w *Worker) NewStateCtx(state string) context.Context
- func (w *Worker) Not(states am.S) bool
- func (w *Worker) Not1(state string) bool
- func (w *Worker) OnDispose(fn am.HandlerDispose)
- func (w *Worker) ParentId() string
- func (w *Worker) ParseStates(states am.S) am.S
- func (w *Worker) QueueLen() uint16
- func (w *Worker) QueueTick() uint64
- func (w *Worker) RemoteId() string
- func (w *Worker) Remove(states am.S, args am.A) am.Result
- func (w *Worker) Remove1(state string, args am.A) am.Result
- func (w *Worker) Schema() am.Schema
- func (w *Worker) SemLogger() am.SemLogger
- func (w *Worker) Set(states am.S, args am.A) am.Result
- func (w *Worker) StateNames() am.S
- func (w *Worker) StateNamesMatch(re *regexp.Regexp) am.S
- func (w *Worker) StatesVerified() bool
- func (w *Worker) String() string
- func (w *Worker) StringAll() string
- func (w *Worker) Switch(groups ...am.S) string
- func (w *Worker) Sync() am.Time
- func (w *Worker) Tags() []string
- func (w *Worker) Tick(state string) uint64
- func (w *Worker) Time(states am.S) am.Time
- func (w *Worker) Toggle(states am.S, args am.A) am.Result
- func (w *Worker) Toggle1(state string, args am.A) am.Result
- func (w *Worker) Tracers() []am.Tracer
- func (w *Worker) WasClock(clock am.Clock) bool
- func (w *Worker) WasTime(t am.Time, states am.S) bool
- func (w *Worker) When(states am.S, ctx context.Context) <-chan struct{}
- func (w *Worker) When1(state string, ctx context.Context) <-chan struct{}
- func (w *Worker) WhenArgs(state string, args am.A, ctx context.Context) <-chan struct{}
- func (w *Worker) WhenDisposed() <-chan struct{}
- func (w *Worker) WhenErr(disposeCtx context.Context) <-chan struct{}
- func (w *Worker) WhenNot(states am.S, ctx context.Context) <-chan struct{}
- func (w *Worker) WhenNot1(state string, ctx context.Context) <-chan struct{}
- func (w *Worker) WhenQuery(clockCheck func(clock am.Clock) bool, ctx context.Context) <-chan struct{}
- func (w *Worker) WhenQueue(tick am.Result) <-chan struct{}
- func (w *Worker) WhenTicks(state string, ticks int, ctx context.Context) <-chan struct{}
- func (w *Worker) WhenTime(states am.S, times am.Time, ctx context.Context) <-chan struct{}
- func (w *Worker) WhenTime1(state string, ticks uint64, ctx context.Context) <-chan struct{}
- type WorkerTracer
Constants ¶
const ( // EnvAmRpcLogServer enables machine logging for RPC server. EnvAmRpcLogServer = "AM_RPC_LOG_SERVER" // EnvAmRpcLogClient enables machine logging for RPC client. EnvAmRpcLogClient = "AM_RPC_LOG_CLIENT" // EnvAmRpcLogMux enables machine logging for RPC multiplexers. EnvAmRpcLogMux = "AM_RPC_LOG_MUX" // EnvAmRpcDbg enables env-based debugging for RPC components. EnvAmRpcDbg = "AM_RPC_DBG" // EnvAmReplAddr is a REPL address to listen on. "1" expands to 127.0.0.1:0. EnvAmReplAddr = "AM_REPL_ADDR" // EnvAmReplDir is a dir path to save the address file as // $AM_REPL_DIR/mach-id.addr. Optional. EnvAmReplDir = "AM_REPL_DIR" )
const APrefix = "am_rpc"
Variables ¶
var ( ServerAdd = ServerMethod{"Add"} ServerAddNS = ServerMethod{"AddNS"} ServerRemove = ServerMethod{"Remove"} ServerSet = ServerMethod{"Set"} ServerHello = ServerMethod{"Hello"} ServerHandshake = ServerMethod{"Handshake"} ServerLog = ServerMethod{"Log"} ServerSync = ServerMethod{"Sync"} ServerBye = ServerMethod{"Close"} ServerMethods = enum.New(ServerAdd, ServerAddNS, ServerRemove, ServerSet, ServerHello, ServerHandshake, ServerLog, ServerSync, ServerBye) ClientSetClock = ClientMethod{"ClientSetClock"} ClientPushAllTicks = ClientMethod{"ClientPushAllTicks"} ClientSendPayload = ClientMethod{"ClientSendPayload"} ClientBye = ClientMethod{"ClientBye"} ClientSchemaChange = ClientMethod{"SchemaChange"} ClientMethods = enum.New(ClientSetClock, ClientPushAllTicks, ClientSendPayload, ClientBye, ClientSchemaChange) )
var ( ErrInvalidParams = errors.New("invalid params") ErrInvalidResp = errors.New("invalid response") ErrRpc = errors.New("rpc") ErrNoAccess = errors.New("no access") ErrNoConn = errors.New("not connected") ErrDestination = errors.New("wrong destination") ErrNetwork = errors.New("network error") ErrNetworkTimeout = errors.New("network timeout") )
Functions ¶
func AddErr ¶ added in v0.8.0
AddErr detects sentinels from error msgs and calls the proper error setter. TODO also return error for compat
func BindServer ¶ added in v0.8.0
BindServer binds RpcReady and ClientConnected with Add/Remove, to custom states.
func BindServerMulti ¶ added in v0.8.0
func BindServerMulti( source, target *am.Machine, rpcReady, clientConn, clientDisconn string, ) error
BindServerMulti binds RpcReady, ClientConnected, and ClientDisconnected. RpcReady is Add/Remove, other two are Add-only to passed multi states.
func BindServerRpcReady ¶ added in v0.8.0
BindServerRpcReady bind RpcReady using Add to a custom multi state.
func ClockFromMsg ¶
func GetClientId ¶ added in v0.8.0
GetClientId returns an RPC Client machine ID from a name. This ID will be used to handshake the server.
func MachRepl ¶ added in v0.10.2
MachRepl sets up a machine for a REPL connection, which allows for mutations, like any other RPC connection. See [/tools/cmd/arpc] for usage. This function is considered a debugging helper and can panic.
addr: address to listen on, default to 127.0.0.1:0 addrDir: optional dir path to save the address file as addrDir/mach-id.addr. addrCh: optional channel to send the address to, once ready errCh: optional channel to send err to, once ready
func MachReplEnv ¶ added in v0.10.2
MachReplEnv sets up a machine for a REPL connection in case AM_REPL_ADDR env var is set. See MachRepl.
Types ¶
type A ¶ added in v0.8.0
type A struct {
Id string `log:"id"`
Name string `log:"name"`
MachTime am.Time
QueueTick uint64
Payload *ArgsPayload
Addr string `log:"addr"`
Err error
Method string `log:"addr"`
StartedAt time.Time
Dispose bool
Client *rpc2.Client
}
A represents typed arguments of the RPC package. It's a typesafe alternative to am.A.
type ARpc ¶ added in v0.9.0
type ARpc struct {
Id string `log:"id"`
Name string `log:"name"`
MachTime am.Time
Payload *ArgsPayload
Addr string `log:"addr"`
Err error
Method string `log:"addr"`
StartedAt time.Time
Dispose bool
}
ARpc is a subset of A, that can be passed over RPC.
type ArgsPayload ¶
type ArgsPayload struct {
Name string
// Source is the machine ID that sent the payload.
Source string
// SourceTx is transition ID.
SourceTx string
// Destination is an optional machine ID that is supposed to receive the
// payload. Useful when using rpc.Mux.
Destination string
// Data is the payload data. The Consumer has to know the type.
Data any
// Token is a unique random ID for the payload. Autofilled by the server.
Token string
}
type Client ¶
type Client struct {
*ExceptionHandler
Mach *am.Machine
Name string
// Addr is the address the Client will connect to.
Addr string
// Request the state schema from the server.
RequestSchema bool
// Worker is a remote am.Machine instance
Worker *Worker
// Consumer is the optional consumer for deliveries.
Consumer *am.Machine
CallCount uint64
LogEnabled bool
// DisconnCooldown is the time to wait after notifying the server about
// disconnecting before actually disconnecting. Default 10ms.
DisconnCooldown time.Duration
// LastMsgAt is the last received msg from the worker TODO
LastMsgAt time.Time
// HelloDelay between Connected and Handshaking. Default 0, useful for
// rpc/Mux.
HelloDelay time.Duration
// ReconnectOn decides if the client will try to [RetryingConn] after a
// clean [Disconnect].
ReconnectOn bool
// ConnTimeout is the maximum time to wait for a connection to be established.
// Default 3s.
ConnTimeout time.Duration
// ConnRetries is the number of retries for a connection. Default 15.
ConnRetries int
// ConnRetryTimeout is the maximum time to retry a connection. Default 1m.
ConnRetryTimeout time.Duration
// ConnRetryDelay is the time to wait between retries. Default 100ms. If
// ConnRetryBackoff is set, this is the initial delay, and doubles on each
// retry.
ConnRetryDelay time.Duration
// ConnRetryBackoff is the maximum time to wait between retries. Default 3s.
ConnRetryBackoff time.Duration
// CallTimeout is the maximum time to wait for a call to complete. Default 3s.
CallTimeout time.Duration
// CallRetries is the number of retries for a call. Default 15.
CallRetries int
// CallRetryTimeout is the maximum time to retry a call. Default 1m.
CallRetryTimeout time.Duration
// CallRetryDelay is the time to wait between retries. Default 100ms. If
// CallRetryBackoff is set, this is the initial delay, and doubles on each
// retry.
CallRetryDelay time.Duration
// CallRetryBackoff is the maximum time to wait between retries. Default 3s.
CallRetryBackoff time.Duration
DisconnTimeout time.Duration
// contains filtered or unexported fields
}
Client is a type representing an RPC client that interacts with a remote am.Machine instance.
func NewClient ¶
func NewClient( ctx context.Context, workerAddr string, name string, stateStruct am.Schema, stateNames am.S, opts *ClientOpts, ) (*Client, error)
NewClient creates a new RPC client and exposes a remote state machine as a remote worker, with a subst of the API under Client.Worker. Optionally takes a consumer, which is a state machine with a WorkerPayload state. See states.ConsumerStates.
func (*Client) CallRetryFailedState ¶ added in v0.8.0
func (*Client) ConnectedState ¶
func (*Client) ConnectingState ¶
func (*Client) DisconnectedState ¶
func (*Client) DisconnectingState ¶
func (*Client) ExceptionState ¶ added in v0.8.0
ExceptionState handles network errors and retries the connection.
func (*Client) HandshakeDoneEnter ¶ added in v0.8.0
func (*Client) HandshakeDoneState ¶
func (*Client) HandshakingState ¶
func (*Client) HealthcheckState ¶ added in v0.9.0
func (*Client) RemoteBye ¶ added in v0.10.1
RemoteBye is called by the server on a planned disconnect. TODO take a reason / source event?
func (*Client) RemotePushAllTicks ¶ added in v0.8.0
RemotePushAllTicks log all the machine clock's ticks, so all final handlers can be executed in order. Only called by the server. TODO
func (*Client) RemoteSchemaChange ¶ added in v0.12.0
RemoteSchemaChange is called by the server on a source machine schema change.
func (*Client) RemoteSendPayload ¶
RemoteSendPayload receives a payload from the server and triggers WorkerPayload. The Consumer should bind his handlers and handle this state to receive the data.
func (*Client) RemoteSendingPayload ¶ added in v0.8.0
RemoteSendingPayload triggers the WorkerDelivering state, which is an optional indication that the server has started a data transmission to the Client. This payload shouldn't contain the data itself, only the name and token.
func (*Client) RemoteSetClock ¶
RemoteSetClock updates the client's clock. Only called by the server.
func (*Client) RetryingCallEnter ¶ added in v0.8.0
func (*Client) RetryingConnState ¶ added in v0.8.0
RetryingConnState should be set without Connecting in the same tx
func (*Client) Start ¶
Start connects the client to the server and initializes the worker. Results in the Ready state.
func (*Client) StartState ¶ added in v0.8.0
func (*Client) Stop ¶
Stop disconnects the client from the server and disposes the worker.
waitTillExit: if passed, waits for the client to disconnect using the context.
func (*Client) WorkerPayloadEnter ¶ added in v0.8.0
func (*Client) WorkerPayloadState ¶ added in v0.8.0
type ClientMethod ¶ added in v0.15.1
type ClientOpts ¶ added in v0.8.0
type ClientOpts struct {
// PayloadState is a state for the server to listen on, to deliver payloads
// to the client. The client adds this state to request a payload from the
// worker. Default: am/rpc/states/WorkerStates.SendPayload.
Consumer *am.Machine
// Parent is a parent state machine for a new Client state machine. See
// [am.Opts].
Parent am.Api
}
type ClockMsg ¶
type ExceptionHandler ¶
type ExceptionHandler struct {
*am.ExceptionHandler
}
ExceptionHandler is a shared exception handler for RPC server and client.
func (*ExceptionHandler) ExceptionEnter ¶
func (h *ExceptionHandler) ExceptionEnter(e *am.Event) bool
type Mux ¶ added in v0.8.0
type Mux struct {
*am.ExceptionHandler
Mach *am.Machine
// Source is the state source to expose via RPC. Required if NewServerFn
// isnt provided.
Source am.Api
// NewServerFn creates a new instance of Server and is called for every new
// connection.
NewServerFn MuxNewServerFn
Name string
Addr string
// The listener used by this Mux, can be set manually before Start().
Listener net.Listener
LogEnabled bool
// The last error returned by NewServerFn.
NewServerErr error
// contains filtered or unexported fields
}
Mux creates a new RPC server for each incoming connection.
func NewMux ¶ added in v0.8.0
func NewMux( ctx context.Context, name string, newServerFn MuxNewServerFn, opts *MuxOpts, ) (*Mux, error)
NewMux initializes a Mux instance to handle RPC server creation for incoming connections with the given parameters.
newServerFn: when nil, [Mux.Source] needs to be set manually before calling Mux.Start.
func (*Mux) ClientConnectedState ¶ added in v0.8.0
func (*Mux) ExceptionState ¶ added in v0.8.0
func (*Mux) HealthcheckState ¶ added in v0.8.0
func (*Mux) NewServerErrState ¶ added in v0.8.0
func (*Mux) StartState ¶ added in v0.8.0
type MuxNewServerFn ¶ added in v0.10.2
MuxNewServerFn is a function to create a new RPC server for each incoming connection.
type PushAllTicks ¶ added in v0.8.0
type PushAllTicks struct {
MutationType []am.MutationType
CalledStates [][]int
ClockMsg []*ClockMsg
}
PushAllTicks TODO implement PushAllTicks should list all the mutations, with clocks and queue ticks which then will be processed as a queue. The client reconstructs the tx on his side. TODO mind partially accepted auto states (fake called states).
type RespHandshake ¶
type RespHandshake struct {
Schema am.Schema
Serialized *am.Serialized
}
type RespResult ¶
type SendPayloadHandlers ¶ added in v0.8.0
type SendPayloadHandlers struct {
SendPayloadState am.HandlerFinal
}
type Server ¶
type Server struct {
*ExceptionHandler
Mach *am.Machine
// Source is a state Source, either a local or remote RPC worker.
Source am.Api
// Addr is the address of the server on the network.
Addr string
// DeliveryTimeout is a timeout for SendPayload to the client.
DeliveryTimeout time.Duration
// PushInterval is the interval for clock updates, effectively throttling
// the number of updates sent to the client within the interval window.
// 0 means pushes are disabled. Setting to a very small value will make
// pushes instant.
PushInterval time.Duration
// PushAllTicks will push all ticks to the client, enabling client-side
// final handlers. TODO more info, implement via a queue
PushAllTicks bool
// Listener can be set manually before starting the server.
Listener atomic.Pointer[net.Listener]
// Conn can be set manually before starting the server.
Conn net.Conn
// NoNewListener will prevent the server from creating a new listener if
// one is not provided, or has been closed. Useful for cmux.
NoNewListener bool
LogEnabled bool
CallCount uint64
// AllowId will limit clients to a specific ID, if set.
AllowId string
// contains filtered or unexported fields
}
Server is an RPC server that can be bound to a worker machine and provide remote access to its states and methods.
func NewServer ¶
func NewServer( ctx context.Context, addr string, name string, sourceMach am.Api, opts *ServerOpts, ) (*Server, error)
NewServer creates a new RPC server, bound to a worker machine. The source machine has to implement am/rpc/states/WorkerStatesDef interface.
func (*Server) HandshakeDoneEnd ¶
func (*Server) RemoteAddNS ¶
func (*Server) RemoteHandshake ¶
func (*Server) RemoteHello ¶ added in v0.8.0
func (*Server) RemoteRemove ¶
func (*Server) RemoteSetPushAllTicks ¶ added in v0.8.0
func (*Server) RemoteSync ¶
func (*Server) RpcReadyState ¶
RpcReadyState starts a ticker to compensate for clock push debounces.
func (*Server) RpcStartingEnter ¶ added in v0.8.0
func (*Server) RpcStartingState ¶
func (*Server) SendPayload ¶
SendPayload sends a payload to the client. It's usually called by a handler for SendPayload.
type ServerMethod ¶ added in v0.15.1
type ServerOpts ¶ added in v0.8.0
type ServerOpts struct {
// PayloadState is a state for the server to listen on, to deliver payloads
// to the client. The client activates this state to request a payload from
// the worker. Default: am/rpc/states/WorkerStates.SendPayload.
PayloadState string
// Parent is a parent state machine for a new Server state machine. See
// [am.Opts].
Parent am.Api
}
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker is a subset of `pkg/machine#Machine` for RPC. Lacks the queue and other local methods. Most methods are clock-based, thus executed locally. TODO rename to NetworkMachine (netMach)
func NewWorker ¶ added in v0.10.1
func NewWorker( ctx context.Context, id string, c *Client, schema am.Schema, stateNames am.S, parent *am.Machine, tags []string, ) (*Worker, error)
NewWorker creates a new instance of a Worker. TODO link godoc to pkg/machine
func (*Worker) ActiveStates ¶
ActiveStates returns a copy of the currently active states.
func (*Worker) Add ¶
Add activates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.
func (*Worker) AddBreakpoint ¶ added in v0.12.0
func (*Worker) AddBreakpoint1 ¶ added in v0.15.1
func (*Worker) AddErr ¶
AddErr is a dedicated method to add the Exception state with the passed error and optional arguments. Like every mutation method, it will resolve relations and trigger handlers. AddErr produces a stack trace of the error, if LogStackTrace is enabled.
func (*Worker) AddErrState ¶
AddErrState adds a dedicated error state, along with the build in Exception state. Like every mutation method, it will resolve relations and trigger handlers. AddErrState produces a stack trace of the error, if LogStackTrace is enabled.
func (*Worker) AddNS ¶
AddNS is a NoSync method - an efficient way for adding states, as it doesn't wait for, nor transfers a response. Because of which it doesn't update the clock. Use Sync() to update the clock after a batch of AddNS calls.
func (*Worker) Any ¶
Any is group call to Is, returns true if any of the params return true from Is.
machine.StringAll() // ()[Foo:0 Bar:0 Baz:0]
machine.Add(S{"Foo"})
// is(Foo, Bar) or is(Bar)
machine.Any(S{"Foo", "Bar"}, S{"Bar"}) // false
// is(Foo) or is(Bar)
machine.Any(S{"Foo"}, S{"Bar"}) // true
func (*Worker) Any1 ¶
Any1 is group call to Is1(), returns true if any of the params return true from Is1().
func (*Worker) BindHandlers ¶ added in v0.8.0
BindHandlers binds a struct of handler methods to machine's states, based on the naming convention, eg `FooState(e *Event)`. Negotiation handlers can optionally return bool.
RPC worker will bind handlers locally, not to the remote machine. RPC worker handlers are still TODO.
func (*Worker) BindTracer ¶ added in v0.8.0
BindTracer binds a Tracer to the machine.
func (*Worker) CanRemove1 ¶ added in v0.15.0
func (*Worker) Clock ¶
Clock returns current machine's clock, a state-keyed map of ticks. If states are passed, only the ticks of the passed states are returned.
func (*Worker) CountActive ¶ added in v0.15.0
CountActive returns the number of active states from a passed list. Useful for state groups.
func (*Worker) DetachHandlers ¶ added in v0.9.0
DetachHandlers detaches previously bound machine handlers.
func (*Worker) DetachTracer ¶ added in v0.8.0
DetachTracer tries to remove a tracer from the machine. Returns true if the tracer was found and removed.
func (*Worker) Dispose ¶
func (w *Worker) Dispose()
Dispose disposes the machine and all its emitters. You can wait for the completion of the disposal with `<-mach.WhenDisposed`.
func (*Worker) EvAdd1 ¶ added in v0.9.0
EvAdd1 is a shorthand method to add a single state with the passed args.
func (*Worker) EvAddErrState ¶ added in v0.9.0
func (*Worker) Export ¶
func (w *Worker) Export() *am.Serialized
Export exports the machine state: id, time and state names.
func (*Worker) Has1 ¶
Has1 is a shorthand for Has. It returns true if the passed state is registered in the machine.
func (*Worker) HasHandlers ¶ added in v0.12.0
HasHandlers returns true if this machine has bound handlers, and thus an allocated goroutine. It also makes it nondeterministic.
func (*Worker) Index1 ¶ added in v0.12.0
Index1 returns the index of a state in the machine's StateNames() list.
func (*Worker) Inspect ¶
Inspect returns a multi-line string representation of the machine (states, relations, clock). states: param for ordered or partial results.
func (*Worker) InternalUpdateClock ¶ added in v0.15.1
InternalUpdateClock is an internal method to update the clock of this Worker. It should NOT be called by anything else then a synchronization source (eg RPC client, pubsub, etc).
func (*Worker) Is ¶
Is checks if all the passed states are currently active.
machine.StringAll() // ()[Foo:0 Bar:0 Baz:0]
machine.Add(S{"Foo"})
machine.TestIs(S{"Foo"}) // true
machine.TestIs(S{"Foo", "Bar"}) // false
func (*Worker) Is1 ¶
Is1 is a shorthand method to check if a single state is currently active. See Is().
func (*Worker) IsClock ¶
IsClock checks if the machine has changed since the passed clock. Returns true if at least one state has changed.
func (*Worker) IsDisposed ¶ added in v0.8.0
IsDisposed returns true if the machine has been disposed.
func (*Worker) IsTime ¶
IsTime checks if the machine has changed since the passed time (list of ticks). Returns true if at least one state has changed. The states param is optional and can be used to check only a subset of states.
func (*Worker) MachineTick ¶ added in v0.16.0
func (*Worker) MustParseStates ¶
MustParseStates parses the states and returns them as a list. Panics when a state is not defined. It's an usafe equivalent of VerifyStates.
func (*Worker) NewStateCtx ¶
NewStateCtx returns a new sub-context, bound to the current clock's tick of the passed state.
Context cancels when the state has been de-activated, or right away, if it isn't currently active.
State contexts are used to check state expirations and should be checked often inside goroutines. TODO log reader
func (*Worker) Not ¶
Not checks if **none** of the passed states are currently active.
machine.StringAll()
// -> ()[A:0 B:0 C:0 D:0]
machine.Add(S{"A", "B"})
// not(A) and not(C)
machine.TestNot(S{"A", "C"})
// -> false
// not(C) and not(D)
machine.TestNot(S{"C", "D"})
// -> true
func (*Worker) Not1 ¶
Not1 is a shorthand method to check if a single state is currently inactive. See Not().
func (*Worker) OnDispose ¶ added in v0.16.0
func (w *Worker) OnDispose(fn am.HandlerDispose)
func (*Worker) Remove ¶
Remove deactivates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.
func (*Worker) Remove1 ¶
Remove1 is a shorthand method to remove a single state with the passed args. See Remove().
func (*Worker) Set ¶
Set deactivates a list of states in the machine, returning the result of the transition (Executed, Queued, Canceled). Like every mutation method, it will resolve relations and trigger handlers.
func (*Worker) StateNames ¶
StateNames returns a copy of all the state names.
func (*Worker) StateNamesMatch ¶ added in v0.12.0
func (*Worker) StatesVerified ¶ added in v0.8.0
StatesVerified returns true if the state names have been ordered using VerifyStates.
func (*Worker) String ¶
String returns a one line representation of the currently active states, with their clock values. Inactive states are omitted. Eg: (Foo:1 Bar:3)
func (*Worker) StringAll ¶
StringAll returns a one line representation of all the states, with their clock values. Inactive states are in square brackets. Eg: (Foo:1 Bar:3)[Baz:2]
func (*Worker) Switch ¶
Switch returns the first state from the passed list that is currently active, making it useful for switch statements.
switch mach.Switch(ss.GroupPlaying) {
case "Playing":
case "Paused":
case "Stopped":
}
func (*Worker) Sync ¶
Sync requests fresh clock values from the remote machine. Useful to call after a batch of no-sync methods, eg AddNS.
func (*Worker) Tags ¶ added in v0.9.0
Tags returns machine's tags, a list of unstructured strings without spaces.
func (*Worker) Time ¶
Time returns machine's time, a list of ticks per state. Returned value includes the specified states, or all the states if nil.
func (*Worker) Toggle ¶ added in v0.15.1
Toggle deactivates a list of states in case all are active, or activates them otherwise. Returns the result of the transition (Executed, Queued, Canceled).
func (*Worker) Toggle1 ¶ added in v0.15.1
Toggle1 activates or deactivates a single state, depending on its current state. Returns the result of the transition (Executed, Queued, Canceled).
func (*Worker) WasClock ¶ added in v0.12.0
WasClock checks if the passed time has happened (or happening right now). Returns false if at least one state is too early.
func (*Worker) WasTime ¶ added in v0.12.0
WasTime checks if the passed time has happened (or happening right now). Returns false if at least one state is too early. The states param is optional and can be used to check only a subset of states.
func (*Worker) When ¶
When returns a channel that will be closed when all the passed states become active or the machine gets disposed.
ctx: optional context that will close the channel early.
func (*Worker) WhenArgs ¶
WhenArgs returns a channel that will be closed when the passed state becomes active with all the passed args. Args are compared using the native '=='. It's meant to be used with async Multi states, to filter out a specific completion.
ctx: optional context that will close the channel when done.
func (*Worker) WhenDisposed ¶
func (w *Worker) WhenDisposed() <-chan struct{}
WhenDisposed returns a channel that will be closed when the machine is disposed. Requires bound handlers. Use Machine.Disposed in case no handlers have been bound.
func (*Worker) WhenErr ¶
WhenErr returns a channel that will be closed when the machine is in the StateException state.
ctx: optional context that will close the channel early.
func (*Worker) WhenNot ¶
WhenNot returns a channel that will be closed when all the passed states become inactive or the machine gets disposed.
ctx: optional context that will close the channel early.
func (*Worker) WhenQuery ¶ added in v0.16.0
func (w *Worker) WhenQuery( clockCheck func(clock am.Clock) bool, ctx context.Context, ) <-chan struct{}
WhenQuery returns a channel that will be closed when the passed [clockCheck] function returns true. [clockCheck] should be a pure function and non-blocking.`
ctx: optional context that will close the channel early.
func (*Worker) WhenTicks ¶
WhenTicks waits N ticks of a single state (relative to now). Uses WhenTime underneath.
ctx: optional context that will close the channel early.
func (*Worker) WhenTime ¶
WhenTime returns a channel that will be closed when all the passed states have passed the specified time. The time is a logical clock of the state. Machine time can be sourced from [Machine.Time](), or [Machine.Clock]().
ctx: optional context that will close the channel early.
type WorkerTracer ¶
type WorkerTracer struct {
*am.NoOpTracer
// contains filtered or unexported fields
}
WorkerTracer is a tracer for local worker machines (event source).
func (*WorkerTracer) SchemaChange ¶ added in v0.12.0
func (t *WorkerTracer) SchemaChange(mach am.Api, oldSchema am.Schema)
func (*WorkerTracer) TransitionEnd ¶
func (t *WorkerTracer) TransitionEnd(tx *am.Transition)