pubsub

package
v0.18.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2026 License: MIT Imports: 29 Imported by: 0

README

/pkg/pubsub

cd /

[!NOTE] asyncmachine-go is a batteries-included graph control flow library (AOP, actor model, state-machine).

/pkg/pubsub is a trustful and decentralized synchronization network for asyncmachine-go. Each peer exposes several state machines, then starts gossiping about them and other ones known to him. Remote state machines are then visible to other peers as /pkg/rpc.NetworkMachine. PubSub can be used to match Clients with Supervisors from /pkg/node.

Under the hood it's based on libp2p gossipsub, which is a mesh-based PubSub, also based on gossipping:

  • libp2p gossips create and maintain the network topology
  • pkg/pubsub gossips synchronize machine schemas and clocks

Support

  • state checking YES
  • state mutations NO
  • state waiting YES

Features

  • gossip-based discovery
  • gossip-based clock updates
  • gossip-based checksums via machine time
  • rate limiting
  • no leaders, no elections

Screenshot

am-dbg view of a PubSub with 6 peers, with p1-p5 exposing a single state machine each.

Schema

State schema from /pkg/pubsub/states/.

TODO

  • more protocol-level rate limiting
  • confirmed handler timeouts #220
  • faster discovery
  • 1k peer load test
  • mDNS & DHT & auth
  • optimizations
  • documentation
    • discovery protocol
    • sequence diagrams

Usage

Peer Init
import (
    ma "github.com/multiformats/go-multiaddr"
    ampubsub "github.com/pancsta/asyncmachine-go/pkg/pubsub"
    ssps "github.com/pancsta/asyncmachine-go/pkg/pubsub/states"
)

var ss = states.TopicStates

// ...

// new pubsub peer
ps, _ := ampubsub.NewTopic(ctx, t.Name(), name, machs, nil)
// address of an existing peer
a, _ := ma.NewMultiaddr("/ip4/127.0.0.1/udp/75343/quic-v1")
addrs := []ma.Multiaddr{a}

// connect
ps.ConnAddrs = addrs
ps.Start()
<-ps.Mach.When1(ss.Connected, ctx)
ps.Mach.Add1(ss.Joining, nil)

Remote Workers

import (
    ma "github.com/multiformats/go-multiaddr"
    ampubsub "github.com/pancsta/asyncmachine-go/pkg/pubsub"
)

var ss = states.TopicStates

// ...

// var remotePeerId string
// var ps *ampusub.Topic

// list machines exported by [remotePeerId]
ch := make(chan []*rpc.NetworkMachine, 1)
args := &A{
    WorkersCh: ch,
    ListFilters: &ListFilters{
        PeerId: remotePeerId,
    },
}
_ = ps.Mach.Add1(ss.ListMachines, Pass(args))
workers := <-ch
close(ch)s

// find a worker tagged "foo", add state "Bar", and wait for state "Baz"
for _, mach := range workers {
    if amhelp.TagValue(mach.Tags(), "foo") == "" {
        continue
    }
    mach.Add1("Bar", nil)
    println("Bar set on " + mach.Id())

    <-mach.When1("Baz", nil)
    println("Baz active on " + mach.Id())
    break
}

Status

Alpha, work in progress, not semantically versioned.

Credits

monorepo

Go back to the monorepo root to continue reading.

Documentation

Index

Constants

View Source
const (
	// EnvAmPubsubLog enables machine logging for pubsub.
	EnvAmPubsubLog = "AM_PUBSUB_LOG"
	// EnvAmPubsubDbg exposes PubSub workers over dbg telemetry.
	EnvAmPubsubDbg = "AM_PUBSUB_DBG"
)
View Source
const APrefix = "am_pubsub"

Variables

View Source
var (
	ErrJoining   = errors.New("error joining")
	ErrListening = errors.New("error listening")
)

Functions

func AddErrJoining

func AddErrJoining(
	event *am.Event, mach *am.Machine, err error, args am.A,
) error

AddErrJoining wraps an error in the ErrJoining sentinel and adds to a machine.

func AddErrListening

func AddErrListening(
	event *am.Event, mach *am.Machine, err error, args am.A,
) error

AddErrListening wraps an error in the ErrListening sentinel and adds to a machine.

func LogArgs

func LogArgs(args am.A) map[string]string

LogArgs is an args logger for A and pubsub.A.

func Pass

func Pass(args *A) am.A

Pass prepares am.A from A to pass to further mutations.

Types

type A

