Documentation ¶
Index ¶
- Constants
- Variables
- func ClassifySubprocessError(err error, stderr string) (recoverable bool, reason string)
- type BaseTier
- type BaseTierConfig
- type Candidate
- type ClusterGate
- type CompactedFile
- type CompactedOutput
- type CompletionManifest
- type CompletionState
- type CompletionWatcher
- type CompletionWatcherConfig
- type DailyTier
- func (t *DailyTier) FindCandidates(ctx context.Context, database, measurement string) ([]Candidate, error)
- func (t *DailyTier) GetPartitionLevel() string
- func (t *DailyTier) GetStats() map[string]interface{}
- func (t *DailyTier) GetTierName() string
- func (t *DailyTier) IsCompactedFile(filename string) bool
- func (t *DailyTier) ShouldCompact(files []string, partitionTime time.Time) bool
- type DailyTierConfig
- type DeleteSourceOp
- type HourlyTier
- func (t *HourlyTier) FindCandidates(ctx context.Context, database, measurement string) ([]Candidate, error)
- func (t *HourlyTier) GetPartitionLevel() string
- func (t *HourlyTier) GetStats() map[string]interface{}
- func (t *HourlyTier) GetTierName() string
- func (t *HourlyTier) IsCompactedFile(filename string) bool
- func (t *HourlyTier) ShouldCompact(files []string, partitionTime time.Time) bool
- type HourlyTierConfig
- type Job
- type JobConfig
- type JobStatus
- type LockManager
- type Manager
- func (m *Manager) CleanupOrphanedCompletionManifests(orphanTimeout time.Duration) error
- func (m *Manager) CleanupOrphanedTempDirs() error
- func (m *Manager) CompactPartition(ctx context.Context, candidate Candidate) error
- func (m *Manager) FindCandidates(ctx context.Context) ([]Candidate, error)
- func (m *Manager) GetCurrentCycleID() int64
- func (m *Manager) GetSortKeys(measurement string) []string
- func (m *Manager) IsCycleRunning() bool
- func (m *Manager) RunCompactionCycle(ctx context.Context) (int64, error)
- func (m *Manager) RunCompactionCycleForDatabase(ctx context.Context, database string, tierNames []string) (int64, error)
- func (m *Manager) RunCompactionCycleForTiers(ctx context.Context, tierNames []string) (int64, error)
- func (m *Manager) SetOnCompactionComplete(fn func())
- func (m *Manager) Stats() map[string]interface{}
- type ManagerConfig
- type Manifest
- type ManifestBridge
- type ManifestManager
- func (m *ManifestManager) DeleteManifest(ctx context.Context, manifestPath string) error
- func (m *ManifestManager) GenerateManifestPath(tier, database, partitionPath, jobID string) string
- func (m *ManifestManager) GetFilesInManifests(ctx context.Context) (map[string]struct{}, error)
- func (m *ManifestManager) IsFileInManifest(ctx context.Context, filePath string) (bool, error)
- func (m *ManifestManager) ListManifests(ctx context.Context) ([]string, error)
- func (m *ManifestManager) ReadManifest(ctx context.Context, manifestPath string) (*Manifest, error)
- func (m *ManifestManager) RecoverOrphanedManifests(ctx context.Context) (int, error)
- func (m *ManifestManager) WriteManifest(ctx context.Context, manifest *Manifest) (string, error)
- type ManifestStatus
- type Scheduler
- type SchedulerConfig
- type SubprocessJobConfig
- type SubprocessJobResult
- type Tier
Constants ¶
const ManifestBasePath = "_compaction_state"
ManifestBasePath is the base directory for storing compaction manifests
const ManifestMaxAge = 7 * 24 * time.Hour // 7 days
ManifestMaxAge is the maximum age for manifests before they're considered stale. Manifests older than this are deleted during recovery - they likely indicate a deeper problem that requires investigation.
const MaxFilesPerBatch = 30
MaxFilesPerBatch is the maximum number of files to process in a single compaction job. DuckDB can segfault/abort when processing too many files in a single read_parquet() call. This limit prevents OOM and crashes on partitions with many large files. With 1M buffer size, files are ~10-14MB each, so 30 files ≈ 300-420MB per batch.
Variables ¶
var ErrCompactionRoleGated = fmt.Errorf("compaction: node role is not compactor")
ErrCompactionRoleGated is returned by TriggerNow on a node whose role does not have CanCompact (Phase 4). It lets API handlers surface a clear "this node is not the compactor" message instead of a generic 500 or a silent no-op.
var ErrCycleAlreadyRunning = errors.New("compaction cycle already running")
ErrCycleAlreadyRunning is returned when attempting to start a compaction cycle while one is already in progress
var ErrNotLeader = errors.New("compaction bridge: not the Raft leader")
ErrNotLeader is returned by a ManifestBridge when the local node is not the Raft leader and therefore cannot append to the cluster manifest. The watcher treats this as a transient retry condition: the completion manifest is kept on disk, no counters are incremented, and the next poll cycle tries again. By the time Raft leadership stabilizes (sub-second in practice) the retry succeeds.
Why a sentinel instead of a generic retryable error: we want the watcher to distinguish leader flap (expected, noisy, don't log at Error) from a real bridge failure (unexpected, should surface loudly). errors.Is against ErrNotLeader is the cheapest way to make that distinction explicit.
Functions ¶
func ClassifySubprocessError ¶
ClassifySubprocessError determines if a subprocess error is recoverable via retry. Returns (recoverable, reason) where reason describes the error type.
Recoverable errors (should retry with smaller batch):
- Segmentation faults (memory corruption, often from memory pressure)
- SIGKILL (exit code 137, usually OOM killer)
- Explicit memory errors in stderr
Non-recoverable errors (should not retry):
- Permission denied
- File not found
- Access denied
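As a hedged illustration of acting on that classification, the sketch below retries once with a smaller batch on a recoverable failure. The runWithRetry wrapper and its run callback are assumptions for illustration, not part of this package.

import (
	"context"
	"fmt"
)

// runWithRetry is a hypothetical wrapper: it runs one batch via the supplied
// callback and, when the failure classifies as recoverable (segfault,
// SIGKILL/OOM, memory error), retries once with half the files.
func runWithRetry(ctx context.Context, files []string,
	run func(context.Context, []string) (string, error)) error {
	stderr, err := run(ctx, files)
	if err == nil {
		return nil
	}
	recoverable, reason := ClassifySubprocessError(err, stderr)
	if !recoverable {
		return fmt.Errorf("compaction failed (%s): %w", reason, err)
	}
	_, err = run(ctx, files[:len(files)/2]) // retry with a smaller batch
	return err
}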
Types ¶
type BaseTier ¶
type BaseTier struct {
StorageBackend storage.Backend
MinAgeHours int
MinFiles int
TargetSizeMB int
Enabled bool
// Metrics
TotalCompactions int
TotalFilesCompacted int
TotalBytesSaved int64
Logger zerolog.Logger
// contains filtered or unexported fields
}
BaseTier provides common functionality for all compaction tiers
func NewBaseTier ¶
func NewBaseTier(cfg *BaseTierConfig) *BaseTier
NewBaseTier creates a new base tier with common functionality
func (*BaseTier) GetBaseStats ¶
GetBaseStats returns base statistics for a tier
func (*BaseTier) ShouldCompactByFileSuffix ¶
func (t *BaseTier) ShouldCompactByFileSuffix(
	files []string,
	compactedSuffix string,
	isUncompactedInput func(string) bool,
) bool
ShouldCompactByFileSuffix determines if compaction is needed based on file classification. This is a shared helper that implements the common compaction decision logic:
- compactedSuffix: suffix for files already compacted at this tier (e.g., "_compacted.parquet")
- isUncompactedInput: function to determine if a file is valid uncompacted input for this tier
Returns true if:
- No compacted files exist AND enough uncompacted input files are present
- Compacted files exist AND enough new uncompacted input files have accumulated
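For illustration only, a tier built on BaseTier might delegate its decision like this. The "_compacted.parquet" suffix and the input classifier are assumptions modeled on the tier descriptions elsewhere in this page, not the package's actual values.

import "strings"

// shouldCompactHourlyStyle is an illustrative wrapper: it treats
// "_compacted.parquet" files as this tier's outputs and any other parquet
// file as valid uncompacted input.
func shouldCompactHourlyStyle(t *BaseTier, files []string) bool {
	return t.ShouldCompactByFileSuffix(
		files,
		"_compacted.parquet",
		func(f string) bool {
			return strings.HasSuffix(f, ".parquet") &&
				!strings.HasSuffix(f, "_compacted.parquet")
		},
	)
}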
type BaseTierConfig ¶
type BaseTierConfig struct {
StorageBackend storage.Backend
MinAgeHours int
MinFiles int
TargetSizeMB int
Enabled bool
Logger zerolog.Logger
}
BaseTierConfig holds configuration for creating a base tier
type Candidate ¶
type Candidate struct {
Database string
Measurement string
PartitionPath string
Files []string
FileCount int
Tier string
PartitionTime time.Time
BatchNumber int // Batch number when candidate is split (0 = not batched or first batch)
TotalBatches int // Total number of batches for this partition (0 = not batched)
}
Candidate represents a partition candidate for compaction
func SplitCandidateIntoBatches ¶
SplitCandidateIntoBatches splits a candidate with many files into multiple candidates, each with at most MaxFilesPerBatch files. This prevents DuckDB segfaults when processing thousands of files in a single read_parquet() call.
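The batching arithmetic is simple: with MaxFilesPerBatch = 30, a 75-file partition yields batches of 30, 30, and 15. A standalone sketch (splitBatches is illustrative, not the package's implementation):

// splitBatches splits files into chunks of at most max entries each.
func splitBatches(files []string, max int) [][]string {
	var batches [][]string
	for len(files) > max {
		batches = append(batches, files[:max])
		files = files[max:]
	}
	return append(batches, files)
}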
type ClusterGate ¶
type ClusterGate interface {
// CanCompact reports whether the local node may run compaction. When
// the scheduler sees false, it logs a clear "gated" message and stays
// idle (NOT running, NOT an error).
CanCompact() bool
// Role returns the current node's role string for log messages. The
// scheduler only uses this for human-readable output, never for
// decision-making — CanCompact is authoritative.
Role() string
}
ClusterGate is the minimal interface the compaction scheduler needs to decide whether the local node may run compaction. Phase 4 wires this to a closure that checks RoleCompactor against cfg.Cluster.Role; OSS and standalone nodes pass a nil gate (or construct without one) and are allowed unconditionally.
The interface lives here rather than in the cluster package so the scheduler has no compile-time dependency on the cluster package — the gate is a function pointer plus a tiny interface, not an import.
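A gate implementation can be as small as a struct holding the configured role. A sketch, in which the "compactor" literal stands in for the cluster package's RoleCompactor and is an assumption:

// roleGate is an illustrative ClusterGate backed by a static role string.
type roleGate struct{ role string }

func (g roleGate) CanCompact() bool { return g.role == "compactor" }
func (g roleGate) Role() string     { return g.role }

OSS and standalone wiring would simply leave SchedulerConfig.ClusterGate nil instead of constructing a gate.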
type CompactedFile ¶
type CompactedFile struct {
Path string // storage-relative path
SHA256 string // hex-encoded 64 chars
SizeBytes int64
Database string
Measurement string
PartitionTime time.Time
Tier string
CreatedAt time.Time
}
CompactedFile is the cluster-agnostic shape the bridge uses to register a compacted output in the Raft manifest. It intentionally excludes raft.FileEntry.LSN (the FSM stamps it at apply time) and raft.FileEntry.OriginNodeID (the bridge sets it to the local node ID because the compactor IS the origin from Phase 2/3's perspective).
The bridge converts CompactedFile to raft.FileEntry inside the cluster package, keeping the compaction package free of any raft.* imports.
type CompactedOutput ¶
type CompactedOutput struct {
Path string `json:"path"` // storage-relative path (e.g. "mydb/cpu/2026/04/11/14/compacted_....parquet")
SHA256 string `json:"sha256"` // hex-encoded 64 chars; bridged into raft.FileEntry.SHA256
SizeBytes int64 `json:"size_bytes"` // authoritative size after upload
Database string `json:"database"` // bridged into raft.FileEntry.Database
Measurement string `json:"measurement"` // bridged into raft.FileEntry.Measurement
PartitionTime time.Time `json:"partition_time"` // bridged into raft.FileEntry.PartitionTime
Tier string `json:"tier"` // "hot", "cold", etc.
CreatedAt time.Time `json:"created_at"` // bridged into raft.FileEntry.CreatedAt
}
CompactedOutput describes a single compacted file produced by a job. All fields are required when the enclosing manifest is in state output_written or sources_deleted — the watcher converts this struct to a raft.FileEntry via the cluster-side bridge.
Outputs is a slice on the manifest (not a single value) because a future compaction strategy (Phase 5+) may split a partition into multiple outputs. Phase 4 always populates exactly one entry per manifest.
type CompletionManifest ¶
type CompletionManifest struct {
JobID string `json:"job_id"` // matches Job.JobID; unique per attempt
Database string `json:"database"` // for operator debugging; Outputs[].Database is authoritative
Measurement string `json:"measurement"` // for operator debugging
PartitionPath string `json:"partition_path"` // for operator debugging
Tier string `json:"tier"` // for operator debugging
State CompletionState `json:"state"` // lifecycle phase; drives watcher behavior
Outputs []CompactedOutput `json:"outputs"` // set when State >= output_written
DeletedSources []string `json:"deleted_sources"` // set when State == sources_deleted; storage-relative paths
CreatedAt time.Time `json:"created_at"` // when the manifest was first written
UpdatedAt time.Time `json:"updated_at"` // bumped on every state transition
}
CompletionManifest is the durable handoff from the compaction subprocess to the parent-side watcher. One file per job. Written atomically via tmp-file-plus-rename; never appended to.
By convention the subprocess rewrites the manifest only at state transitions, and the watcher only reads it. If the watcher polls mid-transition (possible if the subprocess is slow between json.Marshal and os.Rename), there is no torn read: the final path does not exist until the first rename completes, and every later rename replaces the file atomically — readers see either the pre-state or the post-state, never a partial value.
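The tmp-file-plus-rename pattern that guarantee relies on looks roughly like this (a generic sketch; writeManifestAtomic is a hypothetical helper, not this package's code):

import (
	"encoding/json"
	"os"
)

// writeManifestAtomic publishes m at path in one atomic step, so readers
// never observe a partially written manifest.
func writeManifestAtomic(path string, m *CompletionManifest) error {
	data, err := json.Marshal(m)
	if err != nil {
		return err
	}
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	return os.Rename(tmp, path) // atomic on POSIX filesystems
}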
type CompletionState ¶
type CompletionState string
CompletionState is the lifecycle phase of a CompletionManifest. The subprocess advances the state in place as it progresses through the job; the watcher applies manifests whose state is output_written or later.
const (
	// CompletionStateWritingOutput means the subprocess has started the job
	// but has not yet successfully written the compacted file to storage.
	// Manifests in this state are NOT picked up by the watcher — they may
	// be stale (subprocess crashed) and will be cleaned up separately.
	CompletionStateWritingOutput CompletionState = "writing_output"

	// CompletionStateOutputWritten means the compacted file has been
	// durably uploaded to storage and its SHA-256 is known. Source files
	// have NOT yet been deleted. The watcher applies RegisterFile in this
	// state, but does NOT yet apply DeleteFile for sources because they
	// may still be present in storage — deleting them from the manifest
	// while they still exist on disk would leave orphans. The watcher
	// waits for the sources_deleted state before issuing DeleteFile.
	CompletionStateOutputWritten CompletionState = "output_written"

	// CompletionStateSourcesDeleted means source files have been deleted
	// from storage. The watcher applies both RegisterFile and DeleteFile
	// in this state and then removes the manifest file.
	CompletionStateSourcesDeleted CompletionState = "sources_deleted"
)
type CompletionWatcher ¶
type CompletionWatcher struct {
// contains filtered or unexported fields
}
CompletionWatcher polls the completion-manifest directory and applies pending manifests via the ManifestBridge. Run one per compactor node.
Lifecycle:
w, err := NewCompletionWatcher(cfg)
if err != nil {
	// handle missing dependency
}
w.Start(ctx) // kicks off the poll loop in a background goroutine
...
w.Stop() // signals stop, waits for the loop to drain one final poll
The watcher is safe to Start/Stop multiple times. It is NOT safe for concurrent Start calls from different goroutines.
func NewCompletionWatcher ¶
func NewCompletionWatcher(cfg CompletionWatcherConfig) (*CompletionWatcher, error)
NewCompletionWatcher constructs a watcher. Returns an error if any required dependency is missing.
func (*CompletionWatcher) Start ¶
func (w *CompletionWatcher) Start(parentCtx context.Context)
Start launches the poll loop in a background goroutine. Safe to call multiple times — second and subsequent calls are no-ops.
func (*CompletionWatcher) Stats ¶
func (w *CompletionWatcher) Stats() map[string]int64
Stats returns a point-in-time snapshot of the watcher's metrics, suitable for /api/v1/cluster/status. The returned map is safe to marshal as JSON directly — all values are int64.
func (*CompletionWatcher) Stop ¶
func (w *CompletionWatcher) Stop()
Stop signals the poll loop to exit and waits for it to drain. Safe to call multiple times; second and subsequent calls are no-ops. After Stop, the watcher can be Started again with a fresh context if desired.
type CompletionWatcherConfig ¶
type CompletionWatcherConfig struct {
// Dir is the local-disk directory scanned for pending completion manifests.
// Matches the compaction Manager's CompletionDir field. Must not be empty.
Dir string
// Bridge is the cluster-side manifest appender. Required.
Bridge ManifestBridge
// PollInterval is the time between scan cycles. Default: 1s. Smaller
// values catch up faster but add load; larger values delay visibility
// of compacted files in the cluster manifest.
PollInterval time.Duration
// ApplyTimeout bounds a single bridge call. Default: 5s. The watcher
// derives a fresh context from its own context with this deadline for
// each Register/Delete call so a stuck bridge call doesn't pin the
// poll loop.
ApplyTimeout time.Duration
// Logger receives structured log output. Defaults to Nop if zero.
Logger zerolog.Logger
}
CompletionWatcherConfig bundles the watcher's dependencies and tunables. All fields are required unless explicitly marked optional.
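Putting the config and lifecycle together; the bridge, logger, tempDir, and ctx values are assumed to come from the caller's own wiring:

w, err := NewCompletionWatcher(CompletionWatcherConfig{
	Dir:          filepath.Join(tempDir, ".completion", "pending"),
	Bridge:       bridge, // any ManifestBridge implementation
	PollInterval: time.Second,
	ApplyTimeout: 5 * time.Second,
	Logger:       logger,
})
if err != nil {
	return err
}
w.Start(ctx)
defer w.Stop()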
type DailyTier ¶
type DailyTier struct {
*BaseTier
SkipFileAgeCheckDays int // Skip file creation time check for partitions older than this
}
DailyTier implements daily compaction (Tier 2). Compacts hourly-compacted files into daily files.
func NewDailyTier ¶
func NewDailyTier(cfg *DailyTierConfig) *DailyTier
NewDailyTier creates a new daily compaction tier
func (*DailyTier) FindCandidates ¶
func (t *DailyTier) FindCandidates(ctx context.Context, database, measurement string) ([]Candidate, error)
FindCandidates finds daily partitions that are candidates for compaction
func (*DailyTier) GetPartitionLevel ¶
func (t *DailyTier) GetPartitionLevel() string
GetPartitionLevel returns the partition level
func (*DailyTier) GetTierName ¶
func (t *DailyTier) GetTierName() string
GetTierName returns the tier name
func (*DailyTier) IsCompactedFile ¶
func (t *DailyTier) IsCompactedFile(filename string) bool
IsCompactedFile checks if a file is a compacted daily file
type DailyTierConfig ¶
type DailyTierConfig struct {
StorageBackend storage.Backend
MinAgeHours int // Don't compact days younger than this (default: 24)
MinFiles int // Only compact days with at least this many files (default: 12)
SkipFileAgeCheckDays int // Skip file creation time check for partitions older than this (default: 7)
TargetSizeMB int // Target size for compacted files (default: 2048)
Enabled bool // Enable daily compaction (default: true)
Logger zerolog.Logger
}
DailyTierConfig holds configuration for daily compaction tier
type DeleteSourceOp ¶
DeleteSourceOp carries the arguments for a DeleteCompactedSource batch operation. A struct rather than bare string fields keeps the batch call signature extensible without breaking callers.
type HourlyTier ¶
type HourlyTier struct {
*BaseTier
}
HourlyTier implements hourly compaction (Tier 1). Compacts small files within hourly partitions.
func NewHourlyTier ¶
func NewHourlyTier(cfg *HourlyTierConfig) *HourlyTier
NewHourlyTier creates a new hourly compaction tier
func (*HourlyTier) FindCandidates ¶
func (t *HourlyTier) FindCandidates(ctx context.Context, database, measurement string) ([]Candidate, error)
FindCandidates finds hourly partitions that are candidates for compaction
func (*HourlyTier) GetPartitionLevel ¶
func (t *HourlyTier) GetPartitionLevel() string
GetPartitionLevel returns the partition level
func (*HourlyTier) GetStats ¶
func (t *HourlyTier) GetStats() map[string]interface{}
GetStats returns tier statistics
func (*HourlyTier) GetTierName ¶
func (t *HourlyTier) GetTierName() string
GetTierName returns the tier name
func (*HourlyTier) IsCompactedFile ¶
func (t *HourlyTier) IsCompactedFile(filename string) bool
IsCompactedFile checks if a file is a compacted hourly file
func (*HourlyTier) ShouldCompact ¶
func (t *HourlyTier) ShouldCompact(files []string, partitionTime time.Time) bool
ShouldCompact determines if an hourly partition should be compacted
type HourlyTierConfig ¶
type HourlyTierConfig struct {
StorageBackend storage.Backend
MinAgeHours int // Don't compact partitions younger than this (default: 1)
MinFiles int // Only compact partitions with at least this many files (default: 10)
TargetSizeMB int // Target size for compacted files (default: 512)
Enabled bool // Enable hourly compaction (default: true)
Logger zerolog.Logger
}
HourlyTierConfig holds configuration for hourly compaction tier
type Job ¶
type Job struct {
// Configuration
Measurement string
PartitionPath string
Files []string // Original files to be compacted
StorageBackend storage.Backend
Database string
TargetSizeMB int
Tier string
TempDirectory string // Base temp directory for compaction files
SortKeys []string // Sort keys for this measurement (for ORDER BY in compaction)
// Job metadata
JobID string
StartedAt *time.Time
CompletedAt *time.Time
Status JobStatus
Error error
// Metrics
FilesCompacted int
BytesBefore int64
BytesAfter int64
DurationSeconds float64
// Phase 4: cluster-mode completion manifest. When CompletionDir is
// non-empty, the job writes a local-disk CompletionManifest at each
// state transition so the parent-side CompletionWatcher can apply
// RegisterFile/DeleteFile commands to the Raft manifest. Empty means
// OSS / standalone mode — no manifest is written and behavior is
// byte-identical to pre-Phase-4.
//
// PartitionTime is the tier-scanner's authoritative timestamp for the
// partition (see Candidate.PartitionTime in tier.go). Surfaced in the
// completion manifest so the bridge can set raft.FileEntry.PartitionTime.
// Only meaningful when clusterMode() is true; zero value in OSS.
CompletionDir string
PartitionTime time.Time
// contains filtered or unexported fields
}
Job represents a single compaction job for a partition
type JobConfig ¶
type JobConfig struct {
Measurement string
PartitionPath string
Files []string
StorageBackend storage.Backend
Database string
TargetSizeMB int
Tier string
TempDirectory string // Base temp directory for compaction files (default: ./data/compaction)
SortKeys []string // Sort keys for this measurement (for ORDER BY in compaction)
Logger zerolog.Logger
DB *sql.DB // Shared DuckDB connection (avoids memory retention from temp connections)
ManifestManager *ManifestManager // Manifest manager for crash recovery (optional, recommended)
// CompletionDir is the Phase 4 cluster-mode completion-manifest directory.
// When non-empty, the job writes a local-disk CompletionManifest at each
// state transition. Empty means OSS / standalone — no completion manifest
// is written and the job is byte-compatible with pre-Phase-4 behavior.
CompletionDir string
// JobID, if set, overrides the auto-generated JobID. Phase 4 uses this
// so the parent (which spawns the subprocess) and the subprocess agree
// on the completion-manifest filename. Empty means auto-generate.
JobID string
// PartitionTime is the authoritative partition timestamp from the tier
// scanner. Threaded through so the completion manifest includes it for
// the Raft FileEntry. Zero value in OSS.
PartitionTime time.Time
}
JobConfig holds configuration for creating a compaction job
type LockManager ¶
type LockManager struct {
// contains filtered or unexported fields
}
LockManager manages locks for compaction partitions
func (*LockManager) AcquireLock ¶
func (l *LockManager) AcquireLock(key string) bool
AcquireLock attempts to acquire a lock for a partition
func (*LockManager) IsLocked ¶
func (l *LockManager) IsLocked(key string) bool
IsLocked checks if a partition is locked
func (*LockManager) ReleaseLock ¶
func (l *LockManager) ReleaseLock(key string)
ReleaseLock releases a lock for a partition
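Typical usage brackets one partition's job with an acquire/release pair; the key format shown here is an assumption for illustration:

key := candidate.Database + "/" + candidate.PartitionPath
if !lockMgr.AcquireLock(key) {
	return nil // another job holds this partition; skip it this cycle
}
defer lockMgr.ReleaseLock(key)
// ... compact the partition while the lock is held ...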
type Manager ¶
type Manager struct {
StorageBackend storage.Backend
LockManager *LockManager
ManifestManager *ManifestManager
// Configuration
MinAgeHours int
MinFiles int
TargetSizeMB int
MaxConcurrent int
TempDirectory string // Temp directory for compaction files
MemoryLimit string // DuckDB memory limit for subprocess (e.g., "8GB")
// Phase 4: local-disk directory where compaction subprocesses write
// completion manifests for the parent-side CompletionWatcher to pick
// up. Empty means "OSS mode, no completion-manifest handoff". Set by
// the main wiring to filepath.Join(TempDirectory, ".completion", "pending")
// when clustering + replication are enabled.
CompletionDir string
// Sort key configuration (from ingest config)
SortKeysConfig map[string][]string // measurement -> sort keys
DefaultSortKeys []string // default sort keys
// Tiers
Tiers []Tier
// Metrics
TotalJobsCompleted int
TotalJobsFailed int
TotalFilesCompacted int
TotalBytesSaved int64
TotalManifestsRecov int // Number of manifests recovered
// contains filtered or unexported fields
}
Manager orchestrates compaction jobs across all measurements
func NewManager ¶
func NewManager(cfg *ManagerConfig) *Manager
NewManager creates a new compaction manager
func (*Manager) CleanupOrphanedCompletionManifests ¶
func (m *Manager) CleanupOrphanedCompletionManifests(orphanTimeout time.Duration) error
CleanupOrphanedCompletionManifests sweeps the Phase 4 completion-manifest directory for entries in state "writing_output" that are older than orphanTimeout. These represent compaction subprocesses that started writing a manifest but crashed before advancing to output_written — the watcher will never pick them up (they're still in the initial state), and leaving them on disk would cause confusing "stuck in writing_output" log noise on every startup.
Manifests in state output_written or sources_deleted are left alone no matter how old they are — those are valid pending work that the watcher needs to process, and aging them out would silently drop Raft manifest updates the cluster needs.
A zero or negative orphanTimeout means "use 10 minutes" so tests and production agree on a safe default unless the operator explicitly tunes it.
Safe to call multiple times. Safe when CompletionDir is empty (no-op).
func (*Manager) CleanupOrphanedTempDirs ¶
func (m *Manager) CleanupOrphanedTempDirs() error
CleanupOrphanedTempDirs removes orphaned temp directories from previous runs. This handles cleanup after pod crashes where the defer cleanup didn't run. Call this on startup before running compaction cycles.
Phase 4: skips reserved subdirectories (see reservedTempSubdirs). Those are managed by their own cleanup paths (e.g. CleanupOrphanedCompletionManifests) and must not be treated as stale temp dirs.
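A plausible startup ordering based on these docs (a sketch; the error handling is illustrative):

// On startup, before the first compaction cycle:
if err := m.CleanupOrphanedTempDirs(); err != nil {
	logger.Warn().Err(err).Msg("temp-dir cleanup failed")
}
if err := m.CleanupOrphanedCompletionManifests(0); err != nil { // 0 = use the 10m default
	logger.Warn().Err(err).Msg("completion-manifest sweep failed")
}
cycleID, err := m.RunCompactionCycle(ctx)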
func (*Manager) CompactPartition ¶
func (m *Manager) CompactPartition(ctx context.Context, candidate Candidate) error
CompactPartition compacts a single partition using subprocess isolation. Running compaction in a subprocess ensures that DuckDB's jemalloc memory is fully released when the subprocess exits, preventing memory retention.
func (*Manager) FindCandidates ¶
func (m *Manager) FindCandidates(ctx context.Context) ([]Candidate, error)
FindCandidates finds partitions that are candidates for compaction across all databases
func (*Manager) GetCurrentCycleID ¶
func (m *Manager) GetCurrentCycleID() int64
GetCurrentCycleID returns the current or most recent cycle ID
func (*Manager) GetSortKeys ¶
func (m *Manager) GetSortKeys(measurement string) []string
GetSortKeys returns sort keys for a measurement. Checks measurement-specific config first, then falls back to default.
func (*Manager) IsCycleRunning ¶
func (m *Manager) IsCycleRunning() bool
IsCycleRunning returns true if a compaction cycle is currently in progress
func (*Manager) RunCompactionCycle ¶
func (m *Manager) RunCompactionCycle(ctx context.Context) (int64, error)
RunCompactionCycle runs one compaction cycle for all enabled tiers. Returns the cycle ID and an error if the cycle couldn't be started. Returns ErrCycleAlreadyRunning if a cycle is already in progress.
func (*Manager) RunCompactionCycleForDatabase ¶
func (m *Manager) RunCompactionCycleForDatabase(ctx context.Context, database string, tierNames []string) (int64, error)
RunCompactionCycleForDatabase runs a compaction cycle for a single database. tierNames must be non-empty; callers specify which tiers to run explicitly.
func (*Manager) RunCompactionCycleForTiers ¶
func (m *Manager) RunCompactionCycleForTiers(ctx context.Context, tierNames []string) (int64, error)
RunCompactionCycleForTiers runs a complete compaction cycle for specific tiers across all databases. tierNames must be non-empty; callers specify which tiers to run explicitly.
func (*Manager) SetOnCompactionComplete ¶
func (m *Manager) SetOnCompactionComplete(fn func())
SetOnCompactionComplete sets the callback invoked after each successful compaction job. This is used to invalidate DuckDB and query caches in the parent process after the compaction subprocess deletes old parquet files.
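For example, the parent process might invalidate its caches after each job; the cache handles here are hypothetical:

m.SetOnCompactionComplete(func() {
	duckdbCache.Invalidate() // hypothetical DuckDB file-handle cache
	queryCache.Clear()       // hypothetical query-result cache
})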
type ManagerConfig ¶
type ManagerConfig struct {
StorageBackend storage.Backend
LockManager *LockManager
MinAgeHours int
MinFiles int
TargetSizeMB int
MaxConcurrent int
TempDirectory string // Temp directory for compaction files
MemoryLimit string // DuckDB memory limit for subprocess (e.g., "8GB")
CompletionDir string // Phase 4: local-disk completion-manifest dir (empty = OSS mode)
SortKeysConfig map[string][]string // Per-measurement sort keys from ingest config
DefaultSortKeys []string // Default sort keys from ingest config
Tiers []Tier
Logger zerolog.Logger
}
ManagerConfig holds configuration for creating a compaction manager
type Manifest ¶
type Manifest struct {
// Output file information
OutputPath string `json:"output_path"` // Full storage path of compacted file
OutputSize int64 `json:"output_size"` // Expected size of output file (for validation)
// Input files that were compacted
InputFiles []string `json:"input_files"`
// Metadata
Database string `json:"database"`
Measurement string `json:"measurement"`
PartitionPath string `json:"partition_path"`
Tier string `json:"tier"`
Status ManifestStatus `json:"status"`
CreatedAt time.Time `json:"created_at"`
JobID string `json:"job_id"`
}
Manifest tracks the state of a compaction operation for crash recovery. If a pod crashes after uploading the compacted file but before deleting source files, the manifest allows recovery to complete the deletion.
type ManifestBridge ¶
type ManifestBridge interface {
RegisterCompactedFile(ctx context.Context, file CompactedFile) error
DeleteCompactedSource(ctx context.Context, path, reason string) error
// BatchFileOps groups all register and delete operations for one manifest
// into a single Raft log entry. The watcher always calls this in applyOne
// instead of the individual methods — O(1) Raft applies per manifest
// regardless of file count.
BatchFileOps(ctx context.Context, registers []CompactedFile, deletes []DeleteSourceOp) error
}
ManifestBridge is the narrow interface the compaction package imports from the cluster package. It abstracts "append to the cluster manifest" without pulling in the Raft types or the coordinator struct directly.
All methods MUST return ErrNotLeader (via %w wrapping) when the local node is not the Raft leader — the watcher relies on errors.Is to keep the manifest on disk for retry. Any other error is treated as a hard failure: the manifest is kept for inspection and the failure is logged at Error.
Context is accepted so the watcher can bound each apply attempt by its poll interval; implementations typically pass it through to the Raft Apply call's timeout budget.
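That contract implies watcher-side handling along these lines (a sketch of the errors.Is distinction, not the actual applyOne code):

err := bridge.BatchFileOps(ctx, registers, deletes)
switch {
case err == nil:
	// Applied in one Raft entry; safe to remove the completion manifest.
case errors.Is(err, ErrNotLeader):
	// Leader flap: keep the manifest on disk and retry on the next poll.
default:
	// Hard failure: keep the manifest for inspection and log loudly.
	logger.Error().Err(err).Msg("manifest bridge apply failed")
}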
type ManifestManager ¶
type ManifestManager struct {
// contains filtered or unexported fields
}
ManifestManager handles reading, writing, and recovering from compaction manifests
func NewManifestManager ¶
func NewManifestManager(backend storage.Backend, logger zerolog.Logger) *ManifestManager
NewManifestManager creates a new manifest manager
func (*ManifestManager) DeleteManifest ¶
func (m *ManifestManager) DeleteManifest(ctx context.Context, manifestPath string) error
DeleteManifest removes a manifest from storage
func (*ManifestManager) GenerateManifestPath ¶
func (m *ManifestManager) GenerateManifestPath(tier, database, partitionPath, jobID string) string
GenerateManifestPath generates a unique manifest path for a compaction job
func (*ManifestManager) GetFilesInManifests ¶
func (m *ManifestManager) GetFilesInManifests(ctx context.Context) (map[string]struct{}, error)
GetFilesInManifests returns a set of all input files currently tracked by manifests. This is used to exclude files from compaction candidate scans.
func (*ManifestManager) IsFileInManifest ¶
func (m *ManifestManager) IsFileInManifest(ctx context.Context, filePath string) (bool, error)
IsFileInManifest checks if a file is tracked by any manifest
func (*ManifestManager) ListManifests ¶
func (m *ManifestManager) ListManifests(ctx context.Context) ([]string, error)
ListManifests lists all manifest files in storage
func (*ManifestManager) ReadManifest ¶
func (m *ManifestManager) ReadManifest(ctx context.Context, manifestPath string) (*Manifest, error)
ReadManifest reads a manifest from storage
func (*ManifestManager) RecoverOrphanedManifests ¶
func (m *ManifestManager) RecoverOrphanedManifests(ctx context.Context) (int, error)
RecoverOrphanedManifests finds and processes orphaned manifests from interrupted compactions. Returns the number of manifests recovered and any error encountered.
func (*ManifestManager) WriteManifest ¶
func (m *ManifestManager) WriteManifest(ctx context.Context, manifest *Manifest) (string, error)
WriteManifest writes a manifest to storage
type ManifestStatus ¶
type ManifestStatus string
ManifestStatus represents the state of a compaction manifest
const (
	// ManifestStatusPending indicates the compaction is in progress
	ManifestStatusPending ManifestStatus = "pending"
)
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler schedules compaction jobs using cron-style schedules
func NewScheduler ¶
func NewScheduler(cfg *SchedulerConfig) (*Scheduler, error)
NewScheduler creates a new compaction scheduler
type SchedulerConfig ¶
type SchedulerConfig struct {
Manager *Manager
Schedule string // Cron schedule string (e.g., "5 * * * *" for every hour at :05)
TierNames []string // Specific tiers to process (must be non-empty and enabled)
Enabled bool // Enable automatic scheduling
ClusterGate ClusterGate // Optional Phase 4 role check; nil means "no gate, allow"
Logger zerolog.Logger
}
SchedulerConfig holds configuration for creating a compaction scheduler
type SubprocessJobConfig ¶
type SubprocessJobConfig struct {
Database string `json:"database"`
Measurement string `json:"measurement"`
PartitionPath string `json:"partition_path"`
Files []string `json:"files"`
Tier string `json:"tier"`
TargetSizeMB int `json:"target_size_mb"`
TempDirectory string `json:"temp_directory"`
SortKeys []string `json:"sort_keys"` // Sort keys for ORDER BY in compaction
MemoryLimit string `json:"memory_limit"` // DuckDB memory limit (e.g., "8GB")
// Storage configuration
StorageType string `json:"storage_type"` // "local" or "s3"
StorageConfig string `json:"storage_config"` // JSON-encoded storage-specific config
// Phase 4: cluster-mode completion-manifest handoff. When CompletionDir
// is non-empty, the job writes a local-disk CompletionManifest that the
// parent-side CompletionWatcher picks up and applies to the Raft manifest.
// PartitionTime is the tier-scanner's authoritative timestamp for the
// partition — bridged into raft.FileEntry.PartitionTime so peers know
// which hour/day this compacted file belongs to.
// JobID is generated by the parent (not NewJob) so both sides agree on
// the completion-manifest filename. Empty values disable the feature.
JobID string `json:"job_id,omitempty"`
CompletionDir string `json:"completion_dir,omitempty"`
PartitionTime time.Time `json:"partition_time,omitempty"`
}
SubprocessJobConfig is the serializable configuration passed to the subprocess. It contains all information needed to run a compaction job in isolation.
type SubprocessJobResult ¶
type SubprocessJobResult struct {
Success bool `json:"success"`
Error string `json:"error,omitempty"`
FilesCompacted int `json:"files_compacted"`
BytesBefore int64 `json:"bytes_before"`
BytesAfter int64 `json:"bytes_after"`
OutputFile string `json:"output_file,omitempty"`
}
SubprocessJobResult is returned by the subprocess via stdout as JSON. It contains the outcome of the compaction job.
func RunJobInSubprocess ¶
func RunJobInSubprocess(ctx context.Context, config *SubprocessJobConfig, logger zerolog.Logger, extraEnv ...string) (*SubprocessJobResult, error)
RunJobInSubprocess executes compaction in a subprocess for memory isolation. The subprocess runs the same binary with the "compact" subcommand. When the subprocess exits, all DuckDB memory (including jemalloc arenas) is released. extraEnv can be used to pass additional environment variables (e.g., credentials).
func RunSubprocessJob ¶
func RunSubprocessJob(config *SubprocessJobConfig) (*SubprocessJobResult, error)
RunSubprocessJob is called from the subprocess to execute compaction. It creates a new DuckDB connection, runs the job, and returns the result. When this function returns and the subprocess exits, all DuckDB memory is released.
Memory profiling: Set ARC_COMPACTION_PROFILE=1 to enable heap profiling. Profiles are written to /tmp/arc_compaction_heap_before.pprof and /tmp/arc_compaction_heap_after.pprof
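Invoking a job from the parent looks roughly like this; every field value below is a placeholder, not a recommended setting:

result, err := RunJobInSubprocess(ctx, &SubprocessJobConfig{
	Database:      "mydb",
	Measurement:   "cpu",
	PartitionPath: "mydb/cpu/2026/04/11/14",
	Files:         files,
	Tier:          "hourly",
	TargetSizeMB:  512,
	TempDirectory: "./data/compaction",
	SortKeys:      []string{"host", "time"},
	MemoryLimit:   "8GB",
	StorageType:   "local",
}, logger)
if err == nil && result.Success {
	logger.Info().Int("files", result.FilesCompacted).Msg("compaction complete")
}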
type Tier ¶
type Tier interface {
// GetTierName returns the human-readable tier name (e.g., 'daily', 'weekly', 'monthly')
GetTierName() string
// GetPartitionLevel returns the partition level for this tier (e.g., 'day', 'week', 'month')
GetPartitionLevel() string
// FindCandidates finds partitions that are candidates for compaction at this tier level
FindCandidates(ctx context.Context, database, measurement string) ([]Candidate, error)
// ShouldCompact determines if a partition should be compacted based on tier-specific criteria
ShouldCompact(files []string, partitionTime time.Time) bool
// IsCompactedFile checks if a file is already a compacted file from this tier
IsCompactedFile(filename string) bool
// IsEnabled returns whether this tier is enabled
IsEnabled() bool
// GetStats returns tier statistics
GetStats() map[string]interface{}
}
Tier defines the interface for compaction tiers (hourly, daily, weekly, monthly)