gossip

package module
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2025 License: MIT Imports: 25 Imported by: 0

README

Gossip Protocol Library

A lightweight, Go-based library for implementing the gossip protocol in distributed systems. The library supports multiple transport mechanisms, including TCP, UDP, and HTTP, providing flexibility for a variety of use cases.

With its straightforward API, the library simplifies decentralized communication by enabling the creation and management of nodes, exchanging messages, and handling critical events like node joins and departures. Designed with reliability in mind, it continuously monitors node health within the cluster and seamlessly removes unreachable nodes, ensuring the system stays robust and adaptive.

The library leverages a hybrid approach to gossiping, combining event-driven gossiping and periodic gossiping to balance rapid updates and eventual consistency across the cluster:

  • Event-Driven Gossiping swiftly propagates critical updates immediately after events occur, minimizing latency.
  • Periodic Gossiping adds redundancy by disseminating updates at regular intervals, ensuring eventual consistency even if some nodes initially miss updates.

This flexible architecture supports the development of resilient distributed systems with efficient data sharing and robust fault tolerance.

Features

  • Multiple Transport Support: TCP, UDP, and HTTP connections
  • Message Security: Optional encryption for message payloads
  • Compression: Configurable message compression, support for Snappy is provide by default
  • Health Monitoring: Automatic node health checking with direct and indirect pings
  • Flexible Codec Support: Pluggable serialization with support for multiple msgpack implementations
  • Metadata Sharing: Distribute custom node metadata across the cluster
  • Version Checking: Application and protocol version compatibility verification
  • Automatic Transport Selection: UDP will be used wherever possible, however if the packet exceeds the MTU size, TCP will be used instead

Installation

To install the library, use the following command:

go get github.com/paularlott/gossip

Basic Usage

package main

import (
  "fmt"
  "time"

  "github.com/paularlott/gossip"
  "github.com/paularlott/gossip/codec"
  "github.com/paularlott/gossip/compression"
)

func main() {
  // Create configuration
  config := gossip.DefaultConfig()
  config.NodeID = "01960f9b-72ca-7a51-9efa-47c12f42a138"       // Optional: auto-generated if not specified
  config.BindAddr = "127.0.0.1:8000"                           // Listen on TCP and UDP
  config.EncryptionKey = "your-32-byte-key"                    // Optional: enables encryption
	config.Cipher = encryption.NewAESEncryptor()                 // Encryption algorithm
  config.MsgCodec = codec.NewShamatonMsgpackCodec()            // Message serialization
  config.Compressor = compression.NewSnappyCompressor()        // Optional: enables compression

  // Create and start the cluster
  cluster, err := gossip.NewCluster(config)
  if err != nil {
    panic(err)
  }
	cluster.Start()
	defer cluster.Stop()

  // Join existing cluster (if any)
  err = cluster.Join([]string{"127.0.0.1:8001"})
  if err != nil {
    fmt.Println("Warning:", err)
  }

  // Register message handler
  const CustomMsg gossip.MessageType = gossip.UserMsg + 1
  cluster.HandleFunc(CustomMsg, func(sender *gossip.Node, packet *gossip.Packet) error {
    var message string
    if err := packet.UnmarshalPayload(&message); err != nil {
        return err
    }
    fmt.Printf("Received message from %s: %s\n", sender.ID, message)
    return nil
  })

  // Broadcast a message
  message := "Hello cluster!"
  cluster.Broadcast(CustomMsg, message)

  // Keep the application running
  select {}
}

Address Formats

The gossip library supports multiple address formats for binding and connecting to the cluster:

  • IP:port - Standard TCP/UDP address (e.g., 127.0.0.1:8000)
  • hostname:port - DNS hostname with port, when multiple addresses are returned the node will attempt to connect to each address in turn assuming each is a node within the cluster
  • hostname or IP - The default port will be used, for a hostname returning multiple addresses the node will attempt to connect to each address in turn assuming each is a node within the cluster
  • srv+service-name - SRV DNS record lookup, when multiple addresses are returned the node will attempt to connect to each address in turn assuming each is a node within the cluster
  • https://hostname:port/endpoint - HTTP connection
  • srv+https://hostname:port/endpoint - Secure HTTP connection looking up the port number for the target

Configuration Options

The Config struct provides extensive customization:

config := gossip.DefaultConfig()

// Node identification
config.NodeID = "unique-node-id"               // Optional: auto-generated if not provided
config.BindAddr = "0.0.0.0:3500"               // Address to bind for listening
config.AdvertiseAddr = "192.168.1.1:3500"      // Address to advertise to peers (optional)

// Communication
config.EncryptionKey = "your-32-byte-key"              // Optional: enables encryption
config.Cipher = encryption.NewAESEncryptor()           // Encryption algorithm
config.Compressor = compression.NewSnappyCompressor()  // Enable payload compression using the provided compressor
config.CompressMinSize = 1024                          // Minimum size of a packet that will be considered for compression
config.Transport = gossip.NewSocketTransport(config)   // Socket based transport