type A struct {
	// MsgInfo happens when a peer introduces it's exposed state machines.
	MsgInfo       *MsgInfo
	MsgUpdates    *MsgUpdates
	MsgBye        *MsgBye
	MsgReqInfo    *MsgReqInfo
	MsgReqUpdates *MsgReqUpdates
	MsgGossip     *MsgGossip
	// Msgs is a list of PubSub messages.
	Msgs []*pubsub.Message
	// Length is a general length used for logging
	Length int `log:"length"`
	// Msg is a raw msg.
	Msg []byte
	// PeerId is a peer ID
	Peer    string `log:"peer"`
	PeerId  string
	PeerIds []string
	// MachId is a state machine ID
	MachId string `log:"mach_id"`
	// MTime is machine time
	MTime am.Time `log:"mtime"`
	// HTime is human time
	HTime time.Time
	// Addrs is a list of addresses.
	Addrs []multiaddr.Multiaddr `log:"addrs"`
	// NetMachs is a return channel for a list of [rpc.NetworkMachine]. It has to
	// be buffered or the mutation will fail.
	NetMachs    chan<- []*rpc.NetworkMachine
	ListFilters *ListFilters
	MachClocks  MachClocks
	MsgType     string `log:"msg_type"`
	PeersGossip PeerGossips
}

A is a struct for node arguments. It's a typesafe alternative to am.A.

func ParseArgs

func ParseArgs(args am.A) *A

ParseArgs extracts A from am.Event.ArgsAPrefix.

type Gossips

type Gossips map[int]uint64

type Info

type Info struct {
	Id     string
	Schema am.Schema
	States am.S
	MTime  am.Time
	Tags   []string
	Parent string
}

Info is sent when a peer exposes state machines in the topic.

type ListFilters

type ListFilters struct {
	IdExact   string
	IdPartial string
	IdRegexp  *regexp.Regexp
	Parent    string
	PeerId    string
	// Level to traverse towards the tree root.
	DepthLevel  int
	ChildrenMax int
	ChildrenMin int
}

TODO merge with REPL, add tag-based queries (to find workers)

type MachClocks

type MachClocks map[int]am.Time

Machine clocks indexed by a local peer index.

type MachInfo

type MachInfo map[int]*Info

type Msg

type Msg struct {
	Type MsgType
}

type MsgBye

type MsgBye struct {
	Msg
	Machs map[int]MsgByeMach
}

TODO implement

type MsgByeMach

type MsgByeMach struct {
	PeerBye bool
	Id      string
	MTime   am.Time
}

MsgByeMach is sent when a peer un-exposes state machines in the topic.

type MsgGossip

type MsgGossip struct {
	Msg
	// Number of workers from each peer
	PeerGossips PeerGossips
}

type MsgInfo

type MsgInfo struct {
	Msg
	// peer ID => mach idx => schema
	PeerInfo PeerInfo
	// Number of workers from each peer
	PeerGossips PeerGossips
}

type MsgReqInfo

type MsgReqInfo struct {
	Msg
	PeerIds []string
}

type MsgReqUpdates

type MsgReqUpdates struct {
	Msg
	// TODO list specific mach indexes
	PeerIds []string
}

type MsgType

type MsgType string
const (
	MsgTypeUpdates    MsgType = "updates"
	MsgTypeBye        MsgType = "bye"
	MsgTypeInfo       MsgType = "info"
	MsgTypeReqInfo    MsgType = "reqinfo"
	MsgTypeReqUpdates MsgType = "requpdates"
	MsgTypeGossip     MsgType = "gossip"
)

type MsgUpdates

type MsgUpdates struct {
	Msg
	PeerClocks PeerClocks
}

type PeerClocks

type PeerClocks map[string]MachClocks

Machine clocks indexed by a local peer index.

type PeerGossips

type PeerGossips map[string]Gossips

PeerGossips is peer ID => machine time sum TODO check local time sum

type PeerInfo

type PeerInfo map[string]MachInfo

type Topic

type Topic struct {
	*am.ExceptionHandler

	// T represents the PubSub topic used for communication.
	T *ps.Topic
	// Mach is a state machine for this PubSub.
	Mach        *am.Machine
	MachMetrics atomic.Pointer[am.Machine]
	HostToPeer  map[string]string
	// ListenAddrs contains the list of multiaddresses that this topic listens on.
	// None will allocate one automatically.
	ListenAddrs []ma.Multiaddr
	// Addrs is a list of addresses for this peer.
	Addrs atomic.Pointer[[]ma.Multiaddr]
	// Name indicates the name of the channel or topic instance,
	// typically associated with a given process.
	Name string
	// ConnAddrs contains a list of addresses for initial
	// connections to discovery nodes in the network.
	ConnAddrs []ma.Multiaddr
	// List of exposed state machines, index => mach_id, indexes are used on the
	// channel. Indexes should NOT change, as they are used for addressing. `nil`
	// value means empty.
	ExposedMachs []*am.Machine
	// Debounce for clock broadcasts (separate per each exposed state machine).
	Debounce   time.Time
	LogEnabled bool
	// HeartbeatFreq broadcasts changed clocks of exposed machines.
	HeartbeatFreq time.Duration
	// Maximum msgs per minute sent to the network. Does not include MsgInfo,
	// which are debounced.
	MaxMsgsPerWin int
	// DbgNetMachs exposes local network machines via to am-dbg
	DbgNetMachs        bool
	ConnectionsToReady int
	// all amounts and delayes are multiplied by this factor
	Multiplayer        int
	SendInfoDebounceMs int
	// Max allowed queue length to send MsgInfo to newly joned peers, as well as
	// received msgs.
	MaxQueueLen uint16
	// Number of gossips to send in SendGossipsState
	GossipAmount int

	// Use this hardcoded schema instead of exchanging real ones. Limits all the
	// workers to this one.
	// TODO catalog of named schemas, exchange unknown ones
	TestSchema am.Schema
	TestStates am.S
	// contains filtered or unexported fields
}

Topic is a single-topic PubSub based on lib2p-pubsub with manual discovery. Each peer can have many exposed state machines and broadcasts their clock changes on the channel, introduces them when joining and via private messages to newly joined peers. TODO optimizations:

  • avoid txs which will get cancelled (CPU)
  • rate limit the TOTAL amount of msgs in the network (CPU, network)
  • hook into libp2p errors, eg queue full, and act accordingly

func NewTopic

func NewTopic(
	ctx context.Context, name, suffix string, exposedMachs []*am.Machine,
	opts *TopicOpts,
) (*Topic, error)

func (*Topic) ConnCount

func (t *Topic) ConnCount() int

func (*Topic) ConnectingEnter

func (t *Topic) ConnectingEnter(e *am.Event) bool

func (*Topic) ConnectingState

func (t *Topic) ConnectingState(e *am.Event)

func (*Topic) Dispose

func (t *Topic) Dispose() am.Result

func (*Topic) DoSendInfoEnter added in v0.12.0

func (t *Topic) DoSendInfoEnter(e *am.Event) bool

func (*Topic) DoSendInfoState added in v0.12.0

func (t *Topic) DoSendInfoState(e *am.Event)

func (*Topic) ExceptionEnter added in v0.13.0

func (t *Topic) ExceptionEnter(e *am.Event) bool

func (*Topic) ExceptionState

func (t *Topic) ExceptionState(e *am.Event)

func (*Topic) GetPeerAddrs

func (t *Topic) GetPeerAddrs() ([]ma.Multiaddr, error)

func (*Topic) HeartbeatState

func (t *Topic) HeartbeatState(e *am.Event)

func (*Topic) Join added in v0.10.2

func (t *Topic) Join() am.Result

func (*Topic) JoinedEnd

func (t *Topic) JoinedEnd(e *am.Event)

func (*Topic) JoinedState

func (t *Topic) JoinedState(e *am.Event)

func (*Topic) JoiningState

func (t *Topic) JoiningState(e *am.Event)

func (*Topic) ListMachinesEnter

func (t *Topic) ListMachinesEnter(e *am.Event) bool

func (*Topic) ListMachinesState

func (t *Topic) ListMachinesState(e *am.Event)

func (*Topic) MissPeersByGossipEnter

func (t *Topic) MissPeersByGossipEnter(e *am.Event) bool

func (*Topic) MissPeersByGossipExit

func (t *Topic) MissPeersByGossipExit(e *am.Event) bool

func (*Topic) MissPeersByGossipState

func (t *Topic) MissPeersByGossipState(e *am.Event)

func (*Topic) MissPeersByUpdatesEnter

func (t *Topic) MissPeersByUpdatesEnter(e *am.Event) bool

func (*Topic) MissPeersByUpdatesExit

func (t *Topic) MissPeersByUpdatesExit(e *am.Event) bool

func (*Topic) MissPeersByUpdatesState

func (t *Topic) MissPeersByUpdatesState(e *am.Event)

func (*Topic) MissUpdatesByGossipEnter

func (t *Topic) MissUpdatesByGossipEnter(e *am.Event) bool

func (*Topic) MissUpdatesByGossipExit

