Documentation ¶
Overview ¶
Package repository provides the data access layer for cc-backend using the repository pattern.
The repository pattern abstracts database operations and provides a clean interface for data access. Each major entity (Job, User, Node) has its own repository with CRUD operations and specialized queries. Tag operations are provided as methods on JobRepository.
Database Connection ¶
Initialize the database connection before using any repository:
repository.Connect("./var/job.db")
Configuration ¶
Optional: Configure repository settings before initialization:
repository.SetConfig(&repository.RepositoryConfig{
	CacheSize:             2 * 1024 * 1024, // 2MB cache
	MaxOpenConnections:    8,               // Connection pool size
	MinRunningJobDuration: 300,             // Filter threshold
})
If not configured, sensible defaults are used automatically.
Repositories ¶
- JobRepository: Job lifecycle management and querying
- UserRepository: User management and authentication
- NodeRepository: Cluster node state tracking
- Tags: Job tagging and categorization (provided by JobRepository methods)
Caching ¶
Repositories use LRU caching to improve performance. Cache keys are constructed as "type:id" (e.g., "metadata:123"). Cache is automatically invalidated on mutations to maintain consistency.
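As a rough illustration of the "type:id" keying and invalidate-on-write behaviour, here is a minimal sketch built on the standard library's container/list. It is not the package's cache: it bounds the cache by entry count rather than by bytes (as the CacheSize setting suggests the real one does), and the names lruCache, cacheKey, and Invalidate are hypothetical.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry is one cached value together with its key.
type entry struct {
	key   string
	value any
}

// lruCache is a minimal, non-thread-safe LRU cache (illustration only).
type lruCache struct {
	capacity int
	order    *list.List               // front = most recently used
	items    map[string]*list.Element // key -> element in order
}

func newLRU(capacity int) *lruCache {
	return &lruCache{capacity: capacity, order: list.New(), items: make(map[string]*list.Element)}
}

// cacheKey builds a key in the "type:id" format, e.g. "metadata:123".
func cacheKey(kind string, id int64) string {
	return fmt.Sprintf("%s:%d", kind, id)
}

// Get returns the cached value and marks it as most recently used.
func (c *lruCache) Get(key string) (any, bool) {
	el, ok := c.items[key]
	if !ok {
		return nil, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).value, true
}

// Put inserts or updates a value, evicting the least recently used entry
// when the capacity is exceeded.
func (c *lruCache) Put(key string, value any) {
	if el, ok := c.items[key]; ok {
		el.Value.(*entry).value = value
		c.order.MoveToFront(el)
		return
	}
	c.items[key] = c.order.PushFront(&entry{key: key, value: value})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
}

// Invalidate drops a key; call it after every mutation of the underlying row.
func (c *lruCache) Invalidate(key string) {
	if el, ok := c.items[key]; ok {
		c.order.Remove(el)
		delete(c.items, key)
	}
}

func main() {
	c := newLRU(2)
	c.Put(cacheKey("metadata", 123), map[string]string{"note": "example"})
	c.Invalidate(cacheKey("metadata", 123)) // e.g. after a metadata update
	_, ok := c.Get(cacheKey("metadata", 123))
	fmt.Println(ok)
}
```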
Transaction Support ¶
For batch operations, use transactions:
t, err := jobRepo.TransactionInit()
if err != nil {
	return err
}
defer t.Rollback() // Rollback if not committed

// Perform operations...
if _, err := jobRepo.TransactionAdd(t, query, args...); err != nil {
	return err
}

// Commit when done
if err := t.Commit(); err != nil {
	return err
}
Job Queries ¶
The job query builders provide filtering, pagination, and security controls for job retrieval operations.
Tag Management ¶
Tags support job categorization and classification through both manual assignment (via REST/GraphQL APIs) and automatic detection (via tagger plugins). The implementation enforces role-based access control through tag scopes and keeps the SQL database and the file-based job archive consistent in both directions.
Database Schema:
CREATE TABLE tag (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    tag_type VARCHAR(255) NOT NULL,
    tag_name VARCHAR(255) NOT NULL,
    tag_scope VARCHAR(255) NOT NULL DEFAULT "global",
    CONSTRAINT tag_unique UNIQUE (tag_type, tag_name, tag_scope)
);

CREATE TABLE jobtag (
    job_id INTEGER,
    tag_id INTEGER,
    PRIMARY KEY (job_id, tag_id),
    FOREIGN KEY (job_id) REFERENCES job(id) ON DELETE CASCADE,
    FOREIGN KEY (tag_id) REFERENCES tag(id) ON DELETE CASCADE
);
The jobtag junction table enables many-to-many relationships between jobs and tags. CASCADE deletion ensures referential integrity when jobs or tags are removed.
Copyright (C) NHR@FAU, University Erlangen-Nuremberg. All rights reserved. This file is part of cc-backend. Use of this source code is governed by a MIT-style license that can be found in the LICENSE file.
Index ¶
- Constants
- Variables
- func AccessCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error)
- func AccessCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.SelectBuilder, error)
- func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.SelectBuilder
- func CallJobStartHooks(jobs []*schema.Job)
- func CallJobStopHooks(job *schema.Job)
- func Connect(db string)
- func ForceDB(db string) error
- func GetUserFromContext(ctx context.Context) *schema.User
- func LoadJobStat(job *schema.Job, metric string, statType string) float64
- func MigrateDB(db string) error
- func RegisterJobHook(hook JobHook)
- func ResetConnection() error
- func RevertDB(db string) error
- func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error)
- func SecurityCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.SelectBuilder, error)
- func SetConfig(cfg *RepositoryConfig)
- type ContextKey
- type DBConnection
- type DatabaseOptions
- type Hooks
- type JobHook
- type JobRepository
- func (r *JobRepository) AddHistograms(ctx context.Context, filter []*model.JobFilter, stat *model.JobsStatistics, ...) (*model.JobsStatistics, error)
- func (r *JobRepository) AddJobCount(ctx context.Context, filter []*model.JobFilter, stats []*model.JobsStatistics, ...) ([]*model.JobsStatistics, error)
- func (r *JobRepository) AddJobCountGrouped(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate, ...) ([]*model.JobsStatistics, error)
- func (r *JobRepository) AddMetricHistograms(ctx context.Context, filter []*model.JobFilter, metrics []string, ...) (*model.JobsStatistics, error)
- func (r *JobRepository) AddTag(user *schema.User, job int64, tag int64) ([]*schema.Tag, error)
- func (r *JobRepository) AddTagDirect(job int64, tag int64) ([]*schema.Tag, error)
- func (r *JobRepository) AddTagOrCreate(user *schema.User, jobID int64, tagType string, tagName string, ...) (tagID int64, err error)
- func (r *JobRepository) AddTagOrCreateDirect(jobID int64, tagType string, tagName string) (tagID int64, err error)
- func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error)
- func (r *JobRepository) CountJobs(ctx context.Context, filters []*model.JobFilter) (int, error)
- func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts map[string]int, err error)
- func (r *JobRepository) CreateTag(tagType string, tagName string, tagScope string) (tagID int64, err error)
- func (r *JobRepository) DeleteJobByID(id int64) error
- func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged string) (int, error)
- func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error
- func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float64, error)
- func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, error)
- func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error)
- func (r *JobRepository) Find(jobID *int64, cluster *string, startTime *int64) (*schema.Job, error)
- func (r *JobRepository) FindAll(jobID *int64, cluster *string, startTime *int64) ([]*schema.Job, error)
- func (r *JobRepository) FindByID(ctx context.Context, jobID int64) (*schema.Job, error)
- func (r *JobRepository) FindByIDDirect(jobID int64) (*schema.Job, error)
- func (r *JobRepository) FindByIDWithUser(user *schema.User, jobID int64) (*schema.Job, error)
- func (r *JobRepository) FindByJobID(ctx context.Context, jobID int64, startTime int64, cluster string) (*schema.Job, error)
- func (r *JobRepository) FindCached(jobID *int64, cluster *string, startTime *int64) (*schema.Job, error)
- func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, ...) (result string, err error)
- func (r *JobRepository) FindColumnValues(user *schema.User, query string, table string, selectColumn string, ...) (results []string, err error)
- func (r *JobRepository) FindConcurrentJobs(ctx context.Context, job *schema.Job) (*model.JobLinkResultList, error)
- func (r *JobRepository) FindJobIdsByTag(tagID int64) ([]int64, error)
- func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged string) ([]*schema.Job, error)
- func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error)
- func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm string) (jobid string, username string, project string, jobname string)
- func (r *JobRepository) Flush() error
- func (r *JobRepository) GetJobList(limit int, offset int) ([]int64, error)
- func (r *JobRepository) GetTags(user *schema.User, job *int64) ([]*schema.Tag, error)
- func (r *JobRepository) GetTagsDirect(job *int64) ([]*schema.Tag, error)
- func (r *JobRepository) GetUsedNodes(ts int64) (map[string][]string, error)
- func (r *JobRepository) HasTag(jobID int64, tagType string, tagName string) bool
- func (r *JobRepository) ImportTag(jobID int64, tagType string, tagName string, tagScope string) (err error)
- func (r *JobRepository) InsertJob(job *schema.Job) (int64, error)
- func (r *JobRepository) InsertJobDirect(job *schema.Job) (int64, error)
- func (r *JobRepository) IsJobOwner(jobID int64, startTime int64, user string, cluster string) bool
- func (r *JobRepository) JobCountGrouped(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error)
- func (r *JobRepository) JobsStats(ctx context.Context, filter []*model.JobFilter) ([]*model.JobsStatistics, error)
- func (r *JobRepository) JobsStatsGrouped(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, ...) ([]*model.JobsStatistics, error)
- func (r *JobRepository) MarkArchived(stmt sq.UpdateBuilder, monitoringStatus int32) sq.UpdateBuilder
- func (r *JobRepository) Optimize() error
- func (r *JobRepository) Partitions(cluster string) ([]string, error)
- func (r *JobRepository) QueryJobs(ctx context.Context, filters []*model.JobFilter, page *model.PageRequest, ...) ([]*schema.Job, error)
- func (r *JobRepository) RemoveJobTagByRequest(user *schema.User, job int64, tagType string, tagName string, tagScope string) ([]*schema.Tag, error)
- func (r *JobRepository) RemoveTag(user *schema.User, job, tag int64) ([]*schema.Tag, error)
- func (r *JobRepository) RemoveTagByID(tagID int64) error
- func (r *JobRepository) RemoveTagByRequest(tagType string, tagName string, tagScope string) error
- func (r *JobRepository) Start(job *schema.Job) (id int64, err error)
- func (r *JobRepository) StartDirect(job *schema.Job) (id int64, err error)
- func (r *JobRepository) Stop(jobID int64, duration int32, state schema.JobState, monitoringStatus int32) (err error)
- func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error
- func (r *JobRepository) SyncJobs() ([]*schema.Job, error)
- func (r *JobRepository) TagID(tagType string, tagName string, tagScope string) (tagID int64, exists bool)
- func (r *JobRepository) TagInfo(tagID int64) (tagType string, tagName string, tagScope string, exists bool)
- func (r *JobRepository) TransactionAdd(t *Transaction, query string, args ...any) (int64, error)
- func (r *JobRepository) TransactionAddNamed(t *Transaction, query string, args ...any) (int64, error)
- func (r *JobRepository) TransactionEnd(t *Transaction) error
- func (r *JobRepository) TransactionInit() (*Transaction, error)
- func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error)
- func (r *JobRepository) UpdateDuration() error
- func (r *JobRepository) UpdateEnergy(stmt sq.UpdateBuilder, jobMeta *schema.Job) (sq.UpdateBuilder, error)
- func (r *JobRepository) UpdateFootprint(stmt sq.UpdateBuilder, jobMeta *schema.Job) (sq.UpdateBuilder, error)
- func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err error)
- func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error)
- type NodeRepository
- func (r *NodeRepository) AddNode(node *schema.NodeDB) (int64, error)
- func (r *NodeRepository) CountNodes(ctx context.Context, filters []*model.NodeFilter) (int, error)
- func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeFilter, column string) ([]*model.NodeStates, error)
- func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model.NodeFilter, column string) ([]*model.NodeStatesTimed, error)
- func (r *NodeRepository) DeleteNode(id int64) error
- func (r *NodeRepository) DeleteNodeStatesBefore(cutoff int64) (int64, error)
- func (r *NodeRepository) FetchMetadata(hostname string, cluster string) (map[string]string, error)
- func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode, error)
- func (r *NodeRepository) GetNode(hostname string, cluster string, withMeta bool) (*schema.Node, error)
- func (r *NodeRepository) GetNodeByID(id int64, withMeta bool) (*schema.Node, error)
- func (r *NodeRepository) GetNodesForList(ctx context.Context, cluster string, subCluster string, stateFilter string, ...) ([]string, map[string]string, int, bool, error)
- func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error)
- func (r *NodeRepository) MapNodes(cluster string) (map[string]string, error)
- func (r *NodeRepository) QueryNodes(ctx context.Context, filters []*model.NodeFilter, page *model.PageRequest, ...) ([]*schema.Node, error)
- func (r *NodeRepository) QueryNodesWithMeta(ctx context.Context, filters []*model.NodeFilter, page *model.PageRequest, ...) ([]*schema.Node, error)
- func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeState *schema.NodeStateDB) error
- type NodeStateWithNode
- type RepositoryConfig
- type Transaction
- type UserCfgRepo
- type UserRepository
- func (r *UserRepository) AddProject(ctx context.Context, username string, project string) error
- func (r *UserRepository) AddRole(ctx context.Context, username string, queryrole string) error
- func (r *UserRepository) AddUser(user *schema.User) error
- func (r *UserRepository) DelUser(username string) error
- func (r *UserRepository) FetchUserInCtx(ctx context.Context, username string) (*model.User, error)
- func (r *UserRepository) GetLdapUsernames() ([]string, error)
- func (r *UserRepository) GetUser(username string) (*schema.User, error)
- func (r *UserRepository) ListUsers(specialsOnly bool) ([]*schema.User, error)
- func (r *UserRepository) RemoveProject(ctx context.Context, username string, project string) error
- func (r *UserRepository) RemoveRole(ctx context.Context, username string, queryrole string) error
- func (r *UserRepository) UpdateUser(dbUser *schema.User, user *schema.User) error
Constants ¶
const NamedJobCacheInsert string = `` /* 563-byte string literal not displayed */
const NamedJobInsert string = `` /* 557-byte string literal not displayed */
const NamedNodeInsert string = `
INSERT INTO node (hostname, cluster, subcluster)
VALUES (:hostname, :cluster, :subcluster);`
const NamedNodeStateInsert string = `` /* 299-byte string literal not displayed */
const Version uint = 10
Version is the current database schema version required by this version of cc-backend. When the database schema changes, this version is incremented and a new migration file is added to internal/repository/migrations/sqlite3/.
Version history:
- Version 10: Current version
Migration files are embedded at build time from the migrations directory.
Variables ¶
var (
	ErrNotFound  = errors.New("no such jobname, project or user")
	ErrForbidden = errors.New("not authorized")
)
var (
	// ErrTagNotFound is returned when a tag ID or tag identifier (type, name, scope) does not exist in the database.
	ErrTagNotFound = errors.New("the tag does not exist")

	// ErrJobNotOwned is returned when a user attempts to tag a job they do not have permission to access.
	ErrJobNotOwned = errors.New("user is not owner of job")

	// ErrTagNoAccess is returned when a user attempts to use a tag they cannot access due to scope restrictions.
	ErrTagNoAccess = errors.New("user not permitted to use that tag")

	// ErrTagPrivateScope is returned when a user attempts to access another user's private tag.
	ErrTagPrivateScope = errors.New("tag is private to another user")

	// ErrTagAdminScope is returned when a non-admin user attempts to use an admin-scoped tag.
	ErrTagAdminScope = errors.New("tag requires admin privileges")

	// ErrTagsIncompatScopes is returned when attempting to combine admin and non-admin scoped tags in a single operation.
	ErrTagsIncompatScopes = errors.New("combining admin and non-admin scoped tags not allowed")
)
Functions ¶
func AccessCheck ¶ added in v1.5.0
func AccessCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error)
func AccessCheckWithUser ¶ added in v1.5.0
func AccessCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.SelectBuilder, error)
func BuildWhereClause ¶
func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.SelectBuilder
BuildWhereClause constructs SQL WHERE conditions from a JobFilter and applies them to the query. Supports filtering by job properties (cluster, state, user), time ranges, resource usage, tags, and JSON field searches in meta_data, footprint, and resources columns.
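The general shape of such a builder can be sketched without Squirrel: each filter field that is set contributes one AND-ed condition with placeholders, and unset fields contribute nothing. The jobFilter type, its fields, and the column names below are hypothetical simplifications of model.JobFilter and the job table; the real function operates on an sq.SelectBuilder and supports many more filters.

```go
package main

import (
	"fmt"
	"strings"
)

// jobFilter is a hypothetical, reduced stand-in for model.JobFilter.
type jobFilter struct {
	Cluster   *string
	States    []string
	StartFrom *int64 // lower bound on start_time (Unix seconds)
	StartTo   *int64 // upper bound on start_time (Unix seconds)
}

// buildWhere turns the set fields of f into AND-ed conditions with
// positional placeholders; unset fields add no condition.
func buildWhere(f jobFilter) (string, []any) {
	var conds []string
	var args []any
	if f.Cluster != nil {
		conds = append(conds, "job.cluster = ?")
		args = append(args, *f.Cluster)
	}
	if len(f.States) > 0 {
		ph := strings.TrimSuffix(strings.Repeat("?, ", len(f.States)), ", ")
		conds = append(conds, "job.job_state IN ("+ph+")")
		for _, s := range f.States {
			args = append(args, s)
		}
	}
	// Simplification: the time range is applied only when both bounds are set.
	if f.StartFrom != nil && f.StartTo != nil {
		conds = append(conds, "job.start_time BETWEEN ? AND ?")
		args = append(args, *f.StartFrom, *f.StartTo)
	}
	if len(conds) == 0 {
		return "", nil
	}
	return "WHERE " + strings.Join(conds, " AND "), args
}

func main() {
	cluster := "clusterA"
	from, to := int64(1700000000), int64(1700086400)
	where, args := buildWhere(jobFilter{
		Cluster:   &cluster,
		States:    []string{"running", "completed"},
		StartFrom: &from,
		StartTo:   &to,
	})
	fmt.Println(where)
	fmt.Println(args)
}
```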
func CallJobStartHooks ¶ added in v1.5.0
func CallJobStartHooks(jobs []*schema.Job)
CallJobStartHooks invokes all registered JobHook.JobStartCallback methods for each job in the provided slice. This is called internally by the repository when jobs are started (e.g., via StartJob or batch job imports).
Hooks are called synchronously in registration order. If a hook panics, the panic will propagate to the caller.
func CallJobStopHooks ¶ added in v1.5.0
func CallJobStopHooks(job *schema.Job)
CallJobStopHooks invokes all registered JobHook.JobStopCallback methods for the provided job. This is called internally by the repository when a job completes (e.g., via StopJob or job state updates).
Hooks are called synchronously in registration order. If a hook panics, the panic will propagate to the caller.
func ForceDB ¶ added in v1.3.0
func ForceDB(db string) error
ForceDB forces the database schema version to the current Version without running migrations. This is only used to recover from failed migrations that left the database in a "dirty" state.
When to use:
- After manually fixing a failed migration
- When you've manually applied schema changes and need to update the version marker
Warning:
- This does NOT apply any schema changes
- Only use after manually verifying the schema is correct
- Improper use can cause schema/version mismatch
Usage:
cc-backend -force-db
func LoadJobStat ¶ added in v1.4.0
func LoadJobStat(job *schema.Job, metric string, statType string) float64
LoadJobStat retrieves a specific statistic for a metric from a job's statistics. Returns 0.0 if the metric is not found or statType is invalid.
Parameters:
- job: Job struct with populated Statistics field
- metric: Name of the metric to query (e.g., "cpu_load", "mem_used")
- statType: Type of statistic: "avg", "min", or "max"
Returns the requested statistic value or 0.0 if not found.
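The documented lookup-with-fallback behaviour amounts to a map lookup followed by a switch on statType. In this sketch, metricStat and loadJobStat are hypothetical stand-ins; the real function reads from the job's Statistics field.

```go
package main

import "fmt"

// metricStat is a hypothetical stand-in for the per-metric statistics
// stored in a job's Statistics field.
type metricStat struct {
	Avg, Min, Max float64
}

// loadJobStat looks up the metric, picks the requested statistic, and
// falls back to 0.0 for an unknown metric or statType.
func loadJobStat(stats map[string]metricStat, metric, statType string) float64 {
	s, ok := stats[metric]
	if !ok {
		return 0.0
	}
	switch statType {
	case "avg":
		return s.Avg
	case "min":
		return s.Min
	case "max":
		return s.Max
	default:
		return 0.0
	}
}

func main() {
	stats := map[string]metricStat{"cpu_load": {Avg: 42.5, Min: 0.1, Max: 71.9}}
	fmt.Println(loadJobStat(stats, "cpu_load", "avg"))
	fmt.Println(loadJobStat(stats, "mem_used", "avg"))
}
```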
func MigrateDB ¶
func MigrateDB(db string) error
MigrateDB applies all pending database migrations to bring the schema up to date. This should be run with the -migrate-db flag before starting the application after upgrading to a new version that requires schema changes.
Process:
- Checks current database version
- Applies all migrations from current version to target Version
- Updates schema_migrations table to track applied migrations
Important:
- Always backup your database before running migrations
- Migrations are irreversible without manual intervention
- If a migration fails, the database is marked "dirty" and requires manual fix
Usage:
cc-backend -migrate-db
func RegisterJobHook ¶ added in v1.5.0
func RegisterJobHook(hook JobHook)
RegisterJobHook registers a JobHook to receive job lifecycle callbacks. Multiple hooks can be registered and will be called in registration order. This function is safe to call multiple times and is typically called during application initialization.
Nil hooks are silently ignored to simplify conditional registration.
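The registration and call semantics described above (nil hooks ignored, synchronous calls in registration order) can be sketched with a package-level slice. All names here are hypothetical lowercase stand-ins for RegisterJobHook, CallJobStartHooks, and schema.Job; calling every hook for one job before moving on to the next job is an assumption about ordering across jobs. Note that only a nil interface value is ignored: a typed nil pointer stored in the interface passes the check.

```go
package main

import "fmt"

// job is a hypothetical, minimal stand-in for schema.Job.
type job struct {
	ID int64
}

// jobHook has the same shape as the documented JobHook interface.
type jobHook interface {
	JobStartCallback(j *job)
	JobStopCallback(j *job)
}

var hooks []jobHook

// registerJobHook appends a hook; a nil interface value is ignored.
func registerJobHook(h jobHook) {
	if h == nil {
		return
	}
	hooks = append(hooks, h)
}

// callJobStartHooks calls every hook for every job, synchronously and in
// registration order; a panic in a hook propagates to the caller.
func callJobStartHooks(jobs []*job) {
	for _, j := range jobs {
		for _, h := range hooks {
			h.JobStartCallback(j)
		}
	}
}

// recorder appends a line for each callback it receives.
type recorder struct {
	name string
	log  *[]string
}

func (r recorder) JobStartCallback(j *job) {
	*r.log = append(*r.log, fmt.Sprintf("%s start %d", r.name, j.ID))
}

func (r recorder) JobStopCallback(j *job) {
	*r.log = append(*r.log, fmt.Sprintf("%s stop %d", r.name, j.ID))
}

func main() {
	var log []string
	registerJobHook(recorder{"A", &log})
	registerJobHook(nil) // silently ignored
	registerJobHook(recorder{"B", &log})
	callJobStartHooks([]*job{{ID: 1}, {ID: 2}})
	fmt.Println(log)
}
```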
func ResetConnection ¶ added in v1.5.0
func ResetConnection() error
ResetConnection closes the current database connection and resets the connection state. This function is intended for testing purposes only to allow test isolation.
func RevertDB ¶ added in v1.3.0
func RevertDB(db string) error
RevertDB rolls back the database schema to the previous version (Version - 1). This is primarily used for testing or emergency rollback scenarios.
Warning:
- This may cause data loss if newer schema added columns/tables
- Always backup before reverting
- Not all migrations are safely reversible
Usage:
cc-backend -revert-db
func SecurityCheck ¶
func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error)
SecurityCheck extracts the user from the context and applies role-based access control filters to the query. This is a convenience wrapper around SecurityCheckWithUser.
func SecurityCheckWithUser ¶ added in v1.4.0
func SecurityCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.SelectBuilder, error)
SecurityCheckWithUser applies role-based access control filters to a job query based on the provided user's roles and permissions.
Access rules by role:
- API role (exclusive): Full access to all jobs
- Admin/Support roles: Full access to all jobs
- Manager role: Access to jobs in managed projects plus own jobs
- User role: Access only to own jobs
Returns an error if the user is nil or has no recognized roles.
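A sketch of how these rules might translate into an extra WHERE condition. The role strings, the user struct, and the column names job.project and job.hpc_user are assumptions; the real function adds conditions to an sq.SelectBuilder rather than returning SQL text, and a manager with no managed projects is assumed here to see only their own jobs.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// user is a hypothetical stand-in for schema.User.
type user struct {
	Username string
	Roles    []string
	Projects []string // projects managed by a user with the "manager" role
}

func hasRole(u *user, role string) bool {
	for _, r := range u.Roles {
		if r == role {
			return true
		}
	}
	return false
}

// accessCondition returns an extra WHERE condition plus its arguments;
// an empty condition means unrestricted access.
func accessCondition(u *user) (string, []any, error) {
	if u == nil {
		return "", nil, errors.New("user is nil")
	}
	switch {
	case len(u.Roles) == 1 && hasRole(u, "api"), hasRole(u, "admin"), hasRole(u, "support"):
		return "", nil, nil // full access to all jobs
	case hasRole(u, "manager"):
		if len(u.Projects) == 0 {
			return "job.hpc_user = ?", []any{u.Username}, nil
		}
		args := make([]any, 0, len(u.Projects)+1)
		for _, p := range u.Projects {
			args = append(args, p)
		}
		args = append(args, u.Username)
		ph := strings.TrimSuffix(strings.Repeat("?, ", len(u.Projects)), ", ")
		return "(job.project IN (" + ph + ") OR job.hpc_user = ?)", args, nil
	case hasRole(u, "user"):
		return "job.hpc_user = ?", []any{u.Username}, nil
	default:
		return "", nil, fmt.Errorf("user %q has no recognized role", u.Username)
	}
}

func main() {
	cond, args, err := accessCondition(&user{Username: "alice", Roles: []string{"manager"}, Projects: []string{"p1", "p2"}})
	fmt.Println(cond, args, err)
}
```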
func SetConfig ¶ added in v1.5.0
func SetConfig(cfg *RepositoryConfig)
SetConfig sets the repository configuration. This must be called before any repository initialization (Connect, GetJobRepository, etc.). If not called, default values from DefaultConfig() are used.
Types ¶
type DBConnection ¶
func GetConnection ¶
func GetConnection() *DBConnection
type DatabaseOptions ¶
type Hooks ¶
type Hooks struct{}
Hooks satisfies the sqlhooks.Hooks interface.
type JobHook ¶ added in v1.5.0
type JobHook interface {
	// JobStartCallback is invoked once for each job that starts.
	// This is called synchronously, so implementations should be fast.
	JobStartCallback(job *schema.Job)

	// JobStopCallback is invoked when a job completes.
	// This is called synchronously, so implementations should be fast.
	JobStopCallback(job *schema.Job)
}
JobHook interface allows external components to hook into job lifecycle events. Implementations can perform actions when jobs start or stop, such as tagging, logging, notifications, or triggering external workflows.
Example implementation:
type MyJobTagger struct{}

func (t *MyJobTagger) JobStartCallback(job *schema.Job) {
	if job.NumNodes > 100 {
		// Tag large jobs automatically
	}
}

func (t *MyJobTagger) JobStopCallback(job *schema.Job) {
	if job.State == schema.JobStateFailed {
		// Log or alert on failed jobs
	}
}
Register hooks during application initialization:
repository.RegisterJobHook(&MyJobTagger{})
type JobRepository ¶
type JobRepository struct {
	DB    *sqlx.DB   // Database connection pool
	Mutex sync.Mutex // Mutex for thread-safe operations
	// contains filtered or unexported fields
}
JobRepository provides database access for job-related operations. It implements the repository pattern to abstract database interactions and provides caching for improved performance.
The repository is a singleton initialized via GetJobRepository(). All database queries use prepared statements via stmtCache for efficiency. Frequently accessed data (metadata, energy footprints) is cached in an LRU cache.
func GetJobRepository ¶
func GetJobRepository() *JobRepository
GetJobRepository returns the singleton instance of JobRepository. The repository is initialized lazily on first access with database connection, prepared statement cache, and LRU cache configured from repoConfig.
This function is thread-safe and ensures only one instance is created. It must be called after Connect() has established a database connection.
func (*JobRepository) AddHistograms ¶
func (r *JobRepository) AddHistograms(
	ctx context.Context,
	filter []*model.JobFilter,
	stat *model.JobsStatistics,
	durationBins *string,
) (*model.JobsStatistics, error)
AddHistograms augments statistics with distribution histograms for job properties.
Generates histogram data for visualizing the distributions of job duration, node count, core count, and accelerator count. The bin width of the duration histogram is set by the durationBins parameter.
Parameters:
- ctx: Context for security checks
- filter: Filters to apply to jobs included in histograms
- stat: Statistics struct to augment (modified in-place)
- durationBins: Bin size - "1m", "10m", "1h", "6h", "12h", or "24h" (default)
Populates these fields in stat:
- HistDuration: Job duration distribution (zero-padded bins)
- HistNumNodes: Node count distribution
- HistNumCores: Core (hwthread) count distribution
- HistNumAccs: Accelerator count distribution
Duration bins are pre-initialized with zeros to ensure consistent ranges for visualization. Bin size determines both the width and maximum duration displayed (e.g., "1h" = 48 bins × 1h = 48h max).
func (*JobRepository) AddJobCount ¶
func (r *JobRepository) AddJobCount(
	ctx context.Context,
	filter []*model.JobFilter,
	stats []*model.JobsStatistics,
	kind string,
) ([]*model.JobsStatistics, error)
AddJobCount augments existing overall statistics with additional job counts by category.
Similar to AddJobCountGrouped but for ungrouped statistics. Applies the same count to all statistics entries (typically just one).
Parameters:
- ctx: Context for security checks
- filter: Filters to apply
- stats: Existing statistics to augment (modified in-place)
- kind: "running" to add RunningJobs count, "short" to add ShortJobs count
Returns the same stats slice with RunningJobs or ShortJobs fields set to the total count.
func (*JobRepository) AddJobCountGrouped ¶
func (r *JobRepository) AddJobCountGrouped(
	ctx context.Context,
	filter []*model.JobFilter,
	groupBy *model.Aggregate,
	stats []*model.JobsStatistics,
	kind string,
) ([]*model.JobsStatistics, error)
AddJobCountGrouped augments existing statistics with additional job counts by category.
This method enriches JobsStatistics returned by JobsStatsGrouped or JobCountGrouped with counts of running or short-running (based on ShortRunningJobsDuration) jobs, matched by group ID.
Parameters:
- ctx: Context for security checks
- filter: Filters to apply
- groupBy: Grouping dimension (must match the dimension used for stats parameter)
- stats: Existing statistics to augment (modified in-place by ID matching)
- kind: "running" to add RunningJobs count, "short" to add ShortJobs count
Returns the same stats slice with RunningJobs or ShortJobs fields populated per group. Groups without matching jobs will have 0 for the added field.
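The matching step is essentially a map lookup keyed by group ID, where a missing key yields Go's zero value. In this sketch, groupStats and addRunningCounts are hypothetical reductions of model.JobsStatistics and the real method; the counts map stands in for the result of a grouped count query.

```go
package main

import "fmt"

// groupStats is a hypothetical reduction of model.JobsStatistics.
type groupStats struct {
	ID          string // group key, e.g. a user or project name
	TotalJobs   int
	RunningJobs int
}

// addRunningCounts sets RunningJobs on each entry by matching its ID against
// the per-group counts; groups without a match get 0. Entries are modified in
// place and no new groups are added.
func addRunningCounts(stats []*groupStats, counts map[string]int) []*groupStats {
	for _, s := range stats {
		s.RunningJobs = counts[s.ID] // a missing key yields 0
	}
	return stats
}

func main() {
	stats := []*groupStats{{ID: "alice", TotalJobs: 10}, {ID: "bob", TotalJobs: 4}}
	counts := map[string]int{"alice": 3} // e.g. from a GROUP BY over running jobs
	for _, s := range addRunningCounts(stats, counts) {
		fmt.Println(s.ID, s.TotalJobs, s.RunningJobs)
	}
}
```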
func (*JobRepository) AddMetricHistograms ¶ added in v1.3.0
func (r *JobRepository) AddMetricHistograms(
	ctx context.Context,
	filter []*model.JobFilter,
	metrics []string,
	stat *model.JobsStatistics,
	targetBinCount *int,
) (*model.JobsStatistics, error)
AddMetricHistograms augments statistics with distribution histograms for job metrics.
Generates histogram data for metrics like CPU load, memory usage, etc. Handles running and completed jobs differently: running jobs load data from metric backend, completed jobs use footprint data from database.
Parameters:
- ctx: Context for security checks
- filter: Filters to apply (MUST contain State filter for running jobs)
- metrics: List of metric names to histogram (e.g., ["cpu_load", "mem_used"])
- stat: Statistics struct to augment (modified in-place)
- targetBinCount: Number of histogram bins (default: 10)
Populates HistMetrics field in stat with MetricHistoPoints for each metric.
Binning algorithm:
- Values normalized to metric's peak value from cluster configuration
- Bins evenly distributed from 0 to peak
- Pre-initialized with zeros for consistent visualization
Limitations:
- Running jobs: Limited to 5000 jobs for performance
- Requires valid cluster configuration with metric peak values
- Uses footprint statistic (avg/max/min) configured per metric
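The binning algorithm above can be sketched as equal-width bins over [0, peak], which is equivalent to normalizing each value by the peak. metricHistogram is a hypothetical name; clamping values above the peak (or below zero) into the edge bins is an assumption of this sketch, not documented behaviour.

```go
package main

import (
	"fmt"
	"math"
)

// metricHistogram counts values into binCount equal-width bins spanning
// [0, peak]. All bins start at zero so empty ranges still appear.
// Out-of-range values are clamped into the first or last bin (assumption).
func metricHistogram(values []float64, peak float64, binCount int) []int {
	if binCount <= 0 {
		binCount = 10 // documented default bin count
	}
	bins := make([]int, binCount)
	if peak <= 0 {
		return bins // no usable peak: nothing can be binned
	}
	width := peak / float64(binCount)
	for _, v := range values {
		idx := int(math.Floor(v / width))
		if idx < 0 {
			idx = 0
		}
		if idx >= binCount {
			idx = binCount - 1
		}
		bins[idx]++
	}
	return bins
}

func main() {
	// e.g. average cpu_load footprints of finished jobs on nodes with peak 72
	values := []float64{0.5, 10, 35.9, 36, 71.9, 72, 90}
	fmt.Println(metricHistogram(values, 72, 4))
}
```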
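func (*JobRepository) AddTag ¶
func (r *JobRepository) AddTag(user *schema.User, job int64, tag int64) ([]*schema.Tag, error)
AddTag adds the tag with database ID `tag` to the job with database ID `job`. Requires an authenticated user for the security checks.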
The user must have permission to view the job. Tag visibility is determined by scope:
- "global" tags: visible to all users
- "private" tags: only visible to the tag creator
- "admin" tags: only visible to admin/support users
func (*JobRepository) AddTagDirect ¶ added in v1.5.0
func (r *JobRepository) AddTagDirect(job int64, tag int64) ([]*schema.Tag, error)
AddTagDirect adds a tag without user security checks. Use only for internal/admin operations.
func (*JobRepository) AddTagOrCreate ¶
func (r *JobRepository) AddTagOrCreate(user *schema.User, jobID int64, tagType string, tagName string, tagScope string) (tagID int64, err error)
AddTagOrCreate adds the tag with the specified type, name, and scope to the job with database ID `jobID`. If such a tag does not yet exist, it is created first.
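func (*JobRepository) AddTagOrCreateDirect ¶ added in v1.5.0
func (r *JobRepository) AddTagOrCreateDirect(jobID int64, tagType string, tagName string) (tagID int64, err error)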
func (*JobRepository) AllocatedNodes ¶
func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error)
AllocatedNodes returns a map from each subcluster to a map of hostnames to the number of jobs running on that host. Hosts with no running jobs are omitted.
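The shape of the returned map, and why idle hosts are absent, is easiest to see in a sketch that builds it from a list of running jobs. runningJob and allocatedNodes are hypothetical; the real method queries the database instead.

```go
package main

import "fmt"

// runningJob is a hypothetical projection of a running job's placement.
type runningJob struct {
	SubCluster string
	Hosts      []string
}

// allocatedNodes counts, per subcluster, how many running jobs use each host.
// A host only gets an entry when at least one job uses it.
func allocatedNodes(jobs []runningJob) map[string]map[string]int {
	out := make(map[string]map[string]int)
	for _, j := range jobs {
		hosts, ok := out[j.SubCluster]
		if !ok {
			hosts = make(map[string]int)
			out[j.SubCluster] = hosts
		}
		for _, h := range j.Hosts {
			hosts[h]++
		}
	}
	return out
}

func main() {
	jobs := []runningJob{
		{SubCluster: "main", Hosts: []string{"n01", "n02"}},
		{SubCluster: "main", Hosts: []string{"n02"}},
		{SubCluster: "gpu", Hosts: []string{"g01"}},
	}
	fmt.Println(allocatedNodes(jobs))
}
```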
func (*JobRepository) CountJobs ¶
func (r *JobRepository) CountJobs(ctx context.Context, filters []*model.JobFilter) (int, error)
CountJobs returns the total number of jobs matching the given filters. Security controls are automatically applied based on the user context. Uses DISTINCT count to handle tag filters correctly (jobs may appear multiple times when joined with the tag table).
func (*JobRepository) CountTags ¶
func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts map[string]int, err error)
CountTags returns all tags visible to the user and the count of jobs for each tag. Applies scope-based filtering to respect tag visibility rules.
Returns:
- tags: slice of tags the user can see
- counts: map of tag name to job count
- err: any error encountered
func (*JobRepository) CreateTag ¶
func (r *JobRepository) CreateTag(tagType string, tagName string, tagScope string) (tagID int64, err error)
CreateTag creates a new tag with the specified type, name, and scope. Returns the database ID of the newly created tag.
Scope defaults to "global" if an empty string is provided. Valid scopes are "global", "private", and "admin".
Example:
tagID, err := repo.CreateTag("performance", "high-memory", "global")
func (*JobRepository) DeleteJobByID ¶ added in v1.5.0
func (r *JobRepository) DeleteJobByID(id int64) error
DeleteJobByID permanently removes a single job by its database ID. Cache entries for the deleted job are automatically invalidated.
Parameters:
- id: Database ID (primary key) of the job to delete
Returns an error if the deletion fails.
func (*JobRepository) DeleteJobsBefore ¶
func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged string) (int, error)
DeleteJobsBefore removes jobs older than the specified start time. Optionally preserves tagged jobs to protect important data from deletion. Cache entries for deleted jobs are automatically invalidated.
This is typically used for data retention policies and cleanup operations. WARNING: This is a destructive operation that permanently deletes job records.
Parameters:
- startTime: Unix timestamp, jobs with start_time < this value will be deleted
- omitTagged: "none" = delete all jobs, "all" = skip any tagged jobs, "user" = skip jobs with user-created tags (not auto-tagger types "app"/"jobClass")
Returns the count of deleted jobs or an error if the operation fails.
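The omitTagged modes can be expressed as a per-job decision applied to jobs that already satisfy start_time < startTime. This is a hypothetical helper, not the method's SQL: it takes the types of a job's tags and treats "app" and "jobClass" as the auto-tagger types named above.

```go
package main

import "fmt"

// autoTagTypes are the tag types attributed to auto-taggers.
var autoTagTypes = map[string]bool{"app": true, "jobClass": true}

// shouldDelete decides whether an old job is removed, given the types of
// its tags and the omitTagged mode.
func shouldDelete(tagTypes []string, omitTagged string) (bool, error) {
	switch omitTagged {
	case "none":
		return true, nil
	case "all":
		return len(tagTypes) == 0, nil // keep any tagged job
	case "user":
		for _, t := range tagTypes {
			if !autoTagTypes[t] {
				return false, nil // at least one user-created tag
			}
		}
		return true, nil
	default:
		return false, fmt.Errorf("invalid omitTagged mode %q", omitTagged)
	}
}

func main() {
	del, err := shouldDelete([]string{"app", "jobClass"}, "user")
	fmt.Println(del, err)
}
```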
func (*JobRepository) Execute ¶ added in v1.4.0
func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error
Execute runs a Squirrel UpdateBuilder statement against the database. This is a generic helper for executing pre-built update queries.
Parameters:
- stmt: Squirrel UpdateBuilder with prepared update query
Returns an error if the execution fails.
func (*JobRepository) FetchEnergyFootprint ¶ added in v1.4.0
func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float64, error)
FetchEnergyFootprint retrieves and unmarshals the energy footprint JSON for a job. Energy footprints are cached with a 24-hour TTL as they are frequently accessed but rarely change.
The energy footprint contains calculated energy consumption (in kWh) per metric, stored as JSON with keys like "power_avg", "acc_power_avg", etc.
Parameters:
- job: Job struct with valid ID, energy footprint will be populated in job.EnergyFootprint
Returns the energy footprint map or an error if the job is nil or database query fails.
func (*JobRepository) FetchFootprint ¶ added in v1.4.0
func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, error)
FetchFootprint retrieves and unmarshals the performance footprint JSON for a job. Unlike FetchMetadata, footprints are NOT cached as they can be large and change frequently.
The footprint contains summary statistics (avg/min/max) for monitored metrics, stored as JSON with keys like "cpu_load_avg", "mem_used_max", etc.
Parameters:
- job: Job struct with a valid ID; the footprint is populated in job.Footprint
Returns the footprint map or an error if the job is nil or database query fails.
func (*JobRepository) FetchMetadata ¶
FetchMetadata retrieves and unmarshals the metadata JSON for a job. Metadata is cached with a 24-hour TTL to improve performance.
The metadata field stores arbitrary key-value pairs associated with a job, such as tags, labels, or custom attributes added by external systems.
Parameters:
- job: Job struct with a valid ID field; the metadata is populated in job.MetaData
Returns the metadata map or an error if the job is nil or database query fails.
func (*JobRepository) Find ¶
func (r *JobRepository) Find(jobID *int64, cluster *string, startTime *int64) (*schema.Job, error)
Find executes a SQL query to find a specific batch job. The job is queried using the batch job ID, the cluster name, and the start time of the job in UNIX epoch time seconds. It returns a pointer to a schema.Job data structure and an error. To check whether no job was found, test err == sql.ErrNoRows.
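Here is a minimal sketch of the not-found check described for Find and the other Find* methods. `findJob` and `Job` are hypothetical stand-ins for the repository call and `schema.Job`. The error is wrapped on purpose. This shows that `errors.Is(err, sql.ErrNoRows)` also matches a wrapped sentinel, whereas `err == sql.ErrNoRows` only matches the bare value.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// Job is a hypothetical placeholder for schema.Job.
type Job struct{ JobID int64 }

// findJob is a hypothetical lookup that reports "no match" via sql.ErrNoRows,
// wrapped with context the way callers often do.
func findJob(jobs map[int64]*Job, jobID int64) (*Job, error) {
	if j, ok := jobs[jobID]; ok {
		return j, nil
	}
	return nil, fmt.Errorf("find job %d: %w", jobID, sql.ErrNoRows)
}

// describeLookup separates "not found" from other failures.
func describeLookup(jobs map[int64]*Job, jobID int64) string {
	job, err := findJob(jobs, jobID)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return "not found"
	case err != nil:
		return "error: " + err.Error()
	default:
		return fmt.Sprintf("found job %d", job.JobID)
	}
}
```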
func (*JobRepository) FindAll ¶
func (r *JobRepository) FindAll(jobID *int64, cluster *string, startTime *int64) ([]*schema.Job, error)
FindAll executes a SQL query to find all batch jobs matching the given criteria. Jobs are queried using the batch job id, and optionally filtered by cluster name and start time (UNIX epoch time seconds). It returns a slice of pointers to schema.Job data structures and an error variable. An empty slice is returned if no matching jobs are found.
func (*JobRepository) FindByID ¶ added in v1.5.0
FindByID executes a SQL query to find a specific batch job. The job is queried using the database ID. It returns a pointer to a schema.Job data structure and an error. To check whether no job was found, test err == sql.ErrNoRows.
func (*JobRepository) FindByIDDirect ¶ added in v1.5.0
func (r *JobRepository) FindByIDDirect(jobID int64) (*schema.Job, error)
FindByIDDirect executes a SQL query to find a specific batch job. The job is queried using the database ID. It returns a pointer to a schema.Job data structure and an error. To check whether no job was found, test err == sql.ErrNoRows.
func (*JobRepository) FindByIDWithUser ¶ added in v1.5.0
FindByIDWithUser executes a SQL query to find a specific batch job. The job is queried using the database ID. The user is passed directly instead of as part of the context. It returns a pointer to a schema.Job data structure and an error. To check whether no job was found, test err == sql.ErrNoRows.
func (*JobRepository) FindByJobID ¶ added in v1.5.0
func (r *JobRepository) FindByJobID(ctx context.Context, jobID int64, startTime int64, cluster string) (*schema.Job, error)
FindByJobID executes a SQL query to find a specific batch job. The job is queried using the Slurm job ID, the start time, and the cluster name. It returns a pointer to a schema.Job data structure and an error. To check whether no job was found, test err == sql.ErrNoRows.
func (*JobRepository) FindCached ¶ added in v1.5.0
func (r *JobRepository) FindCached(jobID *int64, cluster *string, startTime *int64) (*schema.Job, error)
FindCached executes a SQL query to find a specific batch job in the job_cache table. The job is queried using the batch job ID and optionally filtered by cluster name and start time (UNIX epoch time seconds). This method reads cached job data, which may be stale but is faster to access than Find(). It returns a pointer to a schema.Job data structure and an error. To check whether no job was found, test err == sql.ErrNoRows.
func (*JobRepository) FindColumnValue ¶
func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error)
FindColumnValue performs a generic column lookup in a database table with role-based access control. Only users with admin, support, or manager roles can execute this query.
Parameters:
- user: User context for authorization check
- searchterm: Value to search for (exact match or LIKE pattern)
- table: Database table name to query
- selectColumn: Column name to return in results
- whereColumn: Column name to filter on
- isLike: If true, use LIKE with wildcards; if false, use exact equality
Returns the first matching value, ErrForbidden if user lacks permission, or ErrNotFound if no matches are found.
func (*JobRepository) FindColumnValues ¶
func (r *JobRepository) FindColumnValues(user *schema.User, query string, table string, selectColumn string, whereColumn string) (results []string, err error)
FindColumnValues performs a generic column lookup returning multiple matches with role-based access control. Similar to FindColumnValue but returns all matching values instead of just the first. Only users with admin, support, or manager roles can execute this query.
Parameters:
- user: User context for authorization check
- query: Search pattern (always uses LIKE with wildcards)
- table: Database table name to query
- selectColumn: Column name to return in results
- whereColumn: Column name to filter on
Returns a slice of matching values, ErrForbidden if user lacks permission, or ErrNotFound if no matches are found.
func (*JobRepository) FindConcurrentJobs ¶
func (r *JobRepository) FindConcurrentJobs(ctx context.Context, job *schema.Job) (*model.JobLinkResultList, error)
func (*JobRepository) FindJobIdsByTag ¶ added in v1.5.0
func (r *JobRepository) FindJobIdsByTag(tagID int64) ([]int64, error)
FindJobIdsByTag returns all job database IDs associated with a specific tag.
Parameters:
- tagID: Database ID of the tag to search for
Returns a slice of job IDs or an error if the query fails.
func (*JobRepository) FindJobsBetween ¶
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64, omitTagged string) ([]*schema.Job, error)
FindJobsBetween returns jobs within a specified time range. If startTimeBegin is 0, returns all jobs before startTimeEnd. Optionally excludes tagged jobs from results.
Parameters:
- startTimeBegin: Unix timestamp for range start (use 0 for unbounded start)
- startTimeEnd: Unix timestamp for range end
- omitTagged: "none" = include all jobs, "all" = exclude any tagged jobs, "user" = exclude jobs with user-created tags (not auto-tagger types "app"/"jobClass")
Returns a slice of jobs or an error if the time range is invalid or query fails.
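One way to express the range rules as a SQL condition on job.start_time is sketched below. The helper `startTimeConditions` is hypothetical. Treating the range as half-open and `begin > end` as invalid are assumptions, and the actual implementation may use different bounds.

```go
package main

import "fmt"

// startTimeConditions returns a WHERE fragment and arguments for the range.
// begin == 0 means "no lower bound", as documented; the rest is assumed.
func startTimeConditions(begin, end int64) (string, []any, error) {
	if begin < 0 || begin > end {
		return "", nil, fmt.Errorf("invalid time range [%d, %d]", begin, end)
	}
	if begin == 0 {
		// Unbounded start: everything that started before end.
		return "job.start_time < ?", []any{end}, nil
	}
	return "job.start_time >= ? AND job.start_time < ?", []any{begin, end}, nil
}
```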
func (*JobRepository) FindRunningJobs ¶ added in v1.4.0
func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error)
FindRunningJobs returns all currently running jobs for a specific cluster. Filters out short-running jobs based on repoConfig.MinRunningJobDuration threshold.
Parameters:
- cluster: Cluster name to filter jobs
Returns a slice of running job objects or an error if the query fails.
func (*JobRepository) FindUserOrProjectOrJobname ¶
func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm string) (jobid string, username string, project string, jobname string)
FindUserOrProjectOrJobname attempts to interpret a search term as a job ID, username, project ID, or job name by querying the database.
Search logic (in priority order):
- If searchterm is numeric, treat as job ID (returned immediately)
- Try exact match in job.hpc_user column (username)
- Try LIKE match in hpc_user.name column (real name)
- Try exact match in job.project column (project ID)
- If no matches, return searchterm as jobname for GraphQL query
This powers the search bar for flexible job searching. Requires an authenticated user for database lookups (returns empty values if user is nil).
Parameters:
- user: Authenticated user context, required for database access
- searchterm: Search string to interpret
Returns up to one non-empty value among (jobid, username, project, jobname).
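The search priority can be sketched with the three database probes replaced by hypothetical callbacks (`lookups`, `interpret`). Checking for a numeric term before the authentication check follows the "returned immediately" wording but is an assumption. So is the LIKE probe yielding the matched user's username.

```go
package main

import "strconv"

// lookups bundles hypothetical database probes, one per search step.
type lookups struct {
	userExists    func(username string) bool       // exact match on job.hpc_user
	userByName    func(name string) (string, bool) // LIKE match on hpc_user.name; yields a username
	projectExists func(project string) bool        // exact match on job.project
}

// interpret applies the documented priority order and fills at most one result.
func interpret(l lookups, authenticated bool, term string) (jobid, username, project, jobname string) {
	if _, err := strconv.ParseUint(term, 10, 64); err == nil {
		return term, "", "", "" // numeric: treat as job ID without any lookup
	}
	if !authenticated {
		return "", "", "", "" // no database lookups without a user
	}
	if l.userExists(term) {
		return "", term, "", ""
	}
	if u, ok := l.userByName(term); ok {
		return "", u, "", ""
	}
	if l.projectExists(term) {
		return "", "", term, ""
	}
	return "", "", "", term // fall back to a job name search
}
```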
func (*JobRepository) Flush ¶
func (r *JobRepository) Flush() error
Flush removes all data from job-related tables (jobtag, tag, job). WARNING: This is a destructive operation that deletes all job data. Use with extreme caution, typically only for testing or complete resets.
func (*JobRepository) GetJobList ¶ added in v1.5.0
func (r *JobRepository) GetJobList(limit int, offset int) ([]int64, error)
GetJobList returns job IDs for non-running jobs. It is intended for processing large numbers of jobs one at a time, together with FindByID. Use limit and offset for pagination; use limit=0 to get all results (not recommended for large datasets).
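Here is a sketch of the batch-processing loop that GetJobList is meant for. `processAllJobs` is hypothetical, and `getJobList`/`handle` stand in for GetJobList and a per-ID step such as FindByID. A positive batch size is required, since limit=0 would return everything in a single call. Offset pagination can skip or repeat IDs if jobs are inserted or deleted while the loop runs.

```go
package main

import "errors"

// processAllJobs pages through job IDs with limit/offset and hands each ID to handle.
// It returns the number of IDs processed.
func processAllJobs(getJobList func(limit, offset int) ([]int64, error), handle func(id int64) error, batch int) (int, error) {
	if batch <= 0 {
		return 0, errors.New("batch size must be positive")
	}
	processed := 0
	for offset := 0; ; offset += batch {
		ids, err := getJobList(batch, offset)
		if err != nil {
			return processed, err
		}
		for _, id := range ids {
			if err := handle(id); err != nil {
				return processed, err
			}
			processed++
		}
		if len(ids) < batch {
			return processed, nil // short or empty page: no more IDs
		}
	}
}
```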
func (*JobRepository) GetTags ¶
GetTags returns all scoped tags if job is nil; otherwise, it returns the tags of the job with that database ID.
func (*JobRepository) GetTagsDirect ¶ added in v1.5.0
func (r *JobRepository) GetTagsDirect(job *int64) ([]*schema.Tag, error)
func (*JobRepository) GetUsedNodes ¶ added in v1.5.0
func (r *JobRepository) GetUsedNodes(ts int64) (map[string][]string, error)
GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames that are currently in use by jobs that started before the given timestamp and are still in running state.
The timestamp parameter (ts) is compared against job.start_time to find relevant jobs. Returns an error if the database query fails or row iteration encounters errors. Individual row parsing errors are logged but don't fail the entire operation.
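The grouping step of GetUsedNodes can be sketched over already-decoded rows. `runningAlloc` and `usedNodes` are hypothetical. The real method reads hostnames from the database and logs and skips rows it cannot parse. Deduplication matters because several shared jobs can occupy the same node.

```go
package main

import "sort"

// runningAlloc is a hypothetical row: one running job's cluster and hostnames.
type runningAlloc struct {
	Cluster   string
	Hostnames []string
}

// usedNodes groups hostnames by cluster, removes duplicates, and sorts each list.
func usedNodes(rows []runningAlloc) map[string][]string {
	sets := make(map[string]map[string]struct{})
	for _, r := range rows {
		if sets[r.Cluster] == nil {
			sets[r.Cluster] = make(map[string]struct{})
		}
		for _, h := range r.Hostnames {
			sets[r.Cluster][h] = struct{}{}
		}
	}
	out := make(map[string][]string, len(sets))
	for cluster, set := range sets {
		hosts := make([]string, 0, len(set))
		for h := range set {
			hosts = append(hosts, h)
		}
		sort.Strings(hosts)
		out[cluster] = hosts
	}
	return out
}
```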
func (*JobRepository) HasTag ¶ added in v1.5.0
func (r *JobRepository) HasTag(jobID int64, tagType string, tagName string) bool
func (*JobRepository) InsertJobDirect ¶ added in v1.5.0
func (r *JobRepository) InsertJobDirect(job *schema.Job) (int64, error)
InsertJobDirect inserts a job directly into the job table (not job_cache). Use this when the returned ID will be used for operations on the job table (e.g., adding tags), or for imported jobs that are already completed.
func (*JobRepository) IsJobOwner ¶ added in v1.4.0
IsJobOwner checks if the specified user owns the batch job identified by jobID, startTime, and cluster. Returns true if the user is the owner, false otherwise. This method does not return errors; it returns false for both non-existent jobs and jobs owned by other users.
func (*JobRepository) JobCountGrouped ¶
func (r *JobRepository) JobCountGrouped(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error)
JobCountGrouped counts jobs grouped by a dimension without computing detailed statistics.
This is a lightweight alternative to JobsStatsGrouped when only job counts are needed, avoiding the overhead of calculating walltime and resource usage metrics.
Parameters:
- ctx: Context for security checks
- filter: Filters to apply
- groupBy: Grouping dimension (User, Project, Cluster, or SubCluster)
Returns JobsStatistics with only ID and TotalJobs populated for each group.
func (*JobRepository) JobsStats ¶
func (r *JobRepository) JobsStats(ctx context.Context, filter []*model.JobFilter) ([]*model.JobsStatistics, error)
JobsStats computes overall job statistics across all matching jobs without grouping.
This method provides a single aggregate view of job metrics, useful for dashboard summaries and overall system utilization reports.
Parameters:
- ctx: Context for security checks and cancellation
- filter: Filters to apply (time range, cluster, job state, etc.)
Returns a single-element slice containing aggregate statistics:
- totalJobs, totalUsers, totalWalltime
- totalNodeHours, totalCoreHours, totalAccHours
Unlike JobsStatsGrouped, this returns overall totals without breaking down by dimension. Security checks are applied via SecurityCheck to respect user access levels.
func (*JobRepository) JobsStatsGrouped ¶
func (r *JobRepository) JobsStatsGrouped(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) ([]*model.JobsStatistics, error)
JobsStatsGrouped computes comprehensive job statistics grouped by a dimension (user, project, cluster, or subcluster).
This is the primary method for generating aggregated statistics views in the UI, providing metrics like total jobs, walltime, and resource usage broken down by the specified grouping.
Parameters:
- ctx: Context for security checks and cancellation
- filter: Filters to apply (time range, cluster, job state, etc.)
- page: Optional pagination (ItemsPerPage: -1 disables pagination)
- sortBy: Optional sort column (totalJobs, totalWalltime, totalCoreHours, etc.)
- groupBy: Required grouping dimension (User, Project, Cluster, or SubCluster)
Returns a slice of JobsStatistics, one per group, with:
- ID: The group identifier (username, project name, cluster name, etc.)
- Name: Display name (for users, from hpc_user.name; empty for other groups)
- Statistics: totalJobs, totalUsers, totalWalltime, resource usage metrics
Security: Respects user roles via SecurityCheck - users see only their own data unless admin/support. Performance: Results are sorted in SQL and pagination applied before scanning rows.
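Here is how an optional page request might translate into SQL LIMIT/OFFSET values. `PageRequest` here is a hypothetical stand-in for `model.PageRequest`, and a 1-based page number is an assumption. Only the `ItemsPerPage: -1` convention comes from the description above.

```go
package main

// PageRequest is a hypothetical stand-in for model.PageRequest.
type PageRequest struct {
	ItemsPerPage int
	Page         int // assumed to be 1-based
}

// limitOffset converts a page request into LIMIT/OFFSET values.
// ok is false when no LIMIT should be applied (nil page or ItemsPerPage == -1).
func limitOffset(page *PageRequest) (limit, offset int, ok bool) {
	if page == nil || page.ItemsPerPage == -1 {
		return 0, 0, false
	}
	p := page.Page
	if p < 1 {
		p = 1
	}
	return page.ItemsPerPage, (p - 1) * page.ItemsPerPage, true
}
```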
func (*JobRepository) MarkArchived ¶
func (r *JobRepository) MarkArchived(stmt sq.UpdateBuilder, monitoringStatus int32) sq.UpdateBuilder
MarkArchived adds monitoring status update to an existing UpdateBuilder statement. This is a builder helper used when constructing multi-field update queries.
Parameters:
- stmt: Existing UpdateBuilder to modify
- monitoringStatus: Monitoring status value to set
Returns the modified UpdateBuilder for method chaining.
func (*JobRepository) Optimize ¶
func (r *JobRepository) Optimize() error
Optimize performs database optimization by running the VACUUM command. This reclaims unused space and defragments the database file. It should be run periodically during maintenance windows.
func (*JobRepository) Partitions ¶
func (r *JobRepository) Partitions(cluster string) ([]string, error)
Partitions returns a list of distinct cluster partitions for a given cluster. Results are cached with a 1-hour TTL to improve performance.
Parameters:
- cluster: Cluster name to query partitions for
Returns a slice of partition names or an error if the database query fails.
func (*JobRepository) QueryJobs ¶
func (r *JobRepository) QueryJobs(ctx context.Context, filters []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) ([]*schema.Job, error)
QueryJobs retrieves jobs from the database with optional filtering, pagination, and sorting. Security controls are automatically applied based on the user context.
Parameters:
- ctx: Context containing user authentication information
- filters: Optional job filters (cluster, state, user, time ranges, etc.)
- page: Optional pagination parameters (page number and items per page)
- order: Optional sorting specification (column or footprint field)
Returns a slice of jobs matching the criteria, or an error if the query fails. The function enforces role-based access control through SecurityCheck.
func (*JobRepository) RemoveJobTagByRequest ¶ added in v1.4.4
func (r *JobRepository) RemoveJobTagByRequest(user *schema.User, job int64, tagType string, tagName string, tagScope string) ([]*schema.Tag, error)
RemoveJobTagByRequest removes a tag from the job with the database id `job` by tag type, name, and scope. Requires user authentication for security checks. Used by REST API.
func (*JobRepository) RemoveTag ¶
RemoveTag removes the tag with the database id `tag` from the job with the database id `job`. Requires user authentication for security checks. Used by GraphQL API.
func (*JobRepository) RemoveTagByID ¶ added in v1.5.0
func (r *JobRepository) RemoveTagByID(tagID int64) error
Removes a tag from the database by tag ID. Used by the GraphQL API.
func (*JobRepository) RemoveTagByRequest ¶ added in v1.4.4
func (r *JobRepository) RemoveTagByRequest(tagType string, tagName string, tagScope string) error
Removes a tag from the database by tag type, name, and scope. Used by the REST API. Does not update tagged jobs in the job archive.
func (*JobRepository) Start ¶
func (r *JobRepository) Start(job *schema.Job) (id int64, err error)
Start inserts a new job into the table, returning the unique job ID. Statistics are not transferred!
func (*JobRepository) StartDirect ¶ added in v1.5.0
func (r *JobRepository) StartDirect(job *schema.Job) (id int64, err error)
StartDirect inserts a new job directly into the job table (not job_cache). Use this when the returned ID will immediately be used for job table operations such as adding tags.
func (*JobRepository) Stop ¶
func (r *JobRepository) Stop(jobID int64, duration int32, state schema.JobState, monitoringStatus int32) (err error)
Stop updates the job with the database ID jobID using the provided duration, state, and monitoring status.
func (*JobRepository) StopJobsExceedingWalltimeBy ¶
func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error
StopJobsExceedingWalltimeBy marks running jobs as failed if they exceed their walltime limit. This is typically called periodically to clean up stuck or orphaned jobs.
Jobs are marked with:
- monitoring_status: MonitoringStatusArchivingFailed
- duration: 0
- job_state: JobStateFailed
Parameters:
- seconds: Grace period beyond walltime before marking as failed
Returns an error if the database update fails. Logs the number of jobs marked as failed if any were affected.
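The selection rule can be sketched as a predicate. `exceedsWalltime` is hypothetical. Comparing against start time + walltime + grace period and ignoring jobs without a walltime limit are both assumptions about how the SQL condition is written.

```go
package main

// exceedsWalltime reports whether a running job would be marked as failed:
// the current time is past start + walltime + grace period (all in seconds).
// Treating walltime <= 0 as "no limit" is an assumption.
func exceedsWalltime(startTime, walltime, now int64, graceSeconds int) bool {
	if walltime <= 0 {
		return false
	}
	return now > startTime+walltime+int64(graceSeconds)
}
```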
func (*JobRepository) SyncJobs ¶ added in v1.5.0
func (r *JobRepository) SyncJobs() ([]*schema.Job, error)
func (*JobRepository) TagID ¶ added in v1.5.0
func (r *JobRepository) TagID(tagType string, tagName string, tagScope string) (tagID int64, exists bool)
TagID returns the database ID of the tag with the specified type, name, and scope. The exists flag reports whether such a tag was found.
func (*JobRepository) TagInfo ¶ added in v1.4.4
func (r *JobRepository) TagInfo(tagID int64) (tagType string, tagName string, tagScope string, exists bool)
TagInfo returns the type, name, and scope of the tag with the specified ID. The exists flag reports whether such a tag was found.
func (*JobRepository) TransactionAdd ¶
func (r *JobRepository) TransactionAdd(t *Transaction, query string, args ...any) (int64, error)
TransactionAdd executes a query within the transaction.
func (*JobRepository) TransactionAddNamed ¶ added in v1.4.0
func (r *JobRepository) TransactionAddNamed(t *Transaction, query string, args ...any) (int64, error)
TransactionAddNamed executes a named query within the transaction.
func (*JobRepository) TransactionEnd ¶
func (r *JobRepository) TransactionEnd(t *Transaction) error
TransactionEnd commits the transaction. Deprecated: Use Commit() instead.
func (*JobRepository) TransactionInit ¶
func (r *JobRepository) TransactionInit() (*Transaction, error)
TransactionInit begins a new transaction.
func (*JobRepository) TransferCachedJobToMain ¶ added in v1.5.0
func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error)
TransferCachedJobToMain moves a job from job_cache to the job table. Caller must hold r.Mutex. Returns the new job table ID.
func (*JobRepository) UpdateDuration ¶ added in v1.4.0
func (r *JobRepository) UpdateDuration() error
UpdateDuration recalculates and updates the duration field for all running jobs. Called periodically to keep job durations current without querying individual jobs.
Duration is calculated as: current_time - job.start_time
Returns an error if the database update fails.
func (*JobRepository) UpdateEnergy ¶ added in v1.4.0
func (r *JobRepository) UpdateEnergy(stmt sq.UpdateBuilder, jobMeta *schema.Job) (sq.UpdateBuilder, error)
UpdateEnergy calculates and updates the energy consumption for a job. This is called for running jobs during intermediate updates or when archiving.
Energy calculation formula:
- For "power" metrics: Energy (kWh) = (Power_avg * NumNodes * Duration_hours) / 1000
- For "energy" metrics: Currently not implemented (would need sum statistics)
The calculation accounts for:
- Multi-node jobs: Multiplies by NumNodes to get the total energy across all of the job's nodes
- Shared jobs: Node average is already based on partial resources, so NumNodes=1
- Unit conversion: Watts * hours / 1000 = kilowatt-hours (kWh)
- Rounding: Results rounded to 2 decimal places
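Here is the documented formula for "power" metrics, written out in Go. `powerToEnergyKWh` is a hypothetical helper taking the average power in watts and the duration in seconds. For shared jobs, pass `numNodes = 1`, as described above.

```go
package main

import "math"

// powerToEnergyKWh computes (Power_avg [W] * NumNodes * Duration [h]) / 1000
// and rounds the result to 2 decimal places.
func powerToEnergyKWh(powerAvgWatts float64, numNodes int, durationSeconds int64) float64 {
	hours := float64(durationSeconds) / 3600.0
	kwh := powerAvgWatts * float64(numNodes) * hours / 1000.0
	return math.Round(kwh*100) / 100
}
```

For example, 250 W on 4 nodes for 2 hours gives 250 * 4 * 2 / 1000 = 2 kWh.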
func (*JobRepository) UpdateFootprint ¶ added in v1.4.0
func (r *JobRepository) UpdateFootprint(stmt sq.UpdateBuilder, jobMeta *schema.Job) (sq.UpdateBuilder, error)
UpdateFootprint calculates and updates the performance footprint for a job. This is called for running jobs during intermediate updates or when archiving.
A footprint is a summary statistic (avg/min/max) for each monitored metric. The specific statistic type is defined in the cluster config's Footprint field. Results are stored as JSON with keys like "metric_avg", "metric_max", etc.
Example: For a "cpu_load" metric with Footprint="avg", this stores the average CPU load across all nodes as "cpu_load_avg": 85.3
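Here is a sketch of how the footprint keys could be assembled from per-metric values and each metric's configured statistic. `footprintJSON` and its input maps are hypothetical. `encoding/json` sorts map keys, which keeps the output deterministic.

```go
package main

import "encoding/json"

// footprintJSON keeps only metrics that have a configured statistic type
// and stores each value under "<metric>_<stat>".
func footprintJSON(values map[string]float64, statType map[string]string) (string, error) {
	fp := make(map[string]float64, len(values))
	for metric, v := range values {
		stat, ok := statType[metric]
		if !ok {
			continue // metric is not part of the footprint
		}
		fp[metric+"_"+stat] = v
	}
	b, err := json.Marshal(fp)
	return string(b), err
}
```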
func (*JobRepository) UpdateMetadata ¶
func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err error)
UpdateMetadata adds or updates a single metadata key-value pair for a job. The entire metadata map is re-marshaled and stored, and the cache is invalidated. Also triggers archive metadata update via archive.UpdateMetadata.
Parameters:
- job: Job struct with valid ID, existing metadata will be fetched if not present
- key: Metadata key to set
- val: Metadata value to set
Returns an error if the job is nil, metadata fetch fails, or database update fails.
func (*JobRepository) UpdateMonitoringStatus ¶
func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error)
UpdateMonitoringStatus updates the monitoring status for a job and invalidates its cache entries. Cache invalidation affects both metadata and energy footprint to ensure consistency.
Parameters:
- job: Database ID of the job to update
- monitoringStatus: New monitoring status value (see schema.MonitoringStatus constants)
Returns an error if the database update fails.
type NodeRepository ¶ added in v1.5.0
func GetNodeRepository ¶ added in v1.5.0
func GetNodeRepository() *NodeRepository
func (*NodeRepository) AddNode ¶ added in v1.5.0
func (r *NodeRepository) AddNode(node *schema.NodeDB) (int64, error)
AddNode adds a Node to the node table. This can be triggered by a node collector registration or from a nodestate update from the job scheduler.
func (*NodeRepository) CountNodes ¶ added in v1.5.0
func (r *NodeRepository) CountNodes(ctx context.Context, filters []*model.NodeFilter) (int, error)
CountNodes returns the total matched nodes based on a node filter. It always operates on the last state (largest timestamp) per node.
func (*NodeRepository) CountStates ¶ added in v1.5.0
func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeFilter, column string) ([]*model.NodeStates, error)
func (*NodeRepository) CountStatesTimed ¶ added in v1.5.0
func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model.NodeFilter, column string) ([]*model.NodeStatesTimed, error)
func (*NodeRepository) DeleteNode ¶ added in v1.5.0
func (r *NodeRepository) DeleteNode(id int64) error
func (*NodeRepository) DeleteNodeStatesBefore ¶ added in v1.5.0
func (r *NodeRepository) DeleteNodeStatesBefore(cutoff int64) (int64, error)
DeleteNodeStatesBefore removes node_state rows with time_stamp < cutoff, but always preserves the row with the latest timestamp per node_id.
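The retention rule can be restated over in-memory rows. `NodeStateRow` and `rowsToDelete` are hypothetical. In this sketch, rows that tie for a node's latest timestamp are all kept, which may differ from the SQL implementation.

```go
package main

// NodeStateRow is a hypothetical, reduced node_state row.
type NodeStateRow struct {
	ID        int64
	NodeID    int64
	TimeStamp int64
}

// rowsToDelete returns the IDs of rows with TimeStamp < cutoff,
// excluding each node's latest row.
func rowsToDelete(rows []NodeStateRow, cutoff int64) []int64 {
	latest := make(map[int64]int64) // node_id -> max time_stamp
	for _, r := range rows {
		if ts, ok := latest[r.NodeID]; !ok || r.TimeStamp > ts {
			latest[r.NodeID] = r.TimeStamp
		}
	}
	var ids []int64
	for _, r := range rows {
		if r.TimeStamp < cutoff && r.TimeStamp != latest[r.NodeID] {
			ids = append(ids, r.ID)
		}
	}
	return ids
}
```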
func (*NodeRepository) FetchMetadata ¶ added in v1.5.0
func (*NodeRepository) FindNodeStatesBefore ¶ added in v1.5.0
func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode, error)
FindNodeStatesBefore returns all node_state rows with time_stamp < cutoff, joined with node info for denormalized archiving.
func (*NodeRepository) GetNodeByID ¶ added in v1.5.0
func (*NodeRepository) GetNodesForList ¶ added in v1.5.0
func (*NodeRepository) ListNodes ¶ added in v1.5.0
func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error)
func (*NodeRepository) MapNodes ¶ added in v1.5.0
func (r *NodeRepository) MapNodes(cluster string) (map[string]string, error)
func (*NodeRepository) QueryNodes ¶ added in v1.5.0
func (r *NodeRepository) QueryNodes(ctx context.Context, filters []*model.NodeFilter, page *model.PageRequest, order *model.OrderByInput) ([]*schema.Node, error)
QueryNodes returns a list of nodes based on a node filter. It always operates on the last state (largest timestamp).
func (*NodeRepository) QueryNodesWithMeta ¶ added in v1.5.0
func (r *NodeRepository) QueryNodesWithMeta(ctx context.Context, filters []*model.NodeFilter, page *model.PageRequest, order *model.OrderByInput) ([]*schema.Node, error)
QueryNodesWithMeta returns a list of nodes based on a node filter. It always operates on the last state (largest timestamp). It also includes the data from both optional JSON columns.
func (*NodeRepository) UpdateNodeState ¶ added in v1.5.0
func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeState *schema.NodeStateDB) error
UpdateNodeState is called from the Node REST API to add a row to the node_state table.
type NodeStateWithNode ¶ added in v1.5.0
type NodeStateWithNode struct {
ID int64 `db:"id"`
TimeStamp int64 `db:"time_stamp"`
NodeState string `db:"node_state"`
HealthState string `db:"health_state"`
HealthMetrics string `db:"health_metrics"`
CpusAllocated int `db:"cpus_allocated"`
MemoryAllocated int64 `db:"memory_allocated"`
GpusAllocated int `db:"gpus_allocated"`
JobsRunning int `db:"jobs_running"`
Hostname string `db:"hostname"`
Cluster string `db:"cluster"`
SubCluster string `db:"subcluster"`
}
NodeStateWithNode combines a node state row with denormalized node info.
type RepositoryConfig ¶ added in v1.5.0
type RepositoryConfig struct {
// CacheSize is the LRU cache size in bytes for job metadata and energy footprints.
// Default: 1MB (1024 * 1024 bytes)
CacheSize int
// MaxOpenConnections is the maximum number of open database connections.
// Default: 4
MaxOpenConnections int
// MaxIdleConnections is the maximum number of idle database connections.
// Default: 4
MaxIdleConnections int
// ConnectionMaxLifetime is the maximum amount of time a connection may be reused.
// Default: 1 hour
ConnectionMaxLifetime time.Duration
// ConnectionMaxIdleTime is the maximum amount of time a connection may be idle.
// Default: 1 hour
ConnectionMaxIdleTime time.Duration
// MinRunningJobDuration is the minimum duration in seconds for a job to be
// considered in "running jobs" queries. This filters out very short jobs.
// Default: 600 seconds (10 minutes)
MinRunningJobDuration int
}
RepositoryConfig holds configuration for repository operations. All fields have sensible defaults, so this configuration is optional.
func DefaultConfig ¶ added in v1.5.0
func DefaultConfig() *RepositoryConfig
DefaultConfig returns the default repository configuration. These values are optimized for typical deployments.
func GetConfig ¶ added in v1.5.0
func GetConfig() *RepositoryConfig
GetConfig returns the current repository configuration.
type Transaction ¶
type Transaction struct {
// contains filtered or unexported fields
}
Transaction wraps a database transaction for job-related operations.
func (*Transaction) Commit ¶ added in v1.5.0
func (t *Transaction) Commit() error
Commit commits the transaction. After calling Commit, the transaction should not be used again.
func (*Transaction) Rollback ¶ added in v1.5.0
func (t *Transaction) Rollback() error
Rollback rolls back the transaction. It's safe to call Rollback on an already committed or rolled back transaction.
type UserCfgRepo ¶
func GetUserCfgRepo ¶
func GetUserCfgRepo() *UserCfgRepo
func (*UserCfgRepo) GetUIConfig ¶
Returns the personalised UI config for the currently authenticated user, or the plain default config otherwise.
func (*UserCfgRepo) UpdateConfig ¶
func (uCfg *UserCfgRepo) UpdateConfig(key, value string, user *schema.User) error
If no user is given, the global UI configuration is updated without being persisted! If an authenticated user is given, only that user's configuration is updated.
type UserRepository ¶ added in v1.2.0
func GetUserRepository ¶ added in v1.2.0
func GetUserRepository() *UserRepository
func (*UserRepository) AddProject ¶ added in v1.2.0
AddProject assigns a project to a manager user. Only users with the "manager" role can have assigned projects.
Returns error if:
- User doesn't have manager role
- User already manages the project
func (*UserRepository) AddRole ¶ added in v1.2.0
AddRole adds a role to a user's role list. Role string is automatically lowercased. Valid roles: admin, support, manager, api, user
Returns error if:
- User doesn't exist
- Role is invalid
- User already has the role
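The documented AddRole checks can be sketched on a plain role slice. `addRole` and `validRoles` are hypothetical. The "user doesn't exist" check needs the database and is left out.

```go
package main

import (
	"fmt"
	"strings"
)

// validRoles lists the roles documented as valid.
var validRoles = []string{"admin", "support", "manager", "api", "user"}

func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// addRole lowercases the role, validates it, rejects duplicates,
// and returns a new slice with the role appended.
func addRole(current []string, role string) ([]string, error) {
	r := strings.ToLower(role)
	if !contains(validRoles, r) {
		return nil, fmt.Errorf("invalid role %q", role)
	}
	if contains(current, r) {
		return nil, fmt.Errorf("user already has role %q", r)
	}
	return append(append([]string(nil), current...), r), nil
}
```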
func (*UserRepository) AddUser ¶ added in v1.2.0
func (r *UserRepository) AddUser(user *schema.User) error
AddUser creates a new user in the database. Passwords are automatically hashed with bcrypt before storage. Auth source determines authentication method (local, LDAP, etc.).
Required fields: Username, Roles
Optional fields: Name, Email, Password, Projects, AuthSource
func (*UserRepository) DelUser ¶ added in v1.2.0
func (r *UserRepository) DelUser(username string) error
func (*UserRepository) FetchUserInCtx ¶ added in v1.2.0
func (*UserRepository) GetLdapUsernames ¶ added in v1.2.0
func (r *UserRepository) GetLdapUsernames() ([]string, error)
func (*UserRepository) GetUser ¶ added in v1.2.0
func (r *UserRepository) GetUser(username string) (*schema.User, error)
GetUser retrieves a user by username from the database. Returns the complete user record including hashed password, roles, and projects. Password field contains bcrypt hash for local auth users, empty for LDAP users.
func (*UserRepository) ListUsers ¶ added in v1.2.0
func (r *UserRepository) ListUsers(specialsOnly bool) ([]*schema.User, error)
func (*UserRepository) RemoveProject ¶ added in v1.2.0
func (*UserRepository) RemoveRole ¶ added in v1.2.0
RemoveRole removes a role from a user's role list.
Special rules:
- Cannot remove "manager" role while user has assigned projects
- Must remove all projects first before removing manager role
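The manager-role rule can be sketched as a guard that runs before the role is removed. `removeRole` is hypothetical, and returning an error when the user lacks the role is an assumption.

```go
package main

import "fmt"

// removeRole refuses to drop "manager" while projects are assigned,
// then returns the role list without the given role.
func removeRole(roles, projects []string, role string) ([]string, error) {
	if role == "manager" && len(projects) > 0 {
		return nil, fmt.Errorf("cannot remove manager role: user still manages %d project(s)", len(projects))
	}
	out := make([]string, 0, len(roles))
	found := false
	for _, r := range roles {
		if r == role {
			found = true
			continue
		}
		out = append(out, r)
	}
	if !found {
		return nil, fmt.Errorf("user does not have role %q", role)
	}
	return out, nil
}
```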