// Logging
config.Logger = logger.NewNullLogger()                 // Default: no logging

Logging

The library uses the github.com/paularlott/logger interface for logging. By default, a null logger is used which discards all log output. You can provide your own logger implementation by implementing the logger.Logger interface:

type Logger interface {
    Trace(msg string, args ...any)
    Debug(msg string, args ...any)
    Info(msg string, args ...any)
    Warn(msg string, args ...any)
    Error(msg string, args ...any)
    Fatal(msg string, args ...any)
    With(key string, value any) Logger
    WithError(err error) Logger
    WithGroup(name string) Logger
}

The Fatal method logs the message and then calls os.Exit(1), making it convenient for handling fatal errors without explicit exit calls.

Using slog

The library provides examples using the slog implementation:

import (
    "os"
    "github.com/paularlott/logger/slog"
)

// Create a console logger with colored output
logger := slog.New("debug", "console", os.Stderr)

// Configure the cluster with the logger
config := gossip.DefaultConfig()
config.Logger = logger

See the examples/common/log.go file for a complete implementation using slog.

Node States

Nodes in the cluster go through several states:

  • NodeUnknown - Node state is unknown, nodes start in this state and change to NodeAlive when joining the cluster
  • NodeAlive - Node is active and healthy
  • NodeSuspect - Node might be unhealthy (pending confirmation)
  • NodeDead - Node is confirmed dead
  • NodeLeaving - Node is gracefully leaving the cluster

Message Codecs

Multiple serialization options are available allowing you to choose the one that best fits your application:

// Using Shamaton msgpack
config.MsgCodec = codec.NewShamatonMsgpackCodec()

// Using Vmihailenco msgpack
config.MsgCodec = codec.NewVmihailencoMsgpackCodec()

// Using JSON
config.MsgCodec = codec.NewJSONCodec()
Examples

The examples directory contains various examples demonstrating the library's capabilities. Each example is self-contained and can be run independently.

  • basic: A basic usage example that creates a cluster and joins nodes to it. Nodes can communicate over TCP/UDP or HTTP.
  • events: Example that installs an event handler to display cluster events.
  • usermessages: Example that demonstrates user defined message handling.
  • kv: Example Key Value store.
  • leader: Example demonstrating leader election.

Documentation

Index

Constants

View Source
const (
	MetadataAnyValue       = "*"
	MetadataContainsPrefix = "~"
)
View Source
const (
	TransportBestEffort = iota
	TransportReliable
)
View Source
const (
	PROTOCOL_VERSION = 1
)

Variables

View Source
var EmptyNodeID = NodeID(uuid.Nil)
View Source
var (
	ErrNoTransportAvailable = fmt.Errorf("no transport available") // When there's no available transport between two nodes
)
View Source
var (
	ErrUnsupportedAddressFormat = fmt.Errorf("unsupported address format")
)

Functions

This section is empty.

Types

type Address

type Address struct {
	IP   net.IP `msgpack:"ip,omitempty" json:"ip,omitempty"`
	Port int    `msgpack:"port,omitempty" json:"port,omitempty"`
	URL  string `msgpack:"url,omitempty" json:"url,omitempty"`
}

func (*Address) Clear added in v0.4.1

func (a *Address) Clear()

func (*Address) IsEmpty added in v0.4.0

func (a *Address) IsEmpty() bool

func (*Address) String

func (a *Address) String() string

type ApplicationVersionCheck

type ApplicationVersionCheck func(version string) bool

ApplicationVersionCheck is a function that checks if an application version is compatible

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(config *Config) (*Cluster, error)

func (*Cluster) AliveNodes

func (c *Cluster) AliveNodes() []*Node

func (*Cluster) CalcFanOut

func (c *Cluster) CalcFanOut() int

CalcFanOut determines how many peers should receive a message when broadcasting information throughout the cluster. It implements a logarithmic scaling approach to balance network traffic and propagation speed.

func (*Cluster) CalcPayloadSize

func (c *Cluster) CalcPayloadSize(totalItems int) int

CalcPayloadSize determines how many items should be included in a gossip payload. This is used when exchanging state information to limit the size of individual gossip messages.

This is meant for controlling the volume of data, not the number of nodes to contact.

func (*Cluster) GetCandidates

func (c *Cluster) GetCandidates() []*Node

Get a random subset of nodes to use for gossiping or exchanging states with, excluding ourselves

func (*Cluster) GetNode

func (c *Cluster) GetNode(id NodeID) *Node

func (*Cluster) GetNodeByIDString

func (c *Cluster) GetNodeByIDString(id string) *Node

func (*Cluster) GetNodesByTag added in v0.10.0

func (c *Cluster) GetNodesByTag(tag string) []*Node

func (*Cluster) HandleFunc

