Documentation
¶
Index ¶
- Constants
- Variables
- func AddErrJoining(event *am.Event, mach *am.Machine, err error, args am.A) error
- func AddErrListening(event *am.Event, mach *am.Machine, err error, args am.A) error
- func LogArgs(args am.A) map[string]string
- func Pass(args *A) am.A
- type A
- type Gossips
- type Info
- type ListFilters
- type MachClocks
- type MachInfo
- type Msg
- type MsgBye
- type MsgByeMach
- type MsgGossip
- type MsgInfo
- type MsgReqInfo
- type MsgReqUpdates
- type MsgType
- type MsgUpdates
- type PeerClocks
- type PeerGossips
- type PeerInfo
- type Topic
- func (t *Topic) ConnCount() int
- func (t *Topic) ConnectingEnter(e *am.Event) bool
- func (t *Topic) ConnectingState(e *am.Event)
- func (t *Topic) DisconnectingStart(e *am.Event)
- func (t *Topic) Dispose() am.Result
- func (t *Topic) ExceptionState(e *am.Event)
- func (t *Topic) GetPeerAddrs() ([]ma.Multiaddr, error)
- func (t *Topic) HeartbeatState(e *am.Event)
- func (t *Topic) Join() am.Result
- func (t *Topic) JoinedEnd(e *am.Event)
- func (t *Topic) JoinedState(e *am.Event)
- func (t *Topic) JoiningState(e *am.Event)
- func (t *Topic) ListMachinesEnter(e *am.Event) bool
- func (t *Topic) ListMachinesState(e *am.Event)
- func (t *Topic) MissPeersByGossipEnter(e *am.Event) bool
- func (t *Topic) MissPeersByGossipExit(e *am.Event) bool
- func (t *Topic) MissPeersByGossipState(e *am.Event)
- func (t *Topic) MissPeersByUpdatesEnter(e *am.Event) bool
- func (t *Topic) MissPeersByUpdatesExit(e *am.Event) bool
- func (t *Topic) MissPeersByUpdatesState(e *am.Event)
- func (t *Topic) MissUpdatesByGossipEnter(e *am.Event) bool
- func (t *Topic) MissUpdatesByGossipExit(e *am.Event) bool
- func (t *Topic) MissUpdatesByGossipState(e *am.Event)
- func (t *Topic) MsgByeEnter(e *am.Event) bool
- func (t *Topic) MsgByeState(e *am.Event)
- func (t *Topic) MsgInfoEnter(e *am.Event) bool
- func (t *Topic) MsgInfoState(e *am.Event)
- func (t *Topic) MsgReceivedEnter(e *am.Event) bool
- func (t *Topic) MsgReqInfoEnter(e *am.Event) bool
- func (t *Topic) MsgReqInfoState(e *am.Event)
- func (t *Topic) MsgReqUpdatesEnter(e *am.Event) bool
- func (t *Topic) MsgReqUpdatesState(e *am.Event)
- func (t *Topic) MsgUpdatesEnter(e *am.Event) bool
- func (t *Topic) MsgUpdatesState(e *am.Event)
- func (t *Topic) PeerJoinedEnter(e *am.Event) bool
- func (t *Topic) PeerJoinedState(e *am.Event)
- func (t *Topic) PeerLeftEnter(e *am.Event) bool
- func (t *Topic) PeerLeftState(e *am.Event)
- func (t *Topic) ReadyEnter(e *am.Event) bool
- func (t *Topic) SendInfoEnter(e *am.Event) bool
- func (t *Topic) SendInfoState(e *am.Event)
- func (t *Topic) SendMsgEnter(e *am.Event) bool
- func (t *Topic) SendMsgState(e *am.Event)
- func (t *Topic) Start() am.Result
- func (t *Topic) StartAndJoin(ctx context.Context) am.Result
- func (t *Topic) StartEnd(e *am.Event)
- func (t *Topic) StartEnter(e *am.Event) bool
- func (t *Topic) StartState(e *am.Event)
- type TopicOpts
- type Tracer
Constants ¶
const ( // EnvAmPubsubLog enables machine logging for pubsub. EnvAmPubsubLog = "AM_PUBSUB_LOG" // EnvAmPubsubDbg exposes PubSub workers over dbg telemetry. EnvAmPubsubDbg = "AM_PUBSUB_DBG" )
const APrefix = "am_pubsub"
Variables ¶
var ( ErrJoining = errors.New("error joining") ErrListening = errors.New("error listening") )
Functions ¶
func AddErrJoining ¶
AddErrJoining wraps an error in the ErrJoining sentinel and adds it to a machine.
func AddErrListening ¶
AddErrListening wraps an error in the ErrListening sentinel and adds it to a machine.
Types ¶
type A ¶
type A struct {
// MsgInfo happens when a peer introduces its exposed state machines.
MsgInfo *MsgInfo
MsgUpdates *MsgUpdates
MsgBye *MsgBye
MsgReqInfo *MsgReqInfo
MsgReqUpdates *MsgReqUpdates
MsgGossip *MsgGossip
// Msgs is a list of received general (non-semantic) PubSub messages.
Msgs []*pubsub.Message
// Msg is a raw msg.
Msg []byte
// PeerId is a peer ID
PeerId string `log:"peer"`
PeerIds []string
// MachId is a state machine ID
MachId string
// MTime is machine time
MTime am.Time `log:"mtime"`
// Addrs is a list of addresses.
Addrs []multiaddr.Multiaddr `log:"addrs"`
// WorkersCh is a return channel for a list of [rpc.Worker]. It has to be
// buffered or the mutation will fail.
WorkersCh chan<- []*rpc.Worker
ListFilters *ListFilters
MachClocks MachClocks
MsgType string `log:"msg_type"`
PeersGossip PeerGossips
}
A is a struct for node arguments. It's a typesafe alternative to am.A.
type Info ¶
type Info struct {
Id string
Schema am.Struct
States am.S
MTime am.Time
Tags []string
Parent string
}
Info is sent when a peer exposes state machines in the topic.
type ListFilters ¶
type MsgByeMach ¶
MsgBye is sent when a peer un-exposes state machines in the topic.
type MsgGossip ¶
type MsgGossip struct {
Msg
// Number of workers from each peer
PeerGossips PeerGossips
}
type MsgInfo ¶
type MsgInfo struct {
Msg
// peer ID => mach idx => schema
PeerInfo PeerInfo
// Number of workers from each peer
PeerGossips PeerGossips
}
type MsgReqInfo ¶
type MsgReqUpdates ¶
type MsgUpdates ¶
type MsgUpdates struct {
Msg
PeerClocks PeerClocks
}
type PeerClocks ¶
type PeerClocks map[string]MachClocks
Machine clocks indexed by a local peer index.
type PeerGossips ¶
PeerGossips is peer ID => machine time sum TODO check local time sum
type Topic ¶
type Topic struct {
*am.ExceptionHandler
// T represents the PubSub topic used for communication.
T *ps.Topic
// Mach is a state machine for this PubSub.
Mach *am.Machine
// ListenAddrs contains the list of multiaddresses that this topic listens on.
// None will allocate one automatically.
ListenAddrs []ma.Multiaddr
// Addrs is a list of addresses for this peer.
Addrs atomic.Pointer[[]ma.Multiaddr]
// Name indicates the name of the channel or topic instance,
// typically associated with a given process.
Name string
// ConnAddrs contains a list of addresses for initial
// connections to discovery nodes in the network.
ConnAddrs []ma.Multiaddr
// List of exposed state machines, index => mach_id, indexes are used on the
// channel. Indexes should NOT change, as they are used for addressing. `nil`
// value means empty.
ExposedMachs []*am.Machine
// Debounce for clock broadcasts (separate per each exposed state machine).
Debounce time.Time
LogEnabled bool
// HeartbeatFreq broadcasts changed clocks of exposed machines.
HeartbeatFreq time.Duration
// Maximum msgs per minute in the network.
MaxMsgsPerWin int
// DebugWorkerTelemetry exposes local workers to am-dbg
DebugWorkerTelemetry bool
ConnectionsToReady int
// contains filtered or unexported fields
}
Topic is a single-topic PubSub based on libp2p-pubsub with manual discovery. Each peer can have many exposed state machines and broadcasts their clock changes on the channel, introduces them when joining and via private messages to newly joined peers. TODO optimizations:
- predefined schemas
- random delay for initial hello
- reduce number of messages (max total rcv/snt msgs per a time window)
- smaller pool for requests?
- don't gossip about missing updates
- worker pool for forked goroutines
func (*Topic) ConnectingState ¶
func (*Topic) DisconnectingStart ¶
func (*Topic) ExceptionState ¶
func (*Topic) HeartbeatState ¶
func (*Topic) JoinedState ¶
func (*Topic) JoiningState ¶
func (*Topic) ListMachinesState ¶
func (*Topic) MissPeersByGossipState ¶
func (*Topic) MissPeersByUpdatesState ¶
func (*Topic) MissUpdatesByGossipState ¶
func (*Topic) MsgByeState ¶
func (*Topic) MsgInfoState ¶
func (*Topic) MsgReqInfoState ¶
func (*Topic) MsgReqUpdatesState ¶
func (*Topic) MsgUpdatesState ¶
func (*Topic) PeerJoinedState ¶
func (*Topic) PeerLeftState ¶
func (*Topic) SendInfoState ¶
func (*Topic) SendMsgState ¶
func (*Topic) StartAndJoin ¶ added in v0.10.2
func (*Topic) StartState ¶
type Tracer ¶
type Tracer struct {
*am.NoOpTracer
// contains filtered or unexported fields
}
func (*Tracer) TransitionEnd ¶
func (t *Tracer) TransitionEnd(transition *am.Transition)
TransitionEnd sends a message when a transition ends
