Documentation
¶
Overview ¶
Package controlplane implements the clawker control plane — a privileged long-lived gRPC service that owns authoritative state for managed containers. Serves the AdminService surface (CLI ↔ CP) and supplies the auth + lifecycle plumbing shared with the agent listener (clawkerd ↔ CP, registered separately by cmd/clawker-cp).
Index ¶
- func AgentMethodScopes() map[string]string
- func NewAdminServer(fw *fwhandler.Handler, agents agent.Registry, log *logger.Logger) adminv1.AdminServiceServer
- func RegisterAgentClient(ctx context.Context, hydraAdminURL string, jwkData []byte, tlsCfg *tls.Config) error
- func RegisterCLIClient(ctx context.Context, hydraAdminURL string, jwkData []byte, tlsCfg *tls.Config) error
- func WriteOryConfigs(cp config.ControlPlaneSettings, hydraSecret string) error
- type AgentWatcher
- type AgentWatcherOptions
- type AuthInterceptor
- type CPStartupOrchestrator
- type HealthCheck
- type HydraIntrospector
- type IntrospectionResult
- type Introspector
- type ManagedSubprocess
- type SubprocessManager
- func (m *SubprocessManager) BeginShutdown()
- func (m *SubprocessManager) CrashChan() <-chan error
- func (m *SubprocessManager) ForwardSignal(sig os.Signal)
- func (m *SubprocessManager) Shutdown(timeout time.Duration)
- func (m *SubprocessManager) Start(name string, cmd *exec.Cmd) error
- func (m *SubprocessManager) WaitHealthy(ctx context.Context, name string, check HealthCheck) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AgentMethodScopes ¶
AgentMethodScopes maps every clawker.agent.v1.AgentService RPC to the OAuth2 scope it requires. Agents call these via the CP's clawker-net listener; AuthInterceptor wired with this map fails closed on unmapped methods (returns codes.Unauthenticated), so a new RPC added to the proto without a scope entry is rejected at runtime.
Register is the one-time-per-container CP-driven handshake — the scope name reflects "agent attests its own identity to CP" semantics.
func NewAdminServer ¶
func NewAdminServer(fw *fwhandler.Handler, agents agent.Registry, log *logger.Logger) adminv1.AdminServiceServer
NewAdminServer returns the composite AdminServiceServer wired from the supplied domain handlers. agents is required — CP is the sole sqlite writer in this design, so any wiring path that reaches here without a registry is a programming error.
- log defaults to logger.Nop() when nil. Production wiring passes the CP's structured logger.
func RegisterAgentClient ¶
func RegisterAgentClient(ctx context.Context, hydraAdminURL string, jwkData []byte, tlsCfg *tls.Config) error
RegisterAgentClient registers the clawker-agent OAuth2 client with Hydra via the admin API. Both clawker-cli and clawker-agent use the same public JWK (the CLI's signing key) — distinct client IDs keep the scope surface clean even though the signing key is shared. Idempotent: returns nil on 409 Conflict.
func RegisterCLIClient ¶
func RegisterCLIClient(ctx context.Context, hydraAdminURL string, jwkData []byte, tlsCfg *tls.Config) error
RegisterCLIClient registers the clawker-cli OAuth2 client with Hydra via the admin API. The jwkData is the raw JSON of the CLI's public JWKS (bind-mounted from the host). Idempotent: returns nil if the client already exists (409 Conflict).
func WriteOryConfigs ¶
func WriteOryConfigs(cp config.ControlPlaneSettings, hydraSecret string) error
WriteOryConfigs writes config files for Hydra, Kratos, and Oathkeeper to the config directory. Ports are read from ControlPlaneSettings. Called by the CP binary at startup before launching subprocesses. Idempotent — overwrites on every start so configs stay in sync with the binary version.
Types ¶
type AgentWatcher ¶
type AgentWatcher struct {
// contains filtered or unexported fields
}
AgentWatcher polls the Docker daemon for clawker-managed agent containers and invokes a drain-to-zero callback once the agent count has been zero across MissedThreshold consecutive polls past the grace period. The CP uses this to self-terminate when no agents remain — there is no reason to keep the control plane (Envoy + CoreDNS + CP daemon) alive after the last agent drains.
INV-B2-007: strict drain-to-zero ordering is enforced by the caller via the onDrainToZero callback. The watcher guarantees only that the callback fires at most once, before Run returns.
func NewAgentWatcher ¶
func NewAgentWatcher( log *logger.Logger, listAgents func(context.Context) (int, error), onDrainToZero func(context.Context) error, opts AgentWatcherOptions, ) *AgentWatcher
NewAgentWatcher constructs an AgentWatcher. Nil callbacks or negative option values panic — misconfig must fail loudly.
func (*AgentWatcher) Run ¶
func (w *AgentWatcher) Run(ctx context.Context) error
Run blocks until ctx is cancelled, the drain-to-zero condition fires, or consecutive list errors exceed ListErrCeiling. On drain, Run invokes onDrainToZero synchronously and returns its error. On ctx cancel, returns ctx.Err(). On error ceiling, returns a wrapped error surfacing the last list failure.
The grace period is measured on wall-clock time from Run entry — any zero-count polls during the grace window count toward the miss streak, but the streak cannot reach the threshold before grace expires. This prevents a race where the CP starts up before any agents have been enrolled and immediately drains.
Run must be called at most once per watcher; a second call returns an error rather than spinning up a second poll loop.
type AgentWatcherOptions ¶
type AgentWatcherOptions struct {
PollInterval time.Duration
MissedThreshold int
GracePeriod time.Duration
// ListErrCeiling bounds how many consecutive listAgents errors the
// watcher tolerates before returning an error from Run. Prevents
// "Docker is wedged" from leaving the CP blind and permanent.
ListErrCeiling int
}
AgentWatcherOptions overrides default watcher tuning. Zero values select defaults (PollInterval 30s, MissedThreshold 2, GracePeriod 60s, ListErrCeiling 20). Negative values panic — silently snapping to a default would hide caller misconfig.
type AuthInterceptor ¶
type AuthInterceptor struct {
// contains filtered or unexported fields
}
AuthInterceptor validates bearer tokens via an Introspector and enforces per-method scope requirements.
Flow: extract bearer token from gRPC metadata → introspect → check active + scope → optional client_id pin → allow or deny. Fail-closed on any error.
func NewAuthInterceptor ¶
func NewAuthInterceptor(introspector Introspector, methodScopes map[string]string, log *logger.Logger) *AuthInterceptor
NewAuthInterceptor creates an interceptor that validates tokens via the given Introspector. methodScopes maps gRPC method names (e.g. "/clawker.admin.v1.AdminService/Install") to required OAuth2 scopes (e.g. "admin").
func (*AuthInterceptor) GRPCServerOptions ¶
func (a *AuthInterceptor) GRPCServerOptions() []grpc.ServerOption
GRPCServerOptions returns gRPC server options wired with both interceptors.
func (*AuthInterceptor) RequireClientID ¶
func (a *AuthInterceptor) RequireClientID(clientID string) *AuthInterceptor
RequireClientID pins the interceptor to a specific OAuth2 client_id. Defense-in-depth on top of scope: when set, any token whose introspection result reports a different client_id is rejected with codes.PermissionDenied even if the scope check passed. Used on the agent listener (clientID = consts.ClientIDAgent) so a future Hydra misconfiguration that grants agent:self:register to a non-agent client doesn't silently let the wrong client through. The admin listener leaves this empty — admin scope is uniformly required and the CLI client_id is the only one Hydra is currently configured to grant it to. Returns the receiver for fluent chaining at construction.
func (*AuthInterceptor) StreamInterceptor ¶
func (a *AuthInterceptor) StreamInterceptor() grpc.StreamServerInterceptor
StreamInterceptor returns a gRPC stream server interceptor.
func (*AuthInterceptor) UnaryInterceptor ¶
func (a *AuthInterceptor) UnaryInterceptor() grpc.UnaryServerInterceptor
UnaryInterceptor returns a gRPC unary server interceptor.
type CPStartupOrchestrator ¶
type CPStartupOrchestrator struct {
// contains filtered or unexported fields
}
CPStartupOrchestrator manages the control plane's subprocess startup sequence and health reporting. The /healthz endpoint actively probes all internal service ports — it only returns 200 when every service is responding.
func NewCPStartupOrchestrator ¶
func NewCPStartupOrchestrator() *CPStartupOrchestrator
NewCPStartupOrchestrator creates a new startup orchestrator. The probes are configured later via SetServiceProbes once TLS config and port values are available.
func (*CPStartupOrchestrator) HealthzHandler ¶
func (o *CPStartupOrchestrator) HealthzHandler() http.Handler
HealthzHandler returns an http.Handler for the /healthz endpoint. Returns 200 only when SetReady was called AND all service probes pass. If any service is down, returns 503 with a JSON body.
func (*CPStartupOrchestrator) IsReady ¶
func (o *CPStartupOrchestrator) IsReady() bool
IsReady returns whether the CP has completed all startup steps.
func (*CPStartupOrchestrator) SetReady ¶
func (o *CPStartupOrchestrator) SetReady()
SetReady marks the CP as ready. Called after all startup steps (subprocesses, eBPF load, gRPC server) have succeeded.
func (*CPStartupOrchestrator) SetServiceProbes ¶
func (o *CPStartupOrchestrator) SetServiceProbes(cp config.ControlPlaneSettings, tlsCfg *tls.Config)
SetServiceProbes configures the aggregate health probes from the ControlPlaneSettings. Called during CP startup after TLS config is built. All Ory services use HTTPS; the gRPC admin port is probed via raw TCP (gRPC health check would require a client).
type HealthCheck ¶
type HealthCheck struct {
URL string // e.g. "https://127.0.0.1:4444/health/alive"
Interval time.Duration // polling interval
Timeout time.Duration // overall timeout before giving up
TLS *tls.Config // optional TLS config for HTTPS health endpoints
}
HealthCheck defines how to check if a subprocess is healthy.
type HydraIntrospector ¶
type HydraIntrospector struct {
// contains filtered or unexported fields
}
HydraIntrospector implements Introspector by calling Hydra's admin introspection endpoint (POST /admin/oauth2/introspect, RFC 7662).
func NewHydraIntrospector ¶
func NewHydraIntrospector(introspectURL string, tlsCfg *tls.Config) *HydraIntrospector
NewHydraIntrospector creates an introspector that validates tokens against the given Hydra admin introspection URL. The optional TLS config is used when Hydra serves HTTPS (self-signed cert).
func (*HydraIntrospector) Introspect ¶
func (h *HydraIntrospector) Introspect(ctx context.Context, token, requiredScope string) (*IntrospectionResult, error)
type IntrospectionResult ¶
type IntrospectionResult struct {
Active bool `json:"active"`
Scope string `json:"scope"`
ClientID string `json:"client_id"`
Sub string `json:"sub"`
Exp int64 `json:"exp"`
}
IntrospectionResult represents the relevant fields from an OAuth2 token introspection response (RFC 7662).
type Introspector ¶
type Introspector interface {
Introspect(ctx context.Context, token, requiredScope string) (*IntrospectionResult, error)
}
Introspector validates an OAuth2 bearer token and returns its introspection result. Implementations must be safe for concurrent use.
type ManagedSubprocess ¶
type ManagedSubprocess struct {
Name string
Cmd *exec.Cmd
// contains filtered or unexported fields
}
ManagedSubprocess tracks a running subprocess.
type SubprocessManager ¶
type SubprocessManager struct {
// contains filtered or unexported fields
}
SubprocessManager starts, monitors, and shuts down a set of subprocesses. Fail-fast: if any subprocess exits unexpectedly, the crash is reported via CrashChan. Once Shutdown begins, subsequent subprocess exits are expected and are NOT reported as crashes. Shutdown is reverse order of start.
func NewSubprocessManager ¶
func NewSubprocessManager(log *logger.Logger) *SubprocessManager
NewSubprocessManager creates a new subprocess manager.
func (*SubprocessManager) BeginShutdown ¶
func (m *SubprocessManager) BeginShutdown()
BeginShutdown marks the manager as shutting down, suppressing crash reporting for any subprocess that exits from this point forward. Call this as early as possible in the shutdown sequence — before stopping servers or other components that may cause cascading subprocess exits. Shutdown() also sets this flag, but calling BeginShutdown early closes the window between signal receipt and Shutdown() invocation.
func (*SubprocessManager) CrashChan ¶
func (m *SubprocessManager) CrashChan() <-chan error
CrashChan returns a channel that receives an error when any managed subprocess exits unexpectedly. The CP main loop selects on this alongside signal handling.
func (*SubprocessManager) ForwardSignal ¶
func (m *SubprocessManager) ForwardSignal(sig os.Signal)
ForwardSignal sends the given signal to all managed subprocesses.
func (*SubprocessManager) Shutdown ¶
func (m *SubprocessManager) Shutdown(timeout time.Duration)
Shutdown sends SIGTERM to all subprocesses in reverse start order, waits up to timeout for each to exit, then sends SIGKILL if needed. Sets the shuttingDown flag if BeginShutdown was not already called.
func (*SubprocessManager) Start ¶
func (m *SubprocessManager) Start(name string, cmd *exec.Cmd) error
Start launches a subprocess and begins monitoring its PID. Stdout/stderr are forwarded to the CP's stderr (visible in docker logs).
func (*SubprocessManager) WaitHealthy ¶
func (m *SubprocessManager) WaitHealthy(ctx context.Context, name string, check HealthCheck) error
WaitHealthy polls a health endpoint until it returns 200 or the timeout expires. Returns an error if the subprocess crashes before becoming healthy.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package adminclient constructs the CLI's gRPC client to the control plane's AdminService.
|
Package adminclient constructs the CLI's gRPC client to the control plane's AdminService. |
|
Dialer-side helpers in package agent: CP-side outbound mTLS dial logic for the CP→clawkerd Session channel.
|
Dialer-side helpers in package agent: CP-side outbound mTLS dial logic for the CP→clawkerd Session channel. |
|
Package dockerevents subscribes to moby's container/network event stream and republishes it on the Overseer bus as a single typed envelope, DockerEvent, wrapping moby's events.Message verbatim.
|
Package dockerevents subscribes to moby's container/network event stream and republishes it on the Overseer bus as a single typed envelope, DockerEvent, wrapping moby's events.Message verbatim. |
|
ebpf
Package ebpf provides eBPF-based traffic routing for clawker containers.
|
Package ebpf provides eBPF-based traffic routing for clawker containers. |
|
ebpf/cmd
command
ebpf-manager is the entrypoint binary for the clawker eBPF manager container.
|
ebpf-manager is the entrypoint binary for the clawker eBPF manager container. |
|
Package infracerts issues short-lived mTLS client certificates for clawker infrastructure services (Envoy, CoreDNS, clawker-cp's own trusted-lane OTLP exporter, future hostproxy observability sidecars, ...) using a CLI-provisioned intermediate CA.
|
Package infracerts issues short-lived mTLS client certificates for clawker infrastructure services (Envoy, CoreDNS, clawker-cp's own trusted-lane OTLP exporter, future hostproxy observability sidecars, ...) using a CLI-provisioned intermediate CA. |
|
Package otelcerts mints and provisions short-lived mTLS client material for the trusted OTLP/infra lane.
|
Package otelcerts mints and provisions short-lived mTLS client material for the trusted OTLP/infra lane. |
|
Package overseer is the typed event bus + in-memory worldview state for the clawker control plane.
|
Package overseer is the typed event bus + in-memory worldview state for the clawker control plane. |