package compaction

Documentation

Index

Constants

View Source
const ManifestBasePath = "_compaction_state"

ManifestBasePath is the base directory for storing compaction manifests

View Source
const ManifestMaxAge = 7 * 24 * time.Hour // 7 days

ManifestMaxAge is the maximum age for manifests before they're considered stale. Manifests older than this are deleted during recovery - they likely indicate a deeper problem that requires investigation.

View Source
const MaxFilesPerBatch = 30

MaxFilesPerBatch is the maximum number of files to process in a single compaction job. DuckDB can segfault/abort when processing too many files in a single read_parquet() call. This limit prevents OOM and crashes on partitions with many large files. With 1M buffer size, files are ~10-14MB each, so 30 files ≈ 300-420MB per batch.

Variables

View Source
var ErrCompactionRoleGated = fmt.Errorf("compaction: node role is not compactor")

ErrCompactionRoleGated is returned by TriggerNow on a node whose role does not have CanCompact (Phase 4). It lets API handlers surface a clear "this node is not the compactor" message instead of a generic 500 or a silent no-op.

View Source
var ErrCycleAlreadyRunning = errors.New("compaction cycle already running")

ErrCycleAlreadyRunning is returned when attempting to start a compaction cycle while one is already in progress

View Source
var ErrNotLeader = errors.New("compaction bridge: not the Raft leader")

ErrNotLeader is returned by a ManifestBridge when the local node is not the Raft leader and therefore cannot append to the cluster manifest. The watcher treats this as a transient retry condition: the completion manifest is kept on disk, no counters are incremented, and the next poll cycle tries again. By the time Raft leadership stabilizes (sub-second in practice) the retry succeeds.

Why a sentinel instead of a generic retryable error: we want the watcher to distinguish leader flap (expected, noisy, don't log at Error) from a real bridge failure (unexpected, should surface loudly). errors.Is against ErrNotLeader is the cheapest way to make that distinction explicit.
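A minimal sketch of that distinction using errors.Is; handleApplyError is a hypothetical helper (the watcher's real poll loop is unexported), and import statements are omitted because the module path is elided on this page.

// handleApplyError is a hypothetical helper illustrating the errors.Is pattern
// described above. It reports whether the manifest should be retried next poll.
func handleApplyError(logger zerolog.Logger, err error) (retry bool) {
	if err == nil {
		return false
	}
	if errors.Is(err, compaction.ErrNotLeader) {
		// Expected leader flap: keep the manifest on disk, retry next cycle.
		logger.Debug().Err(err).Msg("not the raft leader yet; will retry")
		return true
	}
	// Real bridge failure: surface loudly, keep the manifest for inspection.
	logger.Error().Err(err).Msg("manifest bridge apply failed")
	return false
}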

Functions

func ClassifySubprocessError

func ClassifySubprocessError(err error, stderr string) (recoverable bool, reason string)

ClassifySubprocessError determines if a subprocess error is recoverable via retry. Returns (recoverable, reason) where reason describes the error type.

Recoverable errors (should retry with smaller batch):

  • Segmentation faults (memory corruption, often from memory pressure)
  • SIGKILL (exit code 137, usually OOM killer)
  • Explicit memory errors in stderr

Non-recoverable errors (should not retry):

  • Permission denied
  • File not found
  • Access denied
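A short sketch of how a caller might act on the classification; the retry policy implied here (try again with a smaller batch) is illustrative, not the package's actual behavior.

// retryDecisionSketch shows the intended use of ClassifySubprocessError.
// It reports whether the caller should retry with a smaller batch.
func retryDecisionSketch(logger zerolog.Logger, runErr error, stderr string) bool {
	recoverable, reason := compaction.ClassifySubprocessError(runErr, stderr)
	if recoverable {
		logger.Warn().Str("reason", reason).Msg("recoverable failure; retry with a smaller batch")
		return true
	}
	logger.Error().Str("reason", reason).Msg("non-recoverable failure; giving up")
	return false
}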

Types

type BaseTier

type BaseTier struct {
	StorageBackend storage.Backend
	MinAgeHours    int
	MinFiles       int
	TargetSizeMB   int
	Enabled        bool

	// Metrics
	TotalCompactions    int
	TotalFilesCompacted int
	TotalBytesSaved     int64

	Logger zerolog.Logger
	// contains filtered or unexported fields
}

BaseTier provides common functionality for all compaction tiers

func NewBaseTier

func NewBaseTier(cfg *BaseTierConfig) *BaseTier

NewBaseTier creates a new base tier with common functionality

func (*BaseTier) GetBaseStats

func (t *BaseTier) GetBaseStats(tierName string) map[string]interface{}

GetBaseStats returns base statistics for a tier

func (*BaseTier) IsEnabled

func (t *BaseTier) IsEnabled() bool

IsEnabled returns whether this tier is enabled

func (*BaseTier) ShouldCompactByFileSuffix

func (t *BaseTier) ShouldCompactByFileSuffix(
	files []string,
	compactedSuffix string,
	isUncompactedInput func(string) bool,
) bool

ShouldCompactByFileSuffix determines if compaction is needed based on file classification. This is a shared helper that implements the common compaction decision logic:

  • compactedSuffix: suffix for files already compacted at this tier (e.g., "_compacted.parquet")
  • isUncompactedInput: function to determine if a file is valid uncompacted input for this tier

Returns true if:

  • No compacted files exist AND enough uncompacted input files are present
  • Compacted files exist AND enough new uncompacted input files have accumulated
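A sketch of how a tier might delegate its decision to this helper; the suffix and the input predicate below are examples, not the package's real constants.

// hourlyShouldCompactSketch delegates the compaction decision to the shared
// helper. Suffix and predicate values are illustrative.
func hourlyShouldCompactSketch(t *compaction.BaseTier, files []string) bool {
	isUncompactedInput := func(f string) bool {
		return strings.HasSuffix(f, ".parquet") &&
			!strings.HasSuffix(f, "_compacted.parquet")
	}
	return t.ShouldCompactByFileSuffix(files, "_compacted.parquet", isUncompactedInput)
}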

type BaseTierConfig

type BaseTierConfig struct {
	StorageBackend storage.Backend
	MinAgeHours    int
	MinFiles       int
	TargetSizeMB   int
	Enabled        bool
	Logger         zerolog.Logger
}

BaseTierConfig holds configuration for creating a base tier

type Candidate

type Candidate struct {
	Database      string
	Measurement   string
	PartitionPath string
	Files         []string
	FileCount     int
	Tier          string
	PartitionTime time.Time
	BatchNumber   int // Batch number when candidate is split (0 = not batched or first batch)
	TotalBatches  int // Total number of batches for this partition (0 = not batched)
}

Candidate represents a partition candidate for compaction

func SplitCandidateIntoBatches

func SplitCandidateIntoBatches(c Candidate) []Candidate

SplitCandidateIntoBatches splits a candidate with many files into multiple candidates, each with at most MaxFilesPerBatch files. This prevents DuckDB segfaults when processing thousands of files in a single read_parquet() call.
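A usage sketch; dispatchBatchesSketch and its scheduleJob callback are hypothetical.

// dispatchBatchesSketch splits an oversized partition candidate before
// handing each piece to a job scheduler.
func dispatchBatchesSketch(candidate compaction.Candidate, scheduleJob func(compaction.Candidate)) {
	for _, batch := range compaction.SplitCandidateIntoBatches(candidate) {
		// Each batch carries at most MaxFilesPerBatch files; BatchNumber and
		// TotalBatches identify its position within the original candidate.
		scheduleJob(batch)
	}
}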

type ClusterGate

type ClusterGate interface {
	// CanCompact reports whether the local node may run compaction. When
	// the scheduler sees false, it logs a clear "gated" message and stays
	// idle (NOT running, NOT an error).
	CanCompact() bool
	// Role returns the current node's role string for log messages. The
	// scheduler only uses this for human-readable output, never for
	// decision-making — CanCompact is authoritative.
	Role() string
}

ClusterGate is the minimal interface the compaction scheduler needs to decide whether the local node may run compaction. Phase 4 wires this to a closure that checks RoleCompactor against cfg.Cluster.Role; OSS and standalone nodes pass a nil gate (or construct without one) and are allowed unconditionally.

The interface lives here rather than in the cluster package so the scheduler has no compile-time dependency on the cluster package — the gate is a function pointer plus a tiny interface, not an import.
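A minimal sketch of a gate implementation of the kind described above; the type name and role string are illustrative, since the real wiring lives in the cluster/main packages.

// roleGate is a hypothetical ClusterGate backed by a static role string.
type roleGate struct{ role string }

func (g roleGate) CanCompact() bool { return g.role == "compactor" }
func (g roleGate) Role() string     { return g.role }

var _ compaction.ClusterGate = roleGate{}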

type CompactedFile

type CompactedFile struct {
	Path          string // storage-relative path
	SHA256        string // hex-encoded 64 chars
	SizeBytes     int64
	Database      string
	Measurement   string
	PartitionTime time.Time
	Tier          string
	CreatedAt     time.Time
}

CompactedFile is the cluster-agnostic shape the bridge uses to register a compacted output in the Raft manifest. It intentionally excludes raft.FileEntry.LSN (the FSM stamps it at apply time) and raft.FileEntry.OriginNodeID (the bridge sets it to the local node ID because the compactor IS the origin from Phase 2/3's perspective).

The bridge converts CompactedFile to raft.FileEntry inside the cluster package, keeping the compaction package free of any raft.* imports.

type CompactedOutput

type CompactedOutput struct {
	Path          string    `json:"path"`           // storage-relative path (e.g. "mydb/cpu/2026/04/11/14/compacted_....parquet")
	SHA256        string    `json:"sha256"`         // hex-encoded 64 chars; bridged into raft.FileEntry.SHA256
	SizeBytes     int64     `json:"size_bytes"`     // authoritative size after upload
	Database      string    `json:"database"`       // bridged into raft.FileEntry.Database
	Measurement   string    `json:"measurement"`    // bridged into raft.FileEntry.Measurement
	PartitionTime time.Time `json:"partition_time"` // bridged into raft.FileEntry.PartitionTime
	Tier          string    `json:"tier"`           // "hot", "cold", etc.
	CreatedAt     time.Time `json:"created_at"`     // bridged into raft.FileEntry.CreatedAt
}

CompactedOutput describes a single compacted file produced by a job. All fields are required when the enclosing manifest is in state output_written or sources_deleted — the watcher converts this struct to a raft.FileEntry via the cluster-side bridge.

Outputs is a slice on the manifest (not a single value) because a future compaction strategy (Phase 5+) may split a partition into multiple outputs. Phase 4 always populates exactly one entry per manifest.

type CompletionManifest

type CompletionManifest struct {
	JobID          string            `json:"job_id"`          // matches Job.JobID; unique per attempt
	Database       string            `json:"database"`        // for operator debugging; Outputs[].Database is authoritative
	Measurement    string            `json:"measurement"`     // for operator debugging
	PartitionPath  string            `json:"partition_path"`  // for operator debugging
	Tier           string            `json:"tier"`            // for operator debugging
	State          CompletionState   `json:"state"`           // lifecycle phase; drives watcher behavior
	Outputs        []CompactedOutput `json:"outputs"`         // set when State >= output_written
	DeletedSources []string          `json:"deleted_sources"` // set when State == sources_deleted; storage-relative paths
	CreatedAt      time.Time         `json:"created_at"`      // when the manifest was first written
	UpdatedAt      time.Time         `json:"updated_at"`      // bumped on every state transition
}

CompletionManifest is the durable handoff from the compaction subprocess to the parent-side watcher. One file per job. Written atomically via tmp-file-plus-rename; never appended to.

The watcher treats CompletionManifest as append-only by convention: the subprocess only rewrites it at state transitions, and the watcher only reads it. If the watcher picks up a manifest mid-transition (possible if the subprocess is slow between json.Marshal and os.Rename), the read fails cleanly because the final path only exists after the rename completes — readers see either the pre-state or the post-state, never a torn value.
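A sketch of the tmp-file-plus-rename pattern described above; the real writer is unexported, and the filename convention shown here is an assumption.

// writeManifestAtomicallySketch marshals the manifest, writes it to a temp
// file, then renames it into place. Readers never observe a torn value
// because the final path only exists after the rename completes.
func writeManifestAtomicallySketch(dir string, m compaction.CompletionManifest) error {
	data, err := json.Marshal(m)
	if err != nil {
		return err
	}
	final := filepath.Join(dir, m.JobID+".json") // filename convention assumed
	tmp := final + ".tmp"
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	return os.Rename(tmp, final)
}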

type CompletionState

type CompletionState string

CompletionState is the lifecycle phase of a CompletionManifest. The subprocess advances the state in place as it progresses through the job; the watcher applies manifests whose state is output_written or later.

const (
	// CompletionStateWritingOutput means the subprocess has started the job
	// but has not yet successfully written the compacted file to storage.
	// Manifests in this state are NOT picked up by the watcher — they may
	// be stale (subprocess crashed) and will be cleaned up separately.
	CompletionStateWritingOutput CompletionState = "writing_output"

	// CompletionStateOutputWritten means the compacted file has been
	// durably uploaded to storage and its SHA-256 is known. Source files
	// have NOT yet been deleted. The watcher applies RegisterFile in this
	// state, but does NOT yet apply DeleteFile for sources because they
	// may still be present in storage — deleting them from the manifest
	// while they still exist on disk would leave orphans. The watcher
	// waits for the sources_deleted state before issuing DeleteFile.
	CompletionStateOutputWritten CompletionState = "output_written"

	// CompletionStateSourcesDeleted means source files have been deleted
	// from storage. The watcher applies both RegisterFile and DeleteFile
	// in this state and then removes the manifest file.
	CompletionStateSourcesDeleted CompletionState = "sources_deleted"
)

type CompletionWatcher

type CompletionWatcher struct {
	// contains filtered or unexported fields
}

CompletionWatcher polls the completion-manifest directory and applies pending manifests via the ManifestBridge. Run one per compactor node.

Lifecycle:

w := NewCompletionWatcher(cfg)
w.Start(ctx)  // kicks off the poll loop in a background goroutine
...
w.Stop()      // signals stop, waits for the loop to drain one final poll

The watcher is safe to Start/Stop multiple times. It is NOT safe for concurrent Start calls from different goroutines.

func NewCompletionWatcher

func NewCompletionWatcher(cfg CompletionWatcherConfig) (*CompletionWatcher, error)

NewCompletionWatcher constructs a watcher. Returns an error if any required dependency is missing.

func (*CompletionWatcher) Start

func (w *CompletionWatcher) Start(parentCtx context.Context)

Start launches the poll loop in a background goroutine. Safe to call multiple times — second and subsequent calls are no-ops.

func (*CompletionWatcher) Stats

func (w *CompletionWatcher) Stats() map[string]int64

Stats returns a point-in-time snapshot of the watcher's metrics, suitable for /api/v1/cluster/status. The returned map is safe to marshal as JSON directly; all values are int64.

func (*CompletionWatcher) Stop

func (w *CompletionWatcher) Stop()

Stop signals the poll loop to exit and waits for it to drain. Safe to call multiple times; second and subsequent calls are no-ops. After Stop, the watcher can be Started again with a fresh context if desired.

type CompletionWatcherConfig

type CompletionWatcherConfig struct {
	// Dir is the local-disk directory scanned for pending completion manifests.
	// Matches the compaction Manager's CompletionDir field. Must not be empty.
	Dir string

	// Bridge is the cluster-side manifest appender. Required.
	Bridge ManifestBridge

	// PollInterval is the time between scan cycles. Default: 1s. Smaller
	// values catch up faster but add load; larger values delay visibility
	// of compacted files in the cluster manifest.
	PollInterval time.Duration

	// ApplyTimeout bounds a single bridge call. Default: 5s. The watcher
	// derives a fresh context from its own context with this deadline for
	// each Register/Delete call so a stuck bridge call doesn't pin the
	// poll loop.
	ApplyTimeout time.Duration

	// Logger receives structured log output. Defaults to Nop if zero.
	Logger zerolog.Logger
}

CompletionWatcherConfig bundles the watcher's dependencies and tunables. All fields are required unless explicitly marked optional.
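A wiring sketch, assuming the Manager's CompletionDir convention described elsewhere on this page; bridge is any ManifestBridge implementation, and the surrounding variables are illustrative.

// startWatcherSketch constructs and runs a CompletionWatcher until ctx ends.
func startWatcherSketch(ctx context.Context, tempDir string, bridge compaction.ManifestBridge, logger zerolog.Logger) error {
	w, err := compaction.NewCompletionWatcher(compaction.CompletionWatcherConfig{
		Dir:          filepath.Join(tempDir, ".completion", "pending"),
		Bridge:       bridge,
		PollInterval: time.Second,
		ApplyTimeout: 5 * time.Second,
		Logger:       logger,
	})
	if err != nil {
		return err
	}
	w.Start(ctx)
	defer w.Stop()
	<-ctx.Done()
	return nil
}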

type DailyTier

type DailyTier struct {
	*BaseTier
	SkipFileAgeCheckDays int // Skip file creation time check for partitions older than this
}

DailyTier implements daily compaction (Tier 2). It compacts hourly-compacted files into daily files.

func NewDailyTier

func NewDailyTier(cfg *DailyTierConfig) *DailyTier

NewDailyTier creates a new daily compaction tier

func (*DailyTier) FindCandidates

func (t *DailyTier) FindCandidates(ctx context.Context, database, measurement string) ([]Candidate, error)

FindCandidates finds daily partitions that are candidates for compaction

func (*DailyTier) GetPartitionLevel

func (t *DailyTier) GetPartitionLevel() string

GetPartitionLevel returns the partition level

func (*DailyTier) GetStats

func (t *DailyTier) GetStats() map[string]interface{}

GetStats returns tier statistics

func (*DailyTier) GetTierName

func (t *DailyTier) GetTierName() string

GetTierName returns the tier name

func (*DailyTier) IsCompactedFile

func (t *DailyTier) IsCompactedFile(filename string) bool

IsCompactedFile checks if a file is a compacted daily file

func (*DailyTier) ShouldCompact

func (t *DailyTier) ShouldCompact(files []string, partitionTime time.Time) bool

ShouldCompact determines if a day partition should be compacted. The daily tier compacts hourly files (7 path parts) into daily files (6 path parts).

type DailyTierConfig

type DailyTierConfig struct {
	StorageBackend       storage.Backend
	MinAgeHours          int  // Don't compact days younger than this (default: 24)
	MinFiles             int  // Only compact days with at least this many files (default: 12)
	SkipFileAgeCheckDays int  // Skip file creation time check for partitions older than this (default: 7)
	TargetSizeMB         int  // Target size for compacted files (default: 2048)
	Enabled              bool // Enable daily compaction (default: true)
	Logger               zerolog.Logger
}

DailyTierConfig holds configuration for daily compaction tier

type DeleteSourceOp

type DeleteSourceOp struct {
	Path   string
	Reason string
}

DeleteSourceOp carries the arguments for a DeleteCompactedSource batch operation. A struct rather than bare string fields keeps the batch call signature extensible without breaking callers.

type HourlyTier

type HourlyTier struct {
	*BaseTier
}

HourlyTier implements hourly compaction (Tier 1). It compacts small files within hourly partitions.

func NewHourlyTier

func NewHourlyTier(cfg *HourlyTierConfig) *HourlyTier

NewHourlyTier creates a new hourly compaction tier

func (*HourlyTier) FindCandidates

func (t *HourlyTier) FindCandidates(ctx context.Context, database, measurement string) ([]Candidate, error)

FindCandidates finds hourly partitions that are candidates for compaction

func (*HourlyTier) GetPartitionLevel

func (t *HourlyTier) GetPartitionLevel() string

GetPartitionLevel returns the partition level

func (*HourlyTier) GetStats

func (t *HourlyTier) GetStats() map[string]interface{}

GetStats returns tier statistics

func (*HourlyTier) GetTierName

func (t *HourlyTier) GetTierName() string

GetTierName returns the tier name

func (*HourlyTier) IsCompactedFile

func (t *HourlyTier) IsCompactedFile(filename string) bool

IsCompactedFile checks if a file is a compacted hourly file

func (*HourlyTier) ShouldCompact

func (t *HourlyTier) ShouldCompact(files []string, partitionTime time.Time) bool

ShouldCompact determines if an hourly partition should be compacted

type HourlyTierConfig

type HourlyTierConfig struct {
	StorageBackend storage.Backend
	MinAgeHours    int  // Don't compact partitions younger than this (default: 1)
	MinFiles       int  // Only compact partitions with at least this many files (default: 10)
	TargetSizeMB   int  // Target size for compacted files (default: 512)
	Enabled        bool // Enable hourly compaction (default: true)
	Logger         zerolog.Logger
}

HourlyTierConfig holds configuration for hourly compaction tier

type Job

type Job struct {
	// Configuration
	Measurement    string
	PartitionPath  string
	Files          []string // Original files to be compacted
	StorageBackend storage.Backend
	Database       string
	TargetSizeMB   int
	Tier           string
	TempDirectory  string   // Base temp directory for compaction files
	SortKeys       []string // Sort keys for this measurement (for ORDER BY in compaction)

	// Job metadata
	JobID       string
	StartedAt   *time.Time
	CompletedAt *time.Time
	Status      JobStatus
	Error       error

	// Metrics
	FilesCompacted  int
	BytesBefore     int64
	BytesAfter      int64
	DurationSeconds float64

	// Phase 4: cluster-mode completion manifest. When CompletionDir is
	// non-empty, the job writes a local-disk CompletionManifest at each
	// state transition so the parent-side CompletionWatcher can apply
	// RegisterFile/DeleteFile commands to the Raft manifest. Empty means
	// OSS / standalone mode — no manifest is written and behavior is
	// byte-identical to pre-Phase-4.
	//
	// PartitionTime is the tier-scanner's authoritative timestamp for the
	// partition (see Candidate.PartitionTime in tier.go). Surfaced in the
	// completion manifest so the bridge can set raft.FileEntry.PartitionTime.
	// Only meaningful when clusterMode() is true; zero value in OSS.
	CompletionDir string
	PartitionTime time.Time
	// contains filtered or unexported fields
}

Job represents a single compaction job for a partition

func NewJob

func NewJob(cfg *JobConfig) *Job

NewJob creates a new compaction job

func (*Job) Run

func (j *Job) Run(ctx context.Context) error

Run executes the compaction job

func (*Job) Stats

func (j *Job) Stats() map[string]interface{}

Stats returns job statistics

type JobConfig

type JobConfig struct {
	Measurement     string
	PartitionPath   string
	Files           []string
	StorageBackend  storage.Backend
	Database        string
	TargetSizeMB    int
	Tier            string
	TempDirectory   string   // Base temp directory for compaction files (default: ./data/compaction)
	SortKeys        []string // Sort keys for this measurement (for ORDER BY in compaction)
	Logger          zerolog.Logger
	DB              *sql.DB          // Shared DuckDB connection (avoids memory retention from temp connections)
	ManifestManager *ManifestManager // Manifest manager for crash recovery (optional, recommended)

	// CompletionDir is the Phase 4 cluster-mode completion-manifest directory.
	// When non-empty, the job writes a local-disk CompletionManifest at each
	// state transition. Empty means OSS / standalone — no completion manifest
	// is written and the job is byte-compatible with pre-Phase-4 behavior.
	CompletionDir string

	// JobID, if set, overrides the auto-generated JobID. Phase 4 uses this
	// so the parent (which spawns the subprocess) and the subprocess agree
	// on the completion-manifest filename. Empty means auto-generate.
	JobID string

	// PartitionTime is the authoritative partition timestamp from the tier
	// scanner. Threaded through so the completion manifest includes it for
	// the Raft FileEntry. Zero value in OSS.
	PartitionTime time.Time
}

JobConfig holds configuration for creating a compaction job

type JobStatus

type JobStatus string

JobStatus represents the status of a compaction job

const (
	JobStatusPending   JobStatus = "pending"
	JobStatusRunning   JobStatus = "running"
	JobStatusCompleted JobStatus = "completed"
	JobStatusFailed    JobStatus = "failed"
)

type LockManager

type LockManager struct {
	// contains filtered or unexported fields
}

LockManager manages locks for compaction partitions

func NewLockManager

func NewLockManager() *LockManager

NewLockManager creates a new lock manager

func (*LockManager) AcquireLock

func (l *LockManager) AcquireLock(key string) bool

AcquireLock attempts to acquire a lock for a partition

func (*LockManager) IsLocked

func (l *LockManager) IsLocked(key string) bool

IsLocked checks if a partition is locked

func (*LockManager) ReleaseLock

func (l *LockManager) ReleaseLock(key string)

ReleaseLock releases a lock for a partition

type Manager

type Manager struct {
	StorageBackend  storage.Backend
	LockManager     *LockManager
	ManifestManager *ManifestManager

	// Configuration
	MinAgeHours   int
	MinFiles      int
	TargetSizeMB  int
	MaxConcurrent int
	TempDirectory string // Temp directory for compaction files
	MemoryLimit   string // DuckDB memory limit for subprocess (e.g., "8GB")
	// Phase 4: local-disk directory where compaction subprocesses write
	// completion manifests for the parent-side CompletionWatcher to pick
	// up. Empty means "OSS mode, no completion-manifest handoff". Set by
	// the main wiring to filepath.Join(TempDirectory, ".completion", "pending")
	// when clustering + replication are enabled.
	CompletionDir string

	// Sort key configuration (from ingest config)
	SortKeysConfig  map[string][]string // measurement -> sort keys
	DefaultSortKeys []string            // default sort keys

	// Tiers
	Tiers []Tier

	// Metrics
	TotalJobsCompleted  int
	TotalJobsFailed     int
	TotalFilesCompacted int
	TotalBytesSaved     int64
	TotalManifestsRecov int // Number of manifests recovered
	// contains filtered or unexported fields
}

Manager orchestrates compaction jobs across all measurements

func NewManager

func NewManager(cfg *ManagerConfig) *Manager

NewManager creates a new compaction manager

func (*Manager) CleanupOrphanedCompletionManifests

func (m *Manager) CleanupOrphanedCompletionManifests(orphanTimeout time.Duration) error

CleanupOrphanedCompletionManifests sweeps the Phase 4 completion-manifest directory for entries in state "writing_output" that are older than orphanTimeout. These represent compaction subprocesses that started writing a manifest but crashed before advancing to output_written — the watcher will never pick them up (they're still in the initial state), and leaving them on disk would cause confusing "stuck in writing_output" log noise on every startup.

Manifests in state output_written or sources_deleted are left alone no matter how old they are — those are valid pending work that the watcher needs to process, and aging them out would silently drop Raft manifest updates the cluster needs.

A zero or negative orphanTimeout means "use 10 minutes" so tests and production agree on a safe default unless the operator explicitly tunes it.

Safe to call multiple times. Safe when CompletionDir is empty (no-op).

func (*Manager) CleanupOrphanedTempDirs

func (m *Manager) CleanupOrphanedTempDirs() error

CleanupOrphanedTempDirs removes orphaned temp directories from previous runs. This handles cleanup after pod crashes where the defer cleanup didn't run. Call this on startup before running compaction cycles.

Phase 4: skips reserved subdirectories (see reservedTempSubdirs). Those are managed by their own cleanup paths (e.g. CleanupOrphanedCompletionManifests) and must not be treated as stale temp dirs.
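A startup-ordering sketch based on the two cleanup methods above; error handling is illustrative.

// startupCleanupSketch runs both cleanup passes before the first cycle.
// Passing 0 uses the documented 10-minute orphan default.
func startupCleanupSketch(m *compaction.Manager, logger zerolog.Logger) {
	if err := m.CleanupOrphanedTempDirs(); err != nil {
		logger.Warn().Err(err).Msg("orphaned temp dir cleanup failed")
	}
	if err := m.CleanupOrphanedCompletionManifests(0); err != nil {
		logger.Warn().Err(err).Msg("orphaned completion manifest cleanup failed")
	}
}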

func (*Manager) CompactPartition

func (m *Manager) CompactPartition(ctx context.Context, candidate Candidate) error

CompactPartition compacts a single partition using subprocess isolation. Running compaction in a subprocess ensures that DuckDB's jemalloc memory is fully released when the subprocess exits, preventing memory retention.

func (*Manager) FindCandidates

func (m *Manager) FindCandidates(ctx context.Context) ([]Candidate, error)

FindCandidates finds partitions that are candidates for compaction across all databases

func (*Manager) GetCurrentCycleID

func (m *Manager) GetCurrentCycleID() int64

GetCurrentCycleID returns the current or most recent cycle ID

func (*Manager) GetSortKeys

func (m *Manager) GetSortKeys(measurement string) []string

GetSortKeys returns sort keys for a measurement. Checks measurement-specific config first, then falls back to default.

func (*Manager) IsCycleRunning

func (m *Manager) IsCycleRunning() bool

IsCycleRunning returns true if a compaction cycle is currently in progress

func (*Manager) RunCompactionCycle

func (m *Manager) RunCompactionCycle(ctx context.Context) (int64, error)

RunCompactionCycle runs one compaction cycle for all enabled tiers. Returns the cycle ID and an error if the cycle couldn't be started. Returns ErrCycleAlreadyRunning if a cycle is already in progress.

func (*Manager) RunCompactionCycleForDatabase

func (m *Manager) RunCompactionCycleForDatabase(ctx context.Context, database string, tierNames []string) (int64, error)

RunCompactionCycleForDatabase runs a compaction cycle for a single database. tierNames must be non-empty - specify which tiers to run explicitly.

func (*Manager) RunCompactionCycleForTiers

func (m *Manager) RunCompactionCycleForTiers(ctx context.Context, tierNames []string) (int64, error)

RunCompactionCycleForTiers runs a complete compaction cycle for specific tiers across all databases. tierNames must be non-empty - specify which tiers to run explicitly.

func (*Manager) SetOnCompactionComplete

func (m *Manager) SetOnCompactionComplete(fn func())

SetOnCompactionComplete sets the callback invoked after each successful compaction job. This is used to invalidate DuckDB and query caches in the parent process after the compaction subprocess deletes old parquet files.
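A registration sketch; queryCache and duckdbCatalog are hypothetical parent-process components standing in for whatever caches a deployment keeps.

// wireCacheInvalidationSketch invalidates parent-process caches after each
// successful compaction job so queries stop referencing parquet files the
// subprocess just deleted.
func wireCacheInvalidationSketch(mgr *compaction.Manager, queryCache interface{ Invalidate() }, duckdbCatalog interface{ RefreshFiles() }) {
	mgr.SetOnCompactionComplete(func() {
		queryCache.Invalidate()
		duckdbCatalog.RefreshFiles()
	})
}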

func (*Manager) Stats

func (m *Manager) Stats() map[string]interface{}

Stats returns compaction statistics

type ManagerConfig

type ManagerConfig struct {
	StorageBackend  storage.Backend
	LockManager     *LockManager
	MinAgeHours     int
	MinFiles        int
	TargetSizeMB    int
	MaxConcurrent   int
	TempDirectory   string              // Temp directory for compaction files
	MemoryLimit     string              // DuckDB memory limit for subprocess (e.g., "8GB")
	CompletionDir   string              // Phase 4: local-disk completion-manifest dir (empty = OSS mode)
	SortKeysConfig  map[string][]string // Per-measurement sort keys from ingest config
	DefaultSortKeys []string            // Default sort keys from ingest config
	Tiers           []Tier
	Logger          zerolog.Logger
}

ManagerConfig holds configuration for creating a compaction manager

type Manifest

type Manifest struct {
	// Output file information
	OutputPath string `json:"output_path"` // Full storage path of compacted file
	OutputSize int64  `json:"output_size"` // Expected size of output file (for validation)

	// Input files that were compacted
	InputFiles []string `json:"input_files"`

	// Metadata
	Database      string         `json:"database"`
	Measurement   string         `json:"measurement"`
	PartitionPath string         `json:"partition_path"`
	Tier          string         `json:"tier"`
	Status        ManifestStatus `json:"status"`
	CreatedAt     time.Time      `json:"created_at"`
	JobID         string         `json:"job_id"`
}

Manifest tracks the state of a compaction operation for crash recovery. If a pod crashes after uploading the compacted file but before deleting source files, the manifest allows recovery to complete the deletion.

type ManifestBridge

type ManifestBridge interface {
	RegisterCompactedFile(ctx context.Context, file CompactedFile) error
	DeleteCompactedSource(ctx context.Context, path, reason string) error
	// BatchFileOps groups all register and delete operations for one manifest
	// into a single Raft log entry. The watcher always calls this in applyOne
	// instead of the individual methods — O(1) Raft applies per manifest
	// regardless of file count.
	BatchFileOps(ctx context.Context, registers []CompactedFile, deletes []DeleteSourceOp) error
}

ManifestBridge is the narrow interface the compaction package imports from the cluster package. It abstracts "append to the cluster manifest" without pulling in the Raft types or the coordinator struct directly.

All methods MUST return ErrNotLeader (via %w wrapping) when the local node is not the Raft leader; the watcher relies on errors.Is to keep the manifest on disk for retry. Any other error is treated as a hard failure: the manifest is kept on disk for inspection and the failure is logged at Error level.

Context is accepted so the watcher can bound each apply attempt by its poll interval; implementations typically pass it through to the Raft Apply call's timeout budget.
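A sketch of the wrapping contract an implementation must honor; raftBridge and its fields are illustrative names, since the real bridge lives in the cluster package and implements all three methods.

// raftBridge is a hypothetical ManifestBridge implementation sketch.
type raftBridge struct {
	isLeader      func() bool
	applyRegister func(context.Context, compaction.CompactedFile) error
}

func (b *raftBridge) RegisterCompactedFile(ctx context.Context, f compaction.CompactedFile) error {
	if !b.isLeader() {
		// Wrapped with %w so the watcher's errors.Is(err, ErrNotLeader) succeeds.
		return fmt.Errorf("register %s: %w", f.Path, compaction.ErrNotLeader)
	}
	return b.applyRegister(ctx, f) // ctx bounds the Raft Apply timeout budget
}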

type ManifestManager

type ManifestManager struct {
	// contains filtered or unexported fields
}

ManifestManager handles reading, writing, and recovering from compaction manifests

func NewManifestManager

func NewManifestManager(backend storage.Backend, logger zerolog.Logger) *ManifestManager

NewManifestManager creates a new manifest manager

func (*ManifestManager) DeleteManifest

func (m *ManifestManager) DeleteManifest(ctx context.Context, manifestPath string) error

DeleteManifest removes a manifest from storage

func (*ManifestManager) GenerateManifestPath

func (m *ManifestManager) GenerateManifestPath(tier, database, partitionPath, jobID string) string

GenerateManifestPath generates a unique manifest path for a compaction job

func (*ManifestManager) GetFilesInManifests

func (m *ManifestManager) GetFilesInManifests(ctx context.Context) (map[string]struct{}, error)

GetFilesInManifests returns a set of all input files currently tracked by manifests. This is used to exclude files from compaction candidate scans.

func (*ManifestManager) IsFileInManifest

func (m *ManifestManager) IsFileInManifest(ctx context.Context, filePath string) (bool, error)

IsFileInManifest checks if a file is tracked by any manifest

func (*ManifestManager) ListManifests

func (m *ManifestManager) ListManifests(ctx context.Context) ([]string, error)

ListManifests lists all manifest files in storage

func (*ManifestManager) ReadManifest

func (m *ManifestManager) ReadManifest(ctx context.Context, manifestPath string) (*Manifest, error)

ReadManifest reads a manifest from storage

func (*ManifestManager) RecoverOrphanedManifests

func (m *ManifestManager) RecoverOrphanedManifests(ctx context.Context) (int, error)

RecoverOrphanedManifests finds and processes orphaned manifests from interrupted compactions. Returns the number of manifests recovered and any error encountered.

func (*ManifestManager) WriteManifest

func (m *ManifestManager) WriteManifest(ctx context.Context, manifest *Manifest) (string, error)

WriteManifest writes a manifest to storage

type ManifestStatus

type ManifestStatus string

ManifestStatus represents the state of a compaction manifest

const (
	// ManifestStatusPending indicates the compaction is in progress
	ManifestStatusPending ManifestStatus = "pending"
)

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler schedules compaction jobs using cron-style schedules

func NewScheduler

func NewScheduler(cfg *SchedulerConfig) (*Scheduler, error)

NewScheduler creates a new compaction scheduler

func (*Scheduler) IsEnabled

func (s *Scheduler) IsEnabled() bool

IsEnabled returns whether the scheduler is enabled

func (*Scheduler) IsRunning

func (s *Scheduler) IsRunning() bool

IsRunning returns whether the scheduler is running

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start starts the compaction scheduler

func (*Scheduler) Status

func (s *Scheduler) Status() map[string]interface{}

Status returns scheduler status

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop stops the compaction scheduler

func (*Scheduler) TriggerNow

func (s *Scheduler) TriggerNow(ctx context.Context) (int64, error)

TriggerNow triggers compaction immediately (manual trigger). It returns the cycle ID and any error that occurred.
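An API-handler sketch mapping the documented sentinel errors to responses; the status codes are illustrative, and surfacing ErrCycleAlreadyRunning here assumes TriggerNow forwards it from the underlying cycle.

// triggerHandlerSketch is a hypothetical HTTP handler around TriggerNow.
func triggerHandlerSketch(s *compaction.Scheduler) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		cycleID, err := s.TriggerNow(r.Context())
		switch {
		case errors.Is(err, compaction.ErrCompactionRoleGated):
			http.Error(w, "this node is not the compactor", http.StatusConflict)
		case errors.Is(err, compaction.ErrCycleAlreadyRunning):
			http.Error(w, "compaction cycle already running", http.StatusConflict)
		case err != nil:
			http.Error(w, err.Error(), http.StatusInternalServerError)
		default:
			fmt.Fprintf(w, `{"cycle_id": %d}`, cycleID)
		}
	}
}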

type SchedulerConfig

type SchedulerConfig struct {
	Manager     *Manager
	Schedule    string      // Cron schedule string (e.g., "5 * * * *" for every hour at :05)
	TierNames   []string    // Specific tiers to process (must be non-empty and enabled)
	Enabled     bool        // Enable automatic scheduling
	ClusterGate ClusterGate // Optional Phase 4 role check; nil means "no gate, allow"
	Logger      zerolog.Logger
}

SchedulerConfig holds configuration for creating a compaction scheduler

type SubprocessJobConfig

type SubprocessJobConfig struct {
	Database      string   `json:"database"`
	Measurement   string   `json:"measurement"`
	PartitionPath string   `json:"partition_path"`
	Files         []string `json:"files"`
	Tier          string   `json:"tier"`
	TargetSizeMB  int      `json:"target_size_mb"`
	TempDirectory string   `json:"temp_directory"`
	SortKeys      []string `json:"sort_keys"`    // Sort keys for ORDER BY in compaction
	MemoryLimit   string   `json:"memory_limit"` // DuckDB memory limit (e.g., "8GB")

	// Storage configuration
	StorageType   string `json:"storage_type"`   // "local" or "s3"
	StorageConfig string `json:"storage_config"` // JSON-encoded storage-specific config

	// Phase 4: cluster-mode completion-manifest handoff. When CompletionDir
	// is non-empty, the job writes a local-disk CompletionManifest that the
	// parent-side CompletionWatcher picks up and applies to the Raft manifest.
	// PartitionTime is the tier-scanner's authoritative timestamp for the
	// partition — bridged into raft.FileEntry.PartitionTime so peers know
	// which hour/day this compacted file belongs to.
	// JobID is generated by the parent (not NewJob) so both sides agree on
	// the completion-manifest filename. Empty values disable the feature.
	JobID         string    `json:"job_id,omitempty"`
	CompletionDir string    `json:"completion_dir,omitempty"`
	PartitionTime time.Time `json:"partition_time,omitempty"`
}

SubprocessJobConfig is the serializable configuration passed to the subprocess. It contains all information needed to run a compaction job in isolation.

type SubprocessJobResult

type SubprocessJobResult struct {
	Success        bool   `json:"success"`
	Error          string `json:"error,omitempty"`
	FilesCompacted int    `json:"files_compacted"`
	BytesBefore    int64  `json:"bytes_before"`
	BytesAfter     int64  `json:"bytes_after"`
	OutputFile     string `json:"output_file,omitempty"`
}

SubprocessJobResult is returned by the subprocess via stdout as JSON. It contains the outcome of the compaction job.

func RunJobInSubprocess

func RunJobInSubprocess(ctx context.Context, config *SubprocessJobConfig, logger zerolog.Logger, extraEnv ...string) (*SubprocessJobResult, error)

RunJobInSubprocess executes compaction in a subprocess for memory isolation. The subprocess runs the same binary with the "compact" subcommand. When the subprocess exits, all DuckDB memory (including jemalloc arenas) is released. extraEnv can be used to pass additional environment variables (e.g., credentials).
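An invocation sketch; every field value below is illustrative, and the credential environment variables are an example of what extraEnv might carry.

// runBatchSketch runs one batch in an isolated subprocess and checks the result.
func runBatchSketch(ctx context.Context, files []string, storageJSON string, logger zerolog.Logger) error {
	cfg := &compaction.SubprocessJobConfig{
		Database:      "mydb",
		Measurement:   "cpu",
		PartitionPath: "mydb/cpu/2026/04/11/14",
		Files:         files,
		Tier:          "hourly",
		TargetSizeMB:  512,
		TempDirectory: "./data/compaction",
		MemoryLimit:   "8GB",
		StorageType:   "s3",
		StorageConfig: storageJSON,
	}
	res, err := compaction.RunJobInSubprocess(ctx, cfg, logger,
		"AWS_ACCESS_KEY_ID=...", "AWS_SECRET_ACCESS_KEY=...")
	if err != nil {
		return err
	}
	if !res.Success {
		return fmt.Errorf("compaction subprocess failed: %s", res.Error)
	}
	return nil
}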

func RunSubprocessJob

func RunSubprocessJob(config *SubprocessJobConfig) (*SubprocessJobResult, error)

RunSubprocessJob is called from the subprocess to execute compaction. It creates a new DuckDB connection, runs the job, and returns the result. When this function returns and the subprocess exits, all DuckDB memory is released.

Memory profiling: Set ARC_COMPACTION_PROFILE=1 to enable heap profiling. Profiles are written to /tmp/arc_compaction_heap_before.pprof and /tmp/arc_compaction_heap_after.pprof

type Tier

type Tier interface {
	// GetTierName returns the human-readable tier name (e.g., 'daily', 'weekly', 'monthly')
	GetTierName() string

	// GetPartitionLevel returns the partition level for this tier (e.g., 'day', 'week', 'month')
	GetPartitionLevel() string

	// FindCandidates finds partitions that are candidates for compaction at this tier level
	FindCandidates(ctx context.Context, database, measurement string) ([]Candidate, error)

	// ShouldCompact determines if a partition should be compacted based on tier-specific criteria
	ShouldCompact(files []string, partitionTime time.Time) bool

	// IsCompactedFile checks if a file is already a compacted file from this tier
	IsCompactedFile(filename string) bool

	// IsEnabled returns whether this tier is enabled
	IsEnabled() bool

	// GetStats returns tier statistics
	GetStats() map[string]interface{}
}

Tier defines the interface for compaction tiers (hourly, daily, weekly, monthly)
