pubsub

package
v0.10.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2025 License: MIT Imports: 29 Imported by: 0

README

/pkg/pubsub

cd /

[!NOTE] asyncmachine-go is a declarative control flow library implementing AOP and Actor Model through a clock-based state machine.

/pkg/pubsub is a trustful decentralized synchronization network for asyncmachine-go. Each peer exposes several state machines, then starts gossiping about them and other ones known to him. Remote state machines are then visible to other peers as /pkg/rpc.LocalWorker. PubSub can be used to match Clients with Workers from /pkg/node.

Under the hood it's based on libp2p gossipsub, which is a mesh-based PubSub, also based on gossipping, but for the purpose of network topology. libp2p gossips are separate from gossips of this package.

Support

  • state checking YES
  • state mutations NO
  • state waiting YES

Features

  • gossip-based discovery
  • gossip-based clock updates
  • gossip-based checksums via machine time
  • rate limitting
  • no leaders, no elections

Screenshot

am-dbg view of a PubSub with 6 peers, with p1-p5 exposing a single state machine each.

Schema

State schema from /pkg/pubsub/states/.

worker schena

TODO

  • more rate limitting
  • confirmed handler timeouts
  • faster discovery
  • load test
  • mDNS & DHT & auth
  • optimizations
  • documentation
    • discovery protocol
    • sequence diagrams

Usage

import (
    ma "github.com/multiformats/go-multiaddr"
    ampubsub "github.com/pancsta/asyncmachine-go/pkg/pubsub"
)

// ...

// init a pubsub peer
ps, _ := ampubsub.NewTopic(ctx, t.Name(), name, machs, nil)
// prep a libp2p multi address
a, _ := ma.NewMultiaddr("/ip4/127.0.0.1/udp/75343/quic-v1")
addrs := []ma.Multiaddr{a}
ps.ConnAddrs = addrs
ps.Start()
<-ps.Mach.When1(ss.Connected, ctx)
ps.Mach.Add1()

Status

Alpha, work in progress, not semantically versioned.

Credits

monorepo

Go back to the monorepo root to continue reading.

Documentation

Index

Constants

View Source
const (
	// EnvAmPubsubLog enables machine logging for pubsub.
	EnvAmPubsubLog = "AM_PUBSUB_LOG"
	// EnvAmPubsubDbg exposes PubSub workers over dbg telemetry.
	EnvAmPubsubDbg = "AM_PUBSUB_DBG"
)
View Source
const APrefix = "am_pubsub"

Variables

View Source
var (
	ErrJoining   = errors.New("error joining")
	ErrListening = errors.New("error listening")
)

Functions

func AddErrJoining

func AddErrJoining(
	event *am.Event, mach *am.Machine, err error, args am.A,
) error

AddErrJoining wraps an error in the ErrJoining sentinel and adds to a machine.

func AddErrListening

func AddErrListening(
	event *am.Event, mach *am.Machine, err error, args am.A,
) error

AddErrListening wraps an error in the ErrListening sentinel and adds to a machine.

func LogArgs

func LogArgs(args am.A) map[string]string

LogArgs is an args logger for A and pubsub.A.

func Pass

func Pass(args *A) am.A

Pass prepares am.A from A to pass to further mutations.

Types

type A

type A struct {
	// MsgInfo happend when a peer introduces it's exposed state machines.
	MsgInfo       *MsgInfo
	MsgUpdates    *MsgUpdates
	MsgBye        *MsgBye
	MsgReqInfo    *MsgReqInfo
	MsgReqUpdates *MsgReqUpdates
	MsgGossip     *MsgGossip
	// Msgs is a list of received general (non-semantic) PubSub messages.
	Msgs []*pubsub.Message
	// Msg is a raw msg.
	Msg []byte
	// PeerId is a peer ID
	PeerId  string `log:"peer"`
	PeerIds []string
	// MachId is a state machine ID
	MachId string
	// MTime is machine time
	MTime am.Time `log:"mtime"`
	// Addrs is a list of addresses.
	Addrs []multiaddr.Multiaddr `log:"addrs"`
	// WorkersCh is a return channel for a list of [rpc.Worker]. It has to be
	// buffered or the mutation will fail.
	WorkersCh   chan<- []*rpc.Worker
	ListFilters *ListFilters
	MachClocks  MachClocks
	MsgType     string `log:"msg_type"`
	PeersGossip PeerGossips
}

A is a struct for node arguments. It's a typesafe alternative to am.A.

func ParseArgs

func ParseArgs(args am.A) *A

ParseArgs extracts A from am.Event.ArgsAPrefix.

type Gossips

type Gossips map[int]uint64

type Info

type Info struct {
	Id     string
	Schema am.Struct
	States am.S
	MTime  am.Time
	Tags   []string
	Parent string
}

MsgHello is sent when a peer exposes state machines in the topic.

type ListFilters

