Documentation ¶
Overview ¶
Package cluster handles peer discovery and rendezvous-hash coordinator selection.
Peer discovery: the headless Kubernetes Service backing the Orca Deployment publishes Pod IPs in its A-record. We poll DNS at the cluster.membership_refresh interval (default 5s) and snapshot the peer set.
Coordinator selection: rendezvous hashing on (peer_ip, ChunkKey) picks one coordinator per chunk across the cluster.
Internal RPC: each replica runs an HTTP/2 client to dial peers' internal listeners (mTLS in production, plaintext in dev). The listener side lives in the server/internal handler.
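The discovery step described above can be sketched as a pure function that turns one DNS answer into a peer-set snapshot. This is an illustration under stated assumptions, not the package's refresh code: nextSnapshot, lookupFunc, errFake, and the service name orca-headless are hypothetical, and the exact rule for when the previous snapshot is retained is assumed (here: on a lookup error or an empty answer).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"sort"
)

// Peer mirrors the documented struct.
type Peer struct {
	IP   string
	Port int
	Self bool
}

// lookupFunc has the same signature as (*net.Resolver).LookupHost,
// so a test can swap in a fake resolver.
type lookupFunc func(ctx context.Context, host string) ([]string, error)

// The production resolver satisfies lookupFunc without an adapter.
var _ lookupFunc = net.DefaultResolver.LookupHost

// errFake is a hypothetical lookup failure used by the demo below.
var errFake = errors.New("lookup failed")

// nextSnapshot builds the next peer set from one DNS answer.
// Assumption: on error or an empty answer the previous snapshot is kept,
// and with no previous snapshot the result is a self-only set, so the
// returned slice is never empty.
func nextSnapshot(prev []Peer, addrs []string, lookupErr error, selfIP string) []Peer {
	if lookupErr != nil || len(addrs) == 0 {
		if len(prev) > 0 {
			return prev
		}
		return []Peer{{IP: selfIP, Self: true}}
	}
	// Copy before sorting so the caller's slice is not mutated.
	sorted := append([]string(nil), addrs...)
	sort.Strings(sorted)
	out := make([]Peer, 0, len(sorted))
	for i, ip := range sorted {
		if i > 0 && ip == sorted[i-1] {
			continue // drop duplicate addresses
		}
		out = append(out, Peer{IP: ip, Self: ip == selfIP})
	}
	return out
}

func main() {
	var lookup lookupFunc = func(ctx context.Context, host string) ([]string, error) {
		return []string{"10.0.0.2", "10.0.0.1"}, nil
	}
	addrs, err := lookup(context.Background(), "orca-headless")
	snap := nextSnapshot(nil, addrs, err, "10.0.0.2")
	fmt.Println(snap)
	// A failed lookup keeps the snapshot that is already in place.
	fmt.Println(nextSnapshot(snap, nil, errFake, "10.0.0.2"))
}
```

Sorting and de-duplicating the answer keeps the snapshot independent of the order in which the resolver returns records.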
Test seams ¶
Production constructs a DNS-backed PeerSource implicitly from cfg.Cluster.Service + net.DefaultResolver. Tests substitute the entire mechanism with WithPeerSource (typically a mutable StaticPeerSource per replica).
Index ¶
- Variables
- func DecodeChunkKey(values url.Values) (chunk.Key, int64, error)
- func Score(p Peer, key []byte) uint64
- type Cluster
- func (c *Cluster) Close(ctx context.Context) error
- func (c *Cluster) Coordinator(k chunk.Key) Peer
- func (c *Cluster) FillFromPeer(ctx context.Context, p Peer, k chunk.Key, objectSize int64) (io.ReadCloser, error)
- func (c *Cluster) HasInitialSnapshot() bool
- func (c *Cluster) IsCoordinator(k chunk.Key) bool
- func (c *Cluster) Peers() []Peer
- type Option
- type Peer
- type PeerSource
Variables ¶
var ErrPeerNotCoordinator = fmt.Errorf("cluster: peer is not the coordinator (409 Conflict)")
ErrPeerNotCoordinator is returned by FillFromPeer when the peer reports it is not the coordinator (membership disagreement).
Functions ¶
func DecodeChunkKey ¶
func DecodeChunkKey(values url.Values) (chunk.Key, int64, error)
DecodeChunkKey parses query params into a Key plus the authoritative object size. Used by the internal listener (server/internal/fill).
Types ¶
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
Cluster manages peer discovery, rendezvous hashing, and the internal-RPC client.
func (*Cluster) Close ¶
func (c *Cluster) Close(ctx context.Context) error
Close stops the refresh goroutine and waits for it to exit. If ctx is canceled before the goroutine exits (e.g. an in-flight DNS lookup is taking longer than the caller can tolerate) Close returns the context error. The underlying cancellation is always signalled, so the goroutine will exit eventually even if the caller stops waiting.
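The Close contract above (always signal cancellation, then wait only as long as the caller's context allows) can be sketched as follows. The refresher type, startRefresher, and demoClose are hypothetical names for illustration; the real Cluster's fields are unexported and may be organized differently.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// refresher is a hypothetical stand-in for the part of Cluster that owns
// the refresh goroutine.
type refresher struct {
	cancel context.CancelFunc
	done   chan struct{} // closed when the goroutine has exited
}

func startRefresher(interval time.Duration, refresh func(context.Context)) *refresher {
	ctx, cancel := context.WithCancel(context.Background())
	r := &refresher{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(r.done)
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			refresh(ctx)
			select {
			case <-ctx.Done():
				return
			case <-t.C:
			}
		}
	}()
	return r
}

// Close always signals cancellation, then waits for the goroutine to exit
// or for the caller's context to end, whichever comes first.
func (r *refresher) Close(ctx context.Context) error {
	r.cancel()
	select {
	case <-r.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoClose closes a refresher with a 50ms budget. When slow is true the
// refresh callback simulates a lookup that ignores cancellation.
func demoClose(slow bool) error {
	r := startRefresher(time.Hour, func(ctx context.Context) {
		if slow {
			time.Sleep(300 * time.Millisecond)
		}
	})
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return r.Close(ctx)
}

func main() {
	fmt.Println("fast refresh:", demoClose(false))
	fmt.Println("slow refresh:", demoClose(true))
}
```

Because cancel runs before the wait, a caller that gives up early leaves no permanently running goroutine; the goroutine exits once the in-flight refresh returns.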
func (*Cluster) Coordinator ¶
func (c *Cluster) Coordinator(k chunk.Key) Peer
Coordinator selects the rendezvous-hashed coordinator for a chunk.
Returns the Peer with the highest hash(peer || chunk_path) score. Peers() always returns at least one entry (self, via the bootstrap fallback in Peers and the never-empty post-condition of every branch in refresh), so this function does not need to handle an empty input.
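A minimal sketch of highest-score (rendezvous) selection follows. The hash used by the package's Score function is not documented here, so the FNV-1a based score below is an assumption chosen for illustration, as are the names score and coordinator and the tie-break on IP.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Peer mirrors the documented struct.
type Peer struct {
	IP   string
	Port int
	Self bool
}

// score hashes the peer identity together with the chunk key.
// Assumption: FNV-1a over IP, a zero separator byte, then the key.
func score(p Peer, key []byte) uint64 {
	h := fnv.New64a()
	h.Write([]byte(p.IP))
	h.Write([]byte{0}) // separator so ("ab","c") and ("a","bc") differ
	h.Write(key)
	return h.Sum64()
}

// coordinator returns the peer with the highest score for key.
// peers must be non-empty, matching the documented post-condition of Peers.
// Ties are broken on the lower IP so the result does not depend on order.
func coordinator(peers []Peer, key []byte) Peer {
	best := peers[0]
	bestScore := score(best, key)
	for _, p := range peers[1:] {
		s := score(p, key)
		if s > bestScore || (s == bestScore && p.IP < best.IP) {
			best, bestScore = p, s
		}
	}
	return best
}

func main() {
	peers := []Peer{{IP: "10.0.0.1"}, {IP: "10.0.0.2"}, {IP: "10.0.0.3"}}
	for _, k := range []string{"bucket/a/0", "bucket/a/1", "bucket/b/0"} {
		fmt.Printf("%s -> %s\n", k, coordinator(peers, []byte(k)).IP)
	}
}
```

Every replica that holds the same snapshot computes the same winner without any coordination, and removing a peer only reassigns the chunks that peer was winning.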
func (*Cluster) FillFromPeer ¶
func (c *Cluster) FillFromPeer(ctx context.Context, p Peer, k chunk.Key, objectSize int64) (io.ReadCloser, error)
FillFromPeer issues GET /internal/fill against the named peer and returns the streaming chunk body. The caller must close the returned reader. objectSize is the authoritative size of the object the chunk belongs to; it is forwarded to the peer so that the coordinator can compute the correct per-chunk length (especially for the tail chunk) and set Content-Length on its response.
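The request and response handling described above might look like the sketch below, exercised against an httptest server. Only the /internal/fill path and the 409 Conflict mapping come from this page; the query parameter names path and object_size, the helper names fillFrom and demoFill, and the error value errNotCoordinator are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strconv"
)

// errNotCoordinator stands in for ErrPeerNotCoordinator.
var errNotCoordinator = errors.New("peer is not the coordinator")

// fillFrom issues GET /internal/fill and returns the streaming body.
// The caller must close the returned reader.
func fillFrom(ctx context.Context, client *http.Client, baseURL, chunkPath string, objectSize int64) (io.ReadCloser, error) {
	q := url.Values{}
	q.Set("path", chunkPath)                                // hypothetical parameter name
	q.Set("object_size", strconv.FormatInt(objectSize, 10)) // hypothetical parameter name
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL+"/internal/fill?"+q.Encode(), nil)
	if err != nil {
		return nil, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	switch resp.StatusCode {
	case http.StatusOK:
		return resp.Body, nil
	case http.StatusConflict:
		resp.Body.Close()
		return nil, errNotCoordinator
	default:
		resp.Body.Close()
		return nil, fmt.Errorf("unexpected status %d", resp.StatusCode)
	}
}

// demoFill runs fillFrom against a test server that answers with status.
func demoFill(status int) (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if status != http.StatusOK {
			w.WriteHeader(status)
			return
		}
		io.WriteString(w, "chunk-bytes")
	}))
	defer srv.Close()
	body, err := fillFrom(context.Background(), srv.Client(), srv.URL, "bucket/obj/0", 1024)
	if err != nil {
		return "", err
	}
	defer body.Close()
	b, err := io.ReadAll(body)
	return string(b), err
}

func main() {
	fmt.Println(demoFill(http.StatusOK))
	fmt.Println(demoFill(http.StatusConflict))
}
```

A caller can treat the not-coordinator error as a signal that its peer set is stale relative to the peer's, rather than as a transport failure.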
func (*Cluster) HasInitialSnapshot ¶
func (c *Cluster) HasInitialSnapshot() bool
HasInitialSnapshot reports whether the cluster has stored at least one peer-set snapshot. Any value stored by refresh counts, whether it came from the success path or the failure path. The app's /readyz endpoint uses it to gate readiness on cluster discovery having completed its initial pass. It returns false only during the bootstrap window, before refresh has run once.
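One way to get these semantics is to hold the snapshot behind an atomic pointer and treat "pointer is non-nil" as "initial pass done". The sketch below assumes that design; the cluster type and its method names are hypothetical, and the real implementation may store its snapshot differently. It requires Go 1.19 or later for atomic.Pointer.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Peer mirrors the documented struct.
type Peer struct {
	IP   string
	Port int
	Self bool
}

// cluster is a hypothetical reduction of Cluster to its snapshot state.
type cluster struct {
	selfIP string
	snap   atomic.Pointer[[]Peer] // nil until refresh stores a first value
}

// store is what a refresh pass would call on either its success or its
// failure path.
func (c *cluster) store(peers []Peer) { c.snap.Store(&peers) }

// hasInitialSnapshot is false only before the first store.
func (c *cluster) hasInitialSnapshot() bool { return c.snap.Load() != nil }

// peers returns the current snapshot, falling back to a self-only set
// during the bootstrap window so the result is never empty.
func (c *cluster) peers() []Peer {
	if p := c.snap.Load(); p != nil && len(*p) > 0 {
		return *p
	}
	return []Peer{{IP: c.selfIP, Self: true}}
}

func main() {
	c := &cluster{selfIP: "10.0.0.1"}
	fmt.Println(c.hasInitialSnapshot(), c.peers())
	c.store([]Peer{{IP: "10.0.0.1", Self: true}, {IP: "10.0.0.2"}})
	fmt.Println(c.hasInitialSnapshot(), c.peers())
}
```

A readiness handler built on this would answer not-ready while hasInitialSnapshot is false and ready afterwards, even if the first refresh failed and stored only the self entry.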
func (*Cluster) IsCoordinator ¶
func (c *Cluster) IsCoordinator(k chunk.Key) bool
IsCoordinator reports whether this replica is the coordinator for k. Every code path producing a coord value stamps the Self flag authoritatively (dnsPeerSource matches by selfIP; StaticPeerSource by (selfIP, selfPort); the empty-peer-set fallback constructs c.self()), so checking Self is the single source of truth.
type Option ¶
type Option func(*Cluster)
Option configures a Cluster at construction time.
func WithHTTPClient ¶
WithHTTPClient overrides the internal-RPC HTTP client. TEST-ONLY: production constructs the default client from cfg via newHTTPClient. Used by unit tests that need to inject a client with custom timeouts or transport behaviour for deterministic deadline coverage.
func WithLogger ¶
WithLogger overrides the cluster's structured logger. The default is slog.Default(). The logger receives debug-level emissions for every refresh cycle, coordinator selection, and FillFromPeer call, plus warn-level emissions for retained-previous-snapshot fallback.
func WithPeerSource ¶
func WithPeerSource(s PeerSource) Option
WithPeerSource replaces the entire peer-discovery mechanism. This is the primary test seam; production code constructs the default DNS-backed source implicitly from cfg.Cluster.Service.
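The functional-option shape behind WithPeerSource can be sketched as below. All names are lower-cased, hypothetical stand-ins: the real PeerSource method set and the Cluster constructor are not shown on this page, so the single-method peerSource interface and newCluster are assumptions.

```go
package main

import "fmt"

// Peer mirrors the documented struct.
type Peer struct {
	IP   string
	Port int
	Self bool
}

// peerSource is a hypothetical one-method stand-in for PeerSource.
type peerSource interface {
	Snapshot() []Peer
}

// fixedSource returns the same peers on every call.
type fixedSource []Peer

func (f fixedSource) Snapshot() []Peer { return f }

type cluster struct {
	source peerSource
}

// option configures a cluster at construction time.
type option func(*cluster)

// withPeerSource replaces the default discovery mechanism.
func withPeerSource(s peerSource) option {
	return func(c *cluster) { c.source = s }
}

// newCluster installs a default source first, then applies options in
// order, so an option always wins over the default.
func newCluster(opts ...option) *cluster {
	// Stand-in for the DNS-backed default built from configuration.
	c := &cluster{source: fixedSource{{IP: "10.0.0.1", Self: true}}}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	prod := newCluster()
	test := newCluster(withPeerSource(fixedSource{
		{IP: "127.0.0.1", Port: 4001, Self: true},
		{IP: "127.0.0.1", Port: 4002},
	}))
	fmt.Println(len(prod.source.Snapshot()), len(test.source.Snapshot()))
}
```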
type Peer ¶
type Peer struct {
IP string
Port int // 0 = use cfg.Cluster.InternalListen's port (production)
Self bool // true when this Peer entry represents the local replica
}
Peer represents one replica in the current peer-set snapshot.
In production every Peer has Port == 0 because pod IPs are addressed on the same internal-listener port across the Deployment. Integration tests with multiple replicas sharing 127.0.0.1 set Port to the per-replica OS-assigned port; in that mode FillFromPeer dials peer.IP:peer.Port instead of falling back to cfg.Cluster.InternalListen's port.
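The port fallback can be illustrated with a small address helper. dialAddr is a hypothetical name, and the sketch assumes cfg.Cluster.InternalListen holds a host:port string such as ":8444".

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// Peer mirrors the documented struct.
type Peer struct {
	IP   string
	Port int
	Self bool
}

// dialAddr returns the address to dial for p. A non-zero Port is used as
// is; Port == 0 falls back to the port of the internal listen address.
func dialAddr(p Peer, internalListen string) (string, error) {
	if p.Port != 0 {
		return net.JoinHostPort(p.IP, strconv.Itoa(p.Port)), nil
	}
	_, port, err := net.SplitHostPort(internalListen)
	if err != nil {
		return "", err
	}
	return net.JoinHostPort(p.IP, port), nil
}

func main() {
	// Production style: Port is zero, so the shared listener port applies.
	fmt.Println(dialAddr(Peer{IP: "10.0.0.5"}, ":8444")) // 10.0.0.5:8444 <nil>
	// Integration-test style: replicas share 127.0.0.1 and differ by port.
	fmt.Println(dialAddr(Peer{IP: "127.0.0.1", Port: 40123}, ":8444")) // 127.0.0.1:40123 <nil>
}
```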
type PeerSource ¶
PeerSource produces the current peer-set snapshot. The DNS-backed implementation queries the headless Service's A-record. Tests substitute a StaticPeerSource that returns a mutable list of peers with explicit Port values (so multiple replicas can share an IP).
Each returned Peer.Self must be authoritatively set by the source (the source knows the calling replica's identity at construction time, so it is the only place that can stamp Self correctly when peers share an IP).
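A mutable static source that stamps Self by (IP, port) could look like the sketch below. It is not the package's StaticPeerSource; staticSource, Set, and Snapshot are hypothetical names, and the method set of the real PeerSource interface is not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// Peer mirrors the documented struct.
type Peer struct {
	IP   string
	Port int
	Self bool
}

// staticSource is bound to one replica's identity at construction time.
type staticSource struct {
	selfIP   string
	selfPort int

	mu    sync.Mutex
	peers []Peer
}

func newStaticSource(selfIP string, selfPort int) *staticSource {
	return &staticSource{selfIP: selfIP, selfPort: selfPort}
}

// Set replaces the peer list; tests call it to simulate membership changes.
func (s *staticSource) Set(peers []Peer) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.peers = append([]Peer(nil), peers...)
}

// Snapshot returns a copy of the list with Self stamped by (IP, port),
// overriding whatever the caller of Set passed in.
func (s *staticSource) Snapshot() []Peer {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make([]Peer, len(s.peers))
	for i, p := range s.peers {
		p.Self = p.IP == s.selfIP && p.Port == s.selfPort
		out[i] = p
	}
	return out
}

func main() {
	members := []Peer{{IP: "127.0.0.1", Port: 4001}, {IP: "127.0.0.1", Port: 4002}}
	a := newStaticSource("127.0.0.1", 4001)
	b := newStaticSource("127.0.0.1", 4002)
	a.Set(members)
	b.Set(members)
	fmt.Println(a.Snapshot())
	fmt.Println(b.Snapshot())
}
```

Matching on IP alone would mark both entries as Self for every replica here, which is why the source, not the consumer, has to do the stamping.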