Documentation ¶
Overview ¶
Package engine is the KV-event subscriber. It runs as a sidecar next to a vLLM engine replica, subscribes to the engine's KV cache events over ZMQ, decodes them, and reports cache state to the inferencecache-server over gRPC.
Two independent paths share one gRPC client:
- Event path: ZMQ → EventBatch → ReportCacheState (prefix adds) + PublishEvent (removals/clears), debounced on a short window.
- Stats path: HTTP GET against the engine's Prometheus /metrics → MetricsScraper → StatsReporter → ReportCacheState (stats-only CacheStateUpdate populating cacheMemoryBytes / hitRate / pressure on its own cadence, default ~10s).
The subscriber reports metadata only, never KV tensors or prompt text. Both paths are fail-soft: neither a ZMQ drop nor a scrape failure can stall the engine. The package is built into the kvevent-subscriber binary (cmd/kvevent-subscriber).
Index ¶
- func StoredPrefixes(ev BlockStored) []*icpb.PrefixEntry
- type AllBlocksCleared
- type BlockRemoved
- type BlockStored
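- type CacheTier
- func ValidCacheTierNames() []CacheTier
- type Config
- func (c Config) ClearedEvent(tsSeconds float64) *icpb.CacheEvent
- func (c Config) RemovedEvents(ev BlockRemoved, tsSeconds float64) []*icpb.CacheEvent
- func (c Config) StatsUpdate(tsUs int64, stats *icpb.ReplicaStats) *icpb.CacheStateUpdate
- func (c Config) StoredUpdate(ev BlockStored, tsSeconds float64) *icpb.CacheStateUpdate
- func (c Config) Update(tsUs int64, prefixes []*icpb.PrefixEntry) *icpb.CacheStateUpdate
- func (c Config) Validate() error
- type Event
- type EventBatch
- func DecodeEventBatch(payload []byte) (*EventBatch, error)
- type MetricsScraper
- func NewMetricsScraper(httpClient *http.Client, cfg ScraperConfig, logger *slog.Logger) *MetricsScraper
- func (s *MetricsScraper) Scrape(ctx context.Context) (*icpb.ReplicaStats, error)
- type Reporter
- func NewReporter(client icpb.InferenceCacheClient, cfg Config, opts ...ReporterOption) *Reporter
- type ReporterOption
- func WithLogger(l *slog.Logger) ReporterOption
- func WithRPCTimeout(d time.Duration) ReporterOption
- func WithWindow(d time.Duration) ReporterOption
- type ScraperConfig
- type StatsReporter
- func NewStatsReporter(client icpb.InferenceCacheClient, scraper statsScraper, cfg Config, opts ...StatsReporterOption) *StatsReporter
- type StatsReporterOption
- func WithStatsInterval(d time.Duration) StatsReporterOption
- func WithStatsLogger(l *slog.Logger) StatsReporterOption
- func WithStatsRPCTimeout(d time.Duration) StatsReporterOption
- type Subscriber
- func NewSubscriber(endpoint, topic string, opts ...SubscriberOption) *Subscriber
- func (s *Subscriber) Run(ctx context.Context, out chan<- *EventBatch) error
- type SubscriberOption
- func WithSubscriberBackoff(d time.Duration) SubscriberOption
- func WithSubscriberLogger(l *slog.Logger) SubscriberOption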
Functions ¶
func StoredPrefixes ¶
func StoredPrefixes(ev BlockStored) []*icpb.PrefixEntry
StoredPrefixes renders a BlockStored event as PrefixEntry values, one per block hash. token_count is cumulative: vLLM block hashes chain their parent, so block i's hash identifies the prefix up to and including block i and therefore covers (i+1) blocks of this event. This preserves the ranking signal (longer prefixes rank higher) instead of reporting a flat per-block count. The count covers only tokens within this event, because the parent prefix's length is not in the event, so it is a lower bound on the true prefix length. Token contents are never used.
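The cumulative count can be illustrated with a minimal sketch. The prefixEntry type and the cumulativePrefixes helper below are hypothetical stand-ins, not the package's own code; the sketch assumes every block holds exactly blockSize tokens.

```go
package main

import "fmt"

// prefixEntry is a hypothetical stand-in for icpb.PrefixEntry.
type prefixEntry struct {
	PrefixHash []byte
	TokenCount int64
}

// cumulativePrefixes emits one entry per block hash. Block i covers
// (i+1) blocks of the event, so its count is (i+1)*blockSize tokens.
func cumulativePrefixes(hashes [][]byte, blockSize int) []prefixEntry {
	out := make([]prefixEntry, 0, len(hashes))
	for i, h := range hashes {
		out = append(out, prefixEntry{
			PrefixHash: h,
			TokenCount: int64(i+1) * int64(blockSize),
		})
	}
	return out
}

func main() {
	entries := cumulativePrefixes([][]byte{{0x01}, {0x02}, {0x03}}, 16)
	for _, e := range entries {
		// Counts grow with position: 16, 32, 48.
		fmt.Printf("%x -> %d tokens\n", e.PrefixHash, e.TokenCount)
	}
}
```

Because later blocks carry larger counts, a ranker that prefers higher token_count naturally prefers longer matching prefixes.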
Types ¶
type AllBlocksCleared ¶
type AllBlocksCleared struct{}
AllBlocksCleared reports that the engine flushed its entire KV cache.
type BlockRemoved ¶
type BlockRemoved struct {
BlockHashes [][]byte
}
BlockRemoved reports KV blocks that were evicted. BlockHashes are opaque bytes (see BlockStored).
type BlockStored ¶
BlockStored reports KV blocks that became resident. BlockSize is the number of tokens per block; a block covers BlockSize tokens of the prefix.
BlockHashes are opaque prefix-hash bytes. vLLM's ExternalBlockHash is a union of bytes and int — integer hashes are normalized to 8-byte big-endian here, so downstream only ever sees opaque bytes (matching the contract's prefix_hash).
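The integer normalization can be sketched with the standard encoding/binary package. The normalizeIntHash helper is hypothetical, and the sketch assumes the integer hash fits in an unsigned 64-bit value.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// normalizeIntHash encodes an integer block hash as 8 big-endian bytes.
// Assumption: the hash fits in a uint64.
func normalizeIntHash(h uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, h)
	return b
}

func main() {
	// The most significant byte comes first.
	fmt.Printf("%x\n", normalizeIntHash(0x0102)) // prints 0000000000000102
}
```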
type CacheTier ¶
type CacheTier string
CacheTier selects which vLLM cache-usage gauge feeds cache_memory_bytes. vLLM 0.21+ exposes a single unified `vllm:kv_cache_usage_perc`; older vLLM exposed `vllm:gpu_cache_usage_perc` and (on some builds) `vllm:cpu_cache_usage_perc`. "auto" probes that fallback chain — kv → gpu → cpu — so the scraper degrades across vLLM releases without operator action.
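A minimal sketch of the "auto" probe, assuming the scraped gauges are already available as a name-to-value map. The pickUsageGauge helper and the map shape are hypothetical; only the gauge names and their order come from the description above.

```go
package main

import "fmt"

// fallbackOrder mirrors the kv, gpu, cpu order described above.
var fallbackOrder = []string{
	"vllm:kv_cache_usage_perc",
	"vllm:gpu_cache_usage_perc",
	"vllm:cpu_cache_usage_perc",
}

// pickUsageGauge returns the first gauge in fallback order that is
// present in the scraped samples, or ok=false if none is exposed.
func pickUsageGauge(samples map[string]float64) (name string, value float64, ok bool) {
	for _, n := range fallbackOrder {
		if v, found := samples[n]; found {
			return n, v, true
		}
	}
	return "", 0, false
}

func main() {
	// An older engine that only exposes the GPU gauge.
	older := map[string]float64{"vllm:gpu_cache_usage_perc": 0.42}
	name, v, ok := pickUsageGauge(older)
	fmt.Println(name, v, ok)
}
```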
func ValidCacheTierNames ¶
func ValidCacheTierNames() []CacheTier
ValidCacheTierNames returns the accepted --cache-tier values in fallback order. Returns a fresh slice each call so callers cannot mutate the canonical set.
type Config ¶
type Config struct {
// ReplicaID identifies the engine replica these events come from. Required;
// it is the index key the server attributes prefixes/stats to.
ReplicaID string
// ModelID is the served model identifier. Required.
ModelID string
// TenantID is the tenant namespace. Optional (empty = shared/default).
TenantID string
// HashScheme names the engine's prefix-hash domain (e.g. "vllm"). Required
// and non-empty: the server drops updates with an empty scheme on ingest,
// so reporting with one would silently lose the data.
HashScheme string
}
Config is the per-engine-replica identity the subscriber stamps onto every report. vLLM's KV-cache events carry none of this, so the subscriber must be told which replica/model/tenant it watches and which hash scheme the engine uses (so the server keeps different engines' hashes in disjoint domains).
func (Config) ClearedEvent ¶
func (c Config) ClearedEvent(tsSeconds float64) *icpb.CacheEvent
ClearedEvent builds the ALL_CLEARED CacheEvent for an AllBlocksCleared event.
func (Config) RemovedEvents ¶
func (c Config) RemovedEvents(ev BlockRemoved, tsSeconds float64) []*icpb.CacheEvent
RemovedEvents builds one PREFIX_EVICTED CacheEvent per removed block hash. CacheEvent carries a single prefix_hash, so a BlockRemoved with N hashes maps to N events.
func (Config) StatsUpdate ¶
func (c Config) StatsUpdate(tsUs int64, stats *icpb.ReplicaStats) *icpb.CacheStateUpdate
StatsUpdate stamps the replica/model/tenant/hash_scheme identity onto a scraped ReplicaStats and produces a stats-only CacheStateUpdate (empty prefixes). The contract treats a CacheStateUpdate as an additive delta, so a stats-only update refreshes liveness and the per-replica stats without touching prefixes. Returns nil if stats is nil.
The nested stats are rebuilt by-field rather than copied — proto messages embed a sync.Mutex via MessageState, so go vet rejects value copies.
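The by-field rebuild can be sketched with a plain struct that carries a mutex. The stats type below is a hypothetical stand-in for the generated proto message, and its field names are illustrative only.

```go
package main

import (
	"fmt"
	"sync"
)

// stats is a hypothetical stand-in for a generated proto message. The
// mutex plays the role of the lock embedded via MessageState, which is
// what makes a value copy (dst := *src) a go vet copylocks finding.
type stats struct {
	mu               sync.Mutex
	CacheMemoryBytes int64
	HitRate          float64
	Pressure         float64
}

// rebuild constructs a fresh message from the data fields only, so the
// lock is never copied. A nil input yields a nil output.
func rebuild(src *stats) *stats {
	if src == nil {
		return nil
	}
	return &stats{
		CacheMemoryBytes: src.CacheMemoryBytes,
		HitRate:          src.HitRate,
		Pressure:         src.Pressure,
	}
}

func main() {
	in := &stats{CacheMemoryBytes: 1 << 20, HitRate: 0.5, Pressure: 0.25}
	out := rebuild(in)
	fmt.Println(out != in, out.CacheMemoryBytes, out.HitRate, out.Pressure)
}
```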
func (Config) StoredUpdate ¶
func (c Config) StoredUpdate(ev BlockStored, tsSeconds float64) *icpb.CacheStateUpdate
StoredUpdate builds the CacheStateUpdate for a single BlockStored event. Returns nil if the event carries no hashes (nothing to report).
func (Config) Update ¶
func (c Config) Update(tsUs int64, prefixes []*icpb.PrefixEntry) *icpb.CacheStateUpdate
Update stamps the replica/model/tenant/hash_scheme identity onto a set of prefixes. Returns nil for an empty prefix set (nothing to report).
type Event ¶
type Event interface {
// contains filtered or unexported methods
}
Event is one decoded KV-cache event. The concrete types are BlockStored, BlockRemoved, and AllBlocksCleared.
type EventBatch ¶
type EventBatch struct {
// TimestampSeconds is the engine's batch timestamp (Unix seconds, float).
TimestampSeconds float64
Events []Event
}
EventBatch is one decoded vLLM event batch.
func DecodeEventBatch ¶
func DecodeEventBatch(payload []byte) (*EventBatch, error)
DecodeEventBatch decodes one msgpack EventBatch payload (the last ZMQ frame). Unknown event tags are skipped (forward-compatible); a malformed batch is an error so the caller can drop it without corrupting state.
type MetricsScraper ¶
type MetricsScraper struct {
// contains filtered or unexported fields
}
MetricsScraper polls vLLM's Prometheus /metrics endpoint and projects the payload into a ReplicaStats. It is fail-soft: any HTTP or parse error returns a zero ReplicaStats together with the error, so the caller can log it and retry on the next tick.
Hit rate is a sliding signal — the per-scrape delta of (prefix_cache_hits_total / prefix_cache_queries_total) — so the very first scrape returns hit_rate=0 (no delta available) and the previous values are cached on the scraper for the next tick. A counter reset (engine restart) resets the delta state too: the tick that observes the reset returns 0 and the subsequent tick produces a fresh delta from the new baseline.
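One way to read the delta rule is as the ratio of the two counter increments between consecutive scrapes. That reading is an assumption, as are the hitRateTracker type and its treatment of an interval with no new queries as 0.

```go
package main

import "fmt"

// hitRateTracker is a hypothetical holder for the previous counter values.
type hitRateTracker struct {
	prevHits    float64
	prevQueries float64
	primed      bool
}

// observe returns the hit rate over the interval since the previous
// scrape and stores the current counters as the new baseline.
func (t *hitRateTracker) observe(hits, queries float64) float64 {
	rate := 0.0
	switch {
	case !t.primed:
		// First scrape: no delta is available yet.
	case hits < t.prevHits || queries < t.prevQueries:
		// Counter reset (engine restart): report 0 and re-baseline.
	case queries > t.prevQueries:
		rate = (hits - t.prevHits) / (queries - t.prevQueries)
	}
	t.prevHits, t.prevQueries, t.primed = hits, queries, true
	return rate
}

func main() {
	var t hitRateTracker
	fmt.Println(t.observe(50, 100)) // first scrape: 0
	fmt.Println(t.observe(75, 200)) // 25 hits over 100 queries: 0.25
	fmt.Println(t.observe(5, 10))   // counters went backwards: 0
	fmt.Println(t.observe(15, 30))  // 10 hits over 20 queries: 0.5
}
```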
func NewMetricsScraper ¶
func NewMetricsScraper(httpClient *http.Client, cfg ScraperConfig, logger *slog.Logger) *MetricsScraper
NewMetricsScraper builds a scraper. If httpClient is nil, a fresh *http.Client with the configured Timeout is used.
func (*MetricsScraper) Scrape ¶
func (s *MetricsScraper) Scrape(ctx context.Context) (*icpb.ReplicaStats, error)
Scrape performs one GET against the engine /metrics endpoint and returns the derived ReplicaStats. On error a zero-valued *icpb.ReplicaStats is returned alongside the error — the caller logs and skips the tick.
type Reporter ¶
type Reporter struct {
// contains filtered or unexported fields
}
Reporter forwards decoded KV-cache events to the policy server over gRPC.
Adds (BlockStored) are accumulated and flushed on a short window — this debounces high engine event rates. Each flush sends one CacheStateUpdate on a fresh, time-bounded ReportCacheState stream; removals (BlockRemoved → PREFIX_EVICTED, AllBlocksCleared → ALL_CLEARED) go via a time-bounded unary PublishEvent.
Every RPC uses its own bounded context, so a stalled or unreachable server can never block the loop for longer than rpcTimeout — the cache is an optimization and must never stall the engine. Errors are logged and dropped (soft state); Run only returns on context cancellation or input close.
func NewReporter ¶
func NewReporter(client icpb.InferenceCacheClient, cfg Config, opts ...ReporterOption) *Reporter
NewReporter builds a Reporter for one engine replica.
type ReporterOption ¶
type ReporterOption func(*Reporter)
ReporterOption configures a Reporter.
func WithLogger ¶
func WithLogger(l *slog.Logger) ReporterOption
WithLogger sets the logger (default slog.Default()).
func WithRPCTimeout ¶
func WithRPCTimeout(d time.Duration) ReporterOption
WithRPCTimeout bounds each gRPC call/flush (default 5s).
func WithWindow ¶
func WithWindow(d time.Duration) ReporterOption
WithWindow sets the add-batching/debounce flush window (default 100ms).
type ScraperConfig ¶
type ScraperConfig struct {
// URL is the engine's Prometheus /metrics endpoint.
URL string
// Tier selects which cache-usage gauge feeds cache_memory_bytes.
// Defaults to CacheTierAuto.
Tier CacheTier
// ModelLabel filters every series by the `model_name` Prometheus label
// (the one vLLM stamps on its metrics). When non-empty, only series whose
// label matches participate in the scrape — so a /metrics endpoint that
// happens to expose multiple models cannot attribute another model's
// usage/load/hit-rate to the replica this scraper is configured for. When
// empty, every series is included (the legacy aggregate behaviour).
ModelLabel string
// CacheSizeBytes is the engine's total KV-cache capacity, used to map a
// usage_perc gauge (0..1) to bytes. When zero, cache_memory_bytes is
// emitted as 0 — the ranker doesn't consume this field, so an honest
// "unknown" is preferred over a fabricated number.
CacheSizeBytes int64
// MaxConcurrencyCeiling is the denominator for the pressure proxy:
// pressure = clamp01((num_requests_running + num_requests_waiting) / ceiling).
// 0 disables pressure (it stays 0).
MaxConcurrencyCeiling int
// Timeout bounds each scrape; defaults to defaultScrapeTimeout when <= 0.
Timeout time.Duration
}
ScraperConfig tunes the metrics scraper. URL is required.
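The two derived values described in the field comments can be sketched as below. The function names are hypothetical, and truncating the byte count toward zero is an assumption.

```go
package main

import "fmt"

// clamp01 limits x to the closed interval [0, 1].
func clamp01(x float64) float64 {
	if x < 0 {
		return 0
	}
	if x > 1 {
		return 1
	}
	return x
}

// pressure is the load proxy: (running + waiting) / ceiling, clamped to
// [0, 1]. A ceiling of 0 disables it.
func pressure(running, waiting float64, ceiling int) float64 {
	if ceiling <= 0 {
		return 0
	}
	return clamp01((running + waiting) / float64(ceiling))
}

// cacheMemoryBytes maps a usage gauge in 0..1 to bytes. An unknown
// capacity (0) yields 0 rather than a fabricated number.
func cacheMemoryBytes(usagePerc float64, cacheSizeBytes int64) int64 {
	if cacheSizeBytes <= 0 {
		return 0
	}
	return int64(usagePerc * float64(cacheSizeBytes))
}

func main() {
	fmt.Println(pressure(6, 2, 16))              // 0.5
	fmt.Println(pressure(30, 10, 16))            // 1 (clamped)
	fmt.Println(cacheMemoryBytes(0.25, 1<<30))   // 268435456
	fmt.Println(cacheMemoryBytes(0.25, 0))       // 0 (capacity unknown)
}
```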
type StatsReporter ¶
type StatsReporter struct {
// contains filtered or unexported fields
}
StatsReporter periodically scrapes engine /metrics and emits a stats-only CacheStateUpdate via ReportCacheState. It runs alongside the event Reporter (different cadence, different data source) and shares the same gRPC client.
Failure independence is load-bearing: a scrape failure (engine /metrics down, HTTP timeout, parse error) logs and skips the tick — it never blocks the event path or kills the subscriber. The two paths are independent failure domains.
func NewStatsReporter ¶
func NewStatsReporter(client icpb.InferenceCacheClient, scraper statsScraper, cfg Config, opts ...StatsReporterOption) *StatsReporter
NewStatsReporter builds a StatsReporter that ticks against scraper and publishes onto client.
type StatsReporterOption ¶
type StatsReporterOption func(*StatsReporter)
StatsReporterOption configures a StatsReporter.
func WithStatsInterval ¶
func WithStatsInterval(d time.Duration) StatsReporterOption
WithStatsInterval sets the scrape tick interval (default 10s).
func WithStatsLogger ¶
func WithStatsLogger(l *slog.Logger) StatsReporterOption
WithStatsLogger sets the logger (default slog.Default()).
func WithStatsRPCTimeout ¶
func WithStatsRPCTimeout(d time.Duration) StatsReporterOption
WithStatsRPCTimeout bounds each ReportCacheState call (default 5s).
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber reads a vLLM KV-cache event stream from a ZMQ PUB endpoint, decodes each batch, and emits it. It reconnects with backoff and never returns except on context cancellation (fail-soft — the engine is unaffected by our outages).
func NewSubscriber ¶
func NewSubscriber(endpoint, topic string, opts ...SubscriberOption) *Subscriber
NewSubscriber builds a Subscriber for one engine's ZMQ event endpoint (e.g. "tcp://127.0.0.1:5557") and topic (e.g. "kv-events"; "" = all topics).
func (*Subscriber) Run ¶
func (s *Subscriber) Run(ctx context.Context, out chan<- *EventBatch) error
Run connects, decodes batches, and sends them on out until ctx is cancelled. out is not closed (the caller owns it).
type SubscriberOption ¶
type SubscriberOption func(*Subscriber)
SubscriberOption configures a Subscriber.
func WithSubscriberBackoff ¶
func WithSubscriberBackoff(d time.Duration) SubscriberOption
WithSubscriberBackoff sets the reconnect backoff (default 1s).
func WithSubscriberLogger ¶
func WithSubscriberLogger(l *slog.Logger) SubscriberOption
WithSubscriberLogger sets the logger (default slog.Default()).