package broker

Version: v0.5.2 (latest)

This package is not in the latest version of its module.

Published: May 2, 2026 | License: Apache-2.0 | Imports: 36 | Imported by: 0

Documentation

Overview

Package broker is the runtime for the paddock-broker Deployment. The broker holds upstream credentials (API keys, GitHub App private keys, PAT pools), validates caller identity via TokenReview, and issues per-run values through pluggable providers. See ADR-0012 and spec 0002 §6.

Constants

const (
	ControllerSystemNamespace = "paddock-system"
	ControllerServiceAccount  = "paddock-controller-manager"
)

ControllerSystemNamespace is where the controller-manager lives. Callers from this (namespace, ServiceAccount) tuple are granted cross-namespace broker access; every other caller is scoped to its own namespace.
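As a rough illustration of how the (namespace, ServiceAccount) tuple might be recognised, the sketch below parses a ServiceAccount username and compares it with the two constants. It is not the package's code: `parseServiceAccount` and `isController` are hypothetical helpers, and the only outside fact assumed is that Kubernetes reports ServiceAccount usernames as `system:serviceaccount:<namespace>:<name>`.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	ControllerSystemNamespace = "paddock-system"
	ControllerServiceAccount  = "paddock-controller-manager"
)

// parseServiceAccount splits a username of the form
// "system:serviceaccount:<namespace>:<name>". Hypothetical helper.
func parseServiceAccount(username string) (namespace, name string, ok bool) {
	const prefix = "system:serviceaccount:"
	if !strings.HasPrefix(username, prefix) {
		return "", "", false
	}
	namespace, name, ok = strings.Cut(strings.TrimPrefix(username, prefix), ":")
	if !ok || namespace == "" || name == "" {
		return "", "", false
	}
	return namespace, name, true
}

// isController reports whether the tuple belongs to the controller-manager.
// Hypothetical helper; both parts of the tuple must match.
func isController(namespace, name string) bool {
	return namespace == ControllerSystemNamespace && name == ControllerServiceAccount
}

func main() {
	ns, name, ok := parseServiceAccount("system:serviceaccount:paddock-system:paddock-controller-manager")
	fmt.Println(ns, name, ok, isController(ns, name))
}
```

Matching on both halves of the tuple matters: a ServiceAccount with the controller's name in another namespace should stay scoped to that namespace.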

const TokenAudience = "paddock-broker"

TokenAudience is the audience claim the broker requires on every caller token. It is configured on the run pod's ProjectedServiceAccountToken volume and on the controller's token source, which keeps broker tokens from being usable for anything else in the cluster.

Variables

var ErrUnauthenticated = errors.New("unauthenticated")

ErrUnauthenticated is returned for any token that fails validation.

Functions

func ReconstructLeases

func ReconstructLeases(ctx context.Context, c client.Client, reg *providers.Registry) error

ReconstructLeases re-acquires PATPool slot reservations from HarnessRun.status.issuedLeases at broker startup. This closes the F-14 "same PAT to two runs after broker restart" hazard without persisting bearer bytes on HarnessRun.status. Bearer maps are deliberately not reconstructed: an old run's bearer fails closed, the controller drives a fresh Issue, and the fresh Issue picks a slot other than the reserved one.

Non-fatal: any per-lease error is logged and counted; the broker continues serving.
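The non-fatal, per-lease behaviour can be pictured with a small in-memory model. This is only a sketch under stated assumptions: `lease`, `slotPool`, and `reconstruct` are hypothetical stand-ins, and the real function reads leases from HarnessRun.status.issuedLeases through a client rather than from a slice.

```go
package main

import (
	"fmt"
	"log"
)

// lease is a hypothetical, reduced view of an issued lease.
type lease struct {
	Run  string
	Slot int
}

// slotPool is a hypothetical stand-in for a PATPool's reservation table.
type slotPool struct {
	size  int
	owner map[int]string
}

// reserve marks slot as held by run, failing if the slot is out of range
// or already held by a different run.
func (p *slotPool) reserve(slot int, run string) error {
	if slot < 0 || slot >= p.size {
		return fmt.Errorf("slot %d out of range", slot)
	}
	if cur, taken := p.owner[slot]; taken && cur != run {
		return fmt.Errorf("slot %d already held by %s", slot, cur)
	}
	p.owner[slot] = run
	return nil
}

// reconstruct re-reserves every lease it can. A failing lease is logged and
// counted, and the loop carries on with the next one.
func reconstruct(p *slotPool, leases []lease) (failed int) {
	for _, l := range leases {
		if err := p.reserve(l.Slot, l.Run); err != nil {
			log.Printf("reconstruct lease for run %s: %v", l.Run, err)
			failed++
		}
	}
	return failed
}

func main() {
	p := &slotPool{size: 2, owner: map[int]string{}}
	failed := reconstruct(p, []lease{{"run-a", 0}, {"run-b", 5}, {"run-c", 0}})
	fmt.Println("failed:", failed, "slot 0 owner:", p.owner[0])
}
```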

Types

type AdapterResolver

type AdapterResolver func(ctx context.Context, namespace, runName string) (string, error)

AdapterResolver looks up the loopback HTTP address (host:port, no scheme) of the adapter sidecar for the given run. Wired to a controller-runtime cache lookup in Task 11.

type AuditWriter

type AuditWriter struct {
	// Client is retained for backwards-compat: code that builds an
	// AuditWriter without a Sink defaults to a KubeSink wrapping it.
	Client client.Client
	// Sink, when non-nil, is the actual write target. Construct with
	// auditing.KubeSink{Client: c, Component: "broker"} or NoopSink.
	Sink auditing.Sink
}

AuditWriter retains the v0.3 broker-local API but delegates to the shared auditing.Sink. New broker code should consume an auditing.Sink directly via Server.Sink; AuditWriter is kept so that bootstrap code outside cmd/broker that constructs Server with a Client (e.g. tests) can keep working until callers migrate.

func NewAuditWriter

func NewAuditWriter(sink auditing.Sink) *AuditWriter

NewAuditWriter is the documented constructor. Use it instead of AuditWriter{...} literals so misconfiguration (no Sink, no Client) surfaces at construction time rather than as a nil-pointer panic at write time.

The AuditWriter shim is intended for removal once all broker call sites consume auditing.Sink directly via Server.Sink. Adding the constructor here is the first step in that migration; the actual removal is a follow-up tracked in the engineering review's B-11 mini-card.

func (*AuditWriter) CredentialDenied

func (w *AuditWriter) CredentialDenied(ctx context.Context, e CredentialAudit) error

CredentialDenied records a failed Issue.

func (*AuditWriter) CredentialIssued

func (w *AuditWriter) CredentialIssued(ctx context.Context, e CredentialAudit) error

CredentialIssued records a successful Issue.

func (*AuditWriter) CredentialRenewalFailed

func (w *AuditWriter) CredentialRenewalFailed(ctx context.Context, namespace, runName, provider, leaseID string, err error)

CredentialRenewalFailed emits an audit event for a renewal failure.

func (*AuditWriter) CredentialRenewed

func (w *AuditWriter) CredentialRenewed(ctx context.Context, namespace, runName, provider, leaseID string, expiresAt time.Time)

CredentialRenewed emits an audit event for a successful renewal.

func (*AuditWriter) CredentialRevoked

func (w *AuditWriter) CredentialRevoked(ctx context.Context, e CredentialAudit) error

CredentialRevoked records a successful Revoke.

func (*AuditWriter) Write

Write is a thin shim that lets handlers emit pre-built AuditEvents (constructed via auditing.New* builders) through the same Sink as the older typed helpers. New broker code should consume an auditing.Sink directly via Server.Sink once the migration tracked in the B-11 mini-card lands; until then, this shim keeps callers off the non-exported sink() helper.

type Authenticator

type Authenticator struct {
	// Client is a kubernetes.Interface. Used to POST TokenReviews.
	Client kubernetes.Interface
}

Authenticator validates Bearer tokens via the K8s TokenReview API.

func (*Authenticator) Authenticate

func (a *Authenticator) Authenticate(ctx context.Context, bearer string) (CallerIdentity, error)

Authenticate validates the bearer token's audience and authenticity, and returns the caller's identity.

type CallerIdentity

type CallerIdentity struct {
	// Namespace is the ServiceAccount's namespace.
	Namespace string

	// ServiceAccount is the SA name.
	ServiceAccount string

	// IsController reports whether the caller is the paddock
	// controller-manager running in paddock-system. These callers are
	// permitted to ask about HarnessRuns in any namespace; other
	// callers are confined to their own namespace.
	IsController bool
}

CallerIdentity is the result of validating a caller's bearer token.
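The namespace confinement described on IsController reduces to a one-line check. The sketch below is an assumption about how a handler might apply it; `mayAccessRun` is a hypothetical name, not part of the package.

```go
package main

import "fmt"

// CallerIdentity mirrors the struct declared in package broker.
type CallerIdentity struct {
	Namespace      string
	ServiceAccount string
	IsController   bool
}

// mayAccessRun reports whether id may ask about a HarnessRun in
// runNamespace: controllers may reach any namespace, everyone else only
// their own. Hypothetical helper.
func mayAccessRun(id CallerIdentity, runNamespace string) bool {
	return id.IsController || id.Namespace == runNamespace
}

func main() {
	run := CallerIdentity{Namespace: "team-a", ServiceAccount: "run-sa"}
	ctrl := CallerIdentity{Namespace: "paddock-system", ServiceAccount: "paddock-controller-manager", IsController: true}
	fmt.Println(mayAccessRun(run, "team-a"), mayAccessRun(run, "team-b"), mayAccessRun(ctrl, "team-b"))
}
```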

type CredentialAudit

type CredentialAudit struct {
	RunName        string
	Namespace      string
	CredentialName string
	Provider       string
	MatchedPolicy  string
	Reason         string
	When           time.Time
}

CredentialAudit is the emitter-side shape for a credential decision.

type InteractiveRouter

type InteractiveRouter struct {
	// contains filtered or unexported fields
}

InteractiveRouter holds per-(ns, run) in-memory state (attach counter, turn sequence) and reverse-proxies POST /prompts, /interrupt, /end to the adapter's loopback HTTP server.

func NewInteractiveRouter

func NewInteractiveRouter(resolver AdapterResolver) *InteractiveRouter

NewInteractiveRouter returns an InteractiveRouter that uses the provided resolver to locate adapter sidecars.

func (*InteractiveRouter) AttachedCount

func (r *InteractiveRouter) AttachedCount(namespace, runName string) int32

AttachedCount returns the current number of attached clients for (ns, name).

func (*InteractiveRouter) ForgetRun

func (r *InteractiveRouter) ForgetRun(namespace, runName string)

ForgetRun drops the cached state for (ns, name) on terminal run transitions.

func (*InteractiveRouter) ForwardEnd

func (r *InteractiveRouter) ForwardEnd(ctx context.Context, w http.ResponseWriter, req *http.Request, namespace, runName string)

ForwardEnd reverse-proxies a POST /end request to the adapter.

func (*InteractiveRouter) ForwardInterrupt

func (r *InteractiveRouter) ForwardInterrupt(ctx context.Context, w http.ResponseWriter, req *http.Request, namespace, runName string)

ForwardInterrupt reverse-proxies a POST /interrupt request to the adapter.

func (*InteractiveRouter) ForwardPrompt

func (r *InteractiveRouter) ForwardPrompt(ctx context.Context, w http.ResponseWriter, req *http.Request, namespace, runName string)

ForwardPrompt reverse-proxies a POST /prompts request to the adapter.

func (*InteractiveRouter) ForwardPromptWithBody

func (r *InteractiveRouter) ForwardPromptWithBody(ctx context.Context, w http.ResponseWriter, req *http.Request, namespace, runName string, body []byte)

ForwardPromptWithBody is the variant the broker uses when it has already constructed the upstream body (handlePrompts repacks {text, seq, submitter} for the adapter). The supplied body replaces req.Body for the upstream request.
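One way to picture "the supplied body replaces req.Body" is to build the upstream request from the byte slice and ignore the inbound body entirely. The sketch below uses only net/http; `forwardWithBody`, the echoing stand-in adapter, and the sample field values are assumptions for illustration, not the broker's implementation.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// forwardWithBody POSTs body (not req.Body) to the adapter at addr and
// relays the adapter's status and response body to w. Hypothetical helper.
func forwardWithBody(w http.ResponseWriter, req *http.Request, addr, path string, body []byte) {
	up, err := http.NewRequestWithContext(req.Context(), http.MethodPost, "http://"+addr+path, bytes.NewReader(body))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	up.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(up)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadGateway)
		return
	}
	defer resp.Body.Close()
	w.WriteHeader(resp.StatusCode)
	_, _ = io.Copy(w, resp.Body)
}

// demo starts a stand-in adapter that echoes whatever it receives and
// returns the body that adapter saw.
func demo() string {
	adapter := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, _ = io.Copy(w, r.Body)
	}))
	defer adapter.Close()

	req := httptest.NewRequest(http.MethodPost, "/prompts", strings.NewReader(`{"text":"hi"}`))
	rec := httptest.NewRecorder()
	repacked := []byte(`{"text":"hi","seq":1,"submitter":"alice"}`)
	forwardWithBody(rec, req, strings.TrimPrefix(adapter.URL, "http://"), "/prompts", repacked)
	return rec.Body.String()
}

func main() {
	fmt.Println(demo())
}
```

Building the request from a bytes.Reader also lets net/http set the Content-Length for the repacked body, so the inbound request's length header is never reused.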

func (*InteractiveRouter) NextTurnSeq

func (r *InteractiveRouter) NextTurnSeq(namespace, runName string) int32

NextTurnSeq returns the next monotonically-increasing turn sequence number for (ns, name), starting at 1.

func (*InteractiveRouter) OnAttach

func (r *InteractiveRouter) OnAttach(namespace, runName string)

OnAttach increments the count of clients attached to (ns, name).

func (*InteractiveRouter) OnDetach

func (r *InteractiveRouter) OnDetach(namespace, runName string)

OnDetach decrements the count of clients attached to (ns, name). Clamps at zero to defend against an unpaired Detach (e.g., a deferred cleanup running on a path where Attach never succeeded). A negative count would silently break the watchdog's idle-shutdown logic.

func (*InteractiveRouter) ResolveAdapter

func (r *InteractiveRouter) ResolveAdapter(ctx context.Context, namespace, runName string) (string, error)

ResolveAdapter returns the adapter sidecar's loopback address (host:port, no scheme) for the given run. Exposed so the broker's WebSocket handlers (which can't go through forward) can dial the adapter directly while keeping the resolver field private.

type RenewalWalker

type RenewalWalker struct {
	// contains filtered or unexported fields
}

RenewalWalker iterates a run's IssuedLeases and calls Renew on any provider that supports RenewableProvider when the lease's ExpiresAt is within a configurable window. Failure is non-fatal: the existing lease is preserved and an audit event is emitted.

func NewRenewalWalker

func NewRenewalWalker(registry map[string]providers.Provider, window time.Duration, audit *AuditWriter) *RenewalWalker

NewRenewalWalker constructs a RenewalWalker. audit may be nil to suppress audit emission (useful in tests that don't care about audit).

func (*RenewalWalker) WalkAndRenew

func (w *RenewalWalker) WalkAndRenew(ctx context.Context, namespace, runName string, leases []paddockv1alpha1.IssuedLease) ([]paddockv1alpha1.IssuedLease, error)

WalkAndRenew returns a copy of leases with ExpiresAt updated for any lease whose provider successfully renewed it. The original slice is not modified. Errors from individual providers are logged and recorded as audit events but do not cause WalkAndRenew to return an error.

Concurrency: callers must serialize calls per (namespace, runName). Two concurrent walkers for the same run can race-renew the same lease, doubling upstream API calls and creating a brief window where the in-memory token cache reflects whichever provider Renew call won the lock. Task 11's prompt handler is the single intended caller and serializes per-run via the broker's interactiveRouter; do not add a second caller without revisiting this contract.

Slice safety: the returned slice is a fresh slice header (independent of the input), but `*metav1.Time` pointers inside it may alias the input's pointers. WalkAndRenew only ever assigns a *new* pointer to out[i].ExpiresAt (never mutates *out[i].ExpiresAt), so no aliased mutation reaches the caller's input. Future maintainers: do not write `*out[i].ExpiresAt = ...` without first cloning.

type RunLimiterRegistry

type RunLimiterRegistry struct {
	// contains filtered or unexported fields
}

RunLimiterRegistry holds per-(namespace, runName) token buckets used to bound how fast a single run can hit /v1/issue and /v1/substitute-auth. Sized for the proxy-per-connection substitute path; well above any legitimate workload.

func NewRunLimiterRegistry

func NewRunLimiterRegistry() *RunLimiterRegistry

NewRunLimiterRegistry returns a registry seeded with default rates.

func (*RunLimiterRegistry) Allow

func (r *RunLimiterRegistry) Allow(namespace, run, kind string) bool

Allow consumes one token from the named bucket for (namespace, run). kind is "issue" or "substitute". Returns true on admit; false on quota exhaustion. The caller is responsible for emitting an AuditEvent (Phase 2c fail-closed-on-audit-failure) before returning the 429.

func (*RunLimiterRegistry) Sweep

func (r *RunLimiterRegistry) Sweep(now time.Time)

Sweep drops entries that have been untouched for longer than limiterIdleTTL. Call periodically from the broker's main goroutine.

type Server

type Server struct {
	// Client reads HarnessRuns, templates, BrokerPolicies, and
	// provider-backing Secrets.
	Client client.Client

	// Auth validates caller Bearer tokens.
	Auth TokenValidator

	// Providers holds every registered provider by Name().
	Providers *providers.Registry

	// Audit writes AuditEvents for every decision.
	Audit *AuditWriter

	// RunLimiter, when non-nil, gates /v1/issue and /v1/substitute-auth
	// against per-(namespace, run) token buckets. Tests may leave this
	// nil to bypass rate limiting; production wires it via cmd/broker.
	RunLimiter *RunLimiterRegistry

	// Router is required for /prompts, /interrupt, /end. May be nil at
	// construction time and wired by cmd/broker once the resolver is
	// available; handlers reject with 503 when nil.
	Router *InteractiveRouter

	// Renewer drives lazy credential renewal during /prompts. May be nil
	// in tests; the prompts handler skips renewal when unset.
	Renewer *RenewalWalker

	// RestConfig is the rest.Config the broker initialised at startup.
	// Used by the /shell handler to dial the K8s pods/exec subresource
	// via SPDY. May be nil in tests that don't exercise /shell;
	// handleShell returns 503 NotConfigured when nil.
	RestConfig *rest.Config
}

Server is the HTTP handler set for the broker. Register it on a net/http.Server configured for mTLS on :8443.

func (*Server) Register

func (s *Server) Register(mux *http.ServeMux)

Register installs the broker's handlers on the given mux.

type TokenValidator

type TokenValidator interface {
	Authenticate(ctx context.Context, bearer string) (CallerIdentity, error)
}

TokenValidator abstracts caller authentication so tests can supply a fake. Production wires Authenticator (TokenReview-backed).

Directories

Path Synopsis
Package api defines the wire shape of the broker's HTTP/JSON API.
Package providers implements the broker's pluggable credential backends.
