cluster

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package cluster holds the engine-neutral coordination primitives for the sharded live query matcher tier: how data maps to partitions, and how live shards divide those partitions among themselves by rendezvous (HRW) hashing. It depends on nothing engine-specific; Redis-backed membership and the shard runtime live alongside it / in adapters.

Index

Constants

View Source
const (
	OpSubscribe   = "subscribe"
	OpUnsubscribe = "unsubscribe"
	OpReanchor    = "reanchor"
)

Control op kinds carried gateway→shard.

View Source
const Partitions = 256

Partitions is the fixed number of data partitions. A change and the subscriptions over its (tenant, entity) always land on the same partition, so one shard owns both. Re-partitioning is an operational migration.

Variables

This section is empty.

Functions

func CtrlStream

func CtrlStream(shardID string) string

CtrlStream is the per-shard control request stream (subscribe/reanchor/...).

func DeltaChannel

func DeltaChannel(gatewayID string) string

DeltaChannel is the per-gateway delta stream the shards publish to.

func EventStream

func EventStream(p int) string

EventStream is the Redis stream a partition's events are published to and the owning shard tails.

func HeartbeatKey

func HeartbeatKey(shardID string) string

HeartbeatKey is a shard's liveness key.

func Owner

func Owner(p int, live []string) string

Owner returns the shard that owns partition p among the live set: the shard maximizing the rendezvous weight hash(shard, p). Returns "" if live is empty. Pure given the live set, so every node agrees without a coordinator, and removing a shard reassigns only that shard's partitions.

func Owns

func Owns(self string, p int, live []string) bool

Owns reports whether self owns partition p among the live set.

func PartitionOf

func PartitionOf(tenantID, entity string) int

PartitionOf maps a (tenant, entity) to its partition. Deterministic and process-independent (FNV-1a), so every node computes the same routing.

Types

type Control

type Control struct {
	Op        string
	SubID     string
	GatewayID string
	TenantID  string
	Query     livequery.LiveQuery
	Cursor    *livequery.Cursor // OpReanchor
	Limit     int               // OpReanchor
}

Control is a gateway→shard request, routed to the shard that owns the subscription's partition over that shard's control stream.

type ControlBus

type ControlBus interface {
	SendControl(ctx context.Context, shardID string, c Control) error
	Control(ctx context.Context, shardID string) (<-chan Control, func(), error)
}

ControlBus routes control messages to the owning shard. Redis backs it in production (per-shard request streams); an in-memory implementation drives the multi-process harness.

type DeltaBus

type DeltaBus interface {
	SendDelta(ctx context.Context, gatewayID string, d GatewayDelta) error
	Deltas(ctx context.Context, gatewayID string) (<-chan GatewayDelta, func(), error)
}

DeltaBus routes deltas from shards back to gateways (per-gateway channels).

type Gateway

type Gateway struct {
	// contains filtered or unexported fields
}

Gateway terminates client subscriptions: it routes control to the owning shard and demultiplexes its delta channel back to per-subscription streams. (This is the thin in-process gateway; a standalone SSE/WS gateway process builds on the same protocol.)

func NewGateway

func NewGateway(id string, deps GatewayDeps) *Gateway

NewGateway builds a gateway.

func (*Gateway) Reanchor

func (g *Gateway) Reanchor(ctx context.Context, subID string, q livequery.LiveQuery, cursor *livequery.Cursor, limit int) error

Reanchor routes a reanchor control to the current owner of the subscription's partition (recomputed, so it survives a failover).

func (*Gateway) Run

func (g *Gateway) Run(ctx context.Context) error

Run pumps this gateway's delta channel, demuxing each frame to the right subscription stream, until ctx is cancelled.

func (*Gateway) Subscribe

func (g *Gateway) Subscribe(ctx context.Context, q livequery.LiveQuery) (id string, stream <-chan livequery.LiveDelta, release func(), retErr error)

Subscribe registers a live query: it routes a subscribe control to the shard that owns the query's partition and returns the demuxed delta stream. The initial snapshot arrives on that stream as an OpReset followed by OpEnter rows (the same encoding a failover re-snapshot uses).

type GatewayDelta

type GatewayDelta struct {
	SubID string
	Delta livequery.LiveDelta
}

GatewayDelta is one delta tagged with its subscription, en route to a gateway.

type GatewayDeps

type GatewayDeps struct {
	Members Membership
	Control ControlBus
	Delta   DeltaBus
}

GatewayDeps wires a gateway.

type Membership

type Membership interface {
	// Heartbeat refreshes this shard's TTL liveness key; call it on a ticker
	// at an interval well under the TTL.
	Heartbeat(ctx context.Context, shardID string) error
	// LiveShards returns the currently-live shard ids (unexpired heartbeats),
	// sorted, so Owner is stable across callers.
	LiveShards(ctx context.Context) ([]string, error)
	// Leave removes this shard's heartbeat immediately (graceful shutdown).
	Leave(ctx context.Context, shardID string) error
}

Membership is the liveness substrate: shards refresh a TTL heartbeat, and any node can read the currently-live set. Redis backs it in production.

type Shard

type Shard struct {
	// contains filtered or unexported fields
}

Shard is one matcher node: it heartbeats, owns a slice of partitions by HRW, serves subscribe/reanchor/unsubscribe control for partitions it owns, and on (re)assignment rebuilds the subscriptions it now owns from the durable registry and re-snapshots them (a transparent failover for clients).

func NewShard

func NewShard(id string, deps ShardDeps, opts ShardOptions) *Shard

NewShard builds a shard.

func (*Shard) Run

func (s *Shard) Run(ctx context.Context) error

Run drives the shard until ctx is cancelled. It heartbeats, reconciles ownership, and serves control. All shard-state mutation happens on this goroutine; only the per-subscription delta pumps run elsewhere.

type ShardDeps

type ShardDeps struct {
	Engine   *livequery.Engine
	Registry livequery.SubscriptionRegistry
	Members  Membership
	Control  ControlBus
	Delta    DeltaBus
}

ShardDeps wires a shard. Engine is a single-node live query engine whose feed tails this shard's partition streams (lq:events:{p}); the shard reuses it unchanged and adds only ownership + the gateway protocol around it.

type ShardOptions

type ShardOptions struct {
	HeartbeatInterval time.Duration
	OwnershipInterval time.Duration
}

ShardOptions tunes the loop cadences.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL