Documentation
¶
Index ¶
- Constants
- func EncodeEventBatch(batch *EventBatch) ([]byte, error)
- func InitializeMetrics() error
- func ValidateConfig(config *ZMQClientConfig) error
- type AllBlocksClearedEvent
- type BlockRemovedEvent
- type BlockStoredEvent
- type EventBatch
- type EventHandler
- type EventType
- type KVCacheMetrics
- type KVEvent
- type ZMQClient
- func (c *ZMQClient) Close()
- func (c *ZMQClient) Connect() error
- func (c *ZMQClient) GetStats() (connected bool, msgsReceived uint64)
- func (c *ZMQClient) Reconnect() error
- func (c *ZMQClient) ReplayFrom(timestamp int64) error
- func (c *ZMQClient) SetLogLevel(level string)
- func (c *ZMQClient) Start() error
- func (c *ZMQClient) Stop()
- func (c *ZMQClient) Subscribe(handler EventHandler) error
- func (c *ZMQClient) WaitForConnection(timeout time.Duration) error
- type ZMQClientConfig
- type ZMQClientMetrics
- func (m *ZMQClientMetrics) Delete()
- func (m *ZMQClientMetrics) IncrementConnectionCount()
- func (m *ZMQClientMetrics) IncrementDisconnectionCount()
- func (m *ZMQClientMetrics) IncrementErrorCount(errorType string)
- func (m *ZMQClientMetrics) IncrementEventCount(eventType string)
- func (m *ZMQClientMetrics) IncrementMissedEvents(count int64)
- func (m *ZMQClientMetrics) IncrementReconnectAttempts()
- func (m *ZMQClientMetrics) IncrementReplayCount()
- func (m *ZMQClientMetrics) IncrementReplayFailure()
- func (m *ZMQClientMetrics) IncrementReplaySuccess()
- func (m *ZMQClientMetrics) RecordEventProcessingLatency(duration time.Duration)
- func (m *ZMQClientMetrics) UpdateLastSequenceID(seqID int64)
Constants ¶
const ( LabelPodKey = "pod_key" LabelEventType = "event_type" LabelErrorType = "error_type" )
Metric labels
const ( // Default ZMQ ports DefaultPubPort = 5557 DefaultRouterPort = 5558 // Timeouts and intervals DefaultPollTimeout = 100 * time.Millisecond DefaultReplayTimeout = 5 * time.Second DefaultReconnectInterval = 1 * time.Second MaxReconnectInterval = 30 * time.Second ReconnectBackoffFactor = 2.0 // Buffer sizes EventChannelBufferSize = 1000 )
Constants for ZMQ client configuration
Variables ¶
This section is empty.
Functions ¶
func EncodeEventBatch ¶
func EncodeEventBatch(batch *EventBatch) ([]byte, error)
EncodeEventBatch encodes an EventBatch to vLLM msgpack format. Only the array-encoded event fields are included; subscriber metadata (Timestamp, ModelName, PodName) is NOT encoded.
func InitializeMetrics ¶
func InitializeMetrics() error
InitializeMetrics initializes KV cache metrics if not already done. This should be called when KV event sync is enabled.
func ValidateConfig ¶
func ValidateConfig(config *ZMQClientConfig) error
ValidateConfig validates the ZMQ client configuration
Types ¶
type AllBlocksClearedEvent ¶
type AllBlocksClearedEvent struct {
Type EventType `msgpack:"-"`
// NOTE: These are NOT part of msgpack
Timestamp time.Time `msgpack:"-"`
ModelName string `msgpack:"-"`
PodName string `msgpack:"-"`
// contains filtered or unexported fields
}
AllBlocksClearedEvent represents all blocks being cleared. AllBlocksCleared (msgspec order), Python fields: [tag]
func (*AllBlocksClearedEvent) GetModelName ¶ added in v0.6.0
func (e *AllBlocksClearedEvent) GetModelName() string
func (*AllBlocksClearedEvent) GetPodName ¶ added in v0.6.0
func (e *AllBlocksClearedEvent) GetPodName() string
func (*AllBlocksClearedEvent) GetTimestamp ¶
func (e *AllBlocksClearedEvent) GetTimestamp() time.Time
func (*AllBlocksClearedEvent) GetType ¶
func (e *AllBlocksClearedEvent) GetType() EventType
type BlockRemovedEvent ¶
type BlockRemovedEvent struct {
Type EventType `msgpack:"-"`
BlockHashes []int64 // Decoded from vLLM, supports both old and new formats
// NOTE: These are NOT part of msgpack
Timestamp time.Time `msgpack:"-"`
ModelName string `msgpack:"-"`
PodName string `msgpack:"-"`
// contains filtered or unexported fields
}
BlockRemovedEvent represents blocks being removed from the KV cache. BlockRemoved (msgspec order), Python fields: [tag, block_hashes, medium]
medium is unused for now.
Note: BlockHashes are converted at decode time:
- vLLM legacy format (int64) → stored as-is
- vLLM new format (32-byte SHA-256 from PR #23673) → first 8 bytes converted to int64
func (*BlockRemovedEvent) GetModelName ¶ added in v0.6.0
func (e *BlockRemovedEvent) GetModelName() string
func (*BlockRemovedEvent) GetPodName ¶ added in v0.6.0
func (e *BlockRemovedEvent) GetPodName() string
func (*BlockRemovedEvent) GetTimestamp ¶
func (e *BlockRemovedEvent) GetTimestamp() time.Time
func (*BlockRemovedEvent) GetType ¶
func (e *BlockRemovedEvent) GetType() EventType
type BlockStoredEvent ¶
type BlockStoredEvent struct {
Type EventType `msgpack:"-"`
BlockHashes []int64 // Decoded from vLLM, supports both old and new formats
ParentBlockHash *int64 // Decoded from vLLM, supports both old and new formats
TokenIDs [][]byte
// NOTE: These are NOT part of msgpack
Timestamp time.Time `msgpack:"-"`
ModelName string `msgpack:"-"`
PodName string `msgpack:"-"`
// contains filtered or unexported fields
}
BlockStoredEvent represents blocks being stored in the KV cache. BlockStored (msgspec encoding order), Python fields:
[tag, block_hashes, parent_block_hash, token_ids, block_size, lora_id, medium]
lora_id and medium are unused for now.
Note: BlockHashes are converted at decode time:
- vLLM legacy format (int64) → stored as-is
- vLLM new format (32-byte SHA-256 from PR #23673) → first 8 bytes converted to int64
This ensures internal consistency and compatibility with existing code.
func (*BlockStoredEvent) GetModelName ¶ added in v0.6.0
func (e *BlockStoredEvent) GetModelName() string
func (*BlockStoredEvent) GetPodName ¶ added in v0.6.0
func (e *BlockStoredEvent) GetPodName() string
func (*BlockStoredEvent) GetTimestamp ¶
func (e *BlockStoredEvent) GetTimestamp() time.Time
func (*BlockStoredEvent) GetType ¶
func (e *BlockStoredEvent) GetType() EventType
type EventBatch ¶
EventBatch represents a batch of events from vLLM. The batch object is a Python msgspec.Struct(array_like=True).
EventBatch is encoded as:
[ ts, [<event1>, <event2>, ...] ]
func DecodeEventBatch ¶
func DecodeEventBatch( data []byte, modelName string, podName string, ) (*EventBatch, error)
DecodeEventBatch parses a raw msgpack batch of events. The subscriber must supply batch timestamp + model/pod name.
type EventHandler ¶
EventHandler processes received KV events
type EventType ¶
type EventType string
EventType represents the type of KV cache event
const ( // EventTypeBlockStored indicates that blocks have been stored in the KV cache EventTypeBlockStored EventType = "BlockStored" // EventTypeBlockRemoved indicates that blocks have been removed from the KV cache EventTypeBlockRemoved EventType = "BlockRemoved" // EventTypeAllCleared indicates that all blocks have been cleared from the cache EventTypeAllCleared EventType = "AllBlocksCleared" )
type KVCacheMetrics ¶
type KVCacheMetrics struct {
// contains filtered or unexported fields
}
KVCacheMetrics holds all KV cache metrics
type KVEvent ¶
type KVEvent interface {
GetType() EventType
// Timestamp of a KV event is shared among all events in the same EventBatch
GetTimestamp() time.Time
GetModelName() string
GetPodName() string
// contains filtered or unexported methods
}
KVEvent is the base interface for all KV cache events
type ZMQClient ¶
type ZMQClient struct {
// contains filtered or unexported fields
}
ZMQClient is a stub implementation used when ZMQ is not available.
func NewZMQClient ¶
func NewZMQClient(config *ZMQClientConfig, handler EventHandler) *ZMQClient
NewZMQClient creates a stub ZMQ client
func (*ZMQClient) ReplayFrom ¶
ReplayFrom returns an error
func (*ZMQClient) SetLogLevel ¶
SetLogLevel is a no-op
func (*ZMQClient) Subscribe ¶
func (c *ZMQClient) Subscribe(handler EventHandler) error
Subscribe returns an error
type ZMQClientConfig ¶
type ZMQClientConfig struct {
PodKey string
PodIP string
ModelName string
PubPort int
RouterPort int
PollTimeout time.Duration
ReplayTimeout time.Duration
ReconnectDelay time.Duration
}
ZMQClientConfig contains configuration for the ZMQ client
func DefaultZMQClientConfig ¶
func DefaultZMQClientConfig(podKey, podIP, modelName string) *ZMQClientConfig
DefaultZMQClientConfig returns a default configuration
type ZMQClientMetrics ¶
type ZMQClientMetrics struct {
// contains filtered or unexported fields
}
ZMQClientMetrics holds all metrics for the ZMQ client
func NewZMQClientMetrics ¶
func NewZMQClientMetrics(podKey string) *ZMQClientMetrics
NewZMQClientMetrics creates a new metrics instance for a ZMQ client
func (*ZMQClientMetrics) Delete ¶
func (m *ZMQClientMetrics) Delete()
Delete removes all metrics for this pod (useful for cleanup)
func (*ZMQClientMetrics) IncrementConnectionCount ¶
func (m *ZMQClientMetrics) IncrementConnectionCount()
IncrementConnectionCount increments the connection counter
func (*ZMQClientMetrics) IncrementDisconnectionCount ¶
func (m *ZMQClientMetrics) IncrementDisconnectionCount()
IncrementDisconnectionCount increments the disconnection counter
func (*ZMQClientMetrics) IncrementErrorCount ¶
func (m *ZMQClientMetrics) IncrementErrorCount(errorType string)
IncrementErrorCount increments the error counter for a specific error type
func (*ZMQClientMetrics) IncrementEventCount ¶
func (m *ZMQClientMetrics) IncrementEventCount(eventType string)
IncrementEventCount increments the event counter for a specific event type
func (*ZMQClientMetrics) IncrementMissedEvents ¶
func (m *ZMQClientMetrics) IncrementMissedEvents(count int64)
IncrementMissedEvents increments the missed events counter
func (*ZMQClientMetrics) IncrementReconnectAttempts ¶
func (m *ZMQClientMetrics) IncrementReconnectAttempts()
IncrementReconnectAttempts increments the reconnect attempts counter
func (*ZMQClientMetrics) IncrementReplayCount ¶
func (m *ZMQClientMetrics) IncrementReplayCount()
IncrementReplayCount increments the replay request counter
func (*ZMQClientMetrics) IncrementReplayFailure ¶
func (m *ZMQClientMetrics) IncrementReplayFailure()
IncrementReplayFailure increments the failed replay counter
func (*ZMQClientMetrics) IncrementReplaySuccess ¶
func (m *ZMQClientMetrics) IncrementReplaySuccess()
IncrementReplaySuccess increments the successful replay counter
func (*ZMQClientMetrics) RecordEventProcessingLatency ¶
func (m *ZMQClientMetrics) RecordEventProcessingLatency(duration time.Duration)
RecordEventProcessingLatency records the time taken to process an event
func (*ZMQClientMetrics) UpdateLastSequenceID ¶
func (m *ZMQClientMetrics) UpdateLastSequenceID(seqID int64)
UpdateLastSequenceID updates the last processed sequence ID gauge