cluster

package v0.30.2

Published: Feb 4, 2026 License: MIT Imports: 19 Imported by: 0

README
Cluster Package

A distributed coordination layer with consistent hashing, smart routing, and pluggable transports for building horizontally scalable systems.

Overview

The cluster package routes requests to the correct node using rendezvous hashing (HRW). Each node owns a subset of shards, and clients automatically route requests by key or shard ID. The transport abstraction allows swapping between in-memory (testing) and NATS (production).

flowchart LR
    subgraph Cluster
        Client["Client"] -->|"key hash"| Shard["Shard N"]
        Shard --> Node1["Node 1"]
        Shard --> Node2["Node 2"]
        Shard --> Node3["Node 3"]
    end
    Node1 --> Handler["Handler"]
    Node2 --> Handler
    Node3 --> Handler

Import

import "github.com/codewandler/clstr-go/core/cluster"

Quick Start

ctx := context.Background()

// Create transport
tr := cluster.NewInMemoryTransport()

// Create a node that owns some shards
node := cluster.NewNode(cluster.NodeOptions{
    NodeID:    "node-1",
    Transport: tr,
    Shards:    []uint32{0, 1, 2, 3},
    Handler: func(ctx context.Context, env cluster.Envelope) ([]byte, error) {
        return []byte("hello"), nil
    },
})

go node.Run(ctx)

// Create a client
client, _ := cluster.NewClient(cluster.ClientOptions{
    Transport: tr,
    NumShards: 16,
})

// Route by key (automatically maps to correct shard/node)
resp, _ := client.RequestKey(ctx, "user-123", "GetUser", payload)

Core Concepts

Envelope

The fundamental message wrapper for all cluster communication:

type Envelope struct {
    Shard       int               // Target shard (0..numShards-1)
    Type        string            // Message type identifier
    Data        []byte            // Serialized payload
    Headers     map[string]string // Custom headers
    TTLMs       int64             // Time-to-live in milliseconds
    CreatedAtMs int64             // Creation timestamp
}

Methods:

  • GetHeader(key string) (string, bool) — Retrieve header value
  • Expired() bool — Check if TTL exceeded
  • TTL() time.Duration — Get remaining TTL
  • Validate() error — Validate headers (rejects reserved x-clstr-* prefix)

Envelope Options:

cluster.WithHeader("trace-id", "abc123")  // Add custom header
cluster.WithTTL(5 * time.Second)          // Set request timeout

Transport

Three-tier interface for flexible implementations:

// Client-side: send requests
type ClientTransport interface {
    Request(ctx context.Context, env Envelope) ([]byte, error)
    Close() error
}

// Server-side: receive requests
type ServerTransport interface {
    SubscribeShard(ctx context.Context, shardID uint32, h ServerHandlerFunc) (Subscription, error)
    Close() error
}

// Bidirectional
type Transport interface {
    ClientTransport
    ServerTransport
}

type ServerHandlerFunc = func(ctx context.Context, env Envelope) ([]byte, error)

Node

A cluster member that owns shards and processes messages:

type NodeOptions struct {
    Log       *slog.Logger      // Optional (defaults to slog.Default())
    NodeID    string            // Optional (auto-generated if empty)
    Transport ServerTransport   // Required
    Shards    []uint32          // Required: owned shard IDs
    Handler   ServerHandlerFunc // Required
    Metrics   ClusterMetrics    // Optional: for instrumentation
}

node := cluster.NewNode(opts)
node.Run(ctx)  // Blocks until context cancelled

Auto-generated Node IDs: Format node-{nanoid} (e.g., node-Ab12Cd)

Client

Smart router that maps keys to shards:

type ClientOptions struct {
    Transport       ClientTransport  // Required
    NumShards       uint32           // Required
    Seed            string           // Optional: for deterministic hashing
    EnvelopeOptions []EnvelopeOption // Optional: apply to all requests
    Metrics         ClusterMetrics   // Optional: for instrumentation
}

client, _ := cluster.NewClient(opts)

Routing Methods:

// Direct shard routing
client.RequestShard(ctx, shardID, "MsgType", data)
client.NotifyShard(ctx, shardID, "MsgType", data)  // Fire-and-forget

// Key-based routing (auto shard selection)
client.RequestKey(ctx, "user-123", "GetUser", data)
client.NotifyKey(ctx, "user-123", "UpdateUser", data)

// Fluent API with scoped client
resp, _ := client.Key("user-123").Request(ctx, &GetUser{ID: "123"})
resp, _ := client.Shard(5).Request(ctx, &GetUser{ID: "123"})

ScopedClient

Pre-scoped client for a specific key or shard:

scoped := client.Key("tenant-123")

// Strongly-typed requests
resp, err := scoped.Request(ctx, &MyRequest{})

// Fire-and-forget
err := scoped.Notify(ctx, &MyNotification{})

// Query node info
info, err := scoped.GetNodeInfo(ctx)

Message Type Detection:

  1. messageType() string method (lowercase)
  2. MessageType() string method
  3. Reflection-based struct name (fallback)
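The detection order above can be sketched with stdlib reflection. The unexported messageType() check can't be reproduced from outside the package, so this illustrative version covers only the exported method and the struct-name fallback:

```go
package main

import (
	"fmt"
	"reflect"
)

// messageTyper models the exported MessageType() hook from the detection order.
type messageTyper interface {
	MessageType() string
}

// messageTypeOf: an explicit MessageType() method wins; otherwise fall
// back to the struct's name via reflection (pointers are dereferenced).
func messageTypeOf(msg any) string {
	if mt, ok := msg.(messageTyper); ok {
		return mt.MessageType()
	}
	t := reflect.TypeOf(msg)
	for t.Kind() == reflect.Pointer {
		t = t.Elem()
	}
	return t.Name()
}

type GetUser struct{ ID string }

type Ping struct{}

func (Ping) MessageType() string { return "ping.v1" }

func main() {
	fmt.Println(messageTypeOf(GetUser{ID: "123"})) // GetUser (reflection fallback)
	fmt.Println(messageTypeOf(&Ping{}))            // ping.v1 (explicit method)
}
```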

Typed Request Helper

Strongly-typed request/response pattern:

type GetUser struct {
    ID string `json:"id"`
}

type UserResponse struct {
    ID   string `json:"id"`
    Name string `json:"name"`
}

req := cluster.NewRequest[GetUser, UserResponse](client.Key("user-123"))
user, err := req.Request(ctx, GetUser{ID: "123"})

Sharding

Consistent Hashing

Keys map to shards deterministically using BLAKE2b hashing:

// Map key to shard
shard := cluster.ShardFromString("user-123", numShards, seed)

Shard Distribution (Rendezvous/HRW)

Nodes claim shards using Highest Random Weight algorithm:

// Calculate which shards a node owns
nodeIDs := []string{"node-1", "node-2", "node-3"}
shards := cluster.ShardsForNode("node-1", nodeIDs, 256, "seed")

Properties:

  • Deterministic: all nodes agree on ownership
  • Minimal reshuffling when topology changes
  • No central coordinator needed
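The mechanism behind these properties is easy to see in a dependency-free sketch. The real package hashes with BLAKE2b; FNV-1a stands in here, so shard assignments will differ from the package's, but the structure (highest score wins, every node computes the same answer) is the same:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hrwScore ranks a (node, shard, seed) triple. Illustrative hash choice.
func hrwScore(nodeID string, shard uint32, seed string) uint64 {
	h := fnv.New64a()
	fmt.Fprintf(h, "%s|%d|%s", nodeID, shard, seed)
	return h.Sum64()
}

// shardsForNode: a shard belongs to whichever node scores highest for it.
// Every node runs the same computation over the same node list, so all
// nodes agree on ownership without a coordinator.
func shardsForNode(nodeID string, nodeIDs []string, numShards uint32, seed string) []uint32 {
	var owned []uint32
	for shard := uint32(0); shard < numShards; shard++ {
		best, bestScore := "", uint64(0)
		for _, id := range nodeIDs {
			if s := hrwScore(id, shard, seed); best == "" || s > bestScore {
				best, bestScore = id, s
			}
		}
		if best == nodeID {
			owned = append(owned, shard)
		}
	}
	return owned
}

func main() {
	nodes := []string{"node-1", "node-2", "node-3"}
	total := 0
	for _, n := range nodes {
		owned := shardsForNode(n, nodes, 64, "seed")
		total += len(owned)
		fmt.Printf("%s owns %d shards\n", n, len(owned))
	}
	fmt.Println("covered:", total) // every shard has exactly one owner: 64
}
```

Removing a node from the list only reassigns the shards that node won; every other shard keeps its highest-scoring owner, which is where the "minimal reshuffling" property comes from.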

Handler Patterns

Per-Key Handlers

Cache handlers by key with LRU eviction:

handler := cluster.NewKeyHandler(func(key string) (cluster.ServerHandlerFunc, error) {
    // Create handler for this key (e.g., load actor, aggregate)
    return func(ctx context.Context, env cluster.Envelope) ([]byte, error) {
        return []byte("tenant: " + key), nil
    }, nil
})

With custom cache size:

handler := cluster.NewKeyHandlerWithOpts(createFunc, 2048)  // LRU size

Default cache size: 1024 entries
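The caching behavior NewKeyHandler describes (create once per key, evict least-recently-used past the cache size) can be sketched with container/list. This is an illustrative stand-alone version, not the package's implementation, and it uses a plain func() string in place of ServerHandlerFunc:

```go
package main

import (
	"container/list"
	"fmt"
)

// handlerCache: per-key handlers with LRU eviction.
type handlerCache struct {
	max   int
	order *list.List               // front = most recently used
	items map[string]*list.Element // key -> element holding an *entry
}

type entry struct {
	key     string
	handler func() string // stand-in for ServerHandlerFunc
}

func newHandlerCache(max int) *handlerCache {
	return &handlerCache{max: max, order: list.New(), items: map[string]*list.Element{}}
}

// get returns the cached handler for key, creating it on first use and
// evicting the least-recently-used entry when the cache is over capacity.
func (c *handlerCache) get(key string, create func(string) func() string) func() string {
	if el, ok := c.items[key]; ok {
		c.order.MoveToFront(el)
		return el.Value.(*entry).handler
	}
	h := create(key)
	c.items[key] = c.order.PushFront(&entry{key: key, handler: h})
	if c.order.Len() > c.max {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
	return h
}

func main() {
	creations := 0
	cache := newHandlerCache(2)
	create := func(key string) func() string {
		creations++
		return func() string { return "handler for " + key }
	}
	cache.get("a", create)
	cache.get("b", create)
	cache.get("a", create) // cache hit, no new creation
	cache.get("c", create) // evicts "b" (least recently used)
	cache.get("b", create) // re-created after eviction
	fmt.Println(creations) // 4
}
```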

Actor Integration

Bridge cluster messages to actor model:

handler := cluster.NewActorHandler(func(key string) (actor.Actor, error) {
    return createActorForKey(key), nil
})

node := cluster.NewNode(cluster.NodeOptions{
    Transport: tr,
    Shards:    shards,
    Handler:   handler,
})

In-Memory Transport

Full-featured implementation for testing:

tr := cluster.NewInMemoryTransport(cluster.MemoryTransportOpts{
    MaxConcurrentHandlers: 100,        // Default: 100
    HandlerTimeout:        30 * time.Second, // Default: 30s
})

// Add logging
tr = tr.WithLog(logger)

Features:

  • Bounded concurrency via semaphore
  • TTL enforcement with auto-expiration
  • Handler timeout enforcement
  • Graceful shutdown

Testing Utilities

// Create transport with cleanup
tr := cluster.CreateInMemoryTransport(t)

// Spin up multi-node cluster
cluster.CreateTestCluster(t, tr, 3, 256, "seed", handler)
// Creates nodes: node-0, node-1, node-2
// Distributes 256 shards across them

Error Types

cluster.ErrTransportClosed            // Transport was closed
cluster.ErrTransportNoShardSubscriber // No handler for shard
cluster.ErrEnvelopeExpired            // TTL exceeded
cluster.ErrReservedHeader             // Used x-clstr-* header
cluster.ErrHandlerTimeout             // Handler exceeded deadline
cluster.ErrKeyRequired                // Key missing for key-based routing
cluster.ErrMissingKeyHeader           // x-clstr-key header not set

Built-in API

Query node information:

scoped := client.Key("any-key")
info, err := scoped.GetNodeInfo(ctx)
// info.NodeID: "node-1"
// info.Shards: [0, 5, 10, ...]

Complete Example

package main

import (
    "context"
    "encoding/json"
    "log/slog"

    "github.com/codewandler/clstr-go/core/cluster"
)

type GetUser struct {
    ID string `json:"id"`
}

func (GetUser) messageType() string { return "GetUser" }

type UserResponse struct {
    ID   string `json:"id"`
    Name string `json:"name"`
}

func main() {
    ctx := context.Background()

    // Create transport
    tr := cluster.NewInMemoryTransport()

    // Define handler
    handler := cluster.NewKeyHandler(func(key string) (cluster.ServerHandlerFunc, error) {
        return func(ctx context.Context, env cluster.Envelope) ([]byte, error) {
            var req GetUser
            if err := json.Unmarshal(env.Data, &req); err != nil {
                return nil, err
            }

            resp := UserResponse{ID: req.ID, Name: "User " + req.ID}
            return json.Marshal(resp)
        }, nil
    })

    // Calculate shard distribution
    nodeIDs := []string{"node-1", "node-2"}

    // Start nodes
    for _, nodeID := range nodeIDs {
        shards := cluster.ShardsForNode(nodeID, nodeIDs, 64, "")
        node := cluster.NewNode(cluster.NodeOptions{
            NodeID:    nodeID,
            Transport: tr,
            Shards:    shards,
            Handler:   handler,
        })
        go node.Run(ctx)
    }

    // Create client
    client, _ := cluster.NewClient(cluster.ClientOptions{
        Transport: tr,
        NumShards: 64,
    })

    // Make requests
    req := cluster.NewRequest[GetUser, UserResponse](client.Key("user-123"))
    user, err := req.Request(ctx, GetUser{ID: "123"})
    if err != nil {
        panic(err)
    }
    slog.Info("got user", slog.String("name", user.Name))
}

Metrics

The cluster package supports pluggable metrics via the ClusterMetrics interface:

import promadapter "github.com/codewandler/clstr-go/adapters/prometheus"

// Create Prometheus metrics
metrics := promadapter.NewClusterMetrics(prometheus.DefaultRegisterer)

// Use with client
client, _ := cluster.NewClient(cluster.ClientOptions{
    Transport: tr,
    NumShards: 256,
    Metrics:   metrics,
})

// Use with node
node := cluster.NewNode(cluster.NodeOptions{
    Transport: tr,
    Shards:    shards,
    Handler:   handler,
    Metrics:   metrics,
})

Available Metrics:

| Metric | Type | Labels | Description |
|---|---|---|---|
| clstr_cluster_request_duration_seconds | Histogram | message_type | Client request latency |
| clstr_cluster_requests_total | Counter | message_type, success | Client requests |
| clstr_cluster_notifies_total | Counter | message_type, success | Client notifications |
| clstr_cluster_transport_errors_total | Counter | error_type | Transport errors (no_subscriber, timeout, ttl_expired, closed) |
| clstr_cluster_handler_duration_seconds | Histogram | message_type | Handler execution time |
| clstr_cluster_handlers_total | Counter | message_type, success | Handlers executed |
| clstr_cluster_handlers_active | Gauge | node_id | Concurrent handlers |
| clstr_cluster_shards_owned | Gauge | node_id | Shards owned by node |

If no metrics are provided, a no-op implementation is used (zero overhead).

Key Types Reference

| Type | Description |
|---|---|
| Envelope | Message wrapper with shard, type, data, headers, TTL |
| Transport | Bidirectional transport interface |
| ClientTransport | Client-side transport (send only) |
| ServerTransport | Server-side transport (receive only) |
| Node | Cluster member owning shards |
| Client | Smart router with consistent hashing |
| ScopedClient | Pre-scoped client for key or shard |
| Request[IN, OUT] | Typed request/response helper |
| ServerHandlerFunc | Handler callback signature |
| ClusterMetrics | Metrics interface for instrumentation |

Documentation

Overview

Package cluster provides distributed shard-based message routing with consistent hashing for building scalable, partitioned systems.

The cluster package enables horizontal scaling by partitioning work across multiple nodes. Each node owns a subset of shards, and messages are routed to the appropriate node based on a key-to-shard mapping.

Architecture

The cluster consists of three main components:

  • Client: Routes messages to shards via key or shard ID
  • Node: Subscribes to shards and handles incoming messages
  • Transport: Abstracts the underlying messaging infrastructure

Shard Distribution

Shards are distributed across nodes using Highest Random Weight (HRW) consistent hashing via ShardsForNode. This ensures:

  • Even distribution of shards across nodes
  • Minimal shard movement when nodes join or leave
  • Deterministic assignment given the same node list

Keys are mapped to shards using ShardFromString, which provides consistent, uniform distribution using BLAKE2b hashing.

Client Usage

Create a client to send messages to the cluster:

client, err := cluster.NewClient(cluster.ClientOptions{
    Transport: natsTransport,
    NumShards: 64,
})

// Route by key (e.g., user ID, tenant ID)
resp, err := client.Key("user:123").Request(ctx, GetUserQuery{ID: "123"})

// Route directly to shard
resp, err := client.Shard(5).Request(ctx, payload)

Node Usage

Create a node to handle messages for owned shards:

node := cluster.NewNode(cluster.NodeOptions{
    NodeID:    "node-1",
    Transport: natsTransport,
    Shards:    cluster.ShardsForNode("node-1", allNodeIDs, 64, ""),
    Handler: func(ctx context.Context, env cluster.Envelope) ([]byte, error) {
        // Handle the message based on env.Type
        return response, nil
    },
})
node.Run(ctx)

Transport Layer

The transport layer is abstracted via Transport, ClientTransport, and ServerTransport interfaces. The adapters/nats package provides a NATS JetStream implementation.

Envelope

Messages are wrapped in Envelope which carries:

  • Shard: Target shard number
  • Type: Message type for routing to handlers
  • Data: JSON-encoded payload
  • Headers: Optional metadata (use WithHeader)
  • TTL: Optional time-to-live (use WithTTL)

Error Handling

Common errors include ErrTransportClosed, ErrTransportNoShardSubscriber, ErrEnvelopeExpired, ErrReservedHeader, ErrHandlerTimeout, ErrKeyRequired, and ErrMissingKeyHeader. See the Variables section for details.


Constants

const (
	DefaultMaxConcurrentHandlers = 100
	DefaultHandlerTimeout        = 30 * time.Second
)

const DefaultHandlerCacheSize = 1024

const (
	MsgClusterNodeInfo = "clstr.node.info"
)

MsgClusterNodeInfo is the message type for querying node metadata.

Variables

var (
	// Transport errors
	ErrTransportClosed            = errors.New("transport closed")
	ErrTransportNoShardSubscriber = errors.New("no subscriber for shard")

	// Envelope errors
	ErrEnvelopeExpired = errors.New("envelope TTL expired")
	ErrReservedHeader  = errors.New("cannot set reserved header")

	// Handler errors
	ErrHandlerTimeout   = errors.New("handler exceeded deadline")
	ErrKeyRequired      = errors.New("key is required")
	ErrMissingKeyHeader = errors.New("missing x-clstr-key header")
)

Functions

func CreateTestCluster

func CreateTestCluster(
	t *testing.T,
	tr ServerTransport,
	numNodes int,
	numShards uint32,
	shardSeed string,
	h ServerHandlerFunc,
)

func ShardFromString

func ShardFromString(key string, numShards uint32, seed string) uint32

ShardFromString computes a shard number (0 to numShards-1) from a string key. Uses BLAKE2b hashing for uniform distribution. The optional seed ensures consistent hashing across different clients.
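The contract can be sketched in a few lines: hash the seed and key, then reduce modulo numShards. The real package uses BLAKE2b; FNV-1a stands in here so the sketch has no dependencies, which means its shard values will not match the package's, only its behavior:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFromString: a deterministic map from (key, seed) to [0, numShards).
func shardFromString(key string, numShards uint32, seed string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(seed)) // seed first, so different seeds shift the whole mapping
	h.Write([]byte(key))
	return h.Sum32() % numShards
}

func main() {
	// Same inputs always yield the same shard, on any client.
	fmt.Println(shardFromString("user-123", 16, "") == shardFromString("user-123", 16, "")) // true
	// The result is always a valid shard index.
	fmt.Println(shardFromString("user-123", 16, "") < 16) // true
}
```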

func ShardsForNode

func ShardsForNode(nodeID string, nodeIDs []string, numShards uint32, seed string) []uint32

ShardsForNode computes which shards a node should own using Highest Random Weight (rendezvous) hashing. This provides:

  • Even distribution of shards across nodes
  • Minimal shard movement when nodes join or leave
  • Deterministic assignment given the same inputs

Parameters:

  • nodeID: The ID of the node to compute shards for
  • nodeIDs: All node IDs in the cluster (must include nodeID)
  • numShards: Total number of shards in the cluster
  • seed: Optional seed for consistent hashing

Types

type ActorFactory

type ActorFactory func(key string) (actor.Actor, error)

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client routes messages to cluster nodes based on shard assignment. Create via NewClient.

func NewClient

func NewClient(opts ClientOptions) (*Client, error)

NewClient creates a new cluster client with the given options. Returns an error if required options (Transport, NumShards) are missing.

func (*Client) Key

func (c *Client) Key(key string, opts ...EnvelopeOption) *ScopedClient

Key returns a client scoped to the shard determined by the given key. The key is hashed consistently to determine the target shard. Use this for entity-based routing (e.g., by user ID, tenant ID).

func (*Client) NotifyKey

func (c *Client) NotifyKey(ctx context.Context, key string, msgType string, data []byte) error

NotifyKey publishes using a string key -> shard mapping (e.g. tenantID, userID, deviceID).

func (*Client) NotifyShard

func (c *Client) NotifyShard(ctx context.Context, shard uint32, msgType string, data []byte, opts ...EnvelopeOption) error

NotifyShard publishes directly to a shard (caller already knows shard).

func (*Client) RequestKey

func (c *Client) RequestKey(ctx context.Context, key string, msgType string, data []byte, opts ...EnvelopeOption) ([]byte, error)

RequestKey requests using a string key -> shard mapping.

func (*Client) RequestShard

func (c *Client) RequestShard(ctx context.Context, shard uint32, msgType string, data []byte, opts ...EnvelopeOption) ([]byte, error)

RequestShard sends a request directly to a shard.

func (*Client) Shard

func (c *Client) Shard(shard uint32) *ScopedClient

Shard returns a client scoped to a specific shard number. Use this when you already know the target shard.

type ClientOptions

type ClientOptions struct {
	// Transport is the underlying messaging transport (required).
	Transport ClientTransport
	// NumShards is the total number of shards in the cluster (required).
	NumShards uint32
	// Seed is an optional string for consistent shard hashing across clients.
	Seed string
	// EnvelopeOptions are default options applied to all outgoing envelopes.
	EnvelopeOptions []EnvelopeOption
	// Metrics for client instrumentation. If nil, a no-op implementation is used.
	Metrics ClusterMetrics
}

ClientOptions configures client creation.

type ClientTransport

type ClientTransport interface {
	// Request sends a message and waits for a reply.
	// Returns the response data or an error (e.g., [ErrTransportNoShardSubscriber]).
	Request(ctx context.Context, env Envelope) ([]byte, error)

	// Close releases transport resources.
	Close() error
}

ClientTransport is the interface for sending messages to the cluster.

type ClusterMetrics

type ClusterMetrics interface {
	// Client operations
	RequestDuration(msgType string) metrics.Timer
	RequestCompleted(msgType string, success bool)
	NotifyCompleted(msgType string, success bool)

	// Transport errors: no_subscriber, timeout, ttl_expired
	TransportError(errorType string)

	// Handler operations
	HandlerDuration(msgType string) metrics.Timer
	HandlerCompleted(msgType string, success bool)
	HandlersActive(nodeID string, count int)

	// Shards
	ShardsOwned(nodeID string, count int)
}

ClusterMetrics defines the metrics interface for the Cluster pillar. All methods are thread-safe.

func NopClusterMetrics

func NopClusterMetrics() ClusterMetrics

NopClusterMetrics returns a no-op ClusterMetrics implementation.

type Envelope

type Envelope struct {
	// Shard is the target shard number for routing.
	Shard int `json:"shard"`
	// Type is the message type name for handler dispatch.
	Type string `json:"type"`
	// Data is the JSON-encoded message payload.
	Data []byte `json:"data"`
	// ReplyTo is the reply address (set by transport for request-response).
	ReplyTo string `json:"reply_to,omitempty"`
	// Headers contains custom metadata.
	Headers map[string]string `json:"headers,omitempty"`
	// TTLMs is the time-to-live in milliseconds. Zero means no expiration.
	TTLMs int64 `json:"ttl_ms,omitempty"`
	// CreatedAtMs is the creation timestamp in milliseconds since epoch.
	CreatedAtMs int64 `json:"created_at_ms,omitempty"`
}

Envelope is the message container for cluster communication. It wraps the payload with routing and metadata information.

func (Envelope) Expired

func (e Envelope) Expired() bool

Expired returns true if the envelope has a TTL set and has expired.

func (Envelope) GetHeader

func (e Envelope) GetHeader(key string) (string, bool)

GetHeader retrieves a header value by key. Returns empty string and false if the header is not set.

func (Envelope) TTL

func (e Envelope) TTL() time.Duration

TTL returns the remaining TTL duration. Returns 0 if no TTL is set or already expired.

func (Envelope) Validate

func (e Envelope) Validate() error

Validate checks the envelope for configuration errors. It returns ErrReservedHeader if any header uses the reserved "x-clstr-" prefix.

type EnvelopeOption

type EnvelopeOption func(*Envelope)

EnvelopeOption configures envelope properties before sending.

func WithHeader

func WithHeader(key, value string) EnvelopeOption

WithHeader adds a custom header to the envelope. Headers with the "x-clstr-" prefix are reserved and will cause validation to fail.

func WithTTL

func WithTTL(ttl time.Duration) EnvelopeOption

WithTTL sets the time-to-live for the envelope.

type GetNodeInfoRequest

type GetNodeInfoRequest struct{}

GetNodeInfoRequest is a request for node metadata. Send via ScopedClient.GetNodeInfo.

type GetNodeInfoResponse

type GetNodeInfoResponse struct {
	// NodeID is the unique identifier of the responding node.
	NodeID string `json:"node_id"`
	// Shards is the list of shard IDs owned by this node.
	Shards []uint32 `json:"shards"`
}

GetNodeInfoResponse contains node metadata returned by GetNodeInfoRequest.

type MemoryTransport

type MemoryTransport struct {
	// contains filtered or unexported fields
}

func NewInMemoryTransport

func NewInMemoryTransport(opts ...MemoryTransportOpts) *MemoryTransport

func (*MemoryTransport) Close

func (t *MemoryTransport) Close() error

func (*MemoryTransport) Request

func (t *MemoryTransport) Request(ctx context.Context, env Envelope) ([]byte, error)

func (*MemoryTransport) SubscribeShard

func (t *MemoryTransport) SubscribeShard(
	ctx context.Context,
	shardID uint32,
	h func(context.Context, Envelope) ([]byte, error),
) (Subscription, error)

func (*MemoryTransport) WithLog

func (t *MemoryTransport) WithLog(log *slog.Logger) *MemoryTransport

type MemoryTransportOpts

type MemoryTransportOpts struct {
	// MaxConcurrentHandlers limits concurrent handler goroutines.
	// 0 = default (100), -1 = unlimited
	MaxConcurrentHandlers int

	// HandlerTimeout is the maximum duration for a handler to complete.
	// 0 = default (30s)
	HandlerTimeout time.Duration
}

MemoryTransportOpts configures the in-memory transport.

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node represents a cluster member that owns and processes shards. Create via NewNode and start with Node.Run.

func NewNode

func NewNode(opts NodeOptions) *Node

NewNode creates a new cluster node with the given options.

func (*Node) Run

func (n *Node) Run(ctx context.Context) error

Run starts the node, subscribing to all owned shards and handling incoming messages. Returns an error if subscription fails for any shard.

type NodeOptions

type NodeOptions struct {
	// Log is the logger for node operations. Defaults to slog.Default().
	Log *slog.Logger
	// NodeID is the unique identifier for this node. Auto-generated if empty.
	NodeID string
	// Transport is the server transport for receiving messages (required).
	Transport ServerTransport
	// Shards is the list of shard IDs this node owns.
	// Use [ShardsForNode] to compute this based on cluster membership.
	Shards []uint32
	// Handler processes incoming messages for owned shards.
	Handler ServerHandlerFunc
	// Metrics for node instrumentation. If nil, a no-op implementation is used.
	Metrics ClusterMetrics
}

NodeOptions configures node creation.

type Request

type Request[IN any, OUT any] struct {
	// contains filtered or unexported fields
}

Request is a typed request builder for making type-safe cluster requests.

func NewRequest

func NewRequest[IN any, OUT any](requester Requester) *Request[IN, OUT]

NewRequest creates a typed request builder for the given requester. Use this for type-safe request/response patterns.

func (*Request[IN, OUT]) Request

func (r *Request[IN, OUT]) Request(ctx context.Context, payload IN) (out *OUT, err error)

Request sends the payload and deserializes the response into *OUT.

type Requester

type Requester interface {
	// contains filtered or unexported methods
}

Requester is implemented by types that can send raw requests.

type ScopedClient

type ScopedClient struct {
	// contains filtered or unexported fields
}

ScopedClient is a client bound to a specific shard, enabling fluent request/notify operations without repeatedly specifying the shard or key.

func (*ScopedClient) GetNodeInfo

func (c *ScopedClient) GetNodeInfo(ctx context.Context) (res *GetNodeInfoResponse, err error)

GetNodeInfo queries the node that owns this shard for its metadata.

func (*ScopedClient) Notify

func (c *ScopedClient) Notify(ctx context.Context, msg any, opts ...EnvelopeOption) error

Notify sends a fire-and-forget message to the scoped shard.

func (*ScopedClient) Request

func (c *ScopedClient) Request(ctx context.Context, payload any, opts ...EnvelopeOption) ([]byte, error)

Request sends a request to the scoped shard and waits for the response. The payload is JSON-encoded and the message type is derived from the payload type.

type ScopedHandlerOpts

type ScopedHandlerOpts struct {
	Extract func(env Envelope) (key string, err error)
	Create  func(key string) (ServerHandlerFunc, error)
	// CacheSize limits the number of cached handlers.
	// 0 = default (1024), -1 = unlimited (original behavior)
	CacheSize int
}

type ServerHandlerFunc

type ServerHandlerFunc = func(ctx context.Context, env Envelope) ([]byte, error)

ServerHandlerFunc is the callback for processing incoming shard messages. It receives the envelope and returns a response or error.

func NewActorHandler

func NewActorHandler(actFactory ActorFactory) ServerHandlerFunc

func NewKeyHandler

func NewKeyHandler(createFunc func(key string) (ServerHandlerFunc, error)) ServerHandlerFunc

func NewKeyHandlerWithOpts

func NewKeyHandlerWithOpts(createFunc func(key string) (ServerHandlerFunc, error), cacheSize int) ServerHandlerFunc

func NewScopedHandler

func NewScopedHandler(opts ScopedHandlerOpts) ServerHandlerFunc

type ServerTransport

type ServerTransport interface {
	// SubscribeShard registers a handler for messages targeting the given shard.
	// Returns a subscription that can be used to unsubscribe.
	SubscribeShard(ctx context.Context, shardID uint32, h ServerHandlerFunc) (Subscription, error)

	// Close releases transport resources and stops all subscriptions.
	Close() error
}

ServerTransport is the interface for receiving messages from the cluster.

type Subscription

type Subscription interface {
	// Unsubscribe stops receiving messages for the subscribed shard.
	Unsubscribe() error
}

Subscription represents an active shard subscription that can be unsubscribed.

type Transport

type Transport interface {
	ClientTransport
	ServerTransport
}

Transport combines client and server transport capabilities. Implementations typically wrap a messaging system like NATS.

func CreateInMemoryTransport

func CreateInMemoryTransport(t *testing.T) Transport
