coord

package
v0.1.16
Warning

This package is not in the latest version of its module.

Published: Jun 17, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package coord implements Gantry's libp2p coordination RPCs.

Wire protocol: `/gantry/coord/1.0.0`, with one libp2p stream per request/response pair, closed after the reply. Framing: length-delimited protobuf via `go-msgio`; the design doc forbids gRPC. Forward compatibility: additive changes bump the minor version (e.g. `1.1.0`); breaking changes bump the major version.
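The versioning rule can be read as "same major version means compatible". The sketch below illustrates that reading only; `parseVersion` and `compatible` are hypothetical helpers, not part of this package, and how the real handler negotiates protocol IDs is not documented here.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseVersion extracts major and minor from a protocol ID such as
// "/gantry/coord/1.0.0". Hypothetical helper, for illustration only.
func parseVersion(id string) (major, minor int, err error) {
	idx := strings.LastIndex(id, "/")
	if idx < 0 {
		return 0, 0, fmt.Errorf("no version segment in %q", id)
	}
	parts := strings.Split(id[idx+1:], ".")
	if len(parts) != 3 {
		return 0, 0, fmt.Errorf("want major.minor.patch in %q", id)
	}
	if major, err = strconv.Atoi(parts[0]); err != nil {
		return 0, 0, err
	}
	if minor, err = strconv.Atoi(parts[1]); err != nil {
		return 0, 0, err
	}
	return major, minor, nil
}

// compatible reports whether two protocol IDs share a major version.
// Malformed IDs are never compatible.
func compatible(a, b string) bool {
	ma, _, errA := parseVersion(a)
	mb, _, errB := parseVersion(b)
	return errA == nil && errB == nil && ma == mb
}

func main() {
	fmt.Println(compatible("/gantry/coord/1.0.0", "/gantry/coord/1.1.0")) // additive change
	fmt.Println(compatible("/gantry/coord/1.0.0", "/gantry/coord/2.0.0")) // breaking change
}
```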

Two coordinated message families:

- `pull_intent_query` / `pull_intent_response` (step 4) - a stateless probe asking a peer "do you have this digest cached, are you pulling it, or have you recently failed to pull it?". The responder fills `hrw_rank` from its own view of cluster membership so the requester can detect informer divergence (see the design doc).

- `please_pull` / `please_pull_response` (step 6) - asks a peer (the designated puller per HRW) to pull one or more digests of a single repo. The responder's in-flight map dedupes; the requester receives a per-digest result of STARTED, ALREADY_PULLING, or RECENTLY_FAILED.
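HRW (highest random weight, also called rendezvous hashing) picks the designated puller by scoring every node against the digest and taking the highest score. The following is a minimal sketch of that technique; the FNV-1a hash, the string-typed IDs, and the `score` and `rank` names are assumptions for illustration, since the package's actual hash and rank encoding are not documented here.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// score mixes a node ID with a digest. FNV-1a is an assumption made for
// brevity, not the package's documented choice.
func score(node, digest string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(node))
	h.Write([]byte{0}) // separator so ("ab", "c") and ("a", "bc") differ
	h.Write([]byte(digest))
	return h.Sum64()
}

// rank returns the nodes ordered by descending score for digest.
// Index 0 is the designated puller; ties break on node ID so every
// node with the same membership view computes the same order.
func rank(nodes []string, digest string) []string {
	out := append([]string(nil), nodes...)
	sort.Slice(out, func(i, j int) bool {
		si, sj := score(out[i], digest), score(out[j], digest)
		if si != sj {
			return si > sj
		}
		return out[i] < out[j]
	})
	return out
}

func main() {
	nodes := []string{"node-a", "node-b", "node-c"}
	order := rank(nodes, "sha256:abc")
	fmt.Println("designated puller:", order[0])
}
```

Because the order depends only on the member set and the digest, two nodes that disagree on a peer's rank must be looking at different membership views, which is the divergence a requester can detect by comparing ranks.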

This package owns both the server-side stream handler and a typed client. Higher layers (cold-start orchestrator, mirror) interact only via the `ifaces.Coordinator` interface.


Constants

const DefaultMaxConcurrentStreams = 512

DefaultMaxConcurrentStreams caps simultaneous inbound coord streams. A hostile or buggy peer can open thousands of streams; without a cap each consumes a goroutine for up to the stream handshake timeout. libp2p's resource manager is the next defence layer; this is a cheap, predictable, server-local gate.
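A cap like this is commonly built as a counting semaphore on a buffered channel. The sketch below shows that technique; the `limiter` type is hypothetical, and rejecting without blocking when full is an assumption, since the documentation does not say whether the server rejects or queues excess streams.

```go
package main

import "fmt"

// limiter is a counting semaphore built on a buffered channel.
type limiter struct{ slots chan struct{} }

func newLimiter(n int) *limiter {
	return &limiter{slots: make(chan struct{}, n)}
}

// tryAcquire claims a slot without blocking. A false result means the
// cap is reached and the caller should refuse the new stream.
func (l *limiter) tryAcquire() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot. Call it exactly once per successful tryAcquire.
func (l *limiter) release() { <-l.slots }

func main() {
	l := newLimiter(2)
	fmt.Println(l.tryAcquire(), l.tryAcquire(), l.tryAcquire()) // true true false
	l.release()
	fmt.Println(l.tryAcquire()) // true
}
```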

const DefaultStreamHandshakeTimeout = 5 * time.Second

DefaultStreamHandshakeTimeout bounds how long the server is willing to wait for the *first* envelope on an inbound stream and how long it gives itself to write the response. A peer that opens a stream and never sends bytes - accidental (NAT death) or malicious (slowloris-style resource exhaustion) - must not pin a goroutine indefinitely.

5s comfortably covers a healthy in-cluster round-trip while still bounding the worst case. The deadline is set on the underlying libp2p stream so both r.ReadMsg and w.WriteMsg observe it; dispatch runs under its own 2s context (see handleStream). Override via WithStreamHandshakeTimeout for tests.
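The effect of a stream-level deadline on a silent peer can be sketched with the standard library. Here `net.Pipe` stands in for a libp2p stream (both expose `SetDeadline`), and `readFirst` and `demoSilentPeer` are hypothetical names used only for this illustration.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// readFirst waits up to timeout for the first byte on conn. The
// deadline covers reads and writes alike, as SetDeadline sets both.
func readFirst(conn net.Conn, timeout time.Duration) error {
	if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
		return err
	}
	buf := make([]byte, 1)
	_, err := conn.Read(buf)
	return err
}

// demoSilentPeer opens a connection whose remote side never writes and
// reports whether the read failed with a deadline error.
func demoSilentPeer() bool {
	server, client := net.Pipe()
	defer server.Close()
	defer client.Close()
	err := readFirst(server, 50*time.Millisecond)
	return errors.Is(err, os.ErrDeadlineExceeded)
}

func main() {
	fmt.Println("silent peer timed out:", demoSilentPeer())
}
```

The handler goroutine is released when the deadline fires instead of waiting for a peer that never sends.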

const MaxMessageBytes = 1 << 20

MaxMessageBytes caps a single inbound Envelope. PullIntentRequest is tiny; PleasePullRequest grows linearly with batch size. 1 MiB is orders of magnitude beyond the realistic ceiling but keeps memory bounded under malformed input.
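The reason a cap keeps memory bounded is that the reader checks the declared length before allocating. The sketch below shows this with a hand-rolled 4-byte big-endian length prefix; that prefix format is an assumption for illustration (the package delegates framing to `go-msgio`, and which of its framings is used is not stated here), and `writeFrame`, `readFrame`, and `errTooLarge` are hypothetical names.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

const maxMessageBytes = 1 << 20 // mirrors MaxMessageBytes

var errTooLarge = errors.New("frame exceeds maxMessageBytes")

// writeFrame writes a 4-byte big-endian length followed by the payload.
func writeFrame(w io.Writer, payload []byte) error {
	if len(payload) > maxMessageBytes {
		return errTooLarge
	}
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(payload)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame rejects an oversized length before allocating a buffer, so
// a malformed header cannot force a large allocation.
func readFrame(r io.Reader) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	n := binary.BigEndian.Uint32(hdr[:])
	if n > maxMessageBytes {
		return nil, errTooLarge
	}
	buf := make([]byte, n)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

// roundTrip frames s into a buffer and reads it back.
func roundTrip(s string) (string, error) {
	var buf bytes.Buffer
	if err := writeFrame(&buf, []byte(s)); err != nil {
		return "", err
	}
	out, err := readFrame(&buf)
	return string(out), err
}

// oversizedRejected feeds a header declaring ~4 GiB and no payload.
func oversizedRejected() bool {
	_, err := readFrame(bytes.NewReader([]byte{0xFF, 0xFF, 0xFF, 0xFF}))
	return errors.Is(err, errTooLarge)
}

func main() {
	got, err := roundTrip("hello")
	fmt.Println(got, err)
	fmt.Println("oversized rejected:", oversizedRejected())
}
```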

const ProtocolID protocol.ID = "/gantry/coord/1.0.0"

ProtocolID is the libp2p stream protocol the coord handler binds.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client opens a libp2p stream per RPC. Members is used to resolve an ifaces.NodeID to a libp2p peer.ID for the dial. The client currently ships a minimal NodeID->peer.ID mapping that accepts the libp2p peer.ID string form directly as the NodeID (this matches what `internal/discovery.Host` returns from FindProviders). The real K8s-pod-name -> peer.ID mapping is owned by `internal/members` and is intended to surface through a richer Node type later.

func NewClient

func NewClient(h host.Host, opts ...ClientOption) *Client

NewClient returns a coord RPC client. h must be a running libp2p host already participating in the coord protocol's transports.

func (*Client) PleasePull

func (c *Client) PleasePull(ctx context.Context, target ifaces.NodeID, registry, repository string, kind ifaces.OriginRefKind, digests []digest.Digest) ([]ifaces.PleasePullOutcome, error)

PleasePull implements ifaces.Coordinator.

func (*Client) PullIntentQuery

func (c *Client) PullIntentQuery(ctx context.Context, target ifaces.NodeID, d digest.Digest) (ifaces.PullIntent, error)

PullIntentQuery implements ifaces.Coordinator.

func (*Client) ResolvePeerID

func (c *Client) ResolvePeerID(id ifaces.NodeID, pid peer.ID)

ResolvePeerID lets external wiring teach the client how to map a NodeID to a libp2p peer.ID. Higher layers (members, discovery) own this mapping; we just cache lookups.

type ClientOption

type ClientOption func(*Client)

ClientOption configures a Client.

func WithClientLogger

func WithClientLogger(l *slog.Logger) ClientOption

WithClientLogger overrides the logger.

func WithDialTimeout

func WithDialTimeout(d time.Duration) ClientOption

WithDialTimeout overrides the per-RPC dial timeout (default 2s).

func WithPeerIDResolver

func WithPeerIDResolver(fn func(ifaces.NodeID) (peer.ID, bool)) ClientOption

WithPeerIDResolver installs a function that maps a NodeID to a libp2p peer.ID at dial time. The resolver is consulted before the static teach-cache populated by ResolvePeerID; returning (_, false) falls through to the cache and then to peer.Decode(NodeID). Used by main.go to bridge K8s node names -> libp2p peer.IDs published via members' pod-annotation announcements (the design doc).
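The lookup order described above (resolver, then teach-cache, then decoding the NodeID itself) can be sketched as follows. All types and names here are stand-ins: `nodeID` and `peerID` are plain strings, and `decode` is a hypothetical placeholder for `peer.Decode` that merely rejects empty input.

```go
package main

import (
	"errors"
	"fmt"
)

type nodeID string
type peerID string

// client holds the three lookup sources in priority order.
type client struct {
	resolver func(nodeID) (peerID, bool) // installed via an option; may be nil
	cache    map[nodeID]peerID           // populated by explicit teaching
	decode   func(nodeID) (peerID, error) // stand-in for peer.Decode
}

// resolve consults the resolver first, then the cache, then decode.
func (c *client) resolve(id nodeID) (peerID, error) {
	if c.resolver != nil {
		if pid, ok := c.resolver(id); ok {
			return pid, nil
		}
	}
	if pid, ok := c.cache[id]; ok {
		return pid, nil
	}
	return c.decode(id)
}

// newDemoClient wires one entry into each source for illustration.
func newDemoClient() *client {
	return &client{
		resolver: func(id nodeID) (peerID, bool) {
			if id == "node-a" {
				return "peer-from-resolver", true
			}
			return "", false
		},
		cache: map[nodeID]peerID{"node-b": "peer-from-cache"},
		decode: func(id nodeID) (peerID, error) {
			if id == "" {
				return "", errors.New("empty node ID")
			}
			return peerID(id), nil // treat the NodeID as a peer ID string
		},
	}
}

func main() {
	c := newDemoClient()
	for _, id := range []nodeID{"node-a", "node-b", "12D3KooWExample"} {
		pid, err := c.resolve(id)
		fmt.Println(id, "->", pid, err)
	}
}
```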

func WithRPCTimeout

func WithRPCTimeout(d time.Duration) ClientOption

WithRPCTimeout overrides the per-RPC end-to-end timeout (default 2s).

type MetricsHooks

type MetricsHooks struct {
	// OnPullIntentServed fires once per pull_intent_query handled.
	OnPullIntentServed func()
	// OnPullIntentStorageUnavailable fires once per pull_intent_query
	// (wire or local) whose has_cached=false answer was caused by the
	// primary or secondary storage backend returning ifaces.ErrUnavailable
	// rather than a definitive miss. Operators use this to distinguish
	// "DHT routes around us because we genuinely lack the blob" from
	// "DHT routes around us because containerd is unreachable on this
	// node" - the latter should also be caught by readiness, but the
	// metric makes transient storage flaps observable independently of
	// the readyz signal.
	OnPullIntentStorageUnavailable func()
	// OnPleasePullServed fires once per please_pull *request* handled
	// (not per digest in the batch).
	OnPleasePullServed func()
	// OnPleasePullStarted is called once per digest the server
	// transitions into in_flight from a please_pull batch.
	OnPleasePullStarted func()
	// OnStreamError fires for any malformed or oversized stream.
	OnStreamError func()
}

MetricsHooks lets callers wire Prometheus counters/gauges without importing the metrics package. All fields may be nil.
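Since every field may be nil, call sites need a nil check before firing a hook. A minimal sketch of that pattern follows; the two-field `metricsHooks` struct is a trimmed local copy, and `fire` and `demoHooks` are hypothetical helpers, not part of this package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// metricsHooks is a trimmed local copy of the hook struct.
type metricsHooks struct {
	OnPullIntentServed func()
	OnStreamError      func()
}

// fire invokes f only when it is set, so unset hooks cost nothing.
func fire(f func()) {
	if f != nil {
		f()
	}
}

// demoHooks wires one hook to a counter, leaves the other nil, and
// returns how many times the wired hook ran.
func demoHooks() int64 {
	var served int64
	h := metricsHooks{
		OnPullIntentServed: func() { atomic.AddInt64(&served, 1) },
		// OnStreamError intentionally left nil.
	}
	fire(h.OnPullIntentServed)
	fire(h.OnPullIntentServed)
	fire(h.OnStreamError) // no-op rather than a nil-func panic
	return atomic.LoadInt64(&served)
}

func main() {
	fmt.Println("served:", demoHooks())
}
```

In a real deployment the closure would increment a Prometheus counter instead of a local integer, which keeps the metrics dependency out of this package.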

type NegativeCache

type NegativeCache interface {
	Lookup(d digest.Digest) (entry NegativeEntry, ok bool)
}

NegativeCache is the read interface coord needs from the circuit breaker. Returning ok == false means the digest has no negative-cache entry on this node.

type NegativeEntry

type NegativeEntry struct {
	CooldownUntil time.Time
	Class         ifaces.FailureClass
}

NegativeEntry mirrors the negative-cache state that the design doc defines for a single digest.
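An in-memory implementation of this read interface might look like the sketch below. It uses local stand-in types (string digests, a string failure class), and treating an entry whose cooldown has passed as absent is an assumption; the documentation does not say who expires entries.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// negativeEntry is a local stand-in with a string failure class.
type negativeEntry struct {
	CooldownUntil time.Time
	Class         string
}

// memCache is a hypothetical map-backed negative cache.
type memCache struct {
	mu      sync.RWMutex
	entries map[string]negativeEntry
	now     func() time.Time // injectable clock for tests
}

func (c *memCache) put(d string, e negativeEntry) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[d] = e
}

// Lookup reports an entry only while its cooldown is still running.
func (c *memCache) Lookup(d string) (negativeEntry, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	e, ok := c.entries[d]
	if !ok || !c.now().Before(e.CooldownUntil) {
		return negativeEntry{}, false
	}
	return e, true
}

// demoNegativeCache looks up one digest before and after its cooldown.
func demoNegativeCache() (before, after bool) {
	now := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	c := &memCache{
		entries: map[string]negativeEntry{},
		now:     func() time.Time { return now },
	}
	c.put("sha256:abc", negativeEntry{
		CooldownUntil: now.Add(30 * time.Second),
		Class:         "transient",
	})
	_, before = c.Lookup("sha256:abc")
	now = now.Add(time.Minute) // advance the fake clock past the cooldown
	_, after = c.Lookup("sha256:abc")
	return before, after
}

func main() {
	before, after := demoNegativeCache()
	fmt.Println("hit during cooldown:", before, "hit after cooldown:", after)
}
```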

type Option

type Option func(*Server)

Option configures a Server.

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger plumbs a structured logger.

func WithMaxConcurrentStreams

func WithMaxConcurrentStreams(n int) Option

WithMaxConcurrentStreams overrides DefaultMaxConcurrentStreams. Non-positive values are ignored.
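The "non-positive values are ignored" rule fits naturally into the functional-options pattern: the option validates its argument and leaves the default in place otherwise. A minimal sketch with local stand-in types (`server`, `option`, and the lowercase constructor are hypothetical):

```go
package main

import "fmt"

const defaultMaxConcurrentStreams = 512 // mirrors DefaultMaxConcurrentStreams

type server struct{ maxStreams int }

type option func(*server)

// withMaxConcurrentStreams overrides the default only for positive n.
func withMaxConcurrentStreams(n int) option {
	return func(s *server) {
		if n > 0 {
			s.maxStreams = n
		}
	}
}

// newServer applies the defaults first, then each option in order.
func newServer(opts ...option) *server {
	s := &server{maxStreams: defaultMaxConcurrentStreams}
	for _, o := range opts {
		o(s)
	}
	return s
}

func main() {
	fmt.Println(newServer().maxStreams)                             // 512
	fmt.Println(newServer(withMaxConcurrentStreams(0)).maxStreams)  // 512
	fmt.Println(newServer(withMaxConcurrentStreams(64)).maxStreams) // 64
}
```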

func WithMetrics

func WithMetrics(h MetricsHooks) Option

WithMetrics attaches metric callbacks.

func WithNegativeCache

func WithNegativeCache(n NegativeCache) Option

WithNegativeCache attaches the negative-cache read interface described in the design doc. nil is fine; the response then never sets recently_failed.

func WithPullerPump

func WithPullerPump(p PullerPump) Option

WithPullerPump wires the please_pull handler to the local origin puller. Required for please_pull to do useful work; without it the handler returns OUTCOME_UNSPECIFIED.

func WithStreamHandshakeTimeout

func WithStreamHandshakeTimeout(d time.Duration) Option

WithStreamHandshakeTimeout overrides DefaultStreamHandshakeTimeout. Intended for tests; non-positive values are ignored.

type PullerPump

type PullerPump func(ctx context.Context, registry, repository string, d digest.Digest, kind ifaces.OriginRefKind) (startedAt time.Time, alreadyPulling bool, fail *NegativeEntry)

PullerPump is invoked by the please_pull handler with a fully-classified pull request. It MUST return promptly: the call happens inside the stream handler and the server response can't be written until the pump returns. Long-running work (the actual origin pull) MUST be moved to a goroutine inside the pump's implementation.

The returned (startedAt, alreadyPulling) tuple drives the wire-level OUTCOME_STARTED vs OUTCOME_ALREADY_PULLING decision.
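One way to satisfy the "return promptly" contract is to record the digest in an in-flight map under a lock and hand the pull itself to a goroutine. The sketch below shows that shape with a simplified signature (no context, registry, repository, kind, or failure entry); the `pump` type and its fields are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pump dedupes pulls per digest and runs each pull in the background.
type pump struct {
	mu       sync.Mutex
	inflight map[string]time.Time
	pull     func(digest string) // long-running origin pull
	wg       sync.WaitGroup
}

func newPump(pull func(string)) *pump {
	return &pump{inflight: map[string]time.Time{}, pull: pull}
}

// start returns immediately. alreadyPulling is true when an earlier
// call for the same digest is still running; startedAt is then the
// time of that earlier call.
func (p *pump) start(d string) (startedAt time.Time, alreadyPulling bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if t, ok := p.inflight[d]; ok {
		return t, true
	}
	t := time.Now()
	p.inflight[d] = t
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		p.pull(d) // the slow part never blocks the caller
		p.mu.Lock()
		delete(p.inflight, d)
		p.mu.Unlock()
	}()
	return t, false
}

// demoPump requests the same digest twice while the pull is blocked.
func demoPump() (first, second bool) {
	release := make(chan struct{})
	p := newPump(func(string) { <-release })
	_, first = p.start("sha256:abc")
	_, second = p.start("sha256:abc")
	close(release) // let the background pull finish
	p.wg.Wait()
	return first, second
}

func main() {
	first, second := demoPump()
	fmt.Println("first alreadyPulling:", first, "second alreadyPulling:", second)
}
```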

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server handles inbound coord streams: pull_intent_query and please_pull RPCs. One stream per request, closed after reply.

func NewServer

func NewServer(store ifaces.LocalContentStore, members ifaces.Members, inflight *inflight.Map, opts ...Option) *Server

NewServer constructs a coord server. The store + members + inflight dependencies are required (everything else is optional via Option).

func (*Server) Bind

func (s *Server) Bind(h host.Host)

Bind registers the stream handler on h. After Bind returns, peers dialing ProtocolID will be served by s.

func (*Server) LocalPullIntent

func (s *Server) LocalPullIntent(ctx context.Context, d digest.Digest) ifaces.PullIntent

LocalPullIntent implements ifaces.LocalIntentProvider. It returns the same PullIntent the wire-level pull_intent_query handler would produce for d, but without the libp2p stream round-trip - the cold-start orchestrator uses it to include self as a first-class participant in the rule cascade.

func (*Server) StartLocalPull

func (s *Server) StartLocalPull(ctx context.Context, registry, repository string, kind ifaces.OriginRefKind, digests []digest.Digest) ([]ifaces.PleasePullOutcome, error)

StartLocalPull implements ifaces.LocalPullStarter. It runs the same pullerPump-driven path as servePleasePull but skips the libp2p stream layer entirely. Used by the cold-start orchestrator when rule 7 picks self as the designated puller - Coord.PleasePull(self) would round-trip through libp2p (or fail to dial) for no benefit.

Returns one PleasePullOutcome per input digest. A nil/zero pump (no WithPullerPump option) yields PleasePullUnspecified entries; that matches the server-side behaviour and is what the cold-start resolver expects when origin-pull is disabled.
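The "one outcome per input digest" contract, including the nil-pump case, can be sketched as below. The `outcome` constants, the two-value `pumpFn` signature, and mapping a reported failure to a recently-failed outcome are all assumptions for illustration; only the nil-pump behaviour is stated in the documentation above.

```go
package main

import "fmt"

type outcome int

const (
	outcomeUnspecified outcome = iota
	outcomeStarted
	outcomeAlreadyPulling
	outcomeRecentlyFailed
)

// pumpFn is a simplified stand-in for the pump callback.
type pumpFn func(digest string) (alreadyPulling, recentlyFailed bool)

// startLocalPull returns exactly one outcome per digest, in input
// order. With a nil pump every entry stays outcomeUnspecified.
func startLocalPull(p pumpFn, digests []string) []outcome {
	out := make([]outcome, len(digests)) // zero value is outcomeUnspecified
	if p == nil {
		return out
	}
	for i, d := range digests {
		already, failed := p(d)
		switch {
		case failed:
			out[i] = outcomeRecentlyFailed
		case already:
			out[i] = outcomeAlreadyPulling
		default:
			out[i] = outcomeStarted
		}
	}
	return out
}

func main() {
	digests := []string{"sha256:aaa", "sha256:bbb"}
	fmt.Println(startLocalPull(nil, digests))

	busy := func(d string) (bool, bool) { return d == "sha256:bbb", false }
	fmt.Println(startLocalPull(busy, digests))
}
```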
