controlplane

package
v0.8.0 Latest Latest
Published: May 12, 2026 License: MIT Imports: 30 Imported by: 0

Documentation

Overview

Package controlplane implements the clawker control plane: a privileged, long-lived gRPC service that owns authoritative state for managed containers. It serves the AdminService surface (CLI ↔ CP) and supplies the auth and lifecycle plumbing shared with the agent listener (clawkerd ↔ CP, registered separately by cmd/clawker-cp).

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AgentMethodScopes

func AgentMethodScopes() map[string]string

AgentMethodScopes maps every clawker.agent.v1.AgentService RPC to the OAuth2 scope it requires. Agents call these via the CP's clawker-net listener; AuthInterceptor wired with this map fails closed on unmapped methods (returns codes.Unauthenticated), so a new RPC added to the proto without a scope entry is rejected at runtime.

Register is the one-time-per-container CP-driven handshake — the scope name reflects "agent attests its own identity to CP" semantics.
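
Because unmapped methods are only rejected at runtime, a coverage check over the scope map can surface a missing entry earlier. The following is a minimal sketch of such a check, not part of this package: unmappedMethods and the Heartbeat RPC are hypothetical, and only Register and the agent:self:register scope come from this documentation.

```go
package main

import (
	"fmt"
	"sort"
)

// unmappedMethods returns every method in methods that has no entry (or an
// empty scope) in scopes, sorted for stable output.
func unmappedMethods(methods []string, scopes map[string]string) []string {
	var missing []string
	for _, m := range methods {
		if scopes[m] == "" {
			missing = append(missing, m)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	// Register and its scope appear in the documentation; Heartbeat is a
	// hypothetical RPC added to show the failure case.
	scopes := map[string]string{
		"/clawker.agent.v1.AgentService/Register": "agent:self:register",
	}
	methods := []string{
		"/clawker.agent.v1.AgentService/Register",
		"/clawker.agent.v1.AgentService/Heartbeat",
	}
	fmt.Println(unmappedMethods(methods, scopes))
}
```

In a real test the method list would presumably come from the generated service descriptor rather than a hand-written slice.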

func NewAdminServer

func NewAdminServer(fw *fwhandler.Handler, agents agent.Registry, log *logger.Logger) adminv1.AdminServiceServer

NewAdminServer returns the composite AdminServiceServer wired from the supplied domain handlers. agents is required — CP is the sole sqlite writer in this design, so any wiring path that reaches here without a registry is a programming error.

  • log defaults to logger.Nop() when nil. Production wiring passes the CP's structured logger.

func RegisterAgentClient

func RegisterAgentClient(ctx context.Context, hydraAdminURL string, jwkData []byte, tlsCfg *tls.Config) error

RegisterAgentClient registers the clawker-agent OAuth2 client with Hydra via the admin API. Both clawker-cli and clawker-agent use the same public JWK (the CLI's signing key) — distinct client IDs keep the scope surface clean even though the signing key is shared. Idempotent: returns nil on 409 Conflict.

func RegisterCLIClient

func RegisterCLIClient(ctx context.Context, hydraAdminURL string, jwkData []byte, tlsCfg *tls.Config) error

RegisterCLIClient registers the clawker-cli OAuth2 client with Hydra via the admin API. The jwkData is the raw JSON of the CLI's public JWKS (bind-mounted from the host). Idempotent: returns nil if the client already exists (409 Conflict).

func WriteOryConfigs

func WriteOryConfigs(cp config.ControlPlaneSettings, hydraSecret string) error

WriteOryConfigs writes config files for Hydra, Kratos, and Oathkeeper to the config directory. Ports are read from ControlPlaneSettings. Called by the CP binary at startup before launching subprocesses. Idempotent — overwrites on every start so configs stay in sync with the binary version.

Types

type AgentWatcher

type AgentWatcher struct {
	// contains filtered or unexported fields
}

AgentWatcher polls the Docker daemon for clawker-managed agent containers and invokes a drain-to-zero callback once the agent count has been zero across MissedThreshold consecutive polls past the grace period. The CP uses this to self-terminate when no agents remain — there is no reason to keep the control plane (Envoy + CoreDNS + CP daemon) alive after the last agent drains.

INV-B2-007: strict drain-to-zero ordering is enforced by the caller via the onDrainToZero callback. The watcher guarantees only that the callback fires at most once, before Run returns.

func NewAgentWatcher

func NewAgentWatcher(
	log *logger.Logger,
	listAgents func(context.Context) (int, error),
	onDrainToZero func(context.Context) error,
	opts AgentWatcherOptions,
) *AgentWatcher

NewAgentWatcher constructs an AgentWatcher. Nil callbacks or negative option values panic — misconfig must fail loudly.

func (*AgentWatcher) Run

func (w *AgentWatcher) Run(ctx context.Context) error

Run blocks until ctx is cancelled, the drain-to-zero condition fires, or consecutive list errors exceed ListErrCeiling. On drain, Run invokes onDrainToZero synchronously and returns its error. On ctx cancel, returns ctx.Err(). On error ceiling, returns a wrapped error surfacing the last list failure.

The grace period is measured on wall-clock time from Run entry — any zero-count polls during the grace window count toward the miss streak, but the streak cannot reach the threshold before grace expires. This prevents a race where the CP starts up before any agents have been enrolled and immediately drains.

Run must be called at most once per watcher; a second call returns an error rather than spinning up a second poll loop.

type AgentWatcherOptions

type AgentWatcherOptions struct {
	PollInterval    time.Duration
	MissedThreshold int
	GracePeriod     time.Duration
	// ListErrCeiling bounds how many consecutive listAgents errors the
	// watcher tolerates before returning an error from Run. Prevents
	// "Docker is wedged" from leaving the CP permanently blind.
	ListErrCeiling int
}

AgentWatcherOptions overrides default watcher tuning. Zero values select defaults (PollInterval 30s, MissedThreshold 2, GracePeriod 60s, ListErrCeiling 20). Negative values panic — silently snapping to a default would hide caller misconfig.

type AuthInterceptor

type AuthInterceptor struct {
	// contains filtered or unexported fields
}

AuthInterceptor validates bearer tokens via an Introspector and enforces per-method scope requirements.

Flow: extract bearer token from gRPC metadata → introspect → check active + scope → optional client_id pin → allow or deny. Fail-closed on any error.

func NewAuthInterceptor

func NewAuthInterceptor(introspector Introspector, methodScopes map[string]string, log *logger.Logger) *AuthInterceptor

NewAuthInterceptor creates an interceptor that validates tokens via the given Introspector. methodScopes maps gRPC method names (e.g. "/clawker.admin.v1.AdminService/Install") to required OAuth2 scopes (e.g. "admin").

func (*AuthInterceptor) GRPCServerOptions

func (a *AuthInterceptor) GRPCServerOptions() []grpc.ServerOption

GRPCServerOptions returns gRPC server options wired with both the unary and stream interceptors.

func (*AuthInterceptor) RequireClientID

func (a *AuthInterceptor) RequireClientID(clientID string) *AuthInterceptor

RequireClientID pins the interceptor to a specific OAuth2 client_id. Defense-in-depth on top of scope: when set, any token whose introspection result reports a different client_id is rejected with codes.PermissionDenied even if the scope check passed. Used on the agent listener (clientID = consts.ClientIDAgent) so a future Hydra misconfiguration that grants agent:self:register to a non-agent client doesn't silently let the wrong client through. The admin listener leaves this empty — admin scope is uniformly required and the CLI client_id is the only one Hydra is currently configured to grant it to. Returns the receiver for fluent chaining at construction.

func (*AuthInterceptor) StreamInterceptor

func (a *AuthInterceptor) StreamInterceptor() grpc.StreamServerInterceptor

StreamInterceptor returns a gRPC stream server interceptor.

func (*AuthInterceptor) UnaryInterceptor

func (a *AuthInterceptor) UnaryInterceptor() grpc.UnaryServerInterceptor

UnaryInterceptor returns a gRPC unary server interceptor.

type CPStartupOrchestrator

type CPStartupOrchestrator struct {
	// contains filtered or unexported fields
}

CPStartupOrchestrator manages the control plane's subprocess startup sequence and health reporting. The /healthz endpoint actively probes all internal service ports — it only returns 200 when every service is responding.

func NewCPStartupOrchestrator

func NewCPStartupOrchestrator() *CPStartupOrchestrator

NewCPStartupOrchestrator creates a new startup orchestrator. The probes are configured later via SetServiceProbes once TLS config and port values are available.

func (*CPStartupOrchestrator) HealthzHandler

func (o *CPStartupOrchestrator) HealthzHandler() http.Handler

HealthzHandler returns an http.Handler for the /healthz endpoint. Returns 200 only when SetReady was called AND all service probes pass. If any service is down, returns 503 with a JSON body.

func (*CPStartupOrchestrator) IsReady

func (o *CPStartupOrchestrator) IsReady() bool

IsReady returns whether the CP has completed all startup steps.

func (*CPStartupOrchestrator) SetReady

func (o *CPStartupOrchestrator) SetReady()

SetReady marks the CP as ready. Called after all startup steps (subprocesses, eBPF load, gRPC server) have succeeded.

func (*CPStartupOrchestrator) SetServiceProbes

func (o *CPStartupOrchestrator) SetServiceProbes(cp config.ControlPlaneSettings, tlsCfg *tls.Config)

SetServiceProbes configures the aggregate health probes from the ControlPlaneSettings. Called during CP startup after TLS config is built. All Ory services use HTTPS; the gRPC admin port is probed via raw TCP (gRPC health check would require a client).

type HealthCheck

type HealthCheck struct {
	URL      string        // e.g. "https://127.0.0.1:4444/health/alive"
	Interval time.Duration // polling interval
	Timeout  time.Duration // overall timeout before giving up
	TLS      *tls.Config   // optional TLS config for HTTPS health endpoints
}

HealthCheck defines how to check if a subprocess is healthy.

type HydraIntrospector

type HydraIntrospector struct {
	// contains filtered or unexported fields
}

HydraIntrospector implements Introspector by calling Hydra's admin introspection endpoint (POST /admin/oauth2/introspect, RFC 7662).

func NewHydraIntrospector

func NewHydraIntrospector(introspectURL string, tlsCfg *tls.Config) *HydraIntrospector

NewHydraIntrospector creates an introspector that validates tokens against the given Hydra admin introspection URL. The optional TLS config is used when Hydra serves HTTPS (self-signed cert).

func (*HydraIntrospector) Introspect

func (h *HydraIntrospector) Introspect(ctx context.Context, token, requiredScope string) (*IntrospectionResult, error)

type IntrospectionResult

type IntrospectionResult struct {
	Active   bool   `json:"active"`
	Scope    string `json:"scope"`
	ClientID string `json:"client_id"`
	Sub      string `json:"sub"`
	Exp      int64  `json:"exp"`
}

IntrospectionResult represents the relevant fields from an OAuth2 token introspection response (RFC 7662).

type Introspector

type Introspector interface {
	Introspect(ctx context.Context, token, requiredScope string) (*IntrospectionResult, error)
}

Introspector validates an OAuth2 bearer token and returns its introspection result. Implementations must be safe for concurrent use.

type ManagedSubprocess

type ManagedSubprocess struct {
	Name string
	Cmd  *exec.Cmd
	// contains filtered or unexported fields
}

ManagedSubprocess tracks a running subprocess.

type SubprocessManager

type SubprocessManager struct {
	// contains filtered or unexported fields
}

SubprocessManager starts, monitors, and shuts down a set of subprocesses. Fail-fast: if any subprocess exits unexpectedly, the crash is reported via CrashChan. Once Shutdown begins, subsequent subprocess exits are expected and are NOT reported as crashes. Shutdown is reverse order of start.

func NewSubprocessManager

func NewSubprocessManager(log *logger.Logger) *SubprocessManager

NewSubprocessManager creates a new subprocess manager.

func (*SubprocessManager) BeginShutdown

func (m *SubprocessManager) BeginShutdown()

BeginShutdown marks the manager as shutting down, suppressing crash reporting for any subprocess that exits from this point forward. Call this as early as possible in the shutdown sequence — before stopping servers or other components that may cause cascading subprocess exits. Shutdown() also sets this flag, but calling BeginShutdown early closes the window between signal receipt and Shutdown() invocation.

func (*SubprocessManager) CrashChan

func (m *SubprocessManager) CrashChan() <-chan error

CrashChan returns a channel that receives an error when any managed subprocess exits unexpectedly. The CP main loop selects on this alongside signal handling.

func (*SubprocessManager) ForwardSignal

func (m *SubprocessManager) ForwardSignal(sig os.Signal)

ForwardSignal sends the given signal to all managed subprocesses.

func (*SubprocessManager) Shutdown

func (m *SubprocessManager) Shutdown(timeout time.Duration)

Shutdown sends SIGTERM to all subprocesses in reverse start order, waits up to timeout for each to exit, then sends SIGKILL if needed. Sets the shuttingDown flag if BeginShutdown was not already called.

func (*SubprocessManager) Start

func (m *SubprocessManager) Start(name string, cmd *exec.Cmd) error

Start launches a subprocess and begins monitoring its PID. Stdout/stderr are forwarded to the CP's stderr (visible in docker logs).

func (*SubprocessManager) WaitHealthy

func (m *SubprocessManager) WaitHealthy(ctx context.Context, name string, check HealthCheck) error

WaitHealthy polls a health endpoint until it returns 200 or the timeout expires. Returns an error if the subprocess crashes before becoming healthy.

Directories

Path Synopsis
Package adminclient constructs the CLI's gRPC client to the control plane's AdminService.
Dialer-side helpers in package agent: CP-side outbound mTLS dial logic for the CP→clawkerd Session channel.
Package dockerevents subscribes to moby's container/network event stream and republishes it on the Overseer bus as a single typed envelope, DockerEvent, wrapping moby's events.Message verbatim.
ebpf
Package ebpf provides eBPF-based traffic routing for clawker containers.
ebpf/cmd command
ebpf-manager is the entrypoint binary for the clawker eBPF manager container.
Package overseer is the typed event bus + in-memory worldview state for the clawker control plane.
