Documentation
¶
Index ¶
- Constants
- Variables
- func NewStream(conn net.Conn, config *Config) net.Conn
- func NewTransport(ctx context.Context, wg *sync.WaitGroup, config *Config, bindAddress Address) (*transport, error)
- type Address
- type ApplicationVersionCheck
- type Cluster
- func (c *Cluster) AliveNodes() []*Node
- func (c *Cluster) CalcFanOut() int
- func (c *Cluster) CalcPayloadSize(totalItems int) int
- func (c *Cluster) DisableStreamCompression(s net.Conn) *Cluster
- func (c *Cluster) EnableStreamCompression(s net.Conn) *Cluster
- func (c *Cluster) GetCandidates() []*Node
- func (c *Cluster) GetNode(id NodeID) *Node
- func (c *Cluster) GetNodeByIDString(id string) *Node
- func (c *Cluster) HandleFunc(msgType MessageType, handler Handler) error
- func (c *Cluster) HandleFuncNoForward(msgType MessageType, handler Handler) error
- func (c *Cluster) HandleFuncWithReply(msgType MessageType, replyHandler ReplyHandler) error
- func (c *Cluster) HandleGossipFunc(handler GossipHandler) HandlerID
- func (c *Cluster) HandleNodeMetadataChangeFunc(handler NodeMetadataChangeHandler) HandlerID
- func (c *Cluster) HandleNodeStateChangeFunc(handler NodeStateChangeHandler) HandlerID
- func (c *Cluster) HandleStreamFunc(msgType MessageType, handler StreamHandler) error
- func (c *Cluster) Join(peers []string) error
- func (c *Cluster) Leave()
- func (c *Cluster) LocalMetadata() *Metadata
- func (c *Cluster) LocalNode() *Node
- func (c *Cluster) Logger() Logger
- func (c *Cluster) NodeIsLocal(node *Node) bool
- func (c *Cluster) Nodes() []*Node
- func (c *Cluster) NodesToIDs(nodes []*Node) []NodeID
- func (c *Cluster) NumAliveNodes() int
- func (c *Cluster) NumDeadNodes() int
- func (c *Cluster) NumNodes() int
- func (c *Cluster) NumSuspectNodes() int
- func (c *Cluster) OpenStream(dstNode *Node, msgType MessageType, payload interface{}) (net.Conn, error)
- func (c *Cluster) ReadStreamMsg(conn net.Conn, expectedMsgType MessageType, payload interface{}) error
- func (c *Cluster) RemoveGossipHandler(id HandlerID) bool
- func (c *Cluster) RemoveNodeMetadataChangeHandler(id HandlerID) bool
- func (c *Cluster) RemoveNodeStateChangeHandler(id HandlerID) bool
- func (c *Cluster) ResolveAddress(addressStr string) ([]Address, error)
- func (c *Cluster) Send(msgType MessageType, data interface{}) error
- func (c *Cluster) SendExcluding(msgType MessageType, data interface{}, excludeNodes []NodeID) error
- func (c *Cluster) SendReliable(msgType MessageType, data interface{}) error
- func (c *Cluster) SendReliableExcluding(msgType MessageType, data interface{}, excludeNodes []NodeID) error
- func (c *Cluster) SendTo(dstNode *Node, msgType MessageType, data interface{}) error
- func (c *Cluster) SendToPeers(dstNodes []*Node, msgType MessageType, data interface{}) error
- func (c *Cluster) SendToPeersReliable(dstNodes []*Node, msgType MessageType, data interface{}) error
- func (c *Cluster) SendToReliable(dstNode *Node, msgType MessageType, data interface{}) error
- func (c *Cluster) SendToWithResponse(dstNode *Node, msgType MessageType, payload interface{}, ...) error
- func (c *Cluster) Start()
- func (c *Cluster) Stop()
- func (c *Cluster) UnregisterMessageType(msgType MessageType) bool
- func (c *Cluster) UpdateMetadata() error
- func (c *Cluster) WebsocketHandler(w http.ResponseWriter, r *http.Request)
- func (c *Cluster) WriteStreamMsg(conn net.Conn, msgType MessageType, payload interface{}) error
- type Config
- type DataNodeGroup
- func (dng *DataNodeGroup[T]) Close()
- func (dng *DataNodeGroup[T]) Contains(nodeID NodeID) bool
- func (dng *DataNodeGroup[T]) Count() int
- func (dng *DataNodeGroup[T]) GetDataNodes() []*T
- func (dng *DataNodeGroup[T]) GetNodeData(nodeID NodeID) *T
- func (dng *DataNodeGroup[T]) GetNodes(excludeIDs []NodeID) []*Node
- func (dng *DataNodeGroup[T]) SendToPeers(msgType MessageType, data interface{}) error
- func (dng *DataNodeGroup[T]) SendToPeersReliable(msgType MessageType, data interface{}) error
- func (dng *DataNodeGroup[T]) UpdateNodeData(nodeID NodeID, updateFn func(*Node, *T) error) error
- type DataNodeGroupOptions
- type EventHandlers
- type GossipHandler
- type Handler
- type HandlerID
- type Logger
- type MessageID
- type MessageType
- type Metadata
- func (md *Metadata) Delete(key string)
- func (md *Metadata) Exists(key string) bool
- func (md *Metadata) GetAll() map[string]interface{}
- func (md *Metadata) GetAllAsString() map[string]string
- func (md *Metadata) GetAllKeys() []string
- func (md *Metadata) GetBool(key string) bool
- func (md *Metadata) GetFloat32(key string) float32
- func (md *Metadata) GetFloat64(key string) float64
- func (md *Metadata) GetInt(key string) int
- func (md *Metadata) GetInt32(key string) int32
- func (md *Metadata) GetInt64(key string) int64
- func (md *Metadata) GetString(key string) string
- func (md *Metadata) GetTime(key string) time.Time
- func (md *Metadata) GetTimestamp() hlc.Timestamp
- func (md *Metadata) GetUint(key string) uint
- func (md *Metadata) GetUint32(key string) uint32
- func (md *Metadata) GetUint64(key string) uint64
- func (md *Metadata) SetBool(key string, value bool) *Metadata
- func (md *Metadata) SetFloat32(key string, value float32) *Metadata
- func (md *Metadata) SetFloat64(key string, value float64) *Metadata
- func (md *Metadata) SetInt(key string, value int) *Metadata
- func (md *Metadata) SetInt32(key string, value int32) *Metadata
- func (md *Metadata) SetInt64(key string, value int64) *Metadata
- func (md *Metadata) SetString(key, value string) *Metadata
- func (md *Metadata) SetTime(key string, value time.Time) *Metadata
- func (md *Metadata) SetUint(key string, value uint) *Metadata
- func (md *Metadata) SetUint32(key string, value uint32) *Metadata
- func (md *Metadata) SetUint64(key string, value uint64) *Metadata
- type MetadataReader
- type Node
- type NodeGroup
- func (ng *NodeGroup) Close()
- func (ng *NodeGroup) Contains(nodeID NodeID) bool
- func (ng *NodeGroup) Count() int
- func (ng *NodeGroup) GetNodes(excludeIDs []NodeID) []*Node
- func (ng *NodeGroup) SendToPeers(msgType MessageType, data interface{}) error
- func (ng *NodeGroup) SendToPeersReliable(msgType MessageType, data interface{}) error
- type NodeGroupOptions
- type NodeID
- type NodeMetadataChangeHandler
- type NodeState
- type NodeStateChangeHandler
- type NodeWithData
- type NullLogger
- func (l *NullLogger) Debugf(format string, args ...interface{})
- func (l *NullLogger) Err(err error) Logger
- func (l *NullLogger) Errorf(format string, args ...interface{})
- func (l *NullLogger) Field(key string, value interface{}) Logger
- func (l *NullLogger) Infof(format string, args ...interface{})
- func (l *NullLogger) Tracef(format string, args ...interface{})
- func (l *NullLogger) Warnf(format string, args ...interface{})
- type Packet
- type ReplyHandler
- type Stream
- func (s *Stream) Close() error
- func (s *Stream) DisableCompression() *Stream
- func (s *Stream) EnableCompression() *Stream
- func (s *Stream) LocalAddr() net.Addr
- func (s *Stream) Read(b []byte) (n int, err error)
- func (s *Stream) RemoteAddr() net.Addr
- func (s *Stream) SetDeadline(t time.Time) error
- func (s *Stream) SetReadDeadline(t time.Time) error
- func (s *Stream) SetWriteDeadline(t time.Time) error
- func (s *Stream) Write(b []byte) (n int, err error)
- type StreamHandler
- type Transport
- type TransportType
Constants ¶
const ( MetadataAnyValue = "*" MetadataContainsPrefix = "~" )
const (
PROTOCOL_VERSION = 1
)
Variables ¶
var EmptyNodeID = NodeID(uuid.Nil)
var (
ErrNoTransportAvailable = fmt.Errorf("no transport available") // When there's no available transport between two nodes
)
var (
ErrPacketTooLarge = errors.New("packet exceeds maximum allowed size")
)
var (
ErrUnsupportedAddressFormat = fmt.Errorf("unsupported address format")
)
Functions ¶
Types ¶
type Address ¶
type ApplicationVersionCheck ¶
ApplicationVersionCheck is a function that checks if an application version is compatible
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
func NewCluster ¶
func (*Cluster) AliveNodes ¶
func (*Cluster) CalcFanOut ¶
CalcFanOut determines how many peers should receive a message when broadcasting information throughout the cluster. It implements a logarithmic scaling approach to balance network traffic and propagation speed.
func (*Cluster) CalcPayloadSize ¶
CalcPayloadSize determines how many items should be included in a gossip payload. This is used when exchanging state information to limit the size of individual gossip messages.
This is meant for controlling the volume of data, not the number of nodes to contact.
func (*Cluster) DisableStreamCompression ¶
Checks that the connection is of Stream type and disables compression if supported. This is a no-op if the connection is not a Stream.
func (*Cluster) EnableStreamCompression ¶
Checks that the connection is of Stream type and enables compression if supported. This is a no-op if the connection is not a Stream.
func (*Cluster) GetCandidates ¶
Get a random subset of nodes to use for gossiping or exchanging states with, excluding ourselves
func (*Cluster) GetNodeByIDString ¶
func (*Cluster) HandleFunc ¶
func (c *Cluster) HandleFunc(msgType MessageType, handler Handler) error
Registers a handler to accept a message and automatically forward it to other nodes
func (*Cluster) HandleFuncNoForward ¶
func (c *Cluster) HandleFuncNoForward(msgType MessageType, handler Handler) error
Registers a handler to accept a message without automatically forwarding it to other nodes
func (*Cluster) HandleFuncWithReply ¶
func (c *Cluster) HandleFuncWithReply(msgType MessageType, replyHandler ReplyHandler) error
Registers a handler to accept a message and reply to the sender, always uses the reliable transport
func (*Cluster) HandleGossipFunc ¶
func (c *Cluster) HandleGossipFunc(handler GossipHandler) HandlerID
func (*Cluster) HandleNodeMetadataChangeFunc ¶
func (c *Cluster) HandleNodeMetadataChangeFunc(handler NodeMetadataChangeHandler) HandlerID
func (*Cluster) HandleNodeStateChangeFunc ¶
func (c *Cluster) HandleNodeStateChangeFunc(handler NodeStateChangeHandler) HandlerID
func (*Cluster) HandleStreamFunc ¶
func (c *Cluster) HandleStreamFunc(msgType MessageType, handler StreamHandler) error
Registers a handler to accept a message and open a stream between sender and destination, always uses the reliable transport
func (*Cluster) Leave ¶
func (c *Cluster) Leave()
Marks the local node as leaving and broadcasts this state to the cluster
func (*Cluster) LocalMetadata ¶
Get the local node's metadata for read and write access
func (*Cluster) NodeIsLocal ¶
func (*Cluster) NodesToIDs ¶
func (*Cluster) NumAliveNodes ¶
Get the number of nodes that are currently alive
func (*Cluster) NumDeadNodes ¶
Get the number of nodes that are currently dead
func (*Cluster) NumSuspectNodes ¶
Get the number of nodes that are currently suspect
func (*Cluster) OpenStream ¶
func (*Cluster) ReadStreamMsg ¶
func (c *Cluster) ReadStreamMsg(conn net.Conn, expectedMsgType MessageType, payload interface{}) error
ReadStreamMsg reads a message from the stream and unmarshals it into the provided payload. It expects the message to have the specific msgType.
func (*Cluster) RemoveGossipHandler ¶
func (*Cluster) RemoveNodeMetadataChangeHandler ¶
func (*Cluster) RemoveNodeStateChangeHandler ¶
func (*Cluster) ResolveAddress ¶
func (*Cluster) Send ¶
func (c *Cluster) Send(msgType MessageType, data interface{}) error
func (*Cluster) SendExcluding ¶
func (c *Cluster) SendExcluding(msgType MessageType, data interface{}, excludeNodes []NodeID) error
func (*Cluster) SendReliable ¶
func (c *Cluster) SendReliable(msgType MessageType, data interface{}) error
func (*Cluster) SendReliableExcluding ¶
func (c *Cluster) SendReliableExcluding(msgType MessageType, data interface{}, excludeNodes []NodeID) error
func (*Cluster) SendTo ¶
func (c *Cluster) SendTo(dstNode *Node, msgType MessageType, data interface{}) error
func (*Cluster) SendToPeers ¶
func (c *Cluster) SendToPeers(dstNodes []*Node, msgType MessageType, data interface{}) error
func (*Cluster) SendToPeersReliable ¶
func (c *Cluster) SendToPeersReliable(dstNodes []*Node, msgType MessageType, data interface{}) error
func (*Cluster) SendToReliable ¶
func (c *Cluster) SendToReliable(dstNode *Node, msgType MessageType, data interface{}) error
func (*Cluster) SendToWithResponse ¶
func (c *Cluster) SendToWithResponse(dstNode *Node, msgType MessageType, payload interface{}, responsePayload interface{}) error
Send a message to the peer then accept a response message. Uses a TCP connection to send the packet and receive the response.
func (*Cluster) UnregisterMessageType ¶
func (c *Cluster) UnregisterMessageType(msgType MessageType) bool
func (*Cluster) UpdateMetadata ¶
Send a metadata update to the cluster.
func (*Cluster) WebsocketHandler ¶
func (c *Cluster) WebsocketHandler(w http.ResponseWriter, r *http.Request)
Handler for incoming WebSocket connections when gossiping over web sockets
func (*Cluster) WriteStreamMsg ¶
func (c *Cluster) WriteStreamMsg(conn net.Conn, msgType MessageType, payload interface{}) error
WriteStreamMsg writes a message directly to the stream with a simple framing protocol: [2 bytes MessageType][4 bytes payload length][payload bytes]
type Config ¶
type Config struct {
NodeID string // NodeID is the unique identifier for the node in the cluster, "" to generate a new one
BindAddr string // BindAddr is the address and port to bind to
// AdvertiseAddr is the address and port to advertise to other nodes, if this is given as a domain name
// it will be resolved to an IP address and used as the advertised address
// If this is prefixed with srv+ then a SRV record will be used to resolve the address to an IP and port
// If not given the BindAddr will be used.
AdvertiseAddr string
ApplicationVersion string // ApplicationVersion is the version of the application, used for compatibility checks
DefaultPort int // DefaultPort is the default port to use for the node
EncryptionKey []byte // Encryption key for the messages, must be either 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256.
Transport Transport // Transport layer to communicate over UDP or TCP
Logger Logger // Logger is the logger to use for logging messages
MsgCodec codec.Serializer // The codec to use for encoding and decoding messages
Compressor compression.Codec // The codec to use for compressing and decompressing messages, if not given messages will not be compressed
CompressMinSize int // The minimum size of a message before attempting to compress it
WebsocketProvider websocket.Provider // The provider to use for WebSocket connections
BearerToken string // Bearer token to use for authentication, if not given no authentication will be used
AllowInsecureWebsockets bool // Whether to allow insecure WebSocket connections (ws://)
SocketTransportEnabled bool // Whether to use the socket transport layer (TCP/UDP)
Cipher encryption.Cipher // The cipher to use for encrypting and decrypting messages
ApplicationVersionCheck ApplicationVersionCheck // The application version check to use for checking compatibility with other nodes
GossipInterval time.Duration // How often to send gossip messages
GossipMaxInterval time.Duration // Maximum interval between gossip messages
TCPDialTimeout time.Duration // TCPDialTimeout is the duration to wait for a TCP connection to be established
TCPDeadline time.Duration // TCPDeadline is the duration to wait for a TCP operation to complete
UDPDeadline time.Duration // UDPDeadline is the duration to wait for a UDP operation to complete
UDPMaxPacketSize int // UDPMaxPacketSize is the maximum size of a UDP packet in bytes
TCPMaxPacketSize int // TCPMaxPacketSize is the maximum size of a TCP packet in bytes
StreamMaxPacketSize int // StreamMaxPacketSize is the maximum size of a stream packet in bytes
MsgHistoryGCInterval time.Duration // MsgHistoryGCInterval is the duration between garbage collection operations
MsgHistoryMaxAge time.Duration // MsgHistoryMaxAge is the maximum age of a message in the history
MsgHistoryShardCount int // MsgHistoryShardCount is the number of shards to use for storing message history, 16 for up to 50 nodes, 32 for up to 500 nodes and 64 for larger clusters.
NodeShardCount int // NodeShardCount is the number of shards to use for storing node information, 4 for up to 50 nodes, 16 for up to 500 nodes and 32 for larger clusters.
NumSendWorkers int // The number of workers to use for sending messages
SendQueueSize int // SendQueueSize is the size of the send queue
NumIncomingWorkers int // The number of workers to use for processing incoming messages
IncomingPacketQueueDepth int // Depth of the queue for incoming packets
HealthCheckInterval time.Duration // How often to perform health checks
HealthCheckSampleSize int // Number of random nodes to check each interval
ActivityThresholdPercent float64 // Percentage of activity threshold for a node to be considered alive, multiplied with HealthCheckInterval
SuspectThreshold int // Number of consecutive failures before marking suspect
SuspectTimeout time.Duration // How long a node can be suspect before final check
DeadNodeTimeout time.Duration // How long to keep dead nodes before removal
RefutationThreshold int // Number of peers refuting suspicion to restore node
EnableIndirectPings bool // Whether to use indirect pings
PingTimeout time.Duration // Timeout for ping operations, should be less than HealthCheckInterval
MaxParallelSuspectEvaluations int // Max number of parallel evaluations for suspect nodes
StateSyncInterval time.Duration // How often to perform state synchronization with peers
FanOutMultiplier float64 // Scale of peer count for broadcast messages
StateExchangeMultiplier float64 // Scale of peer sampling for state exchange messages
IndirectPingMultiplier float64 // Scale of peer sampling for indirect ping messages
TTLMultiplier float64 // Multiplier for TTL, used to determine how many hops a message can take
}
func DefaultConfig ¶
func DefaultConfig() *Config
type DataNodeGroup ¶
type DataNodeGroup[T any] struct { // contains filtered or unexported fields }
DataNodeGroup groups nodes based on metadata and stores custom data with the nodes.
func NewDataNodeGroup ¶
func NewDataNodeGroup[T any]( cluster *Cluster, criteria map[string]string, options *DataNodeGroupOptions[T], ) *DataNodeGroup[T]
NewDataNodeGroup creates a new data node group
func (*DataNodeGroup[T]) Close ¶
func (dng *DataNodeGroup[T]) Close()
Close unregisters event handlers to allow for garbage collection
func (*DataNodeGroup[T]) Contains ¶
func (dng *DataNodeGroup[T]) Contains(nodeID NodeID) bool
Contains checks if a node with the given ID is in this group
func (*DataNodeGroup[T]) Count ¶
func (dng *DataNodeGroup[T]) Count() int
Count returns the number of alive nodes in this group
func (*DataNodeGroup[T]) GetDataNodes ¶
func (dng *DataNodeGroup[T]) GetDataNodes() []*T
GetDataNodes returns the custom data of all alive nodes
func (*DataNodeGroup[T]) GetNodeData ¶
func (dng *DataNodeGroup[T]) GetNodeData(nodeID NodeID) *T
GetNodeData returns a node's custom data if it exists in the group
func (*DataNodeGroup[T]) GetNodes ¶
func (dng *DataNodeGroup[T]) GetNodes(excludeIDs []NodeID) []*Node
GetNodes returns all alive nodes in this group, excluding specified node IDs
func (*DataNodeGroup[T]) SendToPeers ¶
func (dng *DataNodeGroup[T]) SendToPeers(msgType MessageType, data interface{}) error
SendToPeers sends a message to all peers in the group and if necessary gossips to random peers.
func (*DataNodeGroup[T]) SendToPeersReliable ¶
func (dng *DataNodeGroup[T]) SendToPeersReliable(msgType MessageType, data interface{}) error
SendToPeersReliable sends a message to all peers in the group reliably and if necessary gossips to random peers.
func (*DataNodeGroup[T]) UpdateNodeData ¶
func (dng *DataNodeGroup[T]) UpdateNodeData(nodeID NodeID, updateFn func(*Node, *T) error) error
UpdateNodeData allows updating a node's custom data safely
type DataNodeGroupOptions ¶
type DataNodeGroupOptions[T any] struct { // Callbacks OnNodeAdded func(node *Node, data *T) OnNodeRemoved func(node *Node, data *T) OnNodeUpdated func(node *Node, data *T) // Function to initialize custom data for a node DataInitializer func(node *Node) *T }
DataNodeGroupOptions contains configuration options
type EventHandlers ¶
type EventHandlers[T any] struct { // contains filtered or unexported fields }
EventHandlers manages a collection of handlers of a specific type
func NewEventHandlers ¶
func NewEventHandlers[T any]() *EventHandlers[T]
NewEventHandlers creates a new handler collection for any type
func (*EventHandlers[T]) Add ¶
func (l *EventHandlers[T]) Add(handler T) HandlerID
Add registers a handler and returns its ID
func (*EventHandlers[T]) ForEach ¶
func (l *EventHandlers[T]) ForEach(fn func(T))
ForEach executes a function for each handler (optimized for iteration)
func (*EventHandlers[T]) GetHandlers ¶
func (l *EventHandlers[T]) GetHandlers() []T
GetHandlers returns all registered handlers (lock-free read)
func (*EventHandlers[T]) Remove ¶
func (l *EventHandlers[T]) Remove(id HandlerID) bool
Remove unregisters a handler by its ID
type GossipHandler ¶
type GossipHandler func()
type Logger ¶
type Logger interface {
Tracef(format string, args ...interface{})
Debugf(format string, args ...interface{})
Infof(format string, args ...interface{})
Warnf(format string, args ...interface{})
Errorf(format string, args ...interface{})
Field(key string, value interface{}) Logger
Err(err error) Logger
}
Logger is a generic logger interface similar to BadgerDB's logger
type MessageType ¶
type MessageType uint16
const ( ReservedMsgsStart MessageType = 64 // Reserved for future use UserMsg MessageType = 128 // User messages start here )
type Metadata ¶
type Metadata struct {
// contains filtered or unexported fields
}
Metadata holds node metadata with type-safe accessors
func (*Metadata) GetAllAsString ¶
GetAllAsString returns all metadata as string values
func (*Metadata) GetAllKeys ¶
GetAllKeys returns all keys in the metadata
func (*Metadata) GetFloat32 ¶
GetFloat32 returns a 32-bit float value
func (*Metadata) GetFloat64 ¶
GetFloat64 returns a 64-bit float value (our base float conversion method)
func (*Metadata) GetInt64 ¶
GetInt64 returns a 64-bit integer value (our base integer conversion method)
func (*Metadata) GetTimestamp ¶
GetTimestamp returns the last modification timestamp
func (*Metadata) GetUint64 ¶
GetUint64 returns a 64-bit unsigned integer value (our base unsigned conversion method)
func (*Metadata) SetFloat32 ¶
SetFloat32 sets a 32-bit float value
func (*Metadata) SetFloat64 ¶
SetFloat64 sets a 64-bit float value
type MetadataReader ¶
type MetadataReader interface {
GetString(key string) string
GetBool(key string) bool
GetInt(key string) int
GetInt32(key string) int32
GetInt64(key string) int64
GetUint(key string) uint
GetUint32(key string) uint32
GetUint64(key string) uint64
GetFloat32(key string) float32
GetFloat64(key string) float64
GetTime(key string) time.Time
GetTimestamp() hlc.Timestamp
GetAll() map[string]interface{}
GetAllKeys() []string
GetAllAsString() map[string]string
Exists(key string) bool
}
MetadataReader provides read-only access to metadata
type Node ¶
type Node struct {
ID NodeID
Metadata MetadataReader
ProtocolVersion uint16
ApplicationVersion string
// contains filtered or unexported fields
}
func (*Node) DeadOrLeft ¶
func (*Node) GetAddress ¶
type NodeGroup ¶
type NodeGroup struct {
// contains filtered or unexported fields
}
NodeGroup represents a group of nodes that match specific metadata criteria
func NewNodeGroup ¶
func NewNodeGroup(cluster *Cluster, criteria map[string]string, opts *NodeGroupOptions) *NodeGroup
NewNodeGroup creates a new node group that tracks nodes with matching metadata
func (*NodeGroup) SendToPeers ¶
func (ng *NodeGroup) SendToPeers(msgType MessageType, data interface{}) error
SendToPeers sends a message to all peers in the group and if necessary gossips to random peers.
func (*NodeGroup) SendToPeersReliable ¶
func (ng *NodeGroup) SendToPeersReliable(msgType MessageType, data interface{}) error
SendToPeersReliable sends a message to all peers in the group reliably and if necessary gossips to random peers.
type NodeGroupOptions ¶
type NodeMetadataChangeHandler ¶
type NodeMetadataChangeHandler func(*Node)
type NodeStateChangeHandler ¶
NodeStateChangeHandler and NodeMetadataChangeHandler are used to handle node state and metadata changes
type NodeWithData ¶
NodeWithData pairs a node with its custom data
type NullLogger ¶
type NullLogger struct{}
NullLogger implements the Logger interface with no-op methods
func NewNullLogger ¶
func NewNullLogger() *NullLogger
func (*NullLogger) Debugf ¶
func (l *NullLogger) Debugf(format string, args ...interface{})
func (*NullLogger) Err ¶
func (l *NullLogger) Err(err error) Logger
func (*NullLogger) Errorf ¶
func (l *NullLogger) Errorf(format string, args ...interface{})
func (*NullLogger) Field ¶
func (l *NullLogger) Field(key string, value interface{}) Logger
func (*NullLogger) Infof ¶
func (l *NullLogger) Infof(format string, args ...interface{})
func (*NullLogger) Tracef ¶
func (l *NullLogger) Tracef(format string, args ...interface{})
func (*NullLogger) Warnf ¶
func (l *NullLogger) Warnf(format string, args ...interface{})
type Packet ¶
type Packet struct {
MessageType MessageType `msgpack:"mt" json:"mt"`
SenderID NodeID `msgpack:"si" json:"si"`
MessageID MessageID `msgpack:"mi" json:"mi"`
TTL uint8 `msgpack:"ttl" json:"ttl"`
// contains filtered or unexported fields
}
Packet holds the payload of a message being passed between nodes
func (*Packet) Codec ¶
func (p *Packet) Codec() codec.Serializer
type ReplyHandler ¶
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream wraps a net.Conn with compression and encryption support. It implements the net.Conn interface for compatibility with existing code.
func (*Stream) DisableCompression ¶
func (*Stream) EnableCompression ¶
func (*Stream) RemoteAddr ¶
RemoteAddr returns the remote network address.
func (*Stream) SetDeadline ¶
SetDeadline sets the read and write deadlines.
func (*Stream) SetReadDeadline ¶
SetReadDeadline sets the read deadline.
func (*Stream) SetWriteDeadline ¶
SetWriteDeadline sets the write deadline.
type Transport ¶
type Transport interface {
PacketChannel() chan *Packet
DialPeer(node *Node) (net.Conn, error)
WritePacket(conn net.Conn, packet *Packet) error
ReadPacket(conn net.Conn) (*Packet, error)
SendPacket(transportType TransportType, nodes []*Node, packet *Packet) error
WebsocketHandler(ctx context.Context, w http.ResponseWriter, r *http.Request)
}
Transport defines the transport layer, which is responsible for placing packets onto the wire and reading them off; it also handles encryption and compression of packets.
type TransportType ¶
type TransportType uint8
const ( TransportBestEffort TransportType = iota // Best effort transport, uses UDP TransportReliable // Reliable transport, uses TCP )