Documentation
¶
Index ¶
- Constants
- func BatchLoadExpectedChildrenByDSTIDs(database *db.DB, dstParentIDs []string, dstIDToPath map[string]string) (expectedFoldersMap map[string][]types.Folder, ...)
- func BatchLoadRetryDstCleanup(database *db.DB, srcFolderIDs []string) (map[string]*RetryDstCleanup, error)
- func BuildExpectedMapsFromDstWithChildren(dstBatch []db.FetchResult, childrenByDstID map[string][]*db.NodeState) (expectedFoldersMap map[string][]types.Folder, ...)
- func SeedRootTask(queueType string, rootFolder types.Folder, rootNodeID string, database *db.DB) error
- func SeedRootTaskWithSrcID(queueType string, rootFolder types.Folder, rootNodeID string, srcID string, ...) error
- func SeedRootTasks(srcRoot types.Folder, dstRoot types.Folder, database *db.DB) error
- type ChildResult
- type CompletionCheckOptions
- type CopyTask
- type CopyWorker
- type EngineCaches
- type ExternalQueueMetrics
- type InternalQueueMetrics
- type LevelCache
- func (lc *LevelCache) Clear()
- func (lc *LevelCache) Count() int
- func (lc *LevelCache) Get(id string) *db.NodeState
- func (lc *LevelCache) GetByPath(path string) *db.NodeState
- func (lc *LevelCache) GetRef(id string) *db.NodeState
- func (lc *LevelCache) ListChildrenByParentPath(parentPath string) []*db.NodeState
- func (lc *LevelCache) ListPending(afterID string, limit int) []*db.NodeState
- func (lc *LevelCache) ListPendingCopy(afterID string, limit int, nodeType string) []*db.NodeState
- func (lc *LevelCache) Put(id string, n *db.NodeState)
- func (lc *LevelCache) PutIfAbsent(id string, n *db.NodeState) bool
- func (lc *LevelCache) Snapshot() map[string]*db.NodeState
- func (lc *LevelCache) UpdateStatus(id string, traversalStatus, copyStatus string)
- type LevelStats
- type NodeCache
- func (nc *NodeCache) DropLevel(depth int)
- func (nc *NodeCache) EnsureLevel(depth int) *LevelCache
- func (nc *NodeCache) EnsureLevelStats(depth int) *LevelStats
- func (nc *NodeCache) GetLevel(depth int) *LevelCache
- func (nc *NodeCache) GetLevelStats(depth int) *LevelStats
- func (nc *NodeCache) IncrementCompleted(depth int)
- func (nc *NodeCache) IncrementPending(depth int)
- func (nc *NodeCache) LevelDepths() []int
- func (nc *NodeCache) PromoteLevel(fromDepth, toDepth int)
- func (nc *NodeCache) RecordCopyTransition(depth int, oldStatus, newStatus string)
- func (nc *NodeCache) RecordTraversalTransition(depth int, oldStatus, newStatus string)
- func (nc *NodeCache) SetLevelStats(depth int, pending, successful, failed, completed int64)
- func (nc *NodeCache) TotalNodeCount() int
- type Queue
- func (q *Queue) Add(task *TaskBase) bool
- func (q *Queue) AddWorker(worker Worker)
- func (q *Queue) AdvanceCopyRound()
- func (q *Queue) AdvanceTraversalRound()
- func (q *Queue) CheckCopyCompletion(currentRound int, wasFirstPull bool) bool
- func (q *Queue) CheckTraversalCompletion(currentRound int) bool
- func (q *Queue) Clear()
- func (q *Queue) Close()
- func (q *Queue) CompleteCopyTask(task *TaskBase, executionDelta time.Duration)
- func (q *Queue) CompleteTraversalTask(task *TaskBase, executionDelta time.Duration)
- func (q *Queue) EnsureRoundExpectedFromStats()
- func (q *Queue) FailCopyTask(task *TaskBase, executionDelta time.Duration)
- func (q *Queue) FailTraversalTask(task *TaskBase, executionDelta time.Duration)
- func (q *Queue) GetAverageExecutionTime() time.Duration
- func (q *Queue) GetBytesTransferredTotal() int64
- func (q *Queue) GetCopyPass() int
- func (q *Queue) GetExecutionTimeBufferSize() int
- func (q *Queue) GetExecutionTimeDeltas() []time.Duration
- func (q *Queue) GetFilesCreatedTotal() int64
- func (q *Queue) GetFilesDiscoveredTotal() int64
- func (q *Queue) GetFoldersCreatedTotal() int64
- func (q *Queue) GetFoldersDiscoveredTotal() int64
- func (q *Queue) GetMode() QueueMode
- func (q *Queue) GetPendingCount() int
- func (q *Queue) GetRound() int
- func (q *Queue) GetRoundStats(round int) *RoundStats
- func (q *Queue) GetTotalCompleted() int
- func (q *Queue) GetTotalDiscovered() int64
- func (q *Queue) GetTotalFailed() int
- func (q *Queue) InProgressCount() int
- func (q *Queue) InitializeCopyWithContext(database *db.DB, srcAdapter, dstAdapter types.FSAdapter, ...)
- func (q *Queue) InitializeWithContext(database *db.DB, adapter types.FSAdapter, shutdownCtx context.Context)
- func (q *Queue) IsExhausted() bool
- func (q *Queue) IsPaused() bool
- func (q *Queue) Lease() *TaskBase
- func (q *Queue) Name() string
- func (q *Queue) NodeCache() *NodeCache
- func (q *Queue) OtherNodeCache() *NodeCache
- func (q *Queue) Pause()
- func (q *Queue) PullCopyTasks(force bool)
- func (q *Queue) PullRetryTasks(force bool)
- func (q *Queue) PullTasksIfNeeded(force bool)
- func (q *Queue) PullTraversalTasks(force bool)
- func (q *Queue) RehydrateLevelFromDB(depth int)
- func (q *Queue) ReportTaskResult(task *TaskBase, result TaskExecutionResult)
- func (q *Queue) Resume()
- func (q *Queue) Run()
- func (q *Queue) SetCopyPass(pass int)
- func (q *Queue) SetMaxKnownDepth(depth int)
- func (q *Queue) SetMode(mode QueueMode)
- func (q *Queue) SetNodeCache(c *NodeCache)
- func (q *Queue) SetObserver(observer *QueueObserver)
- func (q *Queue) SetOtherNodeCache(c *NodeCache)
- func (q *Queue) SetRound(round int)
- func (q *Queue) SetShutdownContext(ctx context.Context)
- func (q *Queue) SetState(state QueueState)
- func (q *Queue) SetStatsChannel(ch chan QueueStats)
- func (q *Queue) SetTraversalCacheLoaded(loaded bool)
- func (q *Queue) SetWorkers(workers []Worker)
- func (q *Queue) Shutdown()
- func (q *Queue) State() QueueState
- func (q *Queue) Stats() QueueStats
- func (q *Queue) TotalTracked() int
- type QueueCoordinator
- func (c *QueueCoordinator) CanDstStartRound(targetRound int) bool
- func (c *QueueCoordinator) CanSrcStartRound(srcRound int) bool
- func (c *QueueCoordinator) GetRound(which string) int
- func (c *QueueCoordinator) IsCompleted(queueType string) bool
- func (c *QueueCoordinator) MarkCompleted(which string)
- func (c *QueueCoordinator) SetMaxSrcAhead(n int)
- func (c *QueueCoordinator) UpdateRound(which string, round int)
- func (c *QueueCoordinator) WaitSealBackpressure(round int, database *db.DB)
- type QueueMode
- type QueueObserver
- type QueueState
- type QueueStateSnapshot
- type QueueStats
- type RetryDstChild
- type RetryDstCleanup
- type RoundInfo
- type RoundStats
- type SrcNodeMeta
- type TaskBase
- type TaskExecutionResult
- type TaskResult
- type TraversalWorker
- type UploadTask
- type Worker
Constants ¶
const (
	TaskTypeSrcTraversal = "src-traversal"
	TaskTypeDstTraversal = "dst-traversal"
	TaskTypeUpload       = "upload"
	TaskTypeCopy         = "copy"
	TaskTypeCopyFolder   = "copy-folder" // Copy phase: create folder
	TaskTypeCopyFile     = "copy-file"   // Copy phase: copy file with streaming
)
Task types
Variables ¶
This section is empty.
Functions ¶
func BatchLoadExpectedChildrenByDSTIDs ¶
func BatchLoadExpectedChildrenByDSTIDs(database *db.DB, dstParentIDs []string, dstIDToPath map[string]string) ( expectedFoldersMap map[string][]types.Folder, expectedFilesMap map[string][]types.File, srcIDMap map[string]map[string]string, srcIDToMeta map[string]SrcNodeMeta, err error, )
BatchLoadExpectedChildrenByDSTIDs loads SRC children for the given DST folder IDs by joining on parent_path = dst.path. Prefer ListDstBatchWithSrcChildren + BuildExpectedMapsFromDstWithChildren for keyset-based DST pull (no IN query). Returns expectedFoldersMap, expectedFilesMap (keyed by DST task ID), srcIDMap (Type+DisplayName -> SRC node ID per DST ID), and srcIDToMeta (SRC ID -> meta).
func BatchLoadRetryDstCleanup ¶
func BatchLoadRetryDstCleanup(database *db.DB, srcFolderIDs []string) (map[string]*RetryDstCleanup, error)
BatchLoadRetryDstCleanup loads DST counterpart and children meta for each SRC folder ID (for retry mode DST cleanup).
func BuildExpectedMapsFromDstWithChildren ¶
func BuildExpectedMapsFromDstWithChildren(dstBatch []db.FetchResult, childrenByDstID map[string][]*db.NodeState) ( expectedFoldersMap map[string][]types.Folder, expectedFilesMap map[string][]types.File, srcIDMap map[string]map[string]string, srcIDToMeta map[string]SrcNodeMeta, )
BuildExpectedMapsFromDstWithChildren builds expectedFoldersMap, expectedFilesMap, srcIDMap, and srcIDToMeta from a DST batch and its SRC children (e.g. from ListDstBatchWithSrcChildren). Keyed by DST node ID.
func SeedRootTask ¶
func SeedRootTask(queueType string, rootFolder types.Folder, rootNodeID string, database *db.DB) error
SeedRootTask inserts the initial root folder task into DuckDB to kickstart traversal. For src, it sets traversal_status='Pending' and copy_status='Pending'; for dst, it sets traversal_status='Pending'. It also ensures the stats bucket exists and is updated with the root task counts. rootNodeID should be a deterministic ID generated by db.DeterministicNodeID().
func SeedRootTaskWithSrcID ¶
func SeedRootTaskWithSrcID(queueType string, rootFolder types.Folder, rootNodeID string, srcID string, database *db.DB) error
SeedRootTaskWithSrcID inserts the initial root folder task into DuckDB with a pre-set SrcID. This is used for DST root nodes that need to be matched to SRC root nodes. rootNodeID should be a deterministic ID generated by db.DeterministicNodeID().
func SeedRootTasks ¶
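func SeedRootTasks(srcRoot types.Folder, dstRoot types.Folder, database *db.DB) error
SeedRootTasks is a convenience function that seeds both the src and dst root tasks at once by writing them to DuckDB. It generates deterministic IDs for both root nodes and matches the DST root to the SRC root.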
Types ¶
type ChildResult ¶
type ChildResult struct {
Folder types.Folder // Folder info (if folder)
File types.File // File info (if file)
Status string // "pending", "successful", "missing", "not_on_src"
IsFile bool // true if this is a file, false if folder
SrcID string // ULID of corresponding SRC node (for DST nodes only, set during matching)
SrcCopyStatus string // Copy status to update on SRC node (if SrcID is set and match found): "pending" or "successful", empty if no update needed
}
ChildResult represents a discovered child node with its traversal status.
type CompletionCheckOptions ¶
type CompletionCheckOptions struct {
CheckRoundComplete bool // Check if current round is complete
CheckFinalCompletion bool // Check if traversal/copy is complete (mode-specific logic in CheckTraversalCompletion / CheckCopyCompletion)
AdvanceRoundIfComplete bool // Advance to next round if current round is complete
WasFirstPull bool // For copy mode only: first pull of the round (CheckTraversalCompletion derives this itself)
}
CompletionCheckOptions configures what actions to take during completion checks.
type CopyTask ¶
type CopyTask struct {
TaskBase
SrcId string
DstId string
DstCtx types.ServiceContext
}
CopyTask represents a generic copy operation.
type CopyWorker ¶
type CopyWorker struct {
// contains filtered or unexported fields
}
CopyWorker executes copy tasks by creating folders or streaming files from source to destination. Each worker runs independently in its own goroutine, continuously polling the queue for work.
func NewCopyWorker ¶
func NewCopyWorker( id string, queue *Queue, database *db.DB, srcAdapter types.FSAdapter, dstAdapter types.FSAdapter, shutdownCtx context.Context, ) *CopyWorker
NewCopyWorker creates a worker that executes copy tasks. shutdownCtx is optional - if provided, the worker will check for cancellation and exit on shutdown.
func (*CopyWorker) Run ¶
func (w *CopyWorker) Run()
Run is the main worker loop. It continuously polls the queue for tasks. When a task is found, it leases it, executes it, and reports the result. When no work is available or queue is paused, it briefly sleeps before polling again. When queue is exhausted, the worker exits.
type EngineCaches ¶
type EngineCaches struct {
Src *NodeCache // SRC traversal (and copy phase: SRC table)
Dst *NodeCache // DST traversal
}
EngineCaches holds separate node caches for SRC and DST. Each queue mutates its own cache; DST cross-queries SRC (read-only) for expected children via OtherNodeCache(). Copy phase uses Src for the SRC table and Dst for cross-check when needed.
func NewEngineCaches ¶
func NewEngineCaches() *EngineCaches
NewEngineCaches creates a new SRC and DST node cache pair for memory-first flow.
type ExternalQueueMetrics ¶
type ExternalQueueMetrics struct {
// Monotonic counters (traversal phase)
FilesDiscoveredTotal int64 `json:"files_discovered_total"`
FoldersDiscoveredTotal int64 `json:"folders_discovered_total"`
// EMA-smoothed rates (2-5 second window) - traversal phase
DiscoveryRateItemsPerSec float64 `json:"discovery_rate_items_per_sec"`
// Verification counts (for O(1) stats bucket lookups)
TotalDiscovered int64 `json:"total_discovered"` // files + folders
TotalPending int `json:"total_pending"` // pending across all rounds (from DB)
TotalFailed int `json:"total_failed"` // failed across all rounds
// Copy phase metrics (monotonic counters)
Folders int64 `json:"folders"` // Total folders created
Files int64 `json:"files"` // Total files created
Total int64 `json:"total"` // Total items (folders + files)
Bytes int64 `json:"bytes"` // Total bytes transferred
// Copy phase rates (EMA-smoothed)
ItemsPerSecond float64 `json:"items_per_second"` // Items/sec (folders + files)
BytesPerSecond float64 `json:"bytes_per_second"` // Bytes/sec
// Current state (for API)
QueueStats
Round int `json:"round"`
}
ExternalQueueMetrics contains user-facing metrics published to DuckDB for API access.
type InternalQueueMetrics ¶
type InternalQueueMetrics struct {
// State-based time tracking (additive counters)
TimeProcessing time.Duration
TimeWaitingOnQueue time.Duration
TimeWaitingOnFS time.Duration
TimeRateLimited time.Duration
TimePausedRoundBoundary time.Duration
TimeIdleNoWork time.Duration
// Capacity metrics
TasksCompletedWhileActive int64
ActiveProcessingTime time.Duration
// Utilization metrics
WallClockTime time.Duration
LastState QueueState
LastStateChangeTime time.Time
}
InternalQueueMetrics contains control system metrics stored in memory for autoscaling decisions.
type LevelCache ¶
type LevelCache struct {
Nodes map[string]*db.NodeState // id -> node state
ByPath map[string]*db.NodeState // path -> node (same ptrs as Nodes)
ByParentPath map[string]map[string]*db.NodeState // parentPath -> id -> node (same ptrs)
ByTraversalStatus map[string]map[string]struct{} // traversal status -> id set (pending, in_progress, successful, failed)
ByCopyStatus map[string]map[string]struct{} // copy status -> id set (pending, in_progress, successful, failed)
// contains filtered or unexported fields
}
LevelCache holds exactly one BFS level in memory (nodes keyed by id). Only two active levels exist at a time: N and N+1; sealed levels are flushed to DB and dropped. ByPath and ByParentPath are O(1) indexes into the same nodes (no duplicate node data). ByTraversalStatus and ByCopyStatus are status-keyed ID sets for O(pending) ListPending/ListPendingCopy (no full-level scan).
func (*LevelCache) Clear ¶
func (lc *LevelCache) Clear()
Clear removes all nodes. Call after sealing this level.
func (*LevelCache) Count ¶
func (lc *LevelCache) Count() int
Count returns the number of nodes in this level.
func (*LevelCache) Get ¶
func (lc *LevelCache) Get(id string) *db.NodeState
Get returns a copy of the node state for id, or nil if not present.
func (*LevelCache) GetByPath ¶
func (lc *LevelCache) GetByPath(path string) *db.NodeState
GetByPath returns a copy of the node with the given path, or nil if not present (O(1)).
func (*LevelCache) GetRef ¶
func (lc *LevelCache) GetRef(id string) *db.NodeState
GetRef returns the internal pointer for in-place read (caller must not mutate if shared). Prefer Get for safety.
func (*LevelCache) ListChildrenByParentPath ¶
func (lc *LevelCache) ListChildrenByParentPath(parentPath string) []*db.NodeState
ListChildrenByParentPath returns nodes at this level whose parent_path equals parentPath (O(1) index lookup + O(children) copy).
func (*LevelCache) ListPending ¶
func (lc *LevelCache) ListPending(afterID string, limit int) []*db.NodeState
ListPending returns up to limit nodes with traversal_status == pending and id > afterID, in id order. Uses status-keyed ID set for O(pending) instead of O(n) scan.
func (*LevelCache) ListPendingCopy ¶
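func (lc *LevelCache) ListPendingCopy(afterID string, limit int, nodeType string) []*db.NodeState
ListPendingCopy returns up to limit nodes with copy_status == pending and id > afterID, in id order. If nodeType is non-empty, only nodes of that type are returned. Uses the status-keyed ID set for O(pending) instead of an O(n) scan.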
func (*LevelCache) Put ¶
func (lc *LevelCache) Put(id string, n *db.NodeState)
Put inserts or overwrites the node for id. Caller may pass the same pointer that will be stored. Indexes ByPath, ByParentPath, and status maps are kept in sync.
func (*LevelCache) PutIfAbsent ¶
func (lc *LevelCache) PutIfAbsent(id string, n *db.NodeState) bool
PutIfAbsent inserts n only if id is not already present. Returns true if inserted.
func (*LevelCache) Snapshot ¶
func (lc *LevelCache) Snapshot() map[string]*db.NodeState
Snapshot returns a copy of all node states in this level (for seal flush). Caller must not mutate the map.
func (*LevelCache) UpdateStatus ¶
func (lc *LevelCache) UpdateStatus(id string, traversalStatus, copyStatus string)
UpdateStatus updates traversal_status and optionally copy_status for the node at id. No-op if not present. Status-keyed ID sets are updated so ListPending/ListPendingCopy stay O(pending).
type LevelStats ¶
type LevelStats struct {
Pending int64
Successful int64
Failed int64
Completed int64 // tasks completed this round (success + final failure)
CopyPending int64
CopySuccessful int64
CopyFailed int64
}
LevelStats holds per-level counters for completion detection (in-memory; persisted at seal). For traversal: Pending/Successful/Failed are traversal status counts. For copy phase (SRC): use Copy* for copy status.
type NodeCache ¶
type NodeCache struct {
Levels map[int]*LevelCache // depth -> level cache
Stats map[int]*LevelStats // depth -> stats for that level
// contains filtered or unexported fields
}
NodeCache holds per-level caches. Only levels N and N+1 are active; sealed levels are flushed and removed.
func (*NodeCache) DropLevel ¶
DropLevel removes the level and its stats from memory. Call after sealing.
func (*NodeCache) EnsureLevel ¶
func (nc *NodeCache) EnsureLevel(depth int) *LevelCache
EnsureLevel returns the LevelCache for depth, creating it if needed.
func (*NodeCache) EnsureLevelStats ¶
func (nc *NodeCache) EnsureLevelStats(depth int) *LevelStats
EnsureLevelStats returns the LevelStats for depth, creating if needed (call when syncing from DB or before updating).
func (*NodeCache) GetLevel ¶
func (nc *NodeCache) GetLevel(depth int) *LevelCache
GetLevel returns the LevelCache for depth, or nil if not present.
func (*NodeCache) GetLevelStats ¶
func (nc *NodeCache) GetLevelStats(depth int) *LevelStats
GetLevelStats returns the LevelStats for depth, or nil.
func (*NodeCache) IncrementCompleted ¶
IncrementCompleted increments the completed count for the level (task finished: success or final failure).
func (*NodeCache) IncrementPending ¶
IncrementPending adds one to the pending count for the level (e.g. when adding a new node to the level).
func (*NodeCache) LevelDepths ¶
LevelDepths returns sorted level numbers present in the cache (for iteration over levels).
func (*NodeCache) PromoteLevel ¶
PromoteLevel makes the cache at fromDepth (e.g. N+1) become the cache at toDepth (e.g. N), and creates a fresh empty cache at fromDepth. Call after sealing N: flush and drop N, then PromoteLevel(N+1, N) so the former N+1 becomes the new current level.
func (*NodeCache) RecordCopyTransition ¶
RecordCopyTransition updates level copy stats for SRC (pending->in_progress on pull; in_progress->successful/failed on complete/fail).
func (*NodeCache) RecordTraversalTransition ¶
RecordTraversalTransition updates level stats for one status change (e.g. pending -> successful).
func (*NodeCache) SetLevelStats ¶
SetLevelStats overwrites the stats for depth (e.g. when bootstrapping from DB).
func (*NodeCache) TotalNodeCount ¶
TotalNodeCount returns the total number of nodes across all levels (for memory pressure checks).
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue maintains round-based task queues for BFS traversal coordination. It handles task leasing, retry logic, and cross-queue task propagation. All operational state lives in DuckDB, flushed via per-queue buffers.
func NewQueue ¶
func NewQueue(name string, maxRetries int, workerCount int, coordinator *QueueCoordinator) *Queue
NewQueue creates a new Queue instance.
func (*Queue) Add ¶
Add enqueues a task into the pending buffer. Returns false if task is nil, has empty ID, or is already in progress/pending.
func (*Queue) AddWorker ¶
AddWorker registers a worker with this queue for reference. Workers manage their own lifecycle - this is just for tracking/debugging.
func (*Queue) AdvanceCopyRound ¶
func (q *Queue) AdvanceCopyRound()
AdvanceCopyRound handles copy-specific round advancement logic. Checks for pending tasks matching the current pass and advances to the next applicable round.
func (*Queue) AdvanceTraversalRound ¶
func (q *Queue) AdvanceTraversalRound()
AdvanceTraversalRound handles traversal/retry-specific round advancement logic. For traversal/retry modes, simply increments the round by 1.
func (*Queue) CheckCopyCompletion ¶
CheckCopyCompletion checks if the copy phase should switch passes or complete. Returns true if the queue should mark as complete, false otherwise. This is called when advanceToNextRound can't find a next round for the current pass.
func (*Queue) CheckTraversalCompletion ¶
CheckTraversalCompletion checks if traversal/retry phase should complete. All traversal-specific completion logic lives here: cache loaded, attempted pull, first pull returned 0, no pending in cache. Returns true if the queue should mark as complete, false otherwise.
func (*Queue) Clear ¶
func (q *Queue) Clear()
Clear removes all tasks from DuckDB and resets in-progress tracking. Note: This is a destructive operation - use with caution.
func (*Queue) Close ¶
func (q *Queue) Close()
Close stops the queue and cleans up resources: stats publishing loop, output buffer, and sets state to Stopped if not already Completed or Stopped.
func (*Queue) CompleteCopyTask ¶
CompleteCopyTask handles successful completion of copy tasks. Updates copy status, creates DST node entry, and updates join-lookup mapping.
func (*Queue) CompleteTraversalTask ¶
CompleteTraversalTask handles successful completion of traversal/retry tasks. This includes child discovery, status updates, and buffer operations.
func (*Queue) EnsureRoundExpectedFromStats ¶
func (q *Queue) EnsureRoundExpectedFromStats()
EnsureRoundExpectedFromStats sets Expected for the current round from the stats bucket (O(1) lookup). Call after SetRound (e.g. on init or resume) so Expected reflects actual pending count and survives restarts.
func (*Queue) FailCopyTask ¶
FailCopyTask handles failure of copy tasks. Updates copy status to failed if max retries exceeded, or back to pending if retrying.
func (*Queue) FailTraversalTask ¶
FailTraversalTask handles failure of traversal/retry tasks. Retries up to maxRetries, then marks as failed.
func (*Queue) GetAverageExecutionTime ¶
GetAverageExecutionTime returns the current average task execution time.
func (*Queue) GetBytesTransferredTotal ¶
GetBytesTransferredTotal returns the total bytes transferred during copy phase.
func (*Queue) GetCopyPass ¶
GetCopyPass returns the current copy pass.
func (*Queue) GetExecutionTimeBufferSize ¶
GetExecutionTimeBufferSize returns the current size of the execution time buffer.
func (*Queue) GetExecutionTimeDeltas ¶
GetExecutionTimeDeltas returns a copy of the execution time deltas buffer.
func (*Queue) GetFilesCreatedTotal ¶
GetFilesCreatedTotal returns the total files created during copy phase.
func (*Queue) GetFilesDiscoveredTotal ¶
func (*Queue) GetFoldersCreatedTotal ¶
GetFoldersCreatedTotal returns the total folders created during copy phase.
func (*Queue) GetFoldersDiscoveredTotal ¶
func (*Queue) GetPendingCount ¶
GetPendingCount returns the number of tasks in the pending buffer.
func (*Queue) GetRoundStats ¶
func (q *Queue) GetRoundStats(round int) *RoundStats
GetRoundStats returns the statistics for a specific round. Returns nil if the round has no stats yet.
func (*Queue) GetTotalCompleted ¶
GetTotalCompleted returns the total number of completed tasks across all rounds.
func (*Queue) GetTotalDiscovered ¶
GetTotalDiscovered returns the total number of items discovered (files + folders).
func (*Queue) GetTotalFailed ¶
GetTotalFailed returns the total number of failed tasks across all rounds.
func (*Queue) InProgressCount ¶
InProgressCount returns the number of tasks currently being executed.
func (*Queue) InitializeCopyWithContext ¶
func (q *Queue) InitializeCopyWithContext(database *db.DB, srcAdapter, dstAdapter types.FSAdapter, shutdownCtx context.Context)
InitializeCopyWithContext sets up a copy queue with both source and destination adapters. This is specifically for copy mode which requires both adapters.
func (*Queue) InitializeWithContext ¶
func (q *Queue) InitializeWithContext(database *db.DB, adapter types.FSAdapter, shutdownCtx context.Context)
InitializeWithContext sets up the queue with DuckDB, context, and filesystem adapter references. Creates and starts workers immediately - they'll poll for tasks autonomously. shutdownCtx is optional - if provided, workers will check for cancellation and exit on shutdown. For copy mode, InitializeCopyWithContext should be used instead to provide both adapters.
func (*Queue) IsExhausted ¶
IsExhausted returns true if the queue has finished all traversal or has been stopped.
func (*Queue) Lease ¶
Lease attempts to lease a task for execution atomically. Returns nil if no tasks are available, queue is paused, or completed.
func (*Queue) OtherNodeCache ¶
OtherNodeCache returns the other queue's node cache for read-only cross-check (may be nil).
func (*Queue) Pause ¶
func (q *Queue) Pause()
Pause pauses the queue (workers will not lease new tasks).
func (*Queue) PullCopyTasks ¶
PullCopyTasks pulls copy tasks from DuckDB for the current round. Pulls from SRC copy status buckets, filters by pass (folders vs files), and skips round 0. Uses getter/setter methods - no direct mutex access.
func (*Queue) PullRetryTasks ¶
PullRetryTasks pulls retry tasks from failed/pending status buckets. Checks maxKnownDepth and scans all known levels up to maxKnownDepth, then uses normal traversal logic for deeper levels. Uses getter/setter methods - no direct mutex access.
func (*Queue) PullTasksIfNeeded ¶
func (*Queue) PullTraversalTasks ¶
PullTraversalTasks pulls traversal tasks from DuckDB for the current round. Uses getter/setter methods - no direct mutex access.
func (*Queue) RehydrateLevelFromDB ¶
RehydrateLevelFromDB loads all nodes at the given depth from the DB into this queue's node cache and syncs level stats. Used on resume so the memory-first path can continue from sealed state without re-pulling from DB on first pull.
func (*Queue) ReportTaskResult ¶
func (q *Queue) ReportTaskResult(task *TaskBase, result TaskExecutionResult)
ReportTaskResult reports the result of a task execution and handles post-processing. This is the event-driven entry point that replaces separate Complete()/Fail() calls. After processing the result, it checks if we need to pull more tasks or advance rounds.
func (*Queue) Run ¶
func (q *Queue) Run()
Run is the main queue coordination loop. It has an outer loop for rounds and an inner loop for each round. The outer loop checks coordinator gates before starting each round (DST only). The inner loop processes tasks until the round is complete.
func (*Queue) SetCopyPass ¶
SetCopyPass sets the current copy pass.
func (*Queue) SetMaxKnownDepth ¶
SetMaxKnownDepth sets the maximum depth for traversal/copy. Set to -1 to auto-detect.
func (*Queue) SetNodeCache ¶
SetNodeCache sets this queue's node cache (read-write). Call before Run() when using memory-first flow.
func (*Queue) SetObserver ¶
func (q *Queue) SetObserver(observer *QueueObserver)
SetObserver registers this queue with an observer for DuckDB stats publishing. The observer will poll this queue directly for statistics.
func (*Queue) SetOtherNodeCache ¶
SetOtherNodeCache sets the other queue's node cache (read-only for cross-check). Call before Run() when using memory-first flow.
func (*Queue) SetShutdownContext ¶
SetShutdownContext sets the shutdown context for the queue.
func (*Queue) SetState ¶
func (q *Queue) SetState(state QueueState)
SetState sets the queue lifecycle state.
func (*Queue) SetStatsChannel ¶
func (q *Queue) SetStatsChannel(ch chan QueueStats)
SetStatsChannel sets the channel for publishing queue statistics for UDP logging. The queue will periodically publish stats to this channel.
func (*Queue) SetTraversalCacheLoaded ¶
SetTraversalCacheLoaded sets whether the level cache has been loaded from DB (e.g. after RehydrateLevelFromDB). Until true, CheckTraversalCompletion returns false so the queue does not complete before round 0 is populated. Call after loading cache at startup; retry/sweeps should set true when their cache is ready.
func (*Queue) SetWorkers ¶
SetWorkers sets the workers associated with this queue.
func (*Queue) Shutdown ¶
func (q *Queue) Shutdown()
Shutdown stops the stats publishing loop and cleans up resources.
func (*Queue) State ¶
func (q *Queue) State() QueueState
State returns the current queue lifecycle state.
func (*Queue) Stats ¶
func (q *Queue) Stats() QueueStats
Stats returns a snapshot of the queue's current state. Uses only in-memory counters - no database queries.
func (*Queue) TotalTracked ¶
TotalTracked returns the total number of tasks across all rounds (pending + in-progress).
type QueueCoordinator ¶
type QueueCoordinator struct {
// contains filtered or unexported fields
}
QueueCoordinator manages round advancement gates for dual-BFS traversal. It enforces: DST cannot advance to round N until SRC has completed rounds N and N+1; SRC may not run ahead of DST by more than MaxSrcAhead rounds (default 3).
func NewQueueCoordinator ¶
func NewQueueCoordinator() *QueueCoordinator
NewQueueCoordinator creates a new coordinator.
func (*QueueCoordinator) CanDstStartRound ¶
func (c *QueueCoordinator) CanDstStartRound(targetRound int) bool
CanDstStartRound returns true if DST can start processing the specified round. DST can start round N if:
- SRC has completed traversal entirely (DST can proceed freely), OR
- SRC has completed rounds N and N+1 (SRC is at round N+2 or higher)
Note: Because DST can now advance freely into a round and then pause, the check requires SRC to be at round N+2 or higher. For example, if DST wants to start round 4, SRC must have completed rounds 4 and 5, so SRC's current round must be >= 6. Once SRC has completed traversal entirely, DST can proceed at full speed with no restrictions.
func (*QueueCoordinator) CanSrcStartRound ¶
func (c *QueueCoordinator) CanSrcStartRound(srcRound int) bool
CanSrcStartRound returns true if SRC can run the given round: srcRound <= dstRound + maxSrcAhead (or SRC is done).
func (*QueueCoordinator) GetRound ¶
func (c *QueueCoordinator) GetRound(which string) int
GetRound returns the current round for SRC or DST.
func (*QueueCoordinator) IsCompleted ¶
func (c *QueueCoordinator) IsCompleted(queueType string) bool
IsCompleted returns true if the specified queue ("src", "dst", or "both") has completed traversal.
func (*QueueCoordinator) MarkCompleted ¶
func (c *QueueCoordinator) MarkCompleted(which string)
MarkCompleted marks SRC or DST as completed based on the argument ("src" or "dst").
func (*QueueCoordinator) SetMaxSrcAhead ¶
func (c *QueueCoordinator) SetMaxSrcAhead(n int)
SetMaxSrcAhead sets the maximum rounds SRC may run ahead of DST (default 3).
func (*QueueCoordinator) UpdateRound ¶
func (c *QueueCoordinator) UpdateRound(which string, round int)
UpdateRound updates the current round for SRC or DST.
func (*QueueCoordinator) WaitSealBackpressure ¶
func (c *QueueCoordinator) WaitSealBackpressure(round int, database *db.DB)
WaitSealBackpressure, when round >= 2, blocks until the seal buffer has flushed through round round-2. It flushes pending seal jobs first so that it does not block on the buffer's interval timer. Call it before advancing to the next round. Pulling and processing from the cache are not blocked; only round advancement waits. Pass the queue's database as database.
type QueueObserver ¶
type QueueObserver struct {
// contains filtered or unexported fields
}
QueueObserver collects statistics from queues by polling them directly and publishes them to DuckDB periodically. Similar to QueueCoordinator, but focused on observability rather than coordination.
func NewQueueObserver ¶
func NewQueueObserver(database *db.DB, updateInterval time.Duration) *QueueObserver
NewQueueObserver creates a new observer that will publish stats to DuckDB. updateInterval is how often stats are written to DuckDB (default: 200ms).
func (*QueueObserver) RegisterQueue ¶
func (o *QueueObserver) RegisterQueue(queueName string, queue *Queue)
RegisterQueue registers a queue with the observer. The observer will poll this queue directly for statistics.
func (*QueueObserver) Start ¶
func (o *QueueObserver) Start()
Start begins the observer loop that publishes stats to DuckDB. This is called automatically when the first queue is registered, but can be called manually.
func (*QueueObserver) Stop ¶
func (o *QueueObserver) Stop()
Stop stops the observer loop and cleans up resources. It is safe to call Stop more than once; calls after the first have no effect.
func (*QueueObserver) UnregisterQueue ¶
func (o *QueueObserver) UnregisterQueue(queueName string)
UnregisterQueue removes a queue from the observer.
type QueueState ¶
type QueueState string
QueueState represents the lifecycle state of a queue.
const ( QueueStateRunning QueueState = "running" // Queue is active and processing QueueStatePaused QueueState = "paused" // Queue is paused QueueStateStopped QueueState = "stopped" // Queue is stopped QueueStateWaiting QueueState = "waiting" // Queue is waiting for coordinator to allow advancement (DST only) QueueStateCompleted QueueState = "completed" // Traversal complete (max depth reached) )
type QueueStateSnapshot ¶
type QueueStateSnapshot struct {
State QueueState
Round int
PendingCount int
InProgressCount int
Pulling bool
LastPullWasPartial bool
FirstPullForRound bool
PullLowWM int
Database *db.DB
Mode QueueMode
}
QueueStateSnapshot is a point-in-time snapshot of queue state, produced by getStateSnapshot for use in logic functions.
type QueueStats ¶
type QueueStats struct {
Name string
Round int
Pending int
InProgress int
TotalTracked int
Workers int
}
QueueStats holds current queue statistics, as returned by Stats.
type RetryDstChild ¶
RetryDstChild holds DST child node metadata for retry-mode DST cleanup. It is populated at pull time to avoid per-child DB lookups.
type RetryDstCleanup ¶
type RetryDstCleanup struct {
DstID string
DstDepth int
DstOldStatus string
Children []RetryDstChild
}
RetryDstCleanup holds the DST counterpart and its children's metadata for SRC folder tasks in retry mode. It is populated at pull time.
type RoundInfo ¶
type RoundInfo struct {
Round int // Round number
PullCount int // Number of pull operations (queries) performed this round
ItemsYielded int // Pulled amount: total items actually returned from DB queries this round (like completed count but for pulls)
ExpectedCount int // Expected items from DB (if known)
TasksCompleted int // Successfully completed tasks
TasksFailed int // Failed tasks
StartTime time.Time // When this round started
LastPullTime time.Time // Timestamp of last pull operation
AvgTasksPerSec float64 // Rolling average tasks/sec
LastPartialPull bool // Whether the last pull was partial (< batch size)
}
RoundInfo tracks statistics and metadata for a specific BFS round.
type RoundStats ¶
type RoundStats struct {
Expected int // Expected tasks for this round (folder children inserted)
Completed int // Tasks completed in this round (successful + failed)
Failed int // Tasks failed in this round
}
RoundStats tracks statistics for a specific round.
type SrcNodeMeta ¶
SrcNodeMeta holds Depth and CopyStatus for an SRC node; used by DST tasks at completion to avoid per-child DB lookups.
type TaskBase ¶
type TaskBase struct {
ID string // ULID for internal tracking (database keys)
Type string // Task type: "src-traversal", "dst-traversal", "upload", etc.
Folder types.Folder // Folder to process (if applicable)
File types.File // File to process (if applicable)
Locked bool // Whether this task is currently leased by a worker
Attempts int // Number of execution attempts
Status string // Execution result: "successful", "failed"
ExpectedFolders []types.Folder // Expected folders (dst tasks only)
ExpectedFiles []types.File // Expected files (dst tasks only)
ExpectedSrcIDMap map[string]string // Map of Type+Name -> SRC node ID for matching (dst tasks only)
ExpectedSrcNodeMeta map[string]SrcNodeMeta // SRC node Depth/CopyStatus keyed by SRC ID (dst tasks only, populated at pull)
RetryDstCleanup *RetryDstCleanup // DST counterpart + children meta for SRC folder in retry mode (populated at pull)
DiscoveredChildren []ChildResult // Children discovered during execution
Round int // The round this task belongs to (for buffer coordination)
LeaseTime time.Time // Time when task was leased (for execution time tracking)
// Copy phase specific fields
CopyPass int // Copy pass number (1 for folders, 2 for files)
BytesTransferred int64 // Bytes transferred for file copy tasks
DstParentID string // Destination parent folder ID for creation
}
TaskBase represents the foundational structure for all task types. Workers lease tasks, mark them Locked, and attempt execution. Tasks are identified by ULID (ID) for internal tracking.
func (*TaskBase) Identifier ¶
Identifier returns the unique identifier for this task (absolute path).
func (*TaskBase) LocationPath ¶
LocationPath returns the logical, root-relative path for this task.
type TaskExecutionResult ¶
type TaskExecutionResult string
TaskExecutionResult represents the result of a task execution.
const ( TaskExecutionResultSuccessful TaskExecutionResult = "successful" TaskExecutionResultFailed TaskExecutionResult = "failed" )
type TaskResult ¶
type TaskResult struct {
Task *TaskBase
Success bool
Error error
Data any // Optional result data (e.g., ListResult)
}
TaskResult represents the outcome of a task execution.
type TraversalWorker ¶
type TraversalWorker struct {
// contains filtered or unexported fields
}
TraversalWorker executes traversal tasks by listing children and recording them to DuckDB. Each worker runs independently in its own goroutine, continuously polling the queue for work.
func NewTraversalWorker ¶
func NewTraversalWorker( id string, queue *Queue, database *db.DB, adapter types.FSAdapter, queueName string, shutdownCtx context.Context, ) *TraversalWorker
NewTraversalWorker creates a worker that executes traversal tasks. shutdownCtx is optional - if provided, the worker will check for cancellation and exit on shutdown.
func (*TraversalWorker) Run ¶
func (w *TraversalWorker) Run()
Run is the main worker loop. It continuously polls the queue for tasks. When a task is found, it leases the task, executes it, and reports the result. When no work is available or the queue is paused, it sleeps briefly before polling again. When the queue is exhausted, the worker exits.
type UploadTask ¶
type UploadTask struct {
TaskBase
SrcId string // Source file identifier
DstId string // Destination parent folder identifier
DstCtx types.ServiceContext
}
UploadTask represents a task to upload a file from source to destination.