Documentation
¶
Index ¶
- Constants
- Variables
- func AddErrJoining(event *am.Event, mach *am.Machine, err error, args am.A) error
- func AddErrListening(event *am.Event, mach *am.Machine, err error, args am.A) error
- func LogArgs(args am.A) map[string]string
- func Pass(args *A) am.A
- type A
- type Gossips
- type Info
- type ListFilters
- type MachClocks
- type MachInfo
- type Msg
- type MsgBye
- type MsgByeMach
- type MsgGossip
- type MsgInfo
- type MsgReqInfo
- type MsgReqUpdates
- type MsgType
- type MsgUpdates
- type PeerClocks
- type PeerGossips
- type PeerInfo
- type Topic
- func (t *Topic) ConnCount() int
- func (t *Topic) ConnectingEnter(e *am.Event) bool
- func (t *Topic) ConnectingState(e *am.Event)
- func (t *Topic) Dispose() am.Result
- func (t *Topic) DoSendInfoEnter(e *am.Event) bool
- func (t *Topic) DoSendInfoState(e *am.Event)
- func (t *Topic) ExceptionEnter(e *am.Event) bool
- func (t *Topic) ExceptionState(e *am.Event)
- func (t *Topic) GetPeerAddrs() ([]ma.Multiaddr, error)
- func (t *Topic) HeartbeatState(e *am.Event)
- func (t *Topic) Join() am.Result
- func (t *Topic) JoinedEnd(e *am.Event)
- func (t *Topic) JoinedState(e *am.Event)
- func (t *Topic) JoiningState(e *am.Event)
- func (t *Topic) ListMachinesEnter(e *am.Event) bool
- func (t *Topic) ListMachinesState(e *am.Event)
- func (t *Topic) MissPeersByGossipEnter(e *am.Event) bool
- func (t *Topic) MissPeersByGossipExit(e *am.Event) bool
- func (t *Topic) MissPeersByGossipState(e *am.Event)
- func (t *Topic) MissPeersByUpdatesEnter(e *am.Event) bool
- func (t *Topic) MissPeersByUpdatesExit(e *am.Event) bool
- func (t *Topic) MissPeersByUpdatesState(e *am.Event)
- func (t *Topic) MissUpdatesByGossipEnter(e *am.Event) bool
- func (t *Topic) MissUpdatesByGossipExit(e *am.Event) bool
- func (t *Topic) MissUpdatesByGossipState(e *am.Event)
- func (t *Topic) MsgByeEnter(e *am.Event) bool
- func (t *Topic) MsgInfoEnter(e *am.Event) bool
- func (t *Topic) MsgInfoState(e *am.Event)
- func (t *Topic) MsgReceivedEnter(e *am.Event) bool
- func (t *Topic) MsgReqInfoEnter(e *am.Event) bool
- func (t *Topic) MsgReqInfoState(e *am.Event)
- func (t *Topic) MsgReqUpdatesEnter(e *am.Event) bool
- func (t *Topic) MsgReqUpdatesState(e *am.Event)
- func (t *Topic) MsgUpdatesEnter(e *am.Event) bool
- func (t *Topic) MsgUpdatesState(e *am.Event)
- func (t *Topic) PeerJoinedEnter(e *am.Event) bool
- func (t *Topic) PeerJoinedState(e *am.Event)
- func (t *Topic) PeerLeftEnter(e *am.Event) bool
- func (t *Topic) ProcessMsgsState(e *am.Event)
- func (t *Topic) ReadyEnter(e *am.Event) bool
- func (t *Topic) ReqMissingPeersEnter(e *am.Event) bool
- func (t *Topic) ReqMissingPeersState(e *am.Event)
- func (t *Topic) ReqMissingUpdatesEnter(e *am.Event) bool
- func (t *Topic) ReqMissingUpdatesState(e *am.Event)
- func (t *Topic) SendGossipsEnter(e *am.Event) bool
- func (t *Topic) SendGossipsState(e *am.Event)
- func (t *Topic) SendInfoEnter(e *am.Event) bool
- func (t *Topic) SendInfoState(e *am.Event)
- func (t *Topic) SendMsgEnter(e *am.Event) bool
- func (t *Topic) SendMsgState(e *am.Event)
- func (t *Topic) SendUpdatesEnter(e *am.Event) bool
- func (t *Topic) SendUpdatesState(e *am.Event)
- func (t *Topic) Start() am.Result
- func (t *Topic) StartAndJoin(ctx context.Context) am.Result
- func (t *Topic) StartEnd(e *am.Event)
- func (t *Topic) StartEnter(e *am.Event) bool
- func (t *Topic) StartState(e *am.Event)
- type TopicOpts
- type Tracer
Constants ¶
// Environment variable names read by the pubsub package.
const (
	// EnvAmPubsubLog enables machine logging for pubsub.
	EnvAmPubsubLog = "AM_PUBSUB_LOG"
	// EnvAmPubsubDbg exposes PubSub workers over dbg telemetry.
	EnvAmPubsubDbg = "AM_PUBSUB_DBG"
)
// APrefix is a package-level string constant.
// NOTE(review): presumably the key/prefix under which the typed A struct is
// stored inside am.A — confirm against Pass and LogArgs.
const APrefix = "am_pubsub"
Variables ¶
// Sentinel errors of the pubsub package.
var (
	// ErrJoining is the sentinel that AddErrJoining wraps errors in.
	ErrJoining = errors.New("error joining")
	// ErrListening is the sentinel that AddErrListening wraps errors in.
	ErrListening = errors.New("error listening")
)
Functions ¶
func AddErrJoining ¶
AddErrJoining wraps an error in the ErrJoining sentinel and adds it to a machine.
func AddErrListening ¶
AddErrListening wraps an error in the ErrListening sentinel and adds it to a machine.
Types ¶
type A ¶
// A bundles typed arguments; the package docs describe it as a typesafe
// alternative to am.A. Fields carrying a `log` struct tag name the key used
// for that field in the tag.
type A struct {
// MsgInfo happens when a peer introduces its exposed state machines.
MsgInfo *MsgInfo
// The following fields hold the other typed messages, one per message type.
MsgUpdates *MsgUpdates
MsgBye *MsgBye
MsgReqInfo *MsgReqInfo
MsgReqUpdates *MsgReqUpdates
MsgGossip *MsgGossip
// Msgs is a list of PubSub messages.
Msgs []*pubsub.Message
// Length is a general length used for logging
Length int `log:"length"`
// Msg is a raw msg.
Msg []byte
// Peer is a peer ID (tagged `log:"peer"`).
// NOTE(review): this comment originally named PeerId although it sits on
// Peer; confirm how Peer differs from PeerId below.
Peer string `log:"peer"`
PeerId string
// PeerIds is a list of peer IDs.
PeerIds []string
// MachId is a state machine ID
MachId string `log:"mach_id"`
// MTime is machine time
MTime am.Time `log:"mtime"`
// HTime is human time
HTime time.Time
// Addrs is a list of addresses.
Addrs []multiaddr.Multiaddr `log:"addrs"`
// NetMachs is a return channel for a list of [rpc.NetworkMachine]. It has to
// be buffered or the mutation will fail.
NetMachs chan<- []*rpc.NetworkMachine
// ListFilters holds listing filters.
// NOTE(review): presumably applied to the result sent on NetMachs — confirm.
ListFilters *ListFilters
// MachClocks holds machine clocks (see the MachClocks type).
MachClocks MachClocks
// MsgType is a message type as a string (tagged `log:"msg_type"`).
MsgType string `log:"msg_type"`
// PeersGossip holds gossip data per peer (see the PeerGossips type).
PeersGossip PeerGossips
}
A is a struct for node arguments. It's a typesafe alternative to am.A.
type Info ¶
// Info describes an exposed state machine; per the package docs it is sent
// when a peer exposes state machines in the topic.
type Info struct {
// Id is an identifier string.
// NOTE(review): presumably the state machine ID — confirm.
Id string
// Schema is the am.Schema value.
Schema am.Schema
// States is an am.S list of states.
// NOTE(review): presumably the state names matching Schema — confirm.
States am.S
// MTime is machine time.
MTime am.Time
// Tags is a list of string tags.
Tags []string
// Parent is a string reference to a parent.
// NOTE(review): presumably the parent machine's ID — confirm.
Parent string
}
Info is sent when a peer exposes state machines in the topic.
type ListFilters ¶
// ListFilters groups the criteria used when listing machines.
// NOTE(review): how the fields combine (AND/OR) and what zero values mean are
// not visible here — confirm against ListMachinesState.
type ListFilters struct {
// IdExact, IdPartial and IdRegexp filter by ID: exact match, partial match,
// and regular expression respectively (per their names — confirm semantics).
IdExact string
IdPartial string
IdRegexp *regexp.Regexp
// Parent filters by parent (presumably the parent machine ID — confirm).
Parent string
// PeerId filters by peer ID.
PeerId string
// Level to traverse towards the tree root.
DepthLevel int
// ChildrenMax and ChildrenMin bound the number of children
// (NOTE(review): inclusive/exclusive bounds not visible here — confirm).
ChildrenMax int
ChildrenMin int
}
TODO merge with REPL, add tag-based queries (to find workers)
type MsgByeMach ¶
MsgByeMach is sent when a peer un-exposes state machines in the topic.
type MsgGossip ¶
// MsgGossip embeds Msg and carries gossip data per peer.
type MsgGossip struct {
Msg
// Number of workers from each peer
// NOTE(review): the PeerGossips type is described on this page as
// "peer ID => machine time sum", not a worker count — confirm which is right.
PeerGossips PeerGossips
}
type MsgInfo ¶
// MsgInfo embeds Msg and carries machine info and gossip data per peer.
type MsgInfo struct {
Msg
// peer ID => mach idx => schema
PeerInfo PeerInfo
// Number of workers from each peer
// NOTE(review): the PeerGossips type is described on this page as
// "peer ID => machine time sum", not a worker count — confirm which is right.
PeerGossips PeerGossips
}
type MsgReqInfo ¶
type MsgReqUpdates ¶
type MsgUpdates ¶
// MsgUpdates embeds Msg and carries machine clocks grouped per peer.
type MsgUpdates struct {
Msg
// PeerClocks holds MachClocks values keyed by a string (see the PeerClocks
// type).
PeerClocks PeerClocks
}
type PeerClocks ¶
// PeerClocks maps a string key to a MachClocks value.
// NOTE(review): the page text describes the key as a local peer index —
// confirm the key format.
type PeerClocks map[string]MachClocks
Machine clocks indexed by a local peer index.
type PeerGossips ¶
PeerGossips maps a peer ID to its machine time sum. TODO: check local time sum.
type Topic ¶
// Topic holds the configuration and runtime handles of a single PubSub topic.
type Topic struct {
*am.ExceptionHandler
// T represents the PubSub topic used for communication.
T *ps.Topic
// Mach is a state machine for this PubSub.
Mach *am.Machine
// MachMetrics is an atomically-swapped pointer to a second state machine.
// NOTE(review): presumably used for metrics, per the name — confirm.
MachMetrics atomic.Pointer[am.Machine]
// HostToPeer is a string-to-string map.
// NOTE(review): presumably host => peer ID — confirm.
HostToPeer map[string]string
// ListenAddrs contains the list of multiaddresses that this topic listens on.
// None will allocate one automatically.
ListenAddrs []ma.Multiaddr
// Addrs is a list of addresses for this peer.
Addrs atomic.Pointer[[]ma.Multiaddr]
// Name indicates the name of the channel or topic instance,
// typically associated with a given process.
Name string
// ConnAddrs contains a list of addresses for initial
// connections to discovery nodes in the network.
ConnAddrs []ma.Multiaddr
// List of exposed state machines, index => mach_id, indexes are used on the
// channel. Indexes should NOT change, as they are used for addressing. `nil`
// value means empty.
ExposedMachs []*am.Machine
// Debounce for clock broadcasts (separate per each exposed state machine).
// NOTE(review): the field is a time.Time rather than a duration — confirm
// the intended type.
Debounce time.Time
// LogEnabled toggles logging.
LogEnabled bool
// HeartbeatFreq broadcasts changed clocks of exposed machines.
HeartbeatFreq time.Duration
// Maximum msgs per minute sent to the network. Does not include MsgInfo,
// which are debounced.
// NOTE(review): the name says "per window" while the comment says "per
// minute" — confirm the window length.
MaxMsgsPerWin int
// DbgNetMachs exposes local network machines to am-dbg
DbgNetMachs bool
// ConnectionsToReady is a connection count.
// NOTE(review): presumably the number of connections required before the
// Ready state — confirm.
ConnectionsToReady int
// all amounts and delays are multiplied by this factor
// NOTE(review): the name looks like a typo of "Multiplier"; it is exported,
// so renaming would break callers.
Multiplayer int
// SendInfoDebounceMs is a debounce in milliseconds (per the name) for
// sending info.
SendInfoDebounceMs int
// Max allowed queue length to send MsgInfo to newly joined peers, as well as
// received msgs.
MaxQueueLen uint16
// Number of gossips to send in SendGossipsState
GossipAmount int
// Use this hardcoded schema instead of exchanging real ones. Limits all the
// workers to this one.
// TODO catalog of named schemas, exchange unknown ones
TestSchema am.Schema
// TestStates is an am.S list of states.
// NOTE(review): presumably the state names for TestSchema — confirm.
TestStates am.S
// contains filtered or unexported fields
}
Topic is a single-topic PubSub based on libp2p-pubsub with manual discovery. Each peer can have many exposed state machines; it broadcasts their clock changes on the channel and introduces them both when joining and via private messages to newly joined peers. TODO optimizations:
- avoid txs which will get cancelled (CPU)
- rate limit the TOTAL amount of msgs in the network (CPU, network)
- hook into libp2p errors, eg queue full, and act accordingly
func (*Topic) ConnectingState ¶
func (*Topic) DoSendInfoState ¶ added in v0.12.0
func (*Topic) ExceptionState ¶
func (*Topic) HeartbeatState ¶
func (*Topic) JoinedState ¶
func (*Topic) JoiningState ¶
func (*Topic) ListMachinesState ¶
func (*Topic) MissPeersByGossipState ¶
func (*Topic) MissPeersByUpdatesState ¶
func (*Topic) MissUpdatesByGossipState ¶
func (*Topic) MsgInfoState ¶
func (*Topic) MsgReqInfoState ¶
func (*Topic) MsgReqUpdatesState ¶
TODO this is never fired
func (*Topic) MsgUpdatesState ¶
func (*Topic) PeerJoinedState ¶
func (*Topic) ProcessMsgsState ¶ added in v0.13.0
func (*Topic) ReqMissingPeersEnter ¶ added in v0.12.0
func (*Topic) ReqMissingPeersState ¶ added in v0.12.0
func (*Topic) ReqMissingUpdatesEnter ¶ added in v0.12.0
func (*Topic) ReqMissingUpdatesState ¶ added in v0.12.0
func (*Topic) SendGossipsEnter ¶ added in v0.12.0
func (*Topic) SendGossipsState ¶ added in v0.12.0
func (*Topic) SendInfoState ¶
func (*Topic) SendMsgState ¶
func (*Topic) SendUpdatesEnter ¶ added in v0.13.0
func (*Topic) SendUpdatesState ¶ added in v0.12.0
func (*Topic) StartAndJoin ¶ added in v0.10.2
func (*Topic) StartState ¶
type Tracer ¶
// Tracer embeds *am.TracerNoOp; its TransitionEnd method is documented as
// sending a message when a transition ends.
type Tracer struct {
*am.TracerNoOp
// contains filtered or unexported fields
}
func (*Tracer) TransitionEnd ¶
func (t *Tracer) TransitionEnd(transition *am.Transition)
TransitionEnd sends a message when a transition ends