gossip

package module
v0.6.0
Published: Aug 13, 2025 License: MIT Imports: 23 Imported by: 0

README

Gossip Protocol Library

A lightweight, Go-based library for implementing the gossip protocol in distributed systems. The library supports multiple transport mechanisms, including TCP, UDP, and WebSocket, providing flexibility for a variety of use cases.

With its straightforward API, the library simplifies decentralized communication by enabling the creation and management of nodes, exchanging messages, and handling critical events like node joins and departures. Designed with reliability in mind, it continuously monitors node health within the cluster and seamlessly removes unreachable nodes, ensuring the system stays robust and adaptive.

The library leverages a hybrid approach to gossiping, combining event-driven gossiping and periodic gossiping to balance rapid updates and eventual consistency across the cluster:

  • Event-Driven Gossiping swiftly propagates critical updates immediately after events occur, minimizing latency.
  • Periodic Gossiping adds redundancy by disseminating updates at regular intervals, ensuring eventual consistency even if some nodes initially miss updates.

This flexible architecture supports the development of resilient distributed systems with efficient data sharing and robust fault tolerance.
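
The hybrid approach can be pictured as one loop that reacts to events immediately and also fires on a timer. The sketch below uses only the standard library; `gossipLoop`, its channels, and the recorded strings are hypothetical names for illustration and are not part of this library's API.

```go
package main

import (
	"fmt"
	"time"
)

// gossipLoop is a hypothetical illustration of hybrid gossiping: it records a
// send as soon as an event arrives and also on every tick of a timer.
// It returns the log of sends once the events channel is closed.
func gossipLoop(events <-chan string, ticks <-chan time.Time) []string {
	var sent []string
	for {
		select {
		case ev, ok := <-events:
			if !ok {
				return sent
			}
			sent = append(sent, "event:"+ev) // event-driven: push right away
		case <-ticks:
			sent = append(sent, "periodic") // periodic: redundancy for missed updates
		}
	}
}

func main() {
	events := make(chan string, 2)
	events <- "node-joined"
	events <- "metadata-changed"
	close(events)

	// A nil tick channel never fires, which keeps this run deterministic.
	// A real loop would pass time.NewTicker(interval).C instead.
	fmt.Println(gossipLoop(events, nil))
}
```

In a long-running node both cases would stay active, so an update missed on the event path is still picked up by a later periodic round.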

Features

  • Multiple Transport Support: TCP, UDP, and WebSocket (both ws and wss) connections
  • Pluggable WebSocket Providers: Support for both Gorilla and Coder WebSocket implementations
  • Message Security: Optional encryption for message payloads
  • Compression: Configurable message compression; support for Snappy is provided by default
  • Health Monitoring: Automatic node health checking with direct and indirect pings
  • Flexible Codec Support: Pluggable serialization with support for multiple msgpack implementations
  • Metadata Sharing: Distribute custom node metadata across the cluster
  • Version Checking: Application and protocol version compatibility verification
  • Automatic Transport Selection: UDP is used wherever possible; packets that exceed the MTU size are sent over TCP instead
  • Node to Node Streams: Support for streaming data between nodes over TCP and WebSocket connections
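
The automatic transport selection described in the list above amounts to a size check. The following is a minimal sketch under stated assumptions: `pickTransport` is a hypothetical helper, and the 1400-byte limit is an illustrative value rather than the library's default. The parameter names echo the `UDPMaxPacketSize` and `ForceReliableTransport` fields of `Config`.

```go
package main

import "fmt"

// pickTransport is a hypothetical helper showing the selection rule:
// use UDP unless the packet is too large or reliable transport is forced.
func pickTransport(packetSize, udpMaxPacketSize int, forceReliable bool) string {
	if forceReliable || packetSize > udpMaxPacketSize {
		return "tcp"
	}
	return "udp"
}

func main() {
	const udpMax = 1400 // assumed limit for illustration only

	fmt.Println(pickTransport(512, udpMax, false))  // small packet
	fmt.Println(pickTransport(4096, udpMax, false)) // exceeds the limit
	fmt.Println(pickTransport(512, udpMax, true))   // reliable transport forced
}
```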

Installation

To install the library, use the following command:

go get github.com/paularlott/gossip

Basic Usage

package main

import (
  "fmt"

  "github.com/paularlott/gossip"
  "github.com/paularlott/gossip/codec"
  "github.com/paularlott/gossip/compression"
  "github.com/paularlott/gossip/encryption"
)

func main() {
  // Create configuration
  config := gossip.DefaultConfig()
  config.NodeID = "01960f9b-72ca-7a51-9efa-47c12f42a138"       // Optional: auto-generated if not specified
  config.BindAddr = "127.0.0.1:8000"                           // Listen on TCP and UDP
  config.EncryptionKey = []byte("your-32-byte-key")            // Optional: enables encryption; key must be 16, 24, or 32 bytes
  config.Cipher = encryption.NewAESEncryptor()                 // Encryption algorithm
  config.MsgCodec = codec.NewShamatonMsgpackCodec()            // Message serialization
  config.Compressor = compression.NewSnappyCompressor()        // Optional: enables compression

  // Create and start the cluster
  cluster, err := gossip.NewCluster(config)
  if err != nil {
    panic(err)
  }
  cluster.Start()
  defer cluster.Stop()

  // Join existing cluster (if any)
  err = cluster.Join([]string{"127.0.0.1:8001"})
  if err != nil {
    fmt.Println("Warning:", err)
  }

  // Register message handler; user message types start at gossip.UserMsg
  const CustomMsg gossip.MessageType = gossip.UserMsg + 1
  err = cluster.HandleFunc(CustomMsg, func(sender *gossip.Node, packet *gossip.Packet) error {
    var message string
    if err := packet.UnmarshalPayload(&message); err != nil {
      return err
    }
    fmt.Printf("Received message from %s: %s\n", sender.ID, message)
    return nil
  })
  if err != nil {
    panic(err)
  }

  // Send a message to the cluster
  message := "Hello cluster!"
  if err := cluster.Send(CustomMsg, message); err != nil {
    fmt.Println("Warning:", err)
  }

  // Keep the application running
  select {}
}

Address Formats

The gossip library supports multiple address formats for binding and connecting to the cluster:

  • IP:port - Standard TCP/UDP address (e.g., 127.0.0.1:8000)
  • hostname:port - DNS hostname with port. When multiple addresses are returned, the node attempts to connect to each address in turn, assuming each is a node within the cluster
  • hostname or IP - The default port is used. For a hostname that returns multiple addresses, the node attempts to connect to each address in turn, assuming each is a node within the cluster
  • srv+service-name - SRV DNS record lookup. When multiple addresses are returned, the node attempts to connect to each address in turn, assuming each is a node within the cluster
  • ws://hostname:port/endpoint - WebSocket connection
  • wss://hostname:port/endpoint - Secure WebSocket connection
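
To make the formats above concrete, here is a small standard-library sketch that labels an address string by its format. `classifyAddress` is a hypothetical helper written for this example; it performs no DNS resolution and is not how the library itself parses addresses.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// classifyAddress is a hypothetical helper that labels an address string
// according to the formats listed above. It does not resolve anything.
func classifyAddress(addr string) string {
	switch {
	case strings.HasPrefix(addr, "wss://"):
		return "secure websocket"
	case strings.HasPrefix(addr, "ws://"):
		return "websocket"
	case strings.HasPrefix(addr, "srv+"):
		return "srv lookup"
	}

	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		// No port could be split off, so the default port would apply.
		if net.ParseIP(addr) != nil {
			return "ip (default port)"
		}
		return "hostname (default port)"
	}
	if net.ParseIP(host) != nil {
		return "ip:port"
	}
	return "hostname:port"
}

func main() {
	for _, a := range []string{
		"127.0.0.1:8000",
		"node1.example.com:8000",
		"node1.example.com",
		"srv+service-name",
		"ws://node1.example.com:8080/gossip",
		"wss://node1.example.com/gossip",
	} {
		fmt.Printf("%-40s %s\n", a, classifyAddress(a))
	}
}
```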

Configuration Options

The Config struct provides extensive customization:

config := gossip.DefaultConfig()

// Node identification
config.NodeID = "unique-node-id"               // Optional: auto-generated if not provided
config.BindAddr = "0.0.0.0:3500"               // Address to bind for listening
config.AdvertiseAddr = "192.168.1.1:3500"      // Address to advertise to peers (optional)

// Communication
config.EncryptionKey = []byte("your-32-byte-key")      // Optional: enables encryption; must be 16, 24, or 32 bytes
config.Cipher = encryption.NewAESEncryptor()           // Encryption algorithm
config.Compressor = compression.NewSnappyCompressor()  // Enable payload compression using the provided compressor
config.CompressMinSize = 1024                          // Minimum size of a packet that will be considered for compression

// Networking, optional but if given allows use of WebSockets for transport and disables TCP/UDP
config.WebsocketProvider = websocket.NewGorillaProvider(5*time.Second, true, "")
config.AllowInsecureWebsockets = true
config.SocketTransportEnabled = false

Node States

Nodes in the cluster go through several states:

  • NodeUnknown - Node state is unknown; nodes start in this state and change to NodeAlive when joining the cluster
  • NodeAlive - Node is active and healthy
  • NodeSuspect - Node might be unhealthy (pending confirmation)
  • NodeDead - Node is confirmed dead
  • NodeLeaving - Node is gracefully leaving the cluster
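
The sketch below shows one way failed health checks could move a node through the states listed above. It is a simplification for illustration, not the library's algorithm: `health` and `observe` are hypothetical, and the parameters only echo the `SuspectThreshold` and `SuspectRetentionPeriod` fields of `Config`. Indirect pings and refutation are left out.

```go
package main

import (
	"fmt"
	"time"
)

// NodeState is a local stand-in for the states listed above,
// not the library's type.
type NodeState uint8

const (
	NodeUnknown NodeState = iota
	NodeAlive
	NodeLeaving
	NodeDead
	NodeSuspect
)

// health is a hypothetical per-node record used only in this sketch.
type health struct {
	state      NodeState
	failures   int           // consecutive failed checks
	suspectFor time.Duration // time spent in NodeSuspect
}

// observe applies one health-check result using simplified rules:
// enough consecutive failures make a node suspect, and a suspect node
// that stays unreachable for the retention period becomes dead.
func (h *health) observe(ok bool, interval time.Duration, suspectThreshold int, retention time.Duration) {
	if h.state == NodeLeaving {
		return // a leaving node is not re-evaluated in this sketch
	}
	if ok {
		h.state, h.failures, h.suspectFor = NodeAlive, 0, 0
		return
	}
	h.failures++
	switch h.state {
	case NodeAlive:
		if h.failures >= suspectThreshold {
			h.state = NodeSuspect
		}
	case NodeSuspect:
		h.suspectFor += interval
		if h.suspectFor >= retention {
			h.state = NodeDead
		}
	}
}

func main() {
	h := &health{state: NodeAlive}
	for i := 1; i <= 5; i++ {
		h.observe(false, time.Second, 3, 2*time.Second)
		fmt.Printf("check %d failed -> state %d\n", i, h.state)
	}
}
```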

WebSocket Support

The library provides adapters for two WebSocket implementations allowing you to choose the one that fits with the rest of your application:

// Using Gorilla WebSockets
config.WebsocketProvider = websocket.NewGorillaProvider(5*time.Second, true, "")

// Using Coder WebSockets
config.WebsocketProvider = websocket.NewCoderProvider(5*time.Second, true, "")

Message Codecs

Multiple serialization options are available allowing you to choose the one that best fits your application:

// Using Shamaton msgpack
config.MsgCodec = codec.NewShamatonMsgpackCodec()

// Using Vmihailenco msgpack
config.MsgCodec = codec.NewVmihailencoMsgpackCodec()

// Using JSON
config.MsgCodec = codec.NewJSONCodec()

Examples

The examples directory contains various examples demonstrating the library's capabilities. Each example is self-contained and can be run independently.

  • basic: A basic usage example that creates a cluster and joins nodes to it. Nodes can communicate over TCP/UDP or WebSocket.
  • events: Example that installs an event handler to display cluster events.
  • usermessages: Example that demonstrates user defined message handling.
  • kv: Example Key Value store.
  • Stream: Example using the stream functions to pass data between nodes.
  • leader: Example demonstrating leader election.

Documentation

Index

Constants

const (
	MetadataAnyValue       = "*"
	MetadataContainsPrefix = "~"
)

const (
	PROTOCOL_VERSION = 1
)

Variables

var EmptyNodeID = NodeID(uuid.Nil)

var (
	ErrNoTransportAvailable = fmt.Errorf("no transport available") // When there's no available transport between two nodes
)

var (
	ErrPacketTooLarge = errors.New("packet exceeds maximum allowed size")
)

var (
	ErrUnsupportedAddressFormat = fmt.Errorf("unsupported address format")
)

Functions

func NewStream

func NewStream(conn net.Conn, config *Config) net.Conn

NewStream creates a new stream wrapping the given connection

func NewTransport

func NewTransport(ctx context.Context, wg *sync.WaitGroup, config *Config, bindAddress Address) (*transport, error)

Types

type Address

type Address struct {
	IP   net.IP `msgpack:"ip,omitempty" json:"ip,omitempty"`
	Port int    `msgpack:"port,omitempty" json:"port,omitempty"`
	URL  string `msgpack:"url,omitempty" json:"url,omitempty"`
}

func (*Address) Clear added in v0.4.1

func (a *Address) Clear()

func (*Address) IsEmpty added in v0.4.0

func (a *Address) IsEmpty() bool

func (*Address) String

func (a *Address) String() string

type ApplicationVersionCheck

type ApplicationVersionCheck func(version string) bool

ApplicationVersionCheck is a function that checks if an application version is compatible

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(config *Config) (*Cluster, error)

func (*Cluster) AliveNodes

func (c *Cluster) AliveNodes() []*Node

func (*Cluster) CalcFanOut

func (c *Cluster) CalcFanOut() int

CalcFanOut determines how many peers should receive a message when broadcasting information throughout the cluster. It implements a logarithmic scaling approach to balance network traffic and propagation speed.
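
As an illustration of logarithmic scaling, the sketch below grows the fan-out with log2 of the cluster size. The formula, the clamping, and the name `fanOut` are assumptions made for this example; the source does not state the exact calculation CalcFanOut uses. The `multiplier` parameter plays the role of the `FanOutMultiplier` field in `Config`.

```go
package main

import (
	"fmt"
	"math"
)

// fanOut is a hypothetical logarithmic fan-out:
// ceil(log2(n) * multiplier), clamped to the range [1, n].
func fanOut(aliveNodes int, multiplier float64) int {
	if aliveNodes <= 0 {
		return 0
	}
	f := int(math.Ceil(math.Log2(float64(aliveNodes)) * multiplier))
	if f < 1 {
		f = 1
	}
	if f > aliveNodes {
		f = aliveNodes
	}
	return f
}

func main() {
	for _, n := range []int{1, 8, 64, 1024} {
		fmt.Printf("nodes=%d fanout=%d\n", n, fanOut(n, 1.0))
	}
}
```

With this shape, going from 8 to 1024 nodes raises the fan-out from 3 to 10 rather than by a factor of 128, which is the trade-off between traffic and propagation speed that the description refers to.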

func (*Cluster) CalcPayloadSize

func (c *Cluster) CalcPayloadSize(totalItems int) int

CalcPayloadSize determines how many items should be included in a gossip payload. This is used when exchanging state information to limit the size of individual gossip messages.

This is meant for controlling the volume of data, not the number of nodes to contact.

func (*Cluster) DisableStreamCompression

func (c *Cluster) DisableStreamCompression(s net.Conn) *Cluster

Checks that the connection is of Stream type and disables compression if supported. This is a no-op if the connection is not a Stream.

func (*Cluster) EnableStreamCompression

func (c *Cluster) EnableStreamCompression(s net.Conn) *Cluster

Checks that the connection is of Stream type and enables compression if supported. This is a no-op if the connection is not a Stream.

func (*Cluster) GetCandidates

func (c *Cluster) GetCandidates() []*Node

Get a random subset of nodes to use for gossiping or exchanging states with, excluding ourselves

func (*Cluster) GetNode

func (c *Cluster) GetNode(id NodeID) *Node

func (*Cluster) GetNodeByIDString

func (c *Cluster) GetNodeByIDString(id string) *Node

func (*Cluster) HandleFunc

func (c *Cluster) HandleFunc(msgType MessageType, handler Handler) error

Registers a handler to accept a message and automatically forward it to other nodes

func (*Cluster) HandleFuncNoForward

func (c *Cluster) HandleFuncNoForward(msgType MessageType, handler Handler) error

Registers a handler to accept a message without automatically forwarding it to other nodes

func (*Cluster) HandleFuncWithReply

func (c *Cluster) HandleFuncWithReply(msgType MessageType, replyHandler ReplyHandler) error

Registers a handler to accept a message and reply to the sender, always uses the reliable transport

func (*Cluster) HandleGossipFunc

func (c *Cluster) HandleGossipFunc(handler GossipHandler) HandlerID

func (*Cluster) HandleNodeMetadataChangeFunc

func (c *Cluster) HandleNodeMetadataChangeFunc(handler NodeMetadataChangeHandler) HandlerID

func (*Cluster) HandleNodeStateChangeFunc

func (c *Cluster) HandleNodeStateChangeFunc(handler NodeStateChangeHandler) HandlerID

func (*Cluster) HandleStreamFunc

func (c *Cluster) HandleStreamFunc(msgType MessageType, handler StreamHandler) error

Registers a handler to accept a message and open a stream between sender and destination, always uses the reliable transport

func (*Cluster) Join

func (c *Cluster) Join(peers []string) error

func (*Cluster) Leave

func (c *Cluster) Leave()

Marks the local node as leaving and broadcasts this state to the cluster

func (*Cluster) LocalMetadata

func (c *Cluster) LocalMetadata() *Metadata

Get the local node's metadata for read and write access

func (*Cluster) LocalNode

func (c *Cluster) LocalNode() *Node

func (*Cluster) Logger

func (c *Cluster) Logger() Logger

func (*Cluster) NodeIsLocal

func (c *Cluster) NodeIsLocal(node *Node) bool

func (*Cluster) Nodes

func (c *Cluster) Nodes() []*Node

func (*Cluster) NodesToIDs

func (c *Cluster) NodesToIDs(nodes []*Node) []NodeID

func (*Cluster) NumAliveNodes

func (c *Cluster) NumAliveNodes() int

Get the number of nodes that are currently alive

func (*Cluster) NumDeadNodes

func (c *Cluster) NumDeadNodes() int

Get the number of nodes that are currently dead

func (*Cluster) NumNodes

func (c *Cluster) NumNodes() int

func (*Cluster) NumSuspectNodes

func (c *Cluster) NumSuspectNodes() int

Get the number of nodes that are currently suspect

func (*Cluster) OpenStream

func (c *Cluster) OpenStream(dstNode *Node, msgType MessageType, payload interface{}) (net.Conn, error)

func (*Cluster) ReadStreamMsg

func (c *Cluster) ReadStreamMsg(conn net.Conn, expectedMsgType MessageType, payload interface{}) error

ReadStreamMsg reads a message from the stream and unmarshals it into the provided payload. It expects the message to have the specific msgType.

func (*Cluster) RemoveGossipHandler

func (c *Cluster) RemoveGossipHandler(id HandlerID) bool

func (*Cluster) RemoveNodeMetadataChangeHandler

func (c *Cluster) RemoveNodeMetadataChangeHandler(id HandlerID) bool

func (*Cluster) RemoveNodeStateChangeHandler

func (c *Cluster) RemoveNodeStateChangeHandler(id HandlerID) bool

func (*Cluster) Send

func (c *Cluster) Send(msgType MessageType, data interface{}) error

func (*Cluster) SendExcluding

func (c *Cluster) SendExcluding(msgType MessageType, data interface{}, excludeNodes []NodeID) error

func (*Cluster) SendReliable

func (c *Cluster) SendReliable(msgType MessageType, data interface{}) error

func (*Cluster) SendReliableExcluding

func (c *Cluster) SendReliableExcluding(msgType MessageType, data interface{}, excludeNodes []NodeID) error

func (*Cluster) SendTo

func (c *Cluster) SendTo(dstNode *Node, msgType MessageType, data interface{}) error

func (*Cluster) SendToPeers

func (c *Cluster) SendToPeers(dstNodes []*Node, msgType MessageType, data interface{}) error

func (*Cluster) SendToPeersReliable

func (c *Cluster) SendToPeersReliable(dstNodes []*Node, msgType MessageType, data interface{}) error

func (*Cluster) SendToReliable

func (c *Cluster) SendToReliable(dstNode *Node, msgType MessageType, data interface{}) error

func (*Cluster) SendToWithResponse

func (c *Cluster) SendToWithResponse(dstNode *Node, msgType MessageType, payload interface{}, responsePayload interface{}) error

Send a message to the peer then accept a response message. Uses a TCP connection to send the packet and receive the response.

func (*Cluster) Start

func (c *Cluster) Start()

func (*Cluster) Stop

func (c *Cluster) Stop()

func (*Cluster) UnregisterMessageType

func (c *Cluster) UnregisterMessageType(msgType MessageType) bool

func (*Cluster) UpdateMetadata

func (c *Cluster) UpdateMetadata() error

Send a metadata update to the cluster.

func (*Cluster) WebsocketHandler

func (c *Cluster) WebsocketHandler(w http.ResponseWriter, r *http.Request)

Handler for incoming WebSocket connections when gossiping over WebSockets

func (*Cluster) WriteStreamMsg

func (c *Cluster) WriteStreamMsg(conn net.Conn, msgType MessageType, payload interface{}) error

WriteStreamMsg writes a message directly to the stream with a simple framing protocol: [2 bytes MessageType][4 bytes payload length][payload bytes]

type Config

type Config struct {
	NodeID   string // NodeID is the unique identifier for the node in the cluster, "" to generate a new one
	BindAddr string // BindAddr is the address and port to bind to
	// AdvertiseAddr is the address and port to advertise to other nodes, if this is given as a domain name
	// it will be resolved to an IP address and used as the advertised address
	// If this is prefixed with srv+ then a SRV record will be used to resolve the address to an IP and port
	// If not given the BindAddr will be used.
	AdvertiseAddr                 string
	ApplicationVersion            string                  // ApplicationVersion is the version of the application, used for compatibility checks
	DefaultPort                   int                     // DefaultPort is the default port to use for the node
	EncryptionKey                 []byte                  // Encryption key for the messages, must be either 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256.
	Transport                     Transport               // Transport layer to communicate over UDP or TCP
	Logger                        Logger                  // Logger is the logger to use for logging messages
	MsgCodec                      codec.Serializer        // The codec to use for encoding and decoding messages
	Compressor                    compression.Codec       // The codec to use for compressing and decompressing messages, if not given messages will not be compressed
	CompressMinSize               int                     // The minimum size of a message before attempting to compress it
	WebsocketProvider             websocket.Provider      // The provider to use for WebSocket connections
	BearerToken                   string                  // Bearer token to use for authentication, if not given no authentication will be used
	AllowInsecureWebsockets       bool                    // Whether to allow insecure WebSocket connections (ws://)
	SocketTransportEnabled        bool                    // Whether to use the socket transport layer (TCP/UDP)
	Cipher                        encryption.Cipher       // The cipher to use for encrypting and decrypting messages
	ApplicationVersionCheck       ApplicationVersionCheck // The application version check to use for checking compatibility with other nodes
	GossipInterval                time.Duration           // How often to send gossip messages
	GossipMaxInterval             time.Duration           // Maximum interval between gossip messages
	TCPDialTimeout                time.Duration           // TCPDialTimeout is the duration to wait for a TCP connection to be established
	TCPDeadline                   time.Duration           // TCPDeadline is the duration to wait for a TCP operation to complete
	UDPDeadline                   time.Duration           // UDPDeadline is the duration to wait for a UDP operation to complete
	UDPMaxPacketSize              int                     // UDPMaxPacketSize is the maximum size of a UDP packet in bytes
	TCPMaxPacketSize              int                     // TCPMaxPacketSize is the maximum size of a TCP packet in bytes
	StreamMaxPacketSize           int                     // StreamMaxPacketSize is the maximum size of a stream packet in bytes
	MsgHistoryGCInterval          time.Duration           // MsgHistoryGCInterval is the duration between garbage collection operations
	MsgHistoryMaxAge              time.Duration           // MsgHistoryMaxAge is the maximum age of a message in the history
	MsgHistoryShardCount          int                     // MsgHistoryShardCount is the number of shards to use for storing message history, 16 for up to 50 nodes, 32 for up to 500 nodes and 64 for larger clusters.
	NodeShardCount                int                     // NodeShardCount is the number of shards to use for storing node information, 4 for up to 50 nodes, 16 for up to 500 nodes and 32 for larger clusters.
	NumSendWorkers                int                     // The number of workers to use for sending messages
	SendQueueSize                 int                     // SendQueueSize is the size of the send queue
	NumIncomingWorkers            int                     // The number of workers to use for processing incoming messages
	IncomingPacketQueueDepth      int                     // Depth of the queue for incoming packets
	HealthCheckInterval           time.Duration           // How often to perform health checks
	HealthCheckSampleSize         int                     // Number of random nodes to check each interval
	ActivityThresholdPercent      float64                 // Percentage of activity threshold for a node to be considered alive, multiplied with HealthCheckInterval
	SuspectThreshold              int                     // Number of consecutive failures before marking suspect
	SuspectAttemptInterval        time.Duration           // How frequently to check a suspect node
	SuspectRetentionPeriod        time.Duration           // How long to retain suspect nodes for before moving to dead state
	RecoveryAttemptInterval       time.Duration           // How often to attempt recovery of dead nodes
	DeadNodeRetentionPeriod       time.Duration           // How long to keep dead nodes for recovery attempts
	RefutationThreshold           int                     // Number of peers refuting suspicion to restore node
	EnableIndirectPings           bool                    // Whether to use indirect pings
	PingTimeout                   time.Duration           // Timeout for ping operations, should be less than HealthCheckInterval
	MaxParallelSuspectEvaluations int                     // Max number of parallel evaluations for suspect nodes
	StateSyncInterval             time.Duration           // How often to perform state synchronization with peers
	FanOutMultiplier              float64                 // Scale of peer count for broadcast messages
	StateExchangeMultiplier       float64                 // Scale of peer sampling for state exchange messages
	IndirectPingMultiplier        float64                 // Scale of peer sampling for indirect ping messages
	TTLMultiplier                 float64                 // Multiplier for TTL, used to determine how many hops a message can take
	ForceReliableTransport        bool                    // Force all messages to use reliable transport (TCP/WebSocket)
	Resolver                      Resolver                // DNS resolver to use for address resolution, if not set uses default resolver
	PreferIPv6                    bool                    // Prefer IPv6 addresses when resolving hostnames (default false = prefer IPv4)
}
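
The `EncryptionKey` comment ties key length to the AES variant. The standard-library sketch below shows that relationship using `crypto/aes`, which rejects any other key length. `aesVariant` is a hypothetical helper for this example and says nothing about how the library's own cipher is implemented.

```go
package main

import (
	"crypto/aes"
	"fmt"
)

// aesVariant reports which AES variant a key selects, or an error
// if the key is not 16, 24, or 32 bytes long.
func aesVariant(key []byte) (string, error) {
	if _, err := aes.NewCipher(key); err != nil {
		return "", err // aes.KeySizeError for unsupported lengths
	}
	return fmt.Sprintf("AES-%d", len(key)*8), nil
}

func main() {
	keys := [][]byte{
		[]byte("your-32-byte-key"), // the README placeholder is 16 bytes long
		make([]byte, 24),
		make([]byte, 32),
		[]byte("too-short"),
	}
	for _, k := range keys {
		v, err := aesVariant(k)
		fmt.Printf("len=%d variant=%q err=%v\n", len(k), v, err)
	}
}
```

Note that the placeholder string used in the README snippets is 16 bytes, so it would select AES-128; a 32-byte key is needed for AES-256.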

func DefaultConfig

func DefaultConfig() *Config

type DataNodeGroup

type DataNodeGroup[T any] struct {
	// contains filtered or unexported fields
}

DataNodeGroup groups nodes based on metadata and stores custom data with the nodes.

func NewDataNodeGroup

func NewDataNodeGroup[T any](
	cluster *Cluster,
	criteria map[string]string,
	options *DataNodeGroupOptions[T],
) *DataNodeGroup[T]

NewDataNodeGroup creates a new data node group

func (*DataNodeGroup[T]) Close

func (dng *DataNodeGroup[T]) Close()

Close unregisters event handlers to allow for garbage collection

func (*DataNodeGroup[T]) Contains

func (dng *DataNodeGroup[T]) Contains(nodeID NodeID) bool

Contains checks if a node with the given ID is in this group

func (*DataNodeGroup[T]) Count

func (dng *DataNodeGroup[T]) Count() int

Count returns the number of alive nodes in this group

func (*DataNodeGroup[T]) GetDataNodes

func (dng *DataNodeGroup[T]) GetDataNodes() []*T

GetDataNodes returns the custom data of all alive nodes

func (*DataNodeGroup[T]) GetNodeData

func (dng *DataNodeGroup[T]) GetNodeData(nodeID NodeID) *T

GetNodeData returns a node's custom data if it exists in the group

func (*DataNodeGroup[T]) GetNodes

func (dng *DataNodeGroup[T]) GetNodes(excludeIDs []NodeID) []*Node

GetNodes returns all alive nodes in this group, excluding specified node IDs

func (*DataNodeGroup[T]) SendToPeers

func (dng *DataNodeGroup[T]) SendToPeers(msgType MessageType, data interface{}) error

SendToPeers sends a message to all peers in the group and if necessary gossips to random peers.

func (*DataNodeGroup[T]) SendToPeersReliable

func (dng *DataNodeGroup[T]) SendToPeersReliable(msgType MessageType, data interface{}) error

SendToPeersReliable sends a message to all peers in the group reliably and if necessary gossips to random peers.

func (*DataNodeGroup[T]) UpdateNodeData

func (dng *DataNodeGroup[T]) UpdateNodeData(nodeID NodeID, updateFn func(*Node, *T) error) error

UpdateNodeData allows updating a node's custom data safely

type DataNodeGroupOptions

type DataNodeGroupOptions[T any] struct {
	// Callbacks
	OnNodeAdded   func(node *Node, data *T)
	OnNodeRemoved func(node *Node, data *T)
	OnNodeUpdated func(node *Node, data *T)

	// Function to initialize custom data for a node
	DataInitializer func(node *Node) *T
}

DataNodeGroupOptions contains configuration options

type DefaultResolver added in v0.5.0

type DefaultResolver struct{}

DefaultResolver implements the Resolver interface using Go's standard net package

func NewDefaultResolver added in v0.5.0

func NewDefaultResolver() *DefaultResolver

NewDefaultResolver creates a new DefaultResolver

func (*DefaultResolver) LookupIP added in v0.5.0

func (r *DefaultResolver) LookupIP(host string) ([]string, error)

LookupIP takes a hostname and converts it to a list of IP addresses

func (*DefaultResolver) LookupSRV added in v0.5.0

func (r *DefaultResolver) LookupSRV(service string) ([]*net.TCPAddr, error)

LookupSRV takes a service name and returns a list of service records as TCPAddr that match the service name

type EventHandlers

type EventHandlers[T any] struct {
	// contains filtered or unexported fields
}

EventHandlers manages a collection of handlers of a specific type

func NewEventHandlers

func NewEventHandlers[T any]() *EventHandlers[T]

NewEventHandlers creates a new handler collection for any type

func (*EventHandlers[T]) Add

func (l *EventHandlers[T]) Add(handler T) HandlerID

Add registers a handler and returns its ID

func (*EventHandlers[T]) ForEach

func (l *EventHandlers[T]) ForEach(fn func(T))

ForEach executes a function for each handler (optimized for iteration)

func (*EventHandlers[T]) GetHandlers

func (l *EventHandlers[T]) GetHandlers() []T

GetHandlers returns all registered handlers (lock-free read)

func (*EventHandlers[T]) Remove

func (l *EventHandlers[T]) Remove(id HandlerID) bool

Remove unregisters a handler by its ID
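
For readers unfamiliar with the pattern, the following is a minimal sketch of a generic handler registry with the same Add, Remove, and ForEach shape. It is not the library's implementation: `registry` is a hypothetical type, it uses integer IDs where `HandlerID` is UUID-based, and it takes a mutex on reads where the library documents a lock-free read.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical generic handler collection keyed by an ID.
type registry[T any] struct {
	mu       sync.RWMutex
	nextID   int
	handlers map[int]T
}

func newRegistry[T any]() *registry[T] {
	return &registry[T]{handlers: make(map[int]T)}
}

// add registers a handler and returns its ID.
func (r *registry[T]) add(h T) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nextID++
	r.handlers[r.nextID] = h
	return r.nextID
}

// remove unregisters a handler and reports whether it was present.
func (r *registry[T]) remove(id int) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.handlers[id]
	delete(r.handlers, id)
	return ok
}

// forEach calls fn for a snapshot of the handlers, so fn may safely
// call add or remove without deadlocking.
func (r *registry[T]) forEach(fn func(T)) {
	r.mu.RLock()
	snapshot := make([]T, 0, len(r.handlers))
	for _, h := range r.handlers {
		snapshot = append(snapshot, h)
	}
	r.mu.RUnlock()
	for _, h := range snapshot {
		fn(h)
	}
}

func main() {
	reg := newRegistry[func(string)]()
	id := reg.add(func(s string) { fmt.Println("state change:", s) })
	reg.forEach(func(h func(string)) { h("node-1 alive") })
	fmt.Println(reg.remove(id), reg.remove(id))
}
```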

type GossipHandler

type GossipHandler func()

type Handler

type Handler func(*Node, *Packet) error

type HandlerID

type HandlerID uuid.UUID

HandlerID uniquely identifies a registered event handler

type Logger

type Logger interface {
	Tracef(format string, args ...interface{})
	Debugf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
	Field(key string, value interface{}) Logger
	Err(err error) Logger
}

Logger is a generic logger interface similar to BadgerDB's logger

type MessageID

type MessageID hlc.Timestamp

type MessageType

type MessageType uint16

const (
	ReservedMsgsStart MessageType = 64 // Reserved for future use

	UserMsg MessageType = 128 // User messages start here
)
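
Application message types are numbered upward from `UserMsg`, as the Basic Usage example does with `gossip.UserMsg + 1`. The sketch below mirrors the declarations locally so it runs without the library; `ChatMsg`, `PresenceMsg`, and `isUserMsg` are hypothetical names for illustration.

```go
package main

import "fmt"

// Local stand-ins mirroring the declarations above so the example is
// self-contained; real code would use gossip.MessageType and gossip.UserMsg.
type MessageType uint16

const (
	ReservedMsgsStart MessageType = 64
	UserMsg           MessageType = 128
)

// Hypothetical application-defined types, numbered upward from UserMsg.
const (
	ChatMsg MessageType = UserMsg + iota + 1
	PresenceMsg
)

// isUserMsg reports whether t falls in the user message range.
func isUserMsg(t MessageType) bool {
	return t >= UserMsg
}

func main() {
	fmt.Println(ChatMsg, PresenceMsg)
	fmt.Println(isUserMsg(ChatMsg), isUserMsg(ReservedMsgsStart))
}
```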

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

Metadata holds node metadata with type-safe accessors

func NewMetadata

func NewMetadata() *Metadata

NewMetadata creates a new metadata container

func (*Metadata) Delete

func (md *Metadata) Delete(key string)

Delete removes a key from the metadata

func (*Metadata) Exists

func (md *Metadata) Exists(key string) bool

Exists returns true if the key exists in the metadata

func (*Metadata) GetAll

func (md *Metadata) GetAll() map[string]interface{}

GetAll returns a copy of all metadata

func (*Metadata) GetAllAsString

func (md *Metadata) GetAllAsString() map[string]string

GetAllAsString returns all metadata as string values

func (*Metadata) GetAllKeys

func (md *Metadata) GetAllKeys() []string

GetAllKeys returns all keys in the metadata

func (*Metadata) GetBool

func (md *Metadata) GetBool(key string) bool

GetBool returns a boolean value

func (*Metadata) GetFloat32

func (md *Metadata) GetFloat32(key string) float32

GetFloat32 returns a 32-bit float value

func (*Metadata) GetFloat64

func (md *Metadata) GetFloat64(key string) float64

GetFloat64 returns a 64-bit float value (our base float conversion method)

func (*Metadata) GetInt

func (md *Metadata) GetInt(key string) int

GetInt returns an integer value

func (*Metadata) GetInt32

func (md *Metadata) GetInt32(key string) int32

GetInt32 returns a 32-bit integer value

func (*Metadata) GetInt64

func (md *Metadata) GetInt64(key string) int64

GetInt64 returns a 64-bit integer value (our base integer conversion method)

func (*Metadata) GetString

func (md *Metadata) GetString(key string) string

GetString returns a string value

func (*Metadata) GetTime

func (md *Metadata) GetTime(key string) time.Time

GetTime returns a time value

func (*Metadata) GetTimestamp

func (md *Metadata) GetTimestamp() hlc.Timestamp

GetTimestamp returns the last modification timestamp

func (*Metadata) GetUint

func (md *Metadata) GetUint(key string) uint

GetUint returns an unsigned integer value

func (*Metadata) GetUint32

func (md *Metadata) GetUint32(key string) uint32

GetUint32 returns a 32-bit unsigned integer value

func (*Metadata) GetUint64

func (md *Metadata) GetUint64(key string) uint64

GetUint64 returns a 64-bit unsigned integer value (our base unsigned conversion method)

func (*Metadata) SetBool

func (md *Metadata) SetBool(key string, value bool) *Metadata

SetBool sets a boolean value

func (*Metadata) SetFloat32

func (md *Metadata) SetFloat32(key string, value float32) *Metadata

SetFloat32 sets a 32-bit float value

func (*Metadata) SetFloat64

func (md *Metadata) SetFloat64(key string, value float64) *Metadata

SetFloat64 sets a 64-bit float value

func (*Metadata) SetInt

func (md *Metadata) SetInt(key string, value int) *Metadata

SetInt sets an integer value

func (*Metadata) SetInt32

func (md *Metadata) SetInt32(key string, value int32) *Metadata

SetInt32 sets a 32-bit integer value

func (*Metadata) SetInt64

func (md *Metadata) SetInt64(key string, value int64) *Metadata

SetInt64 sets a 64-bit integer value

func (*Metadata) SetString

func (md *Metadata) SetString(key, value string) *Metadata

SetString sets a string value

func (*Metadata) SetTime

func (md *Metadata) SetTime(key string, value time.Time) *Metadata

SetTime sets a time value

func (*Metadata) SetUint

func (md *Metadata) SetUint(key string, value uint) *Metadata

SetUint sets an unsigned integer value

func (*Metadata) SetUint32

func (md *Metadata) SetUint32(key string, value uint32) *Metadata

SetUint32 sets a 32-bit unsigned integer value

func (*Metadata) SetUint64

func (md *Metadata) SetUint64(key string, value uint64) *Metadata

SetUint64 sets a 64-bit unsigned integer value

type MetadataReader

type MetadataReader interface {
	GetString(key string) string
	GetBool(key string) bool
	GetInt(key string) int
	GetInt32(key string) int32
	GetInt64(key string) int64
	GetUint(key string) uint
	GetUint32(key string) uint32
	GetUint64(key string) uint64
	GetFloat32(key string) float32
	GetFloat64(key string) float64
	GetTime(key string) time.Time
	GetTimestamp() hlc.Timestamp
	GetAll() map[string]interface{}
	GetAllKeys() []string
	GetAllAsString() map[string]string
	Exists(key string) bool
}

MetadataReader provides read-only access to metadata

type Node

type Node struct {
	ID NodeID

	Metadata MetadataReader

	ProtocolVersion    uint16
	ApplicationVersion string
	// contains filtered or unexported fields
}

func (*Node) Alive

func (node *Node) Alive() bool

func (*Node) DeadOrLeft

func (node *Node) DeadOrLeft() bool

func (*Node) GetAdvertisedAddress added in v0.4.0

func (node *Node) GetAdvertisedAddress() string

func (*Node) GetState

func (node *Node) GetState() NodeState

func (*Node) Suspect

func (node *Node) Suspect() bool

type NodeGroup

type NodeGroup struct {
	// contains filtered or unexported fields
}

NodeGroup represents a group of nodes that match specific metadata criteria

func NewNodeGroup

func NewNodeGroup(cluster *Cluster, criteria map[string]string, opts *NodeGroupOptions) *NodeGroup

NewNodeGroup creates a new node group that tracks nodes with matching metadata

func (*NodeGroup) Close

func (ng *NodeGroup) Close()

func (*NodeGroup) Contains

func (ng *NodeGroup) Contains(nodeID NodeID) bool

Contains checks if a node with the given ID is in this group

func (*NodeGroup) Count

func (ng *NodeGroup) Count() int

Count returns the number of nodes in this group

func (*NodeGroup) GetNodes

func (ng *NodeGroup) GetNodes(excludeIDs []NodeID) []*Node

GetNodes returns all nodes in this group, excluding specified node IDs

func (*NodeGroup) SendToPeers

func (ng *NodeGroup) SendToPeers(msgType MessageType, data interface{}) error

SendToPeers sends a message to all peers in the group and, if necessary, gossips to random peers.

func (*NodeGroup) SendToPeersReliable

func (ng *NodeGroup) SendToPeersReliable(msgType MessageType, data interface{}) error

SendToPeersReliable sends a message reliably to all peers in the group and, if necessary, gossips to random peers.

type NodeGroupOptions

type NodeGroupOptions struct {
	OnNodeAdded   func(*Node)
	OnNodeRemoved func(*Node)
}
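The two callbacks can be used to keep application state in step with group membership. The sketch below maintains a set of member IDs. The `node`, `nodeGroupOptions`, and `memberSet` types are hypothetical stand-ins so the example compiles without the library, and the mutex reflects an assumption that callbacks may run on a goroutine other than the caller's; the documentation does not say which goroutine invokes them.

```go
package main

import (
	"fmt"
	"sync"
)

// node and nodeGroupOptions are stand-ins mirroring the shape of the
// documented Node and NodeGroupOptions types.
type node struct{ ID string }

type nodeGroupOptions struct {
	OnNodeAdded   func(*node)
	OnNodeRemoved func(*node)
}

// memberSet is a hypothetical helper that tracks which node IDs are in a group.
type memberSet struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

func newMemberSet() *memberSet {
	return &memberSet{ids: make(map[string]struct{})}
}

// options builds callbacks that add and remove IDs under the lock.
func (m *memberSet) options() *nodeGroupOptions {
	return &nodeGroupOptions{
		OnNodeAdded: func(n *node) {
			m.mu.Lock()
			defer m.mu.Unlock()
			m.ids[n.ID] = struct{}{}
		},
		OnNodeRemoved: func(n *node) {
			m.mu.Lock()
			defer m.mu.Unlock()
			delete(m.ids, n.ID)
		},
	}
}

// size reports how many members are currently tracked.
func (m *memberSet) size() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.ids)
}

func main() {
	set := newMemberSet()
	opts := set.options()
	// Simulate the events a node group would deliver.
	opts.OnNodeAdded(&node{ID: "a"})
	opts.OnNodeAdded(&node{ID: "b"})
	opts.OnNodeRemoved(&node{ID: "a"})
	fmt.Println(set.size())
}
```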

type NodeID

type NodeID uuid.UUID

func (NodeID) String

func (n NodeID) String() string

type NodeMetadataChangeHandler

type NodeMetadataChangeHandler func(*Node)

type NodeState

type NodeState uint8
const (
	NodeUnknown NodeState = iota
	NodeAlive
	NodeLeaving
	NodeDead
	NodeSuspect
)
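Note that the iota ordering places NodeSuspect after NodeDead, so the numeric values do not form a severity scale. Code that branches on state is safer switching on the named constants than comparing with < or >. The sketch below copies the constants locally so it compiles on its own; `shouldRoute` is a hypothetical application-level policy, not part of the library.

```go
package main

import "fmt"

// Local copies of the documented NodeState constants, in the same order.
type NodeState uint8

const (
	NodeUnknown NodeState = iota
	NodeAlive
	NodeLeaving
	NodeDead
	NodeSuspect
)

// shouldRoute is a hypothetical policy: send application traffic only to
// nodes that are alive, or suspected but not yet confirmed dead.
func shouldRoute(s NodeState) bool {
	switch s {
	case NodeAlive, NodeSuspect:
		return true
	case NodeLeaving, NodeDead:
		return false
	default:
		// NodeUnknown and any future state: be conservative.
		return false
	}
}

func main() {
	for _, s := range []NodeState{NodeUnknown, NodeAlive, NodeLeaving, NodeDead, NodeSuspect} {
		fmt.Println(uint8(s), shouldRoute(s))
	}
}
```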

func (NodeState) String

func (ns NodeState) String() string

type NodeStateChangeHandler

type NodeStateChangeHandler func(*Node, NodeState)

NodeStateChangeHandler and NodeMetadataChangeHandler are callback types for handling node state changes and node metadata changes, respectively.

type NodeWithData

type NodeWithData[T any] struct {
	Node *Node
	Data *T
}

NodeWithData pairs a node with its custom data

type NullLogger

type NullLogger struct{}

NullLogger implements the Logger interface with no-op methods

func NewNullLogger

func NewNullLogger() *NullLogger

func (*NullLogger) Debugf

func (l *NullLogger) Debugf(format string, args ...interface{})

func (*NullLogger) Err

func (l *NullLogger) Err(err error) Logger

func (*NullLogger) Errorf

func (l *NullLogger) Errorf(format string, args ...interface{})

func (*NullLogger) Field

func (l *NullLogger) Field(key string, value interface{}) Logger

func (*NullLogger) Infof

func (l *NullLogger) Infof(format string, args ...interface{})

func (*NullLogger) Tracef

func (l *NullLogger) Tracef(format string, args ...interface{})

func (*NullLogger) Warnf

func (l *NullLogger) Warnf(format string, args ...interface{})

type Packet

type Packet struct {
	MessageType MessageType `msgpack:"mt" json:"mt"`
	SenderID    NodeID      `msgpack:"si" json:"si"`
	MessageID   MessageID   `msgpack:"mi" json:"mi"`
	TTL         uint8       `msgpack:"ttl" json:"ttl"`
	// contains filtered or unexported fields
}

Packet holds the payload of a message being passed between nodes

func NewPacket

func NewPacket() *Packet

func (*Packet) AddRef

func (p *Packet) AddRef() *Packet

func (*Packet) Codec

func (p *Packet) Codec() codec.Serializer

func (*Packet) Release

func (p *Packet) Release()
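AddRef and Release carry no description here. Their names suggest reference counting, where each holder of a packet takes a reference and the packet is recycled once the last holder releases it, but that reading is an assumption. The sketch below shows the general technique with a hypothetical `refCounted` type; it is not the library's code and makes no claim about how Packet is pooled.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// refCounted is a hypothetical type illustrating atomic reference counting.
type refCounted struct {
	refs   atomic.Int32
	onFree func() // called once when the count drops to zero
}

// newRefCounted returns a value holding one reference for its creator.
func newRefCounted(onFree func()) *refCounted {
	r := &refCounted{onFree: onFree}
	r.refs.Store(1)
	return r
}

// AddRef takes an extra reference and returns the receiver for chaining.
func (r *refCounted) AddRef() *refCounted {
	r.refs.Add(1)
	return r
}

// Release drops one reference and runs onFree when none remain.
func (r *refCounted) Release() {
	if r.refs.Add(-1) == 0 && r.onFree != nil {
		r.onFree()
	}
}

func main() {
	freed := 0
	r := newRefCounted(func() { freed++ })

	second := r.AddRef() // a second holder, for example another goroutine
	r.Release()          // first holder is done; one reference remains
	fmt.Println("freed after first release:", freed)

	second.Release() // last holder is done; onFree runs
	fmt.Println("freed after second release:", freed)
}
```

Under this model, every AddRef must be paired with exactly one Release, and the value must not be touched after the final Release.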

func (*Packet) Unmarshal

func (p *Packet) Unmarshal(v interface{}) error

type ReplyHandler

type ReplyHandler func(*Node, *Packet) (interface{}, error)

type Resolver added in v0.5.0

type Resolver interface {
	// LookupIP takes a hostname and converts it to a list of IP addresses
	LookupIP(host string) ([]string, error)

	// LookupSRV takes a service name and returns a list of service records as TCPAddr that match the service name
	LookupSRV(service string) ([]*net.TCPAddr, error)
}

Resolver defines the interface for DNS resolution
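Since Resolver is an interface, a test can substitute a fixed lookup table for real DNS. The sketch below implements the two documented methods over in-memory maps. `staticResolver`, `newStaticResolver`, the host names, and the addresses are all hypothetical, and the interface is copied locally so the example compiles on its own; how a custom resolver is handed to the library is not shown here.

```go
package main

import (
	"fmt"
	"net"
)

// resolver is a local copy of the documented Resolver method set.
type resolver interface {
	LookupIP(host string) ([]string, error)
	LookupSRV(service string) ([]*net.TCPAddr, error)
}

// staticResolver is a hypothetical in-memory implementation for tests.
type staticResolver struct {
	hosts    map[string][]string
	services map[string][]*net.TCPAddr
}

// Compile-time check that staticResolver has the documented methods.
var _ resolver = (*staticResolver)(nil)

func (r *staticResolver) LookupIP(host string) ([]string, error) {
	ips, ok := r.hosts[host]
	if !ok {
		return nil, fmt.Errorf("no such host: %s", host)
	}
	return ips, nil
}

func (r *staticResolver) LookupSRV(service string) ([]*net.TCPAddr, error) {
	addrs, ok := r.services[service]
	if !ok {
		return nil, fmt.Errorf("no such service: %s", service)
	}
	return addrs, nil
}

// newStaticResolver returns a resolver preloaded with made-up sample records.
func newStaticResolver() *staticResolver {
	return &staticResolver{
		hosts: map[string][]string{
			"node1.internal": {"10.0.0.1"},
		},
		services: map[string][]*net.TCPAddr{
			"_gossip._tcp.internal": {
				{IP: net.ParseIP("10.0.0.1"), Port: 8000},
				{IP: net.ParseIP("10.0.0.2"), Port: 8000},
			},
		},
	}
}

func main() {
	r := newStaticResolver()
	ips, err := r.LookupIP("node1.internal")
	fmt.Println(ips, err)
	addrs, err := r.LookupSRV("_gossip._tcp.internal")
	fmt.Println(len(addrs), err)
	_, err = r.LookupIP("missing.internal")
	fmt.Println(err)
}
```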

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream wraps a net.Conn with compression and encryption support. It implements the net.Conn interface for compatibility with existing code.

func (*Stream) Close

func (s *Stream) Close() error

Close closes the connection. It implements the io.Closer interface.

func (*Stream) DisableCompression

func (s *Stream) DisableCompression() *Stream

func (*Stream) EnableCompression

func (s *Stream) EnableCompression() *Stream

func (*Stream) LocalAddr

func (s *Stream) LocalAddr() net.Addr

LocalAddr returns the local network address.

func (*Stream) Read

func (s *Stream) Read(b []byte) (n int, err error)

Read reads data from the connection. It implements the io.Reader interface.

func (*Stream) RemoteAddr

func (s *Stream) RemoteAddr() net.Addr

RemoteAddr returns the remote network address.

func (*Stream) SetDeadline

func (s *Stream) SetDeadline(t time.Time) error

SetDeadline sets the read and write deadlines.

func (*Stream) SetReadDeadline

func (s *Stream) SetReadDeadline(t time.Time) error

SetReadDeadline sets the read deadline.

func (*Stream) SetWriteDeadline

func (s *Stream) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the write deadline.

func (*Stream) Write

func (s *Stream) Write(b []byte) (n int, err error)

Write writes data to the connection. It implements the io.Writer interface.

type StreamHandler

type StreamHandler func(*Node, *Packet, net.Conn)

type Transport

type Transport interface {
	PacketChannel() chan *Packet
	DialPeer(node *Node) (net.Conn, error)
	WritePacket(conn net.Conn, packet *Packet) error
	ReadPacket(conn net.Conn) (*Packet, error)
	SendPacket(transportType TransportType, nodes []*Node, packet *Packet) error
	WebsocketHandler(ctx context.Context, w http.ResponseWriter, r *http.Request)
	ResolveAddress(addressStr string) ([]Address, error)
}

Transport defines the transport layer, which is responsible for placing packets onto the wire and reading them off. It also handles encryption and compression of packets.

type TransportType

type TransportType uint8
const (
	TransportBestEffort TransportType = iota // Best effort transport, uses UDP
	TransportReliable                        // Reliable transport, uses TCP
)

Directories

  • examples/basic (command)
  • examples/events (command)
  • examples/kv (command)
  • examples/leader (command)
  • examples/stream (command)
  • examples/usermessages (command)
