cdsub

package
v0.1.19 Latest
Published: Jun 25, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package cdsub subscribes to containerd image/content events and forwards digest presence changes to its caller.

Design choices:

- The containerd RPC client is heavy and platform-bound (Linux + a containerd socket on the node). To keep the announce-loop testable on darwin/CI without a real containerd, the package depends on a small ImageSource interface. The concrete containerd implementation lives in internal/cdsub/source_containerd.go behind a Linux build tag. Tests in this file exercise the loop against a fake source.

- In containerd mode, callers wire WithNotifier to route every Image/Create, Image/Update, and content-delete digest into the advertiser. The advertiser owns the DHT announced set, verifies local serveability, and performs best-effort Withdraw.

- When no notifier is supplied, cdsub preserves the older direct dht.Provide path for tests and legacy experiments. Delete events remain a no-op in that compatibility mode.

- Reconnects use exponential backoff with a cap (30 s maximum). On every successful reconnect the loop calls ImageSource.List(ctx) and re-Provides every digest, closing the window in which events could be missed during the disconnect.
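The capped exponential backoff described in the last design choice can be sketched as below. This is a minimal illustration, not the package's code: the name nextBackoff and the 1 s starting delay are assumptions; only the 30 s cap comes from the text above.

```go
package main

import (
	"fmt"
	"time"
)

const (
	initialBackoff = 1 * time.Second  // assumed starting delay for this sketch
	maxBackoff     = 30 * time.Second // cap stated in the design notes
)

// nextBackoff doubles the current delay and clamps the result to limit.
// A non-positive current delay restarts from initialBackoff.
func nextBackoff(cur, limit time.Duration) time.Duration {
	next := cur * 2
	if cur <= 0 {
		next = initialBackoff
	}
	if next > limit {
		next = limit
	}
	return next
}

func main() {
	d := initialBackoff
	for attempt := 1; attempt <= 7; attempt++ {
		fmt.Printf("attempt %d: wait %s\n", attempt, d)
		d = nextBackoff(d, maxBackoff)
	}
}
```

With these assumed values the delay doubles from 1 s up to 16 s and then stays at the 30 s cap.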

Constants

const DefaultContainerdNamespace = "k8s.io"

DefaultContainerdNamespace is the namespace kubelet places pod containers in when using containerd as its CRI runtime.

const DefaultContainerdSocket = "/run/containerd/containerd.sock"

DefaultContainerdSocket is the conventional path for the containerd gRPC API socket on a kubelet-managed node.

Variables

This section is empty.

Functions

This section is empty.

Types

type ContainerdSource

type ContainerdSource struct {
	// contains filtered or unexported fields
}

ContainerdSource is a production ImageSource backed by containerd's events + image APIs. Construct via NewContainerdSource.

func NewContainerdSource

func NewContainerdSource(socket, namespace string, opts ...ContainerdSourceOption) (*ContainerdSource, error)

NewContainerdSource dials containerd at socket and pins all subsequent calls to namespace. Returns an error if the socket cannot be reached or the gRPC handshake fails.

func (*ContainerdSource) Close

func (s *ContainerdSource) Close() error

Close releases the underlying containerd gRPC client.

func (*ContainerdSource) ContentStore

func (s *ContainerdSource) ContentStore() content.Store

ContentStore exposes the underlying containerd content store so other subsystems (notably containerdstore.Store wired in storage_mode=containerd) can share the same gRPC connection rather than opening a second one. Returns nil if the source is not connected.

func (*ContainerdSource) Inventory

func (s *ContainerdSource) Inventory(ctx context.Context) ([]gdigest.Digest, error)

Inventory enumerates every sha256 digest currently present in the configured containerd namespace by calling ContentStore.Walk. Every returned digest is a parseable sha256 digest; entries that use other algorithms are silently skipped. This is the periodic reconciliation source of truth used by the advertiser to reflect bare content changes the image-events stream may have missed (e.g. content GC, manual content.Delete, missed events during a disconnect window).

Performance: uses Info-equivalent metadata only; does NOT open readers. Callers should bound concurrency and add jitter when scheduling reconciliation passes - see inventory performance constraints.
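The "sha256 only, skip everything else" rule can be illustrated with a small filter. This is a sketch under assumptions: digests are plain "algorithm:hex" strings here rather than the digest type used by the package, and sha256Only is a hypothetical helper, not part of cdsub.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// emptySHA256 is the sha256 digest of empty input, used as a known-good value.
const emptySHA256 = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"

// sha256Only keeps entries of the form "sha256:<64 lowercase hex chars>"
// and silently drops everything else, mirroring the documented behaviour.
func sha256Only(raw []string) []string {
	var out []string
	for _, r := range raw {
		algo, enc, ok := strings.Cut(r, ":")
		if !ok || algo != "sha256" || len(enc) != 64 || enc != strings.ToLower(enc) {
			continue
		}
		if _, err := hex.DecodeString(enc); err != nil {
			continue // not valid hex
		}
		out = append(out, r)
	}
	return out
}

func main() {
	in := []string{emptySHA256, "sha512:abcd", "sha256:tooshort", "no-colon"}
	fmt.Println(sha256Only(in)) // only the first entry survives
}
```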

func (*ContainerdSource) LeasesService

func (s *ContainerdSource) LeasesService() leases.Manager

LeasesService exposes the underlying containerd lease manager so containerdstore.Store can attach leases on Gantry ingest without opening a second gRPC connection. Returns nil if the source is not connected.

func (*ContainerdSource) List

func (s *ContainerdSource) List(ctx context.Context) ([]ImageEvent, error)

List walks every image in the configured namespace and resolves its blob set from the local content store. Implements ImageSource.

The returned events are tagged EventCreate so the Subscriber's reconciliation loop announces them as fresh state on every reconnect.

func (*ContainerdSource) Namespace

func (s *ContainerdSource) Namespace() string

Namespace returns the containerd namespace this source is bound to. Callers wiring a containerdstore.Store should use this so the two subsystems agree on namespace and don't diverge silently.

func (*ContainerdSource) SetMediaTypeRecorder

func (s *ContainerdSource) SetMediaTypeRecorder(fn func(d gdigest.Digest, mediaType string))

SetMediaTypeRecorder registers a callback fired for every descriptor visited by the next List/Subscribe walk. The intent is to populate containerdstore.Store.RememberMediaType so the transfer endpoint can serve manifest GETs with the correct Content-Type without parsing the body. Safe to call concurrently with active walks; subsequent calls overwrite (the source is single-owner per process). Pass nil to disable. See the plan section "Descriptor index".
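One way to get the "safe to call concurrently, later calls overwrite, nil disables" semantics is an atomically swapped callback pointer. The sketch below is an assumption about a possible approach, not a description of how ContainerdSource is implemented; the source type and the visit method are hypothetical, and digests are plain strings.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// recorder matches the documented callback shape, with string digests.
type recorder func(digest, mediaType string)

// source is a hypothetical stand-in holding only the callback slot.
type source struct {
	rec atomic.Pointer[recorder]
}

// SetMediaTypeRecorder overwrites the callback; nil disables recording.
func (s *source) SetMediaTypeRecorder(fn recorder) {
	if fn == nil {
		s.rec.Store(nil)
		return
	}
	s.rec.Store(&fn)
}

// visit is what a walk would call for each descriptor it sees.
func (s *source) visit(digest, mediaType string) {
	if fn := s.rec.Load(); fn != nil {
		(*fn)(digest, mediaType)
	}
}

func main() {
	seen := map[string]string{}
	var s source
	s.SetMediaTypeRecorder(func(d, mt string) { seen[d] = mt })
	s.visit("sha256:aaa", "application/vnd.oci.image.manifest.v1+json")
	s.SetMediaTypeRecorder(nil) // disable; the next visit records nothing
	s.visit("sha256:bbb", "application/vnd.oci.image.index.v1+json")
	fmt.Println(len(seen))
}
```

The atomic pointer makes the swap itself race-free; the callback body still has to be safe for whatever goroutine runs the walk.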

func (*ContainerdSource) Subscribe

func (s *ContainerdSource) Subscribe(ctx context.Context) (<-chan ImageEvent, error)

Subscribe streams ImageEvents for the lifetime of ctx. Implements ImageSource. Closes the returned channel when the underlying containerd subscription errors or ctx is cancelled - the Subscriber reconnect loop then handles backoff + reconciliation.

type ContainerdSourceOption

type ContainerdSourceOption func(*ContainerdSource)

ContainerdSourceOption configures a ContainerdSource.

func WithContainerdConnectTimeout

func WithContainerdConnectTimeout(d time.Duration) ContainerdSourceOption

WithContainerdConnectTimeout overrides the dial-timeout default.

func WithContainerdLogger

func WithContainerdLogger(l *slog.Logger) ContainerdSourceOption

WithContainerdLogger plumbs a structured logger.

type ImageEvent

type ImageEvent struct {
	Kind     ImageEventKind
	Registry string // host portion of the image reference, e.g. "registry.example.com"
	Image    string // full image reference, kept for logging
	Digests  []digest.Digest
}

ImageEvent is the source-agnostic representation of one containerd event.

type ImageEventKind

type ImageEventKind int

ImageEventKind discriminates the three event types cdsub cares about.

const (
	// EventCreate is an Image/Create - a new image landed.
	EventCreate ImageEventKind = iota
	// EventUpdate is an Image/Update - an existing image's target changed.
	EventUpdate
	// EventDelete is an Image/Delete - image removed.
	EventDelete
)

type ImageSource

type ImageSource interface {
	// List returns every image currently present in containerd, filtered
	// to the configured upstream registries. Used on startup and after
	// every successful reconnect for reconciliation.
	List(ctx context.Context) ([]ImageEvent, error)

	// Subscribe streams ImageEvents for the lifetime of ctx. Closing
	// the channel signals "disconnected - caller should reconnect
	// after backoff". Subscribe MUST exit cleanly when ctx is
	// cancelled.
	Subscribe(ctx context.Context) (<-chan ImageEvent, error)
}

ImageSource is the abstraction over containerd's image-events API. Implementations stream events to the returned channel and close it when the underlying connection is lost; the loop then reconnects.
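A fake source of the kind the overview mentions for tests might look like the sketch below. The types are local stand-ins that mirror the documented shapes (digests are plain strings to avoid a third-party import), and fakeSource and drain are hypothetical names, not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

type ImageEventKind int

const (
	EventCreate ImageEventKind = iota
	EventUpdate
	EventDelete
)

type ImageEvent struct {
	Kind    ImageEventKind
	Image   string
	Digests []string
}

type ImageSource interface {
	List(ctx context.Context) ([]ImageEvent, error)
	Subscribe(ctx context.Context) (<-chan ImageEvent, error)
}

// fakeSource replays a fixed inventory and a scripted event stream, then
// closes the channel to simulate a lost connection.
type fakeSource struct {
	inventory []ImageEvent
	script    []ImageEvent
}

func (f *fakeSource) List(ctx context.Context) ([]ImageEvent, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return f.inventory, nil
}

func (f *fakeSource) Subscribe(ctx context.Context) (<-chan ImageEvent, error) {
	ch := make(chan ImageEvent)
	go func() {
		defer close(ch) // closing signals "disconnected" to the consumer
		for _, ev := range f.script {
			select {
			case ch <- ev:
			case <-ctx.Done():
				return // exit cleanly on cancellation
			}
		}
	}()
	return ch, nil
}

// drain subscribes and collects events until the channel closes.
func drain(src ImageSource) []ImageEvent {
	ch, err := src.Subscribe(context.Background())
	if err != nil {
		return nil
	}
	var out []ImageEvent
	for ev := range ch {
		out = append(out, ev)
	}
	return out
}

func main() {
	src := &fakeSource{script: []ImageEvent{
		{Kind: EventCreate, Image: "registry.example.com/app:1"},
		{Kind: EventDelete, Image: "registry.example.com/app:0"},
	}}
	for _, ev := range drain(src) {
		fmt.Println(ev.Kind, ev.Image)
	}
}
```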

type NoOpSource

type NoOpSource struct{}

NoOpSource is an ImageSource that produces no events. Useful when containerd is unavailable (CI, darwin development) or when the agent is configured to skip image-event subscription. The Subscriber's reconnect loop will quietly call List -> Subscribe -> wait-on-ctx -> exit, keeping the announce machinery exercised at zero cost.

func (NoOpSource) List

func (NoOpSource) List(_ context.Context) ([]ImageEvent, error)

List always returns an empty event slice.

func (NoOpSource) Subscribe

func (NoOpSource) Subscribe(ctx context.Context) (<-chan ImageEvent, error)

Subscribe returns a channel that is closed only when ctx is cancelled.
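The "closed only when ctx is cancelled" behaviour can be sketched with a goroutine that waits on ctx.Done. This is an illustration under assumptions: the element type is struct{} instead of ImageEvent, and subscribeNoOp and demo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// subscribeNoOp returns a channel that never carries a value and is closed
// once ctx is cancelled.
func subscribeNoOp(ctx context.Context) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		<-ctx.Done()
		close(ch)
	}()
	return ch
}

// isClosed reports, without blocking, whether ch has been closed.
func isClosed(ch <-chan struct{}) bool {
	select {
	case _, ok := <-ch:
		return !ok
	default:
		return false
	}
}

// demo checks the channel state before and after cancellation.
func demo() (openBefore, closedAfter bool) {
	ctx, cancel := context.WithCancel(context.Background())
	ch := subscribeNoOp(ctx)
	openBefore = !isClosed(ch)
	cancel()
	_, ok := <-ch // blocks until the goroutine closes the channel
	return openBefore, !ok
}

func main() {
	fmt.Println(demo())
}
```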

type Option

type Option func(*Subscriber)

Option configures a Subscriber.
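Option follows the functional-options pattern. The sketch below shows the general shape with a hypothetical subscriber struct; the field names and the 1 s default are assumptions for illustration (only the 30 s cap appears in the overview).

```go
package main

import (
	"fmt"
	"time"
)

const (
	defaultInitial = 1 * time.Second  // assumed default
	defaultMax     = 30 * time.Second // cap mentioned in the overview
)

// subscriber is a hypothetical stand-in for the real Subscriber's fields.
type subscriber struct {
	initial  time.Duration
	maxDelay time.Duration
}

// option mutates a subscriber during construction.
type option func(*subscriber)

// withBackoff overrides both backoff bounds.
func withBackoff(initial, maxDelay time.Duration) option {
	return func(s *subscriber) {
		s.initial, s.maxDelay = initial, maxDelay
	}
}

// newSubscriber applies defaults first, then each option in order, so later
// options win over earlier ones.
func newSubscriber(opts ...option) *subscriber {
	s := &subscriber{initial: defaultInitial, maxDelay: defaultMax}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	fmt.Println(*newSubscriber())
	fmt.Println(*newSubscriber(withBackoff(2*time.Second, time.Minute)))
}
```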

func WithBackoff

func WithBackoff(initial, maxBackoff time.Duration) Option

WithBackoff overrides the exponential reconnect backoff bounds.

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger plumbs a structured logger.

func WithMetrics

func WithMetrics(onAnnounce, onAnnounceError func(), onReconcile func(int), onReconnect func()) Option

WithMetrics registers metric callbacks. Any callback may be nil.

func WithNotifier

func WithNotifier(fn func(ctx context.Context, d digest.Digest, present bool)) Option

WithNotifier routes digest presence changes to a central owner such as internal/advertise.Advertiser. When set, cdsub does not call DHT.Provide directly. present=true corresponds to create/update/list observations; present=false corresponds to delete observations.
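The mapping from event kind to the present flag can be sketched as follows. The types are local stand-ins with string digests, and dispatch and observe are hypothetical helpers written for this example only.

```go
package main

import (
	"context"
	"fmt"
)

type ImageEventKind int

const (
	EventCreate ImageEventKind = iota
	EventUpdate
	EventDelete
)

type ImageEvent struct {
	Kind    ImageEventKind
	Digests []string
}

// notifier mirrors the callback shape accepted by WithNotifier.
type notifier func(ctx context.Context, d string, present bool)

// dispatch fans one event out to the notifier: create and update report
// present=true, delete reports present=false.
func dispatch(ctx context.Context, ev ImageEvent, notify notifier) {
	present := ev.Kind != EventDelete
	for _, d := range ev.Digests {
		notify(ctx, d, present)
	}
}

// observe runs events through dispatch and returns the last presence value
// seen for each digest.
func observe(events ...ImageEvent) map[string]bool {
	state := map[string]bool{}
	record := func(_ context.Context, d string, present bool) { state[d] = present }
	for _, ev := range events {
		dispatch(context.Background(), ev, record)
	}
	return state
}

func main() {
	state := observe(
		ImageEvent{Kind: EventCreate, Digests: []string{"sha256:aaa", "sha256:bbb"}},
		ImageEvent{Kind: EventDelete, Digests: []string{"sha256:bbb"}},
	)
	fmt.Println(state["sha256:aaa"], state["sha256:bbb"]) // true false
}
```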

func WithProvideTimeout

func WithProvideTimeout(d time.Duration) Option

WithProvideTimeout caps the time budget of each individual Provide RPC.
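A per-call budget of this kind is commonly implemented by deriving a child context with context.WithTimeout for each call. The sketch below shows that idea; provideWithTimeout and demo are hypothetical names, the 20 ms budget is arbitrary, and whether cdsub does exactly this is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// provideWithTimeout runs one provide call under its own deadline, so a
// stalled call cannot hold up the rest of the loop.
func provideWithTimeout(ctx context.Context, budget time.Duration, provide func(context.Context) error) error {
	callCtx, cancel := context.WithTimeout(ctx, budget)
	defer cancel()
	return provide(callCtx)
}

// demo runs one fast call and one stalled call under the same budget.
func demo() (fastErr, slowErr error) {
	fast := func(context.Context) error { return nil }
	stalled := func(ctx context.Context) error {
		<-ctx.Done() // simulate an RPC that never completes on its own
		return ctx.Err()
	}
	budget := 20 * time.Millisecond
	return provideWithTimeout(context.Background(), budget, fast),
		provideWithTimeout(context.Background(), budget, stalled)
}

func main() {
	fastErr, slowErr := demo()
	fmt.Println(fastErr, errors.Is(slowErr, context.DeadlineExceeded))
}
```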

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber runs the announce loop: List -> Subscribe -> reconnect on error.

func New

func New(src ImageSource, dht ifaces.DHT, opts ...Option) *Subscriber

New builds a Subscriber. Run drives the loop until ctx is cancelled. dht may be nil when WithNotifier is supplied.

func (*Subscriber) Close

func (s *Subscriber) Close()

Close releases any subscriber-owned resources. Safe to call multiple times.

func (*Subscriber) Run

func (s *Subscriber) Run(ctx context.Context) error

Run blocks until ctx is cancelled. On each iteration:

1. Run reconciliation: List -> notify/provide every digest.
2. Subscribe and process events as they arrive.
3. On Subscribe error, channel close, or any non-context error, sleep with jittered exponential backoff and retry.
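The three steps can be sketched as a small loop. This is a simplified illustration, not the package's Run: the source interface carries plain string digests, the delay is jittered but does not grow exponentially, and replaySource, round, run, and demo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

type source interface {
	List(ctx context.Context) ([]string, error)
	Subscribe(ctx context.Context) (<-chan string, error)
}

// jittered returns a random delay in [d/2, d].
func jittered(d time.Duration) time.Duration {
	half := d / 2
	return half + time.Duration(rand.Int63n(int64(half)+1))
}

// round performs steps 1 and 2: reconcile, then consume events until the
// channel closes or an error occurs.
func round(ctx context.Context, src source, notify func(string)) {
	digests, err := src.List(ctx)
	if err != nil {
		return
	}
	for _, d := range digests {
		notify(d)
	}
	events, err := src.Subscribe(ctx)
	if err != nil {
		return
	}
	for d := range events {
		notify(d)
	}
}

// run repeats rounds until ctx is cancelled, sleeping between them (step 3).
func run(ctx context.Context, src source, notify func(string), base time.Duration) error {
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		round(ctx, src, notify)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(jittered(base)):
		}
	}
}

// replaySource emits one event per subscription and then disconnects.
type replaySource struct{ calls int }

func (r *replaySource) List(context.Context) ([]string, error) {
	return []string{"sha256:aaa"}, nil
}

func (r *replaySource) Subscribe(context.Context) (<-chan string, error) {
	r.calls++
	ch := make(chan string, 1)
	ch <- fmt.Sprintf("sha256:event%d", r.calls)
	close(ch)
	return ch, nil
}

// demo runs the loop for two rounds and returns every digest notified.
func demo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var seen []string
	notify := func(d string) {
		seen = append(seen, d)
		if len(seen) == 4 {
			cancel() // stop after two full rounds
		}
	}
	_ = run(ctx, &replaySource{}, notify, time.Millisecond)
	return seen
}

func main() {
	fmt.Println(demo())
}
```

Each round re-announces the listed digest before handling new events, which is the reconciliation-on-reconnect behaviour the steps describe.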