func (t *Topic) MissUpdatesByGossipExit(e *am.Event) bool

func (*Topic) MissUpdatesByGossipState

func (t *Topic) MissUpdatesByGossipState(e *am.Event)

func (*Topic) MsgByeEnter

func (t *Topic) MsgByeEnter(e *am.Event) bool

func (*Topic) MsgInfoEnter

func (t *Topic) MsgInfoEnter(e *am.Event) bool

func (*Topic) MsgInfoState

func (t *Topic) MsgInfoState(e *am.Event)

func (*Topic) MsgReceivedEnter

func (t *Topic) MsgReceivedEnter(e *am.Event) bool

func (*Topic) MsgReqInfoEnter

func (t *Topic) MsgReqInfoEnter(e *am.Event) bool

func (*Topic) MsgReqInfoState

func (t *Topic) MsgReqInfoState(e *am.Event)

func (*Topic) MsgReqUpdatesEnter

func (t *Topic) MsgReqUpdatesEnter(e *am.Event) bool

func (*Topic) MsgReqUpdatesState

func (t *Topic) MsgReqUpdatesState(e *am.Event)

TODO this is never fired

func (*Topic) MsgUpdatesEnter

func (t *Topic) MsgUpdatesEnter(e *am.Event) bool

func (*Topic) MsgUpdatesState

func (t *Topic) MsgUpdatesState(e *am.Event)

func (*Topic) PeerJoinedEnter

func (t *Topic) PeerJoinedEnter(e *am.Event) bool

func (*Topic) PeerJoinedState

func (t *Topic) PeerJoinedState(e *am.Event)

func (*Topic) PeerLeftEnter

func (t *Topic) PeerLeftEnter(e *am.Event) bool

func (*Topic) ProcessMsgsState added in v0.13.0

func (t *Topic) ProcessMsgsState(e *am.Event)

func (*Topic) ReadyEnter

func (t *Topic) ReadyEnter(e *am.Event) bool

TODO exit

func (*Topic) ReqMissingPeersEnter added in v0.12.0

func (t *Topic) ReqMissingPeersEnter(e *am.Event) bool

func (*Topic) ReqMissingPeersState added in v0.12.0

func (t *Topic) ReqMissingPeersState(e *am.Event)

func (*Topic) ReqMissingUpdatesEnter added in v0.12.0

func (t *Topic) ReqMissingUpdatesEnter(e *am.Event) bool

func (*Topic) ReqMissingUpdatesState added in v0.12.0

func (t *Topic) ReqMissingUpdatesState(e *am.Event)

func (*Topic) SendGossipsEnter added in v0.12.0

func (t *Topic) SendGossipsEnter(e *am.Event) bool

func (*Topic) SendGossipsState added in v0.12.0

func (t *Topic) SendGossipsState(e *am.Event)

func (*Topic) SendInfoEnter

func (t *Topic) SendInfoEnter(e *am.Event) bool

func (*Topic) SendInfoState

func (t *Topic) SendInfoState(e *am.Event)

func (*Topic) SendMsgEnter

func (t *Topic) SendMsgEnter(e *am.Event) bool

func (*Topic) SendMsgState

func (t *Topic) SendMsgState(e *am.Event)

func (*Topic) SendUpdatesEnter added in v0.13.0

func (t *Topic) SendUpdatesEnter(e *am.Event) bool

func (*Topic) SendUpdatesState added in v0.12.0

func (t *Topic) SendUpdatesState(e *am.Event)

func (*Topic) Start

func (t *Topic) Start() am.Result

func (*Topic) StartAndJoin added in v0.10.2

func (t *Topic) StartAndJoin(ctx context.Context) am.Result

func (*Topic) StartEnd

func (t *Topic) StartEnd(e *am.Event)

func (*Topic) StartEnter

func (t *Topic) StartEnter(e *am.Event) bool

func (*Topic) StartState

func (t *Topic) StartState(e *am.Event)

type TopicOpts

type TopicOpts struct {
	// Parent is a parent state machine for a new Topic state machine. See
	// [am.Opts].
	Parent am.Api
}

type Tracer

type Tracer struct {
	*am.TracerNoOp
	// contains filtered or unexported fields
}

func (*Tracer) TransitionEnd

func (t *Tracer) TransitionEnd(transition *am.Transition)

TransitionEnd sends a message when a transition ends

Directories

Path Synopsis
Package states contains a stateful schema-v2 for Topic.
Package states contains a stateful schema-v2 for Topic.
Package uds was auto-translated from rust-libp2p.
Package uds was auto-translated from rust-libp2p.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL