Overview ¶
Package coord implements Gantry's libp2p coordination RPCs.
Wire protocol: `/gantry/coord/1.0.0`, with one libp2p stream per request/response pair, closed after the reply. Framing: length-delimited protobuf via `go-msgio`; the design doc rules out gRPC. Forward compatibility: additive changes bump the minor version (e.g. `1.1.0`); breaking changes bump the major version.
Two coordinated message families:
- `pull_intent_query` / `pull_intent_response` (step 4): a stateless probe asking a peer "do you have this digest cached, are you pulling it, or have you recently failed to pull it?". The responder fills hrw_rank from its own view of cluster membership so the requester can detect informer divergence (see the design doc).
- `please_pull` / `please_pull_response` (step 6): asks a peer (the designated puller per HRW) to pull one or more digests of a single repo. The responder's in-flight map dedupes requests, and the reply carries one result per digest: STARTED, ALREADY_PULLING or RECENTLY_FAILED.
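To make the hrw_rank divergence check concrete, here is a minimal rendezvous-hashing (highest random weight) sketch. The FNV-1a score, the rank convention (0 = highest weight = designated puller) and the helper names `score` and `hrwRank` are all assumptions for illustration; the package's actual hash and rank encoding are not documented here.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// score is a hypothetical HRW weight: FNV-1a over node ID and digest.
func score(node, digest string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(node))
	h.Write([]byte{0}) // separator so ("ab","c") and ("a","bc") differ
	h.Write([]byte(digest))
	return h.Sum64()
}

// hrwRank returns self's position among members for digest
// (0 = highest weight), or -1 if self is not a member.
func hrwRank(members []string, self, digest string) int {
	ranked := append([]string(nil), members...)
	sort.Slice(ranked, func(i, j int) bool {
		si, sj := score(ranked[i], digest), score(ranked[j], digest)
		if si != sj {
			return si > sj
		}
		return ranked[i] < ranked[j] // deterministic tie-break
	})
	for i, n := range ranked {
		if n == self {
			return i
		}
	}
	return -1
}

func main() {
	digest := "sha256:abc"
	requesterView := []string{"node-a", "node-b", "node-c"}
	responderView := []string{"node-a", "node-b"} // responder hasn't seen node-c yet

	// The requester predicts node-b's rank from its own membership view and
	// compares it with the hrw_rank node-b reported from its view.
	expected := hrwRank(requesterView, "node-b", digest)
	reported := hrwRank(responderView, "node-b", digest)
	if expected != reported {
		fmt.Println("membership views diverge for node-b")
	}
}
```

Whether the mismatch shows up depends on where node-c lands in the ranking, which is why a matching rank is evidence of agreement rather than proof.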
This package owns both the server-side stream handler and a typed client. Higher layers (cold-start orchestrator, mirror) interact only via the `ifaces.Coordinator` interface.
Index ¶
- Constants
- type Client
- func (c *Client) PleasePull(ctx context.Context, target ifaces.NodeID, registry, repository string, ...) ([]ifaces.PleasePullOutcome, error)
- func (c *Client) PullIntentQuery(ctx context.Context, target ifaces.NodeID, d digest.Digest) (ifaces.PullIntent, error)
- func (c *Client) ResolvePeerID(id ifaces.NodeID, pid peer.ID)
- type ClientOption
- type MetricsHooks
- type NegativeCache
- type NegativeEntry
- type Option
- type PullerPump
- type Server
Constants ¶
const DefaultMaxConcurrentStreams = 512
DefaultMaxConcurrentStreams caps simultaneous inbound coord streams. A hostile or buggy peer can open thousands of streams; without a cap, each one consumes a goroutine for up to streamHandshakeTimeout. libp2p's resource manager is the next defence layer; this is a cheap, predictable, server-local gate.
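A server-local gate like this is commonly a counting semaphore on a buffered channel. The sketch below assumes the gate is non-blocking (a stream arriving at capacity is rejected rather than queued, so it cannot pin a goroutine); whether the real server rejects or waits is not stated here, and `streamGate` is a hypothetical name.

```go
package main

import "fmt"

// streamGate is a hypothetical non-blocking concurrency cap for inbound
// streams, implemented as a counting semaphore on a buffered channel.
type streamGate struct{ slots chan struct{} }

func newStreamGate(max int) *streamGate {
	return &streamGate{slots: make(chan struct{}, max)}
}

// tryAcquire claims a slot without blocking; false means the server is at
// capacity and the caller should reset the stream instead of serving it.
func (g *streamGate) tryAcquire() bool {
	select {
	case g.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot; call it exactly once per successful tryAcquire.
func (g *streamGate) release() { <-g.slots }

func main() {
	g := newStreamGate(2)
	fmt.Println(g.tryAcquire(), g.tryAcquire(), g.tryAcquire()) // true true false
	g.release()
	fmt.Println(g.tryAcquire()) // true
}
```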
const DefaultStreamHandshakeTimeout = 5 * time.Second
DefaultStreamHandshakeTimeout bounds how long the server is willing to wait for the *first* envelope on an inbound stream and how long it gives itself to write the response. A peer that opens a stream and never sends bytes, whether accidentally (NAT death) or maliciously (slowloris-style resource exhaustion), must not pin a goroutine indefinitely.
5s comfortably covers a healthy in-cluster round-trip while still bounding the worst case. The deadline is set on the underlying libp2p stream so both r.ReadMsg and w.WriteMsg observe it; dispatch runs under its own 2s context (see handleStream). Override via WithStreamHandshakeTimeout for tests.
const MaxMessageBytes = 1 << 20
MaxMessageBytes caps a single inbound Envelope. PullIntentRequest is tiny; PleasePullRequest grows linearly with batch size. 1 MiB is orders of magnitude beyond the realistic ceiling but keeps memory bounded under malformed input.
const ProtocolID protocol.ID = "/gantry/coord/1.0.0"
ProtocolID is the libp2p stream protocol the coord handler binds.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
	// contains filtered or unexported fields
}
Client opens a libp2p stream per RPC. Members is used to resolve an ifaces.NodeID to a libp2p peer.ID for the dial. A minimal NodeID->peer.ID mapping is shipped that accepts the libp2p peer.ID string form directly as the NodeID (matching what `internal/discovery.Host` returns from FindProviders). Real K8s-pod-name -> peer.ID mapping is owned by `internal/members` and surfaces through a richer Node type in +.
func NewClient ¶
func NewClient(h host.Host, opts ...ClientOption) *Client
NewClient returns a coord RPC client. h must be a running libp2p host already participating in the coord protocol's transports.
func (*Client) PleasePull ¶
func (c *Client) PleasePull(ctx context.Context, target ifaces.NodeID, registry, repository string, kind ifaces.OriginRefKind, digests []digest.Digest) ([]ifaces.PleasePullOutcome, error)
PleasePull implements ifaces.Coordinator.
type ClientOption ¶
type ClientOption func(*Client)
ClientOption configures a Client.
func WithClientLogger ¶
func WithClientLogger(l *slog.Logger) ClientOption
WithClientLogger overrides the logger.
func WithDialTimeout ¶
func WithDialTimeout(d time.Duration) ClientOption
WithDialTimeout overrides the per-RPC dial timeout (default 2s).
func WithPeerIDResolver ¶
WithPeerIDResolver installs a function that maps a NodeID to a libp2p peer.ID at dial time. The resolver is consulted before the static teach-cache populated by ResolvePeerID; returning (_, false) falls through to the cache and then to peer.Decode(NodeID). Used by main.go to bridge K8s node names -> libp2p peer.IDs published via members' pod-annotation announcements (the design doc).
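The documented lookup order (resolver hook, then the cache populated by ResolvePeerID, then decoding the NodeID itself) can be sketched as follows. `NodeID` and `PeerID` are string stand-ins for ifaces.NodeID and libp2p's peer.ID, and `fakeDecode` is a hypothetical stand-in for peer.Decode that only accepts strings shaped like Ed25519-based peer IDs; none of these are the package's real types.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// NodeID and PeerID are hypothetical stand-ins for ifaces.NodeID and peer.ID.
type NodeID string
type PeerID string

// fakeDecode is a hypothetical stand-in for peer.Decode.
func fakeDecode(s string) (PeerID, error) {
	if strings.HasPrefix(s, "12D3KooW") {
		return PeerID(s), nil
	}
	return "", errors.New("not a peer ID: " + s)
}

type resolver struct {
	lookup func(NodeID) (PeerID, bool)  // WithPeerIDResolver hook; may be nil
	decode func(string) (PeerID, error) // peer.Decode stand-in
	mu     sync.RWMutex
	taught map[NodeID]PeerID // populated by teach (cf. ResolvePeerID)
}

// teach records a static NodeID -> PeerID mapping.
func (r *resolver) teach(id NodeID, pid PeerID) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.taught == nil {
		r.taught = make(map[NodeID]PeerID)
	}
	r.taught[id] = pid
}

// resolve consults the hook first, then the taught cache, then decodes.
func (r *resolver) resolve(id NodeID) (PeerID, error) {
	if r.lookup != nil {
		if pid, ok := r.lookup(id); ok {
			return pid, nil
		}
	}
	r.mu.RLock()
	pid, ok := r.taught[id]
	r.mu.RUnlock()
	if ok {
		return pid, nil
	}
	return r.decode(string(id))
}

func main() {
	r := &resolver{
		lookup: func(id NodeID) (PeerID, bool) {
			if id == "node-a" {
				return "12D3KooWAnnounced", true // e.g. from a pod annotation
			}
			return "", false
		},
		decode: fakeDecode,
	}
	r.teach("node-b", "12D3KooWTaught")
	for _, id := range []NodeID{"node-a", "node-b", "12D3KooWDirect", "node-c"} {
		pid, err := r.resolve(id)
		fmt.Println(id, pid, err)
	}
}
```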
func WithRPCTimeout ¶
func WithRPCTimeout(d time.Duration) ClientOption
WithRPCTimeout overrides the per-RPC end-to-end timeout (default 2s).
type MetricsHooks ¶
type MetricsHooks struct {
	// OnPullIntentServed fires once per pull_intent_query handled.
	OnPullIntentServed func()
	// OnPullIntentStorageUnavailable fires once per pull-intent query
	// (wire or local) whose has_cached=false answer was caused by the
	// primary or secondary storage backend returning ifaces.ErrUnavailable
	// rather than a definitive miss. Operators use this to distinguish
	// "DHT routes around us because we genuinely lack the blob" from
	// "DHT routes around us because containerd is unreachable on this
	// node". The latter should also be caught by readiness, but the
	// metric makes transient storage flaps observable independently of
	// the readyz signal.
	OnPullIntentStorageUnavailable func()
	// OnPleasePullServed fires once per please_pull *request* handled
	// (not per digest in the batch).
	OnPleasePullServed func()
	// OnPleasePullStarted is called once per digest the server
	// transitions into in_flight from a please_pull batch.
	OnPleasePullStarted func()
	// OnStreamError fires for any malformed or oversized stream.
	OnStreamError func()
}
MetricsHooks lets callers wire Prometheus counters/gauges without importing the metrics package. All fields may be nil.
type NegativeCache ¶
type NegativeCache interface {
	Lookup(d digest.Digest) (entry NegativeEntry, ok bool)
}
NegativeCache is the read interface coord needs from the circuit-breaker. Returning ok == false means the digest has no negative-cache entry on this node.
type NegativeEntry ¶
type NegativeEntry struct {
	CooldownUntil time.Time
	Class         ifaces.FailureClass
}
NegativeEntry mirrors the negative-cache state described in the design doc for a single digest.
type Option ¶
type Option func(*Server)
Option configures a Server.
func WithMaxConcurrentStreams ¶
WithMaxConcurrentStreams overrides DefaultMaxConcurrentStreams. Non-positive values are ignored.
func WithNegativeCache ¶
func WithNegativeCache(n NegativeCache) Option
WithNegativeCache attaches the negative-cache read interface described in the design doc. nil is fine; the response just doesn't set recently_failed.
func WithPullerPump ¶
func WithPullerPump(p PullerPump) Option
WithPullerPump wires the please_pull handler to the local origin puller. Required for please_pull to do useful work; without it the handler returns OUTCOME_UNSPECIFIED.
func WithStreamHandshakeTimeout ¶
WithStreamHandshakeTimeout overrides DefaultStreamHandshakeTimeout. Intended for tests; non-positive values are ignored.
type PullerPump ¶
type PullerPump func(ctx context.Context, registry, repository string, d digest.Digest, kind ifaces.OriginRefKind) (startedAt time.Time, alreadyPulling bool, fail *NegativeEntry)
PullerPump is invoked by the please_pull handler with a fully-classified pull request. It MUST return promptly: the call happens inside the stream handler and the server response can't be written until pump returns. Long-running work (the actual origin pull) MUST be moved to a goroutine inside the pump's implementation.
The returned (startedAt, alreadyPulling) pair drives the wire-level OUTCOME_STARTED vs OUTCOME_ALREADY_PULLING decision.
type Server ¶
type Server struct {
	// contains filtered or unexported fields
}
Server handles inbound coord streams: pull_intent_query and please_pull RPCs. One stream per request, closed after reply.
func NewServer ¶
func NewServer(store ifaces.LocalContentStore, members ifaces.Members, inflight *inflight.Map, opts ...Option) *Server
NewServer constructs a coord server. The store + members + inflight dependencies are required (everything else is optional via Option).
func (*Server) Bind ¶
Bind registers the stream handler on h. After Bind returns, peers dialing ProtocolID will be served by s.
func (*Server) LocalPullIntent ¶
LocalPullIntent implements ifaces.LocalIntentProvider. It returns the same PullIntent the wire-level pull_intent_query handler would produce for d, but without the libp2p stream round-trip - the cold-start orchestrator uses it to include self as a first-class participant in the rule cascade.
func (*Server) StartLocalPull ¶
func (s *Server) StartLocalPull(ctx context.Context, registry, repository string, kind ifaces.OriginRefKind, digests []digest.Digest) ([]ifaces.PleasePullOutcome, error)
StartLocalPull implements ifaces.LocalPullStarter. It runs the same pullerPump-driven path as servePleasePull but skips the libp2p stream layer entirely. Used by the cold-start orchestrator when rule 7 picks self as the designated puller - Coord.PleasePull(self) would round-trip through libp2p (or fail to dial) for no benefit.
Returns one PleasePullOutcome per input digest. A nil/zero pump (no WithPullerPump option) yields PleasePullUnspecified entries; that matches the server-side behaviour and is what the cold-start resolver expects when origin-pull is disabled.