func (c *Cluster) HandleFunc(msgType MessageType, handler Handler) error

Registers a handler to accept a message and automatically forward it to other nodes

func (*Cluster) HandleFuncWithReply

func (c *Cluster) HandleFuncWithReply(msgType MessageType, replyHandler ReplyHandler) error

Registers a handler to accept a message and reply to the sender, always uses the reliable transport

func (*Cluster) HandleFuncWithResponse added in v0.10.0

func (c *Cluster) HandleFuncWithResponse(msgType MessageType, replyHandler ReplyHandler) error

func (*Cluster) HandleGossipFunc

func (c *Cluster) HandleGossipFunc(handler GossipHandler) HandlerID

func (*Cluster) HandleNodeMetadataChangeFunc

func (c *Cluster) HandleNodeMetadataChangeFunc(handler NodeMetadataChangeHandler) HandlerID

func (*Cluster) HandleNodeStateChangeFunc

func (c *Cluster) HandleNodeStateChangeFunc(handler NodeStateChangeHandler) HandlerID

func (*Cluster) Join

func (c *Cluster) Join(peers []string) error

func (*Cluster) Leave

func (c *Cluster) Leave()

Marks the local node as leaving and broadcasts this state to the cluster

func (*Cluster) LocalMetadata

func (c *Cluster) LocalMetadata() *Metadata

Get the local nodes metadata for read and write access

func (*Cluster) LocalNode

func (c *Cluster) LocalNode() *Node

func (*Cluster) Logger

func (c *Cluster) Logger() logger.Logger

func (*Cluster) NodeIsLocal

func (c *Cluster) NodeIsLocal(node *Node) bool

func (*Cluster) Nodes

func (c *Cluster) Nodes() []*Node

func (*Cluster) NodesToIDs

func (c *Cluster) NodesToIDs(nodes []*Node) []NodeID

func (*Cluster) NumAliveNodes

func (c *Cluster) NumAliveNodes() int

Get the number of nodes that are currently alive

func (*Cluster) NumDeadNodes

func (c *Cluster) NumDeadNodes() int

Get the number of nodes that are currently dead

func (*Cluster) NumNodes

func (c *Cluster) NumNodes() int

func (*Cluster) NumSuspectNodes

func (c *Cluster) NumSuspectNodes() int

Get the number of nodes that are currently suspect

func (*Cluster) RemoveGossipHandler

func (c *Cluster) RemoveGossipHandler(id HandlerID) bool

func (*Cluster) RemoveNodeMetadataChangeHandler

func (c *Cluster) RemoveNodeMetadataChangeHandler(id HandlerID) bool

func (*Cluster) RemoveNodeStateChangeHandler

func (c *Cluster) RemoveNodeStateChangeHandler(id HandlerID) bool

func (*Cluster) Send

func (c *Cluster) Send(msgType MessageType, data interface{}) error

func (*Cluster) SendReliable

func (c *Cluster) SendReliable(msgType MessageType, data interface{}) error

func (*Cluster) SendTagged added in v0.10.0

func (c *Cluster) SendTagged(tag string, msgType MessageType, data interface{}) error

func (*Cluster) SendTaggedReliable added in v0.10.0

func (c *Cluster) SendTaggedReliable(tag string, msgType MessageType, data interface{}) error

func (*Cluster) SendTo

func (c *Cluster) SendTo(dstNode *Node, msgType MessageType, data interface{}) error

func (*Cluster) SendToPeers

func (c *Cluster) SendToPeers(dstNodes []*Node, msgType MessageType, data interface{}) error

func (*Cluster) SendToPeersReliable

func (c *Cluster) SendToPeersReliable(dstNodes []*Node, msgType MessageType, data interface{}) error

func (*Cluster) SendToReliable

func (c *Cluster) SendToReliable(dstNode *Node, msgType MessageType, data interface{}) error

func (*Cluster) SendToWithResponse

func (c *Cluster) SendToWithResponse(dstNode *Node, msgType MessageType, payload interface{}, responsePayload interface{}) error

Send a message to the peer then accept a response message. Uses a TCP connection to send the packet and receive the response.

func (*Cluster) Start

func (c *Cluster) Start()

func (*Cluster) Stop

func (c *Cluster) Stop()

func (*Cluster) UnregisterMessageType

func (c *Cluster) UnregisterMessageType(msgType MessageType) bool

type Config

