Documentation
Overview
Package extproc implements an Envoy ExternalProcessor (ExtProc) gRPC server.
Architecture:

    Client → Envoy :8092 → ExtProc gRPC (this server)
        ↳ Authenticate API key
        ↳ Extract Sandbox ID (header or URL path)
        ↳ Resolve sandbox Pod IP via Kubernetes
        ↳ Inject "x-envoy-original-dst-host: <PodIP>:<Port>"
        ↳ Rewrite the :path header if needed
    → Envoy ORIGINAL_DST cluster → Sandbox Pod FastAPI

Index
- Variables
- func AdminKeyUnaryInterceptor(mgr *apikey.AdminKeyManager) grpc.UnaryServerInterceptor
- type ActivityTracker
- type InternalGRPCServer
- func (s *InternalGRPCServer) EvictRoute(_ context.Context, req *ctrlplanev1.EvictRouteRequest) (*ctrlplanev1.EvictRouteResponse, error)
- func (s *InternalGRPCServer) GetLastActive(_ context.Context, _ *ctrlplanev1.GetLastActiveRequest) (*ctrlplanev1.GetLastActiveResponse, error)
- func (s *InternalGRPCServer) PushRoute(_ context.Context, req *ctrlplanev1.PushRouteRequest) (*ctrlplanev1.PushRouteResponse, error)
- type K8sSandboxRouter
- type RouteCache
- type RouteEntry
- type RouteTarget
- type SandboxRoute
- type SandboxRouter
- type Server
- type ServerConfig
Variables
var ErrSandboxRouteBadGateway = errors.New("sandbox is not in a healthy state")
ErrSandboxRouteBadGateway signals that the router knows the sandbox exists but cannot route to it right now. It is mapped to HTTP 502 and is retryable. Any Pod state other than "phase Running or Stopping, matching sandbox-id label, and PodIP set" produces this error, so it also covers informer lag, label race windows, the Starting phase (container swap in progress), the Idle and Failed phases, and a missing PodIP.
var ErrSandboxRouteNotFound = errors.New("sandbox route not found")
ErrSandboxRouteNotFound signals that the sandbox is unknown to the router: neither the cache nor (when fallback is enabled) the sandbox-id informer index returned any mapping. Mapped to HTTP 404 by the ExtProc server. Callers receiving this should treat the sandbox ID as definitively absent (never existed, or evicted long enough ago that every trace is gone).
Functions
func AdminKeyUnaryInterceptor
func AdminKeyUnaryInterceptor(mgr *apikey.AdminKeyManager) grpc.UnaryServerInterceptor
AdminKeyUnaryInterceptor returns a grpc.UnaryServerInterceptor that rejects any RPC not carrying a valid admin key in the `authorization: Bearer <key>` metadata. The caller should only register this interceptor when mgr is non-nil; passing nil yields an interceptor that rejects every request.
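A minimal sketch of the check such an interceptor performs, assuming the key arrives as the first `authorization` metadata value (in grpc-go these values would come from metadata.FromIncomingContext and MD.Get). bearerToken and validAdminKey are hypothetical helpers, and the plain key slice stands in for apikey.AdminKeyManager, whose real API is not shown here. An empty key list rejects everything, mirroring the documented nil-mgr behavior.

```go
package main

import (
	"crypto/subtle"
	"fmt"
	"strings"
)

// bearerToken extracts <key> from an "authorization: Bearer <key>" value.
// The scheme is matched case-insensitively; an empty key is rejected.
func bearerToken(authz string) (string, bool) {
	const prefix = "Bearer "
	if len(authz) < len(prefix) || !strings.EqualFold(authz[:len(prefix)], prefix) {
		return "", false
	}
	key := strings.TrimSpace(authz[len(prefix):])
	return key, key != ""
}

// validAdminKey is a hypothetical stand-in for the AdminKeyManager check.
// values are the "authorization" metadata values; adminKeys are the accepted
// keys. An empty adminKeys slice rejects everything (cf. a nil mgr).
func validAdminKey(values, adminKeys []string) bool {
	if len(values) == 0 {
		return false
	}
	key, ok := bearerToken(values[0])
	if !ok {
		return false
	}
	for _, k := range adminKeys {
		// Constant-time comparison avoids leaking key prefixes via timing.
		if subtle.ConstantTimeCompare([]byte(key), []byte(k)) == 1 {
			return true
		}
	}
	return false
}

func main() {
	keys := []string{"s3cret"}
	fmt.Println(validAdminKey([]string{"Bearer s3cret"}, keys)) // true
	fmt.Println(validAdminKey([]string{"Bearer wrong"}, keys))  // false
	fmt.Println(validAdminKey([]string{"Bearer s3cret"}, nil))  // false
}
```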
Types
type ActivityTracker
type ActivityTracker struct {
// contains filtered or unexported fields
}
ActivityTracker is a lightweight in-memory store that records the last time a proxied HTTP request was observed for each sandbox. It is intentionally stateless with respect to Kubernetes — all K8s reads/writes are handled by IdleTimeoutReconciler in the agentbox controller manager.
Lifecycle:
- On startup, InitFromAnnotations is called for each Running pod that has a last-active annotation, so that an ExtProc restart does not reset activity history.
- On every proxied request, Touch is called (non-blocking, O(1)).
- IdleTimeoutReconciler polls LastActiveHandler to snapshot the map, writes the values back as pod annotations, and decides whether to release pods.
func NewActivityTracker
func NewActivityTracker() *ActivityTracker
NewActivityTracker creates an ActivityTracker with an empty state and no background GC. This is the zero-config constructor.
func NewActivityTrackerWithGC
func NewActivityTrackerWithGC(c client.Client, gcInterval time.Duration) *ActivityTracker
NewActivityTrackerWithGC creates an ActivityTracker with a background GC goroutine that periodically removes entries for sandboxes that are no longer in the Running state. gcInterval controls the GC frequency; 5–10 minutes is recommended for production use.
func (*ActivityTracker) InitFromAnnotations
func (t *ActivityTracker) InitFromAnnotations(sandboxID string, ts time.Time)
InitFromAnnotations seeds the in-memory map with a timestamp from a pod annotation. Call this once per Running pod at startup before serving traffic. If ts is the zero value it is ignored.
func (*ActivityTracker) LastActiveHandler
func (t *ActivityTracker) LastActiveHandler() http.HandlerFunc
LastActiveHandler returns an HTTP handler that serves a JSON object mapping sandboxID → RFC3339 timestamp for the IdleTimeoutReconciler to poll.
Response format:
{ "sandboxId1": "2026-03-17T10:00:00Z", ... }
func (*ActivityTracker) Remove
func (t *ActivityTracker) Remove(sandboxID string)
Remove deletes a sandbox entry from the tracker. Called when a pod is no longer Running so its entry does not accumulate indefinitely.
func (*ActivityTracker) StartGC
func (t *ActivityTracker) StartGC(ctx context.Context)
StartGC launches the background GC goroutine. It is a no-op when no client or gcInterval was provided. Call this only after the K8s cache is warm so that the first GC cycle does not spuriously evict entries.
func (*ActivityTracker) Touch
func (t *ActivityTracker) Touch(sandboxID string)
Touch records the current time as the last-active timestamp for sandboxID. It is safe to call from multiple goroutines and returns immediately.
type InternalGRPCServer
type InternalGRPCServer struct {
ctrlplanev1.UnimplementedControlPlaneServiceServer
Cache *RouteCache
Tracker *ActivityTracker
}
InternalGRPCServer implements the ControlPlaneService gRPC contract for the Controller → ExtProc control channel. It wraps the in-memory RouteCache (for PushRoute) and the ActivityTracker (for GetLastActive).
func NewInternalGRPCServer
func NewInternalGRPCServer(cache *RouteCache, tracker *ActivityTracker) *InternalGRPCServer
NewInternalGRPCServer constructs the server. Both dependencies must be non-nil.
func (*InternalGRPCServer) EvictRoute
func (s *InternalGRPCServer) EvictRoute(_ context.Context, req *ctrlplanev1.EvictRouteRequest) (*ctrlplanev1.EvictRouteResponse, error)
EvictRoute removes a sandbox_id from the cache. Invoked by the Controller when a Pod completes Stopping → Idle, so subsequent router queries for the released sandbox_id immediately fall through to the informer fallback path (if enabled) or return NotFound, rather than pointing to a stale Pod.
func (*InternalGRPCServer) GetLastActive
func (s *InternalGRPCServer) GetLastActive(_ context.Context, _ *ctrlplanev1.GetLastActiveRequest) (*ctrlplanev1.GetLastActiveResponse, error)
GetLastActive returns per-sandbox activity timestamps as RFC3339 strings. Replaces the former HTTP /internal/sandboxes/last-active endpoint.
func (*InternalGRPCServer) PushRoute
func (s *InternalGRPCServer) PushRoute(_ context.Context, req *ctrlplanev1.PushRouteRequest) (*ctrlplanev1.PushRouteResponse, error)
PushRoute registers a sandbox_id → (namespace, pod_name) mapping so the router can serve traffic without waiting for the informer's sandbox-id index to catch up. The mapping carries no phase or IP: those are read live from the Pod informer on every request.
type K8sSandboxRouter
type K8sSandboxRouter struct {
// contains filtered or unexported fields
}
K8sSandboxRouter resolves sandbox routes using two independent data sources:
- RouteCache, populated by Controller push/evict, answers "does sandbox X exist, and if so on which (ns, pod_name)?". It carries NO phase or IP — those would go stale across Pod lifecycle transitions.
- Pod informer (via mgr.GetClient()), answers "what is the live state of (ns, pod_name)?". Consulted at request time to check phase + IP.
Each request reads the Pod exactly once: via client.Get by name on a cache hit, or via the sandbox-id indexer on a cache miss (when fallback is enabled). Both paths funnel into finalize(), which applies the phase/label/IP checks.
The fallback to indexer is gated by enableFallback. With fallback OFF, cache misses are served as NotFound immediately — useful for verifying the pure-push model in testing.
func NewK8sSandboxRouter
func NewK8sSandboxRouter(c client.Client, defaultPort int, cache *RouteCache, enableFallback bool) *K8sSandboxRouter
NewK8sSandboxRouter creates a K8sSandboxRouter. The defaultPort parameter is kept for compatibility but is not used when the caller provides a port via headers or URL. cache may be nil, in which case the router only uses the informer path. When enableFallback is false, cache misses short-circuit to NotFound without consulting the sandbox-id informer index.
func (*K8sSandboxRouter) ResolveSandboxRoute
func (r *K8sSandboxRouter) ResolveSandboxRoute(ctx context.Context, sandboxID string, reqPort int) (*SandboxRoute, error)
ResolveSandboxRoute returns the live Pod IP for the given sandbox, or one of ErrSandboxRouteNotFound / ErrSandboxRouteBadGateway. Exactly one Pod read is performed per request.
type RouteCache
type RouteCache struct {
// contains filtered or unexported fields
}
RouteCache is a lock-free (on the read path) cache of sandboxID → RouteEntry. It is written by the Controller via gRPC PushRoute and read by the Envoy ExtProc router on every proxied request. Writes are rare (once per sandbox Create) and reads are hot (every HTTP request through the gateway), so the cache uses a copy-on-write map snapshot:
- Get: single atomic Load of a *map pointer, then a map lookup.
- Put/Delete: acquire the writer mutex, clone the map, mutate the clone, atomic Store the new pointer.
Lazy expiry on Get returns (_, false) for stale entries without mutating the map. A background GC sweeper compacts the map periodically so its size remains bounded when many sandboxes are created and released.
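The cache is a latency optimization, NOT a source of truth. Correctness comes from the Pod informer, which K8sSandboxRouter consults on every request: by name on a cache hit, or through the sandbox-id index on a miss when fallback is enabled.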
func NewRouteCache
func NewRouteCache(ttl time.Duration) *RouteCache
NewRouteCache returns a RouteCache with the given per-entry TTL. A TTL of 1 minute is appropriate for the push-on-Create use case: the Informer normally catches up well within that window and takes over.
func (*RouteCache) Delete
func (c *RouteCache) Delete(sandboxID string)
Delete removes the entry for sandboxID. No-op if absent.
func (*RouteCache) Get
func (c *RouteCache) Get(sandboxID string) (RouteEntry, bool)
Get returns the entry for sandboxID and whether it is present and fresh. Expired entries are treated as misses (no map mutation happens on the read path — the sweeper reclaims them asynchronously).
func (*RouteCache) Len
func (c *RouteCache) Len() int
Len returns the current entry count. Useful for metrics and tests.
func (*RouteCache) Put
func (c *RouteCache) Put(sandboxID string, e RouteEntry)
Put inserts or overwrites the entry for sandboxID. ExpiresAt is set to time.Now().Add(ttl), so repeated Puts refresh the TTL. The caller should supply Namespace and PodName; any ExpiresAt in the argument is ignored.
func (*RouteCache) StartGC
func (c *RouteCache) StartGC(ctx context.Context, interval time.Duration)
StartGC launches a background goroutine that periodically drops expired entries. It is safe to call multiple times; each call starts a new goroutine that lives until ctx is cancelled. A single call per cache is the norm.
type RouteEntry
RouteEntry is one cached sandboxID -> (ns, pod_name) mapping. It carries neither Phase nor PodIP: those are read live from the Pod informer cache at request time so the entry never goes stale across Pod lifecycle transitions (Starting -> Running -> Stopping).
type RouteTarget
RouteTarget holds the extracted sandbox ID, port, and the rewritten path.
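The package does not document the URL layout it accepts, so the sketch below uses a hypothetical /sandboxes/{sandboxID}/{port}/{rest...} convention purely to show how a RouteTarget-style value and the rewritten :path can be derived together. parseSandboxPath and the RouteTarget field names are assumptions, and the header-based variant is not shown. strings.CutPrefix requires Go 1.20+.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// RouteTarget mirrors the documented fields; the names are assumed.
type RouteTarget struct {
	SandboxID string
	Port      int
	Path      string // rewritten upstream :path
}

// parseSandboxPath accepts a HYPOTHETICAL layout /sandboxes/{id}/{port}/{rest...}
// and rewrites the path to "/{rest...}", preserving any query string.
func parseSandboxPath(rawPath string) (RouteTarget, bool) {
	path, query, hasQuery := strings.Cut(rawPath, "?")
	rest, ok := strings.CutPrefix(path, "/sandboxes/")
	if !ok {
		return RouteTarget{}, false
	}
	parts := strings.SplitN(rest, "/", 3) // id, port, remainder
	if len(parts) < 2 || parts[0] == "" {
		return RouteTarget{}, false
	}
	port, err := strconv.Atoi(parts[1])
	if err != nil || port < 1 || port > 65535 {
		return RouteTarget{}, false
	}
	upstream := "/"
	if len(parts) == 3 {
		upstream += parts[2]
	}
	if hasQuery {
		upstream += "?" + query
	}
	return RouteTarget{SandboxID: parts[0], Port: port, Path: upstream}, true
}

func main() {
	t, ok := parseSandboxPath("/sandboxes/sb-123/8000/api/run?x=1")
	fmt.Println(t.SandboxID, t.Port, t.Path, ok) // sb-123 8000 /api/run?x=1 true
}
```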
type SandboxRoute
SandboxRoute holds the resolved upstream destination for a sandbox request.
func (*SandboxRoute) DestHost
func (r *SandboxRoute) DestHost() string
DestHost formats the route as "IP:Port" for the x-envoy-original-dst-host header.
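Whether DestHost uses it is not stated, but net.JoinHostPort is the standard library's way to build a host:port string and, unlike fmt.Sprintf("%s:%d"), it brackets IPv6 literals. The sandboxRoute type and its PodIP/Port field names below are assumptions.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// sandboxRoute is an illustrative stand-in for SandboxRoute.
type sandboxRoute struct {
	PodIP string
	Port  int
}

// DestHost formats "IP:Port"; net.JoinHostPort wraps IPv6 hosts in brackets.
func (r *sandboxRoute) DestHost() string {
	return net.JoinHostPort(r.PodIP, strconv.Itoa(r.Port))
}

func main() {
	fmt.Println((&sandboxRoute{PodIP: "10.0.0.7", Port: 8000}).DestHost()) // 10.0.0.7:8000
	fmt.Println((&sandboxRoute{PodIP: "fd00::7", Port: 8000}).DestHost())  // [fd00::7]:8000
}
```

The difference only matters if Pods receive IPv6 addresses, as in an IPv6 or dual-stack cluster.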
type SandboxRouter
type SandboxRouter interface {
ResolveSandboxRoute(ctx context.Context, sandboxID string, port int) (*SandboxRoute, error)
}
SandboxRouter resolves the destination Pod for a given sandbox ID. Implement this interface to swap in a cache-backed or test-double router.
type Server
type Server struct {
extProcPb.UnimplementedExternalProcessorServer
// contains filtered or unexported fields
}
Server is the gRPC ExtProc server that runs alongside the Gin REST API.
func New
func New(cfg ServerConfig, adminKeyMgr *apikey.AdminKeyManager, keyStore apikey.KeyStore, router SandboxRouter, activityTracker *ActivityTracker, clusterStore *cluster.Store, localClusterID string) *Server
New creates a new ExtProc gRPC Server.
- cfg – Server configuration, including the TCP bind address to listen on (e.g. ":9002").
- adminKeyMgr – Admin key manager (pass nil to disable authentication, i.e. dev mode).
- keyStore – Tenant key store for validating opaque tokens.
- router – Resolves sandbox ID → Pod IP:Port.
- activityTracker – Records per-sandbox last-active timestamps (may be nil).
- clusterStore – Cross-cluster config store (nil to disable cross-cluster routing).
- localClusterID – Identifier of the local cluster (empty to disable cross-cluster routing).
func (*Server) Process
func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error
Process implements envoy.service.ext_proc.v3.ExternalProcessor/Process.
The method runs a receive loop over the bidirectional stream. In the current configuration Envoy only sends RequestHeaders messages (all body/trailer modes are set to NONE/SKIP), so one exchange per HTTP request is expected.
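The receive loop can be sketched against a narrow stream interface. processStream, processingRequest, and processingResponse are hypothetical, trimmed stand-ins for the generated go-control-plane types (ExternalProcessor_ProcessServer, ProcessingRequest, ProcessingResponse), and fakeStream exists only to drive the loop. io.EOF from Recv is treated as Envoy closing its side of the stream; a production loop would usually also treat a cancelled stream context as a normal shutdown.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Hypothetical, trimmed stand-ins for the ext_proc stream messages.
type processingRequest struct{ Path string }
type processingResponse struct{ SetHeaders map[string]string }

type processStream interface {
	Recv() (*processingRequest, error)
	Send(*processingResponse) error
}

// processLoop answers each RequestHeaders message with exactly one response
// until the client half-closes the stream (io.EOF) or an error occurs.
func processLoop(srv processStream, handle func(*processingRequest) *processingResponse) error {
	for {
		req, err := srv.Recv()
		if errors.Is(err, io.EOF) {
			return nil // stream closed cleanly
		}
		if err != nil {
			return err
		}
		if err := srv.Send(handle(req)); err != nil {
			return err
		}
	}
}

// fakeStream replays queued requests and records responses.
type fakeStream struct {
	in  []*processingRequest
	out []*processingResponse
}

func (f *fakeStream) Recv() (*processingRequest, error) {
	if len(f.in) == 0 {
		return nil, io.EOF
	}
	r := f.in[0]
	f.in = f.in[1:]
	return r, nil
}

func (f *fakeStream) Send(r *processingResponse) error {
	f.out = append(f.out, r)
	return nil
}

func main() {
	fs := &fakeStream{in: []*processingRequest{{Path: "/a"}, {Path: "/b"}}}
	err := processLoop(fs, func(r *processingRequest) *processingResponse {
		return &processingResponse{SetHeaders: map[string]string{":path": r.Path}}
	})
	fmt.Println(len(fs.out), err) // 2 <nil>
}
```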