coord

package
v0.1.19 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 25, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package coord implements Gantry's libp2p coordination RPCs.

Wire protocol: `/gantry/coord/1.0.0` (one libp2p stream per request/response pair, closed after the reply). Framing: length-delimited protobuf via `go-msgio`; the design doc forbids gRPC. Forward compatibility: additive changes bump the minor version (e.g. `1.1.0`); breaking changes bump the major version.

Two coordinated message families:

- `pull_intent_query` / `pull_intent_response` (step 4): a stateless probe asking a peer "do you have this digest cached, are you pulling it, or have you recently failed to pull it?". The responder fills `hrw_rank` from its own view of cluster membership so the requester can detect informer divergence (see the design doc).

- `please_pull` / `please_pull_response` (step 6): asks a peer (the designated puller per HRW) to pull one or more digests of a single repo. The responder's in-flight map deduplicates the work, and the requester gets back one result per digest: STARTED, ALREADY_PULLING, or RECENTLY_FAILED (OUTCOME_UNSPECIFIED when the responder declines or has no puller pump).
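HRW (highest random weight, or rendezvous) hashing scores every member independently for a given digest; the top-scoring node is the designated puller, and a node's position in the descending order is its rank. The sketch below assumes SHA-256 over `nodeID || 0x00 || digest` as the score and a lexical tie-break; coord's actual hash and tie-break are not documented here, and `hrwScore`, `hrwOrder`, and `hrwRank` are illustrative names.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"sort"
)

// hrwScore is a hypothetical weight: the first 8 bytes of
// SHA-256(nodeID || 0x00 || digest).
func hrwScore(nodeID, digest string) uint64 {
	h := sha256.New()
	h.Write([]byte(nodeID))
	h.Write([]byte{0})
	h.Write([]byte(digest))
	return binary.BigEndian.Uint64(h.Sum(nil)[:8])
}

// hrwOrder returns a copy of nodes sorted by descending score; index 0 is
// the designated puller and each node's index is its HRW rank.
func hrwOrder(nodes []string, digest string) []string {
	out := append([]string(nil), nodes...)
	sort.Slice(out, func(i, j int) bool {
		si, sj := hrwScore(out[i], digest), hrwScore(out[j], digest)
		if si != sj {
			return si > sj
		}
		return out[i] < out[j] // deterministic tie-break
	})
	return out
}

// hrwRank returns node's 0-based rank for digest, or -1 if it is absent.
func hrwRank(nodes []string, node, digest string) int {
	for i, n := range hrwOrder(nodes, digest) {
		if n == node {
			return i
		}
	}
	return -1
}

func main() {
	members := []string{"node-a", "node-b", "node-c"}
	d := "sha256:abc"
	fmt.Println("designated puller:", hrwOrder(members, d)[0])
	// A responder with a different membership view may compute a different
	// rank for itself; comparing hrw_rank values exposes that divergence.
	fmt.Println(hrwRank(members, "node-b", d), hrwRank(members[:2], "node-b", d))
}
```

Because each node's score is independent of the others, removing any node other than the winner leaves the designated puller unchanged, which keeps puller selection stable as membership churns.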

This package owns both the server-side stream handler and a typed client. Higher layers (cold-start orchestrator, mirror) interact only via the `ifaces.Coordinator` interface.

Index

Constants

const DefaultMaxConcurrentStreams = 512

DefaultMaxConcurrentStreams caps simultaneous inbound coord streams. A hostile or buggy peer can open thousands of streams; without a cap, each one consumes a goroutine for up to streamHandshakeTimeout. libp2p's resource manager is the next layer of defence; this cap is a cheap, predictable, server-local gate.
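A gate of this kind is commonly a buffered channel used as a non-blocking semaphore. The sketch below assumes excess streams are refused immediately rather than queued, which is consistent with OnStreamError counting the concurrent-stream limit as a dropped stream; `streamGate`, `tryAcquire`, and `release` are hypothetical names.

```go
package main

import "fmt"

// streamGate is a hypothetical server-local cap on concurrent inbound
// streams, sized like DefaultMaxConcurrentStreams.
type streamGate chan struct{}

func newStreamGate(n int) streamGate { return make(streamGate, n) }

// tryAcquire reserves a slot without blocking; false means the stream
// should be dropped now instead of waiting behind busy handlers.
func (g streamGate) tryAcquire() bool {
	select {
	case g <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot; call it exactly once per successful tryAcquire.
func (g streamGate) release() { <-g }

func main() {
	g := newStreamGate(2)
	fmt.Println(g.tryAcquire(), g.tryAcquire(), g.tryAcquire()) // true true false
	g.release()
	fmt.Println(g.tryAcquire()) // true
}
```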

const DefaultMaxDigestsPerPleasePull = 256

DefaultMaxDigestsPerPleasePull caps a single inbound please_pull batch. It is well above normal OCI manifest child counts while preventing a 1 MiB coord envelope from expanding into thousands of PullerPump calls.

const DefaultStreamHandshakeTimeout = 5 * time.Second

DefaultStreamHandshakeTimeout bounds how long the server is willing to wait for the *first* envelope on an inbound stream and how long it gives itself to write the response. A peer that opens a stream and never sends bytes - accidental (NAT death) or malicious (slowloris-style resource exhaustion) - must not pin a goroutine indefinitely.

5s comfortably covers a healthy in-cluster round-trip while still bounding the worst case. The deadline is set on the underlying libp2p stream so both r.ReadMsg and w.WriteMsg observe it; dispatch runs under its own 2s context (see handleStream). Override via WithStreamHandshakeTimeout for tests.

const MaxMessageBytes = 1 << 20

MaxMessageBytes caps a single inbound Envelope. PullIntentRequest is tiny; PleasePullRequest grows linearly with batch size. 1 MiB is orders of magnitude beyond the realistic ceiling but keeps memory bounded under malformed input.

const ProtocolID protocol.ID = "/gantry/coord/1.0.0"

ProtocolID is the libp2p stream protocol the coord handler binds.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client opens one libp2p stream per RPC. Members is used to resolve an ifaces.NodeID to a libp2p peer.ID for the dial. ships a minimal NodeID->peer.ID mapping that accepts the libp2p peer.ID string form directly as the NodeID (this matches what `internal/discovery.Host` returns from FindProviders). The real K8s-pod-name -> peer.ID mapping is owned by `internal/members` and surfaces through a richer Node type in +.

func NewClient

func NewClient(h host.Host, opts ...ClientOption) *Client

NewClient returns a coord RPC client. h must be a running libp2p host already participating in the coord protocol's transports.

func (*Client) PleasePull

func (c *Client) PleasePull(ctx context.Context, target ifaces.NodeID, registry, repository string, kind ifaces.OriginRefKind, digests []digest.Digest) ([]ifaces.PleasePullOutcome, error)

PleasePull implements ifaces.Coordinator.

func (*Client) PullIntentQuery

func (c *Client) PullIntentQuery(ctx context.Context, target ifaces.NodeID, d digest.Digest) (ifaces.PullIntent, error)

PullIntentQuery implements ifaces.Coordinator.

func (*Client) ResolvePeerID

func (c *Client) ResolvePeerID(id ifaces.NodeID, pid peer.ID)

ResolvePeerID lets external wiring teach the client how to map a NodeID to a libp2p peer.ID. Higher layers (members, discovery) own this mapping; we just cache lookups.

type ClientOption

type ClientOption func(*Client)

ClientOption configures a Client.

func WithClientLogger

func WithClientLogger(l *slog.Logger) ClientOption

WithClientLogger overrides the logger.

func WithClientMaxDigestsPerPleasePull added in v0.1.17

func WithClientMaxDigestsPerPleasePull(n int) ClientOption

WithClientMaxDigestsPerPleasePull overrides the client-side chunk size used for PleasePull. Non-positive values are ignored.
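Client-side chunking presumably splits a long digest list into consecutive batches no larger than the configured size and issues one please_pull per batch, concatenating the per-digest outcomes. The helper below is a hypothetical sketch with strings in place of `digest.Digest`; it treats a non-positive size as "use the default", matching the documented "non-positive values are ignored" behaviour.

```go
package main

import "fmt"

// chunkDigests splits digests into batches of at most size entries.
// A non-positive size falls back to fallback (e.g. 256).
func chunkDigests(digests []string, size, fallback int) [][]string {
	if size <= 0 {
		size = fallback
	}
	var out [][]string
	for len(digests) > size {
		// Full slice expression caps capacity so batches never alias.
		out = append(out, digests[:size:size])
		digests = digests[size:]
	}
	if len(digests) > 0 {
		out = append(out, digests)
	}
	return out
}

func main() {
	ds := []string{"d1", "d2", "d3", "d4", "d5"}
	for _, batch := range chunkDigests(ds, 2, 256) {
		fmt.Println(batch) // prints [d1 d2], [d3 d4], [d5] on separate lines
	}
}
```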

func WithDialTimeout

func WithDialTimeout(d time.Duration) ClientOption

WithDialTimeout overrides the per-RPC dial timeout (default 2s).

func WithPeerIDResolver

func WithPeerIDResolver(fn func(ifaces.NodeID) (peer.ID, bool)) ClientOption

WithPeerIDResolver installs a function that maps a NodeID to a libp2p peer.ID at dial time. The resolver is consulted before the static teach-cache populated by ResolvePeerID; returning (_, false) falls through to the cache and then to peer.Decode(NodeID). Used by main.go to bridge K8s node names -> libp2p peer.IDs published via members' pod-annotation announcements (the design doc).

func WithRPCTimeout

func WithRPCTimeout(d time.Duration) ClientOption

WithRPCTimeout overrides the per-RPC end-to-end timeout (default 2s).

type MetricsHooks

type MetricsHooks struct {
	// OnPullIntentServed fires once per pull_intent_query handled.
	OnPullIntentServed func()
	// OnPullIntentStorageUnavailable fires once per pull_intent_query
	// (wire or local) whose has_cached=false answer was caused by the
	// primary or secondary storage backend returning ifaces.ErrUnavailable
	// rather than a definitive miss. Operators use this to distinguish
	// "DHT routes around us because we genuinely lack the blob" from
	// "DHT routes around us because containerd is unreachable on this
	// node" - the latter should also be caught by readiness, but the
	// metric makes transient storage flaps observable independently of
	// the readyz signal.
	OnPullIntentStorageUnavailable func()
	// OnPleasePullServed fires once per please_pull *request* handled
	// (not per digest in the batch).
	OnPleasePullServed func()
	// OnPleasePullStarted is called once per digest the server
	// transitions into in_flight from a please_pull batch.
	OnPleasePullStarted func()
	// OnPleasePullDeclined fires once per digest the server declines to
	// start (PumpDeclined): the puller-pump refused the work because the
	// node is at its concurrent-pull ceiling or is shutting down. The
	// digest is reported to the requester as OUTCOME_UNSPECIFIED. This is
	// the load-shedding signal operators watch during large rollouts; a
	// sustained nonzero rate means designated pullers are saturated and
	// requesters are falling through to direct-origin fallback (NF5).
	OnPleasePullDeclined func()
	// OnStreamError fires once per inbound stream that is dropped without a
	// normal reply: a malformed or oversized envelope, read/decode/deadline
	// failures, the concurrent-stream limit, dispatch and serve errors, and
	// response marshal/write failures. Enforce-mode authz rejections are
	// deliberately NOT counted here (they are a policy decision, recorded by
	// reason in OnUnauthorizedPeer), so enabling peer authz does not inflate
	// the protocol-error signal.
	OnStreamError func()
	// OnUnauthorizedPeer fires once per inbound request whose remote
	// libp2p peer ID is not present in the current membership view. The
	// reason label distinguishes "unrecognized" (members have published
	// peer IDs but none match remote) from "unevaluable" (no member has
	// published a peer ID yet, so authorization cannot be evaluated - only
	// reported in enforce mode). It fires in both observe-only and enforce
	// mode so operators can size the false-positive rate before flipping
	// enforcement on.
	OnUnauthorizedPeer func(reason string)
}

MetricsHooks lets callers wire Prometheus counters/gauges without importing the metrics package. All fields may be nil.

type NegativeCache

type NegativeCache interface {
	Lookup(d digest.Digest) (entry NegativeEntry, ok bool)
}

NegativeCache is the read interface coord needs from the circuit breaker. Returning ok == false means the digest has no negative-cache entry on this node.

type NegativeEntry

type NegativeEntry struct {
	CooldownUntil time.Time
	Class         ifaces.FailureClass
}

NegativeEntry mirrors the negative-cache state described in the design doc for a single digest.

type Option

type Option func(*Server)

Option configures a Server.

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger plumbs a structured logger.

func WithMaxConcurrentStreams

func WithMaxConcurrentStreams(n int) Option

WithMaxConcurrentStreams overrides DefaultMaxConcurrentStreams. Non-positive values are ignored.

func WithMaxDigestsPerPleasePull added in v0.1.17

func WithMaxDigestsPerPleasePull(n int) Option

WithMaxDigestsPerPleasePull overrides DefaultMaxDigestsPerPleasePull. Non-positive values are ignored.

func WithMetrics

func WithMetrics(h MetricsHooks) Option

WithMetrics attaches metric callbacks.

func WithNegativeCache

func WithNegativeCache(n NegativeCache) Option

WithNegativeCache attaches the negative-cache read interface described in the design doc. nil is fine; the response then simply leaves recently_failed unset.

func WithPeerAuthz added in v0.1.17

func WithPeerAuthz(enforce bool) Option

WithPeerAuthz configures peer authorization for inbound coord requests.

Authorization compares the dialing peer's libp2p peer ID against the PeerID values published in the current membership view. When enforce is false (the default) an unrecognised peer is recorded via MetricsHooks.OnUnauthorizedPeer and still served, so operators can size the false-positive rate before flipping enforcement on. When enforce is true an unrecognised peer is rejected before its request is dispatched.
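The observe/enforce behaviour can be summarised as a small decision function. This is a hypothetical sketch, not the package's code: `authzDecision` returns whether to dispatch the request and which OnUnauthorizedPeer reason to record ("" for none). Rejecting "unevaluable" peers in enforce mode (fail-closed) is an assumption; the documentation only says that reason is reported in enforce mode.

```go
package main

import "fmt"

// authzDecision sketches WithPeerAuthz. members holds the peer IDs
// published in the current membership view.
func authzDecision(remote string, members []string, enforce bool) (serve bool, reason string) {
	if len(members) == 0 {
		// Nobody has published a peer ID, so authorization cannot be
		// evaluated. Reported only in enforce mode; rejecting here is an
		// assumption of this sketch.
		if enforce {
			return false, "unevaluable"
		}
		return true, ""
	}
	for _, m := range members {
		if m == remote {
			return true, ""
		}
	}
	// Unrecognized: recorded in both modes, rejected only when enforcing.
	return !enforce, "unrecognized"
}

func main() {
	peers := []string{"12D3KooWMember"}
	fmt.Println(authzDecision("12D3KooWMember", peers, true))
	fmt.Println(authzDecision("12D3KooWStranger", peers, false))
	fmt.Println(authzDecision("12D3KooWStranger", peers, true))
}
```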

func WithPullerPump

func WithPullerPump(p PullerPump) Option

WithPullerPump wires the please_pull handler to the local origin puller. Required for please_pull to do useful work; without it the handler returns OUTCOME_UNSPECIFIED.

func WithStreamHandshakeTimeout

func WithStreamHandshakeTimeout(d time.Duration) Option

WithStreamHandshakeTimeout overrides DefaultStreamHandshakeTimeout. Intended for tests; non-positive values are ignored.

type PullerPump

type PullerPump func(ctx context.Context, registry, repository string, d digest.Digest, kind ifaces.OriginRefKind) PumpResult

PullerPump is invoked by the please_pull handler with a fully classified pull request. It MUST return promptly: the call happens inside the stream handler, and the server response can't be written until the pump returns. Long-running work (the actual origin pull) MUST be moved to a goroutine inside the pump's implementation.

PullerPump returns a PumpResult describing whether the digest started, piggy-backed on existing work, short-circuited on a recent failure, or was declined before background work was started.

type PumpResult added in v0.1.17

type PumpResult struct {
	Status        PumpStatus
	StartedAt     time.Time
	CooldownUntil time.Time
	FailureClass  ifaces.FailureClass
}

PumpResult is the in-process result returned by PullerPump.

type PumpStatus added in v0.1.17

type PumpStatus int

PumpStatus is the in-process status returned by PullerPump.

const (
	PumpStarted PumpStatus = iota
	PumpAlreadyPulling
	PumpRecentlyFailed
	PumpDeclined
)
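How each PumpStatus surfaces on the wire can be sketched as a mapping to the per-digest outcome names used in the overview, with PumpDeclined (and any unknown status) becoming OUTCOME_UNSPECIFIED, as OnPleasePullDeclined describes. The strings are illustrative; a real implementation would presumably use generated protobuf enum values. `outcomeFor` and `outcomesFor` are hypothetical, and the nil-pump branch mirrors StartLocalPull's documented behaviour.

```go
package main

import "fmt"

type PumpStatus int

const (
	PumpStarted PumpStatus = iota
	PumpAlreadyPulling
	PumpRecentlyFailed
	PumpDeclined
)

// outcomeFor maps an in-process status to a please_pull result name.
func outcomeFor(s PumpStatus) string {
	switch s {
	case PumpStarted:
		return "STARTED"
	case PumpAlreadyPulling:
		return "ALREADY_PULLING"
	case PumpRecentlyFailed:
		return "RECENTLY_FAILED"
	default: // PumpDeclined or unknown
		return "OUTCOME_UNSPECIFIED"
	}
}

// outcomesFor returns one outcome per digest; a nil pump yields
// OUTCOME_UNSPECIFIED for every entry.
func outcomesFor(digests []string, pump func(string) PumpStatus) []string {
	out := make([]string, len(digests))
	for i, d := range digests {
		if pump == nil {
			out[i] = "OUTCOME_UNSPECIFIED"
			continue
		}
		out[i] = outcomeFor(pump(d))
	}
	return out
}

func main() {
	fmt.Println(outcomesFor([]string{"d1", "d2"}, nil))
	fmt.Println(outcomesFor([]string{"d1"}, func(string) PumpStatus { return PumpAlreadyPulling }))
}
```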

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server handles inbound coord streams: pull_intent_query and please_pull RPCs. One stream per request, closed after reply.

func NewServer

func NewServer(store ifaces.LocalContentStore, members ifaces.Members, inflight *inflight.Map, opts ...Option) *Server

NewServer constructs a coord server. The store, members, and inflight dependencies are required; everything else is optional via Option.

func (*Server) Bind

func (s *Server) Bind(h host.Host)

Bind registers the stream handler on h. After Bind returns, peers dialing ProtocolID will be served by s.

func (*Server) LocalPullIntent

func (s *Server) LocalPullIntent(ctx context.Context, d digest.Digest) ifaces.PullIntent

LocalPullIntent implements ifaces.LocalIntentProvider. It returns the same PullIntent the wire-level pull_intent_query handler would produce for d, but without the libp2p stream round-trip - the cold-start orchestrator uses it to include self as a first-class participant in the rule cascade.

func (*Server) StartLocalPull

func (s *Server) StartLocalPull(ctx context.Context, registry, repository string, kind ifaces.OriginRefKind, digests []digest.Digest) ([]ifaces.PleasePullOutcome, error)

StartLocalPull implements ifaces.LocalPullStarter. It runs the same pullerPump-driven path as servePleasePull but skips the libp2p stream layer entirely. Used by the cold-start orchestrator when rule 7 picks self as the designated puller - Coord.PleasePull(self) would round-trip through libp2p (or fail to dial) for no benefit.

Returns one PleasePullOutcome per input digest. A nil/zero pump (no WithPullerPump option) yields PleasePullUnspecified entries; that matches the server-side behaviour and is what the cold-start resolver expects when origin-pull is disabled.

func (*Server) Unbind added in v0.1.17

func (s *Server) Unbind(h host.Host)

Unbind removes the stream handler registered by Bind. Shutdown calls this before waiting on puller-pump goroutines so no new inbound coord stream can start another background origin pull while the pump gate is draining.
