package archive
v1.5.0
Published: Mar 6, 2026 License: MIT Imports: 32 Imported by: 0

Documentation

Overview

Package archive implements the job archive interface and various backend implementations.

The archive package provides a pluggable storage backend system for job metadata and performance data. It supports three backend types:

  • file: Filesystem-based storage with hierarchical directory structure
  • s3: AWS S3 and S3-compatible object storage (MinIO, localstack)
  • sqlite: Single-file SQLite database with BLOB storage

Backend Selection

Choose a backend based on your deployment requirements:

  • File: Best for single-server deployments with fast local storage
  • S3: Best for distributed deployments requiring redundancy and multi-instance access
  • SQLite: Best for portable archives with SQL query capability and transactional integrity

Configuration

The archive backend is configured via JSON in the application config file. The kind field selects the backend ("file", "s3", or "sqlite"); the remaining fields depend on the backend. For the file backend, path sets the archive root directory:

{
  "archive": {
    "kind": "file",
    "path": "/var/lib/archive"
  }
}

For the S3 backend (endpoint, region, and use-path-style are optional; the key names follow the S3ArchiveConfig struct tags):

{
  "archive": {
    "kind": "s3",
    "endpoint": "http://192.168.178.10",
    "bucket": "my-job-archive",
    "region": "us-east-1",
    "use-path-style": true,
    "access-key": "...",
    "secret-key": "..."
  }
}

For the SQLite backend (key name per the SqliteArchiveConfig struct tag):

{
  "archive": {
    "kind": "sqlite",
    "db-path": "/var/lib/archive.db"
  }
}

Usage

The package is initialized once at application startup:

err := archive.Init(rawConfig)
if err != nil {
    log.Fatal(err)
}

After initialization, use the global functions to interact with the archive:

// Check if a job exists
exists := archive.GetHandle().Exists(job)

// Load job metadata
jobMeta, err := archive.GetHandle().LoadJobMeta(job)

// Store job metadata
err = archive.GetHandle().StoreJobMeta(job)

Thread Safety

All backend implementations are safe for concurrent use. The package uses internal locking for operations that modify shared state.

Nodelist Parsing

The archive package also provides nodelist parsing for HPC cluster node specifications. It implements parsing and querying of the compact node list representations commonly used by HPC job schedulers and cluster management systems, converting compressed node specifications (e.g., "node[01-10]") into queryable structures that can efficiently test node membership and expand to full node lists.

Node List Format

Node lists use a compact syntax with the following rules:

  1. Comma-separated terms represent alternative node patterns (OR logic)
  2. Each term consists of a string prefix followed by optional numeric ranges
  3. Numeric ranges are specified in square brackets with zero-padded start-end format
  4. Multiple ranges within brackets are comma-separated
  5. Range digits must be zero-padded and of equal length (e.g., "01-99" not "1-99")

Examples

"node01"                    // Single node
"node01,node02"             // Multiple individual nodes
"node[01-10]"               // Range: node01 through node10 (zero-padded)
"node[01-10,20-30]"         // Multiple ranges: node01-10 and node20-30
"cn-00[10-20],cn-00[50-60]" // Different prefixes with ranges
"login,compute[001-100]"    // Mixed individual and range terms

Usage

Parse a node list specification:

nl, err := ParseNodeList("node[01-10],login")
if err != nil {
    log.Fatal(err)
}

Check if a node name matches the list:

if nl.Contains("node05") {
    // node05 is in the list
}

Expand to full list of node names:

nodes := nl.PrintList()  // ["node01", "node02", ..., "node10", "login"]

Count total nodes in the list:

count := nl.NodeCount()  // 11 (10 from range + 1 individual)

Integration

This package is used by:

  • clusterConfig.go: Parses SubCluster.Nodes field from cluster configuration
  • schema.resolvers.go: GraphQL resolver for computing numberOfNodes in subclusters
  • Job archive: Validates node assignments against configured cluster topology

Constraints

  • Only zero-padded numeric ranges are supported
  • Range start and end must have identical digit counts
  • No whitespace allowed in node list specifications
  • Ranges must be specified as start-end (not individual numbers)
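Part of these constraints can be checked mechanically: a range token must have start-end form, both bounds must be numeric with equal digit counts, and start must not exceed end. A sketch of such a validation (not the package's actual parser, which is stricter, e.g. it also rejects unpadded single-digit ranges like "1-9"):

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

var rangeRe = regexp.MustCompile(`^([0-9]+)-([0-9]+)$`)

// validRange reports whether a bracketed range token like "01-99"
// satisfies: start-end form, equal digit counts, and start <= end.
func validRange(tok string) bool {
	m := rangeRe.FindStringSubmatch(tok)
	if m == nil {
		return false // missing '-' separator or non-numeric bounds
	}
	if len(m[1]) != len(m[2]) {
		return false // unequal digit counts, e.g. "01-9"
	}
	start, _ := strconv.Atoi(m[1])
	end, _ := strconv.Atoi(m[2])
	return start <= end
}

func main() {
	for _, tok := range []string{"01-99", "01-9", "10-05", "0199"} {
		fmt.Println(tok, validRange(tok))
	}
}
```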

Index

Constants

const Version uint64 = 3

Version is the current archive schema version. The archive backend must match this version for compatibility.

Variables

var (
	Clusters             []*schema.Cluster
	GlobalMetricList     []*schema.GlobalMetricListItem
	GlobalUserMetricList []*schema.GlobalMetricListItem
	NodeLists            map[string]map[string]NodeList
)

Functions

func AssignSubCluster

func AssignSubCluster(job *schema.Job) error

AssignSubCluster sets the `job.subcluster` property of the job based on its cluster and resources.

func DecodeCluster

func DecodeCluster(r io.Reader) (*schema.Cluster, error)

func DecodeJobData

func DecodeJobData(r io.Reader, k string) (schema.JobData, error)

func DecodeJobMeta

func DecodeJobMeta(r io.Reader) (*schema.Job, error)

func DecodeJobStats added in v1.4.4

func DecodeJobStats(r io.Reader, k string) (schema.ScopedJobStats, error)

func EncodeCluster added in v1.5.0

func EncodeCluster(w io.Writer, c *schema.Cluster) error

func EncodeJobData

func EncodeJobData(w io.Writer, d *schema.JobData) error

func EncodeJobMeta

func EncodeJobMeta(w io.Writer, d *schema.Job) error

func GetCluster

func GetCluster(cluster string) *schema.Cluster

func GetMetricConfig

func GetMetricConfig(cluster, metric string) *schema.MetricConfig

func GetMetricConfigSubCluster added in v1.5.0

func GetMetricConfigSubCluster(cluster, subcluster string) map[string]*schema.Metric

func GetStatistics

func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error)

GetStatistics returns all metric statistics for a job. Returns a map of metric names to their job-level statistics.

func GetSubCluster

func GetSubCluster(cluster, subcluster string) (*schema.SubCluster, error)

func GetSubClusterByNode

func GetSubClusterByNode(cluster, hostname string) (string, error)

func Init

func Init(rawConfig json.RawMessage) error

Init initializes the archive backend with the provided configuration. Must be called once at application startup before using any archive functions.

Parameters:

  • rawConfig: JSON configuration for the archive backend

The configuration determines which backend is used (file, s3, or sqlite). Returns an error if initialization fails or the stored archive version is incompatible with the expected Version.

func LoadAveragesFromArchive

func LoadAveragesFromArchive(
	job *schema.Job,
	metrics []string,
	data [][]schema.Float,
) error

LoadAveragesFromArchive loads average metric values for a job from the archive. This is a helper function that extracts average values from job statistics.

Parameters:

  • job: Job to load averages for
  • metrics: List of metric names to retrieve
  • data: 2D slice where averages will be appended (one row per metric)

func LoadScopedStatsFromArchive added in v1.4.4

func LoadScopedStatsFromArchive(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
) (schema.ScopedJobStats, error)

LoadScopedStatsFromArchive loads scoped statistics for a job from the archive. Returns statistics organized by metric scope (node, socket, core, etc.).

func LoadStatsFromArchive added in v1.4.3

func LoadStatsFromArchive(
	job *schema.Job,
	metrics []string,
) (map[string]schema.MetricStatistics, error)

LoadStatsFromArchive loads metric statistics for a job from the archive. Returns a map of metric names to their statistics (min, max, avg).

func MetricIndex added in v1.4.0

func MetricIndex(mc []*schema.MetricConfig, name string) (int, error)

func UpdateMetadata added in v1.3.0

func UpdateMetadata(job *schema.Job, metadata map[string]string) error

UpdateMetadata updates the metadata map for an archived job. If the job is still running or archiving is disabled, this is a no-op.

This function is safe for concurrent use (protected by mutex).

func UpdateTags

func UpdateTags(job *schema.Job, tags []*schema.Tag) error

UpdateTags updates the tag list for an archived job. If the job is still running or archiving is disabled, this is a no-op.

This function is safe for concurrent use (protected by mutex).

Types

type ArchiveBackend

type ArchiveBackend interface {
	// Init initializes the archive backend with the provided configuration.
	// Returns the archive version found in the backend storage.
	// Returns an error if the version is incompatible or initialization fails.
	Init(rawConfig json.RawMessage) (uint64, error)

	// Info prints archive statistics to stdout, including job counts,
	// date ranges, and storage sizes per cluster.
	Info()

	// Exists checks if a job with the given ID, cluster, and start time
	// exists in the archive.
	Exists(job *schema.Job) bool

	// LoadJobMeta loads job metadata from the archive.
	// Returns the complete Job structure including resources, tags, and statistics.
	LoadJobMeta(job *schema.Job) (*schema.Job, error)

	// LoadJobData loads the complete time-series performance data for a job.
	// Returns a map of metric names to their scoped data (node, socket, core, etc.).
	LoadJobData(job *schema.Job) (schema.JobData, error)

	// LoadJobStats loads pre-computed statistics from the job data.
	// Returns scoped statistics (min, max, avg) for all metrics.
	LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error)

	// LoadClusterCfg loads the cluster configuration.
	// Returns the cluster topology, metrics, and hardware specifications.
	LoadClusterCfg(name string) (*schema.Cluster, error)

	// StoreJobMeta stores job metadata to the archive.
	// Overwrites existing metadata for the same job ID, cluster, and start time.
	StoreJobMeta(jobMeta *schema.Job) error

	// StoreClusterCfg stores the cluster configuration to the archive.
	// Overwrites an existing configuration for the same cluster.
	StoreClusterCfg(name string, config *schema.Cluster) error

	// ImportJob stores both job metadata and performance data to the archive.
	// This is typically used during initial job archiving.
	ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error

	// GetClusters returns a list of all cluster names found in the archive.
	GetClusters() []string

	// CleanUp removes the specified jobs from the archive.
	// Used by retention policies to delete old jobs.
	CleanUp(jobs []*schema.Job)

	// Move relocates jobs to a different path within the archive.
	// The implementation depends on the backend type.
	Move(jobs []*schema.Job, path string)

	// Clean removes jobs outside the specified time range.
	// Jobs with start_time < before are deleted; if after > 0,
	// jobs with start_time > after are deleted as well.
	// Set after=0 to only use the before parameter.
	Clean(before int64, after int64)

	// Compress compresses job data files to save storage space.
	// For filesystem and SQLite backends, this applies gzip compression.
	// For S3, this compresses and replaces objects.
	Compress(jobs []*schema.Job)

	// CompressLast returns the timestamp of the last compression run
	// and updates it to the provided starttime.
	CompressLast(starttime int64) int64

	// Iter returns a channel that yields all jobs in the archive.
	// If loadMetricData is true, includes performance data; otherwise only metadata.
	// The channel is closed when iteration completes.
	Iter(loadMetricData bool) <-chan JobContainer
}

ArchiveBackend defines the interface that all archive storage backends must implement. Implementations include FsArchive (filesystem), S3Archive (object storage), and SqliteArchive (database).

All methods are safe for concurrent use unless otherwise noted.

func GetHandle

func GetHandle() ArchiveBackend

GetHandle returns the initialized archive backend instance. Must be called after Init().

func InitBackend added in v1.5.0

func InitBackend(rawConfig json.RawMessage) (ArchiveBackend, error)

InitBackend creates and initializes a new archive backend instance without affecting the global singleton. This is useful for archive migration tools that need to work with multiple archive backends simultaneously.

Parameters:

  • rawConfig: JSON configuration for the archive backend

Returns the initialized backend instance or an error if initialization fails. Does not validate the configuration against the schema.

type FsArchive

type FsArchive struct {
	// contains filtered or unexported fields
}

FsArchive implements ArchiveBackend using a hierarchical filesystem structure. Jobs are stored in directories organized by cluster, job ID, and start time.

Directory structure: <path>/<cluster>/<jobid/1000>/<jobid%1000>/<starttime>/
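The layout shards jobs by integer division and modulo of the job ID, which keeps directory fan-out bounded even for millions of jobs. A sketch of how such a path could be derived (the helper name is illustrative, and zero-padding the modulo component to three digits is an assumption, not confirmed FsArchive behavior):

```go
package main

import (
	"fmt"
	"path/filepath"
)

// jobDir builds a hierarchical archive path of the form
// <root>/<cluster>/<jobid/1000>/<jobid%1000>/<starttime>/
func jobDir(root, cluster string, jobID, startTime int64) string {
	lvl1 := fmt.Sprintf("%d", jobID/1000)
	// Assumed: the modulo level is zero-padded to 3 digits.
	lvl2 := fmt.Sprintf("%03d", jobID%1000)
	return filepath.Join(root, cluster, lvl1, lvl2, fmt.Sprintf("%d", startTime))
}

func main() {
	fmt.Println(jobDir("/var/lib/archive", "fritz", 1234567, 1719830000))
}
```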

func (*FsArchive) Clean

func (fsa *FsArchive) Clean(before int64, after int64)

func (*FsArchive) CleanUp

func (fsa *FsArchive) CleanUp(jobs []*schema.Job)

func (*FsArchive) Compress

func (fsa *FsArchive) Compress(jobs []*schema.Job)

func (*FsArchive) CompressLast

func (fsa *FsArchive) CompressLast(starttime int64) int64

func (*FsArchive) Exists

func (fsa *FsArchive) Exists(job *schema.Job) bool

func (*FsArchive) GetClusters

func (fsa *FsArchive) GetClusters() []string

func (*FsArchive) ImportJob

func (fsa *FsArchive) ImportJob(
	jobMeta *schema.Job,
	jobData *schema.JobData,
) error

func (*FsArchive) Info

func (fsa *FsArchive) Info()

func (*FsArchive) Init

func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error)

func (*FsArchive) Iter

func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer

func (*FsArchive) LoadClusterCfg

func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error)

func (*FsArchive) LoadJobData

func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error)

func (*FsArchive) LoadJobMeta

func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.Job, error)

func (*FsArchive) LoadJobStats added in v1.4.4

func (fsa *FsArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error)

func (*FsArchive) Move

func (fsa *FsArchive) Move(jobs []*schema.Job, path string)

func (*FsArchive) StoreClusterCfg added in v1.5.0

func (fsa *FsArchive) StoreClusterCfg(name string, config *schema.Cluster) error

func (*FsArchive) StoreJobMeta

func (fsa *FsArchive) StoreJobMeta(job *schema.Job) error

type FsArchiveConfig

type FsArchiveConfig struct {
	Path string `json:"path"` // Root directory path for the archive
}

FsArchiveConfig holds the configuration for the filesystem archive backend.

type JobContainer

type JobContainer struct {
	Meta *schema.Job     // Job metadata (always present)
	Data *schema.JobData // Performance data (nil if not loaded)
}

JobContainer combines job metadata and optional performance data. Used by Iter() to yield jobs during archive iteration.

type NLExprIntRange

type NLExprIntRange struct {
	// contains filtered or unexported fields
}

NLExprIntRange represents a single zero-padded integer range (e.g., "01-99"). Fields:

  • start, end: Numeric range boundaries (inclusive)
  • zeroPadded: Must be true (non-padded ranges not supported)
  • digits: Required digit count for zero-padding

type NLExprIntRanges

type NLExprIntRanges []NLExprIntRange

NLExprIntRanges represents multiple alternative integer ranges (comma-separated within brackets). A node name matches if it matches ANY of the contained ranges (OR logic).

type NLExprString

type NLExprString string

NLExprString represents a literal string prefix in a node name pattern. It matches by checking if the input starts with this exact string.

type NodeList

type NodeList [][]interface {
	// contains filtered or unexported methods
}

NodeList represents a parsed node list specification as a collection of node pattern terms. Each term is a sequence of expressions that must match consecutively for a node name to match. Terms are evaluated with OR logic - a node matches if ANY term matches completely.

Internal structure:

  • Outer slice: OR terms (comma-separated in input)
  • Inner slice: AND expressions (must all match sequentially)
  • Each expression implements: consume (pattern matching), limits (range info), prefix (string part)

Example: "node[01-10],login" becomes:

  • Term 1: [NLExprString("node"), NLExprIntRanges(01-10)]
  • Term 2: [NLExprString("login")]
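The OR-of-AND consume evaluation described above can be reproduced in a self-contained sketch. The types below are simplified stand-ins, not the package's actual (unexported) expression types:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// expr is the consume-style interface sketched above: it strips a
// matched prefix from the input, or reports failure.
type expr interface {
	consume(in string) (rest string, ok bool)
}

// litExpr matches a literal string prefix (cf. NLExprString).
type litExpr string

func (e litExpr) consume(in string) (string, bool) {
	if strings.HasPrefix(in, string(e)) {
		return in[len(e):], true
	}
	return in, false
}

// rangeExpr matches a fixed-width integer within [start, end]
// (cf. NLExprIntRange).
type rangeExpr struct {
	start, end int
	digits     int
}

func (e rangeExpr) consume(in string) (string, bool) {
	if len(in) < e.digits {
		return in, false
	}
	n, err := strconv.Atoi(in[:e.digits])
	if err != nil || n < e.start || n > e.end {
		return in, false
	}
	return in[e.digits:], true
}

// contains applies the OR-of-AND evaluation: a name matches if any
// term's expressions consume the entire input in sequence.
func contains(terms [][]expr, name string) bool {
	for _, term := range terms {
		in, ok := name, true
		for _, e := range term {
			if in, ok = e.consume(in); !ok {
				break
			}
		}
		if ok && in == "" {
			return true
		}
	}
	return false
}

func main() {
	// Hand-built equivalent of "node[01-10],login".
	terms := [][]expr{
		{litExpr("node"), rangeExpr{start: 1, end: 10, digits: 2}},
		{litExpr("login")},
	}
	fmt.Println(contains(terms, "node05")) // true
	fmt.Println(contains(terms, "node11")) // false
	fmt.Println(contains(terms, "node5"))  // false: missing zero-padding
}
```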

func ParseNodeList

func ParseNodeList(raw string) (NodeList, error)

ParseNodeList parses a compact node list specification into a queryable NodeList structure.

Input format rules:

  • Comma-separated terms (OR logic): "node01,node02" matches either node
  • Range syntax: "node[01-10]" expands to node01 through node10
  • Multiple ranges: "node[01-05,10-15]" creates two ranges
  • Zero-padding required: digits in ranges must be zero-padded and equal length
  • Mixed formats: "login,compute[001-100]" combines individual and range terms

Validation:

  • Returns error if brackets are unclosed
  • Returns error if ranges lack '-' separator
  • Returns error if range digits have unequal length
  • Returns error if range numbers fail to parse
  • Returns error on invalid characters

Examples:

  • "node[01-10]" → NodeList with one term (10 nodes)
  • "node01,node02" → NodeList with two terms (2 nodes)
  • "cn[01-05,10-15]" → NodeList with ranges 01-05 and 10-15 (11 nodes total)
  • "a[1-9]" → Error (not zero-padded)
  • "a[01-9]" → Error (unequal digit counts)

func (*NodeList) Contains

func (nl *NodeList) Contains(name string) bool

Contains tests whether the given node name matches any pattern in the NodeList. Returns true if the name matches at least one term completely, false otherwise.

Matching logic:

  • Evaluates each term sequentially (OR logic across terms)
  • Within a term, all expressions must match in order (AND logic)
  • A match is complete only if the entire input is consumed (str == "")

Examples:

  • NodeList("node[01-10]").Contains("node05") → true
  • NodeList("node[01-10]").Contains("node11") → false
  • NodeList("node[01-10]").Contains("node5") → false (missing zero-padding)

func (*NodeList) NodeCount

func (nl *NodeList) NodeCount() int

NodeCount returns the total number of individual nodes represented by the NodeList. This efficiently counts nodes without expanding the full list, making it suitable for large node ranges.

Calculation:

  • Individual node terms contribute 1
  • Range terms contribute (end - start + 1) for each range

Example:

  • ParseNodeList("node[01-10],login").NodeCount() → 11 (10 from range + 1 individual)
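The count follows directly from the range bounds, with no expansion needed. A sketch of the arithmetic (type and function names are illustrative):

```go
package main

import "fmt"

// nodeRange holds the inclusive bounds of one bracketed range.
type nodeRange struct{ start, end int }

// countNodes sums 1 per literal term and (end - start + 1) per
// range, without materializing any node names.
func countNodes(literals int, ranges []nodeRange) int {
	n := literals
	for _, r := range ranges {
		n += r.end - r.start + 1
	}
	return n
}

func main() {
	// "node[01-10],login": one range 01-10 plus one literal term.
	fmt.Println(countNodes(1, []nodeRange{{1, 10}})) // 11
}
```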

func (*NodeList) PrintList

func (nl *NodeList) PrintList() []string

PrintList expands the NodeList into a full slice of individual node names. This performs the inverse operation of ParseNodeList, expanding all ranges into their constituent node names with proper zero-padding.

Returns a slice of node names in the order they appear in the NodeList. For range terms, nodes are expanded in ascending numeric order.

Example:

  • ParseNodeList("node[01-03],login").PrintList() → ["node01", "node02", "node03", "login"]
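The zero-padded expansion can be reproduced with fmt's dynamic-width padding verb. A sketch (the helper name is illustrative):

```go
package main

import "fmt"

// expandRange produces prefix+NN names for an inclusive range,
// zero-padding each number to the given digit count.
func expandRange(prefix string, start, end, digits int) []string {
	out := make([]string, 0, end-start+1)
	for i := start; i <= end; i++ {
		// %0*d zero-pads i to a width taken from the digits argument.
		out = append(out, fmt.Sprintf("%s%0*d", prefix, digits, i))
	}
	return out
}

func main() {
	fmt.Println(expandRange("node", 1, 3, 2)) // [node01 node02 node03]
}
```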

type S3Archive

type S3Archive struct {
	// contains filtered or unexported fields
}

S3Archive implements ArchiveBackend using AWS S3 or S3-compatible object storage. Jobs are stored as objects with keys mirroring the filesystem structure.

Object key structure: <cluster>/<jobid/1000>/<jobid%1000>/<starttime>/meta.json

func (*S3Archive) Clean added in v1.5.0

func (s3a *S3Archive) Clean(before int64, after int64)

func (*S3Archive) CleanUp added in v1.5.0

func (s3a *S3Archive) CleanUp(jobs []*schema.Job)

func (*S3Archive) Compress added in v1.5.0

func (s3a *S3Archive) Compress(jobs []*schema.Job)

func (*S3Archive) CompressLast added in v1.5.0

func (s3a *S3Archive) CompressLast(starttime int64) int64

func (*S3Archive) Exists added in v1.5.0

func (s3a *S3Archive) Exists(job *schema.Job) bool

func (*S3Archive) GetClusters added in v1.5.0

func (s3a *S3Archive) GetClusters() []string

func (*S3Archive) ImportJob added in v1.5.0

func (s3a *S3Archive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error

func (*S3Archive) Info added in v1.5.0

func (s3a *S3Archive) Info()

func (*S3Archive) Init added in v1.5.0

func (s3a *S3Archive) Init(rawConfig json.RawMessage) (uint64, error)

func (*S3Archive) Iter added in v1.5.0

func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer

func (*S3Archive) LoadClusterCfg added in v1.5.0

func (s3a *S3Archive) LoadClusterCfg(name string) (*schema.Cluster, error)

func (*S3Archive) LoadJobData added in v1.5.0

func (s3a *S3Archive) LoadJobData(job *schema.Job) (schema.JobData, error)

func (*S3Archive) LoadJobMeta added in v1.5.0

func (s3a *S3Archive) LoadJobMeta(job *schema.Job) (*schema.Job, error)

func (*S3Archive) LoadJobStats added in v1.5.0

func (s3a *S3Archive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error)

func (*S3Archive) Move added in v1.5.0

func (s3a *S3Archive) Move(jobs []*schema.Job, targetPath string)

func (*S3Archive) StoreClusterCfg added in v1.5.0

func (s3a *S3Archive) StoreClusterCfg(name string, config *schema.Cluster) error

func (*S3Archive) StoreJobMeta added in v1.5.0

func (s3a *S3Archive) StoreJobMeta(job *schema.Job) error

type S3ArchiveConfig

type S3ArchiveConfig struct {
	Endpoint     string `json:"endpoint"`       // S3 endpoint URL (optional, for MinIO/localstack)
	AccessKey    string `json:"access-key"`     // AWS access key ID
	SecretKey    string `json:"secret-key"`     // AWS secret access key
	Bucket       string `json:"bucket"`         // S3 bucket name
	Region       string `json:"region"`         // AWS region
	UsePathStyle bool   `json:"use-path-style"` // Use path-style URLs (required for MinIO)
}

S3ArchiveConfig holds the configuration for the S3 archive backend.

type SqliteArchive added in v1.5.0

type SqliteArchive struct {
	// contains filtered or unexported fields
}

SqliteArchive implements ArchiveBackend using a SQLite database with BLOB storage. Job metadata and data are stored as JSON BLOBs with indexes for fast queries.

Uses WAL (Write-Ahead Logging) mode for better concurrency and a 64MB cache.

func (*SqliteArchive) Clean added in v1.5.0

func (sa *SqliteArchive) Clean(before int64, after int64)

func (*SqliteArchive) CleanUp added in v1.5.0

func (sa *SqliteArchive) CleanUp(jobs []*schema.Job)

func (*SqliteArchive) Compress added in v1.5.0

func (sa *SqliteArchive) Compress(jobs []*schema.Job)

func (*SqliteArchive) CompressLast added in v1.5.0

func (sa *SqliteArchive) CompressLast(starttime int64) int64

func (*SqliteArchive) Exists added in v1.5.0

func (sa *SqliteArchive) Exists(job *schema.Job) bool

func (*SqliteArchive) GetClusters added in v1.5.0

func (sa *SqliteArchive) GetClusters() []string

func (*SqliteArchive) ImportJob added in v1.5.0

func (sa *SqliteArchive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error

func (*SqliteArchive) Info added in v1.5.0

func (sa *SqliteArchive) Info()

func (*SqliteArchive) Init added in v1.5.0

func (sa *SqliteArchive) Init(rawConfig json.RawMessage) (uint64, error)

func (*SqliteArchive) Iter added in v1.5.0

func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer

func (*SqliteArchive) LoadClusterCfg added in v1.5.0

func (sa *SqliteArchive) LoadClusterCfg(name string) (*schema.Cluster, error)

func (*SqliteArchive) LoadJobData added in v1.5.0

func (sa *SqliteArchive) LoadJobData(job *schema.Job) (schema.JobData, error)

func (*SqliteArchive) LoadJobMeta added in v1.5.0

func (sa *SqliteArchive) LoadJobMeta(job *schema.Job) (*schema.Job, error)

func (*SqliteArchive) LoadJobStats added in v1.5.0

func (sa *SqliteArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error)

func (*SqliteArchive) Move added in v1.5.0

func (sa *SqliteArchive) Move(jobs []*schema.Job, targetPath string)

func (*SqliteArchive) StoreClusterCfg added in v1.5.0

func (sa *SqliteArchive) StoreClusterCfg(name string, config *schema.Cluster) error

func (*SqliteArchive) StoreJobMeta added in v1.5.0

func (sa *SqliteArchive) StoreJobMeta(job *schema.Job) error

type SqliteArchiveConfig added in v1.5.0

type SqliteArchiveConfig struct {
	DBPath string `json:"db-path"` // Path to SQLite database file
}

SqliteArchiveConfig holds the configuration for the SQLite archive backend.
