agent

package
v0.8.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 12, 2026 License: MIT Imports: 40 Imported by: 0

Documentation

Overview

Dialer-side helpers in package agent: CP-side outbound mTLS dial logic for the CP→clawkerd Session channel.

Single entry point: Dialer.DialAgent(ctx, containerID). The same function is invoked at CP boot (over the result of listAgentIDs) and at runtime from the typed-event subscriber on dockerevents.DockerEvent, which filters for container start/restart/unpause on purpose=agent containers and fires whenever an agent container reaches the running state. Two callers, one dial path.

DialAgent is fire-and-forget: it spawns a goroutine that owns the dial, the Session stream, and the lifetime drain loop. All failures are logged at the Error level — callers don't need to handle errors. A failed dial leaves no resources behind; a successful dial is held open until ctx cancels (CP shutdown) or the peer closes.

Asymmetric trust model — load-bearing

CP is the overlord. The dial NEVER aborts on cert / identity grounds. Cert chain verification, peer CN match, and registry classification outcomes are captured on the establishResult and surfaced through the typed event surface — SessionConnected carries flat PeerCN/PeerThumbprint fields; AgentRegistered/AgentUntrusted carry the policy outcomes. Subscribers consume those events to enact policy (containment, alerting, eviction); the dialer holds no policy itself.

Why permissive: CP must always be able to reach clawkerd to issue containment commands (iptables lock, network detach, container kill). A compromised clawkerd presenting a bad cert is exactly when the channel must be UP so CP can react. Aborting on cert grounds would strand CP at the moment governance is most needed.

The asymmetric counterpart lives in cmd/clawkerd/listener.go: the clawkerd-side listener is STRICT — CP CN pin + Client-Auth EKU + CA chain enforced at TLS layer. clawkerd refuses any peer that isn't CP. This pairs with the dialer's permissive client posture.
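The permissive client posture can be illustrated with crypto/tls. This is a minimal sketch under assumptions, not the package's dialer: peerOutcome, recordPeer, permissiveConfig, and the throwaway self-signed certificate are hypothetical names introduced here. It shows chain verification being recorded as an observation instead of aborting the handshake.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/sha256"
	"crypto/tls"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"time"
)

// peerOutcome is a hypothetical stand-in for the identity facts a dialer
// could record about its peer; it is not the package's establishResult.
type peerOutcome struct {
	ChainOK    bool
	CN         string
	Thumbprint [sha256.Size]byte
}

// recordPeer inspects the presented leaf and records what it finds.
// It never fails: a bad cert is an observation, not a reason to abort.
func recordPeer(rawCerts [][]byte, roots *x509.CertPool) peerOutcome {
	var out peerOutcome
	if len(rawCerts) == 0 {
		return out
	}
	out.Thumbprint = sha256.Sum256(rawCerts[0])
	leaf, err := x509.ParseCertificate(rawCerts[0])
	if err != nil {
		return out
	}
	out.CN = leaf.Subject.CommonName
	_, err = leaf.Verify(x509.VerifyOptions{
		Roots:     roots,
		KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageAny},
	})
	out.ChainOK = err == nil
	return out
}

// permissiveConfig turns off abort-on-failure verification and captures the
// outcome through the VerifyPeerCertificate hook instead.
func permissiveConfig(roots *x509.CertPool, sink *peerOutcome) *tls.Config {
	return &tls.Config{
		InsecureSkipVerify: true, // deliberate in this sketch: record, do not enforce
		VerifyPeerCertificate: func(raw [][]byte, _ [][]*x509.Certificate) error {
			*sink = recordPeer(raw, roots)
			return nil
		},
	}
}

// selfSigned builds a throwaway certificate for the demonstration.
func selfSigned(cn string) ([]byte, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(1),
		Subject:      pkix.Name{CommonName: cn},
		NotBefore:    time.Now().Add(-time.Hour),
		NotAfter:     time.Now().Add(time.Hour),
	}
	return x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
}

// observeSelfSigned runs the hook against an untrusted cert and an empty pool.
func observeSelfSigned(cn string) (peerOutcome, error) {
	der, err := selfSigned(cn)
	if err != nil {
		return peerOutcome{}, err
	}
	var seen peerOutcome
	cfg := permissiveConfig(x509.NewCertPool(), &seen)
	// Call the hook directly, the way crypto/tls would during a handshake.
	if err := cfg.VerifyPeerCertificate([][]byte{der}, nil); err != nil {
		return peerOutcome{}, err
	}
	return seen, nil
}

func main() {
	out, err := observeSelfSigned("demo-agent")
	if err != nil {
		panic(err)
	}
	fmt.Printf("cn=%s chainOK=%v\n", out.CN, out.ChainOK)
}
```

The hook returns nil even though the chain does not verify, so the connection would proceed while the failed check stays visible to whoever reads the recorded outcome.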

Connection establishment failures still happen on connectivity grounds (TCP timeout, container gone, retry exhausted, ctx cancelled) — those drive establishOutcome and SessionFailed.

FD-leak ceiling: a successful dial whose conn.Close() repeatedly fails would accumulate file descriptors and gRPC keepalive goroutines indefinitely. After closeErrCeiling consecutive close failures the dial loop bails for the target with a SessionFailed event (Reason carries the "fd-leak-ceiling" classification) so operators see the outcome instead of a silent leak. A successful close anywhere in the loop resets the counter.
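The counter behaviour described above can be sketched as follows. The name closeErrCeiling comes from the text; its value here (3), the closeTracker type, and errClose are assumptions made only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// closeErrCeiling mirrors the name used in the text; the value is assumed.
const closeErrCeiling = 3

// errClose is a placeholder for whatever conn.Close() might return.
var errClose = errors.New("close failed")

// closeTracker counts consecutive Close failures for one dial target.
type closeTracker struct {
	consecutive int
}

// observe records one Close result and reports whether the dial loop
// should give up on the target.
func (c *closeTracker) observe(closeErr error) (bail bool) {
	if closeErr == nil {
		c.consecutive = 0 // any successful close resets the counter
		return false
	}
	c.consecutive++
	return c.consecutive >= closeErrCeiling
}

func main() {
	var tracker closeTracker
	results := []error{errClose, nil, errClose, errClose, errClose}
	for i, err := range results {
		if tracker.observe(err) {
			fmt.Printf("close attempt %d: ceiling reached, reason fd-leak-ceiling\n", i+1)
			return
		}
	}
	fmt.Println("ceiling not reached")
}
```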

Identity resolution flow for non-opt-out AgentService RPCs:

  1. The CP's agent listener enforces mTLS at the TLS layer (server cert + ClientAuth EKU + chain to the CLI CA).
  2. AuthInterceptor verifies the bearer token + per-method scope.
  3. IdentityInterceptor (this package) reads the peer cert via peer.FromContext, computes its SHA-256 thumbprint, and looks up the corresponding registry entry via Registry.Lookup, which does a constant-time CN compare against the row's pre-computed canonical CN.
  4. The resolved entry is attached to ctx via WithEntry; downstream handlers read it via EntryFromContext.

All rejections return codes.PermissionDenied with a generic envelope — no leak about which check failed.

AgentService.Register is opt-out (registry row doesn't exist pre-call) — the Register handler in register_handler.go does its own peer cert + IP + label cross-checks before writing the row.

Package agent owns the CP-side agent surface: the persisted identity registry, the AgentService.Register handler, the per-RPC IdentityInterceptor, the CP→clawkerd Session dialer, and the AgentRegistered/AgentUntrusted event types.

Registry rows record the (mTLS cert thumbprint, container_id, project, agent_name, canonical_cn) tuple. Reads are issued by:

  • IdentityInterceptor on every per-agent gRPC RPC (cert thumbprint → registry entry; CN cross-check inside Lookup)
  • the dialer at Hello time (classifyRegistry → drives AgentRegistered / AgentUntrusted publication)
  • AdminService.ListAgents

Writes are CP-only: the Register handler captures the live mTLS peer's thumbprint and writes the row. Eviction: startup orphan-row reap (in agent.Start), plus dockerevents container/destroy (subscribed in agent.Start). Stop/die/kill do NOT evict because a stopped container can be `docker start`-ed back into life.
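As a rough illustration of that policy, the sketch below maps Docker container event actions to a reaction. The reaction type and route function are hypothetical; evicting on every destroy regardless of label is an assumption of this sketch.

```go
package main

import "fmt"

// reaction is a hypothetical label for what the CP does with an event.
type reaction string

const (
	reactIgnore reaction = "ignore"
	reactDial   reaction = "dial"
	reactEvict  reaction = "evict"
)

// route decides the reaction for a container event action and the
// container's purpose label.
func route(action, purpose string) reaction {
	switch action {
	case "destroy":
		// Only destroy means the container is gone for good.
		return reactEvict
	case "start", "restart", "unpause":
		if purpose == "agent" {
			return reactDial
		}
	}
	// stop, die, and kill land here: the row survives a later `docker start`.
	return reactIgnore
}

func main() {
	for _, a := range []string{"start", "stop", "die", "kill", "destroy"} {
		fmt.Printf("%-8s -> %s\n", a, route(a, "agent"))
	}
}
```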

Identity is channel-bound: the registry key is `(thumbprint, container_id)` (both UNIQUE in sqlite). The canonical CN composed from `(project, agent_name)` is stored as a column at Add time and compared via `subtle.ConstantTimeCompare` inside Lookup — no reconstruction at read time.

Index

Constants

This section is empty.

Variables

var ErrUnknownAgent = errors.New("agentregistry: unknown agent")

ErrUnknownAgent is returned by Lookup when no entry matches the thumbprint+CN pair. Distinguishable from "agent disconnected" because the thumbprint is channel-bound: the only way to fail a Lookup is for the cert to have never registered, to have been evicted, or for the peer cert's CN not to match the entry's stored canonical CN. All three failure modes collapse into one sentinel — the handler maps it to a generic codes.PermissionDenied (matching every other Connect rejection) so callers can't probe which half of the composite identity failed.
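A minimal in-memory sketch of that collapse, assuming a map keyed by thumbprint: memRegistry, its methods, and the "demo-cn" value are hypothetical, and the real canonical CN format is not shown in this documentation.

```go
package main

import (
	"crypto/sha256"
	"crypto/subtle"
	"errors"
	"fmt"
)

var errUnknownAgent = errors.New("agentregistry: unknown agent")

// entry is a trimmed, hypothetical registry row.
type entry struct {
	ContainerID string
	canonicalCN string // stored at add time, never rebuilt at read time
}

type memRegistry struct {
	byThumb map[[sha256.Size]byte]entry
}

func newMemRegistry() *memRegistry {
	return &memRegistry{byThumb: make(map[[sha256.Size]byte]entry)}
}

func (m *memRegistry) add(thumb [sha256.Size]byte, e entry) {
	m.byThumb[thumb] = e
}

// lookup returns the same sentinel whether the thumbprint is unknown or
// the CN does not match, so callers cannot tell the two apart.
func (m *memRegistry) lookup(thumb [sha256.Size]byte, cn string) (*entry, error) {
	e, ok := m.byThumb[thumb]
	if !ok {
		return nil, errUnknownAgent
	}
	if subtle.ConstantTimeCompare([]byte(e.canonicalCN), []byte(cn)) != 1 {
		return nil, errUnknownAgent
	}
	return &e, nil
}

func main() {
	reg := newMemRegistry()
	thumb := sha256.Sum256([]byte("demo cert DER"))
	reg.add(thumb, entry{ContainerID: "c1", canonicalCN: "demo-cn"})

	_, errCN := reg.lookup(thumb, "other-cn")
	_, errThumb := reg.lookup([sha256.Size]byte{}, "demo-cn")
	fmt.Println(errors.Is(errCN, errUnknownAgent), errors.Is(errThumb, errUnknownAgent))
}
```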

Functions

func EnsureSchema

func EnsureSchema(dbPath string, log *logger.Logger) error

EnsureSchema opens the database in writer mode (creating the file if missing), applies the schema, and closes. Called by the CP daemon at startup before NewSQLiteWriter so the schema apply path is observable independent of the long-lived writer handle.

func IdentityInterceptor

func IdentityInterceptor(reg Registry, optedOut map[string]bool, log *logger.Logger) (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor)

IdentityInterceptor returns paired unary and stream interceptors that resolve mTLS peer identity to a registry entry on every non-opted-out method. Wired AFTER AuthInterceptor on the agent listener so token validation runs first and a missing-identity rejection never burns introspector traffic.

reg is required (panic on nil — wiring regression). optedOut is a data-driven policy map; entries are matched on full gRPC method path ("/clawker.agent.v1.AgentService/Connect"). log is replaced with logger.Nop() if nil so the panic-recovery path of grpc-go never sees a nil deref inside the interceptor.
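The opt-out gate can be sketched without gRPC. Everything below is a stand-in: handler, gate, and errRejected are hypothetical, the Register method path is assumed by analogy with the Connect path quoted above, and the Example path is invented for the demonstration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errRejected stands in for codes.PermissionDenied with the generic envelope.
var errRejected = errors.New("registration rejected")

type handler func(ctx context.Context) error

// gate lets opted-out methods pass straight through; every other method
// must resolve an identity before the handler runs.
func gate(fullMethod string, optedOut map[string]bool,
	resolve func(context.Context) (context.Context, error), next handler) handler {
	return func(ctx context.Context) error {
		if optedOut[fullMethod] {
			return next(ctx)
		}
		resolved, err := resolve(ctx)
		if err != nil {
			return errRejected // same answer whichever check failed
		}
		return next(resolved)
	}
}

// demoGate wires the gate with a fake resolver for the demonstration.
func demoGate(fullMethod string, identityKnown bool) error {
	optedOut := map[string]bool{
		"/clawker.agent.v1.AgentService/Register": true, // assumed path
	}
	resolve := func(ctx context.Context) (context.Context, error) {
		if !identityKnown {
			return nil, errors.New("thumbprint not in registry")
		}
		return ctx, nil
	}
	next := func(context.Context) error { return nil }
	return gate(fullMethod, optedOut, resolve, next)(context.Background())
}

func main() {
	fmt.Println(demoGate("/clawker.agent.v1.AgentService/Register", false))
	fmt.Println(demoGate("/clawker.agent.v1.AgentService/Example", false))
}
```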

Every rejection returns codes.PermissionDenied with the same generic envelope ("registration rejected") as Connect's other rejections — attackers must not learn which check failed.

func IdentityOptedOutMethods

func IdentityOptedOutMethods() map[string]bool

IdentityOptedOutMethods returns the data-driven policy map of agent RPC methods that are EXEMPT from the identity-required default. Only bootstrap RPCs that authenticate themselves belong here.

Register is exempt because the registry row keyed by the peer cert thumbprint does not exist yet — the entire point of the call is to CREATE that row. Going through the identity interceptor would reject every legitimate Register call with PermissionDenied. The Register handler does its own peer-cert capture and cross-checks (CN + container_id SAN + peer IP + container labels) so this opt-out doesn't strip security; it just relocates the gate from the interceptor to the handler.

The shape mirrors AgentMethodScopes(): a build-time test walks the AgentService_ServiceDesc and asserts every method has either an explicit opt-out entry or falls into the default identity-required path. Adding an RPC without a deliberate policy decision fails the test, not the runtime — exactly the fail-secure posture the package aims for.

func Start

func Start(ctx context.Context, deps StartDeps) (func(), error)

Start gathers the full agent worldview at CP boot and wires every ongoing agent-axis subscription. Called once from cmd/clawker-cp/main.go after the bus is up. Returns a cleanup func that unwinds in reverse order.
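Reverse-order unwinding of a cleanup func is commonly built as a small stack. The sketch below assumes that shape; the cleanups type, the start helper, and the subscription names are hypothetical and do not reflect the package's internals.

```go
package main

import "fmt"

// cleanups is a hypothetical stack of teardown functions.
type cleanups struct {
	fns []func()
}

func (c *cleanups) add(fn func()) { c.fns = append(c.fns, fn) }

// unwind runs teardowns last-in, first-out.
func (c *cleanups) unwind() {
	for i := len(c.fns) - 1; i >= 0; i-- {
		c.fns[i]()
	}
}

// start wires two fake subscriptions and returns the combined cleanup.
func start(trace *[]string) func() {
	var c cleanups
	for _, name := range []string{"evict-subscription", "dial-subscription"} {
		name := name // capture per iteration for older Go versions
		*trace = append(*trace, "setup "+name)
		c.add(func() { *trace = append(*trace, "teardown "+name) })
	}
	return c.unwind
}

func main() {
	var trace []string
	cleanup := start(&trace)
	cleanup()
	for _, line := range trace {
		fmt.Println(line)
	}
}
```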

Steps, in order:

  1. Reap orphan registry rows. List every purpose=agent container (All:true, includes stopped). For each registry row whose container_id is missing from docker, evict. Heals the registry against `docker rm`s that landed while CP was down.
  2. Subscribe to dockerevents.DockerEvent for the evict path — filter on container/destroy; consumer evicts the registry row.
  3. Subscribe to dockerevents.DockerEvent for the dial path — filter on container/start|restart|unpause with purpose=agent; consumer calls dialer.DialAgent.
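
Step 1 amounts to a set difference between registry rows and the containers Docker still knows about. A minimal sketch, with orphanRows as a hypothetical helper operating on plain ID slices:

```go
package main

import "fmt"

// orphanRows returns the registry container IDs that Docker no longer
// reports. dockerIDs is expected to include stopped containers.
func orphanRows(registryIDs, dockerIDs []string) []string {
	live := make(map[string]struct{}, len(dockerIDs))
	for _, id := range dockerIDs {
		live[id] = struct{}{}
	}
	var orphans []string
	for _, id := range registryIDs {
		if _, ok := live[id]; !ok {
			orphans = append(orphans, id)
		}
	}
	return orphans
}

func main() {
	registry := []string{"a", "b", "c"}
	docker := []string{"a", "c"} // "b" was removed while CP was down
	fmt.Println(orphanRows(registry, docker))
}
```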

The previously-fragmented exports (Reap, registry Subscribe, dial Subscribe) are now unexported helpers behind this single function.

func WithEntry

func WithEntry(ctx context.Context, entry *Entry) context.Context

WithEntry attaches the resolved registry entry to ctx for downstream handlers. Exposed so test code and future identity-augmenting interceptors can attach an entry without the resolved-thumbprint dance; production code never needs to call this directly (the interceptor does).

Panics on a nil entry. A typed-nil *Entry stored in the context would still satisfy the `.(*Entry)` type assertion on the way back out, so a lookup that checked only the assertion would report `(nil, true)`: a silent identity vacuum that downstream handlers would dereference. Mirrors Registry.Add's panic-on-misuse posture so the wiring bug surfaces during development.

Types

type AgentRegistered

type AgentRegistered struct {
	ContainerID string
	AgentName   string
	Project     string
	Ok          bool
	Reason      string
	At          time.Time
}

AgentRegistered fires once per container lifetime, after the CP-driven Register handshake (success or failure). Steady-state "row already exists" reconnects do NOT re-fire — query State.Agents[ID].Registered for "is this agent registered now". Ok=false also drives AgentUntrusted{ReasonRegisterFailed}.

func (AgentRegistered) ApplyTo

func (e AgentRegistered) ApplyTo(s *overseer.State)

func (AgentRegistered) EventName

func (e AgentRegistered) EventName() string

func (AgentRegistered) MarshalZerologObject

func (e AgentRegistered) MarshalZerologObject(z *zerolog.Event)

func (AgentRegistered) OccurredAt

func (e AgentRegistered) OccurredAt() time.Time

type AgentUntrusted

type AgentUntrusted struct {
	ContainerID string
	AgentName   string
	Project     string
	Reason      overseer.UntrustedReason
	Detail      string
	At          time.Time
}

AgentUntrusted fires when an identity outcome violates the trust contract (thumbprint mismatch, CN mismatch, peer-IP mismatch, Register failed). Session stays open (asymmetric trust); subscribers enact policy.

func (AgentUntrusted) ApplyTo

func (e AgentUntrusted) ApplyTo(s *overseer.State)

func (AgentUntrusted) EventName

func (e AgentUntrusted) EventName() string

func (AgentUntrusted) MarshalZerologObject

func (e AgentUntrusted) MarshalZerologObject(z *zerolog.Event)

func (AgentUntrusted) OccurredAt

func (e AgentUntrusted) OccurredAt() time.Time

type ContainerInspector

type ContainerInspector interface {
	Inspect(ctx context.Context, containerID string) (mobycontainer.InspectResponse, error)
}

ContainerInspector is the docker-side seam the Register handler uses to resolve the container_id read from the cert URI SAN. Returns the moby InspectResponse so the handler can read labels (project + agent_name cross-check) and the clawker-net IP (peer-IP verification).

Implementations: in production, the moby client (wrapped by pkg/whail). In tests, a struct with an Inspect closure.

func NewMobyContainerInspector

func NewMobyContainerInspector(cli mobyclient.APIClient) ContainerInspector

NewMobyContainerInspector wraps a moby APIClient as a ContainerInspector for the Register handler.

type ContainerLister

type ContainerLister func(ctx context.Context) ([]string, error)

ContainerLister enumerates every `purpose=agent` container ID currently known to the docker daemon. The implementation MUST include stopped/exited containers — a stopped container can be `docker start`-ed back into life, and its registry row should survive that transition. Only `docker rm` (destroy) means the container is genuinely gone and the row is orphaned.

type Dialer

type Dialer struct {
	// contains filtered or unexported fields
}

Dialer captures the CP-side material every dial needs. Construct once at CP startup; share across all agent dials.

dialing is the dedup set: containerIDs currently being dialed (or already-Session-established). Initial poll and the dockerevents subscriber both call DialAgent for the same running container; the dedup keeps the second call from spinning a duplicate goroutine against an already-open Session. Membership lasts the lifetime of the dial goroutine — after the Session closes (peer drop, ctx cancel, retry timeout), the entry is removed and a future event for the same containerID dials fresh.

func New

func New(log *logger.Logger, docker mobyclient.APIClient, bus *overseer.Overseer, agents Registry, certPath, keyPath string, caPool *x509.CertPool, initExec *Executor) (*Dialer, error)

New constructs a Dialer. Returns an error if the CP client cert / key cannot be loaded — better to fail at CP startup than to defer the failure to the first dial.

bus is required: the dialer publishes typed Session* events (SessionConnecting / Connected / Failed / Broken) so other CP components can subscribe to connection lifecycle without coupling to the dialer directly. Pass a real *overseer.Overseer; tests can use an in-memory bus (it's cheap).

agents is required: every successful dial cross-checks the peer cert against the registry row keyed by container_id and dispatches the typed AgentRegistered / AgentUntrusted events accordingly. nil agents would strand worldview consumers without a registration signal.
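The cross-check can be pictured as a small classifier over the row found by container ID. The outcome names follow the Match / Miss / ThumbprintMismatch / CNMismatch vocabulary used in the Registry docs; the row type, the string values, and the order of the checks are assumptions of this sketch.

```go
package main

import (
	"crypto/sha256"
	"crypto/subtle"
	"fmt"
)

type classification string

const (
	classMatch              classification = "match"
	classMiss               classification = "miss"
	classThumbprintMismatch classification = "thumbprint_mismatch"
	classCNMismatch         classification = "cn_mismatch"
)

// row is a hypothetical, trimmed-down registry row.
type row struct {
	Thumbprint  [sha256.Size]byte
	CanonicalCN string
}

// classify compares the live peer against the row found by container ID.
// A nil row means nothing is registered for the container yet.
func classify(r *row, peerThumb [sha256.Size]byte, peerCN string) classification {
	if r == nil {
		return classMiss
	}
	if subtle.ConstantTimeCompare(r.Thumbprint[:], peerThumb[:]) != 1 {
		return classThumbprintMismatch
	}
	if subtle.ConstantTimeCompare([]byte(r.CanonicalCN), []byte(peerCN)) != 1 {
		return classCNMismatch
	}
	return classMatch
}

func main() {
	thumb := sha256.Sum256([]byte("demo cert DER"))
	r := &row{Thumbprint: thumb, CanonicalCN: "demo-cn"}
	fmt.Println(classify(nil, thumb, "demo-cn"))
	fmt.Println(classify(r, thumb, "demo-cn"))
	fmt.Println(classify(r, thumb, "other-cn"))
}
```

In this reading, a miss would lead to a registration request while either mismatch would surface as an untrusted outcome; the session stays open in every case.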

func (*Dialer) DialAgent

func (d *Dialer) DialAgent(ctx context.Context, containerID string)

DialAgent opens a Session stream to the clawkerd listener inside the given agent container, sends Hello, awaits HelloAck, and holds the stream open until ctx cancels or the peer closes. Returns immediately — the dial + lifetime drain run on a background goroutine. All failures are logged.

Dedup: a no-op if a dial for the same containerID is already in flight. Initial poll + dockerevents subscriber both reach this function with overlapping IDs at CP startup; dedup keeps a second call from spinning a duplicate goroutine.
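A dedup set of this kind is typically a mutex-guarded map. The following is a sketch under that assumption; dialSet, tryAcquire, release, and the free function dialAgent are hypothetical names, not the Dialer's unexported fields.

```go
package main

import (
	"fmt"
	"sync"
)

// dialSet tracks container IDs that currently have a dial goroutine.
type dialSet struct {
	mu       sync.Mutex
	inFlight map[string]struct{}
}

func newDialSet() *dialSet {
	return &dialSet{inFlight: make(map[string]struct{})}
}

// tryAcquire reports whether the caller won the right to dial id.
func (s *dialSet) tryAcquire(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, busy := s.inFlight[id]; busy {
		return false
	}
	s.inFlight[id] = struct{}{}
	return true
}

func (s *dialSet) release(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.inFlight, id)
}

// dialAgent returns immediately; session runs on a background goroutine
// and membership lasts exactly as long as that goroutine.
func dialAgent(s *dialSet, id string, session func()) (started bool) {
	if !s.tryAcquire(id) {
		return false
	}
	go func() {
		defer s.release(id)
		session()
	}()
	return true
}

func main() {
	s := newDialSet()
	hold := make(chan struct{})
	done := make(chan struct{})
	first := dialAgent(s, "c1", func() { <-hold; close(done) })
	second := dialAgent(s, "c1", func() {}) // duplicate while the first is live
	close(hold)
	<-done
	fmt.Println(first, second)
}
```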

Retry: the dial goroutine retries connection establishment with exponential backoff + full jitter (cap connectMaxBackoff) until either the Session is established (Hello + HelloAck) or connectTotalTimeout elapses. Once established, no retry from the drain — a stream break ends the goroutine and removes the dedup entry, so a subsequent restart event re-dials.
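Exponential backoff with full jitter draws each delay uniformly from zero up to a doubling ceiling. The sketch below assumes a 100ms base and a 5s cap; connectMaxBackoff is the name from the text, while its value, backoffCeiling, and fullJitter are illustrative only.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	baseBackoff       = 100 * time.Millisecond // assumed value
	connectMaxBackoff = 5 * time.Second        // name from the text, value assumed
)

// backoffCeiling doubles base once per attempt and clamps at maxBackoff.
func backoffCeiling(attempt int, base, maxBackoff time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		if d >= maxBackoff/2 {
			return maxBackoff // doubling again would reach or pass the cap
		}
		d *= 2
	}
	if d > maxBackoff {
		return maxBackoff
	}
	return d
}

// fullJitter picks a delay uniformly in [0, ceiling]. int63n is injected
// so the choice can be made deterministic in tests.
func fullJitter(attempt int, base, maxBackoff time.Duration, int63n func(int64) int64) time.Duration {
	ceiling := backoffCeiling(attempt, base, maxBackoff)
	return time.Duration(int63n(int64(ceiling) + 1))
}

func main() {
	for attempt := 0; attempt < 8; attempt++ {
		delay := fullJitter(attempt, baseBackoff, connectMaxBackoff, rand.Int63n)
		fmt.Printf("attempt %d: ceiling %v, sleeping %v\n",
			attempt, backoffCeiling(attempt, baseBackoff, connectMaxBackoff), delay)
	}
}
```

A real loop would also stop once the total budget (connectTotalTimeout in the text) has elapsed; that bookkeeping is omitted here.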

type Entry

type Entry struct {
	// AgentName is the user-typed short name (e.g. "dev"); composed with
	// Project at Add time into the canonical CN that Lookup compares
	// against the peer cert's Subject.CommonName.
	AgentName string
	// Project is the clawker project slug under which the agent
	// registered. Empty string is allowed and matches the unscoped
	// 2-segment naming case (docker.ContainerName).
	Project      string
	ContainerID  string
	Thumbprint   [sha256.Size]byte
	RegisteredAt time.Time
	LastSeen     time.Time
}

Entry is one registered agent. It is created CP-side at Register handler entry and carries the ContainerID, AgentName, Project, and the SHA-256 over the peer cert DER (Thumbprint, captured live from the mTLS handshake). LastSeen currently equals RegisteredAt; future per-agent RPCs will refresh LastSeen at their own boundary.

func EntryFromContext

func EntryFromContext(ctx context.Context) (*Entry, bool)

EntryFromContext returns the registry entry IdentityInterceptor attached to ctx. ok=false means the RPC is on the opt-out list (the handler verifies identity itself) or the interceptor was bypassed in a test that didn't set up identity wiring. Defensive against typed-nil context values: a nil entry returns ok=false even if the type assertion technically succeeds, so handlers can treat ok=true as "non-nil entry available".
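The typed-nil defence can be shown with the standard context package. This sketch uses lowercase stand-ins (entry, withEntry, entryFromContext) and a private key type; it illustrates the pattern and is not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
)

type entry struct{ AgentName string }

// entryKey is an unexported key type so no other package can collide.
type entryKey struct{}

// withEntry refuses nil so a wiring bug fails loudly at attach time.
func withEntry(ctx context.Context, e *entry) context.Context {
	if e == nil {
		panic("withEntry: nil entry")
	}
	return context.WithValue(ctx, entryKey{}, e)
}

// entryFromContext treats a typed-nil value the same as a missing one.
func entryFromContext(ctx context.Context) (*entry, bool) {
	e, ok := ctx.Value(entryKey{}).(*entry)
	if !ok || e == nil {
		return nil, false
	}
	return e, true
}

// typedNilVisible bypasses withEntry to plant a typed-nil pointer and
// reports whether the reader would have exposed it.
func typedNilVisible() bool {
	ctx := context.WithValue(context.Background(), entryKey{}, (*entry)(nil))
	_, ok := entryFromContext(ctx)
	return ok
}

// roundTrip attaches an entry and reads it back.
func roundTrip(name string) (string, bool) {
	ctx := withEntry(context.Background(), &entry{AgentName: name})
	e, ok := entryFromContext(ctx)
	if !ok {
		return "", false
	}
	return e.AgentName, true
}

func main() {
	name, ok := roundTrip("dev")
	fmt.Println(name, ok, typedNilVisible())
}
```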

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor dispatches the static CP-driven init plan against an open Session stream. Owns Recv during Run; the dialer's drainStream takes over after Run returns.

Plan idempotency contract: every Session establish runs the full plan, including reconnects after CP restart. Each step is idempotent (file-presence gates, append-if-missing, marker-file post-init), and AgentReady is no-op success when clawkerd already spawned the user CMD (spawnState's CAS rejects re-fork; handler replies Done{0}). Trade: a small volume of shell commands fires on every reconnect; gain: no per-container completed flag.

Executor is shared across all containers: the dialer constructs one at CP boot and calls Run from a goroutine per DialAgent (one per agent container). Run holds no Executor-scoped mutable state: every call gets its own (ctx, stream, target) and drives its own stream's Recv loop in a single goroutine. The Run-owns-Recv invariant is per-stream (one Run, one stream, one Recv-driving goroutine), not per-Executor, so concurrent Runs across different containers must not be serialized.

func NewExecutor

func NewExecutor(bus *overseer.Overseer, log *logger.Logger) (*Executor, error)

NewExecutor constructs an Executor. nil log is replaced with logger.Nop(). bus is required — Run publishes init events unconditionally, so a nil bus would NPE deep inside overseer.Publish on the first event dispatch. Returning an error lets the caller (cmd/clawker-cp/main.go) log the wiring bug to the structured log surface and degrade gracefully (initExec = nil → dialer logs agent_init_executor_unset per dial) instead of crashing CP and stranding the failure on os.Stderr where only `docker logs` sees it. Matches the nil-bus contract on agent.New for the dialer.

func (*Executor) Run

func (e *Executor) Run(ctx context.Context, stream clawkerdv1.ClawkerdService_SessionClient, target InitTarget) (runErr error)

Run dispatches the init plan one step at a time, awaiting Done or Error per command before sending the next. Publishes init events throughout. Returns the error that halted the run (transport failure, step failure surfaced as a Go error), or nil on full success.

Caller invariant: stream must already have completed Hello/HelloAck and any Register handshake. Run owns stream.Recv exclusively for its duration — but per-stream, not per-Executor. Concurrent Runs across different streams (the prod case: parallel agent containers after a CP restart) execute in parallel.
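The dispatch loop can be approximated without a stream. In this sketch each step is a plain function returning an exit code; the step type, runPlan, demoPlan, and the event strings are hypothetical, and only the step names are taken from the InitStepStarted vocabulary.

```go
package main

import "fmt"

// step is a hypothetical stand-in for one command in the init plan.
type step struct {
	Name string
	Run  func() (exitCode int32, err error)
}

// runPlan executes steps strictly in order, waiting for each result
// before starting the next, and halts on the first failure.
func runPlan(steps []step, emit func(string)) error {
	emit(fmt.Sprintf("InitStarted steps=%d", len(steps)))
	for i, s := range steps {
		emit(fmt.Sprintf("InitStepStarted %s (%d of %d)", s.Name, i+1, len(steps)))
		code, err := s.Run()
		if err == nil && code != 0 {
			err = fmt.Errorf("step %q exited with code %d", s.Name, code)
		}
		if err != nil {
			emit("InitStepFailed " + s.Name)
			emit("InitFailed " + s.Name)
			return err
		}
		emit("InitStepCompleted " + s.Name)
	}
	emit("InitCompleted")
	return nil
}

// demoPlan builds a five-step plan in which the step named failAt
// (if any) exits non-zero, and returns the events that were emitted.
func demoPlan(failAt string) ([]string, error) {
	var events []string
	var steps []step
	for _, name := range []string{"config", "git", "ssh", "post-init", "agent-ready"} {
		name := name // capture per iteration for older Go versions
		steps = append(steps, step{Name: name, Run: func() (int32, error) {
			if name == failAt {
				return 1, nil
			}
			return 0, nil
		}})
	}
	err := runPlan(steps, func(e string) { events = append(events, e) })
	return events, err
}

func main() {
	events, err := demoPlan("git")
	for _, e := range events {
		fmt.Println(e)
	}
	fmt.Println("error:", err)
}
```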

type Handler

type Handler struct {
	agentv1.UnimplementedAgentServiceServer
	// contains filtered or unexported fields
}

Handler serves the Register RPC on the CP's clawker-net agent listener. It captures the live mTLS peer's cert thumbprint, reads the container_id from the cert URI SAN (urn:clawker:container:<id>), cross-checks the request's identity claims against (a) the cert's canonical CN, (b) the docker container's labels, and (c) the peer IP versus the container's clawker-net IP, then writes the row.

All rejection paths return codes.PermissionDenied with a generic envelope. The structured log line carries the specific failure classification for operator triage.
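Reading the container ID out of a URI SAN of the form urn:clawker:container:<id> can be done with net/url, which exposes the part after the scheme as Opaque. The helper names below are hypothetical, and taking the first well-formed URI is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

const sanPrefix = "clawker:container:"

// containerIDFromSANs returns the ID from the first well-formed
// urn:clawker:container:<id> URI. x509.Certificate.URIs has this type.
func containerIDFromSANs(uris []*url.URL) (string, bool) {
	for _, u := range uris {
		if u == nil || u.Scheme != "urn" {
			continue
		}
		if !strings.HasPrefix(u.Opaque, sanPrefix) {
			continue
		}
		if id := strings.TrimPrefix(u.Opaque, sanPrefix); id != "" {
			return id, true
		}
	}
	return "", false
}

// containerIDFromStrings parses raw URI strings first; unparsable
// values are skipped.
func containerIDFromStrings(raw ...string) (string, bool) {
	var uris []*url.URL
	for _, r := range raw {
		if u, err := url.Parse(r); err == nil {
			uris = append(uris, u)
		}
	}
	return containerIDFromSANs(uris)
}

func main() {
	id, ok := containerIDFromStrings("urn:clawker:container:abc123")
	fmt.Println(id, ok)
}
```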

func NewHandler

func NewHandler(registry Registry, inspector ContainerInspector, log *logger.Logger) (*Handler, error)

NewHandler constructs a Register handler. registry is the CP-owned agentregistry; inspector resolves docker containers; log defaults to logger.Nop() when nil. The handler's internal clock, which is not a constructor parameter, defaults to time.Now.

registry and inspector MUST be non-nil — either being nil is a wiring bug that would NPE on the first Register call. Reject at construction so the failure surfaces at CP startup rather than at the first agent boot.

func (*Handler) Register

func (h *Handler) Register(ctx context.Context, req *agentv1.RegisterRequest) (*agentv1.Welcome, error)

Register is the one-time-per-container handshake CP triggers via a RegisterRequired Command on the Session bidi stream. clawkerd presents a CLI-CA-signed cert with a URI SAN binding it to a specific container_id and calls Register; CP captures the thumbprint at handler entry and writes the agentregistry row.

Rejection paths (all return PermissionDenied, with the classification logged at Warn for operator triage):

  • peer.FromContext yields no TLS auth info or no certs (would only happen on a misconfigured listener)
  • cert URI SAN missing or malformed
  • cert CN does not equal canonical CN derived from request fields
  • request agent_name or project malformed
  • container_id from SAN is unknown to docker, or container labels don't match the request
  • peer IP doesn't match the container's clawker-net IP
  • existing row for this container_id has a different thumbprint (cert replay; CLI is the only legit cert source)

Idempotent retry: an existing row whose thumbprint matches the captured thumbprint returns Welcome silently — Session retries after the row was already written.
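The last rejection path and the idempotent retry together form a three-way decision on the existing row. A minimal sketch, with decideRegister and its outcome strings as hypothetical names:

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

type registerDecision string

const (
	decideWrite   registerDecision = "write_row"
	decideWelcome registerDecision = "welcome_existing"
	decideReject  registerDecision = "reject_replay"
)

// decideRegister compares the thumbprint captured from the live peer with
// the thumbprint already stored for the container, if any.
func decideRegister(existing *[sha256.Size]byte, captured [sha256.Size]byte) registerDecision {
	switch {
	case existing == nil:
		return decideWrite // first registration for this container
	case *existing == captured:
		return decideWelcome // retry after the row was already written
	default:
		return decideReject // same container, different cert
	}
}

func main() {
	captured := sha256.Sum256([]byte("demo cert DER"))
	other := sha256.Sum256([]byte("another cert DER"))
	fmt.Println(decideRegister(nil, captured))
	fmt.Println(decideRegister(&captured, captured))
	fmt.Println(decideRegister(&other, captured))
}
```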

type InitCompleted

type InitCompleted struct {
	ContainerID string
	AgentName   string
	Project     string
	Duration    time.Duration
	At          time.Time
}

InitCompleted is the terminal success event for one init phase. Subscribers waiting for "agent ready to serve user work" should listen for this rather than SessionConnected — Session is connected long before init finishes.

func (InitCompleted) ApplyTo

func (e InitCompleted) ApplyTo(s *overseer.State)

func (InitCompleted) EventName

func (e InitCompleted) EventName() string

func (InitCompleted) MarshalZerologObject

func (e InitCompleted) MarshalZerologObject(z *zerolog.Event)

func (InitCompleted) OccurredAt

func (e InitCompleted) OccurredAt() time.Time

type InitFailed

type InitFailed struct {
	ContainerID string
	AgentName   string
	Project     string
	FailedStep  string
	Reason      overseer.InitFailureReason
	Detail      string
	Duration    time.Duration
	At          time.Time
}

InitFailed is the terminal failure event for one init phase. Carries the typed Reason classification and the human-readable Detail so subscribers can surface the proximate cause without re-deriving from the step event stream.

func (InitFailed) ApplyTo

func (e InitFailed) ApplyTo(s *overseer.State)

func (InitFailed) EventName

func (e InitFailed) EventName() string

func (InitFailed) MarshalZerologObject

func (e InitFailed) MarshalZerologObject(z *zerolog.Event)

func (InitFailed) OccurredAt

func (e InitFailed) OccurredAt() time.Time

type InitStarted

type InitStarted struct {
	ContainerID string
	AgentName   string
	Project     string
	StepCount   int
	At          time.Time
}

InitStarted fires when CP begins running the init plan against an established Session. Re-published on every Session reconnect that re-runs the plan. StepCount lets streaming subscribers render "1 of N" progress without re-deriving plan length.

func (InitStarted) ApplyTo

func (e InitStarted) ApplyTo(s *overseer.State)

func (InitStarted) EventName

func (e InitStarted) EventName() string

func (InitStarted) MarshalZerologObject

func (e InitStarted) MarshalZerologObject(z *zerolog.Event)

func (InitStarted) OccurredAt

func (e InitStarted) OccurredAt() time.Time

type InitStepCompleted

type InitStepCompleted struct {
	ContainerID string
	AgentName   string
	Project     string
	StepName    string
	StepIndex   int
	Duration    time.Duration
	ExitCode    int32
	At          time.Time
}

InitStepCompleted fires when a step's ShellCommand returns Done with exit_code == 0.

func (InitStepCompleted) ApplyTo

func (e InitStepCompleted) ApplyTo(s *overseer.State)

func (InitStepCompleted) EventName

func (e InitStepCompleted) EventName() string

func (InitStepCompleted) MarshalZerologObject

func (e InitStepCompleted) MarshalZerologObject(z *zerolog.Event)

func (InitStepCompleted) OccurredAt

func (e InitStepCompleted) OccurredAt() time.Time

type InitStepFailed

type InitStepFailed struct {
	ContainerID string
	AgentName   string
	Project     string
	StepName    string
	StepIndex   int
	Duration    time.Duration
	ExitCode    int32
	Reason      overseer.InitFailureReason
	Detail      string
	At          time.Time
}

InitStepFailed fires when a step terminates non-zero, errors, or times out. Reason is the typed classification subscribers branch on; Detail is the human-readable diagnostic (formatted ErrorCode + message + truncated stderr). The Executor halts the plan on this event and publishes a terminal InitFailed.

func (InitStepFailed) ApplyTo

func (e InitStepFailed) ApplyTo(s *overseer.State)

func (InitStepFailed) EventName

func (e InitStepFailed) EventName() string

func (InitStepFailed) MarshalZerologObject

func (e InitStepFailed) MarshalZerologObject(z *zerolog.Event)

func (InitStepFailed) OccurredAt

func (e InitStepFailed) OccurredAt() time.Time

type InitStepStarted

type InitStepStarted struct {
	ContainerID string
	AgentName   string
	Project     string
	StepName    string
	StepIndex   int
	StepCount   int
	At          time.Time
}

InitStepStarted fires when the Executor dispatches one step's ShellCommand. StepName is the wire-contract vocabulary subscribers match against ("config", "git", "ssh", "post-init", "agent-ready").

func (InitStepStarted) ApplyTo

func (e InitStepStarted) ApplyTo(s *overseer.State)

func (InitStepStarted) EventName

func (e InitStepStarted) EventName() string

func (InitStepStarted) MarshalZerologObject

func (e InitStepStarted) MarshalZerologObject(z *zerolog.Event)

func (InitStepStarted) OccurredAt

func (e InitStepStarted) OccurredAt() time.Time

type InitTarget

type InitTarget struct {
	ContainerID string
	AgentName   string
	Project     string
}

InitTarget identifies the agent the Executor is initializing. Threaded through every init event so subscribers see consistent identity fields without re-deriving from the registry.

type ReapDegraded

type ReapDegraded struct {
	Reason string
	At     time.Time
}

ReapDegraded fires when CP's startup reap of orphan registry rows fails. Rows for containers destroyed while CP was down may persist as ghosts until a successful future reap. Pure informational event (no ApplyTo).

func (ReapDegraded) EventName

func (e ReapDegraded) EventName() string

func (ReapDegraded) MarshalZerologObject

func (e ReapDegraded) MarshalZerologObject(z *zerolog.Event)

func (ReapDegraded) OccurredAt

func (e ReapDegraded) OccurredAt() time.Time

type Registry

type Registry interface {
	// Add inserts an entry keyed by (Entry.Thumbprint, Entry.ContainerID).
	// Container restart produces a new cert and a new thumbprint, so
	// re-registration creates a new entry; the dockerevents subscription
	// is responsible for evicting the stale one by container ID.
	//
	// Add computes the canonical agent CN from Entry.Project +
	// Entry.AgentName via auth.NewProjectSlug / auth.NewAgentName (the
	// err-returning typed constructors). Malformed identity strings
	// surface as a returned error, NOT as a panic — historical
	// MustProjectSlug / MustAgentName at Lookup time was a CP-wide
	// crash vector. Callers must validate at the wire boundary.
	//
	// Returns an error when the canonical-CN composition rejects the
	// inputs OR when the persistence layer (sqlite) rejects the write —
	// disk full, schema corruption, UNIQUE collision against a stale
	// row that hasn't been evicted yet. Callers translate the error
	// into the appropriate gRPC status.
	//
	// Add panics on programming-error invariants (zero thumbprint,
	// empty ContainerID, zero RegisteredAt). Empty AgentName is treated
	// as user-input violation and surfaces as an error from
	// auth.NewAgentName.
	Add(entry Entry) error
	// Lookup retrieves an entry by cert thumbprint and verifies that the
	// supplied peer cert CN matches the entry's pre-computed canonical
	// CN with subtle.ConstantTimeCompare. Mismatch on thumbprint OR CN
	// returns ErrUnknownAgent.
	Lookup(thumbprint [sha256.Size]byte, cn string) (*Entry, error)
	// LookupByContainerID returns the entry whose ContainerID matches,
	// without any CN cross-check. Used by the dialer at Hello time to
	// drive registry classification (Match / Miss / ThumbprintMismatch
	// / CNMismatch) and decide whether to send RegisterRequired or
	// publish AgentUntrusted. The dialer performs the thumbprint + CN
	// comparisons against the returned entry itself; this read
	// intentionally does not gate on either so a mismatch surfaces as
	// a typed local outcome rather than as ErrUnknownAgent.
	//
	// Returns (nil, ErrUnknownAgent) when no entry matches.
	LookupByContainerID(containerID string) (*Entry, error)
	// EvictByContainerID removes any entry whose ContainerID matches.
	// Returns the underlying persistence error so callers can decide
	// whether to retry, log, or abort. The dockerevents-driven and
	// reaper-driven callers log-and-proceed because a transient sqlite
	// failure must not stall the eviction pipeline; CLI-side `clawker
	// container remove` similarly logs at debug since registry hiccups
	// must not surface as remove failures (the row gets pruned later
	// by the dockerevents subscription).
	EvictByContainerID(containerID string) error
	// Snapshot returns a copy of every live entry, sorted by
	// (Project, AgentName) for deterministic output. Project is the
	// primary sort key because the same short AgentName can be reused
	// across different projects (the composite identity is
	// (project, agent)). Used by AdminService.ListAgents and the
	// `clawker controlplane agents` CLI; both rely on stable ordering
	// for diffability.
	Snapshot() []Entry
}

Registry is the consumer-facing contract.
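
The ordering Snapshot promises, Project first and AgentName second, can be sketched with sort.Slice. The entry type here is a trimmed stand-in and sortedSnapshot is a hypothetical helper that copies before sorting so the caller's slice is left alone.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a trimmed stand-in carrying only the two sort keys.
type entry struct {
	Project   string
	AgentName string
}

// sortedSnapshot returns a copy ordered by (Project, AgentName).
func sortedSnapshot(in []entry) []entry {
	out := make([]entry, len(in))
	copy(out, in)
	sort.Slice(out, func(i, j int) bool {
		if out[i].Project != out[j].Project {
			return out[i].Project < out[j].Project
		}
		return out[i].AgentName < out[j].AgentName
	})
	return out
}

func main() {
	in := []entry{{"web", "dev"}, {"api", "dev"}, {"api", "build"}}
	for _, e := range sortedSnapshot(in) {
		fmt.Printf("%s/%s\n", e.Project, e.AgentName)
	}
}
```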

func NewRegistry

func NewRegistry(log *logger.Logger) Registry

NewRegistry constructs an empty in-memory registry. Logger is required (use logger.Nop() in tests) so audit-trail messages on Add and Evict are captured even when production logging is otherwise disabled.

func NewSQLiteWriter

func NewSQLiteWriter(dbPath string, log *logger.Logger) (Registry, error)

NewSQLiteWriter opens (or creates) the sqlite database at dbPath, applies the schema, and returns a registry suitable for the writer process. CP is the SOLE writer in this design (the CLI no longer opens the registry DB), so a separate reader-mode constructor is not provided — every consumer that previously read from sqlite now goes through `f.AdminClient(ctx).ListAgents`.

The parent directory of dbPath must already exist.

type SessionBroken

type SessionBroken struct {
	ContainerID string
	AgentName   string
	Project     string
	Address     string
	Reason      string
	At          time.Time
}

SessionBroken fires when an established Session terminates. Reason classifies the cause (peer EOF, transport break, error string). Not published on intentional teardown (CP shutdown / ctx cancel) — see runDial for the suppression rationale.

func (SessionBroken) ApplyTo

func (e SessionBroken) ApplyTo(s *overseer.State)

func (SessionBroken) EventName

func (e SessionBroken) EventName() string

func (SessionBroken) MarshalZerologObject

func (e SessionBroken) MarshalZerologObject(z *zerolog.Event)

func (SessionBroken) OccurredAt

func (e SessionBroken) OccurredAt() time.Time

type SessionConnected

type SessionConnected struct {
	ContainerID    string
	AgentName      string
	Project        string
	Address        string
	Attempts       int
	PeerCN         string
	PeerThumbprint [sha256.Size]byte
	At             time.Time
}

SessionConnected fires when a Session establishes (mTLS dial + Hello handshake completes). Identity fields (PeerCN, PeerThumbprint) ride alongside the transport fields so subscribers and the worldview have everything in one event. The Provenance struct is retired; AgentUntrusted/AgentRegistered carry the policy outcomes.

func (SessionConnected) ApplyTo

func (e SessionConnected) ApplyTo(s *overseer.State)

func (SessionConnected) EventName

func (e SessionConnected) EventName() string

func (SessionConnected) MarshalZerologObject

func (e SessionConnected) MarshalZerologObject(z *zerolog.Event)

func (SessionConnected) OccurredAt

func (e SessionConnected) OccurredAt() time.Time

type SessionConnecting

type SessionConnecting struct {
	ContainerID string
	AgentName   string
	Project     string
	Address     string
	At          time.Time
}

SessionConnecting fires when a dial attempt starts (first successful inspect of the container in a cycle). Carries the agent identity labels and the address being dialed.

func (SessionConnecting) ApplyTo

func (e SessionConnecting) ApplyTo(s *overseer.State)

func (SessionConnecting) EventName

func (e SessionConnecting) EventName() string

func (SessionConnecting) MarshalZerologObject

func (e SessionConnecting) MarshalZerologObject(z *zerolog.Event)

func (SessionConnecting) OccurredAt

func (e SessionConnecting) OccurredAt() time.Time

type SessionFailed

type SessionFailed struct {
	ContainerID string
	AgentName   string
	Project     string
	Address     string
	Reason      string
	Attempts    int
	At          time.Time
}

SessionFailed fires when the retry budget for a dial cycle exhausts before any attempt established a Session. Reason carries a short classification ("connect_total_timeout", "container_not_running"); the underlying dial error is in the log line, not on the event.

func (SessionFailed) ApplyTo

func (e SessionFailed) ApplyTo(s *overseer.State)

func (SessionFailed) EventName

func (e SessionFailed) EventName() string

func (SessionFailed) MarshalZerologObject

func (e SessionFailed) MarshalZerologObject(z *zerolog.Event)

func (SessionFailed) OccurredAt

func (e SessionFailed) OccurredAt() time.Time

type StartDeps

type StartDeps struct {
	Registry     Registry
	DockerLister ContainerLister
	Dialer       *Dialer
	Bus          *overseer.Overseer
	Log          *logger.Logger
}

StartDeps bundles the dependencies the umbrella `Start` procedure needs. Bus + Registry + Dialer + Docker lister are all owned by the CP startup path; passing them as a single struct keeps the call site in cmd/clawker-cp/main.go a single function call.
