Documentation ¶
Overview ¶
Dialer-side helpers in package agent: CP-side outbound mTLS dial logic for the CP→clawkerd Session channel.
Single entry point: Dialer.DialAgent(ctx, containerID). It has two callers and one dial path: CP boot invokes it for each ID returned by listAgentIDs, and the typed-event subscriber on dockerevents.DockerEvent (filtered to container start/restart/unpause for purpose=agent containers) invokes it when an agent container reaches the running state at runtime.
DialAgent is fire-and-forget: it spawns a goroutine that owns the dial, the Session stream, and the lifetime drain loop. All failures are logged at the Error level — callers don't need to handle errors. A failed dial leaves no resources behind; a successful dial is held open until ctx cancels (CP shutdown) or the peer closes.
Asymmetric trust model — load-bearing ¶
CP is the overlord. The dial NEVER aborts on cert / identity grounds. Cert chain verification and registry thumbprint classification outcomes are captured on the establishResult and surfaced through the typed event surface — SessionConnected carries flat PeerAgentFullName/PeerThumbprint fields (purely diagnostic, never a gate); AgentRegistered/AgentUntrusted carry the policy outcomes. Subscribers consume those events to enact policy (containment, alerting, eviction); the dialer holds no policy itself.
Why permissive: CP must always be able to reach clawkerd to issue containment commands (iptables lock, network detach, container kill). A compromised clawkerd presenting a bad cert is exactly when the channel must be UP so CP can react. Aborting on cert grounds would strand CP at the moment governance is most needed.
The asymmetric counterpart lives in cmd/clawkerd/listener.go: the clawkerd-side listener is STRICT — CP CN pin + Client-Auth EKU + CA chain enforced at TLS layer. clawkerd refuses any peer that isn't CP. This pairs with the dialer's permissive client posture.
Connection establishment failures still happen on connectivity grounds (TCP timeout, container gone, retry exhausted, ctx cancelled) — those drive establishOutcome and SessionFailed.
FD-leak ceiling: a successful dial whose conn.Close() repeatedly fails would accumulate file descriptors and gRPC keepalive goroutines indefinitely. After closeErrCeiling consecutive close failures the dial loop bails for the target with a SessionFailed event (Reason carries the "fd-leak-ceiling" classification) so operators see the outcome instead of a silent leak. A successful close anywhere in the loop resets the counter.
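The consecutive-failure counter described above can be sketched as follows. This is a minimal illustration, not the package's code: the closeTracker type, the errFDLeakCeiling sentinel, and the ceiling value of 3 are all assumptions made for the example; only the name closeErrCeiling comes from the text.

```go
package main

import "errors"

// closeErrCeiling borrows its name from the docs above; the value 3 is an
// assumption chosen only for illustration.
const closeErrCeiling = 3

// errFDLeakCeiling is a hypothetical sentinel for this sketch.
var errFDLeakCeiling = errors.New("fd-leak-ceiling")

// closeTracker counts consecutive Close failures for one dial target.
type closeTracker struct {
	consecutive int
}

// observe records the result of one Close call. A nil error resets the
// counter. Once closeErrCeiling failures have happened in a row, observe
// returns errFDLeakCeiling so the caller can stop dialing this target and
// publish a failure event instead of leaking silently.
func (t *closeTracker) observe(closeErr error) error {
	if closeErr == nil {
		t.consecutive = 0
		return nil
	}
	t.consecutive++
	if t.consecutive >= closeErrCeiling {
		return errFDLeakCeiling
	}
	return nil
}
```

A dial loop would call observe after every Close and bail on the first non-nil result. Because a single successful close resets the counter, only an unbroken run of failures trips the ceiling.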
Identity resolution flow for AgentService RPCs:
- The CP's agent listener enforces mTLS at the TLS layer (server cert + ClientAuth EKU + chain to the CLI CA).
- AuthInterceptor verifies the bearer token + per-method scope.
- IdentityInterceptor (this package) runs a universal three-stage gate on EVERY RPC including Register:
  (a) Subject.CommonName == consts.ContainerClawkerd (constant-time). The cert CN is the deterministic clawkerd binary identity; mismatch means the peer is not presenting a CLI-minted agent cert.
  (b) Resolve the kernel-attested peer IP to the purpose=agent container that owns it on clawker-net (via ContainerByPeerIP), reading the project/agent labels as the authoritative identity source; Docker is independent ground truth.
  (c) Compose the label-derived AgentFullName and constant-time compare it against the cert's urn:clawker:agent: URI SAN. The cert's SAN claim is VERIFIED against the IP-grounded label truth and is never the basis of lookup.
- The resolved (containerID, project, agentName) is attached to ctx via WithResolvedContainer; downstream handlers read it via ResolvedContainerFromContext to avoid re-inspecting the container.
All rejections return codes.PermissionDenied with a generic envelope — no leak about which check failed.
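The ordering of the three-stage gate can be illustrated with a small standalone sketch. It assumes hypothetical names throughout (peerClaims, identityGate, the lookup callback, and the example CN and full-name strings) and collapses every rejection into a single false, mirroring the generic envelope; the real interceptor works on gRPC peer info and Docker state rather than plain strings.

```go
package main

import "crypto/subtle"

// peerClaims holds what the peer's certificate asserted. Field names are
// hypothetical and exist only for this sketch.
type peerClaims struct {
	CommonName string // cert Subject.CommonName
	AgentSAN   string // full-name portion of the urn:clawker:agent: URI SAN
}

// constantTimeEqual compares two strings without exiting early on the first
// differing byte. subtle.ConstantTimeCompare does return immediately when the
// lengths differ, so only the length can leak through timing.
func constantTimeEqual(a, b string) bool {
	return subtle.ConstantTimeCompare([]byte(a), []byte(b)) == 1
}

// identityGate runs the three checks in order and reports only pass/fail, so
// a caller cannot tell which stage rejected the peer.
func identityGate(c peerClaims, expectedCN, peerIP string, lookup func(ip string) (fullName string, ok bool)) bool {
	// (a) Pin the certificate CN.
	if !constantTimeEqual(c.CommonName, expectedCN) {
		return false
	}
	// (b) Ground identity on the peer IP, never on the cert's own claim.
	fullName, ok := lookup(peerIP)
	if !ok {
		return false
	}
	// (c) Verify the cert's SAN against the label-derived name.
	return constantTimeEqual(c.AgentSAN, fullName)
}
```

The point of the ordering is that the SAN is only ever compared against a name the lookup produced from the peer IP; the SAN itself is never used as the lookup key.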
Package agent owns the CP-side agent surface: the persisted identity registry, the AgentService.Register handler, the per-RPC IdentityInterceptor, the CP→clawkerd Session dialer, and the AgentRegistered/AgentUntrusted event types.
Registry rows record the (mTLS cert thumbprint, container_id, project, agent_name) tuple. Reads are issued by:
- the Register handler (idempotency / replay-protection at row write time, keyed by container_id)
- the dialer at Hello time (LookupByContainerID → drives AgentRegistered / AgentUntrusted publication via thumbprint compare against the live peer cert)
- AdminService.ListAgents (Snapshot)
Writes are CP-only: the Register handler captures the live mTLS peer's thumbprint and writes the row. Eviction: startup orphan-row reap (in agent.Start), plus dockerevents container/destroy (subscribed in agent.Start). Stop/die/kill do NOT evict because a stopped container can be `docker start`-ed back into life.
Identity is channel-bound: the registry key is `(thumbprint, container_id)` (both UNIQUE in sqlite). Agent-full-name composition from `(project, agent_name)` is no longer persisted — the IdentityInterceptor establishes the trust anchor via the kernel-attested peer IP and cross-checks the cert SAN against the label-derived AgentFullName at the gRPC boundary, so the registry holds only the per-row identity tuple.
Index ¶
- Variables
- func EnsureSchema(dbPath string, log *logger.Logger) error
- func IdentityInterceptor(peerLookup ContainerByPeerIP, log *logger.Logger) (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor, error)
- func Start(ctx context.Context, deps StartDeps) (func(), error)
- func WithResolvedContainer(ctx context.Context, resolved ResolvedContainer) context.Context
- type AgentRegistered
- type AgentUntrusted
- type ContainerByPeerIP
- type ContainerLister
- type Dialer
- type Entry
- type Executor
- type Handler
- type InitCompleted
- type InitFailed
- type InitStarted
- type InitStepCompleted
- type InitStepFailed
- type InitStepStarted
- type InitTarget
- type MobyPeerLookup
- type ReapDegraded
- type Registry
- type ResolvedContainer
- type SessionBroken
- type SessionConnected
- type SessionConnecting
- type SessionFailed
- type StartDeps
Constants ¶
This section is empty.
Variables ¶
var ErrAmbiguousPeerIP = errors.New("multiple purpose=agent containers match peer IP")
ErrAmbiguousPeerIP is returned when two or more `purpose=agent` containers on clawker-net advertise endpoints with the same peer IP. Docker can transiently leave stale endpoints in NetworkSettings.Networks during restart cycles, and grounding the trust anchor on the first match would create a race window. Fail closed instead — operators see a distinct event and the trust gate rejects the RPC until the daemon state converges.
var ErrInvalidAgentLabel = errors.New("agent container has invalid identity label")
ErrInvalidAgentLabel is returned when the container matched by peer IP carries a missing or malformed dev.clawker.agent label. A missing dev.clawker.project label is NOT an error here — it is the legitimate global-scope-agent signal (no project namespace, producing 2-segment naming clawker.<agent>); auth.NewProjectSlug("") returns a zero-value slug with nil err. Distinguishing this from a clean no-match lets the trust gate emit a daemon-state diagnostic instead of a generic auth reject.
var ErrMalformedEntry = errors.New("agentregistry: malformed registry row")
ErrMalformedEntry is returned by sqlite reads when a row's agent_name / project / thumbprint fails re-validation (a value that landed pre-typed-boundary, a hand-edited DB, or a corrupted column). The Register handler treats this sentinel as "evict + re-write": the row's identity is unusable so it gets purged and replaced by the typed identity the middleware just resolved off the live peer IP + cert. Wrapped via fmt.Errorf so errors.Is on the wrapped error works for callers that don't unwrap themselves.
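The wrapping behaviour mentioned above relies on the standard %w verb and errors.Is. A minimal sketch, assuming a hypothetical validateRow helper and a local copy of the sentinel (the real re-validation logic is not shown in this documentation):

```go
package main

import (
	"errors"
	"fmt"
)

// errMalformedEntry is a local stand-in for the package's ErrMalformedEntry.
var errMalformedEntry = errors.New("agentregistry: malformed registry row")

// validateRow is a hypothetical re-validation step. It wraps the sentinel
// with %w so the row context is preserved while errors.Is still matches.
func validateRow(containerID, agentName string) error {
	if agentName == "" {
		return fmt.Errorf("container %s: empty agent_name: %w", containerID, errMalformedEntry)
	}
	return nil
}

// shouldEvictAndRewrite reports whether err is, or wraps, the malformed-row
// sentinel. Callers do not need to unwrap the error themselves.
func shouldEvictAndRewrite(err error) bool {
	return errors.Is(err, errMalformedEntry)
}
```

A caller that receives the wrapped error from a read can branch on shouldEvictAndRewrite(err) and still log the full message, including the container ID that the wrap added.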
var ErrNoContainerForPeerIP = errors.New("no purpose=agent container with matching clawker-net IP")
ErrNoContainerForPeerIP is returned when no `purpose=agent` container on the clawker-net network has an endpoint IP matching the requested peer IP. Callers MUST treat this as a hard authentication failure.
var ErrUnknownAgent = errors.New("agentregistry: unknown agent")
ErrUnknownAgent is returned by LookupByContainerID when no entry matches. Thumbprint compare against the live peer cert is the dialer's job, not the registry's.
Functions ¶
func EnsureSchema ¶
EnsureSchema opens the database in writer mode (creating the file if missing), applies the schema, and closes. Called by the CP daemon at startup before NewSQLiteWriter so the schema apply path is observable independent of the long-lived writer handle.
func IdentityInterceptor ¶
func IdentityInterceptor(peerLookup ContainerByPeerIP, log *logger.Logger) (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor, error)
IdentityInterceptor returns paired unary and stream interceptors that enforce the universal identity gate documented at the top of handler.go. Wired AFTER AuthInterceptor on the agent listener so token validation runs first.
peerLookup is required — a nil resolver would silently disable the trust gate and admit every RPC. Returns a non-nil error rather than panicking: this constructor runs post-eBPF-load in main.go, so a panic here would strand pinned eBPF programs with no supervisor. main.go logs the error structurally (event=agent_identity_unavailable) and refuses to bring up the agent listener, degrading the AgentService surface while CP + firewall + admin listener stay up.
log defaults to logger.Nop() if nil so the panic-recovery path of grpc-go never sees a nil deref.
Every rejection returns codes.PermissionDenied with the same generic envelope ("registration rejected") — attackers must not learn which check failed. The structured log carries the classification via a unique event= field per stage.
func Start ¶
Start gathers the full agent worldview at CP boot and wires every ongoing agent-axis subscription. Called once from cmd/clawker-cp/main.go after the bus is up. Returns a cleanup func that unwinds in reverse order.
Steps, in order:
- Reap orphan registry rows. List every purpose=agent container (All:true, includes stopped). For each registry row whose container_id is missing from docker, evict. Heals the registry against `docker rm`s that landed while CP was down.
- Subscribe to dockerevents.DockerEvent for the evict path — filter on container/destroy; consumer evicts the registry row.
- Subscribe to dockerevents.DockerEvent for the dial path — filter on container/start|restart|unpause with purpose=agent; consumer calls dialer.DialAgent.
The previously-fragmented exports (Reap, registry Subscribe, dial Subscribe) are now unexported helpers behind this single function.
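The orphan-row reap in the first step amounts to a set difference between registry container IDs and the IDs Docker still knows about. A minimal sketch, assuming a hypothetical orphanRows helper that works on plain ID slices (the real code reads from the registry and the Docker API):

```go
package main

import "sort"

// orphanRows returns the registry container IDs that Docker no longer knows
// about. dockerIDs is expected to include stopped containers, since a stopped
// container can still be started again and must keep its row.
func orphanRows(registryIDs, dockerIDs []string) []string {
	live := make(map[string]struct{}, len(dockerIDs))
	for _, id := range dockerIDs {
		live[id] = struct{}{}
	}
	var orphans []string
	for _, id := range registryIDs {
		if _, ok := live[id]; !ok {
			orphans = append(orphans, id)
		}
	}
	// Sorted only so the result is deterministic for logging and tests.
	sort.Strings(orphans)
	return orphans
}
```

Each returned ID would then be passed to the registry's evict call. Listing with stopped containers included is what keeps a merely stopped agent from being treated as gone.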
func WithResolvedContainer ¶ added in v0.9.0
func WithResolvedContainer(ctx context.Context, resolved ResolvedContainer) context.Context
WithResolvedContainer attaches the peer-IP-resolved container identity to ctx for downstream handlers. Exposed so test code and future identity-augmenting interceptors can attach a resolved container directly; production code never needs to call this (the interceptor does it on success).
A zero-value ResolvedContainer (empty ContainerID) is silently dropped — ctx is returned unchanged so downstream ResolvedContainerFromContext sees ok=false. The CP must not panic on the serving path (root CLAUDE.md security contract); the read-side defensive ok=false is the floor against the silent-identity-vacuum hazard.
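The drop-on-zero-value behaviour follows the usual context-key pattern. The sketch below uses hypothetical lowercase stand-ins (resolvedContainer, withResolved, resolvedFrom, roundTrip) rather than the package's real types, and assumes an unexported struct key, which is the conventional choice but is not confirmed by this documentation.

```go
package main

import "context"

// resolvedContainer is a stand-in for the package's ResolvedContainer.
type resolvedContainer struct {
	ContainerID string
	Project     string
	AgentName   string
}

// resolvedKey is an unexported key type so no other package can collide with
// or overwrite the stored value.
type resolvedKey struct{}

// withResolved attaches rc to ctx. A zero-value rc (empty ContainerID) is
// dropped and ctx is returned unchanged, so readers see ok=false instead of
// an empty identity.
func withResolved(ctx context.Context, rc resolvedContainer) context.Context {
	if rc.ContainerID == "" {
		return ctx
	}
	return context.WithValue(ctx, resolvedKey{}, rc)
}

// resolvedFrom reads the identity back; ok is false when nothing was attached.
func resolvedFrom(ctx context.Context) (resolvedContainer, bool) {
	rc, ok := ctx.Value(resolvedKey{}).(resolvedContainer)
	return rc, ok
}

// roundTrip attaches rc to a fresh background context and reads it back,
// showing what a downstream handler would observe.
func roundTrip(rc resolvedContainer) (resolvedContainer, bool) {
	return resolvedFrom(withResolved(context.Background(), rc))
}
```

With this shape a handler only has to check ok: an interceptor that was never chained and a zero-value attach both surface the same way, without a panic on the serving path.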
Types ¶
type AgentRegistered ¶
type AgentRegistered struct {
ContainerID string
AgentName string
Project string
Ok bool
Reason string
At time.Time
}
AgentRegistered fires once per container lifetime, after the CP-driven Register handshake (success or failure). Steady-state "row already exists" reconnects do NOT re-fire — query State.Agents[ID].Registered for "is this agent registered now". Ok=false also drives AgentUntrusted{ReasonRegisterFailed}.
func (AgentRegistered) ApplyTo ¶
func (e AgentRegistered) ApplyTo(s *overseer.State)
func (AgentRegistered) EventName ¶
func (e AgentRegistered) EventName() string
func (AgentRegistered) MarshalZerologObject ¶
func (e AgentRegistered) MarshalZerologObject(z *zerolog.Event)
func (AgentRegistered) OccurredAt ¶
func (e AgentRegistered) OccurredAt() time.Time
type AgentUntrusted ¶
type AgentUntrusted struct {
ContainerID string
AgentName string
Project string
Reason overseer.UntrustedReason
Detail string
At time.Time
}
AgentUntrusted fires when an identity outcome violates the trust contract (thumbprint mismatch, CN mismatch, peer-IP mismatch, Register failed). Session stays open (asymmetric trust); subscribers enact policy.
func (AgentUntrusted) ApplyTo ¶
func (e AgentUntrusted) ApplyTo(s *overseer.State)
func (AgentUntrusted) EventName ¶
func (e AgentUntrusted) EventName() string
func (AgentUntrusted) MarshalZerologObject ¶
func (e AgentUntrusted) MarshalZerologObject(z *zerolog.Event)
func (AgentUntrusted) OccurredAt ¶
func (e AgentUntrusted) OccurredAt() time.Time
type ContainerByPeerIP ¶ added in v0.9.0
type ContainerByPeerIP interface {
LookupByIP(ctx context.Context, ip netip.Addr) (ResolvedContainer, error)
}
ContainerByPeerIP resolves a live mTLS peer IP to the `purpose=agent` container owning that IP on clawker-net. Returns ErrNoContainerForPeerIP when nothing matches, ErrInvalidAgentLabel when the matching container's dev.clawker.agent label is missing or malformed (an absent dev.clawker.project label is a legitimate global-scope-agent signal, not an error), ErrAmbiguousPeerIP when two or more containers share the peer IP (Docker restart-race window — fails closed), or a wrapped daemon error.
type ContainerLister ¶
ContainerLister enumerates every `purpose=agent` container ID currently known to the docker daemon. The implementation MUST include stopped/exited containers — a stopped container can be `docker start`-ed back into life, and its registry row should survive that transition. Only `docker rm` (destroy) means the container is genuinely gone and the row is orphaned.
type Dialer ¶
type Dialer struct {
// contains filtered or unexported fields
}
Dialer captures the CP-side material every dial needs. Construct once at CP startup; share across all agent dials.
The unexported dialing field is the dedup + cancel map: containerIDs currently being dialed (or already-Session-established) mapped to the cancel func for their per-dial ctx. The initial poll and the dockerevents subscriber both call DialAgent for the same running container; the dedup keeps the second call from spinning a duplicate goroutine against an already-open Session. Membership lasts the lifetime of the dial goroutine: after the Session closes (peer drop, ctx cancel, retry timeout), the entry is removed and a future event for the same containerID dials fresh.
CancelDial uses the stored cancel func to tear down a Session synchronously with a registry-evict (container/destroy) — without it, the dialer's runDial loop only notices the disappearance on the next reconnect attempt via outcomeContainerGone, leaving a doomed stream open during the interval.
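The dedup + cancel map described above can be sketched as a mutex-guarded map from container ID to cancel func. The dialSet type and its method names are hypothetical, and falling back to context.Background() for a nil root is a convenience of the example, not documented behaviour.

```go
package main

import (
	"context"
	"sync"
)

// dialSet tracks in-flight dials by container ID.
type dialSet struct {
	mu      sync.Mutex
	root    context.Context
	dialing map[string]context.CancelFunc
}

// newDialSet builds an empty set. root would normally be the shutdown-scoped
// context; a nil root falls back to context.Background() in this sketch.
func newDialSet(root context.Context) *dialSet {
	if root == nil {
		root = context.Background()
	}
	return &dialSet{root: root, dialing: make(map[string]context.CancelFunc)}
}

// begin claims containerID and returns its per-dial context. It returns false
// when a dial for the same ID is already in flight.
func (s *dialSet) begin(containerID string) (context.Context, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, busy := s.dialing[containerID]; busy {
		return nil, false
	}
	ctx, cancel := context.WithCancel(s.root)
	s.dialing[containerID] = cancel
	return ctx, true
}

// cancelDial cancels the per-dial context but leaves the entry in place; the
// dial goroutine removes it via end when it returns.
func (s *dialSet) cancelDial(containerID string) bool {
	s.mu.Lock()
	cancel, ok := s.dialing[containerID]
	s.mu.Unlock()
	if ok {
		cancel()
	}
	return ok
}

// end releases the entry so a later event for the same ID dials fresh.
func (s *dialSet) end(containerID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cancel, ok := s.dialing[containerID]; ok {
		cancel() // release context resources even on a normal exit
		delete(s.dialing, containerID)
	}
}
```

In this shape the dial goroutine would call begin, defer end, and run its loop under the returned context. Keeping the entry until end means a start event that races a cancel cannot spawn a second goroutine while the first is still unwinding.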
func New ¶
func New(log *logger.Logger, docker mobyclient.APIClient, bus *overseer.Overseer, agents Registry, certPath, keyPath string, caPool *x509.CertPool, initExec *Executor) (*Dialer, error)
New constructs a Dialer. Returns an error if the CP client cert / key cannot be loaded — better to fail at CP startup than to defer the failure to the first dial.
bus is required: the dialer publishes typed Session* events (SessionConnecting / Connected / Failed / Broken) so other CP components can subscribe to connection lifecycle without coupling to the dialer directly. Pass a real *overseer.Overseer; tests can use an in-memory bus (it's cheap).
agents is required: every successful dial cross-checks the peer cert against the registry row keyed by container_id and dispatches the typed AgentRegistered / AgentUntrusted events accordingly. nil agents would strand worldview consumers without a registration signal.
func (*Dialer) CancelDial ¶ added in v0.9.0
CancelDial synchronously cancels the in-flight Session for containerID, if any. Called by the registry-evict subscriber on container/destroy so the dialer tears down the doomed stream immediately rather than waiting for the next reconnect to classify outcomeContainerGone. Safe to call when no dial is in flight (no-op) and concurrent-safe (mu-guarded). The goroutine's own cleanup runs the deferred delete after runDial returns.
func (*Dialer) DialAgent ¶
DialAgent opens a Session stream to the clawkerd listener inside the given agent container, sends Hello, awaits HelloAck, and holds the stream open until ctx cancels or the peer closes. Returns immediately — the dial + lifetime drain run on a background goroutine. All failures are logged.
Dedup: a no-op if a dial for the same containerID is already in flight. Initial poll + dockerevents subscriber both reach this function with overlapping IDs at CP startup; dedup keeps a second call from spinning a duplicate goroutine.
Retry: the dial goroutine retries connection establishment with exponential backoff + full jitter (cap connectMaxBackoff) until either the Session is established (Hello + HelloAck) or connectTotalTimeout elapses. Once established, no retry from the drain — a stream break ends the goroutine and removes the dedup entry, so a subsequent restart event re-dials.
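Exponential backoff with full jitter picks each delay uniformly between zero and a capped, doubling ceiling. The sketch below is one common formulation, not the package's code: connectBaseBackoff and both constant values are assumptions, and only the name connectMaxBackoff comes from the text above.

```go
package main

import (
	"math/rand"
	"time"
)

const (
	// connectBaseBackoff is a hypothetical starting ceiling.
	connectBaseBackoff = 100 * time.Millisecond
	// connectMaxBackoff borrows its name from the docs; the value is assumed.
	connectMaxBackoff = 5 * time.Second
)

// backoffCeiling returns min(connectMaxBackoff, connectBaseBackoff * 2^attempt).
// Doubling step by step and clamping early avoids integer overflow for large
// attempt counts.
func backoffCeiling(attempt int) time.Duration {
	d := connectBaseBackoff
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= connectMaxBackoff {
			return connectMaxBackoff
		}
	}
	return d
}

// fullJitter picks a delay in [0, ceiling] using pick, which must return a
// value in [0, n) for n > 0 (the contract of rand.Int63n).
func fullJitter(attempt int, pick func(n int64) int64) time.Duration {
	ceiling := backoffCeiling(attempt)
	return time.Duration(pick(int64(ceiling) + 1))
}

// nextDelay is the production-style entry point using math/rand.
func nextDelay(attempt int) time.Duration {
	return fullJitter(attempt, rand.Int63n)
}
```

A retry loop would sleep for nextDelay(attempt) between tries and stop once its overall deadline (connectTotalTimeout in the text) has passed. Taking the random source as a parameter keeps fullJitter deterministic under test.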
type Entry ¶
type Entry struct {
// AgentName is the user-typed short name. Typed (auth.AgentName)
// purely for compile-time discipline so the registry can't hold a
// raw string where an AgentName is expected. auth.NewAgentName
// rejects only the empty case today; charset/length/form
// constraints are enforced downstream (Docker container/volume
// create, x509 URI SAN encoding, IdentityInterceptor's symmetric
// SAN-vs-label compare). User-typed input is normalized upstream
// by cmdutil.ProjectSlugify before it crosses into auth.
// Constructed by the Register handler at the wire boundary via
// auth.NewAgentName; re-validated via auth.NewAgentName during
// sqlite Snapshot reads — rows with an empty agent_name column
// are skipped, never panicked on.
AgentName auth.AgentName
// Project is the clawker project slug under which the agent
// registered. The zero value (auth.ProjectSlug{}) signals a
// global-scope agent (no project namespace), matching the
// 2-segment docker.ContainerName shape. Typed for the same reason
// as AgentName; auth.NewProjectSlug accepts any input including
// empty.
Project auth.ProjectSlug
ContainerID string
Thumbprint [sha256.Size]byte
RegisteredAt time.Time
LastSeen time.Time
}
Entry is one registered agent. Created CP-side at Register handler entry carrying the ContainerID, AgentName, Project, and the SHA-256 over the peer cert DER (Thumbprint, captured live from the mTLS handshake). LastSeen currently equals RegisteredAt; future per-agent RPCs will refresh LastSeen at their own boundary.
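Computing a thumbprint of this shape is a single SHA-256 over the certificate's DER bytes. The helper names below are hypothetical, and the constant-time comparison is a choice made for this sketch rather than something the documentation states for thumbprints.

```go
package main

import (
	"crypto/sha256"
	"crypto/subtle"
)

// thumbprintOf hashes a certificate's DER encoding. With crypto/x509 the
// input would be the Raw field of the peer's leaf certificate.
func thumbprintOf(der []byte) [sha256.Size]byte {
	return sha256.Sum256(der)
}

// sameThumbprint compares two thumbprints in constant time.
func sameThumbprint(a, b [sha256.Size]byte) bool {
	return subtle.ConstantTimeCompare(a[:], b[:]) == 1
}
```

Because the value is a fixed-size array rather than a slice, it is comparable and copyable by value, which suits a struct field like Thumbprint.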
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor dispatches the static CP-driven init plan against an open Session stream. Owns Recv during Run; the dialer's drainStream takes over after Run returns.
Plan idempotency contract: every Session establish runs the full plan, including reconnects after CP restart. Each step is idempotent (file-presence gates, append-if-missing, marker-file post-init), and AgentReady is a no-op success when clawkerd has already spawned the user CMD (spawnState's CAS rejects re-fork; handler replies Done{0}). The trade-off: a small volume of shell commands fires on every reconnect, but no per-container completed flag is needed.
Executor is shared across all containers: the dialer constructs one at CP boot and calls Run from a goroutine per DialAgent (one per agent container). Run holds no Executor-scoped mutable state: every call gets its own (ctx, stream, target) and drives its own stream's Recv loop in a single goroutine. The Run-owns-Recv invariant is per-stream (one Run, one stream, one Recv-driving goroutine), not per-Executor, so concurrent Runs across different containers must not be serialized.
func NewExecutor ¶
NewExecutor constructs an Executor. A nil log is replaced with logger.Nop(). bus is required: Run publishes init events unconditionally, so a nil bus would trigger a nil-pointer dereference deep inside overseer.Publish on the first event dispatch. Returning an error lets the caller (cmd/clawker-cp/main.go) log the wiring bug to the structured log surface and degrade gracefully (initExec = nil → dialer logs agent_init_executor_unset per dial) instead of crashing CP and stranding the failure on os.Stderr where only `docker logs` sees it. Matches the nil-bus contract on agent.New for the dialer.
func (*Executor) Run ¶
func (e *Executor) Run(ctx context.Context, stream clawkerdv1.ClawkerdService_SessionClient, target InitTarget) (runErr error)
Run dispatches the init plan one step at a time, awaiting Done or Error per command before sending the next. Publishes init events throughout. Returns the error that halted the run (transport failure, step failure surfaced as a Go error), or nil on full success.
Caller invariant: stream must already have completed Hello/HelloAck and any Register handshake. Run owns stream.Recv exclusively for its duration — but per-stream, not per-Executor. Concurrent Runs across different streams (the prod case: parallel agent containers after a CP restart) execute in parallel.
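The one-step-at-a-time dispatch with halt-on-first-failure can be sketched without any stream machinery. Everything here is hypothetical (the step type, runPlan, and modelling a step as a function that returns an exit code); the real Executor sends commands over the Session stream and waits for Done or Error messages.

```go
package main

import "fmt"

// step is a stand-in for one init command. Run returns the exit code reported
// by the remote side, or an error for transport-level failures.
type step struct {
	Name string
	Run  func() (exitCode int32, err error)
}

// runPlan executes steps strictly in order and stops at the first step that
// errors or exits non-zero. It returns the names of the steps that completed.
func runPlan(steps []step) ([]string, error) {
	var completed []string
	for _, s := range steps {
		code, err := s.Run()
		if err != nil {
			return completed, fmt.Errorf("step %q: %w", s.Name, err)
		}
		if code != 0 {
			return completed, fmt.Errorf("step %q: exit code %d", s.Name, code)
		}
		completed = append(completed, s.Name)
	}
	return completed, nil
}
```

Since runPlan keeps all of its state in local variables, many calls can run concurrently on different plans, which is the property the per-stream (not per-Executor) invariant depends on.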
type Handler ¶
type Handler struct {
agentv1.UnimplementedAgentServiceServer
// contains filtered or unexported fields
}
Handler serves the Register RPC on the CP's clawker-net agent listener and is the SOLE writer of the agentregistry sqlite DB. IdentityInterceptor has already grounded the peer in the daemon-attested container identity (peer IP → purpose=agent container → labels → cross-checked cert SAN AgentFullName) and attached the resolved (containerID, project, agentName) to ctx. The handler reads that, captures the cert thumbprint at the gate that persists it, cross-checks the cert's container_id SAN + request fields against the resolved truth, and writes the registry row using label-derived (authoritative) values.
Trust ordering: daemon labels > cert claim > request claim. Persisting `resolved.*` keeps the registry aligned with the daemon view; the request body is treated as a client claim that must agree but never as the source of truth.
All identity-rejection paths return codes.PermissionDenied with a generic envelope; the structured log line carries the specific failure classification for operator triage. A missing resolved container in ctx is a wiring bug (interceptor not chained) and surfaces as codes.Internal.
func NewHandler ¶
NewHandler constructs a Register handler. registry MUST be non-nil: a nil registry would otherwise cause a nil-pointer dereference on the first Register, so construction fails instead and the regression surfaces at CP startup, not at the first agent boot. log defaults to logger.Nop() when nil. clock defaults to time.Now.
func (*Handler) Register ¶
func (h *Handler) Register(ctx context.Context, req *agentv1.RegisterRequest) (*agentv1.Welcome, error)
Register is the one-time-per-container handshake CP triggers via a RegisterRequired Command on the Session bidi stream. clawkerd presents a CLI-CA-signed cert and calls Register; the universal IdentityInterceptor has already pinned CN=ContainerClawkerd, resolved the peer IP to a purpose=agent container, and verified that the cert's urn:clawker:agent: SAN matches the label-derived AgentFullName. The handler captures the live mTLS peer's cert thumbprint, cross-checks the cert's urn:clawker:container: SAN and the request fields against the middleware-resolved identity, then writes the agentregistry row using the label-derived (authoritative) values.
Rejection paths (all return PermissionDenied unless noted; the classification is logged at Warn for operator triage):
- resolved container missing from ctx — middleware did not run on this RPC (wiring regression). Returns Internal.
- peer cert missing from ctx post-resolve: defense-in-depth; the interceptor pre-validates the cert, so reaching this branch means ctx was tampered with between interceptor and handler.
- request agent_name or project malformed (InvalidArgument)
- request fields disagree with middleware-resolved labels (the client lying about its own identity in the RPC body, even though cert+labels agree)
- cert urn:clawker:container: SAN missing or doesn't match the middleware-resolved container_id
- existing row for this container_id has a different thumbprint (cert replay; CLI is the only legit cert source)
Idempotent retry: an existing row whose thumbprint matches the captured thumbprint returns Welcome silently — Session retries after the row was already written.
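The last rejection path and the idempotent-retry rule reduce to a three-way decision on the existing row's thumbprint. A minimal sketch with hypothetical names (registerOutcome, classifyRegister), where a nil pointer stands for "no row for this container_id":

```go
package main

// registerOutcome is a hypothetical classification used only in this sketch.
type registerOutcome int

const (
	outcomeWriteRow   registerOutcome = iota // no row yet: write a new one
	outcomeIdempotent                        // same thumbprint: return Welcome
	outcomeReplay                            // different thumbprint: reject
)

// classifyRegister compares the stored thumbprint, if any, with the one
// captured from the live mTLS peer.
func classifyRegister(existing *[32]byte, live [32]byte) registerOutcome {
	switch {
	case existing == nil:
		return outcomeWriteRow
	case *existing == live:
		return outcomeIdempotent
	default:
		return outcomeReplay
	}
}
```

The sketch uses plain array equality for brevity. Keying the decision by container_id rather than by thumbprint is what lets a mismatch show up as a distinct outcome instead of looking like an unknown agent.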
type InitCompleted ¶
type InitCompleted struct {
ContainerID string
AgentName string
Project string
Duration time.Duration
At time.Time
}
InitCompleted is the terminal success event for one init phase. Subscribers waiting for "agent ready to serve user work" should listen for this rather than SessionConnected — Session is connected long before init finishes.
func (InitCompleted) ApplyTo ¶
func (e InitCompleted) ApplyTo(s *overseer.State)
func (InitCompleted) EventName ¶
func (e InitCompleted) EventName() string
func (InitCompleted) MarshalZerologObject ¶
func (e InitCompleted) MarshalZerologObject(z *zerolog.Event)
func (InitCompleted) OccurredAt ¶
func (e InitCompleted) OccurredAt() time.Time
type InitFailed ¶
type InitFailed struct {
ContainerID string
AgentName string
Project string
FailedStep string
Reason overseer.InitFailureReason
Detail string
Duration time.Duration
At time.Time
}
InitFailed is the terminal failure event for one init phase. Carries the typed Reason classification and the human-readable Detail so subscribers can surface the proximate cause without re-deriving from the step event stream.
func (InitFailed) ApplyTo ¶
func (e InitFailed) ApplyTo(s *overseer.State)
func (InitFailed) EventName ¶
func (e InitFailed) EventName() string
func (InitFailed) MarshalZerologObject ¶
func (e InitFailed) MarshalZerologObject(z *zerolog.Event)
func (InitFailed) OccurredAt ¶
func (e InitFailed) OccurredAt() time.Time
type InitStarted ¶
type InitStarted struct {
ContainerID string
AgentName string
Project string
StepCount int
At time.Time
}
InitStarted fires when CP begins running the init plan against an established Session. Re-published on every Session reconnect that re-runs the plan. StepCount lets streaming subscribers render "1 of N" progress without re-deriving plan length.
func (InitStarted) ApplyTo ¶
func (e InitStarted) ApplyTo(s *overseer.State)
func (InitStarted) EventName ¶
func (e InitStarted) EventName() string
func (InitStarted) MarshalZerologObject ¶
func (e InitStarted) MarshalZerologObject(z *zerolog.Event)
func (InitStarted) OccurredAt ¶
func (e InitStarted) OccurredAt() time.Time
type InitStepCompleted ¶
type InitStepCompleted struct {
ContainerID string
AgentName string
Project string
StepName string
StepIndex int
Duration time.Duration
ExitCode int32
At time.Time
}
InitStepCompleted fires when a step's ShellCommand returns Done with exit_code == 0.
func (InitStepCompleted) ApplyTo ¶
func (e InitStepCompleted) ApplyTo(s *overseer.State)
func (InitStepCompleted) EventName ¶
func (e InitStepCompleted) EventName() string
func (InitStepCompleted) MarshalZerologObject ¶
func (e InitStepCompleted) MarshalZerologObject(z *zerolog.Event)
func (InitStepCompleted) OccurredAt ¶
func (e InitStepCompleted) OccurredAt() time.Time
type InitStepFailed ¶
type InitStepFailed struct {
ContainerID string
AgentName string
Project string
StepName string
StepIndex int
Duration time.Duration
ExitCode int32
Reason overseer.InitFailureReason
Detail string
At time.Time
}
InitStepFailed fires when a step terminates non-zero, errors, or times out. Reason is the typed classification subscribers branch on; Detail is the human-readable diagnostic (formatted ErrorCode + message + truncated stderr). The Executor halts the plan on this event and publishes a terminal InitFailed.
func (InitStepFailed) ApplyTo ¶
func (e InitStepFailed) ApplyTo(s *overseer.State)
func (InitStepFailed) EventName ¶
func (e InitStepFailed) EventName() string
func (InitStepFailed) MarshalZerologObject ¶
func (e InitStepFailed) MarshalZerologObject(z *zerolog.Event)
func (InitStepFailed) OccurredAt ¶
func (e InitStepFailed) OccurredAt() time.Time
type InitStepStarted ¶
type InitStepStarted struct {
ContainerID string
AgentName string
Project string
StepName string
StepIndex int
StepCount int
At time.Time
}
InitStepStarted fires when the Executor dispatches one step's ShellCommand. StepName is the wire-contract vocabulary subscribers match against ("config", "git", "ssh", "post-init", "agent-ready").
func (InitStepStarted) ApplyTo ¶
func (e InitStepStarted) ApplyTo(s *overseer.State)
func (InitStepStarted) EventName ¶
func (e InitStepStarted) EventName() string
func (InitStepStarted) MarshalZerologObject ¶
func (e InitStepStarted) MarshalZerologObject(z *zerolog.Event)
func (InitStepStarted) OccurredAt ¶
func (e InitStepStarted) OccurredAt() time.Time
type InitTarget ¶
InitTarget identifies the agent the Executor is initializing. Threaded through every init event so subscribers see consistent identity fields without re-deriving from the registry.
type MobyPeerLookup ¶ added in v0.9.0
type MobyPeerLookup struct {
// contains filtered or unexported fields
}
MobyPeerLookup is the production ContainerByPeerIP backed by the Docker daemon.
func NewMobyPeerLookup ¶ added in v0.9.0
func NewMobyPeerLookup(cli mobyclient.APIClient, log *logger.Logger) *MobyPeerLookup
NewMobyPeerLookup wraps a moby APIClient as a ContainerByPeerIP resolver. log defaults to logger.Nop() when nil.
func (*MobyPeerLookup) LookupByIP ¶ added in v0.9.0
func (m *MobyPeerLookup) LookupByIP(ctx context.Context, ip netip.Addr) (ResolvedContainer, error)
LookupByIP walks every `purpose=agent` container and returns the one whose clawker-net endpoint IP matches ip. The walk is exhaustive: ambiguous-IP advertisements (multiple containers with overlapping endpoint state during restart cycles) return ErrAmbiguousPeerIP rather than picking the first match. A transient per-container inspect failure is logged and iteration continues. A daemon-error wrap is returned only when NO candidate could be inspected — a real no-match is reported as ErrNoContainerForPeerIP even when one inspect along the way failed, so the peer_lookup_no_match audit signal stays useful.
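The exhaustive walk can be sketched over an in-memory candidate list. The candidate type, helper names, and local sentinels are hypothetical, and the sketch leaves out the per-container inspect failures that the real lookup tolerates.

```go
package main

import (
	"errors"
	"net/netip"
)

// Local stand-ins for ErrNoContainerForPeerIP and ErrAmbiguousPeerIP.
var (
	errNoMatch   = errors.New("no purpose=agent container with matching clawker-net IP")
	errAmbiguous = errors.New("multiple purpose=agent containers match peer IP")
)

// candidate pairs a container ID with its endpoint IP on the agent network.
type candidate struct {
	ContainerID string
	IP          netip.Addr
}

// mustCandidate builds a fixture; it panics on a malformed IP and is meant
// for examples and tests only.
func mustCandidate(id, ip string) candidate {
	return candidate{ContainerID: id, IP: netip.MustParseAddr(ip)}
}

// lookupByIP scans every candidate instead of returning on the first hit, so
// a second match is detected and reported as ambiguous (fail closed).
func lookupByIP(candidates []candidate, peer netip.Addr) (string, error) {
	match, found := "", false
	for _, c := range candidates {
		if c.IP != peer {
			continue
		}
		if found {
			return "", errAmbiguous
		}
		match, found = c.ContainerID, true
	}
	if !found {
		return "", errNoMatch
	}
	return match, nil
}
```

Returning on the first match would be shorter, but it would let whichever stale endpoint happens to be listed first win during a restart race, which is the window the ambiguity check closes.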
type ReapDegraded ¶
ReapDegraded fires when CP's startup reap of orphan registry rows fails. Rows for containers destroyed while CP was down may persist as ghosts until a successful future reap. Pure informational event (no ApplyTo).
func (ReapDegraded) EventName ¶
func (e ReapDegraded) EventName() string
func (ReapDegraded) MarshalZerologObject ¶
func (e ReapDegraded) MarshalZerologObject(z *zerolog.Event)
func (ReapDegraded) OccurredAt ¶
func (e ReapDegraded) OccurredAt() time.Time
type Registry ¶
type Registry interface {
// Add inserts an entry keyed by (Entry.Thumbprint, Entry.ContainerID).
// Container restart produces a new cert and a new thumbprint, so
// re-registration creates a new entry; the dockerevents subscription
// is responsible for evicting the stale one by container ID.
//
// Returns an error when the persistence layer (sqlite) rejects the
// write — disk full, schema corruption, UNIQUE collision against a
// stale row that hasn't been evicted yet. Callers translate the
// error into the appropriate gRPC status.
//
// Add returns an error on programming-error invariants (zero
// thumbprint, empty ContainerID, zero AgentName, zero
// RegisteredAt). Both Registry implementations (in-memory and
// sqlite) propagate the error from `validateEntry` rather than
// panicking — Add lives on a gRPC handler goroutine reachable
// post-SetReady from Register, and a panic on that path would
// strand eBPF programs with no supervisor (see root CLAUDE.md).
// Register maps the error to codes.Internal: every field
// validateEntry checks is server-derived (thumbprint from the
// live peer cert, ContainerID/AgentName/Project from
// IdentityInterceptor's ResolvedContainer, RegisteredAt from
// h.clock()), so a failure here is a CP wiring bug, not bad
// client input. User-controlled identity strings are validated
// upstream at the wire boundary (auth.NewProjectSlug /
// auth.NewAgentName → codes.InvalidArgument).
Add(entry Entry) error
// LookupByContainerID returns the entry whose ContainerID matches.
// Used by the Register handler (idempotency / replay-protection)
// and by the dialer at Hello time to drive registry classification
// (Match / Miss / ThumbprintMismatch) and decide whether to send
// RegisterRequired or publish AgentUntrusted. The dialer performs
// the thumbprint comparison against the returned entry itself;
// this read intentionally does not gate on thumbprint so a
// mismatch surfaces as a typed local outcome rather than as
// ErrUnknownAgent.
//
// Returns (nil, ErrUnknownAgent) when no entry matches.
LookupByContainerID(containerID string) (*Entry, error)
// EvictByContainerID removes any entry whose ContainerID matches.
// Returns the underlying persistence error so callers can decide
// whether to retry, log, or abort. The dockerevents-driven and
// reaper-driven callers log-and-proceed because a transient sqlite
// failure must not stall the eviction pipeline; CLI-side `clawker
// container remove` similarly logs at debug since registry hiccups
// must not surface as remove failures (the row gets pruned later
// by the dockerevents subscription).
EvictByContainerID(containerID string) error
// Snapshot returns a copy of every live entry, sorted by
// (Project, AgentName) for deterministic output. Project is the
// primary sort key because the same short AgentName can be reused
// across different projects (the composite identity is
// (project, agent)). Used by AdminService.ListAgents and the
// `clawker controlplane agents` CLI; both rely on stable ordering
// for diffability.
//
// Returns a non-nil error on persistence failure (sqlite db.Query
// or rows.Err non-nil). Callers must NOT treat an empty result as
// authoritative when err != nil — reapOrphans would otherwise
// evict every registered agent on a transient query failure;
// ListAgents would surface "no agents" to operators while the
// registry is intact but unreadable.
Snapshot() ([]Entry, error)
}
Registry is the consumer-facing contract.
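The Match / Miss / ThumbprintMismatch classification that the LookupByContainerID comment describes might look roughly like this on the dialer side. The `classification` type, its string values, `mapRegistry`, and `classify` are assumptions made for illustration; only the lookup contract (nil entry plus ErrUnknownAgent on a miss, thumbprint compared by the caller) comes from the comments above.

```go
package main

import (
	"crypto/sha256"
	"errors"
	"fmt"
)

var errUnknownAgent = errors.New("unknown agent") // stand-in for ErrUnknownAgent

type thumbprint [sha256.Size]byte

type entry struct {
	ContainerID string
	Thumbprint  thumbprint
}

// mapRegistry is a toy registry keyed by container ID.
type mapRegistry map[string]entry

func (m mapRegistry) LookupByContainerID(id string) (*entry, error) {
	e, ok := m[id]
	if !ok {
		return nil, errUnknownAgent
	}
	return &e, nil
}

// classification names are hypothetical labels for the three outcomes.
type classification string

const (
	classMatch              classification = "match"
	classMiss               classification = "miss"
	classThumbprintMismatch classification = "thumbprint_mismatch"
)

// classify compares the live peer thumbprint with the registered one. The
// lookup itself does not gate on thumbprint, so a mismatch is a distinct
// outcome rather than an unknown-agent error. Callers must check err first.
func classify(r mapRegistry, containerID string, peer thumbprint) (classification, error) {
	e, err := r.LookupByContainerID(containerID)
	switch {
	case errors.Is(err, errUnknownAgent):
		return classMiss, nil
	case err != nil:
		return "", err // persistence failure: no classification available
	case e.Thumbprint != peer:
		return classThumbprintMismatch, nil
	default:
		return classMatch, nil
	}
}

func main() {
	registered := thumbprint(sha256.Sum256([]byte("cert-v1")))
	rotated := thumbprint(sha256.Sum256([]byte("cert-v2")))
	r := mapRegistry{"c1": {ContainerID: "c1", Thumbprint: registered}}

	fmt.Println(classify(r, "c1", registered))
	fmt.Println(classify(r, "c1", rotated))
	fmt.Println(classify(r, "c2", registered))
}
```

Because fixed-size byte arrays are comparable in Go, the thumbprint check is a plain `!=` with no helper needed.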
func NewRegistry ¶
NewRegistry constructs an empty in-memory registry. Logger is required (use logger.Nop() in tests) so audit-trail messages on Add and Evict are captured even when production logging is otherwise disabled.
func NewSQLiteWriter ¶
NewSQLiteWriter opens (or creates) the sqlite database at dbPath, applies the schema, and returns a registry suitable for the writer process. CP is the SOLE writer in this design (the CLI no longer opens the registry DB), so a separate reader-mode constructor is not provided — every consumer that previously read from sqlite now goes through `f.AdminClient(ctx).ListAgents`.
The parent directory of dbPath must already exist.
type ResolvedContainer ¶ added in v0.9.0
type ResolvedContainer struct {
ContainerID string
Project auth.ProjectSlug
AgentName auth.AgentName
}
ResolvedContainer is the authoritative identity of an agent container as resolved from its live mTLS peer IP. Project and AgentName are typed so a zero-value instance cannot carry empty strings into a downstream identity comparison; the resolver constructs these via auth.NewProjectSlug / auth.NewAgentName and rejects malformed labels at the seam.
func ResolvedContainerFromContext ¶ added in v0.9.0
func ResolvedContainerFromContext(ctx context.Context) (ResolvedContainer, bool)
ResolvedContainerFromContext returns the peer-IP-resolved container identity that IdentityInterceptor attached to ctx. ok=false means the interceptor did not run on this RPC, which is a wiring bug for any AgentService handler that consumes resolved identity. The function is also defensive against a zero-value resolved container (empty ContainerID): it treats that as ok=false, so handlers can rely on ok=true meaning a non-empty resolved container is available.
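The attach-then-read pattern behind this accessor is the usual unexported-key `context.WithValue` idiom. The sketch below is an assumption about its general shape, not the package's code: `ctxKey`, `withResolved`, and `simulateRPC` are hypothetical, and the identity fields are plain strings here rather than the typed auth values.

```go
package main

import (
	"context"
	"fmt"
)

type resolvedContainer struct {
	ContainerID string
	Project     string
	AgentName   string
}

// ctxKey is unexported so no other package can collide with or forge the value.
type ctxKey struct{}

// withResolved is what an interceptor would call before invoking the handler.
func withResolved(ctx context.Context, rc resolvedContainer) context.Context {
	return context.WithValue(ctx, ctxKey{}, rc)
}

// resolvedFromContext reports ok=false both when nothing was attached and
// when the attached value has an empty ContainerID.
func resolvedFromContext(ctx context.Context) (resolvedContainer, bool) {
	rc, ok := ctx.Value(ctxKey{}).(resolvedContainer)
	if !ok || rc.ContainerID == "" {
		return resolvedContainer{}, false
	}
	return rc, true
}

// simulateRPC mimics one RPC: when intercepted is true the identity is
// attached before the handler-side read.
func simulateRPC(intercepted bool, rc resolvedContainer) (resolvedContainer, bool) {
	ctx := context.Background()
	if intercepted {
		ctx = withResolved(ctx, rc)
	}
	return resolvedFromContext(ctx)
}

func main() {
	rc := resolvedContainer{ContainerID: "abc123", Project: "demo", AgentName: "dev"}
	fmt.Println(simulateRPC(true, rc))
	fmt.Println(simulateRPC(false, rc))
	fmt.Println(simulateRPC(true, resolvedContainer{}))
}
```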
type SessionBroken ¶
type SessionBroken struct {
ContainerID string
AgentName string
Project string
Address string
Reason string
At time.Time
}
SessionBroken fires when an established Session terminates. Reason classifies the cause (peer EOF, transport break, error string). Not published on intentional teardown (CP shutdown / ctx cancel) — see runDial for the suppression rationale.
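One way to picture the publish-or-suppress decision is a small pure function over `ctx.Err()` and the error that ended the drain loop. This is a sketch under assumptions: `terminationReason`, the "peer_eof" label, and treating a nil stream error as a clean peer close are illustrative choices, since the actual Reason strings are not listed in this documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

var errTransport = errors.New("transport is closing") // simulated transport break

// terminationReason takes ctx.Err() and the error that ended the stream.
// publish=false means intentional teardown: no SessionBroken-style event.
func terminationReason(ctxErr, streamErr error) (reason string, publish bool) {
	if ctxErr != nil {
		return "", false // CP shutdown or ctx cancel: suppress
	}
	if streamErr == nil || errors.Is(streamErr, io.EOF) {
		return "peer_eof", true // hypothetical label for a clean peer close
	}
	return streamErr.Error(), true // fall back to the error string
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	reason, publish := terminationReason(ctx.Err(), io.EOF)
	fmt.Println(reason, publish)

	reason, publish = terminationReason(ctx.Err(), errTransport)
	fmt.Println(reason, publish)

	cancel() // simulate CP shutdown
	reason, publish = terminationReason(ctx.Err(), io.EOF)
	fmt.Printf("%q %v\n", reason, publish)
}
```

Checking the context first matters: during shutdown the stream usually fails with some transport error too, and without that ordering every shutdown would look like a broken session.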
func (SessionBroken) ApplyTo ¶
func (e SessionBroken) ApplyTo(s *overseer.State)
func (SessionBroken) EventName ¶
func (e SessionBroken) EventName() string
func (SessionBroken) MarshalZerologObject ¶
func (e SessionBroken) MarshalZerologObject(z *zerolog.Event)
func (SessionBroken) OccurredAt ¶
func (e SessionBroken) OccurredAt() time.Time
type SessionConnected ¶
type SessionConnected struct {
ContainerID string
AgentName string
Project string
Address string
Attempts int
PeerAgentFullName string
PeerThumbprint [sha256.Size]byte
At time.Time
}
SessionConnected fires when a Session is established (the mTLS dial and Hello handshake complete). Identity fields (PeerAgentFullName, PeerThumbprint) ride alongside the transport fields so subscribers and the worldview have everything in one event. The Provenance struct is retired; AgentUntrusted/AgentRegistered carry the policy outcomes.
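The Session events all expose ApplyTo, which suggests the worldview is built by folding events into state in order. The sketch below shows one plausible shape of that fold. Everything in it is hypothetical: `state` stands in for overseer.State, whose real fields are not documented here, and the event names and phase strings are invented for the example.

```go
package main

import "fmt"

// state is a hypothetical stand-in for overseer.State: container ID -> phase.
type state struct{ phase map[string]string }

// event is the subset of the event surface this sketch uses.
type event interface {
	EventName() string
	ApplyTo(s *state)
}

type sessionConnecting struct{ ContainerID string }

type sessionConnected struct {
	ContainerID string
	Attempts    int
}

type sessionBroken struct{ ContainerID, Reason string }

func (e sessionConnecting) EventName() string { return "session_connecting" }
func (e sessionConnected) EventName() string  { return "session_connected" }
func (e sessionBroken) EventName() string     { return "session_broken" }

// Each event knows how to update the state; the bus needs no type switch.
func (e sessionConnecting) ApplyTo(s *state) { s.phase[e.ContainerID] = "connecting" }
func (e sessionConnected) ApplyTo(s *state)  { s.phase[e.ContainerID] = "connected" }
func (e sessionBroken) ApplyTo(s *state)     { s.phase[e.ContainerID] = "broken" }

// fold applies events in order and returns the resulting state.
func fold(events []event) *state {
	s := &state{phase: map[string]string{}}
	for _, e := range events {
		e.ApplyTo(s)
	}
	return s
}

func main() {
	s := fold([]event{
		sessionConnecting{ContainerID: "c1"},
		sessionConnected{ContainerID: "c1", Attempts: 2},
		sessionConnecting{ContainerID: "c2"},
		sessionBroken{ContainerID: "c1", Reason: "peer_eof"},
	})
	fmt.Println(s.phase["c1"], s.phase["c2"])
}
```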
func (SessionConnected) ApplyTo ¶
func (e SessionConnected) ApplyTo(s *overseer.State)
func (SessionConnected) EventName ¶
func (e SessionConnected) EventName() string
func (SessionConnected) MarshalZerologObject ¶
func (e SessionConnected) MarshalZerologObject(z *zerolog.Event)
func (SessionConnected) OccurredAt ¶
func (e SessionConnected) OccurredAt() time.Time
type SessionConnecting ¶
type SessionConnecting struct {
ContainerID string
AgentName string
Project string
Address string
At time.Time
}
SessionConnecting fires when a dial attempt starts (first successful inspect of the container in a cycle). Carries the agent identity labels and the address being dialed.
func (SessionConnecting) ApplyTo ¶
func (e SessionConnecting) ApplyTo(s *overseer.State)
func (SessionConnecting) EventName ¶
func (e SessionConnecting) EventName() string
func (SessionConnecting) MarshalZerologObject ¶
func (e SessionConnecting) MarshalZerologObject(z *zerolog.Event)
func (SessionConnecting) OccurredAt ¶
func (e SessionConnecting) OccurredAt() time.Time
type SessionFailed ¶
type SessionFailed struct {
ContainerID string
AgentName string
Project string
Address string
Reason string
Attempts int
At time.Time
}
SessionFailed fires when the retry budget for a dial cycle is exhausted before any attempt establishes a Session. Reason carries a short classification (for example "connect_total_timeout" or "container_not_running"); the underlying dial error is in the log line, not on the event.
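A dial cycle with a total time budget could be sketched as below. The two Reason strings come from the text above; the rest is assumed for illustration, including the doubling backoff, the injected `sleep` function, the `outcome` type, and the rule that the cycle gives up when the next wait would exceed the budget.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errDial       = errors.New("connection refused")    // simulated retryable failure
	errNotRunning = errors.New("container not running") // simulated terminal condition
)

// outcome mirrors the Reason and Attempts fields carried by SessionFailed.
type outcome struct {
	Connected bool
	Reason    string
	Attempts  int
}

// noSleep lets callers run the cycle without real delays.
func noSleep(time.Duration) {}

// dialCycle retries dial until it succeeds, hits a terminal condition, or
// the next backoff wait would push total waiting past the budget.
func dialCycle(total, backoff time.Duration, sleep func(time.Duration), dial func(attempt int) error) outcome {
	var waited time.Duration
	for attempt := 1; ; attempt++ {
		err := dial(attempt)
		switch {
		case err == nil:
			return outcome{Connected: true, Attempts: attempt}
		case errors.Is(err, errNotRunning):
			return outcome{Reason: "container_not_running", Attempts: attempt}
		}
		if waited+backoff > total {
			return outcome{Reason: "connect_total_timeout", Attempts: attempt}
		}
		sleep(backoff)
		waited += backoff
		backoff *= 2 // assumed exponential backoff
	}
}

func main() {
	alwaysFail := func(int) error { return errDial }
	thirdTime := func(attempt int) error {
		if attempt < 3 {
			return errDial
		}
		return nil
	}
	fmt.Printf("%+v\n", dialCycle(100*time.Millisecond, 10*time.Millisecond, noSleep, alwaysFail))
	fmt.Printf("%+v\n", dialCycle(100*time.Millisecond, 10*time.Millisecond, noSleep, thirdTime))
}
```

Only the short Reason reaches the outcome; the underlying dial error would go to the log line, matching the split described above.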
func (SessionFailed) ApplyTo ¶
func (e SessionFailed) ApplyTo(s *overseer.State)
func (SessionFailed) EventName ¶
func (e SessionFailed) EventName() string
func (SessionFailed) MarshalZerologObject ¶
func (e SessionFailed) MarshalZerologObject(z *zerolog.Event)
func (SessionFailed) OccurredAt ¶
func (e SessionFailed) OccurredAt() time.Time
type StartDeps ¶
type StartDeps struct {
Registry Registry
DockerLister ContainerLister
PeerLookup ContainerByPeerIP
Dialer *Dialer
Bus *overseer.Overseer
Log *logger.Logger
}
StartDeps bundles the dependencies the umbrella `Start` procedure needs. Bus + Registry + Dialer + Docker lister + peer-IP resolver are all owned by the CP startup path; passing them as a single struct keeps the call site in cmd/clawker-cp/main.go a single function call. All fields are required.
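Since every field is required, a caller-side check before invoking `Start` can catch a wiring mistake early. The sketch below is an assumption about how such a check could look; whether `Start` itself validates is not stated here, and `startDeps`, its placeholder field types, and `validate` are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// Placeholder types standing in for the real Dialer, Overseer, and Logger.
type (
	dialer   struct{}
	overseer struct{}
	logger   struct{}
)

// startDeps mirrors the shape of StartDeps; interface-typed fields are
// reduced to interface{} because their methods are irrelevant here.
type startDeps struct {
	Registry     interface{}
	DockerLister interface{}
	PeerLookup   interface{}
	Dialer       *dialer
	Bus          *overseer
	Log          *logger
}

// validate reports every missing field at once instead of failing on the first.
func (d startDeps) validate() error {
	var missing []string
	if d.Registry == nil {
		missing = append(missing, "Registry")
	}
	if d.DockerLister == nil {
		missing = append(missing, "DockerLister")
	}
	if d.PeerLookup == nil {
		missing = append(missing, "PeerLookup")
	}
	if d.Dialer == nil {
		missing = append(missing, "Dialer")
	}
	if d.Bus == nil {
		missing = append(missing, "Bus")
	}
	if d.Log == nil {
		missing = append(missing, "Log")
	}
	if len(missing) > 0 {
		return fmt.Errorf("StartDeps: missing required fields: %s", strings.Join(missing, ", "))
	}
	return nil
}

func main() {
	partial := startDeps{Registry: struct{}{}, Dialer: &dialer{}}
	fmt.Println(partial.validate())
}
```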