extproc

package
v0.0.1 Latest
Published: May 10, 2026 License: Apache-2.0 Imports: 30 Imported by: 0

README

pkg/envoy/extproc — Envoy ExternalProcessor

This gRPC service implements the Envoy ExternalProcessor protocol. For every inbound HTTP request, it performs the following steps:

  1. Validates the API Key (SecretKeyStore).
  2. Parses the request target (Sandbox ID + Port).
  3. Queries the corresponding Pod IP.
  4. Returns Header mutations to route traffic to the correct Sandbox Pod.

File Structure

File                 Description
server.go            gRPC service entry point; contains the ProcessingRequest main loop.
router.go            Envoy ExtProc router (bridges gRPC and handlers).
helper.go            Core logic: authenticate(), extractTarget(), and RouteTarget.
activity_tracker.go  Tracks Sandbox activity (used for idle timeout management).
helper_test.go       Unit tests for extractTarget.

Routing Resolution Strategy (extractTarget)

Strategies are evaluated in order of priority (highest to lowest). The process stops at the first match:

Priority  Strategy           Format / Source
1         Explicit Headers   x-sandbox-id + x-sandbox-port
2         Standard URL Path  /sandboxes/<id>/<port>/...
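
The priority order above can be sketched as follows. This is a minimal illustration, not the package's extractTarget: the function name extractTargetSketch, the lower-cased header map, the fall-through on an unparsable port header, and the way the /sandboxes/<id>/<port> prefix is stripped to form RewrittenPath are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// RouteTarget mirrors the struct documented in this package.
type RouteTarget struct {
	SandboxID     string
	Port          int
	RewrittenPath string
}

// extractTargetSketch is a hypothetical stand-in for extractTarget.
// headers is assumed to map lower-cased header names to values.
func extractTargetSketch(headers map[string]string, path string) (RouteTarget, bool) {
	// Priority 1: explicit headers. The path is left untouched (assumption).
	id, portStr := headers["x-sandbox-id"], headers["x-sandbox-port"]
	if id != "" && portStr != "" {
		if port, err := strconv.Atoi(portStr); err == nil {
			return RouteTarget{SandboxID: id, Port: port, RewrittenPath: path}, true
		}
		// Assumption: an unparsable port header falls through to the URL path.
	}

	// Priority 2: /sandboxes/<id>/<port>/...
	const prefix = "/sandboxes/"
	if !strings.HasPrefix(path, prefix) {
		return RouteTarget{}, false
	}
	parts := strings.SplitN(strings.TrimPrefix(path, prefix), "/", 3)
	if len(parts) < 2 || parts[0] == "" {
		return RouteTarget{}, false
	}
	port, err := strconv.Atoi(parts[1])
	if err != nil {
		return RouteTarget{}, false
	}
	// Assumption: the routing prefix is stripped before forwarding upstream.
	rewritten := "/"
	if len(parts) == 3 {
		rewritten += parts[2]
	}
	return RouteTarget{SandboxID: parts[0], Port: port, RewrittenPath: rewritten}, true
}

func main() {
	target, ok := extractTargetSketch(nil, "/sandboxes/sb1/3000/api/v1")
	fmt.Printf("%+v %v\n", target, ok)
}
```

Evaluating the header branch first means a request that carries both forms is routed by its headers, which matches the "stops at the first match" rule.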

Testing

To run the test suite:

go test ./pkg/envoy/extproc/... -v

Note: If you modify the extractTarget logic, update the corresponding test cases in helper_test.go so the tests stay in sync with the code.

Documentation

Overview

Package extproc implements an Envoy ExternalProcessor (ExtProc) gRPC server.

Architecture:

Client → Envoy :8092 → ExtProc gRPC (this server)
                         ↳ Authenticate API key
                         ↳ Extract Sandbox ID (header or URL path)
                         ↳ Resolve sandbox Pod IP via Kubernetes
                         ↳ Inject "x-envoy-original-dst-host: <PodIP>:<Port>"
                         ↳ Rewrite the :path header if needed
       → Envoy ORIGINAL_DST cluster → Sandbox Pod FastAPI
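
The two header mutations in the diagram can be illustrated with plain Go values. This sketch deliberately avoids the Envoy protobuf types: headerMutations is a hypothetical helper that returns a map, whereas the real server would encode the same data as ExtProc header mutation messages.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// headerMutations returns the headers to set on the request, as a plain map.
// Hypothetical helper for illustration only.
func headerMutations(podIP string, port int, originalPath, rewrittenPath string) map[string]string {
	m := map[string]string{
		// Envoy's ORIGINAL_DST cluster reads the upstream address from this header.
		"x-envoy-original-dst-host": net.JoinHostPort(podIP, strconv.Itoa(port)),
	}
	// Rewrite :path only when the target extraction actually changed it.
	if rewrittenPath != "" && rewrittenPath != originalPath {
		m[":path"] = rewrittenPath
	}
	return m
}

func main() {
	m := headerMutations("10.0.0.5", 8080, "/sandboxes/sb1/8080/run", "/run")
	fmt.Println(m["x-envoy-original-dst-host"], m[":path"])
}
```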

Variables

var ErrSandboxRouteBadGateway = errors.New("sandbox is not in a healthy state")

ErrSandboxRouteBadGateway signals that the router knows the sandbox exists but cannot route to it right now. Mapped to HTTP 502, retryable. A Pod is routable only when it is Running or Stopping, carries the matching sandbox-id label, and has a PodIP set. Any other state produces this error, so it also covers informer lag, label race windows, Starting (container swap in progress), Idle/Failed phases, and missing PodIP.

var ErrSandboxRouteNotFound = errors.New("sandbox route not found")

ErrSandboxRouteNotFound signals that the sandbox is unknown to the router: neither the cache nor (when fallback is enabled) the sandbox-id informer index returned any mapping. Mapped to HTTP 404 by the ExtProc server. Callers receiving this should treat the sandbox ID as definitively absent (never existed, or evicted long enough ago that every trace is gone).
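
A caller can distinguish the two sentinels with errors.Is, which also matches wrapped errors. The sketch below redeclares the sentinels locally so it runs on its own; statusFor is a hypothetical helper, and the 500 default for unrecognized errors is an assumption, since only the 404 and 502 mappings are documented.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Local copies of the documented sentinels, so the example is self-contained.
var (
	errNotFound   = errors.New("sandbox route not found")
	errBadGateway = errors.New("sandbox is not in a healthy state")
)

// statusFor maps a routing error to an HTTP status code.
func statusFor(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, errNotFound):
		return http.StatusNotFound // sandbox ID is unknown
	case errors.Is(err, errBadGateway):
		return http.StatusBadGateway // sandbox exists but is not routable; retryable
	default:
		return http.StatusInternalServerError // assumption: not documented
	}
}

func main() {
	wrapped := fmt.Errorf("resolve sb-1: %w", errBadGateway)
	fmt.Println(statusFor(wrapped))
}
```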

Functions

func AdminKeyUnaryInterceptor

func AdminKeyUnaryInterceptor(mgr *apikey.AdminKeyManager) grpc.UnaryServerInterceptor

AdminKeyUnaryInterceptor returns a grpc.UnaryServerInterceptor that rejects any RPC not carrying a valid admin key in the `authorization: Bearer <key>` metadata. The caller should only register this interceptor when mgr is non-nil; passing nil yields an interceptor that rejects every request.
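
The check the interceptor performs on the metadata value can be sketched without any gRPC dependency. This is an assumption-laden illustration: bearerToken and validAdminKey are hypothetical names, the "Bearer" prefix is matched case-sensitively, and the constant-time comparison is a common precaution rather than something the documentation states.

```go
package main

import (
	"crypto/subtle"
	"fmt"
	"strings"
)

// bearerToken extracts <key> from an "authorization" value of the form "Bearer <key>".
func bearerToken(authorization string) (string, bool) {
	const prefix = "Bearer "
	if !strings.HasPrefix(authorization, prefix) {
		return "", false
	}
	key := strings.TrimSpace(strings.TrimPrefix(authorization, prefix))
	return key, key != ""
}

// validAdminKey reports whether the header carries the expected key.
// An empty expected key rejects everything, loosely mirroring the
// documented behavior of an interceptor built from a nil manager.
func validAdminKey(authorization, want string) bool {
	got, ok := bearerToken(authorization)
	if !ok || want == "" {
		return false
	}
	// Constant-time comparison avoids leaking key contents via timing.
	return subtle.ConstantTimeCompare([]byte(got), []byte(want)) == 1
}

func main() {
	fmt.Println(validAdminKey("Bearer s3cret", "s3cret"))
	fmt.Println(validAdminKey("Basic s3cret", "s3cret"))
}
```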

Types

type ActivityTracker

type ActivityTracker struct {
	// contains filtered or unexported fields
}

ActivityTracker is a lightweight in-memory store that records the last time a proxied HTTP request was observed for each sandbox. It is intentionally stateless with respect to Kubernetes — all K8s reads/writes are handled by IdleTimeoutReconciler in the agentbox controller manager.

Lifecycle:

  1. On startup, InitFromAnnotations is called for each Running pod that has a last-active annotation so that ExtProc restart does not reset history.
  2. On every proxied request, Touch is called (non-blocking, O(1)).
  3. IdleTimeoutReconciler polls LastActiveHandler to snapshot the map, writes the values back as pod annotations, and decides whether to release pods.
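
The lifecycle above can be modeled with a mutex-guarded map. This is a simplified sketch, not the package's implementation: the tracker type, its Snapshot method, and the choice of sync.RWMutex are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tracker is a hypothetical, simplified model of ActivityTracker.
type tracker struct {
	mu   sync.RWMutex
	last map[string]time.Time
}

func newTracker() *tracker {
	return &tracker{last: make(map[string]time.Time)}
}

// InitFromAnnotations seeds an entry at startup; zero timestamps are ignored.
func (t *tracker) InitFromAnnotations(id string, ts time.Time) {
	if ts.IsZero() {
		return
	}
	t.mu.Lock()
	t.last[id] = ts
	t.mu.Unlock()
}

// Touch records "now" for a sandbox: one map write under a short lock.
func (t *tracker) Touch(id string) {
	now := time.Now()
	t.mu.Lock()
	t.last[id] = now
	t.mu.Unlock()
}

// Remove drops the entry for a sandbox that is no longer Running.
func (t *tracker) Remove(id string) {
	t.mu.Lock()
	delete(t.last, id)
	t.mu.Unlock()
}

// Snapshot returns a copy, so a poller never observes concurrent writes.
func (t *tracker) Snapshot() map[string]time.Time {
	t.mu.RLock()
	defer t.mu.RUnlock()
	out := make(map[string]time.Time, len(t.last))
	for id, ts := range t.last {
		out[id] = ts
	}
	return out
}

func main() {
	tr := newTracker()
	tr.InitFromAnnotations("sb-1", time.Now().Add(-time.Hour)) // step 1: startup seed
	tr.Touch("sb-1")                                           // step 2: proxied request
	fmt.Println(len(tr.Snapshot()))                            // step 3: reconciler poll
}
```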

func NewActivityTracker

func NewActivityTracker() *ActivityTracker

NewActivityTracker creates an ActivityTracker with an empty state and no background GC. This is the zero-config constructor.

func NewActivityTrackerWithGC

func NewActivityTrackerWithGC(c client.Client, gcInterval time.Duration) *ActivityTracker

NewActivityTrackerWithGC creates an ActivityTracker with a background GC goroutine that periodically removes entries for sandboxes that are no longer in the Running state. gcInterval controls the GC frequency; 5–10 minutes is recommended for production use.

func (*ActivityTracker) InitFromAnnotations

func (t *ActivityTracker) InitFromAnnotations(sandboxID string, ts time.Time)

InitFromAnnotations seeds the in-memory map with a timestamp from a pod annotation. Call this once per Running pod at startup before serving traffic. If ts is the zero value it is ignored.

func (*ActivityTracker) LastActiveHandler

func (t *ActivityTracker) LastActiveHandler() http.HandlerFunc

LastActiveHandler returns an HTTP handler that serves a JSON object mapping sandboxID → RFC3339 timestamp for the IdleTimeoutReconciler to poll.

Response format:

{ "sandboxId1": "2026-03-17T10:00:00Z", ... }
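
A handler producing this response shape might look like the sketch below. The names lastActiveHandler, render, and demoBody are hypothetical, and the snapshot callback stands in for the tracker's internal state; the handler is exercised in-process with net/http/httptest rather than over a real socket.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// lastActiveHandler serves sandboxID -> RFC3339 timestamp as a JSON object.
func lastActiveHandler(snapshot func() map[string]time.Time) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		out := make(map[string]string)
		for id, ts := range snapshot() {
			out[id] = ts.UTC().Format(time.RFC3339)
		}
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(out)
	}
}

// render invokes a handler in-process and returns the response body.
func render(h http.HandlerFunc) string {
	rec := httptest.NewRecorder()
	h(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Body.String()
}

// demoBody renders the handler over one fixed entry.
func demoBody() string {
	ts := time.Date(2026, 3, 17, 10, 0, 0, 0, time.UTC)
	return render(lastActiveHandler(func() map[string]time.Time {
		return map[string]time.Time{"sandboxId1": ts}
	}))
}

func main() {
	fmt.Print(demoBody()) // prints {"sandboxId1":"2026-03-17T10:00:00Z"}
}
```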

func (*ActivityTracker) Remove

func (t *ActivityTracker) Remove(sandboxID string)

Remove deletes a sandbox entry from the tracker. Called when a pod is no longer Running so its entry does not accumulate indefinitely.

func (*ActivityTracker) StartGC

func (t *ActivityTracker) StartGC(ctx context.Context)

StartGC launches the background GC goroutine. It is a no-op when no client or gcInterval was provided. Call this only after the K8s cache is warm so that the first GC cycle does not spuriously evict entries.

func (*ActivityTracker) Touch

func (t *ActivityTracker) Touch(sandboxID string)

Touch records the current time as the last-active timestamp for sandboxID. It is safe to call from multiple goroutines and returns immediately.

type InternalGRPCServer

type InternalGRPCServer struct {
	ctrlplanev1.UnimplementedControlPlaneServiceServer

	Cache   *RouteCache
	Tracker *ActivityTracker
}

InternalGRPCServer implements the ControlPlaneService gRPC contract for the Controller → ExtProc control channel. It wraps the in-memory RouteCache (for PushRoute) and the ActivityTracker (for GetLastActive).

func NewInternalGRPCServer

func NewInternalGRPCServer(cache *RouteCache, tracker *ActivityTracker) *InternalGRPCServer

NewInternalGRPCServer constructs the server. Neither dependency may be nil.

func (*InternalGRPCServer) EvictRoute

EvictRoute removes a sandbox_id from the cache. Invoked by the Controller when a Pod completes Stopping → Idle, so subsequent router queries for the released sandbox_id immediately fall through to the informer fallback path (if enabled) or return NotFound, rather than pointing to a stale Pod.

func (*InternalGRPCServer) GetLastActive

GetLastActive returns per-sandbox activity timestamps as RFC3339 strings. Replaces the former HTTP /internal/sandboxes/last-active endpoint.

func (*InternalGRPCServer) PushRoute

PushRoute registers a sandbox_id → (namespace, pod_name) mapping so the router can serve traffic without waiting for the informer's sandbox-id index to catch up. The mapping carries no phase or IP: those are read live from the Pod informer on every request.

type K8sSandboxRouter

type K8sSandboxRouter struct {
	// contains filtered or unexported fields
}

K8sSandboxRouter resolves sandbox routes using two independent data sources:

  • RouteCache, populated by Controller push/evict, answers "does sandbox X exist, and if so on which (ns, pod_name)?". It carries NO phase or IP — those would go stale across Pod lifecycle transitions.
  • Pod informer (via mgr.GetClient()), answers "what is the live state of (ns, pod_name)?". Consulted at request time to check phase + IP.

Each request reads the Pod exactly once: via client.Get by name on cache hit, or via the sandbox-id indexer on cache miss (when fallback is on). Both paths funnel into finalize() which applies the phase/label/IP checks.

The fallback to indexer is gated by enableFallback. With fallback OFF, cache misses are served as NotFound immediately — useful for verifying the pure-push model in testing.
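
The decision flow described above can be sketched with in-memory maps standing in for the RouteCache, the client.Get lookup, and the sandbox-id index. All types and names here are hypothetical. Returning the BadGateway error when a cached Pod is not yet visible is an assumption based on the note that this error also covers informer lag.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errNotFound   = errors.New("sandbox route not found")
	errBadGateway = errors.New("sandbox is not in a healthy state")
)

// pod is a trimmed-down stand-in for a Kubernetes Pod.
type pod struct {
	Phase, SandboxID, IP string
}

type router struct {
	cache          map[string]string // sandboxID -> pod name (RouteCache stand-in)
	podsByName     map[string]pod    // stand-in for client.Get by name
	podsBySandbox  map[string]pod    // stand-in for the sandbox-id index
	enableFallback bool
}

// resolve reads the Pod exactly once, via the cache hit or the fallback index.
func (r *router) resolve(sandboxID string) (string, error) {
	if name, ok := r.cache[sandboxID]; ok {
		p, found := r.podsByName[name]
		if !found {
			return "", errBadGateway // known sandbox, Pod not visible yet (assumption)
		}
		return finalize(p, sandboxID)
	}
	if !r.enableFallback {
		return "", errNotFound // pure-push mode: a cache miss is final
	}
	p, found := r.podsBySandbox[sandboxID]
	if !found {
		return "", errNotFound
	}
	return finalize(p, sandboxID)
}

// finalize applies the phase, label, and IP checks shared by both paths.
func finalize(p pod, sandboxID string) (string, error) {
	routablePhase := p.Phase == "Running" || p.Phase == "Stopping"
	if !routablePhase || p.SandboxID != sandboxID || p.IP == "" {
		return "", errBadGateway
	}
	return p.IP, nil
}

// demoRouter builds a small fixture: sb-1 healthy, sb-2 still Starting,
// sb-3 known only to the index.
func demoRouter(enableFallback bool) *router {
	return &router{
		cache: map[string]string{"sb-1": "pod-a", "sb-2": "pod-b"},
		podsByName: map[string]pod{
			"pod-a": {Phase: "Running", SandboxID: "sb-1", IP: "10.0.0.5"},
			"pod-b": {Phase: "Starting", SandboxID: "sb-2"},
		},
		podsBySandbox:  map[string]pod{"sb-3": {Phase: "Running", SandboxID: "sb-3", IP: "10.0.0.9"}},
		enableFallback: enableFallback,
	}
}

func main() {
	r := demoRouter(true)
	for _, id := range []string{"sb-1", "sb-2", "sb-3", "sb-4"} {
		ip, err := r.resolve(id)
		fmt.Println(id, ip, err)
	}
}
```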

func NewK8sSandboxRouter

func NewK8sSandboxRouter(c client.Client, defaultPort int, cache *RouteCache, enableFallback bool) *K8sSandboxRouter

NewK8sSandboxRouter creates a K8sSandboxRouter. The defaultPort parameter is kept for compatibility but is not used when the caller provides a port via headers or URL. cache may be nil, in which case the router only uses the informer path. When enableFallback is false, cache misses short-circuit to NotFound without consulting the sandbox-id informer index.

func (*K8sSandboxRouter) ResolveSandboxRoute

func (r *K8sSandboxRouter) ResolveSandboxRoute(ctx context.Context, sandboxID string, reqPort int) (*SandboxRoute, error)

ResolveSandboxRoute returns the live Pod IP for the given sandbox, or one of ErrSandboxRouteNotFound / ErrSandboxRouteBadGateway. Exactly one Pod read is performed per request.

type RouteCache

type RouteCache struct {
	// contains filtered or unexported fields
}

RouteCache is a lock-free (on the read path) cache of sandboxID → RouteEntry. It is written by the Controller via gRPC PushRoute and read by the Envoy ExtProc router on every proxied request. Writes are rare (once per sandbox Create) and reads are hot (every HTTP request through the gateway), so the cache uses a copy-on-write map snapshot:

  • Get: single atomic Load of a *map pointer, then a map lookup.
  • Put/Delete: acquire the writer mutex, clone the map, mutate the clone, atomic Store the new pointer.

Lazy expiry on Get returns (_, false) for stale entries without mutating the map. A background GC sweeper compacts the map periodically so its size remains bounded when many sandboxes are created and released.
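
The copy-on-write scheme can be sketched as below. This is an illustrative model, not the package's code: cowCache and entry are hypothetical names, and atomic.Value is used to hold the map snapshot, which may differ from the atomic primitive the real RouteCache uses.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type entry struct {
	Namespace, PodName string
	ExpiresAt          time.Time
}

// cowCache publishes immutable map snapshots; readers never take a lock.
type cowCache struct {
	mu   sync.Mutex   // serializes writers only
	snap atomic.Value // holds a map[string]entry that is never mutated after Store
	ttl  time.Duration
}

func newCowCache(ttl time.Duration) *cowCache {
	c := &cowCache{ttl: ttl}
	c.snap.Store(map[string]entry{})
	return c
}

// Get does one atomic load and one map lookup. Expired entries are reported
// as misses without touching the map (lazy expiry).
func (c *cowCache) Get(id string) (entry, bool) {
	m := c.snap.Load().(map[string]entry)
	e, ok := m[id]
	if !ok || time.Now().After(e.ExpiresAt) {
		return entry{}, false
	}
	return e, true
}

// update clones the current snapshot, applies fn to the clone, and publishes it.
func (c *cowCache) update(fn func(m map[string]entry)) {
	c.mu.Lock()
	defer c.mu.Unlock()
	old := c.snap.Load().(map[string]entry)
	next := make(map[string]entry, len(old)+1)
	for k, v := range old {
		next[k] = v
	}
	fn(next)
	c.snap.Store(next)
}

// Put overwrites ExpiresAt, so repeated Puts refresh the TTL.
func (c *cowCache) Put(id string, e entry) {
	e.ExpiresAt = time.Now().Add(c.ttl)
	c.update(func(m map[string]entry) { m[id] = e })
}

func (c *cowCache) Delete(id string) {
	c.update(func(m map[string]entry) { delete(m, id) })
}

func (c *cowCache) Len() int {
	return len(c.snap.Load().(map[string]entry))
}

func main() {
	c := newCowCache(time.Minute)
	c.Put("sb-1", entry{Namespace: "default", PodName: "pod-a"})
	e, ok := c.Get("sb-1")
	fmt.Println(e.PodName, ok, c.Len())
}
```

Cloning the whole map on every write is O(n), which is acceptable only because writes are rare relative to reads, as the description above notes.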

The cache is a latency optimization, NOT a source of truth. The Informer fallback in K8sSandboxRouter always reconciles correctness.

func NewRouteCache

func NewRouteCache(ttl time.Duration) *RouteCache

NewRouteCache returns a RouteCache with the given per-entry TTL. A TTL of 1 minute is appropriate for the push-on-Create use case: the Informer normally catches up well within that window and takes over.

func (*RouteCache) Delete

func (c *RouteCache) Delete(sandboxID string)

Delete removes the entry for sandboxID. No-op if absent.

func (*RouteCache) Get

func (c *RouteCache) Get(sandboxID string) (RouteEntry, bool)

Get returns the entry for sandboxID and whether it is present and fresh. Expired entries are treated as misses (no map mutation happens on the read path — the sweeper reclaims them asynchronously).

func (*RouteCache) Len

func (c *RouteCache) Len() int

Len returns the current entry count. Useful for metrics and tests.

func (*RouteCache) Put

func (c *RouteCache) Put(sandboxID string, e RouteEntry)

Put inserts or overwrites the entry for sandboxID. ExpiresAt is set to time.Now().Add(ttl), so repeated Puts refresh the TTL. The caller should supply Namespace and PodName; ExpiresAt in the argument is ignored.

func (*RouteCache) StartGC

func (c *RouteCache) StartGC(ctx context.Context, interval time.Duration)

StartGC launches a background goroutine that periodically drops expired entries. It is safe to call multiple times; each call starts a new goroutine that lives until ctx is cancelled. A single call per cache is the norm.
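
A ticker-driven loop bounded by a context is the usual shape for this kind of sweeper. The sketch below is generic: startGC, the sweep callback, and the returned done channel are hypothetical, added so the example can show the goroutine exiting on cancellation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// startGC runs sweep every interval until ctx is cancelled.
// The returned channel is closed once the goroutine has exited.
func startGC(ctx context.Context, interval time.Duration, sweep func()) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				sweep()
			}
		}
	}()
	return done
}

// gcDemo waits for one sweep, cancels the context, and confirms shutdown.
func gcDemo() bool {
	ctx, cancel := context.WithCancel(context.Background())
	swept := make(chan struct{}, 1)
	done := startGC(ctx, time.Millisecond, func() {
		select {
		case swept <- struct{}{}: // signal the first sweep
		default: // later sweeps: nothing to report
		}
	})
	<-swept  // at least one sweep has run
	cancel() // ask the goroutine to stop
	<-done   // the goroutine has exited
	return true
}

func main() {
	fmt.Println(gcDemo())
}
```

Because each call starts its own goroutine, calling such a function twice on the same cache doubles the sweep frequency rather than failing, which is consistent with the note that a single call per cache is the norm.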

type RouteEntry

type RouteEntry struct {
	Namespace string
	PodName   string
	ExpiresAt time.Time
}

RouteEntry is one cached sandboxID -> (ns, pod_name) mapping. It carries neither Phase nor PodIP: those are read live from the Pod informer cache at request time so the entry never goes stale across Pod lifecycle transitions (Starting -> Running -> Stopping).

type RouteTarget

type RouteTarget struct {
	SandboxID     string
	Port          int
	RewrittenPath string
}

RouteTarget holds the extracted sandbox ID, port, and the rewritten path.

type SandboxRoute

type SandboxRoute struct {
	PodIP string
	Port  int
}

SandboxRoute holds the resolved upstream destination for a sandbox request.

func (*SandboxRoute) DestHost

func (r *SandboxRoute) DestHost() string

DestHost formats the route as "IP:Port" for the x-envoy-original-dst-host header.

type SandboxRouter

type SandboxRouter interface {
	ResolveSandboxRoute(ctx context.Context, sandboxID string, port int) (*SandboxRoute, error)
}

SandboxRouter resolves the destination Pod for a given sandbox ID. Implement this interface to swap in a cache-backed or test-double router.
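
A test double for this interface can be as small as a map. In the sketch below the interface and SandboxRoute are redeclared locally so the example compiles on its own; staticRouter, resolveDemo, and the local errNotFound sentinel are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of the documented types, so the example is self-contained.
type SandboxRoute struct {
	PodIP string
	Port  int
}

type SandboxRouter interface {
	ResolveSandboxRoute(ctx context.Context, sandboxID string, port int) (*SandboxRoute, error)
}

var errNotFound = errors.New("sandbox route not found")

// staticRouter maps sandbox IDs to fixed Pod IPs, for use in tests.
type staticRouter map[string]string

func (s staticRouter) ResolveSandboxRoute(_ context.Context, sandboxID string, port int) (*SandboxRoute, error) {
	ip, ok := s[sandboxID]
	if !ok {
		return nil, errNotFound
	}
	return &SandboxRoute{PodIP: ip, Port: port}, nil
}

// Compile-time check that staticRouter satisfies the interface.
var _ SandboxRouter = staticRouter(nil)

// resolveDemo calls any SandboxRouter with a background context.
func resolveDemo(r SandboxRouter, id string, port int) (*SandboxRoute, error) {
	return r.ResolveSandboxRoute(context.Background(), id, port)
}

func main() {
	route, err := resolveDemo(staticRouter{"sb-1": "10.0.0.7"}, "sb-1", 8080)
	fmt.Println(route.PodIP, route.Port, err)
}
```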

type Server

type Server struct {
	extProcPb.UnimplementedExternalProcessorServer
	// contains filtered or unexported fields
}

Server is the gRPC ExtProc server that runs alongside the Gin REST API.

func New

func New(cfg ServerConfig, adminKeyMgr *apikey.AdminKeyManager, keyStore apikey.KeyStore, router SandboxRouter, activityTracker *ActivityTracker, clusterStore *cluster.Store, localClusterID string) *Server

New creates a new ExtProc gRPC Server.

  • cfg – Server configuration; cfg.BindAddr is the TCP address to listen on (e.g. ":9002").
  • adminKeyMgr – Admin key manager (pass nil to disable auth / dev mode).
  • keyStore – Tenant key store for validating opaque tokens.
  • router – Resolves sandbox ID → Pod IP:Port.
  • activityTracker – Records per-sandbox last-active timestamps (may be nil).
  • clusterStore – Cross-cluster config store (nil to disable cross-cluster routing).
  • localClusterID – Identifier of the local cluster (empty to disable cross-cluster routing).

func (*Server) Process

Process implements envoy.service.ext_proc.v3.ExternalProcessor/Process.

The method runs a receive loop over the bidirectional stream. In the current configuration Envoy only sends RequestHeaders messages (all body/trailer modes are set to NONE/SKIP), so one exchange per HTTP request is expected.

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start listens and serves until ctx is cancelled. It is designed to be run in a goroutine alongside the controller-manager and Gin API server.

type ServerConfig

type ServerConfig struct {
	BindAddr   string
	EnableAuth bool
}
