engine

package
v0.0.0-...-ed02025
Warning

This package is not in the latest version of its module.

Published: Jun 3, 2026 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package engine is the KV-event subscriber. It runs as a sidecar next to a vLLM engine replica, subscribes to the engine's KV cache events over ZMQ, decodes them, and reports cache state to the inferencecache-server over gRPC.

Two independent paths share one gRPC client:

  • Event path: ZMQ → EventBatch → ReportCacheState (prefix adds) + PublishEvent (removals/clears), debounced on a short window.
  • Stats path: HTTP GET against the engine's Prometheus /metrics → MetricsScraper → StatsReporter → ReportCacheState (stats-only CacheStateUpdate populating cacheMemoryBytes / hitRate / pressure on its own cadence, default ~10s).

Metadata only — never KV tensors or prompt text. Fail-soft on both paths: neither a ZMQ drop nor a scrape failure can stall the engine. The package is built into the kvevent-subscriber binary (cmd/kvevent-subscriber).

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func StoredPrefixes

func StoredPrefixes(ev BlockStored) []*icpb.PrefixEntry

StoredPrefixes renders a BlockStored event as PrefixEntry values, one per block hash. token_count is cumulative: vLLM block hashes chain their parent, so block i's hash identifies the prefix up to and including block i and therefore covers i+1 blocks of this event, giving token_count = (i+1) * BlockSize. This preserves the ranking signal (longer prefixes rank higher) instead of a flat per-block count. Because the event does not carry the parent prefix's length, only tokens within this event are counted, so token_count is a lower bound. Token contents are never used.
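A minimal sketch of the cumulative token_count rule described above. PrefixEntry here is a hypothetical stand-in for the generated icpb.PrefixEntry (its real field names are not shown on this page), and storedPrefixes is an illustrative helper, not the package's code.

```go
package main

// PrefixEntry is a hypothetical stand-in for icpb.PrefixEntry.
type PrefixEntry struct {
	PrefixHash []byte
	TokenCount int64
}

// BlockStored mirrors the documented engine.BlockStored shape.
type BlockStored struct {
	BlockHashes [][]byte
	BlockSize   int32
}

// storedPrefixes emits one entry per block hash. Hash i names the prefix
// through block i, so it covers i+1 blocks of this event and
// token_count = (i+1) * BlockSize. Tokens from a parent prefix outside the
// event are unknown, so every count is a lower bound.
func storedPrefixes(ev BlockStored) []*PrefixEntry {
	out := make([]*PrefixEntry, 0, len(ev.BlockHashes))
	for i, h := range ev.BlockHashes {
		out = append(out, &PrefixEntry{
			PrefixHash: h,
			TokenCount: int64(i+1) * int64(ev.BlockSize),
		})
	}
	return out
}
```

With BlockSize 16 and three hashes, the entries carry token counts 16, 32 and 48, so the longest prefix in the event ranks highest.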

Types

type AllBlocksCleared

type AllBlocksCleared struct{}

AllBlocksCleared reports that the engine flushed its entire KV cache.

type BlockRemoved

type BlockRemoved struct {
	BlockHashes [][]byte
}

BlockRemoved reports KV blocks that were evicted. BlockHashes are opaque bytes (see BlockStored).

type BlockStored

type BlockStored struct {
	BlockHashes [][]byte
	BlockSize   int32
}

BlockStored reports KV blocks that became resident. BlockSize is the number of tokens per block; a block covers BlockSize tokens of the prefix.

BlockHashes are opaque prefix-hash bytes. vLLM's ExternalBlockHash is a union of bytes and int — integer hashes are normalized to 8-byte big-endian here, so downstream only ever sees opaque bytes (matching the contract's prefix_hash).
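A sketch of the int-to-bytes normalization, assuming a msgpack decoder hands back either []byte, int64 or uint64 for an ExternalBlockHash (the exact Go types depend on the decoder and are an assumption). Negative integers are encoded by their two's-complement bit pattern, which is also an assumption about how the package treats them.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// normalizeBlockHash turns a decoded ExternalBlockHash (bytes or int) into
// opaque bytes. Integers become their 8-byte big-endian encoding.
func normalizeBlockHash(v any) ([]byte, error) {
	switch h := v.(type) {
	case []byte:
		return h, nil
	case int64:
		b := make([]byte, 8)
		binary.BigEndian.PutUint64(b, uint64(h)) // two's-complement bit pattern
		return b, nil
	case uint64:
		b := make([]byte, 8)
		binary.BigEndian.PutUint64(b, h)
		return b, nil
	default:
		return nil, fmt.Errorf("unsupported block hash type %T", v)
	}
}
```

After this step the rest of the pipeline never needs to know which variant the engine sent.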

type CacheTier

type CacheTier string

CacheTier selects which vLLM cache-usage gauge feeds cache_memory_bytes. vLLM 0.21+ exposes a single unified `vllm:kv_cache_usage_perc`; older vLLM exposed `vllm:gpu_cache_usage_perc` and (on some builds) `vllm:cpu_cache_usage_perc`. "auto" probes that fallback chain — kv → gpu → cpu — so the scraper degrades across vLLM releases without operator action.

const (
	CacheTierAuto CacheTier = "auto"
	CacheTierKV   CacheTier = "kv"  // vLLM 0.21+: vllm:kv_cache_usage_perc
	CacheTierGPU  CacheTier = "gpu" // legacy: vllm:gpu_cache_usage_perc
	CacheTierCPU  CacheTier = "cpu" // legacy: vllm:cpu_cache_usage_perc
)
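The "auto" fallback chain can be sketched as a lookup over already-parsed gauge samples. The samples map and the usagePerc helper are hypothetical; only the gauge names and the kv → gpu → cpu order come from the text above.

```go
package main

type CacheTier string

const (
	CacheTierAuto CacheTier = "auto"
	CacheTierKV   CacheTier = "kv"
	CacheTierGPU  CacheTier = "gpu"
	CacheTierCPU  CacheTier = "cpu"
)

// gaugeFor maps each concrete tier to its vLLM gauge name.
var gaugeFor = map[CacheTier]string{
	CacheTierKV:  "vllm:kv_cache_usage_perc",
	CacheTierGPU: "vllm:gpu_cache_usage_perc",
	CacheTierCPU: "vllm:cpu_cache_usage_perc",
}

// usagePerc picks the usage gauge for tier from parsed samples
// (gauge name -> value). "auto" returns the first gauge present in
// kv -> gpu -> cpu order; ok is false when no matching gauge exists.
func usagePerc(tier CacheTier, samples map[string]float64) (float64, bool) {
	if tier == CacheTierAuto {
		for _, t := range []CacheTier{CacheTierKV, CacheTierGPU, CacheTierCPU} {
			if v, ok := samples[gaugeFor[t]]; ok {
				return v, true
			}
		}
		return 0, false
	}
	v, ok := samples[gaugeFor[tier]]
	return v, ok
}
```

Against a vLLM 0.21+ endpoint "auto" resolves to the kv gauge; against an older build it falls through to gpu, without any flag change.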

func ValidCacheTierNames

func ValidCacheTierNames() []CacheTier

ValidCacheTierNames returns the accepted --cache-tier values in fallback order. Returns a fresh slice each call so callers cannot mutate the canonical set.

func (CacheTier) IsValid

func (t CacheTier) IsValid() bool

IsValid reports whether t is one of the documented tiers.
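A sketch of the fresh-slice guarantee and the membership check, assuming the fallback order is auto, kv, gpu, cpu (the page says "fallback order" but does not list it). The lowercase helpers are hypothetical; slices.Contains requires Go 1.21+.

```go
package main

import "slices"

type CacheTier string

const (
	CacheTierAuto CacheTier = "auto"
	CacheTierKV   CacheTier = "kv"
	CacheTierGPU  CacheTier = "gpu"
	CacheTierCPU  CacheTier = "cpu"
)

// validCacheTierNames builds a new slice on every call, so a caller that
// mutates the result cannot corrupt the canonical set.
func validCacheTierNames() []CacheTier {
	return []CacheTier{CacheTierAuto, CacheTierKV, CacheTierGPU, CacheTierCPU}
}

// isValid reports whether t is one of the documented tiers.
func (t CacheTier) isValid() bool {
	return slices.Contains(validCacheTierNames(), t)
}
```

Returning a literal rather than a package-level slice variable is what makes mutation by callers harmless.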

type Config

type Config struct {
	// ReplicaID identifies the engine replica these events come from. Required;
	// it is the index key the server attributes prefixes/stats to.
	ReplicaID string
	// ModelID is the served model identifier. Required.
	ModelID string
	// TenantID is the tenant namespace. Optional (empty = shared/default).
	TenantID string
	// HashScheme names the engine's prefix-hash domain (e.g. "vllm"). Required
	// and non-empty: an empty scheme fails open server-side (dropped on ingest),
	// so reporting with one would silently lose the data.
	HashScheme string
}

Config is the per-engine-replica identity the subscriber stamps onto every report. vLLM's KV-cache events carry none of this, so the subscriber must be told which replica/model/tenant it watches and which hash scheme the engine uses (so the server keeps different engines' hashes in disjoint domains).

func (Config) ClearedEvent

func (c Config) ClearedEvent(tsSeconds float64) *icpb.CacheEvent

ClearedEvent builds the ALL_CLEARED CacheEvent for an AllBlocksCleared event.

func (Config) RemovedEvents

func (c Config) RemovedEvents(ev BlockRemoved, tsSeconds float64) []*icpb.CacheEvent

RemovedEvents builds one PREFIX_EVICTED CacheEvent per removed block hash. CacheEvent carries a single prefix_hash, so a BlockRemoved with N hashes maps to N events.
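The one-event-per-hash fan-out can be sketched with a hypothetical CacheEvent stand-in; the real icpb.CacheEvent fields and enum constant are not shown here, so Kind is a plain string mirroring the PREFIX_EVICTED name.

```go
package main

// CacheEvent is a hypothetical stand-in for icpb.CacheEvent.
type CacheEvent struct {
	ReplicaID  string
	Kind       string // mirrors the PREFIX_EVICTED enum name
	PrefixHash []byte
	TsSeconds  float64
}

// removedEvents turns one multi-hash removal into N single-hash events,
// because each CacheEvent carries exactly one prefix_hash.
func removedEvents(replicaID string, hashes [][]byte, ts float64) []*CacheEvent {
	out := make([]*CacheEvent, 0, len(hashes))
	for _, h := range hashes {
		out = append(out, &CacheEvent{
			ReplicaID:  replicaID,
			Kind:       "PREFIX_EVICTED",
			PrefixHash: h,
			TsSeconds:  ts,
		})
	}
	return out
}
```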

func (Config) StatsUpdate

func (c Config) StatsUpdate(tsUs int64, stats *icpb.ReplicaStats) *icpb.CacheStateUpdate

StatsUpdate stamps the replica/model/tenant/hash_scheme identity onto a scraped ReplicaStats and produces a stats-only CacheStateUpdate (empty prefixes). The contract treats a CacheStateUpdate as an additive delta, so a stats-only update refreshes liveness and the per-replica stats without touching prefixes. Returns nil if stats is nil.

The nested stats are rebuilt by-field rather than copied — proto messages embed a sync.Mutex via MessageState, so go vet rejects value copies.
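The copy hazard and the by-field workaround can be shown without protobuf. The stats type below is hypothetical: like a generated message it contains a lock, so `*dst = *src` would be reported by go vet's copylocks check, while building a new value from the fields is not.

```go
package main

import "sync"

// stats is a hypothetical message type that, like generated protobuf
// messages, contains a lock and so must not be copied by value.
type stats struct {
	mu          sync.Mutex
	HitRate     float64
	Pressure    float64
	CacheMemory int64
}

// cloneStats rebuilds the message field by field instead of copying the
// struct value, so the embedded lock is never copied.
func cloneStats(src *stats) *stats {
	if src == nil {
		return nil
	}
	return &stats{
		HitRate:     src.HitRate,
		Pressure:    src.Pressure,
		CacheMemory: src.CacheMemory,
	}
}
```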

func (Config) StoredUpdate

func (c Config) StoredUpdate(ev BlockStored, tsSeconds float64) *icpb.CacheStateUpdate

StoredUpdate builds the CacheStateUpdate for a single BlockStored event. Returns nil if the event carries no hashes (nothing to report).

func (Config) Update

func (c Config) Update(tsUs int64, prefixes []*icpb.PrefixEntry) *icpb.CacheStateUpdate

Update stamps the replica/model/tenant/hash_scheme identity onto a set of prefixes. Returns nil for an empty prefix set (nothing to report).

func (Config) Validate

func (c Config) Validate() error

Validate checks the required identity fields are set.
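A sketch of the required-field check, following the Config field comments (ReplicaID, ModelID and HashScheme required, TenantID optional). The error messages and the use of errors.Join (Go 1.20+) to report every missing field at once are illustrative choices, not the package's.

```go
package main

import "errors"

// Config mirrors the documented identity fields.
type Config struct {
	ReplicaID  string
	ModelID    string
	TenantID   string // optional: empty means shared/default
	HashScheme string
}

// validate reports every missing required field; it returns nil when the
// config is usable. errors.Join returns nil for an empty list.
func (c Config) validate() error {
	var errs []error
	if c.ReplicaID == "" {
		errs = append(errs, errors.New("replica ID is required"))
	}
	if c.ModelID == "" {
		errs = append(errs, errors.New("model ID is required"))
	}
	if c.HashScheme == "" {
		errs = append(errs, errors.New("hash scheme is required: an empty scheme is dropped server-side"))
	}
	return errors.Join(errs...)
}
```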

type Event

type Event interface {
	// contains filtered or unexported methods
}

Event is one decoded KV-cache event. The concrete types are BlockStored, BlockRemoved, and AllBlocksCleared.

type EventBatch

type EventBatch struct {
	// TimestampSeconds is the engine's batch timestamp (Unix seconds, float).
	TimestampSeconds float64
	Events           []Event
}

EventBatch is one decoded vLLM event batch.

func DecodeEventBatch

func DecodeEventBatch(payload []byte) (*EventBatch, error)

DecodeEventBatch decodes one msgpack EventBatch payload (the last ZMQ frame). Unknown event tags are skipped (forward-compatible); a malformed batch is an error so the caller can drop it without corrupting state.
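The skip-unknown versus fail-on-malformed policy can be sketched over values a generic msgpack decoder might produce. The assumed layout, each event as an array `[tag, block_hashes, ...]` with the tag being the event's type name, is an assumption about vLLM's wire format; integer hashes and the remaining fields are omitted for brevity, and all names here are hypothetical.

```go
package main

import "fmt"

// Hypothetical local mirrors of the package's event types.
type Event interface{ isEvent() }

type BlockStored struct{ BlockHashes [][]byte }
type BlockRemoved struct{ BlockHashes [][]byte }
type AllBlocksCleared struct{}

func (BlockStored) isEvent()      {}
func (BlockRemoved) isEvent()     {}
func (AllBlocksCleared) isEvent() {}

// toHashes converts a generically decoded list into [][]byte.
func toHashes(v any) ([][]byte, error) {
	list, ok := v.([]any)
	if !ok {
		return nil, fmt.Errorf("block_hashes is %T, want list", v)
	}
	out := make([][]byte, 0, len(list))
	for _, h := range list {
		b, ok := h.([]byte)
		if !ok {
			return nil, fmt.Errorf("block hash is %T, want bytes", h)
		}
		out = append(out, b)
	}
	return out, nil
}

// decodeEvents dispatches on each event's leading tag. Unknown tags are
// skipped for forward compatibility; any structural error fails the
// whole batch so the caller can drop it without corrupting state.
func decodeEvents(raw []any) ([]Event, error) {
	var out []Event
	for i, r := range raw {
		arr, ok := r.([]any)
		if !ok || len(arr) == 0 {
			return nil, fmt.Errorf("event %d: not a tagged array", i)
		}
		tag, ok := arr[0].(string)
		if !ok {
			return nil, fmt.Errorf("event %d: tag is %T, want string", i, arr[0])
		}
		switch tag {
		case "AllBlocksCleared":
			out = append(out, AllBlocksCleared{})
		case "BlockStored", "BlockRemoved":
			if len(arr) < 2 {
				return nil, fmt.Errorf("event %d: %s has no block_hashes", i, tag)
			}
			hashes, err := toHashes(arr[1])
			if err != nil {
				return nil, fmt.Errorf("event %d: %w", i, err)
			}
			if tag == "BlockStored" {
				out = append(out, BlockStored{BlockHashes: hashes})
			} else {
				out = append(out, BlockRemoved{BlockHashes: hashes})
			}
		default:
			// Unknown event kind from a newer engine: skip it, don't fail.
		}
	}
	return out, nil
}
```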

type MetricsScraper

type MetricsScraper struct {
	// contains filtered or unexported fields
}

MetricsScraper polls vLLM's Prometheus /metrics endpoint and projects the payload into a ReplicaStats. It is fail-soft: any HTTP or parse error returns a zero ReplicaStats along with the error, so the caller logs it and retries on the next tick.

Hit rate is a sliding signal — the per-scrape delta of (prefix_cache_hits_total / prefix_cache_queries_total) — so the very first scrape returns hit_rate=0 (no delta available) and the previous values are cached on the scraper for the next tick. A counter reset (engine restart) resets the delta state too: the tick that observes the reset returns 0 and the subsequent tick produces a fresh delta from the new baseline.
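A sketch of the sliding hit rate with reset handling. It assumes "delta" means the ratio of the two counter deltas over one scrape interval, Δhits / Δqueries, and that an interval with no new queries also reports 0; both are assumptions, and hitRateTracker is a hypothetical name.

```go
package main

// hitRateTracker keeps the previous counter values between scrapes.
type hitRateTracker struct {
	prevHits, prevQueries float64
	primed                bool
}

// observe returns Δhits / Δqueries since the previous scrape. It returns 0
// on the first scrape, on a counter reset (either counter went backwards),
// and when no new queries arrived. The current sample always becomes the
// new baseline, so the tick after a reset yields a fresh delta.
func (t *hitRateTracker) observe(hits, queries float64) float64 {
	rate := 0.0
	reset := hits < t.prevHits || queries < t.prevQueries
	if t.primed && !reset && queries > t.prevQueries {
		rate = (hits - t.prevHits) / (queries - t.prevQueries)
	}
	t.prevHits, t.prevQueries, t.primed = hits, queries, true
	return rate
}
```

For example, scrapes of (hits, queries) = (10, 100) then (60, 200) give 0 and then 0.5; a following (5, 20) is a reset and gives 0; (15, 40) after that gives 0.5 again from the new baseline.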

func NewMetricsScraper

func NewMetricsScraper(httpClient *http.Client, cfg ScraperConfig, logger *slog.Logger) *MetricsScraper

NewMetricsScraper builds a scraper. If httpClient is nil, a fresh *http.Client with the configured Timeout is used.

func (*MetricsScraper) Scrape

func (s *MetricsScraper) Scrape(ctx context.Context) (*icpb.ReplicaStats, error)

Scrape performs one GET against the engine /metrics endpoint and returns the derived ReplicaStats. On error a zero-valued *icpb.ReplicaStats is returned alongside the error — the caller logs and skips the tick.

type Reporter

type Reporter struct {
	// contains filtered or unexported fields
}

Reporter forwards decoded KV-cache events to the policy server over gRPC.

Adds (BlockStored) are accumulated and flushed on a short window — this debounces high engine event rates. Each flush sends one CacheStateUpdate on a fresh, time-bounded ReportCacheState stream; removals (BlockRemoved → PREFIX_EVICTED, AllBlocksCleared → ALL_CLEARED) go via a time-bounded unary PublishEvent.

Every RPC uses its own bounded context, so a stalled or unreachable server can never block the loop for longer than rpcTimeout — the cache is an optimization and must never stall the engine. Errors are logged and dropped (soft state); Run only returns on context cancellation or input close.

func NewReporter

func NewReporter(client icpb.InferenceCacheClient, cfg Config, opts ...ReporterOption) *Reporter

NewReporter builds a Reporter for one engine replica.

func (*Reporter) Run

func (r *Reporter) Run(ctx context.Context, in <-chan *EventBatch) error

Run consumes decoded event batches until ctx is cancelled or in is closed. On input close it drains the final buffered adds before returning.

type ReporterOption

type ReporterOption func(*Reporter)

ReporterOption configures a Reporter.

func WithLogger

func WithLogger(l *slog.Logger) ReporterOption

WithLogger sets the logger (default slog.Default()).

func WithRPCTimeout

func WithRPCTimeout(d time.Duration) ReporterOption

WithRPCTimeout bounds each gRPC call/flush (default 5s).

func WithWindow

func WithWindow(d time.Duration) ReporterOption

WithWindow sets the add-batching/debounce flush window (default 100ms).

type ScraperConfig

type ScraperConfig struct {
	// URL is the engine's Prometheus /metrics endpoint.
	URL string
	// Tier selects which cache-usage gauge feeds cache_memory_bytes.
	// Defaults to CacheTierAuto.
	Tier CacheTier
	// ModelLabel filters every series by the `model_name` Prometheus label
	// (the one vLLM stamps on its metrics). When non-empty, only series whose
	// label matches participate in the scrape — so a /metrics endpoint that
	// happens to expose multiple models cannot attribute another model's
	// usage/load/hit-rate to the replica this scraper is configured for. When
	// empty, every series is included (the legacy aggregate behaviour).
	ModelLabel string
	// CacheSizeBytes is the engine's total KV-cache capacity, used to map a
	// usage_perc gauge (0..1) to bytes. When zero, cache_memory_bytes is
	// emitted as 0 — the ranker doesn't consume this field, so an honest
	// "unknown" is preferred over a fabricated number.
	CacheSizeBytes int64
	// MaxConcurrencyCeiling is the denominator for the pressure proxy:
	//   pressure = clamp01((num_requests_running + num_requests_waiting) / ceiling).
	// 0 disables pressure (it stays 0).
	MaxConcurrencyCeiling int
	// Timeout bounds each scrape; defaults to defaultScrapeTimeout when <= 0.
	Timeout time.Duration
}

ScraperConfig tunes the metrics scraper. URL is required.
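The two derived fields can be sketched directly from the field comments above: the pressure proxy clamp01((running + waiting) / ceiling), disabled by a zero ceiling, and the usage-to-bytes mapping, which reports 0 when capacity is unknown. Clamping the usage gauge to [0, 1] before scaling is an added assumption; the helper names are hypothetical.

```go
package main

// clamp01 limits v to [0, 1].
func clamp01(v float64) float64 {
	switch {
	case v < 0:
		return 0
	case v > 1:
		return 1
	}
	return v
}

// pressure computes the documented proxy; a ceiling of 0 disables it.
func pressure(running, waiting float64, ceiling int) float64 {
	if ceiling <= 0 {
		return 0
	}
	return clamp01((running + waiting) / float64(ceiling))
}

// cacheMemoryBytes maps a 0..1 usage gauge to bytes. With an unknown
// capacity (0) it reports 0 rather than a fabricated number.
func cacheMemoryBytes(usagePerc float64, capacity int64) int64 {
	if capacity <= 0 {
		return 0
	}
	return int64(clamp01(usagePerc) * float64(capacity))
}
```

With a ceiling of 8, three running and one waiting request give a pressure of 0.5; any load of 8 or more saturates at 1.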

type StatsReporter

type StatsReporter struct {
	// contains filtered or unexported fields
}

StatsReporter periodically scrapes engine /metrics and emits a stats-only CacheStateUpdate via ReportCacheState. It runs alongside the event Reporter (different cadence, different data source) and shares the same gRPC client.

Failure independence is load-bearing: a scrape failure (engine /metrics down, HTTP timeout, parse error) logs and skips the tick — it never blocks the event path or kills the subscriber. The two paths are independent failure domains.

func NewStatsReporter

func NewStatsReporter(client icpb.InferenceCacheClient, scraper statsScraper, cfg Config, opts ...StatsReporterOption) *StatsReporter

NewStatsReporter builds a StatsReporter that ticks against scraper and publishes onto client.

func (*StatsReporter) Run

func (r *StatsReporter) Run(ctx context.Context) error

Run ticks until ctx is cancelled, scraping and emitting on each tick. It returns ctx.Err() on shutdown so the caller can distinguish clean exit from a misconfiguration that triggers an early return.

type StatsReporterOption

type StatsReporterOption func(*StatsReporter)

StatsReporterOption configures a StatsReporter.

func WithStatsInterval

func WithStatsInterval(d time.Duration) StatsReporterOption

WithStatsInterval sets the scrape tick interval (default 10s).

func WithStatsLogger

func WithStatsLogger(l *slog.Logger) StatsReporterOption

WithStatsLogger sets the logger (default slog.Default()).

func WithStatsRPCTimeout

func WithStatsRPCTimeout(d time.Duration) StatsReporterOption

WithStatsRPCTimeout bounds each ReportCacheState call (default 5s).

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber reads a vLLM KV-cache event stream from a ZMQ PUB endpoint, decodes each batch, and emits it. It reconnects with backoff and never returns except on context cancellation (fail-soft — the engine is unaffected by our outages).

func NewSubscriber

func NewSubscriber(endpoint, topic string, opts ...SubscriberOption) *Subscriber

NewSubscriber builds a Subscriber for one engine's ZMQ event endpoint (e.g. "tcp://127.0.0.1:5557") and topic (e.g. "kv-events"; "" = all topics).

func (*Subscriber) Run

func (s *Subscriber) Run(ctx context.Context, out chan<- *EventBatch) error

Run connects, decodes batches, and sends them on out until ctx is cancelled. out is not closed (the caller owns it).

type SubscriberOption

type SubscriberOption func(*Subscriber)

SubscriberOption configures a Subscriber.

func WithSubscriberBackoff

func WithSubscriberBackoff(d time.Duration) SubscriberOption

WithSubscriberBackoff sets the reconnect backoff (default 1s).

func WithSubscriberLogger

func WithSubscriberLogger(l *slog.Logger) SubscriberOption

WithSubscriberLogger sets the logger (default slog.Default()).
