Documentation ¶
Overview ¶
This file contains the API types and data fetching logic for querying metric data from the in-memory metric store. It provides structures for building complex queries with support for aggregation, scaling, padding, and statistics computation.
Package metricstore provides buffer.go: Time-series data buffer implementation.
Buffer Architecture ¶
Each metric at each hierarchical level (cluster/host/cpu/etc.) uses a linked-list chain of fixed-size buffers to store time-series data. This design:
- Avoids reallocation/copying when growing (new links added instead)
- Enables efficient pooling (buffers returned to sync.Pool)
- Supports traversal back in time (via prev pointers)
- Maintains temporal ordering (newer data in later buffers)
Buffer Chain Example ¶
[oldest buffer] <- prev -- [older] <- prev -- [newest buffer (head)]
 start=1000                start=1512         start=2024
 data=[v0...v511]          data=[v0...v511]   data=[v0...v42]
When the head buffer reaches capacity (BufferCap = 512), a new buffer becomes the new head and the old head is linked via prev.
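The chaining behavior can be sketched as follows. This is a minimal illustration of the design described above, not the package's actual implementation; field and method names beyond `prev`, `data`, `start`, and `BufferCap` are assumptions, and a small capacity is used so the chain growth is visible.

```go
package main

import "fmt"

// buffer is a sketch of one link in the chain described above.
type buffer struct {
	start int64     // timestamp of the first slot
	freq  int64     // measurement frequency in seconds
	data  []float64 // up to cap(data) values
	prev  *buffer   // older link in the chain
}

// write appends a value; when the current head is full, a new head
// link is created instead of reallocating or copying data.
func (b *buffer) write(ts int64, v float64) *buffer {
	if len(b.data) == cap(b.data) {
		b = &buffer{
			start: ts,
			freq:  b.freq,
			data:  make([]float64, 0, cap(b.data)),
			prev:  b,
		}
	}
	b.data = append(b.data, v)
	return b // possibly-new head
}

func main() {
	// Tiny capacity of 2 (instead of BufferCap=512) for demonstration.
	head := &buffer{start: 0, freq: 60, data: make([]float64, 0, 2)}
	for i := 0; i < 5; i++ {
		head = head.write(int64(i*60), float64(i))
	}
	links := 0
	for b := head; b != nil; b = b.prev {
		links++
	}
	fmt.Println(links) // 5 writes at capacity 2 → 3 links
}
```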
Pooling Strategy ¶
sync.Pool reduces GC pressure for the common case (BufferCap-sized allocations). Non-standard capacity buffers are not pooled (e.g., from checkpoint deserialization).
Time Alignment ¶
Timestamps are aligned to measurement frequency intervals:
index = (timestamp - buffer.start) / buffer.frequency
actualTime = buffer.start + (frequency / 2) + (index * frequency)
Missing data points are represented as NaN values. The read() function performs linear interpolation where possible.
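The two alignment formulas above can be expressed directly. This is a sketch of the documented arithmetic only (integer division as in Go); the helper names are not from the package.

```go
package main

import "fmt"

// index maps a raw timestamp to its slot in a buffer,
// per the alignment formula in the docs.
func index(ts, start, freq int64) int64 {
	return (ts - start) / freq
}

// actualTime recovers the canonical timestamp of a slot,
// centered within its frequency interval.
func actualTime(start, freq, idx int64) int64 {
	return start + freq/2 + idx*freq
}

func main() {
	// With start=1000 and a 60s frequency, a sample at ts=1130
	// lands in slot 2, whose canonical timestamp is 1150.
	idx := index(1130, 1000, 60)
	fmt.Println(idx, actualTime(1000, 60, idx)) // 2 1150
}
```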
This file implements checkpoint persistence for the in-memory metric store.
Checkpoints enable graceful restarts by periodically saving in-memory metric data to disk in JSON or binary format.
Key Features:
- Periodic background checkpointing via the Checkpointing() worker
- Two format families: JSON (human-readable) and WAL+binary (compact, crash-safe)
- Parallel checkpoint creation and loading using worker pools
- Hierarchical file organization: checkpoint_dir/cluster/host/timestamp.{json|bin}
- WAL file: checkpoint_dir/cluster/host/current.wal (append-only, per-entry)
- Only saves unarchived data (archived data is already persisted elsewhere)
- GC optimization during loading to prevent excessive heap growth
Checkpoint Workflow:
- Init() loads checkpoints within retention window at startup
- Checkpointing() worker periodically saves new data
- Shutdown() writes final checkpoint before exit
File Organization:
checkpoints/
cluster1/
host001/
1234567890.json (JSON format: full subtree snapshot)
1234567890.bin (binary format: full subtree snapshot)
current.wal (WAL format: append-only per-entry log)
host002/
...
Package metricstore provides config.go: Configuration structures and metric management.
Configuration Hierarchy ¶
The metricstore package uses nested configuration structures:
MetricStoreConfig (Keys)
├─ NumWorkers: Parallel checkpoint/archive workers
├─ RetentionInMemory: How long to keep data in RAM (also used as cleanup interval)
├─ MemoryCap: Memory limit in bytes (triggers forceFree)
├─ Checkpoints: Persistence configuration
│  ├─ FileFormat: "json" or "wal" (default: "wal")
│  └─ RootDir: Checkpoint storage path
├─ Cleanup: Long-term storage configuration (interval = RetentionInMemory)
│  ├─ RootDir: Archive storage path (archive mode only)
│  └─ Mode: "delete" or "archive"
├─ Debug: Development/debugging options
└─ Subscriptions: NATS topic subscriptions for metric ingestion
Metric Configuration ¶
Each metric (e.g., "cpu_load", "mem_used") has a MetricConfig entry in the global Metrics map, defining:
- Frequency: Measurement interval in seconds
- Aggregation: How to combine values (sum/avg/none) when transforming scopes
- offset: Internal index into Level.metrics slice (assigned during Init)
AggregationStrategy ¶
Determines how to combine metric values when aggregating from finer to coarser scopes:
- NoAggregation: Do not combine (incompatible scopes)
- SumAggregation: Add values (e.g., power consumption: core→socket)
- AvgAggregation: Average values (e.g., temperature: core→socket)
Package metricstore provides level.go: Hierarchical tree structure for metric storage.
Level Architecture ¶
The Level type forms a tree structure where each node represents a level in the ClusterCockpit hierarchy: cluster → host → socket → core → hwthread, with special nodes for memory domains and accelerators.
Structure:
Root Level (cluster="emmy")
├─ Level (host="node001")
│  ├─ Level (socket="0")
│  │  ├─ Level (core="0")  [stores cpu0 metrics]
│  │  └─ Level (core="1")  [stores cpu1 metrics]
│  └─ Level (socket="1")
│     └─ ...
└─ Level (host="node002")
   └─ ...
Each Level can:
- Hold data (metrics slice of buffer pointers)
- Have child nodes (children map[string]*Level)
- Both simultaneously (inner nodes can store aggregated metrics)
Selector Paths ¶
Selectors are hierarchical paths: []string{"cluster", "host", "component"}. Example: []string{"emmy", "node001", "cpu0"} navigates to the cpu0 core level.
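Selector navigation can be sketched as a walk over nested child maps. This is an illustration of the traversal described above, not the package's `GetLevel` implementation; the real `Level` also carries metric buffers and an RWMutex.

```go
package main

import "fmt"

// level is a minimal stand-in for the Level tree node.
type level struct {
	children map[string]*level
}

// getLevel walks a selector path, returning nil if any hop is missing.
func (l *level) getLevel(selector []string) *level {
	for _, name := range selector {
		child, ok := l.children[name]
		if !ok {
			return nil
		}
		l = child
	}
	return l
}

func main() {
	root := &level{children: map[string]*level{
		"emmy": {children: map[string]*level{
			"node001": {children: map[string]*level{
				"cpu0": {},
			}},
		}},
	}}
	fmt.Println(root.getLevel([]string{"emmy", "node001", "cpu0"}) != nil) // true
	fmt.Println(root.getLevel([]string{"emmy", "node999"}) != nil)        // false
}
```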
Concurrency ¶
RWMutex protects children map and metrics slice. Read-heavy workload (metric reads) uses RLock. Writes (new levels, buffer updates) use Lock. Double-checked locking prevents races during level creation.
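The double-checked locking pattern mentioned above can be sketched like this: the fast path takes only a read lock, and only a missing child escalates to the write lock, under which the lookup is repeated in case another goroutine created the child in the meantime. Names here are illustrative, not the package's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

type level struct {
	mu       sync.RWMutex
	children map[string]*level
}

// findOrCreate returns the named child, creating it at most once
// even under concurrent callers.
func (l *level) findOrCreate(name string) *level {
	l.mu.RLock()
	if child, ok := l.children[name]; ok {
		l.mu.RUnlock()
		return child // fast path: read lock only
	}
	l.mu.RUnlock()

	l.mu.Lock()
	defer l.mu.Unlock()
	if child, ok := l.children[name]; ok { // re-check under write lock
		return child
	}
	child := &level{children: map[string]*level{}}
	l.children[name] = child
	return child
}

func main() {
	root := &level{children: map[string]*level{}}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); root.findOrCreate("node001") }()
	}
	wg.Wait()
	fmt.Println(len(root.children)) // 1: only one child was created
}
```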
This file implements ingestion of InfluxDB line-protocol metric data received over NATS. Each line encodes one metric sample with the following structure:
<measurement>[,cluster=<c>][,hostname=<h>][,type=<t>][,type-id=<id>][,subtype=<s>][,stype-id=<id>] value=<v> [<timestamp>]
The measurement name identifies the metric (e.g. "cpu_load"). Tags provide routing information (cluster, host) and optional sub-device selectors (type, subtype). Only one field is expected per line: "value".
After decoding, each sample is:
- Written to the in-memory store via ms.WriteToLevel.
- If the checkpoint format is "wal", also forwarded to the WAL staging goroutine via the WALMessages channel for durable write-ahead logging.
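A sample line matching the structure above might look like this (values are illustrative):

```
cpu_load,cluster=emmy,hostname=node001,type=cpu,type-id=0 value=1.42 1700000000
```

Here "cpu_load" is the measurement, the tags route the sample to cluster "emmy", host "node001", sub-device cpu 0, and the single "value" field carries the sample, followed by an optional timestamp.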
Package metricstore provides an efficient in-memory time-series metric storage system with support for hierarchical data organization, checkpointing, and archiving.
The package organizes metrics in a tree structure (cluster → host → component) and provides concurrent read/write access to metric data with configurable aggregation strategies. Background goroutines handle periodic checkpointing (JSON or WAL+binary format), archiving old data, and enforcing retention policies.
Key features:
- In-memory metric storage with configurable retention
- Hierarchical data organization (selectors)
- Concurrent checkpoint/archive workers
- Support for sum and average aggregation
- NATS integration for metric ingestion
This file implements high-level query functions for loading job metric data with automatic scope transformation and aggregation.
Key Concepts:
Metric Scopes: Metrics are collected at different granularities (native scope):
- HWThread: Per hardware thread
- Core: Per CPU core
- Socket: Per CPU socket
- MemoryDomain: Per memory domain (NUMA)
- Accelerator: Per GPU/accelerator
- Node: Per compute node
Scope Transformation: The buildQueries functions transform between native scope and requested scope by:
- Aggregating finer-grained data (e.g., HWThread → Core → Socket → Node)
- Rejecting requests for finer granularity than available
- Handling special cases (e.g., Accelerator metrics)
Query Building: Constructs APIQuery structures with proper selectors (Type, TypeIds) based on cluster topology and job resources.
This file contains shared scope transformation logic used by both the internal metric store (pkg/metricstore) and the external cc-metric-store client (internal/metricstoreclient). It extracts the common algorithm for mapping between native metric scopes and requested scopes based on cluster topology.
Package metricstore provides walCheckpoint.go: WAL-based checkpoint implementation.
This replaces the Avro shadow tree with an append-only Write-Ahead Log (WAL) per host, eliminating the extra memory overhead of the AvroStore and providing truly continuous (per-write) crash safety.
Architecture ¶
Metric write (DecodeLine)
│
├─► WriteToLevel() → main MemoryStore (unchanged)
│
└─► WALMessages channel
│
▼
WALStaging goroutine
│
▼
checkpoints/cluster/host/current.wal (append-only, binary)
Periodic checkpoint (Checkpointing goroutine):
1. Write <timestamp>.bin snapshot (column-oriented, from main tree)
2. Signal WALStaging to truncate current.wal per host
On restart (FromCheckpoint):
1. Load most recent <timestamp>.bin snapshot
2. Replay current.wal (overwrite-safe: buffer.write handles duplicate timestamps)
WAL Record Format ¶
[4B magic 0xCC1DA7A1][4B payload_len][payload][4B CRC32]

payload:
  [8B timestamp int64]
  [2B metric_name_len uint16][N metric name bytes]
  [1B selector_count uint8]
  per selector: [1B selector_len uint8][M selector bytes]
  [4B value float32 bits]
Binary Snapshot Format ¶
[4B magic 0xCC5B0001][8B from int64][8B to int64]
Level tree (recursive):
[4B num_metrics uint32]
per metric:
[2B name_len uint16][N name bytes]
[8B frequency int64][8B start int64]
[4B num_values uint32][num_values × 4B float32]
[4B num_children uint32]
per child: [2B name_len uint16][N name bytes] + Level (recursive)
Index ¶
- Constants
- Variables
- func BuildMetricList() map[string]MetricConfig
- func Checkpointing(wg *sync.WaitGroup, ctx context.Context)
- func CleanUp(wg *sync.WaitGroup, ctx context.Context)
- func CleanupCheckpoints(checkpointsDir, cleanupDir string, from int64, deleteInstead bool) (int, error)
- func DecodeLine(dec *lineprotocol.Decoder, ms *MemoryStore, clusterDefault string) error
- func ExtractTypeID(queryType *string, typeIds []string, ndx int, metric, hostname string) *string
- func Free(ms *MemoryStore, t time.Time) (int, error)
- func FreeSelected(ms *MemoryStore, selectors [][]string, t time.Time) (int, error)
- func GetSelectors(ms *MemoryStore, excludeSelectors map[string][]string) [][]string
- func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.WaitGroup)
- func InitMetrics(metrics map[string]MetricConfig)
- func IntToStringSlice(is []int) []string
- func IsMetricRemovedForSubCluster(mc *schema.MetricConfig, subCluster string) bool
- func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context)
- func ReceiveNats(ms *MemoryStore, workers int, ctx context.Context) error
- func Retention(wg *sync.WaitGroup, ctx context.Context)
- func RotateWALFiles(hostDirs []string)
- func RotateWALFilesAfterShutdown(hostDirs []string)
- func SanitizeStats(avg, min, max *schema.Float)
- func Shutdown()
- func WALStaging(wg *sync.WaitGroup, ctx context.Context)
- type APIMetricData
- type APIQuery
- type APIQueryRequest
- type APIQueryResponse
- type AggregationStrategy
- type CheckpointFile
- type CheckpointMetrics
- type Checkpoints
- type Cleanup
- type Debug
- type GlobalState
- type HealthCheckReq
- type HealthCheckResponse
- type HealthCheckResult
- type InternalMetricStore
- func (ccms *InternalMetricStore) HealthCheck(cluster string, nodes []string, metrics []string) (map[string]HealthCheckResult, error)
- func (ccms *InternalMetricStore) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ...) (schema.JobData, error)
- func (ccms *InternalMetricStore) LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, ...) (map[string]map[string][]*schema.JobMetric, error)
- func (ccms *InternalMetricStore) LoadNodeListData(cluster, subCluster string, nodes []string, metrics []string, ...) (map[string]schema.JobData, error)
- func (ccms *InternalMetricStore) LoadScopedStats(job *schema.Job, metrics []string, scopes []schema.MetricScope, ...) (schema.ScopedJobStats, error)
- func (ccms *InternalMetricStore) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)
- type Level
- type MemoryStore
- func (m *MemoryStore) DebugDump(w *bufio.Writer, selector []string) error
- func (m *MemoryStore) ForceFree() (int, error)
- func (m *MemoryStore) Free(selector []string, t int64) (int, error)
- func (m *MemoryStore) FreeAll() error
- func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error)
- func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error)
- func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []string) ([]string, []string, error)
- func (m *MemoryStore) GetLevel(selector []string) *Level
- func (ms *MemoryStore) GetMetricFrequency(metricName string) (int64, error)
- func (ms *MemoryStore) GetPaths(targetDepth int) [][]string
- func (m *MemoryStore) HealthCheck(cluster string, nodes []string, expectedMetrics []string) (map[string]HealthCheckResult, error)
- func (m *MemoryStore) ListChildren(selector []string) []string
- func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]schema.Float, int64, int64, int64, error)
- func (ms *MemoryStore) SetNodeProvider(provider NodeProvider)
- func (m *MemoryStore) SizeInBytes() int64
- func (m *MemoryStore) SizeInGB() float64
- func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int64) (*Stats, int64, int64, error)
- func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error)
- func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string, error)
- func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error
- func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metrics []Metric) error
- type Metric
- type MetricConfig
- type MetricStoreConfig
- type NodeProvider
- type ParquetMetricRow
- type PersistentBufferPool
- type ScopeQueryResult
- type Stats
- type Subscriptions
- type WALMessage
Constants ¶
const (
	CheckpointFilePerms = 0o644                    // File permissions for checkpoint files
	CheckpointDirPerms  = 0o755                    // Directory permissions for checkpoint directories
	GCTriggerInterval   = DefaultGCTriggerInterval // Interval for triggering GC during checkpoint loading
)
const (
	DefaultMaxWorkers                 = 10
	DefaultBufferCapacity             = 512
	DefaultGCTriggerInterval          = 100
	DefaultMemoryUsageTrackerInterval = 1 * time.Hour
)
const BufferCap int = DefaultBufferCapacity
BufferCap is the default buffer capacity. buffer.data will only ever grow up to its capacity and a new link in the buffer chain will be created if needed so that no copying of data or reallocation needs to happen on writes.
const MaxMissingDataPoints int64 = 5
MaxMissingDataPoints is the threshold for stale data detection. A buffer is considered healthy if the gap between its last data point and the current time is within MaxMissingDataPoints * frequency.
Variables ¶
var (
	// ErrNoHostOrMetric is returned when the metric store does not find the host or the metric
	ErrNoHostOrMetric error = errors.New("[METRICSTORE]> metric or host not found")

	// ErrInvalidTimeRange is returned when a query has 'from' >= 'to'
	ErrInvalidTimeRange = errors.New("[METRICSTORE]> invalid time range: 'from' must be before 'to'")

	// ErrEmptyCluster is returned when a query with ForAllNodes has no cluster specified
	ErrEmptyCluster = errors.New("[METRICSTORE]> cluster name cannot be empty")
)
var (
	// ErrNoData indicates no time-series data exists for the requested metric/level.
	ErrNoData error = errors.New("[METRICSTORE]> no data for this metric/level")

	// ErrDataDoesNotAlign indicates that aggregated data from child scopes
	// does not align with the parent scope's expected timestamps/intervals.
	ErrDataDoesNotAlign error = errors.New("[METRICSTORE]> data from lower granularities does not align")
)
var (
	HWThreadString     = string(schema.MetricScopeHWThread)
	CoreString         = string(schema.MetricScopeCore)
	MemoryDomainString = string(schema.MetricScopeMemoryDomain)
	SocketString       = string(schema.MetricScopeSocket)
	AcceleratorString  = string(schema.MetricScopeAccelerator)
)
Pre-converted scope strings avoid repeated string(MetricScope) allocations during query construction. Used in ScopeQueryResult.Type field.
var ErrNoNewArchiveData error = errors.New("all data already archived")
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)
TestLoadDataCallback allows tests to override LoadData behavior for testing purposes. When set to a non-nil function, LoadData will call this function instead of the default implementation.
var WALMessages = make(chan *WALMessage, 4096)
WALMessages is the channel for sending metric writes to the WAL staging goroutine. Buffered to allow burst writes without blocking the metric ingestion path.
Functions ¶
func BuildMetricList ¶
func BuildMetricList() map[string]MetricConfig
func Checkpointing ¶
Checkpointing starts a background worker that periodically saves metric data to disk.
Checkpoints are written every checkpointInterval, which is hardcoded to 12 hours.
Format behaviour:
- "json": Periodic checkpointing every checkpointInterval
- "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
func CleanupCheckpoints ¶
func CleanupCheckpoints(checkpointsDir, cleanupDir string, from int64, deleteInstead bool) (int, error)
CleanupCheckpoints deletes or archives all checkpoint files older than `from`. When archiving, consolidates all hosts per cluster into a single Parquet file.
func DecodeLine ¶
func DecodeLine(dec *lineprotocol.Decoder, ms *MemoryStore, clusterDefault string) error
DecodeLine reads all lines from dec (InfluxDB line-protocol) and writes each decoded metric sample into ms.
clusterDefault is used as the cluster name for lines that do not carry a "cluster" tag. Callers typically supply the ClusterTag value from the NATS subscription configuration.
Performance notes:
- A decodeState is obtained from decodeStatePool to reuse scratch buffers.
- The Level pointer (host-level node in the metric tree) is cached across consecutive lines that share the same cluster+host pair to avoid repeated lock acquisitions on the root and cluster levels.
- []byte→string conversions for type/subtype selectors are cached via prevType*/prevSubType* fields because batches typically repeat the same sub-device identifiers.
- Timestamp parsing tries Second precision first; if that fails it retries Millisecond, Microsecond, and Nanosecond in turn. A missing timestamp falls back to time.Now().
When the checkpoint format is "wal" each successfully decoded sample is also sent to WALMessages so the WAL staging goroutine can persist it durably before the next binary snapshot.
func ExtractTypeID ¶
ExtractTypeID returns the type ID at the given index from a query's TypeIds slice. Returns nil if queryType is nil (no type filtering). Logs a warning and returns nil if the index is out of range.
func Free ¶
func Free(ms *MemoryStore, t time.Time) (int, error)
Free removes metric data older than the given time while preserving data for active nodes.
This function implements intelligent retention by consulting the NodeProvider (if configured) to determine which nodes are currently in use by running jobs. Data for these nodes is preserved even if older than the retention time.
Parameters:
- ms: The MemoryStore instance
- t: Time threshold - buffers with data older than this will be freed
Returns:
- Number of buffers freed
- Error if NodeProvider query fails
Behavior:
- If no NodeProvider is set: frees all buffers older than t
- If NodeProvider returns empty map: frees all buffers older than t
- Otherwise: preserves buffers for nodes returned by GetUsedNodes(), frees others
func FreeSelected ¶
FreeSelected frees buffers for specific selectors while preserving others.
This function is used when we want to retain some specific nodes beyond the retention time. It iterates through the provided selectors and frees their associated buffers.
Parameters:
- ms: The MemoryStore instance
- selectors: List of selector paths to free (e.g., [["cluster1", "node1"], ["cluster2", "node2"]])
- t: Time threshold for freeing buffers
Returns the total number of buffers freed and any error encountered.
func GetSelectors ¶
func GetSelectors(ms *MemoryStore, excludeSelectors map[string][]string) [][]string
GetSelectors returns all selectors at depth 2 (cluster/node level) that are NOT in the exclusion map.
This function generates a list of selectors whose buffers should be freed by excluding selectors that correspond to nodes currently in use by running jobs.
Parameters:
- ms: The MemoryStore instance
- excludeSelectors: Map of cluster names to node hostnames that should NOT be freed
Returns a list of selectors ([]string paths) that can be safely freed.
Example:
If the tree has paths ["emmy", "node001"] and ["emmy", "node002"],
and excludeSelectors contains {"emmy": ["node001"]},
then only [["emmy", "node002"]] is returned.
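The exclusion logic in the example above can be sketched as a filter over cluster/host paths. This is an illustration of the described behavior, not the package's implementation; the tree walk that produces the depth-2 paths is omitted.

```go
package main

import "fmt"

// getSelectors returns every cluster/host path that is NOT listed
// in the exclusion map, mirroring the documented behavior.
func getSelectors(paths [][]string, exclude map[string][]string) [][]string {
	out := [][]string{}
	for _, p := range paths {
		cluster, host := p[0], p[1]
		skip := false
		for _, h := range exclude[cluster] {
			if h == host {
				skip = true
				break
			}
		}
		if !skip {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	paths := [][]string{{"emmy", "node001"}, {"emmy", "node002"}}
	exclude := map[string][]string{"emmy": {"node001"}}
	fmt.Println(getSelectors(paths, exclude)) // [[emmy node002]]
}
```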
func Init ¶
func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.WaitGroup)
Init initializes the metric store from configuration and starts background workers.
This function must be called exactly once before any other metricstore operations. It performs the following initialization steps:
- Validates and decodes the metric store configuration
- Configures worker pool size (defaults to NumCPU/2+1, max 10)
- Loads metric configurations from all registered clusters
- Restores checkpoints within the retention window
- Starts background workers for retention, checkpointing, archiving, and monitoring
- Optionally subscribes to NATS for real-time metric ingestion
Parameters:
- rawConfig: JSON configuration for the metric store (see MetricStoreConfig)
- wg: WaitGroup that will be incremented for each background goroutine started
The function will call cclog.Fatal on critical errors during initialization. Use Shutdown() to cleanly stop all background workers started by Init().
Note: Signal handling must be implemented by the caller. Call Shutdown() when receiving termination signals to ensure checkpoint data is persisted.
func InitMetrics ¶
func InitMetrics(metrics map[string]MetricConfig)
InitMetrics initializes the singleton MemoryStore instance with the given metric configurations.
This function must be called before GetMemoryStore() and can only be called once due to the singleton pattern. It assigns each metric an internal offset for efficient buffer indexing.
Parameters:
- metrics: Map of metric names to their configurations (frequency and aggregation strategy)
Panics if any metric has Frequency == 0, which indicates an invalid configuration.
After this call, the global msInstance is ready for use via GetMemoryStore().
func IntToStringSlice ¶
IntToStringSlice converts a slice of integers to a slice of strings. Used to convert hardware thread/core/socket IDs from topology (int) to query TypeIds (string). Optimized to reuse a byte buffer for string conversion, reducing allocations.
func IsMetricRemovedForSubCluster ¶
func IsMetricRemovedForSubCluster(mc *schema.MetricConfig, subCluster string) bool
IsMetricRemovedForSubCluster checks whether a metric is marked as removed for the given subcluster in its per-subcluster configuration.
func MemoryUsageTracker ¶
MemoryUsageTracker starts a background goroutine that monitors memory usage.
This worker checks actual process memory usage (via runtime.MemStats) periodically and force-frees buffers if memory exceeds the configured cap. It uses FreeOSMemory() to return memory to the OS after freeing buffers, avoiding aggressive GC that causes performance issues.
The tracker logs both actual memory usage (heap allocated) and metric data size for visibility into memory overhead from Go runtime structures and allocations.
Parameters:
- wg: WaitGroup to signal completion when context is cancelled
- ctx: Context for cancellation signal
The goroutine exits when ctx is cancelled.
func ReceiveNats ¶
func ReceiveNats(ms *MemoryStore, workers int, ctx context.Context) error
ReceiveNats subscribes to all configured NATS subjects and feeds incoming line-protocol messages into the MemoryStore.
When workers > 1 a pool of goroutines drains a shared channel so that multiple messages can be decoded in parallel. With workers == 1 the NATS callback decodes inline (no channel overhead, lower latency).
The function blocks until ctx is cancelled and all worker goroutines have finished. It returns nil when the NATS client is not configured; callers should treat that as a no-op rather than an error.
func Retention ¶
Retention starts a background goroutine that periodically frees old metric data.
This worker runs at half the retention interval and calls Free() to remove buffers older than the configured retention time. It respects the NodeProvider to preserve data for nodes with active jobs.
Parameters:
- wg: WaitGroup to signal completion when context is cancelled
- ctx: Context for cancellation signal
The goroutine exits when ctx is cancelled.
func RotateWALFiles ¶
func RotateWALFiles(hostDirs []string)
RotateWALFiles sends rotation requests for the given host directories and blocks until all rotations complete.
func RotateWALFilesAfterShutdown ¶
func RotateWALFilesAfterShutdown(hostDirs []string)
RotateWALFilesAfterShutdown sends rotation requests for the given host directories and blocks until all rotations complete.
func SanitizeStats ¶
SanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling. If ANY of avg/min/max is NaN, ALL three are zeroed for consistency.
func Shutdown ¶
func Shutdown()
Shutdown performs a graceful shutdown of the metric store.
This function cancels all background goroutines started by Init() and writes a final checkpoint to disk before returning. It should be called when the application receives a termination signal.
The function will:
- Cancel the context to stop all background workers
- Close the WAL messages channel if using WAL format
- Write a final checkpoint to preserve in-memory data
- Log any errors encountered during shutdown
Note: This function blocks until the final checkpoint is written.
Types ¶
type APIMetricData ¶
type APIMetricData struct {
Error *string `json:"error,omitempty"`
Data schema.FloatArray `json:"data,omitempty"`
From int64 `json:"from"`
To int64 `json:"to"`
Resolution int64 `json:"resolution"`
Avg schema.Float `json:"avg"`
Min schema.Float `json:"min"`
Max schema.Float `json:"max"`
}
APIMetricData represents the response data for a single metric query.
It contains both the time-series data points and computed statistics (avg, min, max). If an error occurred during data retrieval, the Error field will be set and other fields may be incomplete.
func (*APIMetricData) AddStats ¶
func (data *APIMetricData) AddStats()
AddStats computes and populates the Avg, Min, and Max fields from the Data array.
NaN values in the data are ignored during computation. If all values are NaN, the statistics fields will be set to NaN.
TODO: Optimize this, just like the stats endpoint!
func (*APIMetricData) PadDataWithNull ¶
func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metric string)
PadDataWithNull pads the beginning of the data array with NaN values if needed.
This ensures that the data aligns with the requested 'from' timestamp, even if the metric store doesn't have data for the earliest time points. This is useful for maintaining consistent array indexing across multiple queries.
Parameters:
- ms: MemoryStore instance to lookup metric configuration
- from: The requested start timestamp
- to: The requested end timestamp (unused but kept for API consistency)
- metric: The metric name to lookup frequency information
func (*APIMetricData) ScaleBy ¶
func (data *APIMetricData) ScaleBy(f schema.Float)
ScaleBy multiplies all data points and statistics by the given factor.
This is commonly used for unit conversion (e.g., bytes to gigabytes). Scaling by 0 or 1 is a no-op for performance reasons.
type APIQuery ¶
type APIQuery struct {
Type *string `json:"type,omitempty"`
SubType *string `json:"subtype,omitempty"`
Metric string `json:"metric"`
Hostname string `json:"host"`
Resolution int64 `json:"resolution"`
TypeIds []string `json:"type-ids,omitempty"`
SubTypeIds []string `json:"subtype-ids,omitempty"`
ScaleFactor schema.Float `json:"scale-by,omitempty"`
Aggregate bool `json:"aggreg"`
}
APIQuery represents a single metric query with optional hierarchical selectors.
The hierarchical selection works as follows:
- Hostname: The node to query
- Type + TypeIds: First level of hierarchy (e.g., "cpu" + ["0", "1", "2"])
- SubType + SubTypeIds: Second level of hierarchy (e.g., "core" + ["0", "1"])
If Aggregate is true, data from multiple type/subtype IDs will be aggregated according to the metric's aggregation strategy. Otherwise, separate results are returned for each combination.
type APIQueryRequest ¶
type APIQueryRequest struct {
Cluster string `json:"cluster"`
Queries []APIQuery `json:"queries"`
ForAllNodes []string `json:"for-all-nodes"`
From int64 `json:"from"`
To int64 `json:"to"`
WithStats bool `json:"with-stats"`
WithData bool `json:"with-data"`
WithPadding bool `json:"with-padding"`
}
APIQueryRequest represents a batch query request for metric data.
It supports two modes of operation:
- Explicit queries via the Queries field
- Automatic query generation via ForAllNodes (queries all specified metrics for all nodes in the cluster)
The request can be customized with flags to include/exclude statistics, raw data, and padding.
type APIQueryResponse ¶
type APIQueryResponse struct {
Queries []APIQuery `json:"queries,omitempty"`
Results [][]APIMetricData `json:"results"`
}
APIQueryResponse represents the response to an APIQueryRequest.
Results is a 2D array where each outer element corresponds to a query, and each inner element corresponds to a selector within that query (e.g., multiple CPUs or cores).
func FetchData ¶
func FetchData(req APIQueryRequest) (*APIQueryResponse, error)
FetchData executes a batch metric query request and returns the results.
This is the primary API for retrieving metric data from the memory store. It supports:
- Individual queries via req.Queries
- Batch queries for all nodes via req.ForAllNodes
- Hierarchical selector construction (cluster → host → type → subtype)
- Optional statistics computation (avg, min, max)
- Optional data scaling
- Optional data padding with NaN values
The function constructs selectors based on the query parameters and calls MemoryStore.Read() for each selector. If a query specifies Aggregate=false with multiple type/subtype IDs, separate results are returned for each combination.
Parameters:
- req: The query request containing queries, time range, and options
Returns:
- APIQueryResponse containing results for each query, or error if validation fails
Errors:
- ErrInvalidTimeRange if req.From >= req.To
- ErrEmptyCluster if req.ForAllNodes is used without specifying a cluster
- Error if MemoryStore is not initialized
- Individual query errors are stored in APIMetricData.Error field
type AggregationStrategy ¶
type AggregationStrategy int
AggregationStrategy defines how to combine metric values across hierarchy levels.
Used when transforming data from finer-grained scopes (e.g., core) to coarser scopes (e.g., socket). This is SPATIAL aggregation, not TEMPORAL (time-based) aggregation.
Values:
- NoAggregation: Do not aggregate (incompatible scopes or non-aggregatable metrics)
- SumAggregation: Add values (e.g., power: sum core power → socket power)
- AvgAggregation: Average values (e.g., temperature: average core temps → socket temp)
const (
	NoAggregation  AggregationStrategy = iota // Do not aggregate
	SumAggregation                            // Sum values (e.g., power, energy)
	AvgAggregation                            // Average values (e.g., temperature, utilization)
)
func AssignAggregationStrategy ¶
func AssignAggregationStrategy(str string) (AggregationStrategy, error)
AssignAggregationStrategy parses a string into an AggregationStrategy value.
Used when loading metric configurations from JSON/YAML files.
Parameters:
- str: "sum", "avg", or "" (empty string for NoAggregation)
Returns:
- AggregationStrategy: Parsed value
- error: Non-nil if str is unrecognized
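A self-contained sketch of the documented parsing behavior ("sum", "avg", or "" for NoAggregation). The type and function are re-declared locally for illustration and are not the package's actual code:

```go
package main

import "fmt"

// AggregationStrategy mirrors the package's enum, in the documented iota order.
type AggregationStrategy int

const (
	NoAggregation AggregationStrategy = iota
	SumAggregation
	AvgAggregation
)

// assignAggregationStrategy sketches the documented contract of
// AssignAggregationStrategy: empty string means no aggregation, and any
// unrecognized value is an error.
func assignAggregationStrategy(str string) (AggregationStrategy, error) {
	switch str {
	case "":
		return NoAggregation, nil
	case "sum":
		return SumAggregation, nil
	case "avg":
		return AvgAggregation, nil
	default:
		return NoAggregation, fmt.Errorf("unknown aggregation strategy: %q", str)
	}
}

func main() {
	s, err := assignAggregationStrategy("avg")
	fmt.Println(s == AvgAggregation, err == nil) // true true
}
```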
type CheckpointFile ¶
type CheckpointFile struct {
Metrics map[string]*CheckpointMetrics `json:"metrics"`
Children map[string]*CheckpointFile `json:"children"`
From int64 `json:"from"`
To int64 `json:"to"`
}
CheckpointFile represents the hierarchical structure of a checkpoint file. It mirrors the Level tree structure from the MemoryStore.
type CheckpointMetrics ¶
type CheckpointMetrics struct {
Data []schema.Float `json:"data"`
Frequency int64 `json:"frequency"`
Start int64 `json:"start"`
}
CheckpointMetrics represents metric data in a checkpoint file. Whenever the structure changes, update MarshalJSON as well!
func (*CheckpointMetrics) MarshalJSON ¶
func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error)
MarshalJSON provides optimized JSON encoding for CheckpointMetrics.
Since schema.Float has custom MarshalJSON, serializing []Float has significant overhead. This method manually constructs JSON to avoid allocations and interface conversions.
type Checkpoints ¶
type Checkpoints struct {
FileFormat string `json:"file-format"`
RootDir string `json:"directory"`
}
Checkpoints configures periodic persistence of in-memory metric data.
Fields:
- FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
- RootDir: Filesystem path for checkpoint files (created if missing)
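Using the JSON tags and defaults shown below for the Keys variable, a checkpoints section might look like:

```json
{
  "file-format": "wal",
  "directory": "./var/checkpoints"
}
```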
type Cleanup ¶
Cleanup configures long-term storage of old metric data.
Data older than RetentionInMemory is archived to disk or deleted. The cleanup interval is always RetentionInMemory.
Fields:
- RootDir: Filesystem path for archived data (used in "archive" mode)
- Mode: "delete" (discard old data) or "archive" (write to RootDir)
type Debug ¶
Debug provides development and profiling options.
Fields:
- DumpToFile: Path to dump checkpoint data for inspection (empty = disabled)
- EnableGops: Enable gops agent for live runtime debugging (https://github.com/google/gops)
type GlobalState ¶
type GlobalState struct {
// contains filtered or unexported fields
}
GlobalState holds the global state for the metric store with thread-safe access.
type HealthCheckReq ¶
type HealthCheckResponse ¶
type HealthCheckResponse struct {
Status schema.MonitoringState
Error error
}
HealthCheckResponse represents the result of a health check operation.
type HealthCheckResult ¶
type HealthCheckResult struct {
State schema.MonitoringState
HealthMetrics string // JSON: {"missing":[...],"degraded":[...]}
}
HealthCheckResult holds the monitoring state and raw JSON health metrics for a single node as determined by HealthCheck.
type InternalMetricStore ¶
type InternalMetricStore struct{}
var MetricStoreHandle *InternalMetricStore
func (*InternalMetricStore) HealthCheck ¶
func (ccms *InternalMetricStore) HealthCheck(cluster string, nodes []string, metrics []string) (map[string]HealthCheckResult, error)
HealthCheck delegates to the internal MemoryStore's HealthCheck.
func (*InternalMetricStore) LoadData ¶
func (ccms *InternalMetricStore) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)
LoadData loads metric data for a specific job with automatic scope transformation.
This is the primary function for retrieving job metric data. It handles:
- Building queries with scope transformation via buildQueries
- Fetching data from the metric store
- Organizing results by metric and scope
- Converting NaN statistics to 0 for JSON compatibility
- Partial error handling (returns data for successful queries even if some fail)
Parameters:
- job: Job metadata including cluster, resources, and time range
- metrics: List of metric names to load
- scopes: Requested metric scopes (will be transformed to match native scopes)
- ctx: Context for cancellation (currently unused but reserved for future use)
- resolution: Data resolution in seconds (0 for native resolution)
Returns:
- JobData: Map of metric → scope → JobMetric with time-series data and statistics
- Error: Returns error if query building or fetching fails, or partial error listing failed hosts
Example:
jobData, err := LoadData(job, []string{"cpu_load", "mem_used"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 60)
func (*InternalMetricStore) LoadNodeData ¶
func (ccms *InternalMetricStore) LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error)
LoadNodeData loads metric data for specific nodes in a cluster over a time range.
Unlike LoadData which operates on job resources, this function queries arbitrary nodes directly. Useful for system monitoring and node status views.
Parameters:
- cluster: Cluster name
- metrics: List of metric names
- nodes: List of node hostnames (nil = all nodes in cluster via ForAllNodes)
- scopes: Requested metric scopes (currently unused - always node scope)
- from, to: Time range
- ctx: Context (currently unused)
Returns:
- Map of hostname → metric → []JobMetric
- Error or partial error listing failed queries
func (*InternalMetricStore) LoadNodeListData ¶
func (ccms *InternalMetricStore) LoadNodeListData(cluster, subCluster string, nodes []string, metrics []string, scopes []schema.MetricScope, resolution int, from, to time.Time, ctx context.Context) (map[string]schema.JobData, error)
LoadNodeListData loads metric data for a list of nodes with full scope transformation support.
This is the most flexible node data loading function, supporting arbitrary scopes and resolution. Uses buildNodeQueries for proper scope transformation based on topology.
Parameters:
- cluster: Cluster name
- subCluster: SubCluster name (empty string to infer from node names)
- nodes: List of node hostnames
- metrics: List of metric names
- scopes: Requested metric scopes
- resolution: Data resolution in seconds
- from, to: Time range
- ctx: Context (currently unused)
Returns:
- Map of hostname → JobData (metric → scope → JobMetric)
- Error or partial error listing failed queries
func (*InternalMetricStore) LoadScopedStats ¶
func (ccms *InternalMetricStore) LoadScopedStats(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.ScopedJobStats, error)
LoadScopedStats loads metric statistics for a job with scope-aware grouping.
Similar to LoadStats but supports multiple scopes and returns statistics grouped by scope with hardware IDs (e.g., per-core, per-socket statistics).
Parameters:
- job: Job metadata
- metrics: List of metric names
- scopes: Requested metric scopes
- ctx: Context (currently unused)
Returns:
- ScopedJobStats: Map of metric → scope → []ScopedStats (with hostname and ID)
- Error or partial error listing failed queries
func (*InternalMetricStore) LoadStats ¶
func (ccms *InternalMetricStore) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)
LoadStats loads only metric statistics (avg/min/max) for a job at node scope.
This is an optimized version of LoadData that fetches only statistics without time-series data, reducing bandwidth and memory usage. Always queries at node scope.
Parameters:
- job: Job metadata
- metrics: List of metric names
- ctx: Context (currently unused)
Returns:
- Map of metric → hostname → statistics
- Error on query building or fetching failure
type Level ¶
type Level struct {
// contains filtered or unexported fields
}
Level represents a node in the hierarchical metric storage tree.
A Level can be either a leaf or an inner node. Inner nodes hold data in 'metrics' for aggregated values (e.g., socket-level metrics derived from core-level data). It is named "Level" instead of "node" to avoid confusion with cluster nodes (hosts).
Fields:
- children: Map of child level names to Level pointers (e.g., "cpu0" → Level)
- metrics: Slice of buffer pointers (one per metric, indexed by MetricConfig.offset)
- lock: RWMutex for concurrent access (read-heavy, write-rare)
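The tree walk behind selectors can be sketched with a simplified stand-in for the unexported Level type (the real type also holds per-metric buffers and an RWMutex; the function name follows the findLevelOrCreate mentioned in the Write docs, and the cluster/host names are hypothetical):

```go
package main

import "fmt"

// level is a simplified stand-in: a tree node with named children.
type level struct {
	children map[string]*level
}

// findLevelOrCreate walks a selector like ["fritz", "node001", "cpu0"],
// creating missing levels along the way.
func (l *level) findLevelOrCreate(selector []string) *level {
	cur := l
	for _, name := range selector {
		child, ok := cur.children[name]
		if !ok {
			child = &level{children: map[string]*level{}}
			cur.children[name] = child
		}
		cur = child
	}
	return cur
}

func main() {
	root := &level{children: map[string]*level{}}
	root.findLevelOrCreate([]string{"fritz", "node001", "cpu0"})
	root.findLevelOrCreate([]string{"fritz", "node001", "cpu1"})
	fmt.Println(len(root.children["fritz"].children["node001"].children)) // 2
}
```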
type MemoryStore ¶
type MemoryStore struct {
Metrics map[string]MetricConfig
// contains filtered or unexported fields
}
MemoryStore is the main in-memory time-series metric storage implementation.
It organizes metrics in a hierarchical tree structure where each level represents a component of the system hierarchy (e.g., cluster → host → CPU). Each level can store multiple metrics as time-series buffers.
The store is initialized as a singleton via InitMetrics() and accessed via GetMemoryStore(). All public methods are safe for concurrent use.
func GetMemoryStore ¶
func GetMemoryStore() *MemoryStore
GetMemoryStore returns the singleton MemoryStore instance.
Returns the initialized MemoryStore singleton. Calls cclog.Fatal if InitMetrics() was not called first.
This function is safe for concurrent use after initialization.
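The init-then-get singleton pattern described above can be sketched as follows; the store body is a placeholder and the lowercase names are local stand-ins for the package's InitMetrics/GetMemoryStore pair:

```go
package main

import (
	"fmt"
	"sync"
)

type memoryStore struct{ initialized bool }

var (
	singleton *memoryStore
	once      sync.Once
)

// initMetrics builds the singleton exactly once, mirroring InitMetrics.
func initMetrics() {
	once.Do(func() { singleton = &memoryStore{initialized: true} })
}

// getMemoryStore returns the singleton; the real implementation calls
// cclog.Fatal instead of panicking when initialization was skipped.
func getMemoryStore() *memoryStore {
	if singleton == nil {
		panic("metricstore: InitMetrics() must be called first")
	}
	return singleton
}

func main() {
	initMetrics()
	fmt.Println(getMemoryStore().initialized) // true
}
```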
func (*MemoryStore) DebugDump ¶
func (m *MemoryStore) DebugDump(w *bufio.Writer, selector []string) error
func (*MemoryStore) ForceFree ¶
func (m *MemoryStore) ForceFree() (int, error)
ForceFree unconditionally removes the oldest buffer from each metric chain.
func (*MemoryStore) Free ¶
func (m *MemoryStore) Free(selector []string, t int64) (int, error)
Free releases all buffers for the selected level and all its children that contain only values older than `t`.
func (*MemoryStore) FreeAll ¶
func (m *MemoryStore) FreeAll() error
func (*MemoryStore) FromCheckpoint ¶
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error)
FromCheckpoint loads checkpoint files from disk into memory in parallel.
Uses worker pool to load cluster/host combinations. Returns number of files loaded and any errors.
func (*MemoryStore) FromCheckpointFiles ¶
func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error)
FromCheckpointFiles is the main entry point for loading checkpoints at startup.
Creates checkpoint directory if it doesn't exist. This function must be called before any writes or reads, and can only be called once.
func (*MemoryStore) GetHealthyMetrics ¶
func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []string) ([]string, []string, error)
GetHealthyMetrics returns missing and degraded metric lists for a node.
It walks the metric tree starting from the node identified by selector and classifies each expected metric:
- Missing: no buffer anywhere in the subtree, or metric not in global config
- Degraded: at least one stale buffer exists in the subtree
Metrics present in expectedMetrics but absent from both returned lists are considered fully healthy.
func (*MemoryStore) GetLevel ¶
func (m *MemoryStore) GetLevel(selector []string) *Level
func (*MemoryStore) GetMetricFrequency ¶
func (ms *MemoryStore) GetMetricFrequency(metricName string) (int64, error)
func (*MemoryStore) GetPaths ¶
func (ms *MemoryStore) GetPaths(targetDepth int) [][]string
GetPaths returns a list of lists (paths) to the specified depth.
func (*MemoryStore) HealthCheck ¶
func (m *MemoryStore) HealthCheck(cluster string, nodes []string, expectedMetrics []string) (map[string]HealthCheckResult, error)
HealthCheck evaluates multiple nodes against a set of expected metrics and returns a monitoring state per node.
States:
- MonitoringStateFull: all expected metrics are healthy
- MonitoringStatePartial: some metrics are missing or degraded
- MonitoringStateFailed: node not found, or no healthy metrics at all
func (*MemoryStore) ListChildren ¶
func (m *MemoryStore) ListChildren(selector []string) []string
ListChildren returns a list of all children of the level selected by the given selector.
func (*MemoryStore) Read ¶
func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]schema.Float, int64, int64, int64, error)
Read returns all values for metric `metric` from `from` to `to` for the selected level(s). If the level does not hold the metric itself, the data is aggregated recursively from its children. The second and third return values are the actual from/to timestamps for the returned data; they can differ from the requested range if no data was available.
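The time alignment that Read relies on follows the formulas from the Overview (index = (timestamp - start) / frequency, actualTime = start + frequency/2 + index*frequency). A self-contained sketch:

```go
package main

import "fmt"

// bufferIndex maps a raw timestamp to a slot in a buffer that starts at
// `start` and stores one sample per `frequency` seconds.
func bufferIndex(timestamp, start, frequency int64) int64 {
	return (timestamp - start) / frequency
}

// sampleTime reconstructs the canonical timestamp of a slot: the midpoint
// of its measurement interval.
func sampleTime(start, frequency, index int64) int64 {
	return start + frequency/2 + index*frequency
}

func main() {
	// A buffer starting at t=1000 with a 60s measurement frequency:
	idx := bufferIndex(1130, 1000, 60)            // (1130-1000)/60 = 2
	fmt.Println(idx, sampleTime(1000, 60, idx))   // 2 1150
}
```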
func (*MemoryStore) SetNodeProvider ¶
func (ms *MemoryStore) SetNodeProvider(provider NodeProvider)
SetNodeProvider sets the NodeProvider implementation for the MemoryStore. This must be called during initialization to provide job state information for selective buffer retention during Free operations. If not set, the Free function will fall back to freeing all buffers.
func (*MemoryStore) SizeInBytes ¶
func (m *MemoryStore) SizeInBytes() int64
func (*MemoryStore) SizeInGB ¶
func (m *MemoryStore) SizeInGB() float64
func (*MemoryStore) Stats ¶
func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int64) (*Stats, int64, int64, error)
Stats returns statistics for the requested metric on the selected node/level. Data is aggregated to the selected level the same way as in `MemoryStore.Read`. If `Stats.Samples` is zero, the statistics should not be considered valid.
func (*MemoryStore) ToCheckpoint ¶
func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error)
ToCheckpoint writes metric data to checkpoint files in parallel (JSON format).
Metrics at root and cluster levels are skipped. One file per host is created. Uses worker pool (Keys.NumWorkers) for parallel processing. Only locks one host at a time, allowing concurrent writes/reads to other hosts.
Returns the number of checkpoint files created and any errors encountered.
func (*MemoryStore) ToCheckpointWAL ¶
ToCheckpointWAL writes binary snapshot files for all hosts in parallel. Returns the number of files written, the list of host directories that were successfully checkpointed (for WAL rotation), and any errors.
func (*MemoryStore) Write ¶
func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error
Write writes all values in `metrics` to the level specified by `selector` for time `ts`. See `findLevelOrCreate` for how selectors work.
func (*MemoryStore) WriteToLevel ¶
WriteToLevel assumes that `minfo` in `metrics` is already filled in.
type Metric ¶
type Metric struct {
Name string
Value schema.Float
// MetricConfig contains frequency and aggregation settings for this metric.
// If Frequency is 0, configuration will be looked up from MemoryStore.Metrics during Write().
MetricConfig MetricConfig
}
Metric represents a single metric data point to be written to the store.
type MetricConfig ¶
type MetricConfig struct {
// Interval in seconds at which measurements are stored
Frequency int64
// Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy.
Aggregation AggregationStrategy
// contains filtered or unexported fields
}
MetricConfig defines configuration for a single metric type.
Stored in the global Metrics map, keyed by metric name (e.g., "cpu_load").
Fields:
- Frequency: Measurement interval in seconds (e.g., 60 for 1-minute granularity)
- Aggregation: How to combine values across hierarchy levels (sum/avg/none)
- offset: Internal index into Level.metrics slice (assigned during Init)
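A metric configuration loaded from JSON might look like the following; the exact file shape and key names are assumptions based on the documented Frequency and Aggregation fields:

```json
{
  "cpu_load": { "frequency": 60, "aggregation": "avg" },
  "cpu_power": { "frequency": 60, "aggregation": "sum" }
}
```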
type MetricStoreConfig ¶
type MetricStoreConfig struct {
// Number of concurrent workers for checkpoint and archive operations.
// If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10)
NumWorkers int `json:"num-workers"`
RetentionInMemory string `json:"retention-in-memory"`
MemoryCap int `json:"memory-cap"`
Checkpoints Checkpoints `json:"checkpoints"`
Debug *Debug `json:"debug"`
Cleanup *Cleanup `json:"cleanup"`
Subscriptions *Subscriptions `json:"nats-subscriptions"`
}
MetricStoreConfig defines the main configuration for the metricstore.
Loaded from cc-backend's config.json "metricstore" section. Controls memory usage, persistence, archiving, and metric ingestion.
Fields:
- NumWorkers: Parallel workers for checkpoint/archive (0 = auto: min(NumCPU/2+1, 10))
- RetentionInMemory: Duration string (e.g., "48h") for in-memory data retention
- MemoryCap: Max bytes for buffer data (0 = unlimited); triggers forceFree when exceeded
- Checkpoints: Periodic persistence configuration
- Debug: Development/profiling options (nil = disabled)
- Cleanup: Long-term cleanup/archive configuration (nil = disabled)
- Subscriptions: NATS topics for metric ingestion (nil = polling only)
var Keys MetricStoreConfig = MetricStoreConfig{
	Checkpoints: Checkpoints{
		FileFormat: "wal",
		RootDir:    "./var/checkpoints",
	},
	Cleanup: &Cleanup{
		Mode: "delete",
	},
}
Keys is the global metricstore configuration instance.
Initialized with defaults, then overwritten by cc-backend's config.json. Accessed by Init(), Checkpointing(), and other lifecycle functions.
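Combining the documented JSON tags with the defaults above, a "metricstore" section of config.json might look like this (the keys inside "cleanup" are assumptions, since the Cleanup struct tags are not shown here):

```json
{
  "metricstore": {
    "num-workers": 4,
    "retention-in-memory": "48h",
    "memory-cap": 0,
    "checkpoints": {
      "file-format": "wal",
      "directory": "./var/checkpoints"
    },
    "cleanup": {
      "mode": "delete"
    }
  }
}
```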
type NodeProvider ¶
type NodeProvider interface {
// GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames
// that are currently in use by jobs that started before the given timestamp.
//
// Parameters:
// - ts: Unix timestamp threshold - returns nodes with jobs started before this time
//
// Returns:
// - Map of cluster names to lists of node hostnames that should be excluded from garbage collection
// - Error if the query fails
GetUsedNodes(ts int64) (map[string][]string, error)
}
NodeProvider provides information about nodes currently in use by running jobs.
This interface allows metricstore to query job information without directly depending on the repository package, breaking the import cycle.
Implementations should return nodes that are actively processing jobs started before the given timestamp. These nodes will be excluded from retention-based garbage collection to prevent data loss for jobs that are still running or recently completed.
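A minimal, hypothetical implementation of the interface for illustration; a real implementation would query the job repository instead of a static map:

```go
package main

import (
	"fmt"
	"sort"
)

// NodeProvider as documented above.
type NodeProvider interface {
	GetUsedNodes(ts int64) (map[string][]string, error)
}

// staticProvider answers from a fixed cluster -> hostname -> job start map.
type staticProvider struct {
	jobStart map[string]map[string]int64
}

func (p *staticProvider) GetUsedNodes(ts int64) (map[string][]string, error) {
	used := map[string][]string{}
	for cluster, hosts := range p.jobStart {
		for host, start := range hosts {
			if start < ts { // job started before the threshold: keep its node
				used[cluster] = append(used[cluster], host)
			}
		}
	}
	for _, hosts := range used {
		sort.Strings(hosts) // documented contract: sorted hostname lists
	}
	return used, nil
}

func main() {
	var p NodeProvider = &staticProvider{jobStart: map[string]map[string]int64{
		"fritz": {"node001": 100, "node002": 500},
	}}
	used, _ := p.GetUsedNodes(300) // only node001 started before ts=300
	fmt.Println(used["fritz"])     // [node001]
}
```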
type ParquetMetricRow ¶
type ParquetMetricRow struct {
Cluster string `parquet:"cluster"`
Hostname string `parquet:"hostname"`
Metric string `parquet:"metric"`
Scope string `parquet:"scope"`
ScopeID string `parquet:"scope_id"`
Timestamp int64 `parquet:"timestamp"`
Frequency int64 `parquet:"frequency"`
Value float32 `parquet:"value"`
}
ParquetMetricRow is the long-format schema for archived metric data. One row per (host, metric, scope, scope_id, timestamp) data point. Sorted by (cluster, hostname, metric, timestamp) for optimal compression.
type PersistentBufferPool ¶
type PersistentBufferPool struct {
// contains filtered or unexported fields
}
func NewPersistentBufferPool ¶
func NewPersistentBufferPool() *PersistentBufferPool
NewPersistentBufferPool creates a dynamic pool for buffers.
func (*PersistentBufferPool) Clean ¶
func (p *PersistentBufferPool) Clean(threshold int64)
Clean removes buffers from the pool whose lastUsed timestamp is older than the given threshold (Unix seconds). It uses a simple LRU approach based on the lastUsed timestamp.
func (*PersistentBufferPool) Clear ¶
func (p *PersistentBufferPool) Clear()
Clear drains all buffers currently in the pool, allowing the GC to collect them.
func (*PersistentBufferPool) Get ¶
func (p *PersistentBufferPool) Get() *buffer
func (*PersistentBufferPool) GetSize ¶
func (p *PersistentBufferPool) GetSize() int
GetSize returns the exact number of buffers currently sitting in the pool.
func (*PersistentBufferPool) Put ¶
func (p *PersistentBufferPool) Put(b *buffer)
Put returns b to the pool. The caller must set b.lastUsed = time.Now().Unix() before calling Put so that Clean() can evict idle entries correctly.
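The Put/Clean contract described above can be sketched with simplified stand-ins (the real pool internals, locking, and buffer layout are not shown here):

```go
package main

import (
	"fmt"
	"time"
)

// buffer and bufferPool are simplified stand-ins for the package's types.
type buffer struct {
	data     []float64
	lastUsed int64 // caller must stamp this before Put
}

type bufferPool struct {
	free []*buffer
}

func (p *bufferPool) Put(b *buffer) { p.free = append(p.free, b) }

// Clean drops buffers whose lastUsed is older than threshold (Unix seconds),
// keeping only recently used entries.
func (p *bufferPool) Clean(threshold int64) {
	kept := p.free[:0]
	for _, b := range p.free {
		if b.lastUsed >= threshold {
			kept = append(kept, b)
		}
	}
	p.free = kept
}

func main() {
	p := &bufferPool{}
	b := &buffer{data: make([]float64, 512)}
	b.lastUsed = time.Now().Unix() // required before Put, per the contract
	p.Put(b)
	p.Clean(time.Now().Unix() - 60) // recently used: survives
	fmt.Println(len(p.free))        // 1
}
```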
type ScopeQueryResult ¶
type ScopeQueryResult struct {
Type *string
Metric string
Hostname string
TypeIds []string
Scope schema.MetricScope
Aggregate bool
}
ScopeQueryResult is a package-independent intermediate type returned by BuildScopeQueries. Each consumer converts it to their own APIQuery type (adding Resolution and any other package-specific fields).
func BuildScopeQueries ¶
func BuildScopeQueries(nativeScope, requestedScope schema.MetricScope, metric, hostname string, topology *schema.Topology, hwthreads []int, accelerators []string) ([]ScopeQueryResult, bool)
BuildScopeQueries generates scope query results for a given scope transformation. It returns a slice of results and a boolean indicating success. An empty slice means an expected exception (skip this combination). ok=false means an unhandled case (caller should return an error).
type Subscriptions ¶
type Subscriptions []struct {
// Channel name
SubscribeTo string `json:"subscribe-to"`
// Allow lines without a cluster tag, use this as default, optional
ClusterTag string `json:"cluster-tag"`
}
Subscriptions defines NATS topics to subscribe to for metric ingestion.
Each subscription receives metrics via NATS messaging, enabling real-time data collection from compute nodes.
Fields:
- SubscribeTo: NATS subject/channel name (e.g., "metrics.compute.*")
- ClusterTag: Default cluster name for metrics without cluster tag (optional)
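Using the JSON tags shown above, a subscriptions section might look like the following (the subject is the example from the field docs; the cluster name is hypothetical):

```json
{
  "nats-subscriptions": [
    { "subscribe-to": "metrics.compute.*", "cluster-tag": "fritz" }
  ]
}
```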