type Config struct {
	NodeID             string   // NodeID is the unique identifier for the node in the cluster, "" to generate a new one
	BindAddr           string   // BindAddr is the address and port to bind to or the path for http transports
	AdvertiseAddr      string   // Advertised address and port or URL for the node
	ApplicationVersion string   // ApplicationVersion is the version of the application, used for compatibility checks
	Tags               []string // Tags for tag-based message routing

	EncryptionKey            []byte                  // Encryption key for the messages, must be either 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256.
	Transport                Transport               // Transport layer to communicate over UDP or TCP
	Logger                   logger.Logger           // Logger is the logger to use for logging messages
	MsgCodec                 codec.Serializer        // The codec to use for encoding and decoding messages
	Compressor               compression.Compressor  // The compressor to use for compressing and decompressing messages, if not given messages will not be compressed
	CompressMinSize          int                     // The minimum size of a message before attempting to compress it
	BearerToken              string                  // Bearer token to use for authentication, if not given no authentication will be used
	Cipher                   encryption.Cipher       // The cipher to use for encrypting and decrypting messages
	ApplicationVersionCheck  ApplicationVersionCheck // The application version check to use for checking compatibility with other nodes
	GossipInterval           time.Duration           // How often to send gossip messages
	GossipMaxInterval        time.Duration           // Maximum interval between gossip messages
	MetadataGossipInterval   time.Duration           // Fast metadata gossip (~500ms)
	StateGossipInterval      time.Duration           // Medium state sync (~30-60s)
	TCPDialTimeout           time.Duration           // TCPDialTimeout is the duration to wait for a TCP connection to be established
	TCPDeadline              time.Duration           // TCPDeadline is the duration to wait for a TCP operation to complete
	UDPDeadline              time.Duration           // UDPDeadline is the duration to wait for a UDP operation to complete
	UDPMaxPacketSize         int                     // UDPMaxSize is the maximum size of a UDP packet in bytes
	TCPMaxPacketSize         int                     // TCPMaxSize is the maximum size of a TCP packet in bytes
	MsgHistoryGCInterval     time.Duration           // MsgHistoryGCInterval is the duration between garbage collection operations
	MsgHistoryMaxAge         time.Duration           // MsgHistoryMaxAge is the maximum age of a message in the history
	MsgHistoryShardCount     int                     // MessageHistoryShardCount is the number of shards to use for storing message history, 16 for up to 50 nodes, 32 for up to 500 nodes and 64 for larger clusters.
	NodeShardCount           int                     // NodeShardCount is the number of shards to use for storing node information, 4 for up to 50 nodes, 16 for up to 500 nodes and 32 for larger clusters.
	NumSendWorkers           int                     // The number of workers to use for sending messages
	SendQueueSize            int                     // SendQueueSize is the size of the send queue
	NumIncomingWorkers       int                     // The number of workers to use for processing incoming messages
	IncomingPacketQueueDepth int                     // Depth of the queue for incoming packets
	PingTimeout              time.Duration           // Timeout for ping operations, should be less than HealthCheckInterval
	FanOutMultiplier         float64                 // Scale of peer count for broadcast messages
	StateExchangeMultiplier  float64                 // Scale of peer sampling for state exchange messages
	TTLMultiplier            float64                 // Multiplier for TTL, used to determine how many hops a message can take
	ForceReliableTransport   bool                    // Force all messages to use reliable transport
	Resolver                 Resolver                // DNS resolver to use for address resolution, if not set uses default resolver
	PreferIPv6               bool                    // Prefer IPv6 addresses when resolving hostnames (default false = prefer IPv4)
	NodeCleanupInterval      time.Duration           // How often to run node cleanup
	NodeRetentionTime        time.Duration           // How long to keep dead nodes before removal
	LeavingNodeTimeout       time.Duration           // How long to wait before moving leaving nodes to dead
	HealthCheckInterval      time.Duration           // How often to check node health (e.g., 2s)
	SuspectTimeout           time.Duration           // Time before marking node suspect (e.g., 1.5s)
	SuspectRetryInterval     time.Duration           // How often to retry suspect nodes (e.g., 1s)
	DeadNodeTimeout          time.Duration           // Time before marking suspect node as dead (e.g., 15s)
	DeadNodeRetryInterval    time.Duration           // How often to retry dead nodes
	MaxDeadNodeRetryTime     time.Duration           // Stop retrying dead nodes after this time
	HealthWorkerPoolSize     int                     // Number of workers for health checks (e.g., 4)
	HealthCheckQueueDepth    int                     // Queue depth for health check tasks (e.g., 256)
	JoinQueueSize            int                     // Queue depth for joining tasks (e.g., 100)
	NumJoinWorkers           int                     // Number of workers for joining tasks (e.g., 2 - 3)
	PeerRecoveryInterval     time.Duration           // How often to check peer connectivity (default: 30s)
}

func DefaultConfig

func DefaultConfig() *Config

type DataNodeGroup

type DataNodeGroup[T any] struct {
	// contains filtered or unexported fields
}

DataNodeGroup groups nodes based on meta data and stores custom data with the nodes.

func NewDataNodeGroup

func NewDataNodeGroup[T any](
	cluster *Cluster,
	criteria map[string]string,
	options *DataNodeGroupOptions[T],
) *DataNodeGroup[T]

NewDataNodeGroup creates a new data node group

func (*DataNodeGroup[T]) Close

func (dng *DataNodeGroup[T]) Close()

Close unregisters event handlers to allow for garbage collection

func (*DataNodeGroup[T]) Contains

func (dng *DataNodeGroup[T]) Contains(nodeID NodeID) bool

Contains checks if a node with the given ID is in this group

func (*DataNodeGroup[T]) Count

func (dng *DataNodeGroup[T]) Count() int

Count returns the number of alive nodes in this group

func (*DataNodeGroup[T]) GetDataNodes

func (dng *DataNodeGroup[T]) GetDataNodes() []*T

GetDataNodes returns all alive nodes custom data

func (*DataNodeGroup[T]) GetNodeData

func (dng *DataNodeGroup[T]) GetNodeData(nodeID NodeID) *T

GetNodeData returns a node's custom data if it exists in the group

func (*DataNodeGroup[T]) GetNodes

func (dng *DataNodeGroup[T]) GetNodes(excludeIDs []NodeID) []*Node

GetNodes returns all alive nodes in this group, excluding specified node IDs

func (*DataNodeGroup[T]) SendToPeers

func (dng *DataNodeGroup[T]) SendToPeers(msgType MessageType, data interface{}) error

SendToPeers sends a message to all peers in the group and if necessary gossips to random peers.

func (*DataNodeGroup[T]) SendToPeersReliable

func (dng *DataNodeGroup[T]) SendToPeersReliable(msgType MessageType, data interface{}) error

SendToPeersReliable sends a message to all peers in the group reliably and if necessary gossips to random peers.

func (*DataNodeGroup[T]) UpdateNodeData

func (dng *DataNodeGroup[T]) UpdateNodeData(nodeID NodeID, updateFn func(*Node, *T) error) error

UpdateNodeData allows updating a node's custom data safely

type DataNodeGroupOptions

type DataNodeGroupOptions[T any] struct {
	// Callbacks
	OnNodeAdded   func(node *Node, data *T)
	OnNodeRemoved func(node *Node, data *T)
	OnNodeUpdated func(node *Node, data *T)

	// Function to initialize custom data for a node
	DataInitializer func(node *Node) *T
}

DataNodeGroupOptions contains configuration options

type DefaultResolver added in v0.5.0

type DefaultResolver struct{}

DefaultResolver implements the Resolver interface using Go's standard net package

func NewDefaultResolver added in v0.5.0

func NewDefaultResolver() *DefaultResolver

NewDefaultResolver creates a new DefaultResolver

func (*DefaultResolver) LookupIP added in v0.5.0

func (r *DefaultResolver) LookupIP(host string) ([]string, error)

LookupIP takes a hostname and converts it to a list of IP addresses

func (*DefaultResolver) LookupSRV added in v0.5.0

func (r *DefaultResolver) LookupSRV(service string) ([]*net.TCPAddr, error)

LookupSRV takes a service name and returns a list of service records as TCPAddr that match the service name

type EventHandlers

type EventHandlers[T any] struct {
	// contains filtered or unexported fields
}

EventHandlers manages a collection of handlers of a specific type

func NewEventHandlers

func NewEventHandlers[T any]() *EventHandlers[T]

NewEventHandlers creates a new handler collection for any type

func (*EventHandlers[T]) Add

func (l *EventHandlers[T]) Add(handler T) HandlerID

Add registers a handler and returns its ID

func (*EventHandlers[T]) ForEach

func (l *EventHandlers[T]) ForEach(fn func(T))

ForEach executes a function for each handler (optimized for iteration)

func (*EventHandlers[T]) Remove

func (l *EventHandlers[T]) Remove(id HandlerID) bool

Remove unregisters a handler by its ID

type GossipHandler

type GossipHandler func()

type HTTPTransport added in v0.8.0

type HTTPTransport struct {
	// contains filtered or unexported fields
}

func NewHTTPTransport added in v0.8.0

func NewHTTPTransport(config *Config) *HTTPTransport

func (*HTTPTransport) HandleGossipRequest added in v0.8.0

func (ht *HTTPTransport) HandleGossipRequest(w http.ResponseWriter, r *http.Request)

func (*HTTPTransport) Name added in v0.8.0

func (ht *HTTPTransport) Name() string

func (*HTTPTransport) PacketChannel added in v0.8.0

func (ht *HTTPTransport) PacketChannel() chan *Packet

func (*HTTPTransport) Send added in v0.8.0

func (ht *HTTPTransport) Send(transportType TransportType, node *Node, packet *Packet) error

func (*HTTPTransport) SendWithReply added in v0.8.0

func (ht *HTTPTransport) SendWithReply(node *Node, packet *Packet) (*Packet, error)

func (*HTTPTransport) Start added in v0.8.0

func (ht *HTTPTransport) Start(ctx context.Context, wg *sync.WaitGroup) error

type Handler

type Handler func(*Node, *Packet) error

Message handler for fire and forget messages

type HandlerID

type HandlerID uuid.UUID

