metricstore

package v1.5.0

Warning: This package is not in the latest version of its module.
Published: Mar 6, 2026 License: MIT Imports: 33 Imported by: 0

Documentation

Overview

This file contains the API types and data fetching logic for querying metric data from the in-memory metric store. It provides structures for building complex queries with support for aggregation, scaling, padding, and statistics computation.

Package metricstore provides buffer.go: Time-series data buffer implementation.

Buffer Architecture

Each metric at each hierarchical level (cluster/host/cpu/etc.) uses a linked-list chain of fixed-size buffers to store time-series data. This design:

  • Avoids reallocation/copying when growing (new links added instead)
  • Enables efficient pooling (buffers returned to sync.Pool)
  • Supports traversal back in time (via prev pointers)
  • Maintains temporal ordering (newer data in later buffers)

Buffer Chain Example

[oldest buffer] <- prev -- [older] <- prev -- [newest buffer (head)]
  start=1000               start=1512           start=2024
  data=[v0...v511]         data=[v0...v511]     data=[v0...v42]

When the head buffer reaches capacity (BufferCap = 512), a new buffer becomes the new head and the old head is linked via prev.

Pooling Strategy

sync.Pool reduces GC pressure for the common case (BufferCap-sized allocations). Non-standard capacity buffers are not pooled (e.g., from checkpoint deserialization).

Time Alignment

Timestamps are aligned to measurement frequency intervals:

index = (timestamp - buffer.start) / buffer.frequency
actualTime = buffer.start + (frequency / 2) + (index * frequency)

Missing data points are represented as NaN values. The read() function performs linear interpolation where possible.

This file implements checkpoint persistence for the in-memory metric store.

Checkpoints enable graceful restarts by periodically saving in-memory metric data to disk in JSON or binary format.

Key Features:

  • Periodic background checkpointing via the Checkpointing() worker
  • Two format families: JSON (human-readable) and WAL+binary (compact, crash-safe)
  • Parallel checkpoint creation and loading using worker pools
  • Hierarchical file organization: checkpoint_dir/cluster/host/timestamp.{json|bin}
  • WAL file: checkpoint_dir/cluster/host/current.wal (append-only, per-entry)
  • Only saves unarchived data (archived data is already persisted elsewhere)
  • GC optimization during loading to prevent excessive heap growth

Checkpoint Workflow:

  1. Init() loads checkpoints within retention window at startup
  2. Checkpointing() worker periodically saves new data
  3. Shutdown() writes final checkpoint before exit

File Organization:

checkpoints/
  cluster1/
    host001/
      1234567890.json  (JSON format: full subtree snapshot)
      1234567890.bin   (binary format: full subtree snapshot)
      current.wal      (WAL format: append-only per-entry log)
    host002/
      ...

Package metricstore provides config.go: Configuration structures and metric management.

Configuration Hierarchy

The metricstore package uses nested configuration structures:

MetricStoreConfig (Keys)
├─ NumWorkers: Parallel checkpoint/archive workers
├─ RetentionInMemory: How long to keep data in RAM (also used as cleanup interval)
├─ MemoryCap: Memory limit in bytes (triggers forceFree)
├─ Checkpoints: Persistence configuration
│  ├─ FileFormat: "json" or "wal" (default: "wal")
│  └─ RootDir: Checkpoint storage path
├─ Cleanup: Long-term storage configuration (interval = RetentionInMemory)
│  ├─ RootDir: Archive storage path (archive mode only)
│  └─ Mode: "delete" or "archive"
├─ Debug: Development/debugging options
└─ Subscriptions: NATS topic subscriptions for metric ingestion

Metric Configuration

Each metric (e.g., "cpu_load", "mem_used") has a MetricConfig entry in the global Metrics map, defining:

  • Frequency: Measurement interval in seconds
  • Aggregation: How to combine values (sum/avg/none) when transforming scopes
  • offset: Internal index into Level.metrics slice (assigned during Init)

AggregationStrategy

Determines how to combine metric values when aggregating from finer to coarser scopes:

  • NoAggregation: Do not combine (incompatible scopes)
  • SumAggregation: Add values (e.g., power consumption: core→socket)
  • AvgAggregation: Average values (e.g., temperature: core→socket)

Package metricstore provides level.go: Hierarchical tree structure for metric storage.

Level Architecture

The Level type forms a tree structure where each node represents a level in the ClusterCockpit hierarchy: cluster → host → socket → core → hwthread, with special nodes for memory domains and accelerators.

Structure:

Root Level (cluster="emmy")
├─ Level (host="node001")
│  ├─ Level (socket="0")
│  │  ├─ Level (core="0") [stores cpu0 metrics]
│  │  └─ Level (core="1") [stores cpu1 metrics]
│  └─ Level (socket="1")
│     └─ ...
└─ Level (host="node002")
   └─ ...

Each Level can:

  • Hold data (metrics slice of buffer pointers)
  • Have child nodes (children map[string]*Level)
  • Both simultaneously (inner nodes can store aggregated metrics)

Selector Paths

Selectors are hierarchical paths: []string{"cluster", "host", "component"}. Example: []string{"emmy", "node001", "cpu0"} navigates to the cpu0 core level.

Concurrency

RWMutex protects children map and metrics slice. Read-heavy workload (metric reads) uses RLock. Writes (new levels, buffer updates) use Lock. Double-checked locking prevents races during level creation.

This file implements ingestion of InfluxDB line-protocol metric data received over NATS. Each line encodes one metric sample with the following structure:

<measurement>[,cluster=<c>][,hostname=<h>][,type=<t>][,type-id=<id>][,subtype=<s>][,stype-id=<id>] value=<v> [<timestamp>]

The measurement name identifies the metric (e.g. "cpu_load"). Tags provide routing information (cluster, host) and optional sub-device selectors (type, subtype). Only one field is expected per line: "value".

After decoding, each sample is:

  1. Written to the in-memory store via ms.WriteToLevel.
  2. If the checkpoint format is "wal", also forwarded to the WAL staging goroutine via the WALMessages channel for durable write-ahead logging.

Package metricstore provides an efficient in-memory time-series metric storage system with support for hierarchical data organization, checkpointing, and archiving.

The package organizes metrics in a tree structure (cluster → host → component) and provides concurrent read/write access to metric data with configurable aggregation strategies. Background goroutines handle periodic checkpointing (JSON or WAL+binary format), archiving old data, and enforcing retention policies.

Key features:

  • In-memory metric storage with configurable retention
  • Hierarchical data organization (selectors)
  • Concurrent checkpoint/archive workers
  • Support for sum and average aggregation
  • NATS integration for metric ingestion

This file implements high-level query functions for loading job metric data with automatic scope transformation and aggregation.

Key Concepts:

Metric Scopes: Metrics are collected at different granularities (native scope):

  • HWThread: Per hardware thread
  • Core: Per CPU core
  • Socket: Per CPU socket
  • MemoryDomain: Per memory domain (NUMA)
  • Accelerator: Per GPU/accelerator
  • Node: Per compute node

Scope Transformation: The buildQueries functions transform between native scope and requested scope by:

  • Aggregating finer-grained data (e.g., HWThread → Core → Socket → Node)
  • Rejecting requests for finer granularity than available
  • Handling special cases (e.g., Accelerator metrics)

Query Building: Constructs APIQuery structures with proper selectors (Type, TypeIds) based on cluster topology and job resources.

This file contains shared scope transformation logic used by both the internal metric store (pkg/metricstore) and the external cc-metric-store client (internal/metricstoreclient). It extracts the common algorithm for mapping between native metric scopes and requested scopes based on cluster topology.

Package metricstore provides walCheckpoint.go: WAL-based checkpoint implementation.

This replaces the Avro shadow tree with an append-only Write-Ahead Log (WAL) per host, eliminating the extra memory overhead of the AvroStore and providing truly continuous (per-write) crash safety.

Architecture

Metric write (DecodeLine)
      │
      ├─► WriteToLevel()    → main MemoryStore (unchanged)
      │
      └─► WALMessages channel
               │
               ▼
        WALStaging goroutine
               │
               ▼
        checkpoints/cluster/host/current.wal  (append-only, binary)

Periodic checkpoint (Checkpointing goroutine):
  1. Write <timestamp>.bin snapshot (column-oriented, from main tree)
  2. Signal WALStaging to truncate current.wal per host

On restart (FromCheckpoint):
  1. Load most recent <timestamp>.bin snapshot
  2. Replay current.wal (overwrite-safe: buffer.write handles duplicate timestamps)

WAL Record Format

[4B magic 0xCC1DA7A1][4B payload_len][payload][4B CRC32]

payload:
  [8B timestamp int64]
  [2B metric_name_len uint16][N metric name bytes]
  [1B selector_count uint8]
  per selector: [1B selector_len uint8][M selector bytes]
  [4B value float32 bits]

Binary Snapshot Format

[4B magic 0xCC5B0001][8B from int64][8B to int64]
Level tree (recursive):
  [4B num_metrics uint32]
  per metric:
    [2B name_len uint16][N name bytes]
    [8B frequency int64][8B start int64]
    [4B num_values uint32][num_values × 4B float32]
  [4B num_children uint32]
  per child: [2B name_len uint16][N name bytes] + Level (recursive)

Index

Constants

const (
	CheckpointFilePerms = 0o644                    // File permissions for checkpoint files
	CheckpointDirPerms  = 0o755                    // Directory permissions for checkpoint directories
	GCTriggerInterval   = DefaultGCTriggerInterval // Interval for triggering GC during checkpoint loading
)
const (
	DefaultMaxWorkers                 = 10
	DefaultBufferCapacity             = 512
	DefaultGCTriggerInterval          = 100
	DefaultMemoryUsageTrackerInterval = 1 * time.Hour
)

BufferCap is the default buffer capacity. buffer.data will only ever grow up to its capacity and a new link in the buffer chain will be created if needed so that no copying of data or reallocation needs to happen on writes.

const MaxMissingDataPoints int64 = 5

MaxMissingDataPoints is the threshold for stale data detection. A buffer is considered healthy if the gap between its last data point and the current time is within MaxMissingDataPoints * frequency.

Variables

var (
	// ErrNoHostOrMetric is returned when the metric store does not find the host or the metric
	ErrNoHostOrMetric error = errors.New("[METRICSTORE]> metric or host not found")
	// ErrInvalidTimeRange is returned when a query has 'from' >= 'to'
	ErrInvalidTimeRange = errors.New("[METRICSTORE]> invalid time range: 'from' must be before 'to'")
	// ErrEmptyCluster is returned when a query with ForAllNodes has no cluster specified
	ErrEmptyCluster = errors.New("[METRICSTORE]> cluster name cannot be empty")
)
var (
	// ErrNoData indicates no time-series data exists for the requested metric/level.
	ErrNoData error = errors.New("[METRICSTORE]> no data for this metric/level")

	// ErrDataDoesNotAlign indicates that aggregated data from child scopes
	// does not align with the parent scope's expected timestamps/intervals.
	ErrDataDoesNotAlign error = errors.New("[METRICSTORE]> data from lower granularities does not align")
)
var (
	HWThreadString     = string(schema.MetricScopeHWThread)
	CoreString         = string(schema.MetricScopeCore)
	MemoryDomainString = string(schema.MetricScopeMemoryDomain)
	SocketString       = string(schema.MetricScopeSocket)
	AcceleratorString  = string(schema.MetricScopeAccelerator)
)

Pre-converted scope strings avoid repeated string(MetricScope) allocations during query construction. Used in ScopeQueryResult.Type field.

var ErrNoNewArchiveData error = errors.New("all data already archived")
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)

TestLoadDataCallback allows tests to override LoadData behavior for testing purposes. When set to a non-nil function, LoadData will call this function instead of the default implementation.

var WALMessages = make(chan *WALMessage, 4096)

WALMessages is the channel for sending metric writes to the WAL staging goroutine. Buffered to allow burst writes without blocking the metric ingestion path.

Functions

func BuildMetricList

func BuildMetricList() map[string]MetricConfig

func Checkpointing

func Checkpointing(wg *sync.WaitGroup, ctx context.Context)

Checkpointing starts a background worker that periodically saves metric data to disk.

Checkpoints are written every checkpointInterval, which is hardcoded to 12 hours.

Format behaviour:

  • "json": Periodic checkpointing every checkpointInterval
  • "wal": Periodic binary snapshots + WAL rotation every checkpointInterval

func CleanUp

func CleanUp(wg *sync.WaitGroup, ctx context.Context)

func CleanupCheckpoints

func CleanupCheckpoints(checkpointsDir, cleanupDir string, from int64, deleteInstead bool) (int, error)

CleanupCheckpoints deletes or archives all checkpoint files older than `from`. When archiving, consolidates all hosts per cluster into a single Parquet file.

func DecodeLine

func DecodeLine(dec *lineprotocol.Decoder,
	ms *MemoryStore,
	clusterDefault string,
) error

DecodeLine reads all lines from dec (InfluxDB line-protocol) and writes each decoded metric sample into ms.

clusterDefault is used as the cluster name for lines that do not carry a "cluster" tag. Callers typically supply the ClusterTag value from the NATS subscription configuration.

Performance notes:

  • A decodeState is obtained from decodeStatePool to reuse scratch buffers.
  • The Level pointer (host-level node in the metric tree) is cached across consecutive lines that share the same cluster+host pair to avoid repeated lock acquisitions on the root and cluster levels.
  • []byte→string conversions for type/subtype selectors are cached via prevType*/prevSubType* fields because batches typically repeat the same sub-device identifiers.
  • Timestamp parsing tries Second precision first; if that fails it retries Millisecond, Microsecond, and Nanosecond in turn. A missing timestamp falls back to time.Now().

When the checkpoint format is "wal" each successfully decoded sample is also sent to WALMessages so the WAL staging goroutine can persist it durably before the next binary snapshot.

func ExtractTypeID

func ExtractTypeID(queryType *string, typeIds []string, ndx int, metric, hostname string) *string

ExtractTypeID returns the type ID at the given index from a query's TypeIds slice. Returns nil if queryType is nil (no type filtering). Logs a warning and returns nil if the index is out of range.

func Free

func Free(ms *MemoryStore, t time.Time) (int, error)

Free removes metric data older than the given time while preserving data for active nodes.

This function implements intelligent retention by consulting the NodeProvider (if configured) to determine which nodes are currently in use by running jobs. Data for these nodes is preserved even if older than the retention time.

Parameters:

  • ms: The MemoryStore instance
  • t: Time threshold - buffers with data older than this will be freed

Returns:

  • Number of buffers freed
  • Error if NodeProvider query fails

Behavior:

  • If no NodeProvider is set: frees all buffers older than t
  • If NodeProvider returns empty map: frees all buffers older than t
  • Otherwise: preserves buffers for nodes returned by GetUsedNodes(), frees others

func FreeSelected

func FreeSelected(ms *MemoryStore, selectors [][]string, t time.Time) (int, error)

FreeSelected frees buffers for specific selectors while preserving others.

This function is used when we want to retain some specific nodes beyond the retention time. It iterates through the provided selectors and frees their associated buffers.

Parameters:

  • ms: The MemoryStore instance
  • selectors: List of selector paths to free (e.g., [["cluster1", "node1"], ["cluster2", "node2"]])
  • t: Time threshold for freeing buffers

Returns the total number of buffers freed and any error encountered.

func GetSelectors

func GetSelectors(ms *MemoryStore, excludeSelectors map[string][]string) [][]string

GetSelectors returns all selectors at depth 2 (cluster/node level) that are NOT in the exclusion map.

This function generates a list of selectors whose buffers should be freed by excluding selectors that correspond to nodes currently in use by running jobs.

Parameters:

  • ms: The MemoryStore instance
  • excludeSelectors: Map of cluster names to node hostnames that should NOT be freed

Returns a list of selectors ([]string paths) that can be safely freed.

Example:

If the tree has paths ["emmy", "node001"] and ["emmy", "node002"],
and excludeSelectors contains {"emmy": ["node001"]},
then only [["emmy", "node002"]] is returned.

func Init

func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.WaitGroup)

Init initializes the metric store from configuration and starts background workers.

This function must be called exactly once before any other metricstore operations. It performs the following initialization steps:

  1. Validates and decodes the metric store configuration
  2. Configures worker pool size (defaults to NumCPU/2+1, max 10)
  3. Loads metric configurations from all registered clusters
  4. Restores checkpoints within the retention window
  5. Starts background workers for retention, checkpointing, archiving, and monitoring
  6. Optionally subscribes to NATS for real-time metric ingestion

Parameters:

  • rawConfig: JSON configuration for the metric store (see MetricStoreConfig)
  • metrics: Map of metric names to their configurations (see MetricConfig)
  • wg: WaitGroup that will be incremented for each background goroutine started

The function will call cclog.Fatal on critical errors during initialization. Use Shutdown() to cleanly stop all background workers started by Init().

Note: Signal handling must be implemented by the caller. Call Shutdown() when receiving termination signals to ensure checkpoint data is persisted.

func InitMetrics

func InitMetrics(metrics map[string]MetricConfig)

InitMetrics initializes the singleton MemoryStore instance with the given metric configurations.

This function must be called before GetMemoryStore() and can only be called once due to the singleton pattern. It assigns each metric an internal offset for efficient buffer indexing.

Parameters:

  • metrics: Map of metric names to their configurations (frequency and aggregation strategy)

Panics if any metric has Frequency == 0, which indicates an invalid configuration.

After this call, the global msInstance is ready for use via GetMemoryStore().

func IntToStringSlice

func IntToStringSlice(is []int) []string

IntToStringSlice converts a slice of integers to a slice of strings. Used to convert hardware thread/core/socket IDs from topology (int) to query TypeIds (string). Optimized to reuse a byte buffer for string conversion, reducing allocations.

func IsMetricRemovedForSubCluster

func IsMetricRemovedForSubCluster(mc *schema.MetricConfig, subCluster string) bool

IsMetricRemovedForSubCluster checks whether a metric is marked as removed for the given subcluster in its per-subcluster configuration.

func MemoryUsageTracker

func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context)

MemoryUsageTracker starts a background goroutine that monitors memory usage.

This worker checks actual process memory usage (via runtime.MemStats) periodically and force-frees buffers if memory exceeds the configured cap. It uses FreeOSMemory() to return memory to the OS after freeing buffers, avoiding aggressive GC that causes performance issues.

The tracker logs both actual memory usage (heap allocated) and metric data size for visibility into memory overhead from Go runtime structures and allocations.

Parameters:

  • wg: WaitGroup to signal completion when context is cancelled
  • ctx: Context for cancellation signal

The goroutine exits when ctx is cancelled.

func ReceiveNats

func ReceiveNats(ms *MemoryStore,
	workers int,
	ctx context.Context,
) error

ReceiveNats subscribes to all configured NATS subjects and feeds incoming line-protocol messages into the MemoryStore.

When workers > 1, a pool of goroutines drains a shared channel so that multiple messages can be decoded in parallel. With workers == 1, the NATS callback decodes inline (no channel overhead, lower latency).

The function blocks until ctx is cancelled and all worker goroutines have finished. It returns nil when the NATS client is not configured; callers should treat that as a no-op rather than an error.

func Retention

func Retention(wg *sync.WaitGroup, ctx context.Context)

Retention starts a background goroutine that periodically frees old metric data.

This worker runs at half the retention interval and calls Free() to remove buffers older than the configured retention time. It respects the NodeProvider to preserve data for nodes with active jobs.

Parameters:

  • wg: WaitGroup to signal completion when context is cancelled
  • ctx: Context for cancellation signal

The goroutine exits when ctx is cancelled.

func RotateWALFiles

func RotateWALFiles(hostDirs []string)

RotateWALFiles sends rotation requests for the given host directories and blocks until all rotations complete.

func RotateWALFilesAfterShutdown

func RotateWALFilesAfterShutdown(hostDirs []string)

RotateWALFilesAfterShutdown sends rotation requests for the given host directories during shutdown and blocks until all rotations complete.

func SanitizeStats

func SanitizeStats(avg, min, max *schema.Float)

SanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling. If ANY of avg/min/max is NaN, ALL three are zeroed for consistency.

func Shutdown

func Shutdown()

Shutdown performs a graceful shutdown of the metric store.

This function cancels all background goroutines started by Init() and writes a final checkpoint to disk before returning. It should be called when the application receives a termination signal.

The function will:

  1. Cancel the context to stop all background workers
  2. Close the WAL messages channel if using WAL format
  3. Write a final checkpoint to preserve in-memory data
  4. Log any errors encountered during shutdown

Note: This function blocks until the final checkpoint is written.

func WALStaging

func WALStaging(wg *sync.WaitGroup, ctx context.Context)

WALStaging starts a background goroutine that receives WALMessage items and appends binary WAL records to per-host current.wal files. Also handles WAL rotation requests from the checkpoint goroutine.

Types

type APIMetricData

type APIMetricData struct {
	Error      *string           `json:"error,omitempty"`
	Data       schema.FloatArray `json:"data,omitempty"`
	From       int64             `json:"from"`
	To         int64             `json:"to"`
	Resolution int64             `json:"resolution"`
	Avg        schema.Float      `json:"avg"`
	Min        schema.Float      `json:"min"`
	Max        schema.Float      `json:"max"`
}

APIMetricData represents the response data for a single metric query.

It contains both the time-series data points and computed statistics (avg, min, max). If an error occurred during data retrieval, the Error field will be set and other fields may be incomplete.

func (*APIMetricData) AddStats

func (data *APIMetricData) AddStats()

AddStats computes and populates the Avg, Min, and Max fields from the Data array.

NaN values in the data are ignored during computation. If all values are NaN, the statistics fields will be set to NaN.

TODO: Optimize this, just like the stats endpoint!

func (*APIMetricData) PadDataWithNull

func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metric string)

PadDataWithNull pads the beginning of the data array with NaN values if needed.

This ensures that the data aligns with the requested 'from' timestamp, even if the metric store doesn't have data for the earliest time points. This is useful for maintaining consistent array indexing across multiple queries.

Parameters:

  • ms: MemoryStore instance to lookup metric configuration
  • from: The requested start timestamp
  • to: The requested end timestamp (unused but kept for API consistency)
  • metric: The metric name to lookup frequency information

func (*APIMetricData) ScaleBy

func (data *APIMetricData) ScaleBy(f schema.Float)

ScaleBy multiplies all data points and statistics by the given factor.

This is commonly used for unit conversion (e.g., bytes to gigabytes). Scaling by 0 or 1 is a no-op for performance reasons.

type APIQuery

type APIQuery struct {
	Type        *string      `json:"type,omitempty"`
	SubType     *string      `json:"subtype,omitempty"`
	Metric      string       `json:"metric"`
	Hostname    string       `json:"host"`
	Resolution  int64        `json:"resolution"`
	TypeIds     []string     `json:"type-ids,omitempty"`
	SubTypeIds  []string     `json:"subtype-ids,omitempty"`
	ScaleFactor schema.Float `json:"scale-by,omitempty"`
	Aggregate   bool         `json:"aggreg"`
}

APIQuery represents a single metric query with optional hierarchical selectors.

The hierarchical selection works as follows:

  • Hostname: The node to query
  • Type + TypeIds: First level of hierarchy (e.g., "cpu" + ["0", "1", "2"])
  • SubType + SubTypeIds: Second level of hierarchy (e.g., "core" + ["0", "1"])

If Aggregate is true, data from multiple type/subtype IDs will be aggregated according to the metric's aggregation strategy. Otherwise, separate results are returned for each combination.

type APIQueryRequest

type APIQueryRequest struct {
	Cluster     string     `json:"cluster"`
	Queries     []APIQuery `json:"queries"`
	ForAllNodes []string   `json:"for-all-nodes"`
	From        int64      `json:"from"`
	To          int64      `json:"to"`
	WithStats   bool       `json:"with-stats"`
	WithData    bool       `json:"with-data"`
	WithPadding bool       `json:"with-padding"`
}

APIQueryRequest represents a batch query request for metric data.

It supports two modes of operation:

  1. Explicit queries via the Queries field
  2. Automatic query generation via ForAllNodes (queries all specified metrics for all nodes in the cluster)

The request can be customized with flags to include/exclude statistics, raw data, and padding.

type APIQueryResponse

type APIQueryResponse struct {
	Queries []APIQuery        `json:"queries,omitempty"`
	Results [][]APIMetricData `json:"results"`
}

APIQueryResponse represents the response to an APIQueryRequest.

Results is a 2D array where each outer element corresponds to a query, and each inner element corresponds to a selector within that query (e.g., multiple CPUs or cores).

func FetchData

func FetchData(req APIQueryRequest) (*APIQueryResponse, error)

FetchData executes a batch metric query request and returns the results.

This is the primary API for retrieving metric data from the memory store. It supports:

  • Individual queries via req.Queries
  • Batch queries for all nodes via req.ForAllNodes
  • Hierarchical selector construction (cluster → host → type → subtype)
  • Optional statistics computation (avg, min, max)
  • Optional data scaling
  • Optional data padding with NaN values

The function constructs selectors based on the query parameters and calls MemoryStore.Read() for each selector. If a query specifies Aggregate=false with multiple type/subtype IDs, separate results are returned for each combination.

Parameters:

  • req: The query request containing queries, time range, and options

Returns:

  • APIQueryResponse containing results for each query, or error if validation fails

Errors:

  • ErrInvalidTimeRange if req.From >= req.To
  • ErrEmptyCluster if req.ForAllNodes is used without specifying a cluster
  • Error if MemoryStore is not initialized
  • Individual query errors are stored in APIMetricData.Error field

type AggregationStrategy

type AggregationStrategy int

AggregationStrategy defines how to combine metric values across hierarchy levels.

Used when transforming data from finer-grained scopes (e.g., core) to coarser scopes (e.g., socket). This is SPATIAL aggregation, not TEMPORAL (time-based) aggregation.

Values:

  • NoAggregation: Do not aggregate (incompatible scopes or non-aggregatable metrics)
  • SumAggregation: Add values (e.g., power: sum core power → socket power)
  • AvgAggregation: Average values (e.g., temperature: average core temps → socket temp)
const (
	NoAggregation  AggregationStrategy = iota // Do not aggregate
	SumAggregation                            // Sum values (e.g., power, energy)
	AvgAggregation                            // Average values (e.g., temperature, utilization)
)

func AssignAggregationStrategy

func AssignAggregationStrategy(str string) (AggregationStrategy, error)

AssignAggregationStrategy parses a string into an AggregationStrategy value.

Used when loading metric configurations from JSON/YAML files.

Parameters:

  • str: "sum", "avg", or "" (empty string for NoAggregation)

Returns:

  • AggregationStrategy: Parsed value
  • error: Non-nil if str is unrecognized

type CheckpointFile

type CheckpointFile struct {
	Metrics  map[string]*CheckpointMetrics `json:"metrics"`
	Children map[string]*CheckpointFile    `json:"children"`
	From     int64                         `json:"from"`
	To       int64                         `json:"to"`
}

CheckpointFile represents the hierarchical structure of a checkpoint file. It mirrors the Level tree structure from the MemoryStore.

type CheckpointMetrics

type CheckpointMetrics struct {
	Data      []schema.Float `json:"data"`
	Frequency int64          `json:"frequency"`
	Start     int64          `json:"start"`
}

CheckpointMetrics represents metric data in a checkpoint file. Whenever the structure changes, update MarshalJSON as well!

func (*CheckpointMetrics) MarshalJSON

func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error)

MarshalJSON provides optimized JSON encoding for CheckpointMetrics.

Since schema.Float has custom MarshalJSON, serializing []Float has significant overhead. This method manually constructs JSON to avoid allocations and interface conversions.

type Checkpoints

type Checkpoints struct {
	FileFormat string `json:"file-format"`
	RootDir    string `json:"directory"`
}

Checkpoints configures periodic persistence of in-memory metric data.

Fields:

  • FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
  • RootDir: Filesystem path for checkpoint files (created if missing)

type Cleanup

type Cleanup struct {
	RootDir string `json:"directory"`
	Mode    string `json:"mode"`
}

Cleanup configures long-term storage of old metric data.

Data older than RetentionInMemory is archived to disk or deleted. The cleanup interval is always RetentionInMemory.

Fields:

  • RootDir: Filesystem path for archived data (used in "archive" mode)
  • Mode: "delete" (discard old data) or "archive" (write to RootDir)

type Debug

type Debug struct {
	DumpToFile string `json:"dump-to-file"`
	EnableGops bool   `json:"gops"`
}

Debug provides development and profiling options.

Fields:

  • DumpToFile: Path to dump checkpoint data for inspection (empty = disabled)
  • EnableGops: Enable gops agent for live runtime debugging (https://github.com/google/gops)

type GlobalState

type GlobalState struct {
	// contains filtered or unexported fields
}

GlobalState holds the global state for the metric store with thread-safe access.

type HealthCheckReq

type HealthCheckReq struct {
	Cluster     string   `json:"cluster" example:"fritz"`
	Nodes       []string `json:"nodes"`
	MetricNames []string `json:"metric-names"`
}

HealthCheckReq is the request payload for a health check: the cluster, the nodes to evaluate, and the metric names expected on each node.

type HealthCheckResponse

type HealthCheckResponse struct {
	Status schema.MonitoringState
	Error  error
}

HealthCheckResponse represents the result of a health check operation.

type HealthCheckResult

type HealthCheckResult struct {
	State         schema.MonitoringState
	HealthMetrics string // JSON: {"missing":[...],"degraded":[...]}
}

HealthCheckResult holds the monitoring state and raw JSON health metrics for a single node as determined by HealthCheck.

type InternalMetricStore

type InternalMetricStore struct{}

var MetricStoreHandle *InternalMetricStore

func (*InternalMetricStore) HealthCheck

func (ccms *InternalMetricStore) HealthCheck(cluster string,
	nodes []string, metrics []string,
) (map[string]HealthCheckResult, error)

HealthCheck delegates to the internal MemoryStore's HealthCheck.

func (*InternalMetricStore) LoadData

func (ccms *InternalMetricStore) LoadData(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	ctx context.Context,
	resolution int,
) (schema.JobData, error)

LoadData loads metric data for a specific job with automatic scope transformation.

This is the primary function for retrieving job metric data. It handles:

  • Building queries with scope transformation via buildQueries
  • Fetching data from the metric store
  • Organizing results by metric and scope
  • Converting NaN statistics to 0 for JSON compatibility
  • Partial error handling (returns data for successful queries even if some fail)

Parameters:

  • job: Job metadata including cluster, resources, and time range
  • metrics: List of metric names to load
  • scopes: Requested metric scopes (will be transformed to match native scopes)
  • ctx: Context for cancellation (currently unused but reserved for future use)
  • resolution: Data resolution in seconds (0 for native resolution)

Returns:

  • JobData: Map of metric → scope → JobMetric with time-series data and statistics
  • Error: Returns error if query building or fetching fails, or partial error listing failed hosts

Example:

jobData, err := LoadData(job, []string{"cpu_load", "mem_used"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 60)

func (*InternalMetricStore) LoadNodeData

func (ccms *InternalMetricStore) LoadNodeData(
	cluster string,
	metrics, nodes []string,
	scopes []schema.MetricScope,
	from, to time.Time,
	ctx context.Context,
) (map[string]map[string][]*schema.JobMetric, error)

LoadNodeData loads metric data for specific nodes in a cluster over a time range.

Unlike LoadData which operates on job resources, this function queries arbitrary nodes directly. Useful for system monitoring and node status views.

Parameters:

  • cluster: Cluster name
  • metrics: List of metric names
  • nodes: List of node hostnames (nil = all nodes in cluster via ForAllNodes)
  • scopes: Requested metric scopes (currently unused - always node scope)
  • from, to: Time range
  • ctx: Context (currently unused)

Returns:

  • Map of hostname → metric → []JobMetric
  • Error or partial error listing failed queries

func (*InternalMetricStore) LoadNodeListData

func (ccms *InternalMetricStore) LoadNodeListData(
	cluster, subCluster string,
	nodes []string,
	metrics []string,
	scopes []schema.MetricScope,
	resolution int,
	from, to time.Time,
	ctx context.Context,
) (map[string]schema.JobData, error)

LoadNodeListData loads metric data for a list of nodes with full scope transformation support.

This is the most flexible node data loading function, supporting arbitrary scopes and resolution. Uses buildNodeQueries for proper scope transformation based on topology.

Parameters:

  • cluster: Cluster name
  • subCluster: SubCluster name (empty string to infer from node names)
  • nodes: List of node hostnames
  • metrics: List of metric names
  • scopes: Requested metric scopes
  • resolution: Data resolution in seconds
  • from, to: Time range
  • ctx: Context (currently unused)

Returns:

  • Map of hostname → JobData (metric → scope → JobMetric)
  • Error or partial error listing failed queries

func (*InternalMetricStore) LoadScopedStats

func (ccms *InternalMetricStore) LoadScopedStats(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	ctx context.Context,
) (schema.ScopedJobStats, error)

LoadScopedStats loads metric statistics for a job with scope-aware grouping.

Similar to LoadStats but supports multiple scopes and returns statistics grouped by scope with hardware IDs (e.g., per-core, per-socket statistics).

Parameters:

  • job: Job metadata
  • metrics: List of metric names
  • scopes: Requested metric scopes
  • ctx: Context (currently unused)

Returns:

  • ScopedJobStats: Map of metric → scope → []ScopedStats (with hostname and ID)
  • Error or partial error listing failed queries

func (*InternalMetricStore) LoadStats

func (ccms *InternalMetricStore) LoadStats(
	job *schema.Job,
	metrics []string,
	ctx context.Context,
) (map[string]map[string]schema.MetricStatistics, error)

LoadStats loads only metric statistics (avg/min/max) for a job at node scope.

This is an optimized version of LoadData that fetches only statistics without time-series data, reducing bandwidth and memory usage. Always queries at node scope.

Parameters:

  • job: Job metadata
  • metrics: List of metric names
  • ctx: Context (currently unused)

Returns:

  • Map of metric → hostname → statistics
  • Error on query building or fetching failure

type Level

type Level struct {
	// contains filtered or unexported fields
}

Level represents a node in the hierarchical metric storage tree.

A Level can be either a leaf or an inner node. Inner nodes hold data in 'metrics' for aggregated values (e.g., socket-level metrics derived from core-level data). The type is named "Level" rather than "node" to avoid confusion with cluster nodes (hosts).

Fields:

  • children: Map of child level names to Level pointers (e.g., "cpu0" → Level)
  • metrics: Slice of buffer pointers (one per metric, indexed by MetricConfig.offset)
  • lock: RWMutex for concurrent access (read-heavy, write-rare)

type MemoryStore

type MemoryStore struct {
	Metrics map[string]MetricConfig
	// contains filtered or unexported fields
}

MemoryStore is the main in-memory time-series metric storage implementation.

It organizes metrics in a hierarchical tree structure where each level represents a component of the system hierarchy (e.g., cluster → host → CPU). Each level can store multiple metrics as time-series buffers.

The store is initialized as a singleton via InitMetrics() and accessed via GetMemoryStore(). All public methods are safe for concurrent use.

func GetMemoryStore

func GetMemoryStore() *MemoryStore

GetMemoryStore returns the singleton MemoryStore instance.

Returns the initialized MemoryStore singleton. Calls cclog.Fatal if InitMetrics() was not called first.

This function is safe for concurrent use after initialization.

func (*MemoryStore) DebugDump

func (m *MemoryStore) DebugDump(w *bufio.Writer, selector []string) error

func (*MemoryStore) ForceFree

func (m *MemoryStore) ForceFree() (int, error)

ForceFree unconditionally removes the oldest buffer from each metric chain.

func (*MemoryStore) Free

func (m *MemoryStore) Free(selector []string, t int64) (int, error)

Free releases all buffers for the selected level and all its children that contain only values older than `t`.

func (*MemoryStore) FreeAll

func (m *MemoryStore) FreeAll() error

func (*MemoryStore) FromCheckpoint

func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error)

FromCheckpoint loads checkpoint files from disk into memory in parallel.

Uses worker pool to load cluster/host combinations. Returns number of files loaded and any errors.

func (*MemoryStore) FromCheckpointFiles

func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error)

FromCheckpointFiles is the main entry point for loading checkpoints at startup.

Creates checkpoint directory if it doesn't exist. This function must be called before any writes or reads, and can only be called once.

func (*MemoryStore) GetHealthyMetrics

func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []string) ([]string, []string, error)

GetHealthyMetrics returns missing and degraded metric lists for a node.

It walks the metric tree starting from the node identified by selector and classifies each expected metric:

  • Missing: no buffer anywhere in the subtree, or metric not in global config
  • Degraded: at least one stale buffer exists in the subtree

Metrics present in expectedMetrics but absent from both returned lists are considered fully healthy.

func (*MemoryStore) GetLevel

func (m *MemoryStore) GetLevel(selector []string) *Level

func (*MemoryStore) GetMetricFrequency

func (ms *MemoryStore) GetMetricFrequency(metricName string) (int64, error)

func (*MemoryStore) GetPaths

func (ms *MemoryStore) GetPaths(targetDepth int) [][]string

GetPaths returns all paths (as lists of level names) down to the specified depth.

func (*MemoryStore) HealthCheck

func (m *MemoryStore) HealthCheck(cluster string,
	nodes []string, expectedMetrics []string,
) (map[string]HealthCheckResult, error)

HealthCheck evaluates multiple nodes against a set of expected metrics and returns a monitoring state per node.

States:

  • MonitoringStateFull: all expected metrics are healthy
  • MonitoringStatePartial: some metrics are missing or degraded
  • MonitoringStateFailed: node not found, or no healthy metrics at all

func (*MemoryStore) ListChildren

func (m *MemoryStore) ListChildren(selector []string) []string

ListChildren returns, for a given selector, a list of all children of the selected level.

func (*MemoryStore) Read

func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]schema.Float, int64, int64, int64, error)

Read returns all values for metric `metric` in the range `from` to `to` for the selected level(s). If the level does not hold the metric itself, the data is aggregated recursively from its children. The second and third return values are the actual from/to of the returned data; they can differ from the requested range if no data was available.
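The per-sample lookup behind a read follows the time-alignment formulas from the package overview; a self-contained sketch:

```go
package main

import "fmt"

// align computes the buffer slot index and the aligned timestamp for a
// measurement, per the time-alignment formulas in the package overview.
// start and freq describe a buffer; ts is an incoming timestamp.
func align(start, freq, ts int64) (index, actualTime int64) {
	index = (ts - start) / freq
	actualTime = start + freq/2 + index*freq
	return
}

func main() {
	// Buffer starting at t=1000 with 60s frequency: a sample at
	// t=1075 lands in slot 1, aligned to the slot midpoint.
	idx, at := align(1000, 60, 1075)
	fmt.Println(idx, at) // 1 1090
}
```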

func (*MemoryStore) SetNodeProvider

func (ms *MemoryStore) SetNodeProvider(provider NodeProvider)

SetNodeProvider sets the NodeProvider implementation for the MemoryStore. This must be called during initialization to provide job state information for selective buffer retention during Free operations. If not set, the Free function will fall back to freeing all buffers.

func (*MemoryStore) SizeInBytes

func (m *MemoryStore) SizeInBytes() int64

func (*MemoryStore) SizeInGB

func (m *MemoryStore) SizeInGB() float64

func (*MemoryStore) Stats

func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int64) (*Stats, int64, int64, error)

Stats returns statistics for the requested metric on the selected node/level. Data is aggregated to the selected level the same way as in `MemoryStore.Read`. If `Stats.Samples` is zero, the statistics should not be considered valid.

func (*MemoryStore) ToCheckpoint

func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error)

ToCheckpoint writes metric data to checkpoint files in parallel (JSON format).

Metrics at root and cluster levels are skipped. One file per host is created. Uses worker pool (Keys.NumWorkers) for parallel processing. Only locks one host at a time, allowing concurrent writes/reads to other hosts.

Returns the number of checkpoint files created and any errors encountered.

func (*MemoryStore) ToCheckpointWAL

func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string, error)

ToCheckpointWAL writes binary snapshot files for all hosts in parallel. Returns the number of files written, the list of host directories that were successfully checkpointed (for WAL rotation), and any errors.

func (*MemoryStore) Write

func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error

Write writes all values in `metrics` to the level specified by `selector` for time `ts`. See `findLevelOrCreate` for how selectors work.

func (*MemoryStore) WriteToLevel

func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metrics []Metric) error

WriteToLevel assumes that the `minfo` field in `metrics` is already filled in.

type Metric

type Metric struct {
	Name  string
	Value schema.Float
	// MetricConfig contains frequency and aggregation settings for this metric.
	// If Frequency is 0, configuration will be looked up from MemoryStore.Metrics during Write().
	MetricConfig MetricConfig
}

Metric represents a single metric data point to be written to the store.

type MetricConfig

type MetricConfig struct {
	// Interval in seconds at which measurements are stored
	Frequency int64

	// Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy.
	Aggregation AggregationStrategy
	// contains filtered or unexported fields
}

MetricConfig defines configuration for a single metric type.

Stored in the global Metrics map, keyed by metric name (e.g., "cpu_load").

Fields:

  • Frequency: Measurement interval in seconds (e.g., 60 for 1-minute granularity)
  • Aggregation: How to combine values across hierarchy levels (sum/avg/none)
  • offset: Internal index into Level.metrics slice (assigned during Init)

type MetricStoreConfig

type MetricStoreConfig struct {
	// Number of concurrent workers for checkpoint and archive operations.
	// If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10)
	NumWorkers        int            `json:"num-workers"`
	RetentionInMemory string         `json:"retention-in-memory"`
	MemoryCap         int            `json:"memory-cap"`
	Checkpoints       Checkpoints    `json:"checkpoints"`
	Debug             *Debug         `json:"debug"`
	Cleanup           *Cleanup       `json:"cleanup"`
	Subscriptions     *Subscriptions `json:"nats-subscriptions"`
}

MetricStoreConfig defines the main configuration for the metricstore.

Loaded from cc-backend's config.json "metricstore" section. Controls memory usage, persistence, archiving, and metric ingestion.

Fields:

  • NumWorkers: Parallel workers for checkpoint/archive (0 = auto: min(NumCPU/2+1, 10))
  • RetentionInMemory: Duration string (e.g., "48h") for in-memory data retention
  • MemoryCap: Max bytes for buffer data (0 = unlimited); triggers forceFree when exceeded
  • Checkpoints: Periodic persistence configuration
  • Debug: Development/profiling options (nil = disabled)
  • Cleanup: Long-term cleanup/archival configuration (nil = disabled)
  • Subscriptions: NATS topics for metric ingestion (nil = polling only)

var Keys MetricStoreConfig = MetricStoreConfig{
	Checkpoints: Checkpoints{
		FileFormat: "wal",
		RootDir:    "./var/checkpoints",
	},
	Cleanup: &Cleanup{
		Mode: "delete",
	},
}

Keys is the global metricstore configuration instance.

Initialized with defaults, then overwritten by cc-backend's config.json. Accessed by Init(), Checkpointing(), and other lifecycle functions.
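An illustrative `config.json` fragment for the "metricstore" section, using the JSON keys from the struct tags above (values are examples, not recommendations):

```json
{
  "metricstore": {
    "num-workers": 4,
    "retention-in-memory": "48h",
    "memory-cap": 0,
    "checkpoints": {
      "file-format": "wal",
      "directory": "./var/checkpoints"
    },
    "cleanup": {
      "mode": "delete"
    }
  }
}
```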

type NodeProvider

type NodeProvider interface {
	// GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames
	// that are currently in use by jobs that started before the given timestamp.
	//
	// Parameters:
	//   - ts: Unix timestamp threshold - returns nodes with jobs started before this time
	//
	// Returns:
	//   - Map of cluster names to lists of node hostnames that should be excluded from garbage collection
	//   - Error if the query fails
	GetUsedNodes(ts int64) (map[string][]string, error)
}

NodeProvider provides information about nodes currently in use by running jobs.

This interface allows metricstore to query job information without directly depending on the repository package, breaking the import cycle.

Implementations should return nodes that are actively processing jobs started before the given timestamp. These nodes will be excluded from retention-based garbage collection to prevent data loss for jobs that are still running or recently completed.
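A minimal, hypothetical implementation backed by a static job list; a real implementation would query the job repository instead:

```go
package main

import (
	"fmt"
	"sort"
)

// NodeProvider matches the interface documented above.
type NodeProvider interface {
	GetUsedNodes(ts int64) (map[string][]string, error)
}

// job is a hypothetical (cluster, node, startTime) record.
type job struct {
	cluster, node string
	start         int64
}

// staticProvider implements NodeProvider over an in-memory job list.
type staticProvider struct{ jobs []job }

func (p *staticProvider) GetUsedNodes(ts int64) (map[string][]string, error) {
	seen := map[string]map[string]bool{}
	for _, j := range p.jobs {
		if j.start < ts { // only jobs started before the threshold
			if seen[j.cluster] == nil {
				seen[j.cluster] = map[string]bool{}
			}
			seen[j.cluster][j.node] = true
		}
	}
	out := map[string][]string{}
	for cluster, nodes := range seen {
		for n := range nodes {
			out[cluster] = append(out[cluster], n)
		}
		sort.Strings(out[cluster]) // sorted, unique, as the contract requires
	}
	return out, nil
}

func main() {
	p := &staticProvider{jobs: []job{
		{"fritz", "f0101", 100},
		{"fritz", "f0101", 150}, // duplicate node, deduplicated
		{"fritz", "f0203", 400}, // started after ts, excluded
	}}
	used, _ := p.GetUsedNodes(200)
	fmt.Println(used) // map[fritz:[f0101]]
}
```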

type ParquetMetricRow

type ParquetMetricRow struct {
	Cluster   string  `parquet:"cluster"`
	Hostname  string  `parquet:"hostname"`
	Metric    string  `parquet:"metric"`
	Scope     string  `parquet:"scope"`
	ScopeID   string  `parquet:"scope_id"`
	Timestamp int64   `parquet:"timestamp"`
	Frequency int64   `parquet:"frequency"`
	Value     float32 `parquet:"value"`
}

ParquetMetricRow is the long-format schema for archived metric data. One row per (host, metric, scope, scope_id, timestamp) data point. Sorted by (cluster, hostname, metric, timestamp) for optimal compression.

type PersistentBufferPool

type PersistentBufferPool struct {
	// contains filtered or unexported fields
}

func NewPersistentBufferPool

func NewPersistentBufferPool() *PersistentBufferPool

NewPersistentBufferPool creates a dynamic pool for buffers.

func (*PersistentBufferPool) Clean

func (p *PersistentBufferPool) Clean(threshold int64)

Clean removes buffers from the pool that have not been used since the given threshold timestamp, a simple LRU eviction based on each buffer's lastUsed field.

func (*PersistentBufferPool) Clear

func (p *PersistentBufferPool) Clear()

Clear drains all buffers currently in the pool, allowing the GC to collect them.

func (*PersistentBufferPool) Get

func (p *PersistentBufferPool) Get() *buffer

func (*PersistentBufferPool) GetSize

func (p *PersistentBufferPool) GetSize() int

GetSize returns the exact number of buffers currently sitting in the pool.

func (*PersistentBufferPool) Put

func (p *PersistentBufferPool) Put(b *buffer)

Put returns b to the pool. The caller must set b.lastUsed = time.Now().Unix() before calling Put so that Clean() can evict idle entries correctly.
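A self-contained sketch of the Put/Clean contract, using a stand-in `buffer` type that carries only the `lastUsed` field (the real type is unexported):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// buffer is a stand-in for the package's unexported buffer type.
type buffer struct {
	data     []float64
	lastUsed int64 // Unix timestamp, stamped by the caller before Put
}

// pool is a simplified analogue of PersistentBufferPool.
type pool struct {
	mu   sync.Mutex
	free []*buffer
}

// Put returns b to the pool; per the documented contract, the caller
// has already set b.lastUsed.
func (p *pool) Put(b *buffer) {
	p.mu.Lock()
	p.free = append(p.free, b)
	p.mu.Unlock()
}

// Clean evicts buffers whose lastUsed is older than threshold.
func (p *pool) Clean(threshold int64) {
	p.mu.Lock()
	kept := p.free[:0]
	for _, b := range p.free {
		if b.lastUsed >= threshold {
			kept = append(kept, b)
		}
	}
	p.free = kept
	p.mu.Unlock()
}

func main() {
	p := &pool{}
	old := &buffer{lastUsed: time.Now().Unix() - 3600}
	fresh := &buffer{lastUsed: time.Now().Unix()}
	p.Put(old)
	p.Put(fresh)
	p.Clean(time.Now().Unix() - 60) // evict anything idle for > 60s
	fmt.Println(len(p.free))        // 1
}
```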

type ScopeQueryResult

type ScopeQueryResult struct {
	Type      *string
	Metric    string
	Hostname  string
	TypeIds   []string
	Scope     schema.MetricScope
	Aggregate bool
}

ScopeQueryResult is a package-independent intermediate type returned by BuildScopeQueries. Each consumer converts it to its own APIQuery type (adding Resolution and any other package-specific fields).

func BuildScopeQueries

func BuildScopeQueries(
	nativeScope, requestedScope schema.MetricScope,
	metric, hostname string,
	topology *schema.Topology,
	hwthreads []int,
	accelerators []string,
) ([]ScopeQueryResult, bool)

BuildScopeQueries generates scope query results for a given scope transformation. It returns a slice of results and a boolean indicating success. An empty slice means an expected special case (skip this combination); ok=false means an unhandled case (the caller should return an error).

type Stats

type Stats struct {
	Samples int
	Avg     schema.Float
	Min     schema.Float
	Max     schema.Float
}

Stats holds the sample count and avg/min/max values computed for a metric over the queried range.

type Subscriptions

type Subscriptions []struct {
	// Channel name
	SubscribeTo string `json:"subscribe-to"`

	// Allow lines without a cluster tag, use this as default, optional
	ClusterTag string `json:"cluster-tag"`
}

Subscriptions defines NATS topics to subscribe to for metric ingestion.

Each subscription receives metrics via NATS messaging, enabling real-time data collection from compute nodes.

Fields:

  • SubscribeTo: NATS subject/channel name (e.g., "metrics.compute.*")
  • ClusterTag: Default cluster name for metrics without cluster tag (optional)

type WALMessage

type WALMessage struct {
	MetricName string
	Cluster    string
	Node       string
	Selector   []string
	Value      schema.Float
	Timestamp  int64
}

WALMessage represents a single metric write to be appended to the WAL. Cluster and Node are NOT stored in the WAL record (inferred from file path).
