cluster

package
v0.1.9
Warning

This package is not in the latest version of its module.

Published: May 28, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package cluster handles peer discovery and rendezvous-hash coordinator selection.

Peer discovery: the headless Kubernetes Service backing the Orca Deployment publishes one A record per Pod IP. We poll DNS at the cluster.membership_refresh interval (default 5s) and snapshot the peer set.

Coordinator selection: rendezvous hashing on (peer_ip, ChunkKey) picks one coordinator per chunk across the cluster.
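Here is a minimal rendezvous-hashing sketch. FNV-1a 64 over peerIP || key is an assumption chosen for illustration; the package's actual Score may use a different hash. Every replica that sees the same peer set computes the same winner for a key, and the function needs no coordination to do so.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// score is an illustrative rendezvous score: FNV-1a 64 over peerIP || key.
func score(peerIP string, key []byte) uint64 {
	h := fnv.New64a()
	h.Write([]byte(peerIP))
	h.Write(key)
	return h.Sum64()
}

// coordinator returns the peer with the highest score for key. Ties break on
// the lexicographically smaller IP, so the result does not depend on the
// order of peers. peers must be non-empty.
func coordinator(peers []string, key []byte) string {
	best := peers[0]
	bestScore := score(best, key)
	for _, p := range peers[1:] {
		s := score(p, key)
		if s > bestScore || (s == bestScore && p < best) {
			best, bestScore = p, s
		}
	}
	return best
}

func main() {
	peers := []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}
	for _, k := range []string{"objA/0", "objA/1", "objB/0"} {
		fmt.Printf("%s -> %s\n", k, coordinator(peers, []byte(k)))
	}
}
```

When a peer that was not the winner for a key leaves the cluster, that key keeps its coordinator. Only keys owned by the departed peer move. This minimal-disruption property is the usual reason to choose rendezvous hashing over hashing modulo the peer count.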

Internal RPC: each replica runs an HTTP/2 client to dial peers' internal listeners (mTLS in production, plain in dev). The listener side is in the server/internal handler.

Test seams

Production constructs a DNS-backed PeerSource implicitly from cfg.Cluster.Service and net.DefaultResolver. Tests replace the entire mechanism with WithPeerSource (typically one mutable StaticPeerSource per replica).

Index

Constants

This section is empty.

Variables

var ErrPeerNotCoordinator = fmt.Errorf("cluster: peer is not the coordinator (409 Conflict)")

ErrPeerNotCoordinator is returned by FillFromPeer when the peer reports it is not the coordinator (membership disagreement).

Functions

func DecodeChunkKey

func DecodeChunkKey(values url.Values) (chunk.Key, int64, error)

DecodeChunkKey parses URL query parameters into a chunk.Key plus the authoritative object size. It is used by the internal listener (server/internal/fill).

func Score

func Score(p Peer, key []byte) uint64

Score returns the rendezvous-hash score for (peer, key). Exposed so integration tests can craft phantom peers that deterministically win or lose against a real peer for a given key (used to induce membership disagreement scenarios).
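A test can search a handful of candidate IPs for a phantom peer that beats, or loses to, a real peer for a given key. In the sketch below, score is a local stand-in for cluster.Score (FNV-1a over IP || key, an assumption), and findPhantom and the 10.255.0.x candidate range are hypothetical. With a 64-bit hash, failing to find a winner among 254 candidates is vanishingly unlikely.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type peer struct {
	IP string
}

// score stands in for cluster.Score in this sketch.
func score(p peer, key []byte) uint64 {
	h := fnv.New64a()
	h.Write([]byte(p.IP))
	h.Write(key)
	return h.Sum64()
}

// findPhantom returns a candidate peer whose score for key is strictly higher
// (win=true) or strictly lower (win=false) than self's.
func findPhantom(self peer, key []byte, win bool) (peer, bool) {
	target := score(self, key)
	for i := 1; i < 255; i++ {
		cand := peer{IP: fmt.Sprintf("10.255.0.%d", i)}
		s := score(cand, key)
		if (win && s > target) || (!win && s < target) {
			return cand, true
		}
	}
	return peer{}, false
}

func main() {
	self := peer{IP: "127.0.0.1"}
	if p, ok := findPhantom(self, []byte("objA/0"), true); ok {
		fmt.Println("phantom that steals objA/0:", p.IP)
	}
}
```

Injecting the winning phantom into only one replica's peer set makes that replica forward to a coordinator the other replicas disagree about. That is the membership-disagreement scenario the 409 path exists for.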

Types

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

Cluster manages peer discovery, rendezvous hashing, and the internal-RPC client.

func New

func New(parent context.Context, cfg config.Cluster, opts ...Option) (*Cluster, error)

New returns a Cluster and starts the membership-refresh goroutine.

func (*Cluster) Close

func (c *Cluster) Close(ctx context.Context) error

Close stops the refresh goroutine and waits for it to exit. If ctx is canceled or its deadline expires before the goroutine exits (e.g. an in-flight DNS lookup is taking longer than the caller can tolerate), Close returns the context's error. Cancellation is always signaled, so the goroutine exits eventually even if the caller stops waiting.

func (*Cluster) Coordinator

func (c *Cluster) Coordinator(k chunk.Key) Peer

Coordinator selects the rendezvous-hashed coordinator for a chunk.

It returns the Peer with the highest hash(peer || chunk_path) score. Peers always returns at least one entry: the local replica is guaranteed by the bootstrap fallback in Peers and by the never-empty postcondition of every branch in refresh. Coordinator therefore never has to handle an empty peer set.

func (*Cluster) FillFromPeer

func (c *Cluster) FillFromPeer(ctx context.Context, p Peer, k chunk.Key, objectSize int64) (io.ReadCloser, error)

FillFromPeer issues GET /internal/fill against the named peer and returns the streaming chunk body; the caller must close the returned reader. objectSize is the authoritative size of the object the chunk belongs to. It is forwarded to the peer so the coordinating peer can compute the correct per-chunk length (especially for the tail chunk) and set Content-Length on its response.

func (*Cluster) HasInitialSnapshot

func (c *Cluster) HasInitialSnapshot() bool

HasInitialSnapshot reports whether the cluster has loaded at least one peer-set snapshot. Any value stored by refresh counts, whether it came from the success path or the failure path. The app's /readyz endpoint uses it to gate readiness on cluster discovery having completed its initial pass. It returns false only during the bootstrap window before refresh has run for the first time.

func (*Cluster) IsCoordinator

func (c *Cluster) IsCoordinator(k chunk.Key) bool

IsCoordinator reports whether this replica is the coordinator for k. Every code path producing a coord value stamps the Self flag authoritatively (dnsPeerSource matches by selfIP; StaticPeerSource by (selfIP, selfPort); the empty-peer-set fallback constructs c.self()), so checking Self is the single source of truth.

func (*Cluster) Peers

func (c *Cluster) Peers() []Peer

Peers returns the current peer-set snapshot.

type Option

type Option func(*Cluster)

Option configures a Cluster at construction time.

func WithHTTPClient

func WithHTTPClient(c *http.Client) Option

WithHTTPClient overrides the internal-RPC HTTP client. TEST-ONLY: production constructs the default client from cfg via newHTTPClient. Used by unit tests that need to inject a client with custom timeouts or transport behaviour for deterministic deadline coverage.

func WithLogger

func WithLogger(log *slog.Logger) Option

WithLogger overrides the cluster's structured logger; the default is slog.Default(). The logger receives debug-level records for every refresh cycle, coordinator selection, and FillFromPeer call, plus warn-level records whenever refresh falls back to retaining the previous snapshot.

func WithPeerSource

func WithPeerSource(s PeerSource) Option

WithPeerSource replaces the entire peer-discovery mechanism. This is the primary test seam; production code constructs the default DNS-backed source implicitly from cfg.Cluster.Service.

type Peer

type Peer struct {
	IP   string
	Port int  // 0 = use cfg.Cluster.InternalListen's port (production)
	Self bool // true when this Peer entry represents the local replica
}

Peer represents one replica in the current peer-set snapshot.

In production every Peer has Port == 0 because pod IPs are addressed on the same internal-listener port across the Deployment. Integration tests with multiple replicas sharing 127.0.0.1 set Port to the per-replica OS-assigned port; in that mode FillFromPeer dials peer.IP:peer.Port instead of falling back to cfg.Cluster.InternalListen's port.
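The address rule can be sketched as a small helper. In the sketch, dialAddr is hypothetical, and the assumption is that InternalListen is a host:port string such as ":8444". net.JoinHostPort adds the brackets that IPv6 pod IPs need.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

type peer struct {
	IP   string
	Port int
}

// dialAddr returns IP:Port when Port is set, otherwise IP plus the port taken
// from the internal-listen address.
func dialAddr(p peer, internalListen string) (string, error) {
	port := strconv.Itoa(p.Port)
	if p.Port == 0 {
		_, lp, err := net.SplitHostPort(internalListen)
		if err != nil {
			return "", err
		}
		port = lp
	}
	return net.JoinHostPort(p.IP, port), nil
}

func main() {
	prod, _ := dialAddr(peer{IP: "10.1.2.3"}, ":8444")
	test, _ := dialAddr(peer{IP: "127.0.0.1", Port: 54321}, ":8444")
	fmt.Println(prod, test) // prints 10.1.2.3:8444 127.0.0.1:54321
}
```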

type PeerSource

type PeerSource interface {
	Peers(ctx context.Context) ([]Peer, error)
}

PeerSource produces the current peer-set snapshot. The DNS-backed implementation queries the headless Service's A records. Tests substitute a StaticPeerSource that returns a mutable list of peers with explicit Port values (so multiple replicas can share an IP).

Each returned Peer.Self must be authoritatively set by the source (the source knows the calling replica's identity at construction time, so it is the only place that can stamp Self correctly when peers share an IP).