HandlerID uniquely identifies a registered event handler

type HealthCheckTask added in v0.8.0

type HealthCheckTask struct {
	NodeID    NodeID
	TaskType  HealthCheckType
	Timestamp hlc.Timestamp
}

type HealthCheckType added in v0.8.0

type HealthCheckType int
const (
	DirectPing HealthCheckType = iota
	SuspectRetry
	DeadNodeRetry
)

type HealthMonitor added in v0.8.0

type HealthMonitor struct {
	// contains filtered or unexported fields
}

type LocalMetadata added in v0.8.0

type LocalMetadata interface {
	MetadataReader
	SetString(key, value string) LocalMetadata
	SetBool(key string, value bool) LocalMetadata
	SetInt(key string, value int) LocalMetadata
	SetInt32(key string, value int32) LocalMetadata
	SetInt64(key string, value int64) LocalMetadata
	SetUint(key string, value uint) LocalMetadata
	SetUint32(key string, value uint32) LocalMetadata
	SetUint64(key string, value uint64) LocalMetadata
	SetFloat32(key string, value float32) LocalMetadata
	SetFloat64(key string, value float64) LocalMetadata
	SetTime(key string, value time.Time) LocalMetadata
	Delete(key string)
}

LocalMetadata provides read-write access for local node

type MessageID

type MessageID hlc.Timestamp

type MessageType

type MessageType uint16
const (
	ReservedMsgsStart MessageType = 64 // Reserved for future use

	UserMsg MessageType = 128 // User messages start here
)

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

Metadata holds node metadata with type-safe accessors

func NewMetadata

func NewMetadata() *Metadata

NewMetadata creates a new metadata container

func (*Metadata) Delete

func (md *Metadata) Delete(key string)

Delete removes a key from the metadata

func (*Metadata) Exists

func (md *Metadata) Exists(key string) bool

Exists returns true if the key exists in the metadata

func (*Metadata) GetAll

func (md *Metadata) GetAll() map[string]interface{}

GetAll returns a copy of all metadata

func (*Metadata) GetAllAsString

func (md *Metadata) GetAllAsString() map[string]string

GetAllAsString returns all metadata as string values

func (*Metadata) GetAllKeys

func (md *Metadata) GetAllKeys() []string

GetAllKeys returns all keys in the metadata

func (*Metadata) GetBool

func (md *Metadata) GetBool(key string) bool

GetBool returns a boolean value

func (*Metadata) GetFloat32

func (md *Metadata) GetFloat32(key string) float32

GetFloat32 returns a 32-bit float value

func (*Metadata) GetFloat64

func (md *Metadata) GetFloat64(key string) float64

GetFloat64 returns a 64-bit float value (our base float conversion method)

func (*Metadata) GetInt

func (md *Metadata) GetInt(key string) int

GetInt returns an integer value

func (*Metadata) GetInt32

func (md *Metadata) GetInt32(key string) int32

GetInt32 returns a 32-bit integer value

func (*Metadata) GetInt64

func (md *Metadata) GetInt64(key string) int64

GetInt64 returns a 64-bit integer value (our base integer conversion method)

func (*Metadata) GetString

func (md *Metadata) GetString(key string) string

GetString returns a string value

func (*Metadata) GetTime

func (md *Metadata) GetTime(key string) time.Time

GetTime returns a time value

func (*Metadata) GetTimestamp

func (md *Metadata) GetTimestamp() hlc.Timestamp

GetTimestamp returns the last modification timestamp

func (*Metadata) GetUint

func (md *Metadata) GetUint(key string) uint

GetUint returns an unsigned integer value

func (*Metadata) GetUint32

func (md *Metadata) GetUint32(key string) uint32

GetUint32 returns a 32-bit unsigned integer value

func (*Metadata) GetUint64

func (md *Metadata) GetUint64(key string) uint64

GetUint64 returns a 64-bit unsigned integer value (our base unsigned conversion method)

func (*Metadata) SetBool

func (md *Metadata) SetBool(key string, value bool) LocalMetadata

SetBool sets a boolean value

func (*Metadata) SetFloat32

func (md *Metadata) SetFloat32(key string, value float32) LocalMetadata

SetFloat32 sets a 32-bit float value

func (*Metadata) SetFloat64

func (md *Metadata) SetFloat64(key string, value float64) LocalMetadata

SetFloat64 sets a 64-bit float value

func (*Metadata) SetInt

func (md *Metadata) SetInt(key string, value int) LocalMetadata

SetInt sets an integer value

func (*Metadata) SetInt32

func (md *Metadata) SetInt32(key string, value int32) LocalMetadata

SetInt32 sets a 32-bit integer value

func (*Metadata) SetInt64

func (md *Metadata) SetInt64(key string, value int64) LocalMetadata

SetInt64 sets a 64-bit integer value

func (*Metadata) SetOnLocalChange added in v0.7.0

