Documentation
¶
Overview ¶
Package cluster holds the engine-neutral coordination primitives for the sharded live query matcher tier: how data maps to partitions, and how live shards divide those partitions among themselves by rendezvous (HRW) hashing. It depends on nothing engine-specific; Redis-backed membership and the shard runtime live alongside it / in adapters.
Index ¶
- Constants
- func CtrlStream(shardID string) string
- func DeltaChannel(gatewayID string) string
- func EventStream(p int) string
- func HeartbeatKey(shardID string) string
- func Owner(p int, live []string) string
- func Owns(self string, p int, live []string) bool
- func PartitionOf(tenantID, entity string) int
- type Control
- type ControlBus
- type DeltaBus
- type Gateway
- type GatewayDelta
- type GatewayDeps
- type Membership
- type Shard
- type ShardDeps
- type ShardOptions
Constants ¶
const ( OpSubscribe = "subscribe" OpUnsubscribe = "unsubscribe" OpReanchor = "reanchor" )
Control op kinds carried gateway→shard.
const Partitions = 256
Partitions is the fixed number of data partitions. A change and the subscriptions over its (tenant, entity) always land on the same partition, so one shard owns both. Re-partitioning is an operational migration.
Variables ¶
This section is empty.
Functions ¶
func CtrlStream ¶
CtrlStream is the per-shard control request stream (subscribe/reanchor/...).
func DeltaChannel ¶
DeltaChannel is the per-gateway delta stream the shards publish to.
func EventStream ¶
EventStream is the Redis stream a partition's events are published to and the owning shard tails.
func HeartbeatKey ¶
HeartbeatKey is a shard's liveness key.
func Owner ¶
Owner returns the shard that owns partition p among the live set: the shard maximizing the rendezvous weight hash(shard, p). Returns "" if live is empty. Pure given the live set, so every node agrees without a coordinator, and removing a shard reassigns only that shard's partitions.
func PartitionOf ¶
PartitionOf maps a (tenant, entity) to its partition. Deterministic and process-independent (FNV-1a), so every node computes the same routing.
Types ¶
type Control ¶
type Control struct {
Op string
SubID string
GatewayID string
TenantID string
Query livequery.LiveQuery
Cursor *livequery.Cursor // OpReanchor
Limit int // OpReanchor
}
Control is a gateway→shard request, routed to the shard that owns the subscription's partition over that shard's control stream.
type ControlBus ¶
type ControlBus interface {
SendControl(ctx context.Context, shardID string, c Control) error
Control(ctx context.Context, shardID string) (<-chan Control, func(), error)
}
ControlBus routes control messages to the owning shard. Redis backs it in production (per-shard request streams); an in-memory implementation drives the multi-process harness.
type DeltaBus ¶
type DeltaBus interface {
SendDelta(ctx context.Context, gatewayID string, d GatewayDelta) error
Deltas(ctx context.Context, gatewayID string) (<-chan GatewayDelta, func(), error)
}
DeltaBus routes deltas from shards back to gateways (per-gateway channels).
type Gateway ¶
type Gateway struct {
// contains filtered or unexported fields
}
Gateway terminates client subscriptions: it routes control to the owning shard and demultiplexes its delta channel back to per-subscription streams. (This is the thin in-process gateway; a standalone SSE/WS gateway process builds on the same protocol.)
func NewGateway ¶
func NewGateway(id string, deps GatewayDeps) *Gateway
NewGateway builds a gateway.
func (*Gateway) Reanchor ¶
func (g *Gateway) Reanchor(ctx context.Context, subID string, q livequery.LiveQuery, cursor *livequery.Cursor, limit int) error
Reanchor routes a reanchor control to the current owner of the subscription's partition (recomputed, so it survives a failover).
func (*Gateway) Run ¶
Run pumps this gateway's delta channel, demuxing each frame to the right subscription stream, until ctx is cancelled.
func (*Gateway) Subscribe ¶
func (g *Gateway) Subscribe(ctx context.Context, q livequery.LiveQuery) (id string, stream <-chan livequery.LiveDelta, release func(), retErr error)
Subscribe registers a live query: it routes a subscribe control to the shard that owns the query's partition and returns the demuxed delta stream. The initial snapshot arrives on that stream as an OpReset followed by OpEnter rows (the same encoding a failover re-snapshot uses).
type GatewayDelta ¶
GatewayDelta is one delta tagged with its subscription, en route to a gateway.
type GatewayDeps ¶
type GatewayDeps struct {
Members Membership
Control ControlBus
Delta DeltaBus
}
GatewayDeps wires a gateway.
type Membership ¶
type Membership interface {
// Heartbeat refreshes this shard's TTL liveness key; call it on a ticker
// at an interval well under the TTL.
Heartbeat(ctx context.Context, shardID string) error
// LiveShards returns the currently-live shard ids (unexpired heartbeats),
// sorted, so Owner is stable across callers.
LiveShards(ctx context.Context) ([]string, error)
// Leave removes this shard's heartbeat immediately (graceful shutdown).
Leave(ctx context.Context, shardID string) error
}
Membership is the liveness substrate: shards refresh a TTL heartbeat, and any node can read the currently-live set. Redis backs it in production.
type Shard ¶
type Shard struct {
// contains filtered or unexported fields
}
Shard is one matcher node: it heartbeats, owns a slice of partitions by HRW, serves subscribe/reanchor/unsubscribe control for partitions it owns, and on (re)assignment rebuilds the subscriptions it now owns from the durable registry and re-snapshots them (a transparent failover for clients).
type ShardDeps ¶
type ShardDeps struct {
Engine *livequery.Engine
Registry livequery.SubscriptionRegistry
Members Membership
Control ControlBus
Delta DeltaBus
}
ShardDeps wires a shard. Engine is a single-node live query engine whose feed tails this shard's partition streams (lq:events:{p}); the shard reuses it unchanged and adds only ownership + the gateway protocol around it.