archiver package v1.5.0
Published: Mar 6, 2026 License: MIT Imports: 11 Imported by: 0

README

Archiver Package

The archiver package provides asynchronous job archiving functionality for ClusterCockpit. When jobs complete, their metric data is archived from the metric store to a persistent archive backend (filesystem, S3, SQLite, etc.).

Architecture

Producer-Consumer Pattern
┌──────────────┐     TriggerArchiving()      ┌───────────────┐
│  API Handler │  ───────────────────────▶   │ archiveChannel│
│ (Job Stop)   │                             │  (buffer: 128)│
└──────────────┘                             └───────┬───────┘
                                                     │
                   ┌─────────────────────────────────┘
                   │
                   ▼
         ┌──────────────────────┐
         │  archivingWorker()   │
         │   (goroutine)        │
         └──────────┬───────────┘
                    │
                    ▼
         1. Fetch job metadata
         2. Load metric data
         3. Calculate statistics
         4. Archive to backend
         5. Update database
         6. Call hooks
Components
  • archiveChannel: Buffered channel (128 jobs) for async communication
  • archivePending: WaitGroup tracking in-flight archiving operations
  • archivingWorker: Background goroutine processing archiving requests
  • shutdownCtx: Context for graceful cancellation during shutdown

Usage

Initialization
// Start archiver with context for shutdown control
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

archiver.Start(jobRepository, ctx)
Archiving a Job
// Called automatically when a job completes
archiver.TriggerArchiving(job)

The call returns immediately and the actual archiving happens in the background. (If the channel buffer is full, the call blocks until the worker frees a slot; see Channel Buffer Size below.)

Graceful Shutdown
// Shutdown with 10 second timeout
if err := archiver.Shutdown(10 * time.Second); err != nil {
    log.Printf("Archiver shutdown timeout: %v", err)
}

Shutdown process:

  1. Closes channel (rejects new jobs)
  2. Waits for pending jobs (up to timeout)
  3. Cancels context if timeout exceeded
  4. Waits for worker to exit cleanly

Configuration

Channel Buffer Size

The archiving channel has a buffer of 128 jobs. If more than 128 jobs are queued simultaneously, TriggerArchiving() will block until space is available.

To adjust:

// In archiveWorker.go Start() function
archiveChannel = make(chan *schema.Job, 256) // Increase buffer
Scope Selection

Archive data scopes are automatically selected based on job size:

  • Node scope: Always included
  • Core scope: Included only for jobs with ≤8 nodes (omitted for larger jobs to keep data volume down)
  • Accelerator scope: Included if job used accelerators (NumAcc > 0)

To adjust the node threshold:

// In archiver.go ArchiveJob() function
if job.NumNodes <= 16 { // Change from 8 to 16
    scopes = append(scopes, schema.MetricScopeCore)
}
Resolution

Data is archived at the highest available resolution (typically 60s intervals). To change:

// In archiver.go ArchiveJob() function
// (last argument is the resolution in seconds; 0 = highest available)
jobData, err := metricdispatch.LoadData(job, allMetrics, scopes, ctx, 300) // 5-minute resolution

Error Handling

Automatic Retry

The archiver does not automatically retry failed archiving operations. If archiving fails:

  1. Error is logged
  2. Job is marked as MonitoringStatusArchivingFailed in database
  3. Worker continues processing other jobs
Manual Retry

To re-archive failed jobs, query for jobs with MonitoringStatusArchivingFailed and call TriggerArchiving() again.

Performance Considerations

Single Worker Thread

The archiver uses a single worker goroutine. For high-throughput systems:

  • Large channel buffer (128) absorbs bursts and reduces producer blocking
  • Archiving is typically I/O bound (writing to storage)
  • Single worker prevents overwhelming storage backend
Shutdown Timeout

Recommended timeout values:

  • Development: 5-10 seconds
  • Production: 10-30 seconds
  • High-load: 30-60 seconds

Choose based on:

  • Average archiving time per job
  • Storage backend latency
  • Acceptable shutdown delay

Monitoring

Logging

The archiver logs:

  • Info: Startup, shutdown, successful completions
  • Debug: Individual job archiving times
  • Error: Archiving failures with job ID and reason
  • Warn: Shutdown timeout exceeded
Metrics

Monitor these signals for archiver health:

  • Jobs with MonitoringStatusArchivingFailed
  • Time from job stop to successful archive
  • Shutdown timeout occurrences

Thread Safety

All exported functions are safe for concurrent use:

  • Start() - Safe to call once
  • TriggerArchiving() - Safe from multiple goroutines
  • Shutdown() - Safe to call once

Internal state is protected by:

  • Channel synchronization (archiveChannel)
  • WaitGroup for pending count (archivePending)
  • Context for cancellation (shutdownCtx)

Files

  • archiveWorker.go: Worker lifecycle, channel management, shutdown logic
  • archiver.go: Core archiving logic, metric loading, statistics calculation

Dependencies

  • internal/repository: Database operations for job metadata
  • internal/metricdispatch: Loading metric data from various backends
  • pkg/archive: Archive backend abstraction (filesystem, S3, SQLite)
  • cc-lib/schema: Job and metric data structures

Documentation

Overview

Package archiver provides asynchronous job archiving functionality for ClusterCockpit.

The archiver runs a background worker goroutine that processes job archiving requests from a buffered channel. When jobs complete, their metric data is archived from the metric store to the configured archive backend (filesystem, S3, etc.).

Architecture

The archiver uses a producer-consumer pattern:

  • Producer: TriggerArchiving() sends jobs to archiveChannel
  • Consumer: archivingWorker() processes jobs from the channel
  • Coordination: sync.WaitGroup tracks pending archive operations

Lifecycle

  1. Start(repo, ctx) - Initialize worker with context for cancellation
  2. TriggerArchiving(job) - Queue job for archiving (called when job stops)
  3. archivingWorker() - Background goroutine processes jobs
  4. Shutdown(timeout) - Graceful shutdown with timeout

Graceful Shutdown

The archiver supports graceful shutdown with configurable timeout:

  • Closes channel to reject new jobs
  • Waits for pending jobs to complete (up to timeout)
  • Cancels context if timeout exceeded
  • Ensures worker goroutine exits cleanly

Example Usage

// Initialize archiver
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
archiver.Start(jobRepository, ctx)

// Trigger archiving when job completes
archiver.TriggerArchiving(job)

// Graceful shutdown with 10 second timeout
if err := archiver.Shutdown(10 * time.Second); err != nil {
    log.Printf("Archiver shutdown timeout: %v", err)
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ArchiveJob

func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.Job, error)

ArchiveJob archives a completed job's metric data to the configured archive backend.

This function performs the following operations:

  1. Loads all metric data for the job from the metric data repository
  2. Calculates job-level statistics (avg, min, max) for each metric
  3. Stores the job metadata and metric data to the archive backend

Metric data is retrieved at the highest available resolution (typically 60s) for the following scopes:

  • Node scope (always)
  • Core scope (for jobs with ≤8 nodes, to reduce data volume)
  • Accelerator scope (if job used accelerators)

The function respects context cancellation. If ctx is cancelled (e.g., during shutdown timeout), the operation will be interrupted and return an error.

Parameters:

  • job: The job to archive (must be a completed job)
  • ctx: Context for cancellation and timeout control

Returns:

  • *schema.Job with populated Statistics field
  • error if data loading or archiving fails

If config.Keys.DisableArchive is true, only job statistics are calculated and returned (no data is written to archive backend).

func Shutdown added in v1.5.0

func Shutdown(timeout time.Duration) error

Shutdown performs a graceful shutdown of the archiver with a configurable timeout.

The shutdown process:

  1. Closes archiveChannel - no new jobs will be accepted
  2. Waits for pending jobs to complete (up to timeout duration)
  3. If the timeout is exceeded:
       • Cancels shutdownCtx to interrupt ongoing ArchiveJob operations
       • Returns an error indicating the timeout
  4. Waits for worker goroutine to exit cleanly

Parameters:

  • timeout: Maximum duration to wait for pending jobs to complete (recommended: 10-30 seconds for production)

Returns:

  • nil if all jobs completed within timeout
  • error if timeout was exceeded (some jobs may not have been archived)

Jobs that don't complete within the timeout will be marked as failed. The function always ensures the worker goroutine exits before returning.

Example:

if err := archiver.Shutdown(10 * time.Second); err != nil {
    log.Printf("Some jobs did not complete: %v", err)
}

func Start

func Start(r *repository.JobRepository, ctx context.Context)

Start initializes the archiver and starts the background worker goroutine.

The archiver processes job archiving requests asynchronously via a buffered channel. Jobs are sent to the channel using TriggerArchiving() and processed by the worker.

Parameters:

  • r: JobRepository instance for database operations
  • ctx: Context for cancellation (shutdown signal propagation)

The worker goroutine will run until:

  • ctx is cancelled (via parent shutdown)
  • archiveChannel is closed (via Shutdown())

Must be called before TriggerArchiving(). Safe to call only once.

func TriggerArchiving

func TriggerArchiving(job *schema.Job)

TriggerArchiving queues a job for asynchronous archiving.

This function should be called when a job completes (stops) to archive its metric data from the metric store to the configured archive backend.

The function:

  1. Increments the pending job counter (WaitGroup)
  2. Sends the job to the archiving channel (buffered, capacity 128)
  3. Returns immediately (non-blocking unless channel is full)

The actual archiving is performed asynchronously by the worker goroutine. Upon completion, the worker will decrement the pending counter.

Panics if Start() has not been called first.

Types

This section is empty.
