Documentation
¶
Overview ¶
Package cluster provides distributed shard-based message routing with consistent hashing for building scalable, partitioned systems.
The cluster package enables horizontal scaling by partitioning work across multiple nodes. Each node owns a subset of shards, and messages are routed to the appropriate node based on a key-to-shard mapping.
Architecture ¶
The cluster consists of three main components:
- Client: Routes messages to shards via key or shard ID
- Node: Subscribes to shards and handles incoming messages
- Transport: Abstracts the underlying messaging infrastructure
Shard Distribution ¶
Shards are distributed across nodes using Highest Random Weight (HRW) consistent hashing via ShardsForNode. This ensures:
- Even distribution of shards across nodes
- Minimal shard movement when nodes join or leave
- Deterministic assignment given the same node list
Keys are mapped to shards using ShardFromString, which provides consistent, uniform distribution using BLAKE2b hashing.
Client Usage ¶
Create a client to send messages to the cluster:
client, err := cluster.NewClient(cluster.ClientOptions{
Transport: natsTransport,
NumShards: 64,
})
// Route by key (e.g., user ID, tenant ID)
resp, err := client.Key("user:123").Request(ctx, GetUserQuery{ID: "123"})
// Route directly to shard
resp, err := client.Shard(5).Request(ctx, payload)
Node Usage ¶
Create a node to handle messages for owned shards:
node := cluster.NewNode(cluster.NodeOptions{
NodeID: "node-1",
Transport: natsTransport,
Shards: cluster.ShardsForNode("node-1", allNodeIDs, 64, ""),
Handler: func(ctx context.Context, env cluster.Envelope) ([]byte, error) {
// Handle the message based on env.Type
return response, nil
},
})
node.Run(ctx)
Transport Layer ¶
The transport layer is abstracted via Transport, ClientTransport, and ServerTransport interfaces. The adapters/nats package provides a NATS JetStream implementation.
Envelope ¶
Messages are wrapped in Envelope which carries:
- Shard: Target shard number
- Type: Message type for routing to handlers
- Data: JSON-encoded payload
- Headers: Optional metadata (use WithHeader)
- TTL: Optional time-to-live (use WithTTL)
Error Handling ¶
Common errors include:
- ErrTransportNoShardSubscriber: No node owns the target shard
- ErrEnvelopeExpired: Message TTL exceeded before delivery
- ErrHandlerTimeout: Handler exceeded context deadline
Index ¶
- Constants
- Variables
- func CreateTestCluster(t *testing.T, tr ServerTransport, numNodes int, numShards uint32, ...)
- func ShardFromString(key string, numShards uint32, seed string) uint32
- func ShardsForNode(nodeID string, nodeIDs []string, numShards uint32, seed string) []uint32
- type ActorFactory
- type Client
- func (c *Client) Key(key string, opts ...EnvelopeOption) *ScopedClient
- func (c *Client) NotifyKey(ctx context.Context, key string, msgType string, data []byte) error
- func (c *Client) NotifyShard(ctx context.Context, shard uint32, msgType string, data []byte, ...) error
- func (c *Client) RequestKey(ctx context.Context, key string, msgType string, data []byte, ...) ([]byte, error)
- func (c *Client) RequestShard(ctx context.Context, shard uint32, msgType string, data []byte, ...) ([]byte, error)
- func (c *Client) Shard(shard uint32) *ScopedClient
- type ClientOptions
- type ClientTransport
- type ClusterMetrics
- type Envelope
- type EnvelopeOption
- type GetNodeInfoRequest
- type GetNodeInfoResponse
- type MemoryTransport
- type MemoryTransportOpts
- type Node
- type NodeOptions
- type Request
- type Requester
- type ScopedClient
- type ScopedHandlerOpts
- type ServerHandlerFunc
- func NewActorHandler(actFactory ActorFactory) ServerHandlerFunc
- func NewKeyHandler(createFunc func(key string) (ServerHandlerFunc, error)) ServerHandlerFunc
- func NewKeyHandlerWithOpts(createFunc func(key string) (ServerHandlerFunc, error), cacheSize int) ServerHandlerFunc
- func NewScopedHandler(opts ScopedHandlerOpts) ServerHandlerFunc
- type ServerTransport
- type Subscription
- type Transport
Constants ¶
const ( DefaultMaxConcurrentHandlers = 100 DefaultHandlerTimeout = 30 * time.Second )
const DefaultHandlerCacheSize = 1024
const (
MsgClusterNodeInfo = "clstr.node.info"
)
MsgClusterNodeInfo is the message type for querying node metadata.
Variables ¶
var ( // Transport errors ErrTransportClosed = errors.New("transport closed") ErrTransportNoShardSubscriber = errors.New("no subscriber for shard") // Envelope errors ErrEnvelopeExpired = errors.New("envelope TTL expired") ErrReservedHeader = errors.New("cannot set reserved header") // Handler errors ErrHandlerTimeout = errors.New("handler exceeded deadline") ErrKeyRequired = errors.New("key is required") ErrMissingKeyHeader = errors.New("missing x-clstr-key header") )
Functions ¶
func CreateTestCluster ¶
func CreateTestCluster( t *testing.T, tr ServerTransport, numNodes int, numShards uint32, shardSeed string, h ServerHandlerFunc, )
func ShardFromString ¶
ShardFromString computes a shard number (0 to numShards-1) from a string key. Uses BLAKE2b hashing for uniform distribution. The optional seed ensures consistent hashing across different clients.
func ShardsForNode ¶
ShardsForNode computes which shards a node should own using Highest Random Weight (rendezvous) hashing. This provides:
- Even distribution of shards across nodes
- Minimal shard movement when nodes join or leave
- Deterministic assignment given the same inputs
Parameters:
- nodeID: The ID of the node to compute shards for
- nodeIDs: All node IDs in the cluster (must include nodeID)
- numShards: Total number of shards in the cluster
- seed: Optional seed for consistent hashing
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client routes messages to cluster nodes based on shard assignment. Create via NewClient.
func NewClient ¶
func NewClient(opts ClientOptions) (*Client, error)
NewClient creates a new cluster client with the given options. Returns an error if required options (Transport, NumShards) are missing.
func (*Client) Key ¶
func (c *Client) Key(key string, opts ...EnvelopeOption) *ScopedClient
Key returns a client scoped to the shard determined by the given key. The key is hashed consistently to determine the target shard. Use this for entity-based routing (e.g., by user ID, tenant ID).
func (*Client) NotifyKey ¶
NotifyKey publishes using a string key -> shard mapping (e.g. tenantID, userID, deviceID).
func (*Client) NotifyShard ¶
func (c *Client) NotifyShard(ctx context.Context, shard uint32, msgType string, data []byte, opts ...EnvelopeOption) error
NotifyShard publishes directly to a shard (caller already knows shard).
func (*Client) RequestKey ¶
func (c *Client) RequestKey(ctx context.Context, key string, msgType string, data []byte, opts ...EnvelopeOption) ([]byte, error)
RequestKey requests using a string key -> shard mapping.
func (*Client) RequestShard ¶
func (c *Client) RequestShard(ctx context.Context, shard uint32, msgType string, data []byte, opts ...EnvelopeOption) ([]byte, error)
RequestShard sends a request directly to a shard.
func (*Client) Shard ¶
func (c *Client) Shard(shard uint32) *ScopedClient
Shard returns a client scoped to a specific shard number. Use this when you already know the target shard.
type ClientOptions ¶
type ClientOptions struct {
// Transport is the underlying messaging transport (required).
Transport ClientTransport
// NumShards is the total number of shards in the cluster (required).
NumShards uint32
// Seed is an optional string for consistent shard hashing across clients.
Seed string
// EnvelopeOptions are default options applied to all outgoing envelopes.
EnvelopeOptions []EnvelopeOption
// Metrics for client instrumentation. If nil, a no-op implementation is used.
Metrics ClusterMetrics
}
ClientOptions configures client creation.
type ClientTransport ¶
type ClientTransport interface {
// Request sends a message and waits for a reply.
// Returns the response data or an error (e.g., [ErrTransportNoShardSubscriber]).
Request(ctx context.Context, env Envelope) ([]byte, error)
// Close releases transport resources.
Close() error
}
ClientTransport is the interface for sending messages to the cluster.
type ClusterMetrics ¶
type ClusterMetrics interface {
// Client operations
RequestDuration(msgType string) metrics.Timer
RequestCompleted(msgType string, success bool)
NotifyCompleted(msgType string, success bool)
// Transport errors: no_subscriber, timeout, ttl_expired
TransportError(errorType string)
// Handler operations
HandlerDuration(msgType string) metrics.Timer
HandlerCompleted(msgType string, success bool)
HandlersActive(nodeID string, count int)
// Shards
ShardsOwned(nodeID string, count int)
}
ClusterMetrics defines the metrics interface for the Cluster pillar. All methods are thread-safe.
func NopClusterMetrics ¶
func NopClusterMetrics() ClusterMetrics
NopClusterMetrics returns a no-op ClusterMetrics implementation.
type Envelope ¶
type Envelope struct {
// Shard is the target shard number for routing.
Shard int `json:"shard"`
// Type is the message type name for handler dispatch.
Type string `json:"type"`
// Data is the JSON-encoded message payload.
Data []byte `json:"data"`
// ReplyTo is the reply address (set by transport for request-response).
ReplyTo string `json:"reply_to,omitempty"`
// Headers contains custom metadata.
Headers map[string]string `json:"headers,omitempty"`
// TTLMs is the time-to-live in milliseconds. Zero means no expiration.
TTLMs int64 `json:"ttl_ms,omitempty"`
// CreatedAtMs is the creation timestamp in milliseconds since epoch.
CreatedAtMs int64 `json:"created_at_ms,omitempty"`
}
Envelope is the message container for cluster communication. It wraps the payload with routing and metadata information.
func (Envelope) GetHeader ¶
GetHeader retrieves a header value by key. Returns empty string and false if the header is not set.
type EnvelopeOption ¶
type EnvelopeOption func(*Envelope)
EnvelopeOption configures envelope properties before sending.
func WithHeader ¶
func WithHeader(key, value string) EnvelopeOption
WithHeader adds a custom header to the envelope. Headers with the "x-clstr-" prefix are reserved and will cause validation to fail.
func WithTTL ¶
func WithTTL(ttl time.Duration) EnvelopeOption
WithTTL sets the time-to-live for the envelope.
type GetNodeInfoRequest ¶
type GetNodeInfoRequest struct{}
GetNodeInfoRequest is a request for node metadata. Send via ScopedClient.GetNodeInfo.
type GetNodeInfoResponse ¶
type GetNodeInfoResponse struct {
// NodeID is the unique identifier of the responding node.
NodeID string `json:"node_id"`
// Shards is the list of shard IDs owned by this node.
Shards []uint32 `json:"shards"`
}
GetNodeInfoResponse contains node metadata returned by GetNodeInfoRequest.
type MemoryTransport ¶
type MemoryTransport struct {
// contains filtered or unexported fields
}
func NewInMemoryTransport ¶
func NewInMemoryTransport(opts ...MemoryTransportOpts) *MemoryTransport
func (*MemoryTransport) Close ¶
func (t *MemoryTransport) Close() error
func (*MemoryTransport) SubscribeShard ¶
func (*MemoryTransport) WithLog ¶
func (t *MemoryTransport) WithLog(log *slog.Logger) *MemoryTransport
type MemoryTransportOpts ¶
type MemoryTransportOpts struct {
// MaxConcurrentHandlers limits concurrent handler goroutines.
// 0 = default (100), -1 = unlimited
MaxConcurrentHandlers int
// HandlerTimeout is the maximum duration for a handler to complete.
// 0 = default (30s)
HandlerTimeout time.Duration
}
MemoryTransportOpts configures the in-memory transport.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node represents a cluster member that owns and processes shards. Create via NewNode and start with Node.Run.
func NewNode ¶
func NewNode(opts NodeOptions) *Node
NewNode creates a new cluster node with the given options.
type NodeOptions ¶
type NodeOptions struct {
// Log is the logger for node operations. Defaults to slog.Default().
Log *slog.Logger
// NodeID is the unique identifier for this node. Auto-generated if empty.
NodeID string
// Transport is the server transport for receiving messages (required).
Transport ServerTransport
// Shards is the list of shard IDs this node owns.
// Use [ShardsForNode] to compute this based on cluster membership.
Shards []uint32
// Handler processes incoming messages for owned shards.
Handler ServerHandlerFunc
// Metrics for node instrumentation. If nil, a no-op implementation is used.
Metrics ClusterMetrics
}
NodeOptions configures node creation.
type Request ¶
Request is a typed request builder for making type-safe cluster requests.
func NewRequest ¶
NewRequest creates a typed request builder for the given requester. Use this for type-safe request/response patterns.
type Requester ¶
type Requester interface {
// contains filtered or unexported methods
}
Requester is implemented by types that can send raw requests.
type ScopedClient ¶
type ScopedClient struct {
// contains filtered or unexported fields
}
ScopedClient is a client bound to a specific shard, enabling fluent request/notify operations without repeatedly specifying the shard or key.
func (*ScopedClient) GetNodeInfo ¶
func (c *ScopedClient) GetNodeInfo(ctx context.Context) (res *GetNodeInfoResponse, err error)
GetNodeInfo queries the node that owns this shard for its metadata.
func (*ScopedClient) Notify ¶
func (c *ScopedClient) Notify(ctx context.Context, msg any, opts ...EnvelopeOption) error
Notify sends a fire-and-forget message to the scoped shard.
func (*ScopedClient) Request ¶
func (c *ScopedClient) Request(ctx context.Context, payload any, opts ...EnvelopeOption) ([]byte, error)
Request sends a request to the scoped shard and waits for the response. The payload is JSON-encoded and the message type is derived from the payload type.
type ScopedHandlerOpts ¶
type ServerHandlerFunc ¶
ServerHandlerFunc is the callback for processing incoming shard messages. It receives the envelope and returns a response or error.
func NewActorHandler ¶
func NewActorHandler(actFactory ActorFactory) ServerHandlerFunc
func NewKeyHandler ¶
func NewKeyHandler(createFunc func(key string) (ServerHandlerFunc, error)) ServerHandlerFunc
func NewKeyHandlerWithOpts ¶
func NewKeyHandlerWithOpts(createFunc func(key string) (ServerHandlerFunc, error), cacheSize int) ServerHandlerFunc
func NewScopedHandler ¶
func NewScopedHandler(opts ScopedHandlerOpts) ServerHandlerFunc
type ServerTransport ¶
type ServerTransport interface {
// SubscribeShard registers a handler for messages targeting the given shard.
// Returns a subscription that can be used to unsubscribe.
SubscribeShard(ctx context.Context, shardID uint32, h ServerHandlerFunc) (Subscription, error)
// Close releases transport resources and stops all subscriptions.
Close() error
}
ServerTransport is the interface for receiving messages from the cluster.
type Subscription ¶
type Subscription interface {
// Unsubscribe stops receiving messages for the subscribed shard.
Unsubscribe() error
}
Subscription represents an active shard subscription that can be unsubscribed.
type Transport ¶
type Transport interface {
ClientTransport
ServerTransport
}
Transport combines client and server transport capabilities. Implementations typically wrap a messaging system like NATS.