Documentation
¶
Overview ¶
Package cdsub subscribes to containerd image/content events and forwards digest presence changes to its caller.
Design choices:
- The containerd RPC client is heavy and platform-bound (Linux + a containerd socket on the node). To keep the announce-loop testable on darwin/CI without a real containerd, the package depends on a small ImageSource interface. The concrete containerd implementation lives in internal/cdsub/source_containerd.go behind a Linux build tag. Tests in this file exercise the loop against a fake source.
- In containerd mode, callers wire WithNotifier to route every Image/Create, Image/Update, and content-delete digest into the advertiser. The advertiser owns the DHT announced set, verifies local serveability, and performs best-effort Withdraw.
- When no notifier is supplied, cdsub preserves the older direct dht.Provide path for tests and legacy experiments. Delete events remain a no-op in that compatibility mode.
- Exponential-backoff reconnect with the cap (max 30 s). On every successful reconnect the loop calls ImageSource.List(ctx) and re-Provides every digest, closing the window where in-flight events were missed during the disconnect.
Index ¶
- Constants
- type ContainerdSource
- func (s *ContainerdSource) Close() error
- func (s *ContainerdSource) ContentStore() content.Store
- func (s *ContainerdSource) Inventory(ctx context.Context) ([]gdigest.Digest, error)
- func (s *ContainerdSource) LeasesService() leases.Manager
- func (s *ContainerdSource) List(ctx context.Context) ([]ImageEvent, error)
- func (s *ContainerdSource) Namespace() string
- func (s *ContainerdSource) SetMediaTypeRecorder(fn func(d gdigest.Digest, mediaType string))
- func (s *ContainerdSource) Subscribe(ctx context.Context) (<-chan ImageEvent, error)
- type ContainerdSourceOption
- type ImageEvent
- type ImageEventKind
- type ImageSource
- type NoOpSource
- type Option
- func WithBackoff(initial, maxBackoff time.Duration) Option
- func WithLogger(l *slog.Logger) Option
- func WithMetrics(onAnnounce, onAnnounceError func(), onReconcile func(int), onReconnect func()) Option
- func WithNotifier(fn func(ctx context.Context, d digest.Digest, present bool)) Option
- func WithProvideTimeout(d time.Duration) Option
- type Subscriber
Constants ¶
const DefaultContainerdNamespace = "k8s.io"
DefaultContainerdNamespace is the namespace kubelet places pod containers in when using containerd as its CRI runtime.
const DefaultContainerdSocket = "/run/containerd/containerd.sock"
DefaultContainerdSocket is the conventional path for the containerd gRPC API socket on a kubelet-managed node.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ContainerdSource ¶
type ContainerdSource struct {
// contains filtered or unexported fields
}
ContainerdSource is a production ImageSource backed by containerd's events + image APIs. Construct via NewContainerdSource.
func NewContainerdSource ¶
func NewContainerdSource(socket, namespace string, opts ...ContainerdSourceOption) (*ContainerdSource, error)
NewContainerdSource dials containerd at socket and pins all subsequent calls to namespace. Returns an error if the socket cannot be reached or the gRPC handshake fails.
func (*ContainerdSource) Close ¶
func (s *ContainerdSource) Close() error
Close releases the underlying containerd gRPC client.
func (*ContainerdSource) ContentStore ¶
func (s *ContainerdSource) ContentStore() content.Store
ContentStore exposes the underlying containerd content store so other subsystems (notably containerdstore.Store wired in storage_mode= containerd) can share the same gRPC connection rather than opening a second one. Returns nil if the source is not connected.
func (*ContainerdSource) Inventory ¶
Inventory enumerates every sha256 digest currently present in the configured containerd namespace by walking ContentStore.Walk. The returned digests are guaranteed parseable as sha256; unsupported algorithms are silently skipped. This is the periodic reconciliation source of truth used by the advertiser to reflect bare content the image-events stream may have missed (e.g. content GC, manual content.Delete, missed events during a disconnect window).
Performance: uses Info-equivalent metadata only; does NOT open readers. Callers should bound concurrency and add jitter when scheduling reconciliation passes - see inventory performance constraints.
func (*ContainerdSource) LeasesService ¶
func (s *ContainerdSource) LeasesService() leases.Manager
LeasesService exposes the underlying containerd lease manager so containerdstore.Store can attach leases on Gantry ingest without opening a second gRPC connection. Returns nil if the source is not connected.
func (*ContainerdSource) List ¶
func (s *ContainerdSource) List(ctx context.Context) ([]ImageEvent, error)
List walks every image in the configured namespace and resolves its blob set from the local content store. Implements ImageSource.
The returned events are tagged EventCreate so the Subscriber's reconciliation loop announces them as fresh state on every reconnect.
func (*ContainerdSource) Namespace ¶
func (s *ContainerdSource) Namespace() string
Namespace returns the containerd namespace this source is bound to. Callers wiring a containerdstore.Store should use this so the two subsystems agree on namespace and don't diverge silently.
func (*ContainerdSource) SetMediaTypeRecorder ¶
func (s *ContainerdSource) SetMediaTypeRecorder(fn func(d gdigest.Digest, mediaType string))
SetMediaTypeRecorder registers a callback fired for every descriptor visited by the next List/Subscribe walk. The intent is to populate containerdstore.Store.RememberMediaType so the transfer endpoint can serve manifest GETs with the correct Content-Type without parsing the body. Safe to call concurrently with active walks; subsequent calls overwrite (the source is single-owner per process). Pass nil to disable. Plan "Descriptor index".
func (*ContainerdSource) Subscribe ¶
func (s *ContainerdSource) Subscribe(ctx context.Context) (<-chan ImageEvent, error)
Subscribe streams ImageEvents for the lifetime of ctx. Implements ImageSource. Closes the returned channel when the underlying containerd subscription errors or ctx is cancelled - the Subscriber reconnect loop then handles backoff + reconciliation.
type ContainerdSourceOption ¶
type ContainerdSourceOption func(*ContainerdSource)
ContainerdSourceOption configures a ContainerdSource.
func WithContainerdConnectTimeout ¶
func WithContainerdConnectTimeout(d time.Duration) ContainerdSourceOption
WithContainerdConnectTimeout overrides the dial-timeout default.
func WithContainerdLogger ¶
func WithContainerdLogger(l *slog.Logger) ContainerdSourceOption
WithContainerdLogger plumbs a structured logger.
type ImageEvent ¶
type ImageEvent struct {
Kind ImageEventKind
Registry string // host portion of the image reference, e.g. "registry.example.com"
Image string // full image reference, kept for logging
Digests []digest.Digest
}
ImageEvent is the source-agnostic representation of one containerd event.
type ImageEventKind ¶
type ImageEventKind int
ImageEventKind discriminates the three event types cdsub cares about.
const ( // EventCreate is an Image/Create - a new image landed. EventCreate ImageEventKind = iota // EventUpdate is an Image/Update - an existing image's target changed. EventUpdate // EventDelete is an Image/Delete - image removed. EventDelete )
type ImageSource ¶
type ImageSource interface {
// List returns every image currently present in containerd, filtered
// to the configured upstream registries. Used on startup and after
// every successful reconnect for reconciliation.
List(ctx context.Context) ([]ImageEvent, error)
// Subscribe streams ImageEvents for the lifetime of the returned
// context. Closing the channel signals "disconnected - caller should
// reconnect after backoff". Subscribe MUST exit cleanly when ctx is
// cancelled.
Subscribe(ctx context.Context) (<-chan ImageEvent, error)
}
ImageSource is the abstraction over containerd's image-events API. Implementations stream events to the returned channel and close it when the underlying connection is lost; the loop then reconnects.
type NoOpSource ¶
type NoOpSource struct{}
NoOpSource is an ImageSource that produces no events. Useful when containerd is unavailable (CI, darwin development) or when the agent is configured to skip image-event subscription. The Subscriber's reconnect loop will quietly call List -> Subscribe -> wait-on-ctx -> exit, keeping the announce machinery exercised at zero cost.
func (NoOpSource) List ¶
func (NoOpSource) List(_ context.Context) ([]ImageEvent, error)
List always returns an empty event slice.
func (NoOpSource) Subscribe ¶
func (NoOpSource) Subscribe(ctx context.Context) (<-chan ImageEvent, error)
Subscribe returns a channel that is closed only when ctx is cancelled.
type Option ¶
type Option func(*Subscriber)
Option configures a Subscriber.
func WithBackoff ¶
WithBackoff overrides the exponential reconnect backoff bounds.
func WithMetrics ¶
func WithMetrics(onAnnounce, onAnnounceError func(), onReconcile func(int), onReconnect func()) Option
WithMetrics registers metric callbacks. Any callback may be nil.
func WithNotifier ¶
WithNotifier routes digest presence changes to a central owner such as internal/advertise.Advertiser. When set, cdsub does not call DHT.Provide directly. present=true corresponds to create/update/list observations; present=false corresponds to delete observations.
func WithProvideTimeout ¶
WithProvideTimeout caps per-Provide RPC budget.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber walks the announce loop: List -> Subscribe -> reconnect on error.
func New ¶
func New(src ImageSource, dht ifaces.DHT, opts ...Option) *Subscriber
New builds a Subscriber. Run drives the loop until ctx is cancelled. dht may be nil when WithNotifier is supplied.
func (*Subscriber) Close ¶
func (s *Subscriber) Close()
Close releases any subscriber-owned resources. Safe to call multiple times.
func (*Subscriber) Run ¶
func (s *Subscriber) Run(ctx context.Context) error
Run blocks until ctx is cancelled. On each iteration:
1. Run reconciliation: List -> notify/provide every digest. 2. Subscribe and process events as they arrive. 3. On Subscribe error, channel close, or any non-context error, sleep with jittered exponential backoff and retry.