Documentation ¶
Overview ¶
Package coord implements Gantry's libp2p coordination RPCs.
Wire protocol: `/gantry/coord/1.0.0` (one libp2p stream per request/response pair, closed after reply). Framing: length-delimited protobuf via `go-msgio` - the design forbids gRPC (the design doc). Forward compatibility: additive changes bump the minor (e.g. `1.1.0`); breaking changes bump the major.
Two coordinated message families:
- `pull_intent_query` / `pull_intent_response` (step 4) - a stateless probe asking a peer "do you have this digest cached, are you pulling it, or have you recently failed to pull it?". The responder fills hrw_rank from its own view of cluster membership so the requester can detect informer divergence (the design doc).
- `please_pull` / `please_pull_response` (step 6) - asks a peer (the designated puller per HRW) to pull one or more digests of a single repo. The responder's in-flight map dedupes; we get STARTED / ALREADY_PULLING / RECENTLY_FAILED per-digest results back.
This package owns both the server-side stream handler and a typed client. Higher layers (cold-start orchestrator, mirror) interact only via the `ifaces.Coordinator` interface.
Index ¶
- Constants
- type Client
- func (c *Client) PleasePull(ctx context.Context, target ifaces.NodeID, registry, repository string, ...) ([]ifaces.PleasePullOutcome, error)
- func (c *Client) PullIntentQuery(ctx context.Context, target ifaces.NodeID, d digest.Digest) (ifaces.PullIntent, error)
- func (c *Client) ResolvePeerID(id ifaces.NodeID, pid peer.ID)
- type ClientOption
- type MetricsHooks
- type NegativeCache
- type NegativeEntry
- type Option
- func WithLogger(l *slog.Logger) Option
- func WithMaxConcurrentStreams(n int) Option
- func WithMaxDigestsPerPleasePull(n int) Option
- func WithMetrics(h MetricsHooks) Option
- func WithNegativeCache(n NegativeCache) Option
- func WithPeerAuthz(enforce bool) Option
- func WithPullerPump(p PullerPump) Option
- func WithStreamHandshakeTimeout(d time.Duration) Option
- type PullerPump
- type PumpResult
- type PumpStatus
- type Server
- func (s *Server) Bind(h host.Host)
- func (s *Server) LocalPullIntent(ctx context.Context, d digest.Digest) ifaces.PullIntent
- func (s *Server) StartLocalPull(ctx context.Context, registry, repository string, kind ifaces.OriginRefKind, ...) ([]ifaces.PleasePullOutcome, error)
- func (s *Server) Unbind(h host.Host)
Constants ¶
const DefaultMaxConcurrentStreams = 512
DefaultMaxConcurrentStreams caps simultaneous inbound coord streams. A hostile or buggy peer can open thousands of streams; without a cap each consumes a goroutine for up to streamHandshakeTimeout. libp2p's resource manager is the next defence layer; this is a cheap, predictable, server-local gate.
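One common way to build such a gate is a buffered channel used as a counting semaphore with a non-blocking acquire. The sketch below shows that technique only; whether the server uses a channel, an atomic counter, or something else is not stated here, and `streamGate`, `tryAcquire`, and `release` are hypothetical names.

```go
package main

import "fmt"

// streamGate is a hypothetical counting semaphore for inbound streams.
type streamGate struct {
	slots chan struct{}
}

func newStreamGate(max int) *streamGate {
	return &streamGate{slots: make(chan struct{}, max)}
}

// tryAcquire takes a slot without blocking and reports whether it succeeded.
// A false result means the stream should be dropped immediately.
func (g *streamGate) tryAcquire() bool {
	select {
	case g.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release returns a slot; call it once per successful tryAcquire.
func (g *streamGate) release() {
	<-g.slots
}

func main() {
	gate := newStreamGate(2)
	fmt.Println(gate.tryAcquire(), gate.tryAcquire(), gate.tryAcquire())
	gate.release()
	fmt.Println(gate.tryAcquire())
}
```

Rejecting instead of queueing keeps the cost of an over-limit stream constant, which is the point of a cheap server-local gate.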
const DefaultMaxDigestsPerPleasePull = 256
DefaultMaxDigestsPerPleasePull caps a single inbound please_pull batch. It is well above normal OCI manifest child counts while preventing a 1 MiB coord envelope from expanding into thousands of pump calls.
const DefaultStreamHandshakeTimeout = 5 * time.Second
DefaultStreamHandshakeTimeout bounds how long the server is willing to wait for the *first* envelope on an inbound stream and how long it gives itself to write the response. A peer that opens a stream and never sends bytes - accidental (NAT death) or malicious (slowloris-style resource exhaustion) - must not pin a goroutine indefinitely.
5s comfortably covers a healthy in-cluster round-trip while still bounding the worst case. The deadline is set on the underlying libp2p stream so both r.ReadMsg and w.WriteMsg observe it; dispatch runs under its own 2s context (see handleStream). Override via WithStreamHandshakeTimeout for tests.
const MaxMessageBytes = 1 << 20
MaxMessageBytes caps a single inbound Envelope. PullIntentRequest is tiny; PleasePullRequest grows linearly with batch size. 1 MiB is orders of magnitude beyond the realistic ceiling but keeps memory bounded under malformed input.
const ProtocolID protocol.ID = "/gantry/coord/1.0.0"
ProtocolID is the libp2p stream protocol the coord handler binds.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client opens a libp2p stream per RPC. Members is used to resolve an ifaces.NodeID to a libp2p peer.ID for the dial. The client ships a minimal NodeID->peer.ID mapping that accepts the libp2p peer.ID string form directly as the NodeID (matches what `internal/discovery.Host` returns from FindProviders). The real K8s-pod-name -> peer.ID mapping is owned by `internal/members` and surfaces through a richer Node type later.
func NewClient ¶
func NewClient(h host.Host, opts ...ClientOption) *Client
NewClient returns a coord RPC client. h must be a running libp2p host already participating in the coord protocol's transports.
func (*Client) PleasePull ¶
func (c *Client) PleasePull(ctx context.Context, target ifaces.NodeID, registry, repository string, kind ifaces.OriginRefKind, digests []digest.Digest) ([]ifaces.PleasePullOutcome, error)
PleasePull implements ifaces.Coordinator.
type ClientOption ¶
type ClientOption func(*Client)
ClientOption configures a Client.
func WithClientLogger ¶
func WithClientLogger(l *slog.Logger) ClientOption
WithClientLogger overrides the logger.
func WithClientMaxDigestsPerPleasePull ¶ added in v0.1.17
func WithClientMaxDigestsPerPleasePull(n int) ClientOption
WithClientMaxDigestsPerPleasePull overrides the client-side chunk size used for PleasePull. Non-positive values are ignored.
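Client-side chunking of a digest list can be sketched as follows. This is illustrative only: `chunkDigests` is a hypothetical helper, digests are plain strings rather than `digest.Digest`, and falling back to 256 for a non-positive size is an assumption modelled on DefaultMaxDigestsPerPleasePull.

```go
package main

import "fmt"

// chunkDigests splits digests into consecutive batches of at most size items.
// A non-positive size falls back to 256 (an assumption of this sketch).
func chunkDigests(digests []string, size int) [][]string {
	if size <= 0 {
		size = 256
	}
	var out [][]string
	for len(digests) > 0 {
		n := size
		if len(digests) < n {
			n = len(digests)
		}
		out = append(out, digests[:n])
		digests = digests[n:]
	}
	return out
}

func main() {
	batches := chunkDigests([]string{"d1", "d2", "d3", "d4", "d5"}, 2)
	for _, b := range batches {
		fmt.Println(b)
	}
}
```

Each batch would then travel in its own please_pull request, so no single request exceeds the responder's per-request cap.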
func WithDialTimeout ¶
func WithDialTimeout(d time.Duration) ClientOption
WithDialTimeout overrides the per-RPC dial timeout (default 2s).
func WithPeerIDResolver ¶
WithPeerIDResolver installs a function that maps a NodeID to a libp2p peer.ID at dial time. The resolver is consulted before the static teach-cache populated by ResolvePeerID; returning (_, false) falls through to the cache and then to peer.Decode(NodeID). Used by main.go to bridge K8s node names -> libp2p peer.IDs published via members' pod-annotation announcements (the design doc).
func WithRPCTimeout ¶
func WithRPCTimeout(d time.Duration) ClientOption
WithRPCTimeout overrides the per-RPC end-to-end timeout (default 2s).
type MetricsHooks ¶
type MetricsHooks struct {
// OnPullIntentServed fires once per pull_intent_query handled.
OnPullIntentServed func()
// OnPullIntentStorageUnavailable fires once per pull_intent_query
// (wire or local) whose has_cached=false answer was caused by the
// primary or secondary storage backend returning ifaces.ErrUnavailable
// rather than a definitive miss. Operators use this to distinguish
// "DHT routes around us because we genuinely lack the blob" from
// "DHT routes around us because containerd is unreachable on this
// node" - the latter should also be caught by readiness, but the
// metric makes transient storage flaps observable independently of
// the readyz signal.
OnPullIntentStorageUnavailable func()
// OnPleasePullServed fires once per please_pull *request* handled
// (not per digest in the batch).
OnPleasePullServed func()
// OnPleasePullStarted is called once per digest the server
// transitions into in_flight from a please_pull batch.
OnPleasePullStarted func()
// OnPleasePullDeclined fires once per digest the server declines to
// start (PumpDeclined): the puller-pump refused the work because the
// node is at its concurrent-pull ceiling or is shutting down. The
// digest is reported to the requester as OUTCOME_UNSPECIFIED. This is
// the load-shedding signal operators watch during large rollouts; a
// sustained nonzero rate means designated pullers are saturated and
// requesters are falling through to direct-origin fallback (NF5).
OnPleasePullDeclined func()
// OnStreamError fires once per inbound stream that is dropped without a
// normal reply: a malformed or oversized envelope, read/decode/deadline
// failures, the concurrent-stream limit, dispatch and serve errors, and
// response marshal/write failures. Enforce-mode authz rejections are
// deliberately NOT counted here (they are a policy decision, recorded by
// reason in OnUnauthorizedPeer), so enabling peer authz does not inflate
// the protocol-error signal.
OnStreamError func()
// OnUnauthorizedPeer fires for each inbound request from a remote whose
// libp2p peer ID is not present in the current membership view. The
// reason label distinguishes "unrecognized" (members have published
// peer IDs but none match remote) from "unevaluable" (no member has
// published a peer ID yet, so authorization cannot be evaluated - only
// reported in enforce mode). It fires in both observe-only and enforce
// mode so operators can size the false-positive rate before flipping
// enforcement on.
OnUnauthorizedPeer func(reason string)
}
MetricsHooks lets callers wire Prometheus counters/gauges without importing the metrics package. All fields may be nil.
type NegativeCache ¶
type NegativeCache interface {
Lookup(d digest.Digest) (entry NegativeEntry, ok bool)
}
NegativeCache is the read interface coord needs from the circuit-breaker. Returning ok == false means the digest has no negative-cache entry on this node.
type NegativeEntry ¶
type NegativeEntry struct {
CooldownUntil time.Time
Class ifaces.FailureClass
}
NegativeEntry mirrors the design doc state for a single digest.
type Option ¶
type Option func(*Server)
Option configures a Server.
func WithMaxConcurrentStreams ¶
WithMaxConcurrentStreams overrides DefaultMaxConcurrentStreams. Non-positive values are ignored.
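The "non-positive values are ignored" behaviour fits the functional-options pattern naturally. The sketch below uses hypothetical lowercase names (`server`, `option`, `withMaxConcurrentStreams`, `newServer`) to avoid implying it is the package's source.

```go
package main

import "fmt"

// server holds only the setting this sketch cares about.
type server struct {
	maxStreams int
}

// option mirrors the shape of Option: a function that mutates the server.
type option func(*server)

// withMaxConcurrentStreams overrides the default; n <= 0 leaves it unchanged.
func withMaxConcurrentStreams(n int) option {
	return func(s *server) {
		if n > 0 {
			s.maxStreams = n
		}
	}
}

// newServer applies the default first, then each option in order.
func newServer(opts ...option) *server {
	s := &server{maxStreams: 512} // DefaultMaxConcurrentStreams
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	fmt.Println(newServer().maxStreams)
	fmt.Println(newServer(withMaxConcurrentStreams(64)).maxStreams)
	fmt.Println(newServer(withMaxConcurrentStreams(0)).maxStreams)
}
```

Ignoring bad values inside the option keeps a misconfigured flag from silently disabling the cap.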
func WithMaxDigestsPerPleasePull ¶ added in v0.1.17
WithMaxDigestsPerPleasePull overrides DefaultMaxDigestsPerPleasePull. Non-positive values are ignored.
func WithNegativeCache ¶
func WithNegativeCache(n NegativeCache) Option
WithNegativeCache attaches the negative-cache read interface described in the design doc. nil is fine; the response just doesn't set recently_failed.
func WithPeerAuthz ¶ added in v0.1.17
WithPeerAuthz configures peer authorization for inbound coord requests.
Authorization compares the dialing peer's libp2p peer ID against the PeerID values published in the current membership view. When enforce is false (the default) an unrecognised peer is recorded via MetricsHooks.OnUnauthorizedPeer and still served, so operators can size the false-positive rate before flipping enforcement on. When enforce is true an unrecognised peer is rejected before its request is dispatched.
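The observe-only versus enforce decision can be sketched as a small pure function. This is an illustration, not the server's code: `authorize` is a hypothetical name, peer IDs are plain strings, and the sketch assumes at least one member has published a peer ID, so the "unevaluable" case is deliberately left out.

```go
package main

import "fmt"

// authorize reports whether the request from remote should be served.
// memberPeerIDs is the set of peer IDs published in the membership view.
// onUnauthorized may be nil, like the MetricsHooks fields.
func authorize(remote string, memberPeerIDs map[string]bool, enforce bool,
	onUnauthorized func(reason string)) bool {
	if memberPeerIDs[remote] {
		return true
	}
	// The hook fires in both modes so the false-positive rate can be sized
	// before enforcement is switched on.
	if onUnauthorized != nil {
		onUnauthorized("unrecognized")
	}
	return !enforce
}

func main() {
	members := map[string]bool{"peer-a": true, "peer-b": true}
	report := func(reason string) { fmt.Println("unauthorized:", reason) }

	fmt.Println(authorize("peer-a", members, true, report))  // known peer
	fmt.Println(authorize("peer-x", members, false, report)) // observe-only
	fmt.Println(authorize("peer-x", members, true, report))  // enforce
}
```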
func WithPullerPump ¶
func WithPullerPump(p PullerPump) Option
WithPullerPump wires the please_pull handler to the local origin puller. Required for please_pull to do useful work; without it the handler returns OUTCOME_UNSPECIFIED.
func WithStreamHandshakeTimeout ¶
WithStreamHandshakeTimeout overrides DefaultStreamHandshakeTimeout. Intended for tests; non-positive values are ignored.
type PullerPump ¶
type PullerPump func(ctx context.Context, registry, repository string, d digest.Digest, kind ifaces.OriginRefKind) PumpResult
PullerPump is invoked by the please_pull handler with a fully-classified pull request. It MUST return promptly: the call happens inside the stream handler and the server response can't be written until pump returns. Long-running work (the actual origin pull) MUST be moved to a goroutine inside the pump's implementation.
PullerPump returns a PumpResult describing whether the digest started, piggy-backed on existing work, short-circuited on a recent failure, or was declined before background work was started.
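A pump that honours the "return promptly" contract records the digest as in flight, hands the slow pull to a goroutine, and returns at once. The sketch below shows that shape with hypothetical names (`pump`, `start`, `pumpStarted`, `pumpAlreadyPulling`); it covers only two of the four statuses and uses a plain map instead of the package's in-flight map.

```go
package main

import (
	"fmt"
	"sync"
)

type pumpStatus int

const (
	pumpStarted pumpStatus = iota
	pumpAlreadyPulling
)

// pump is a hypothetical in-process puller front end.
type pump struct {
	mu       sync.Mutex
	inflight map[string]struct{}
	pull     func(digest string) // the slow origin pull
}

func newPump(pull func(digest string)) *pump {
	return &pump{inflight: map[string]struct{}{}, pull: pull}
}

// start returns immediately; the pull itself runs in a background goroutine.
func (p *pump) start(digest string) pumpStatus {
	p.mu.Lock()
	if _, busy := p.inflight[digest]; busy {
		p.mu.Unlock()
		return pumpAlreadyPulling
	}
	p.inflight[digest] = struct{}{}
	p.mu.Unlock()

	go func() {
		p.pull(digest)
		p.mu.Lock()
		delete(p.inflight, digest)
		p.mu.Unlock()
	}()
	return pumpStarted
}

func main() {
	release := make(chan struct{})
	p := newPump(func(string) { <-release }) // pull blocks until released

	fmt.Println(p.start("sha256:abc") == pumpStarted)
	fmt.Println(p.start("sha256:abc") == pumpAlreadyPulling)
	close(release)
}
```

The second call piggy-backs on the first because the digest is still registered while the background pull is blocked.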
type PumpResult ¶ added in v0.1.17
type PumpResult struct {
Status PumpStatus
StartedAt time.Time
CooldownUntil time.Time
FailureClass ifaces.FailureClass
}
PumpResult is the in-process result returned by PullerPump.
type PumpStatus ¶ added in v0.1.17
type PumpStatus int
PumpStatus is the in-process status returned by PullerPump.
const (
	PumpStarted PumpStatus = iota
	PumpAlreadyPulling
	PumpRecentlyFailed
	PumpDeclined
)
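The relationship between these in-process statuses and the per-digest wire results mentioned elsewhere on this page (STARTED, ALREADY_PULLING, RECENTLY_FAILED, and OUTCOME_UNSPECIFIED for declined work) can be sketched as a simple mapping. The function `outcomeName` and the use of strings for outcomes are assumptions; the real code presumably maps to protobuf enum values.

```go
package main

import "fmt"

type pumpStatus int

const (
	pumpStarted pumpStatus = iota
	pumpAlreadyPulling
	pumpRecentlyFailed
	pumpDeclined
)

// outcomeName maps an in-process status to the wire-level result name.
// Declined and unknown statuses both fall through to OUTCOME_UNSPECIFIED.
func outcomeName(s pumpStatus) string {
	switch s {
	case pumpStarted:
		return "STARTED"
	case pumpAlreadyPulling:
		return "ALREADY_PULLING"
	case pumpRecentlyFailed:
		return "RECENTLY_FAILED"
	default:
		return "OUTCOME_UNSPECIFIED"
	}
}

func main() {
	for _, s := range []pumpStatus{pumpStarted, pumpAlreadyPulling, pumpRecentlyFailed, pumpDeclined} {
		fmt.Println(int(s), outcomeName(s))
	}
}
```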
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server handles inbound coord streams: pull_intent_query and please_pull RPCs. One stream per request, closed after reply.
func NewServer ¶
func NewServer(store ifaces.LocalContentStore, members ifaces.Members, inflight *inflight.Map, opts ...Option) *Server
NewServer constructs a coord server. The store + members + inflight dependencies are required (everything else is optional via Option).
func (*Server) Bind ¶
Bind registers the stream handler on h. After Bind returns, peers dialing ProtocolID will be served by s.
func (*Server) LocalPullIntent ¶
LocalPullIntent implements ifaces.LocalIntentProvider. It returns the same PullIntent the wire-level pull_intent_query handler would produce for d, but without the libp2p stream round-trip - the cold-start orchestrator uses it to include self as a first-class participant in the rule cascade.
func (*Server) StartLocalPull ¶
func (s *Server) StartLocalPull(ctx context.Context, registry, repository string, kind ifaces.OriginRefKind, digests []digest.Digest) ([]ifaces.PleasePullOutcome, error)
StartLocalPull implements ifaces.LocalPullStarter. It runs the same pullerPump-driven path as servePleasePull but skips the libp2p stream layer entirely. Used by the cold-start orchestrator when rule 7 picks self as the designated puller - Coord.PleasePull(self) would round-trip through libp2p (or fail to dial) for no benefit.
Returns one PleasePullOutcome per input digest. A nil/zero pump (no WithPullerPump option) yields PleasePullUnspecified entries; that matches the server-side behaviour and is what the cold-start resolver expects when origin-pull is disabled.