Documentation
¶
Index ¶
- Constants
- Variables
- type Address
- type ApplicationVersionCheck
- type Cluster
- func (c *Cluster) AliveNodes() []*Node
- func (c *Cluster) CalcFanOut() int
- func (c *Cluster) CalcPayloadSize(totalItems int) int
- func (c *Cluster) GetCandidates() []*Node
- func (c *Cluster) GetNode(id NodeID) *Node
- func (c *Cluster) GetNodeByIDString(id string) *Node
- func (c *Cluster) GetNodesByTag(tag string) []*Node
- func (c *Cluster) HandleFunc(msgType MessageType, handler Handler) error
- func (c *Cluster) HandleFuncWithReply(msgType MessageType, replyHandler ReplyHandler) error
- func (c *Cluster) HandleFuncWithResponse(msgType MessageType, replyHandler ReplyHandler) error
- func (c *Cluster) HandleGossipFunc(handler GossipHandler) HandlerID
- func (c *Cluster) HandleNodeMetadataChangeFunc(handler NodeMetadataChangeHandler) HandlerID
- func (c *Cluster) HandleNodeStateChangeFunc(handler NodeStateChangeHandler) HandlerID
- func (c *Cluster) Join(peers []string) error
- func (c *Cluster) Leave()
- func (c *Cluster) LocalMetadata() *Metadata
- func (c *Cluster) LocalNode() *Node
- func (c *Cluster) Logger() logger.Logger
- func (c *Cluster) NodeIsLocal(node *Node) bool
- func (c *Cluster) Nodes() []*Node
- func (c *Cluster) NodesToIDs(nodes []*Node) []NodeID
- func (c *Cluster) NumAliveNodes() int
- func (c *Cluster) NumDeadNodes() int
- func (c *Cluster) NumNodes() int
- func (c *Cluster) NumSuspectNodes() int
- func (c *Cluster) RemoveGossipHandler(id HandlerID) bool
- func (c *Cluster) RemoveNodeMetadataChangeHandler(id HandlerID) bool
- func (c *Cluster) RemoveNodeStateChangeHandler(id HandlerID) bool
- func (c *Cluster) Send(msgType MessageType, data interface{}) error
- func (c *Cluster) SendReliable(msgType MessageType, data interface{}) error
- func (c *Cluster) SendTagged(tag string, msgType MessageType, data interface{}) error
- func (c *Cluster) SendTaggedReliable(tag string, msgType MessageType, data interface{}) error
- func (c *Cluster) SendTo(dstNode *Node, msgType MessageType, data interface{}) error
- func (c *Cluster) SendToPeers(dstNodes []*Node, msgType MessageType, data interface{}) error
- func (c *Cluster) SendToPeersReliable(dstNodes []*Node, msgType MessageType, data interface{}) error
- func (c *Cluster) SendToReliable(dstNode *Node, msgType MessageType, data interface{}) error
- func (c *Cluster) SendToWithResponse(dstNode *Node, msgType MessageType, payload interface{}, ...) error
- func (c *Cluster) Start()
- func (c *Cluster) Stop()
- func (c *Cluster) UnregisterMessageType(msgType MessageType) bool
- type Config
- type DataNodeGroup
- func (dng *DataNodeGroup[T]) Close()
- func (dng *DataNodeGroup[T]) Contains(nodeID NodeID) bool
- func (dng *DataNodeGroup[T]) Count() int
- func (dng *DataNodeGroup[T]) GetDataNodes() []*T
- func (dng *DataNodeGroup[T]) GetNodeData(nodeID NodeID) *T
- func (dng *DataNodeGroup[T]) GetNodes(excludeIDs []NodeID) []*Node
- func (dng *DataNodeGroup[T]) SendToPeers(msgType MessageType, data interface{}) error
- func (dng *DataNodeGroup[T]) SendToPeersReliable(msgType MessageType, data interface{}) error
- func (dng *DataNodeGroup[T]) UpdateNodeData(nodeID NodeID, updateFn func(*Node, *T) error) error
- type DataNodeGroupOptions
- type DefaultResolver
- type EventHandlers
- type GossipHandler
- type HTTPTransport
- func (ht *HTTPTransport) HandleGossipRequest(w http.ResponseWriter, r *http.Request)
- func (ht *HTTPTransport) Name() string
- func (ht *HTTPTransport) PacketChannel() chan *Packet
- func (ht *HTTPTransport) Send(transportType TransportType, node *Node, packet *Packet) error
- func (ht *HTTPTransport) SendWithReply(node *Node, packet *Packet) (*Packet, error)
- func (ht *HTTPTransport) Start(ctx context.Context, wg *sync.WaitGroup) error
- type Handler
- type HandlerID
- type HealthCheckTask
- type HealthCheckType
- type HealthMonitor
- type LocalMetadata
- type MessageID
- type MessageType
- type Metadata
- func (md *Metadata) Delete(key string)
- func (md *Metadata) Exists(key string) bool
- func (md *Metadata) GetAll() map[string]interface{}
- func (md *Metadata) GetAllAsString() map[string]string
- func (md *Metadata) GetAllKeys() []string
- func (md *Metadata) GetBool(key string) bool
- func (md *Metadata) GetFloat32(key string) float32
- func (md *Metadata) GetFloat64(key string) float64
- func (md *Metadata) GetInt(key string) int
- func (md *Metadata) GetInt32(key string) int32
- func (md *Metadata) GetInt64(key string) int64
- func (md *Metadata) GetString(key string) string
- func (md *Metadata) GetTime(key string) time.Time
- func (md *Metadata) GetTimestamp() hlc.Timestamp
- func (md *Metadata) GetUint(key string) uint
- func (md *Metadata) GetUint32(key string) uint32
- func (md *Metadata) GetUint64(key string) uint64
- func (md *Metadata) SetBool(key string, value bool) LocalMetadata
- func (md *Metadata) SetFloat32(key string, value float32) LocalMetadata
- func (md *Metadata) SetFloat64(key string, value float64) LocalMetadata
- func (md *Metadata) SetInt(key string, value int) LocalMetadata
- func (md *Metadata) SetInt32(key string, value int32) LocalMetadata
- func (md *Metadata) SetInt64(key string, value int64) LocalMetadata
- func (md *Metadata) SetOnLocalChange(cb func(hlc.Timestamp, map[string]interface{}))
- func (md *Metadata) SetString(key, value string) LocalMetadata
- func (md *Metadata) SetTime(key string, value time.Time) LocalMetadata
- func (md *Metadata) SetUint(key string, value uint) LocalMetadata
- func (md *Metadata) SetUint32(key string, value uint32) LocalMetadata
- func (md *Metadata) SetUint64(key string, value uint64) LocalMetadata
- type MetadataReader
- type Node
- func (node *Node) AdvertisedAddr() string
- func (node *Node) Alive() bool
- func (node *Node) ClearAddress()
- func (node *Node) DeadOrLeft() bool
- func (node *Node) GetAddress() Address
- func (node *Node) GetObservedState() NodeState
- func (node *Node) GetTags() []string
- func (node *Node) HasTag(tag string) bool
- func (node *Node) IsAddressEmpty() bool
- func (node *Node) Removed() bool
- func (node *Node) SetAddress(addr Address)
- func (node *Node) Suspect() bool
- type NodeGroup
- func (ng *NodeGroup) Close()
- func (ng *NodeGroup) Contains(nodeID NodeID) bool
- func (ng *NodeGroup) Count() int
- func (ng *NodeGroup) GetNodes(excludeIDs []NodeID) []*Node
- func (ng *NodeGroup) SendToPeers(msgType MessageType, data interface{}) error
- func (ng *NodeGroup) SendToPeersReliable(msgType MessageType, data interface{}) error
- type NodeGroupOptions
- type NodeID
- type NodeMetadataChangeHandler
- type NodeState
- type NodeStateChangeHandler
- type NodeWithData
- type Packet
- func (p *Packet) AddRef() *Packet
- func (p *Packet) CanReply() bool
- func (p *Packet) Codec() codec.Serializer
- func (p *Packet) Payload() []byte
- func (p *Packet) Release()
- func (p *Packet) SendReply() (err error)
- func (p *Packet) SetCodec(codec codec.Serializer)
- func (p *Packet) SetConn(conn net.Conn)
- func (p *Packet) SetPayload(payload []byte)
- func (p *Packet) SetReplyChan(ch chan<- *Packet)
- func (p *Packet) Unmarshal(v interface{}) error
- type ReplyHandler
- type Resolver
- type SocketTransport
- func (st *SocketTransport) Name() string
- func (st *SocketTransport) PacketChannel() chan *Packet
- func (st *SocketTransport) Send(transportType TransportType, node *Node, packet *Packet) error
- func (st *SocketTransport) SendWithReply(node *Node, packet *Packet) (*Packet, error)
- func (st *SocketTransport) Start(ctx context.Context, wg *sync.WaitGroup) error
- type Transport
- type TransportType
Constants ¶
const ( MetadataAnyValue = "*" MetadataContainsPrefix = "~" )
const ( TransportBestEffort = iota TransportReliable )
const (
PROTOCOL_VERSION = 1
)
Variables ¶
var EmptyNodeID = NodeID(uuid.Nil)
var (
ErrNoTransportAvailable = fmt.Errorf("no transport available") // When there's no available transport between two nodes
)
var (
ErrUnsupportedAddressFormat = fmt.Errorf("unsupported address format")
)
Functions ¶
This section is empty.
Types ¶
type Address ¶
type ApplicationVersionCheck ¶
ApplicationVersionCheck is a function that checks if an application version is compatible
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
func NewCluster ¶
func (*Cluster) AliveNodes ¶
func (*Cluster) CalcFanOut ¶
CalcFanOut determines how many peers should receive a message when broadcasting information throughout the cluster. It implements a logarithmic scaling approach to balance network traffic and propagation speed.
func (*Cluster) CalcPayloadSize ¶
CalcPayloadSize determines how many items should be included in a gossip payload. This is used when exchanging state information to limit the size of individual gossip messages.
This is meant for controlling the volume of data, not the number of nodes to contact.
func (*Cluster) GetCandidates ¶
Get a random subset of nodes to use for gossiping or exchanging states with, excluding ourselves
func (*Cluster) GetNodeByIDString ¶
func (*Cluster) GetNodesByTag ¶ added in v0.10.0
func (*Cluster) HandleFunc ¶
func (c *Cluster) HandleFunc(msgType MessageType, handler Handler) error
Registers a handler to accept a message and automatically forward it to other nodes
func (*Cluster) HandleFuncWithReply ¶
func (c *Cluster) HandleFuncWithReply(msgType MessageType, replyHandler ReplyHandler) error
Registers a handler to accept a message and reply to the sender, always uses the reliable transport
func (*Cluster) HandleFuncWithResponse ¶ added in v0.10.0
func (c *Cluster) HandleFuncWithResponse(msgType MessageType, replyHandler ReplyHandler) error
func (*Cluster) HandleGossipFunc ¶
func (c *Cluster) HandleGossipFunc(handler GossipHandler) HandlerID
func (*Cluster) HandleNodeMetadataChangeFunc ¶
func (c *Cluster) HandleNodeMetadataChangeFunc(handler NodeMetadataChangeHandler) HandlerID
func (*Cluster) HandleNodeStateChangeFunc ¶
func (c *Cluster) HandleNodeStateChangeFunc(handler NodeStateChangeHandler) HandlerID
func (*Cluster) Leave ¶
func (c *Cluster) Leave()
Marks the local node as leaving and broadcasts this state to the cluster
func (*Cluster) LocalMetadata ¶
Get the local nodes metadata for read and write access
func (*Cluster) NodeIsLocal ¶
func (*Cluster) NodesToIDs ¶
func (*Cluster) NumAliveNodes ¶
Get the number of nodes that are currently alive
func (*Cluster) NumDeadNodes ¶
Get the number of nodes that are currently dead
func (*Cluster) NumSuspectNodes ¶
Get the number of nodes that are currently suspect
func (*Cluster) RemoveGossipHandler ¶
func (*Cluster) RemoveNodeMetadataChangeHandler ¶
func (*Cluster) RemoveNodeStateChangeHandler ¶
func (*Cluster) Send ¶
func (c *Cluster) Send(msgType MessageType, data interface{}) error
func (*Cluster) SendReliable ¶
func (c *Cluster) SendReliable(msgType MessageType, data interface{}) error
func (*Cluster) SendTagged ¶ added in v0.10.0
func (c *Cluster) SendTagged(tag string, msgType MessageType, data interface{}) error
func (*Cluster) SendTaggedReliable ¶ added in v0.10.0
func (c *Cluster) SendTaggedReliable(tag string, msgType MessageType, data interface{}) error
func (*Cluster) SendTo ¶
func (c *Cluster) SendTo(dstNode *Node, msgType MessageType, data interface{}) error
func (*Cluster) SendToPeers ¶
func (c *Cluster) SendToPeers(dstNodes []*Node, msgType MessageType, data interface{}) error
func (*Cluster) SendToPeersReliable ¶
func (c *Cluster) SendToPeersReliable(dstNodes []*Node, msgType MessageType, data interface{}) error
func (*Cluster) SendToReliable ¶
func (c *Cluster) SendToReliable(dstNode *Node, msgType MessageType, data interface{}) error
func (*Cluster) SendToWithResponse ¶
func (c *Cluster) SendToWithResponse(dstNode *Node, msgType MessageType, payload interface{}, responsePayload interface{}) error
Send a message to the peer then accept a response message. Uses a TCP connection to send the packet and receive the response.
func (*Cluster) UnregisterMessageType ¶
func (c *Cluster) UnregisterMessageType(msgType MessageType) bool
type Config ¶
type Config struct {
NodeID string // NodeID is the unique identifier for the node in the cluster, "" to generate a new one
BindAddr string // BindAddr is the address and port to bind to or the path for http transports
AdvertiseAddr string // Advertised address and port or URL for the node
ApplicationVersion string // ApplicationVersion is the version of the application, used for compatibility checks
Tags []string // Tags for tag-based message routing
EncryptionKey []byte // Encryption key for the messages, must be either 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256.
Transport Transport // Transport layer to communicate over UDP or TCP
Logger logger.Logger // Logger is the logger to use for logging messages
MsgCodec codec.Serializer // The codec to use for encoding and decoding messages
Compressor compression.Compressor // The compressor to use for compressing and decompressing messages, if not given messages will not be compressed
CompressMinSize int // The minimum size of a message before attempting to compress it
BearerToken string // Bearer token to use for authentication, if not given no authentication will be used
Cipher encryption.Cipher // The cipher to use for encrypting and decrypting messages
ApplicationVersionCheck ApplicationVersionCheck // The application version check to use for checking compatibility with other nodes
GossipInterval time.Duration // How often to send gossip messages
GossipMaxInterval time.Duration // Maximum interval between gossip messages
MetadataGossipInterval time.Duration // Fast metadata gossip (~500ms)
StateGossipInterval time.Duration // Medium state sync (~30-60s)
TCPDialTimeout time.Duration // TCPDialTimeout is the duration to wait for a TCP connection to be established
TCPDeadline time.Duration // TCPDeadline is the duration to wait for a TCP operation to complete
UDPDeadline time.Duration // UDPDeadline is the duration to wait for a UDP operation to complete
UDPMaxPacketSize int // UDPMaxSize is the maximum size of a UDP packet in bytes
TCPMaxPacketSize int // TCPMaxSize is the maximum size of a TCP packet in bytes
MsgHistoryGCInterval time.Duration // MsgHistoryGCInterval is the duration between garbage collection operations
MsgHistoryMaxAge time.Duration // MsgHistoryMaxAge is the maximum age of a message in the history
MsgHistoryShardCount int // MessageHistoryShardCount is the number of shards to use for storing message history, 16 for up to 50 nodes, 32 for up to 500 nodes and 64 for larger clusters.
NodeShardCount int // NodeShardCount is the number of shards to use for storing node information, 4 for up to 50 nodes, 16 for up to 500 nodes and 32 for larger clusters.
NumSendWorkers int // The number of workers to use for sending messages
SendQueueSize int // SendQueueSize is the size of the send queue
NumIncomingWorkers int // The number of workers to use for processing incoming messages
IncomingPacketQueueDepth int // Depth of the queue for incoming packets
PingTimeout time.Duration // Timeout for ping operations, should be less than HealthCheckInterval
FanOutMultiplier float64 // Scale of peer count for broadcast messages
StateExchangeMultiplier float64 // Scale of peer sampling for state exchange messages
TTLMultiplier float64 // Multiplier for TTL, used to determine how many hops a message can take
ForceReliableTransport bool // Force all messages to use reliable transport
Resolver Resolver // DNS resolver to use for address resolution, if not set uses default resolver
PreferIPv6 bool // Prefer IPv6 addresses when resolving hostnames (default false = prefer IPv4)
NodeCleanupInterval time.Duration // How often to run node cleanup
NodeRetentionTime time.Duration // How long to keep dead nodes before removal
LeavingNodeTimeout time.Duration // How long to wait before moving leaving nodes to dead
HealthCheckInterval time.Duration // How often to check node health (e.g., 2s)
SuspectTimeout time.Duration // Time before marking node suspect (e.g., 1.5s)
SuspectRetryInterval time.Duration // How often to retry suspect nodes (e.g., 1s)
DeadNodeTimeout time.Duration // Time before marking suspect node as dead (e.g., 15s)
DeadNodeRetryInterval time.Duration // How often to retry dead nodes
MaxDeadNodeRetryTime time.Duration // Stop retrying dead nodes after this time
HealthWorkerPoolSize int // Number of workers for health checks (e.g., 4)
HealthCheckQueueDepth int // Queue depth for health check tasks (e.g., 256)
JoinQueueSize int // Queue depth for joining tasks (e.g., 100)
NumJoinWorkers int // Number of workers for joining tasks (e.g., 2 - 3)
PeerRecoveryInterval time.Duration // How often to check peer connectivity (default: 30s)
}
func DefaultConfig ¶
func DefaultConfig() *Config
type DataNodeGroup ¶
type DataNodeGroup[T any] struct { // contains filtered or unexported fields }
DataNodeGroup groups nodes based on meta data and stores custom data with the nodes.
func NewDataNodeGroup ¶
func NewDataNodeGroup[T any]( cluster *Cluster, criteria map[string]string, options *DataNodeGroupOptions[T], ) *DataNodeGroup[T]
NewDataNodeGroup creates a new data node group
func (*DataNodeGroup[T]) Close ¶
func (dng *DataNodeGroup[T]) Close()
Close unregisters event handlers to allow for garbage collection
func (*DataNodeGroup[T]) Contains ¶
func (dng *DataNodeGroup[T]) Contains(nodeID NodeID) bool
Contains checks if a node with the given ID is in this group
func (*DataNodeGroup[T]) Count ¶
func (dng *DataNodeGroup[T]) Count() int
Count returns the number of alive nodes in this group
func (*DataNodeGroup[T]) GetDataNodes ¶
func (dng *DataNodeGroup[T]) GetDataNodes() []*T
GetDataNodes returns all alive nodes custom data
func (*DataNodeGroup[T]) GetNodeData ¶
func (dng *DataNodeGroup[T]) GetNodeData(nodeID NodeID) *T
GetNodeData returns a node's custom data if it exists in the group
func (*DataNodeGroup[T]) GetNodes ¶
func (dng *DataNodeGroup[T]) GetNodes(excludeIDs []NodeID) []*Node
GetNodes returns all alive nodes in this group, excluding specified node IDs
func (*DataNodeGroup[T]) SendToPeers ¶
func (dng *DataNodeGroup[T]) SendToPeers(msgType MessageType, data interface{}) error
SendToPeers sends a message to all peers in the group and if necessary gossips to random peers.
func (*DataNodeGroup[T]) SendToPeersReliable ¶
func (dng *DataNodeGroup[T]) SendToPeersReliable(msgType MessageType, data interface{}) error
SendToPeersReliable sends a message to all peers in the group reliably and if necessary gossips to random peers.
func (*DataNodeGroup[T]) UpdateNodeData ¶
func (dng *DataNodeGroup[T]) UpdateNodeData(nodeID NodeID, updateFn func(*Node, *T) error) error
UpdateNodeData allows updating a node's custom data safely
type DataNodeGroupOptions ¶
type DataNodeGroupOptions[T any] struct { // Callbacks OnNodeAdded func(node *Node, data *T) OnNodeRemoved func(node *Node, data *T) OnNodeUpdated func(node *Node, data *T) // Function to initialize custom data for a node DataInitializer func(node *Node) *T }
DataNodeGroupOptions contains configuration options
type DefaultResolver ¶ added in v0.5.0
type DefaultResolver struct{}
DefaultResolver implements the Resolver interface using Go's standard net package
func NewDefaultResolver ¶ added in v0.5.0
func NewDefaultResolver() *DefaultResolver
NewDefaultResolver creates a new DefaultResolver
type EventHandlers ¶
type EventHandlers[T any] struct { // contains filtered or unexported fields }
EventHandlers manages a collection of handlers of a specific type
func NewEventHandlers ¶
func NewEventHandlers[T any]() *EventHandlers[T]
NewEventHandlers creates a new handler collection for any type
func (*EventHandlers[T]) Add ¶
func (l *EventHandlers[T]) Add(handler T) HandlerID
Add registers a handler and returns its ID
func (*EventHandlers[T]) ForEach ¶
func (l *EventHandlers[T]) ForEach(fn func(T))
ForEach executes a function for each handler (optimized for iteration)
func (*EventHandlers[T]) Remove ¶
func (l *EventHandlers[T]) Remove(id HandlerID) bool
Remove unregisters a handler by its ID
type GossipHandler ¶
type GossipHandler func()
type HTTPTransport ¶ added in v0.8.0
type HTTPTransport struct {
// contains filtered or unexported fields
}
func NewHTTPTransport ¶ added in v0.8.0
func NewHTTPTransport(config *Config) *HTTPTransport
func (*HTTPTransport) HandleGossipRequest ¶ added in v0.8.0
func (ht *HTTPTransport) HandleGossipRequest(w http.ResponseWriter, r *http.Request)
func (*HTTPTransport) Name ¶ added in v0.8.0
func (ht *HTTPTransport) Name() string
func (*HTTPTransport) PacketChannel ¶ added in v0.8.0
func (ht *HTTPTransport) PacketChannel() chan *Packet
func (*HTTPTransport) Send ¶ added in v0.8.0
func (ht *HTTPTransport) Send(transportType TransportType, node *Node, packet *Packet) error
func (*HTTPTransport) SendWithReply ¶ added in v0.8.0
func (ht *HTTPTransport) SendWithReply(node *Node, packet *Packet) (*Packet, error)
type HealthCheckTask ¶ added in v0.8.0
type HealthCheckTask struct {
NodeID NodeID
TaskType HealthCheckType
Timestamp hlc.Timestamp
}
type HealthCheckType ¶ added in v0.8.0
type HealthCheckType int
const ( DirectPing HealthCheckType = iota SuspectRetry DeadNodeRetry )
type HealthMonitor ¶ added in v0.8.0
type HealthMonitor struct {
// contains filtered or unexported fields
}
type LocalMetadata ¶ added in v0.8.0
type LocalMetadata interface {
MetadataReader
SetString(key, value string) LocalMetadata
SetBool(key string, value bool) LocalMetadata
SetInt(key string, value int) LocalMetadata
SetInt32(key string, value int32) LocalMetadata
SetInt64(key string, value int64) LocalMetadata
SetUint(key string, value uint) LocalMetadata
SetUint32(key string, value uint32) LocalMetadata
SetUint64(key string, value uint64) LocalMetadata
SetFloat32(key string, value float32) LocalMetadata
SetFloat64(key string, value float64) LocalMetadata
SetTime(key string, value time.Time) LocalMetadata
Delete(key string)
}
LocalMetadata provides read-write access for local node
type MessageType ¶
type MessageType uint16
const ( ReservedMsgsStart MessageType = 64 // Reserved for future use UserMsg MessageType = 128 // User messages start here )
type Metadata ¶
type Metadata struct {
// contains filtered or unexported fields
}
Metadata holds node metadata with type-safe accessors
func (*Metadata) GetAllAsString ¶
GetAllAsString returns all metadata as string values
func (*Metadata) GetAllKeys ¶
GetAllKeys returns all keys in the metadata
func (*Metadata) GetFloat32 ¶
GetFloat32 returns a 32-bit float value
func (*Metadata) GetFloat64 ¶
GetFloat64 returns a 64-bit float value (our base float conversion method)
func (*Metadata) GetInt64 ¶
GetInt64 returns a 64-bit integer value (our base integer conversion method)
func (*Metadata) GetTimestamp ¶
GetTimestamp returns the last modification timestamp
func (*Metadata) GetUint64 ¶
GetUint64 returns a 64-bit unsigned integer value (our base unsigned conversion method)
func (*Metadata) SetBool ¶
func (md *Metadata) SetBool(key string, value bool) LocalMetadata
SetBool sets a boolean value
func (*Metadata) SetFloat32 ¶
func (md *Metadata) SetFloat32(key string, value float32) LocalMetadata
SetFloat32 sets a 32-bit float value
func (*Metadata) SetFloat64 ¶
func (md *Metadata) SetFloat64(key string, value float64) LocalMetadata
SetFloat64 sets a 64-bit float value
func (*Metadata) SetInt ¶
func (md *Metadata) SetInt(key string, value int) LocalMetadata
SetInt sets an integer value
func (*Metadata) SetInt32 ¶
func (md *Metadata) SetInt32(key string, value int32) LocalMetadata
SetInt32 sets a 32-bit integer value
func (*Metadata) SetInt64 ¶
func (md *Metadata) SetInt64(key string, value int64) LocalMetadata
SetInt64 sets a 64-bit integer value
func (*Metadata) SetOnLocalChange ¶ added in v0.7.0
SetOnLocalChange sets a callback that is invoked whenever local setters (Set*/Delete) modify the metadata. Remote updates via update() do not trigger this callback.
func (*Metadata) SetString ¶
func (md *Metadata) SetString(key, value string) LocalMetadata
SetString sets a string value
func (*Metadata) SetTime ¶
func (md *Metadata) SetTime(key string, value time.Time) LocalMetadata
SetTime sets a time value
func (*Metadata) SetUint ¶
func (md *Metadata) SetUint(key string, value uint) LocalMetadata
SetUint sets an unsigned integer value
type MetadataReader ¶
type MetadataReader interface {
GetString(key string) string
GetBool(key string) bool
GetInt(key string) int
GetInt32(key string) int32
GetInt64(key string) int64
GetUint(key string) uint
GetUint32(key string) uint32
GetUint64(key string) uint64
GetFloat32(key string) float32
GetFloat64(key string) float64
GetTime(key string) time.Time
GetTimestamp() hlc.Timestamp
GetAll() map[string]interface{}
GetAllKeys() []string
GetAllAsString() map[string]string
Exists(key string) bool
}
MetadataReader provides read-only access to metadata
type Node ¶
type Node struct {
ID NodeID
Metadata MetadataReader
ProtocolVersion uint16
ApplicationVersion string
// contains filtered or unexported fields
}
Struct to hold our view of the state of a node within the cluster
func (*Node) AdvertisedAddr ¶ added in v0.10.0
AdvertiseAddr returns the node's advertise address string
func (*Node) ClearAddress ¶ added in v0.10.0
func (node *Node) ClearAddress()
ClearAddress clears the node's resolved address (thread-safe)
func (*Node) DeadOrLeft ¶
func (*Node) GetAddress ¶
GetAddress returns a copy of the node's resolved address (thread-safe)
func (*Node) GetObservedState ¶ added in v0.8.0
func (*Node) GetTags ¶ added in v0.10.0
GetTags returns a copy of the node's tags. Tags are immutable (set at node creation), but we return a defensive copy to prevent accidental modification by callers. Note: For performance-critical paths, consider using HasTag() instead of GetTags()
func (*Node) IsAddressEmpty ¶ added in v0.10.0
IsAddressEmpty checks if the node's address is empty (thread-safe)
func (*Node) SetAddress ¶ added in v0.10.0
SetAddress sets the node's resolved address (thread-safe)
type NodeGroup ¶
type NodeGroup struct {
// contains filtered or unexported fields
}
NodeGroup represents a group of nodes that match specific metadata criteria
func NewNodeGroup ¶
func NewNodeGroup(cluster *Cluster, criteria map[string]string, opts *NodeGroupOptions) *NodeGroup
NewNodeGroup creates a new node group that tracks nodes with matching metadata
func (*NodeGroup) SendToPeers ¶
func (ng *NodeGroup) SendToPeers(msgType MessageType, data interface{}) error
SendToPeers sends a message to all peers in the group and if necessary gossips to random peers.
func (*NodeGroup) SendToPeersReliable ¶
func (ng *NodeGroup) SendToPeersReliable(msgType MessageType, data interface{}) error
SendToPeersReliable sends a message to all peers in the group reliably and if necessary gossips to random peers.
type NodeGroupOptions ¶
type NodeMetadataChangeHandler ¶
type NodeMetadataChangeHandler func(*Node)
type NodeStateChangeHandler ¶
NodeStateChangeHandler and NodeMetadataChangeHandler are used to handle node state and metadata changes
type NodeWithData ¶
NodeWithData pairs a node with its custom data
type Packet ¶
type Packet struct {
MessageType MessageType `msgpack:"mt" json:"mt"`
SenderID NodeID `msgpack:"si" json:"si"`
TargetNodeID *NodeID `msgpack:"ti,omitempty" json:"ti,omitempty"` // Optional: for direct messages
Tag *string `msgpack:"tag,omitempty" json:"tag,omitempty"` // Optional: for tag-based routing
MessageID MessageID `msgpack:"mi" json:"mi"`
TTL uint8 `msgpack:"ttl" json:"ttl"`
// contains filtered or unexported fields
}
Packet holds the payload of a message being passed between nodes
func (*Packet) Codec ¶
func (p *Packet) Codec() codec.Serializer
func (*Packet) SendReply ¶ added in v0.8.0
SendReply sends a reply packet using the available reply mechanism
func (*Packet) SetCodec ¶ added in v0.8.0
func (p *Packet) SetCodec(codec codec.Serializer)
SetCodec sets the packet codec
func (*Packet) SetPayload ¶ added in v0.8.0
SetPayload sets the packet payload
func (*Packet) SetReplyChan ¶ added in v0.8.0
SetReplyChan sets the reply channel for this packet
type ReplyHandler ¶
Message handler for request / reply messages, must return reply data
type Resolver ¶ added in v0.5.0
type Resolver interface {
// LookupIP takes a hostname and converts it to a list of IP addresses
LookupIP(host string) ([]string, error)
// LookupSRV takes a service name and returns a list of service records as TCPAddr that match the service name
LookupSRV(service string) ([]*net.TCPAddr, error)
}
Resolver defines the interface for DNS resolution
type SocketTransport ¶ added in v0.8.0
type SocketTransport struct {
// contains filtered or unexported fields
}
func NewSocketTransport ¶ added in v0.8.0
func NewSocketTransport(config *Config) *SocketTransport
func (*SocketTransport) Name ¶ added in v0.8.0
func (st *SocketTransport) Name() string
func (*SocketTransport) PacketChannel ¶ added in v0.8.0
func (st *SocketTransport) PacketChannel() chan *Packet
func (*SocketTransport) Send ¶ added in v0.8.0
func (st *SocketTransport) Send(transportType TransportType, node *Node, packet *Packet) error
func (*SocketTransport) SendWithReply ¶ added in v0.8.0
func (st *SocketTransport) SendWithReply(node *Node, packet *Packet) (*Packet, error)
type Transport ¶
type Transport interface {
// Get the transport's name
Name() string
// Start the transport server
Start(ctx context.Context, wg *sync.WaitGroup) error
// PacketChannel returns the channel for incoming packets
PacketChannel() chan *Packet
// Send sends a packet to specific node using the specified transport type
Send(transportType TransportType, node *Node, packet *Packet) error
// SendWithReply sends a packet and waits for a reply
SendWithReply(node *Node, packet *Packet) (*Packet, error)
}
Transport defines the interface for packet-based communication
type TransportType ¶
type TransportType int
Source Files
¶
- address.go
- cluster.go
- cluster_handlers.go
- cluster_messages.go
- config.go
- data_node_group.go
- default_resolver.go
- events.go
- handler_registry.go
- health_monitor.go
- http_transport.go
- message_history.go
- metadata.go
- node.go
- node_group.go
- node_group_helpers.go
- node_list.go
- packet.go
- resolver.go
- socket_transport.go
- transport.go
- types.go