type ListFilters struct {
	IdExact   string
	IdPartial string
	IdRegexp  *regexp.Regexp
	Parent    string
	PeerId    string
	// Level to traverse towards the tree root.
	DepthLevel  int
	ChildrenMax int
	ChildrenMin int
}

type MachClocks

type MachClocks map[int]am.Time

Machine clocks indexed by a local peer index.

type MachInfo

type MachInfo map[int]*Info

type Msg

type Msg struct {
	Type MsgType
}

type MsgBye

type MsgBye struct {
	Msg
	Machs map[int]MsgByeMach
}

TODO implement

type MsgByeMach

type MsgByeMach struct {
	PeerBye bool
	Id      string
	MTime   am.Time
}

MsgBye is sent when a peer un-exposes state machines in the topic.

type MsgGossip

type MsgGossip struct {
	Msg
	// Number of workers from each peer
	PeerGossips PeerGossips
}

type MsgInfo

type MsgInfo struct {
	Msg
	// peer ID => mach idx => schema
	PeerInfo PeerInfo
	// Number of workers from each peer
	PeerGossips PeerGossips
}

type MsgReqInfo

type MsgReqInfo struct {
	Msg
	PeerIds []string
}

type MsgReqUpdates

type MsgReqUpdates struct {
	Msg
	// TODO list specific mach indexes
	PeerIds []string
}

type MsgType

type MsgType string
const (
	MsgTypeUpdates    MsgType = "updates"
	MsgTypeBye        MsgType = "bye"
	MsgTypeInfo       MsgType = "info"
	MsgTypeReqInfo    MsgType = "reqinfo"
	MsgTypeReqUpdates MsgType = "requpdates"
	MsgTypeGossip     MsgType = "gossip"
)

type MsgUpdates

type MsgUpdates struct {
	Msg
	PeerClocks PeerClocks
}

type PeerClocks

type PeerClocks map[string]MachClocks

Machine clocks indexed by a local peer index.

type PeerGossips

type PeerGossips map[string]Gossips

PeerGossips is peer ID => machine time sum TODO check local time sum

type PeerInfo

type PeerInfo map[string]MachInfo

type Topic

type Topic struct {
	*am.ExceptionHandler

	// T represents the PubSub topic used for communication.
	T *ps.Topic
	// Mach is a state machine for this PubSub.
	Mach *am.Machine
	// ListenAddrs contains the list of multiaddresses that this topic listens on.
	// None will allocate one automatically.
	ListenAddrs []ma.Multiaddr
	// Addrs is a list of addresses for this peer.
	Addrs atomic.Pointer[[]ma.Multiaddr]
	// Name indicates the name of the channel or topic instance,
	// typically associated with a given process.
	Name string
	// ConnAddrs contains a list of addresses for initial
	// connections to discovery nodes in the network.
	ConnAddrs []ma.Multiaddr
	// List of exposed state machines, index => mach_id, indexes are used on the
	// channel. Indexes should NOT change, as they are used for addressing. `nil`
	// value means empty.
	ExposedMachs []*am.Machine
	// Debounce for clock broadcasts (separate per each exposed state machine).
	Debounce   time.Time
	LogEnabled bool
	// HeartbeatFreq brodcasts changed clocks of exposed machines.
	HeartbeatFreq time.Duration
	// Maximum msgs per minute in the network,
	MaxMsgsPerWin int
	// DebugWorkerTelemetry exposes local workers to am-dbg
	DebugWorkerTelemetry bool
	ConnectionsToReady   int
	// contains filtered or unexported fields
}

Topic is a single-topic PubSub based on lib2p-pubsub with manual discovery. Each peer can have many exposed state machines and broadcasts their clock changes on the channel, introduces them when joining and via private messages to newly joined peers. TODO optimizations:

  • predefined schemas
  • random delay for initial hello
  • reduce number of messages (max total rcv/snt msgs per a time window)
  • smaller pool for requests?
  • dont gossip about missing updates
  • worker pool for forked goroutines

func NewTopic

func NewTopic(
	ctx context.Context, name, suffix string, exposedMachs []*am.Machine,
	opts *TopicOpts,
) (*Topic, error)

func (*Topic) ConnCount

func (t *Topic) ConnCount() int

func (*Topic) ConnectingEnter

func (t *Topic) ConnectingEnter(e *am.Event) bool

func (*Topic) ConnectingState

func (t *Topic) ConnectingState(e *am.Event)

func (*Topic) DisconnectingStart

func (t *Topic) DisconnectingStart(e *am.Event)

func (*Topic) Dispose

func (t *Topic) Dispose() am.Result

func (*Topic) ExceptionState

func (t *Topic) ExceptionState(e *am.Event)

func (*Topic) GetPeerAddrs

func (t *Topic) GetPeerAddrs() ([]ma.Multiaddr, error)

func (*Topic) HeartbeatState

func (t *Topic) HeartbeatState(e *am.Event)

func (*Topic) Join added in v0.10.2

func (t *Topic) Join() am.Result

func (*Topic) JoinedEnd

func (t *Topic) JoinedEnd(e *am.Event)

func (*Topic) JoinedState

func (t *Topic) JoinedState(e *am.Event)

func (*Topic) JoiningState

func (t *Topic) JoiningState(e *am.Event)

func (*Topic) ListMachinesEnter

func (t *Topic) ListMachinesEnter(e *am.Event) bool

func (*Topic) ListMachinesState

func (t *Topic) ListMachinesState(e *am.Event)

func (*Topic) MissPeersByGossipEnter

func (t *Topic) MissPeersByGossipEnter(e *am.Event) bool

func (*Topic) MissPeersByGossipExit

func (t *Topic) MissPeersByGossipExit(e *am.Event) bool

func (*Topic) MissPeersByGossipState

func (t *Topic) MissPeersByGossipState(e *am.Event)

func (*Topic) MissPeersByUpdatesEnter

func (t *Topic) MissPeersByUpdatesEnter(e *am.Event) bool

func (*Topic) MissPeersByUpdatesExit

func (t *Topic) MissPeersByUpdatesExit(e *am.Event) bool

func (*Topic) MissPeersByUpdatesState

func (t *Topic) MissPeersByUpdatesState(e *am.Event)

func (*Topic) MissUpdatesByGossipEnter

func (t *Topic) MissUpdatesByGossipEnter(e *am.Event) bool

func (*Topic) MissUpdatesByGossipExit

func (t *Topic) MissUpdatesByGossipExit(e *am.Event) bool

func (*Topic) MissUpdatesByGossipState

func (t *Topic) MissUpdatesByGossipState(e *am.Event)

func (*Topic) MsgByeEnter

func (t *Topic) MsgByeEnter(e *am.Event) bool

func (*Topic) MsgByeState

func (t *Topic) MsgByeState(e *am.Event)

func (*Topic) MsgInfoEnter

func (t *Topic) MsgInfoEnter(e *am.Event) bool

func (*Topic) MsgInfoState

func (t *Topic) MsgInfoState(e *am.Event)

func (*Topic) MsgReceivedEnter

func (t *Topic) MsgReceivedEnter(e *am.Event) bool

func (*Topic) MsgReqInfoEnter

func (t *Topic) MsgReqInfoEnter(e *am.Event) bool

func (*Topic) MsgReqInfoState

func (t *Topic) MsgReqInfoState(e *am.Event)

func (*Topic) MsgReqUpdatesEnter

func (t *Topic) MsgReqUpdatesEnter(e *am.Event) bool

func (*Topic) MsgReqUpdatesState

func (t *Topic) MsgReqUpdatesState(e *am.Event)

func (*Topic) MsgUpdatesEnter

func (t *Topic) MsgUpdatesEnter(e *am.Event) bool

func (*Topic) MsgUpdatesState

func (t *Topic) MsgUpdatesState(e *am.Event)

func (*Topic) PeerJoinedEnter

func (t *Topic) PeerJoinedEnter(e *am.Event) bool

func (*Topic) PeerJoinedState

func (t *Topic) PeerJoinedState(e *am.Event)

func (*Topic) PeerLeftEnter

func (t *Topic) PeerLeftEnter(e *am.Event) bool

func (*Topic) PeerLeftState

func (t *Topic) PeerLeftState(e *am.Event)

func (*Topic) ReadyEnter

func (t *Topic) ReadyEnter(e *am.Event) bool

TODO exit

func (*Topic) SendInfoEnter

func (t *Topic) SendInfoEnter(e *am.Event) bool

func (*Topic) SendInfoState

func (t *Topic) SendInfoState(e *am.Event)

func (*Topic) SendMsgEnter

func (t *Topic) SendMsgEnter(e *am.Event) bool

func (*Topic) SendMsgState

func (t *Topic) SendMsgState(e *am.Event)

func (*Topic) Start

func (t *Topic) Start() am.Result

func (*Topic) StartAndJoin added in v0.10.2

func (t *Topic) StartAndJoin(ctx context.Context) am.Result

func (*Topic) StartEnd

func (t *Topic) StartEnd(e *am.Event)

func (*Topic) StartEnter

func (t *Topic) StartEnter(e *am.Event) bool

func (*Topic) StartState

func (t *Topic) StartState(e *am.Event)

type TopicOpts

type TopicOpts struct {
	// Parent is a parent state machine for a new Topic state machine. See
	// [am.Opts].
	Parent am.Api
}

type Tracer

type Tracer struct {
	*am.NoOpTracer
	// contains filtered or unexported fields
}

func (*Tracer) TransitionEnd

func (t *Tracer) TransitionEnd(transition *am.Transition)

TransitionEnd sends a message when a transition ends

Directories

Path Synopsis
Package states contains a stateful schema-v2 for Topic.
Package states contains a stateful schema-v2 for Topic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL