Overview
Package archiver provides asynchronous job archiving functionality for ClusterCockpit.
The archiver runs a background worker goroutine that processes job archiving requests from a buffered channel. When jobs complete, their metric data is archived from the metric store to the configured archive backend (filesystem, S3, etc.).
Architecture
The archiver uses a producer-consumer pattern:
- Producer: TriggerArchiving() sends jobs to archiveChannel
- Consumer: archivingWorker() processes jobs from the channel
- Coordination: sync.WaitGroup tracks pending archive operations
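The producer-consumer split can be sketched in isolation as follows. This is a minimal illustration, not the package's code: the `Job` type and the `archiveAll` helper are hypothetical, and "archiving" is reduced to recording the job ID. It shows the ordering the text implies: the producer increments the WaitGroup before sending, the single consumer calls `Done` after each job, and closing the channel lets the consumer's loop exit.

```go
package main

import (
	"fmt"
	"sync"
)

// Job is a hypothetical stand-in for the real job type; only an ID is needed here.
type Job struct{ ID int64 }

// archiveAll pushes jobs through a buffered channel to a single worker
// goroutine and returns the IDs in the order the worker processed them.
func archiveAll(jobs []Job) []int64 {
	ch := make(chan Job, 128) // buffered, like archiveChannel
	var wg sync.WaitGroup     // counts jobs queued but not yet processed
	var processed []int64
	workerDone := make(chan struct{})

	// Consumer: plays the role of archivingWorker().
	go func() {
		defer close(workerDone)
		for job := range ch { // loop ends once ch is closed and drained
			processed = append(processed, job.ID) // "archive" the job
			wg.Done()
		}
	}()

	// Producer: plays the role of TriggerArchiving().
	for _, j := range jobs {
		wg.Add(1) // count the job before it becomes visible to the worker
		ch <- j
	}

	wg.Wait()    // every queued job has been processed
	close(ch)    // no more jobs; lets the worker's range loop exit
	<-workerDone // worker goroutine has returned
	return processed
}

func main() {
	fmt.Println(archiveAll([]Job{{1}, {2}, {3}})) // [1 2 3]
}
```

Because there is a single worker, jobs are processed in the order they were queued.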
Lifecycle
- Start(repo, ctx) - Initialize worker with context for cancellation
- TriggerArchiving(job) - Queue job for archiving (called when job stops)
- archivingWorker() - Background goroutine processes jobs
- Shutdown(timeout) - Graceful shutdown with timeout
Graceful Shutdown
The archiver supports graceful shutdown with configurable timeout:
- Closes the archive channel so that no new jobs are accepted
- Waits for pending jobs to complete (up to the timeout)
- Cancels the archiver's context if the timeout is exceeded, interrupting in-flight jobs
- Waits for the worker goroutine to exit cleanly
Example Usage
	// Initialize archiver
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	archiver.Start(jobRepository, ctx)

	// Trigger archiving when job completes
	archiver.TriggerArchiving(job)

	// Graceful shutdown with 10 second timeout
	if err := archiver.Shutdown(10 * time.Second); err != nil {
		log.Printf("Archiver shutdown timeout: %v", err)
	}
Constants
This section is empty.
Variables
This section is empty.
Functions
func ArchiveJob
ArchiveJob archives a completed job's metric data to the configured archive backend.
This function performs the following operations:
- Loads all metric data for the job from the metric data repository
- Calculates job-level statistics (avg, min, max) for each metric
- Stores the job metadata and metric data to the archive backend
Metric data is retrieved at the highest available resolution (typically 60s) for the following scopes:
- Node scope (always)
- Core scope (only for jobs with at most 8 nodes, to limit data volume)
- Accelerator scope (if the job used accelerators)
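The scope rule above reduces to a small decision function. This is an illustrative sketch: the scope names are plain strings assumed for the example, and `selectScopes` is not part of the package.

```go
package main

import "fmt"

// selectScopes applies the documented rule: node scope always, core scope
// only for jobs with at most 8 nodes, accelerator scope only if the job
// used accelerators. Scope names are assumed plain strings.
func selectScopes(numNodes, numAccelerators int) []string {
	scopes := []string{"node"}
	if numNodes <= 8 {
		scopes = append(scopes, "core")
	}
	if numAccelerators > 0 {
		scopes = append(scopes, "accelerator")
	}
	return scopes
}

func main() {
	fmt.Println(selectScopes(4, 0))  // [node core]
	fmt.Println(selectScopes(16, 4)) // [node accelerator]
}
```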
The function respects context cancellation. If ctx is cancelled (e.g., when the Shutdown timeout expires), the operation is interrupted and returns an error.
Parameters:
- job: The job to archive (must be a completed job)
- ctx: Context for cancellation and timeout control
Returns:
- *schema.Job with populated Statistics field
- error if data loading or archiving fails
If config.Keys.DisableArchive is true, only job statistics are calculated and returned (no data is written to archive backend).
func Shutdown (added in v1.5.0)
Shutdown performs a graceful shutdown of the archiver with a configurable timeout.
The shutdown process:
- Closes archiveChannel - no new jobs will be accepted
- Waits for pending jobs to complete (up to timeout duration)
- If the timeout is exceeded:
  - Cancels shutdownCtx to interrupt ongoing ArchiveJob operations
  - Returns an error indicating the timeout
- Waits for worker goroutine to exit cleanly
Parameters:
- timeout: Maximum duration to wait for pending jobs to complete (recommended: 10-30 seconds for production)
Returns:
- nil if all jobs completed within timeout
- error if timeout was exceeded (some jobs may not have been archived)
Jobs that don't complete within the timeout will be marked as failed. The function always ensures the worker goroutine exits before returning.
Example:
	if err := archiver.Shutdown(10 * time.Second); err != nil {
		log.Printf("Some jobs did not complete: %v", err)
	}
func Start
func Start(r *repository.JobRepository, ctx context.Context)
Start initializes the archiver and starts the background worker goroutine.
The archiver processes job archiving requests asynchronously via a buffered channel. Jobs are sent to the channel using TriggerArchiving() and processed by the worker.
Parameters:
- r: JobRepository instance for database operations
- ctx: Context for cancellation (shutdown signal propagation)
The worker goroutine will run until:
- ctx is cancelled (via parent shutdown)
- archiveChannel is closed (via Shutdown())
Must be called exactly once, before any call to TriggerArchiving().
func TriggerArchiving
TriggerArchiving queues a job for asynchronous archiving.
This function should be called when a job completes (stops) to archive its metric data from the metric store to the configured archive backend.
The function:
- Increments the pending job counter (WaitGroup)
- Sends the job to the archiving channel (buffered, capacity 128)
- Returns immediately, unless the channel is full, in which case it blocks until the worker frees a slot
The actual archiving is performed asynchronously by the worker goroutine. Upon completion, the worker will decrement the pending counter.
Panics if Start() has not been called first.
Types
This section is empty.