func (md *Metadata) SetOnLocalChange(cb func(hlc.Timestamp, map[string]interface{}))

SetOnLocalChange sets a callback that is invoked whenever local setters (Set*/Delete) modify the metadata. Remote updates via update() do not trigger this callback.

func (*Metadata) SetString

func (md *Metadata) SetString(key, value string) LocalMetadata

SetString sets a string value

func (*Metadata) SetTime

func (md *Metadata) SetTime(key string, value time.Time) LocalMetadata

SetTime sets a time value

func (*Metadata) SetUint

func (md *Metadata) SetUint(key string, value uint) LocalMetadata

SetUint sets an unsigned integer value

func (*Metadata) SetUint32

func (md *Metadata) SetUint32(key string, value uint32) LocalMetadata

SetUint32 sets a 32-bit unsigned integer value

func (*Metadata) SetUint64

func (md *Metadata) SetUint64(key string, value uint64) LocalMetadata

SetUint64 sets a 64-bit unsigned integer value

type MetadataReader

type MetadataReader interface {
	GetString(key string) string
	GetBool(key string) bool
	GetInt(key string) int
	GetInt32(key string) int32
	GetInt64(key string) int64
	GetUint(key string) uint
	GetUint32(key string) uint32
	GetUint64(key string) uint64
	GetFloat32(key string) float32
	GetFloat64(key string) float64
	GetTime(key string) time.Time
	GetTimestamp() hlc.Timestamp
	GetAll() map[string]interface{}
	GetAllKeys() []string
	GetAllAsString() map[string]string
	Exists(key string) bool
}

MetadataReader provides read-only access to metadata

type Node

type Node struct {
	ID NodeID

	Metadata MetadataReader

	ProtocolVersion    uint16
	ApplicationVersion string
	// contains filtered or unexported fields
}

Struct to hold our view of the state of a node within the cluster

func (*Node) AdvertisedAddr added in v0.10.0

func (node *Node) AdvertisedAddr() string

AdvertiseAddr returns the node's advertise address string

func (*Node) Alive

func (node *Node) Alive() bool

func (*Node) ClearAddress added in v0.10.0

func (node *Node) ClearAddress()

ClearAddress clears the node's resolved address (thread-safe)

func (*Node) DeadOrLeft

func (node *Node) DeadOrLeft() bool

func (*Node) GetAddress

func (node *Node) GetAddress() Address

GetAddress returns a copy of the node's resolved address (thread-safe)

func (*Node) GetObservedState added in v0.8.0

func (node *Node) GetObservedState() NodeState

func (*Node) GetTags added in v0.10.0

func (node *Node) GetTags() []string

GetTags returns a copy of the node's tags. Tags are immutable (set at node creation), but we return a defensive copy to prevent accidental modification by callers. Note: For performance-critical paths, consider using HasTag() instead of GetTags()

func (*Node) HasTag added in v0.10.0

func (node *Node) HasTag(tag string) bool

HasTag returns true if the node has the specified tag

func (*Node) IsAddressEmpty added in v0.10.0

func (node *Node) IsAddressEmpty() bool

IsAddressEmpty checks if the node's address is empty (thread-safe)

func (*Node) Removed added in v0.8.0

func (node *Node) Removed() bool

func (*Node) SetAddress added in v0.10.0

func (node *Node) SetAddress(addr Address)

SetAddress sets the node's resolved address (thread-safe)

func (*Node) Suspect

func (node *Node) Suspect() bool

type NodeGroup

type NodeGroup struct {
	// contains filtered or unexported fields
}

NodeGroup represents a group of nodes that match specific metadata criteria

func NewNodeGroup

func NewNodeGroup(cluster *Cluster, criteria map[string]string, opts *NodeGroupOptions) *NodeGroup

NewNodeGroup creates a new node group that tracks nodes with matching metadata

func (*NodeGroup) Close

func (ng *NodeGroup) Close()

func (*NodeGroup) Contains

func (ng *NodeGroup) Contains(nodeID NodeID) bool

Contains checks if a node with the given ID is in this group

func (*NodeGroup) Count

func (ng *NodeGroup) Count() int

Count returns the number of nodes in this group

func (*NodeGroup) GetNodes

func (ng *NodeGroup) GetNodes(excludeIDs []NodeID) []*Node

GetNodes returns all nodes in this group, excluding specified node IDs

func (*NodeGroup) SendToPeers

func (ng *NodeGroup) SendToPeers(msgType MessageType, data interface{}) error

SendToPeers sends a message to all peers in the group and if necessary gossips to random peers.

func (*NodeGroup) SendToPeersReliable

func (ng *NodeGroup) SendToPeersReliable(msgType MessageType, data interface{}) error

SendToPeersReliable sends a message to all peers in the group reliably and if necessary gossips to random peers.

type NodeGroupOptions

type NodeGroupOptions struct {
	OnNodeAdded   func(*Node)
	OnNodeRemoved func(*Node)
}

type NodeID

type NodeID uuid.UUID

func (NodeID) String

func (n NodeID) String() string

type NodeMetadataChangeHandler

type NodeMetadataChangeHandler func(*Node)

type NodeState

type NodeState uint8
const (
	NodeUnknown NodeState = iota
	NodeAlive
	NodeSuspect
	NodeDead
	NodeLeaving
	NodeRemoved
)

func (NodeState) String

func (ns NodeState) String() string

type NodeStateChangeHandler

type NodeStateChangeHandler func(*Node, NodeState)

NodeStateChangeHandler and NodeMetadataChangeHandler are used to handle node state and metadata changes

type NodeWithData

type NodeWithData[T any] struct {
	Node *Node
	Data *T
}

NodeWithData pairs a node with its custom data

type Packet

type Packet struct {
	MessageType  MessageType `msgpack:"mt" json:"mt"`
	SenderID     NodeID      `msgpack:"si" json:"si"`
	TargetNodeID *NodeID     `msgpack:"ti,omitempty" json:"ti,omitempty"`   // Optional: for direct messages
	Tag          *string     `msgpack:"tag,omitempty" json:"tag,omitempty"` // Optional: for tag-based routing
	MessageID    MessageID   `msgpack:"mi" json:"mi"`
	TTL          uint8       `msgpack:"ttl" json:"ttl"`
	// contains filtered or unexported fields
}

Packet holds the payload of a message being passed between nodes

func NewPacket

func NewPacket() *Packet

func (*Packet) AddRef

func (p *Packet) AddRef() *Packet

func (*Packet) CanReply added in v0.8.0

func (p *Packet) CanReply() bool

CanReply returns true if the packet can send a reply

func (*Packet) Codec

func (p *Packet) Codec() codec.Serializer

func (*Packet) Payload added in v0.8.0

func (p *Packet) Payload() []byte

Payload returns the packet payload

func (*Packet) Release

func (p *Packet) Release()

func (*Packet) SendReply added in v0.8.0

func (p *Packet) SendReply() (err error)

SendReply sends a reply packet using the available reply mechanism

func (*Packet) SetCodec added in v0.8.0

func (p *Packet) SetCodec(codec codec.Serializer)

SetCodec sets the packet codec

func (*Packet) SetConn added in v0.8.0

func (p *Packet) SetConn(conn net.Conn)

SetConn sets the connection for this packet

func (*Packet) SetPayload added in v0.8.0

func (p *Packet) SetPayload(payload []byte)

SetPayload sets the packet payload

func (*Packet) SetReplyChan added in v0.8.0

func (p *Packet) SetReplyChan(ch chan<- *Packet)

SetReplyChan sets the reply channel for this packet

func (*Packet) Unmarshal

func (p *Packet) Unmarshal(v interface{}) error

type ReplyHandler

type ReplyHandler func(*Node, *Packet) (interface{}, error)

Message handler for request / reply messages, must return reply data

type Resolver added in v0.5.0

type Resolver interface {
	// LookupIP takes a hostname and converts it to a list of IP addresses
	LookupIP(host string) ([]string, error)

	// LookupSRV takes a service name and returns a list of service records as TCPAddr that match the service name
	LookupSRV(service string) ([]*net.TCPAddr, error)
}

Resolver defines the interface for DNS resolution

type SocketTransport added in v0.8.0

type SocketTransport struct {
	// contains filtered or unexported fields
}

func NewSocketTransport added in v0.8.0

func NewSocketTransport(config *Config) *SocketTransport

func (*SocketTransport) Name added in v0.8.0

func (st *SocketTransport) Name() string

func (*SocketTransport) PacketChannel added in v0.8.0

func (st *SocketTransport) PacketChannel() chan *Packet

func (*SocketTransport) Send added in v0.8.0

func (st *SocketTransport) Send(transportType TransportType, node *Node, packet *Packet) error

func (*SocketTransport) SendWithReply added in v0.8.0

func (st *SocketTransport) SendWithReply(node *Node, packet *Packet) (*Packet, error)

func (*SocketTransport) Start added in v0.8.0

func (st *SocketTransport) Start(ctx context.Context, wg *sync.WaitGroup) error

type Transport

type Transport interface {
	// Get the transport's name
	Name() string

	// Start the transport server
	Start(ctx context.Context, wg *sync.WaitGroup) error

	// PacketChannel returns the channel for incoming packets
	PacketChannel() chan *Packet

	// Send sends a packet to specific node using the specified transport type
	Send(transportType TransportType, node *Node, packet *Packet) error

	// SendWithReply sends a packet and waits for a reply
	SendWithReply(node *Node, packet *Packet) (*Packet, error)
}

Transport defines the interface for packet-based communication

type TransportType

type TransportType int

Directories

Path Synopsis
examples
basic command
events command
health command
kv command
leader command
tagged command
usermessages command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL