package queue

v0.1.4
Published: Mar 21, 2026 License: LGPL-2.1 Imports: 11 Imported by: 0

README

Queue Package

The queue layer drives source/destination traversal and copy using the database (pkg/db). Two queues (src and dst) perform breadth-first traversal in rounds, with the destination gated by a shared QueueCoordinator. The memory-first flow keeps per-level state in NodeCache/LevelCache; the DB is only written at seal (round advance). The coordinator enforces a configurable SRC-ahead gate (default 3 rounds).


High-level flow

  1. Workers lease tasks from an in-memory buffer that is refilled by pulls from the level cache (NodeCache is required; e.g. LevelCache.ListPending, ListPendingCopy).
  2. Completion updates: the queue updates the level cache (status, children, copy status) and per-level stats; no DB write until seal.
  3. Round advancement: when the current round is complete, the queue calls seal (in advanceToNextRound): snapshot the level from cache, call database.SealLevel(...) to bulk-append nodes and write stats for that depth, then drop the level and (for traversal) promote N+1 to N.
  4. Coordinator gating: DST may start a round only when SRC is far enough ahead; SRC may not run more than MaxSrcAhead rounds ahead of DST (default 3, configurable via MigrationConfig.MaxSrcAhead).

Relationship with pkg/db

  • Database: The queue holds a *db.DB reference. NodeCache is required. Hot-path reads and writes use the cache; the DB is only used at seal (SealLevel: bulk append nodes + stats snapshot) and for resume rehydration (RehydrateLevelFromDB).
  • Pulls: From cache: LevelCache.ListPending, ListPendingCopy, ListChildrenByParentPath. Results become TaskBase and are added to pendingBuff.
  • Seal: database.SealLevel(table, depth, nodes, pending, successful, failed, completed, copyP, copyS, copyF) persists one level to the DB and writes per-depth stats. Copy stats (copyP, copyS, copyF) are used for SRC in copy phase; pass -1 for traversal-only.
  • Completion checks: Completion uses in-memory level stats and cache state. Observer reads from stats tables and writes queue_stats.

Core components

File Responsibility
queue.go Queue struct, Run loop, seal (advanceToNextRound → SealLevel), coordinator gates, leasing
level_cache.go LevelCache, NodeCache, EngineCaches: per-level nodes and stats; SRC/DST separate caches with bridge for DST cross-querying SRC (read-only); ListPending, ListPendingCopy, DropLevel, PromoteLevel
queue_accessors.go Thread-safe getters/setters, keyset cursors, syncLevelStatsFromDB, RehydrateLevelFromDB
queue_batch.go BuildExpectedMapsFromDstWithChildren, batch load expected children for DST tasks
mode_traversal.go PullTraversalTasks (from cache), traversal completion (cache update only)
mode_retry.go PullRetryTasks; DST cleanup on SRC folder complete
mode_copy.go PullCopyTasks (from cache), copy completion (cache update only), CheckCopyCompletion
worker_traversal.go TraversalWorker: lease → list children / compare → ReportTaskResult
worker_copy.go CopyWorker: lease → create folder / copy file → ReportTaskResult
worker/interface.go Worker interface
task.go TaskBase, ChildResult, task types
seeding.go SeedRootTask, SeedRootTasks (insert root nodes, BootstrapRootStats)
coordinator.go QueueCoordinator: CanDstStartRound, CanSrcStartRound, SetMaxSrcAhead (default 3)
observer.go Polls queues, reads stats from DB, writes queue_stats

Task model

type TaskBase struct {
    ID                  string
    Type                string // e.g. TaskTypeSrcTraversal, TaskTypeDstTraversal, TaskTypeCopyFolder
    Folder              types.Folder
    File                types.File
    Locked              bool
    Attempts            int
    Status              string
    ExpectedFolders     []types.Folder // DST: expected from SRC
    ExpectedFiles       []types.File
    ExpectedSrcIDMap    map[string]string
    ExpectedSrcNodeMeta map[string]SrcNodeMeta
    RetryDstCleanup     *RetryDstCleanup // Retry mode: DST counterpart + children for cleanup
    DiscoveredChildren  []ChildResult
    Round               int
    LeaseTime           time.Time
    // ...
}
  • Round is the BFS depth; used by coordinator, stats, and pull queries.
  • DiscoveredChildren is filled by workers and used by the queue to build node inserts and status updates.
  • DST tasks get Expected* and ExpectedSrcIDMap from the pull (e.g. ListDstBatchWithSrcChildren or batch load by parent path).

Storage

  • Cache (required): NodeCache is required. Active levels (current and next) live in LevelCache per queue. EngineCaches holds separate Src and Dst NodeCaches: each queue mutates its own; DST cross-queries SRC (read-only via OtherNodeCache()) for expected children (e.g. ListChildrenByParentPath). Copy phase uses Src for the SRC table and Dst for cross-check. Task completion updates the cache and in-memory LevelStats. At seal, the level is bulk-appended to the DB and stats are written; the level is then dropped (and N+1 promoted for traversal).
  • Database (pkg/db): Node tables (src_nodes, dst_nodes), stats tables (src_stats, dst_stats). Seal writes nodes and stats in one transaction. Resume uses RehydrateLevelFromDB to refill the cache from the DB.
  • queue_stats: Observer writes per-queue metrics JSON for external APIs.

Worker workflow

task := w.queue.Lease()           // from pendingBuff (refilled by pull from cache)
err := w.execute(task)            // list children or compare / copy
if err != nil {
    w.queue.ReportTaskResult(task, Failed)
} else {
    w.queue.ReportTaskResult(task, Successful)  // updates level cache and stats
}
  • Lease: Task is taken from pendingBuff and added to inProgress; its key is tracked so it is not pulled again (status updated in cache).
  • ReportTaskResult: The queue updates the level cache and stats. Seal persists at round advance via SealLevel.

Task pulling

  • Pulling flag: Only one pull runs at a time (getPulling / setPulling).
  • Cache-only: NodeCache is required. Pull uses LevelCache.ListPending / ListPendingCopy for the current round; DB is used only when the level is missing (e.g. resume) or for parent/expected lookups.
  • State checks: Pull only when queue is running and pending count is at or below the low-water mark (or when forced).
  • Coordinator: DST checks CanDstStartRound(currentRound); SRC checks CanSrcStartRound(currentRound) (SRC may not run more than MaxSrcAhead rounds ahead of DST).

Traversal: From cache: GetLevel(round).ListPending(cursor, batchSize) (DST gets expected from other cache).

Copy: From cache: GetLevel(round).ListPendingCopy(cursor, limit, nodeType).

Retry: Same as traversal but over multiple depths (up to maxKnownDepth).


Coordinator

  • QueueCoordinator keeps SRC and DST round numbers and enforces gating.
  • CanDstStartRound: DST may start round N only when SRC has completed rounds N and N+1 (SRC round >= N+2) or SRC is complete.
  • CanSrcStartRound: SRC may run round R only when R <= dstRound + MaxSrcAhead (default 3); configurable via SetMaxSrcAhead / MigrationConfig.MaxSrcAhead.
  • Completion is marked when a queue reaches max depth. Round updates are reported via UpdateRound when advancing.

Queue modes

Traversal (QueueModeTraversal)
  • Pull: from cache (ListPending).
  • Completion: update cache (status, children, stats). At seal: SealLevel bulk-appends level and writes stats; level dropped, N+1 promoted.
  • DST uses coordinator gate and gets expected children from SRC (cache or ListDstBatchWithSrcChildren).
Retry (QueueModeRetry)
  • Pull: pending/failed across known depths. On SRC folder success: DST cleanup (mark DST parent pending, AddNodeDeletions for children). Same cache-only path as traversal.
Copy (QueueModeCopy)
  • Pull: from cache (ListPendingCopy).
  • Completion: update SRC copy status and DST node in cache. At seal, both SRC and DST levels are sealed with copy stats for SRC.
  • Completion check: no pending/in-progress for current pass (from cache stats).

Completion checking

  • Run() loop polls; completion is not event-driven from task completion.
  • checkCompletion: Checks in-memory state (in-progress, pending, last pull partial) and, for copy mode, cache stats for pending/in_progress.
  • Round complete: advanceToNextRound runs seal (bulk append + stats snapshot via SealLevel, then drop/promote level).
  • Final completion: When the first pull of a round returns 0 items and in-progress and pending are 0, the queue is marked completed (mode-specific in CheckTraversalCompletion / CheckCopyCompletion).

Observer

  • Polls queues periodically and reads stats from the database (GetStatsCountAtDepth, etc.) to compute pending/failed totals.
  • Writes aggregated metrics to the queue_stats table via database.RunUpdateWriterTx and Writer.WriteQueueStats (keyed e.g. by src-traversal, dst-traversal, copy).

Resumption

  • Open the existing database (same file as before).
  • InspectMigrationStatus (in pkg/migration) reads node counts and stats from the DB to get pending/failed and min pending depth.
  • Set queue round from that state. RehydrateLevelFromDB(depth) is called for each queue so the level cache is refilled from the DB and level stats are synced; workers then pull from cache. The DB is the source of truth for sealed state.

File layout

pkg/queue/
├── queue.go           # Queue struct, Run, advanceToNextRound (seal), Lease, ReportTaskResult, InitializeWithContext
├── level_cache.go     # LevelCache, NodeCache (per-level nodes and stats)
├── level_cache_test.go
├── coordinator.go     # QueueCoordinator, CanDstStartRound, CanSrcStartRound, SetMaxSrcAhead
├── coordinator_test.go
├── queue_accessors.go # Getters/setters, syncLevelStatsFromDB, RehydrateLevelFromDB
├── queue_batch.go     # BuildExpectedMapsFromDstWithChildren, BatchLoadExpectedChildrenByDSTIDs
├── mode_traversal.go  # PullTraversalTasks, traversal completion (cache only)
├── mode_retry.go      # PullRetryTasks
├── mode_copy.go       # PullCopyTasks, copy completion, CheckCopyCompletion
├── worker_traversal.go
├── worker_copy.go
├── worker/
│   └── interface.go
├── task.go
├── seeding.go
├── observer.go
└── README.md

Summary

  • Cache + seal: NodeCache is required. Hot-path reads/writes use per-level caches; the DB is written only at seal (SealLevel) and used for resume rehydration.
  • Pull: From cache (ListPending / ListPendingCopy) or DB keyset queries; results go into pendingBuff and are leased to workers.
  • Seal: At round advance, sealed level is bulk-appended to the DB and stats snapshot written; level is dropped (and N+1 promoted for traversal).
  • Coordinator enforces both gates: DST starts round N only once SRC round >= N+2, and SRC stays within the SRC-ahead cap (default 3). Observer reads stats from the DB and writes queue_stats.
  • Modes: Traversal (BFS by round), Retry (re-process pending/failed, DST cleanup), Copy (by copy_status and type, two passes).

Documentation

Index

Constants

const (
	TaskTypeSrcTraversal = "src-traversal"
	TaskTypeDstTraversal = "dst-traversal"
	TaskTypeUpload       = "upload"
	TaskTypeCopy         = "copy"
	TaskTypeCopyFolder   = "copy-folder" // Copy phase: create folder
	TaskTypeCopyFile     = "copy-file"   // Copy phase: copy file with streaming
)

Task types

Variables

This section is empty.

Functions

func BatchLoadExpectedChildrenByDSTIDs

func BatchLoadExpectedChildrenByDSTIDs(database *db.DB, dstParentIDs []string, dstIDToPath map[string]string) (
	expectedFoldersMap map[string][]types.Folder,
	expectedFilesMap map[string][]types.File,
	srcIDMap map[string]map[string]string,
	srcIDToMeta map[string]SrcNodeMeta,
	err error,
)

BatchLoadExpectedChildrenByDSTIDs loads SRC children for the given DST folder IDs by joining on parent_path = dst.path. Prefer ListDstBatchWithSrcChildren + BuildExpectedMapsFromDstWithChildren for keyset-based DST pull (no IN query). Returns expectedFoldersMap, expectedFilesMap (keyed by DST task ID), srcIDMap (Type+DisplayName -> SRC node ID per DST ID), and srcIDToMeta (SRC ID -> meta).

func BatchLoadRetryDstCleanup

func BatchLoadRetryDstCleanup(database *db.DB, srcFolderIDs []string) (map[string]*RetryDstCleanup, error)

BatchLoadRetryDstCleanup loads DST counterpart and children meta for each SRC folder ID (for retry mode DST cleanup).

func BuildExpectedMapsFromDstWithChildren

func BuildExpectedMapsFromDstWithChildren(dstBatch []db.FetchResult, childrenByDstID map[string][]*db.NodeState) (
	expectedFoldersMap map[string][]types.Folder,
	expectedFilesMap map[string][]types.File,
	srcIDMap map[string]map[string]string,
	srcIDToMeta map[string]SrcNodeMeta,
)

BuildExpectedMapsFromDstWithChildren builds expectedFoldersMap, expectedFilesMap, srcIDMap, and srcIDToMeta from a DST batch and its SRC children (e.g. from ListDstBatchWithSrcChildren). Keyed by DST node ID.

func SeedRootTask

func SeedRootTask(queueType string, rootFolder types.Folder, rootNodeID string, database *db.DB) error

SeedRootTask inserts the initial root folder task into DuckDB to kickstart traversal. For src it sets traversal_status='Pending' and copy_status='Pending'; for dst it sets traversal_status='Pending'. Ensures the stats bucket exists and is updated with root task counts. rootNodeID should be a deterministic ID generated by db.DeterministicNodeID().

func SeedRootTaskWithSrcID

func SeedRootTaskWithSrcID(queueType string, rootFolder types.Folder, rootNodeID string, srcID string, database *db.DB) error

SeedRootTaskWithSrcID inserts the initial root folder task into DuckDB with a pre-set SrcID. This is used for DST root nodes that need to be matched to SRC root nodes. rootNodeID should be a deterministic ID generated by db.DeterministicNodeID().

func SeedRootTasks

func SeedRootTasks(srcRoot types.Folder, dstRoot types.Folder, database *db.DB) error

SeedRootTasks is a convenience function to seed both src and dst root tasks at once. Writes root tasks to DuckDB. Generates deterministic IDs for both root nodes and matches DST root to SRC root.

Types

type ChildResult

type ChildResult struct {
	Folder        types.Folder // Folder info (if folder)
	File          types.File   // File info (if file)
	Status        string       // "pending", "successful", "missing", "not_on_src"
	IsFile        bool         // true if this is a file, false if folder
	SrcID         string       // ULID of corresponding SRC node (for DST nodes only, set during matching)
	SrcCopyStatus string       // Copy status to update on SRC node (if SrcID is set and match found): "pending" or "successful", empty if no update needed
}

ChildResult represents a discovered child node with its traversal status.

type CompletionCheckOptions

type CompletionCheckOptions struct {
	CheckRoundComplete     bool // Check if current round is complete
	CheckFinalCompletion   bool // Check if traversal/copy is complete (mode-specific logic in CheckTraversalCompletion / CheckCopyCompletion)
	AdvanceRoundIfComplete bool // Advance to next round if current round is complete
	WasFirstPull           bool // For copy mode only: first pull of the round (CheckTraversalCompletion derives this itself)
}

CompletionCheckOptions configures what actions to take during completion checks.

type CopyTask

type CopyTask struct {
	TaskBase
	SrcId  string
	DstId  string
	DstCtx types.ServiceContext
}

CopyTask represents a generic copy operation.

type CopyWorker

type CopyWorker struct {
	// contains filtered or unexported fields
}

CopyWorker executes copy tasks by creating folders or streaming files from source to destination. Each worker runs independently in its own goroutine, continuously polling the queue for work.

func NewCopyWorker

func NewCopyWorker(
	id string,
	queue *Queue,
	database *db.DB,
	srcAdapter types.FSAdapter,
	dstAdapter types.FSAdapter,
	shutdownCtx context.Context,
) *CopyWorker

NewCopyWorker creates a worker that executes copy tasks. shutdownCtx is optional - if provided, the worker will check for cancellation and exit on shutdown.

func (*CopyWorker) Run

func (w *CopyWorker) Run()

Run is the main worker loop. It continuously polls the queue for tasks. When a task is found, it leases it, executes it, and reports the result. When no work is available or queue is paused, it briefly sleeps before polling again. When queue is exhausted, the worker exits.

type EngineCaches

type EngineCaches struct {
	Src *NodeCache // SRC traversal (and copy phase: SRC table)
	Dst *NodeCache // DST traversal
}

EngineCaches holds separate node caches for SRC and DST. Each queue mutates its own cache; DST cross-queries SRC (read-only) for expected children via OtherNodeCache(). Copy phase uses Src for the SRC table and Dst for cross-check when needed.

func NewEngineCaches

func NewEngineCaches() *EngineCaches

NewEngineCaches creates a new SRC and DST node cache pair for memory-first flow.

type ExternalQueueMetrics

type ExternalQueueMetrics struct {
	// Monotonic counters (traversal phase)
	FilesDiscoveredTotal   int64 `json:"files_discovered_total"`
	FoldersDiscoveredTotal int64 `json:"folders_discovered_total"`

	// EMA-smoothed rates (2-5 second window) - traversal phase
	DiscoveryRateItemsPerSec float64 `json:"discovery_rate_items_per_sec"`

	// Verification counts (for O(1) stats bucket lookups)
	TotalDiscovered int64 `json:"total_discovered"` // files + folders
	TotalPending    int   `json:"total_pending"`    // pending across all rounds (from DB)
	TotalFailed     int   `json:"total_failed"`     // failed across all rounds

	// Copy phase metrics (monotonic counters)
	Folders int64 `json:"folders"` // Total folders created
	Files   int64 `json:"files"`   // Total files created
	Total   int64 `json:"total"`   // Total items (folders + files)
	Bytes   int64 `json:"bytes"`   // Total bytes transferred

	// Copy phase rates (EMA-smoothed)
	ItemsPerSecond float64 `json:"items_per_second"` // Items/sec (folders + files)
	BytesPerSecond float64 `json:"bytes_per_second"` // Bytes/sec

	// Current state (for API)
	QueueStats
	Round int `json:"round"`
}

ExternalQueueMetrics contains user-facing metrics published to DuckDB for API access.

type InternalQueueMetrics

type InternalQueueMetrics struct {
	// State-based time tracking (additive counters)
	TimeProcessing          time.Duration
	TimeWaitingOnQueue      time.Duration
	TimeWaitingOnFS         time.Duration
	TimeRateLimited         time.Duration
	TimePausedRoundBoundary time.Duration
	TimeIdleNoWork          time.Duration

	// Capacity metrics
	TasksCompletedWhileActive int64
	ActiveProcessingTime      time.Duration

	// Utilization metrics
	WallClockTime       time.Duration
	LastState           QueueState
	LastStateChangeTime time.Time
}

InternalQueueMetrics contains control system metrics stored in memory for autoscaling decisions.

type LevelCache

type LevelCache struct {
	Nodes             map[string]*db.NodeState            // id -> node state
	ByPath            map[string]*db.NodeState            // path -> node (same ptrs as Nodes)
	ByParentPath      map[string]map[string]*db.NodeState // parentPath -> id -> node (same ptrs)
	ByTraversalStatus map[string]map[string]struct{}      // traversal status -> id set (pending, in_progress, successful, failed)
	ByCopyStatus      map[string]map[string]struct{}      // copy status -> id set (pending, in_progress, successful, failed)
	// contains filtered or unexported fields
}

LevelCache holds exactly one BFS level in memory (nodes keyed by id). Only two active levels exist at a time: N and N+1; sealed levels are flushed to DB and dropped. ByPath and ByParentPath are O(1) indexes into the same nodes (no duplicate node data). ByTraversalStatus and ByCopyStatus are status-keyed ID sets for O(pending) ListPending/ListPendingCopy (no full-level scan).

func NewLevelCache

func NewLevelCache() *LevelCache

NewLevelCache creates an empty level cache.

func (*LevelCache) Clear

func (lc *LevelCache) Clear()

Clear removes all nodes. Call after sealing this level.

func (*LevelCache) Count

func (lc *LevelCache) Count() int

Count returns the number of nodes in this level.

func (*LevelCache) Get

func (lc *LevelCache) Get(id string) *db.NodeState

Get returns a copy of the node state for id, or nil if not present.

func (*LevelCache) GetByPath

func (lc *LevelCache) GetByPath(path string) *db.NodeState

GetByPath returns a copy of the node with the given path, or nil if not present (O(1)).

func (*LevelCache) GetRef

func (lc *LevelCache) GetRef(id string) *db.NodeState

GetRef returns the internal pointer for in-place read (caller must not mutate if shared). Prefer Get for safety.

func (*LevelCache) ListChildrenByParentPath

func (lc *LevelCache) ListChildrenByParentPath(parentPath string) []*db.NodeState

ListChildrenByParentPath returns nodes at this level whose parent_path equals parentPath (O(1) index lookup + O(children) copy).

func (*LevelCache) ListPending

func (lc *LevelCache) ListPending(afterID string, limit int) []*db.NodeState

ListPending returns up to limit nodes with traversal_status == pending and id > afterID, in id order. Uses status-keyed ID set for O(pending) instead of O(n) scan.

func (*LevelCache) ListPendingCopy

func (lc *LevelCache) ListPendingCopy(afterID string, limit int, nodeType string) []*db.NodeState

ListPendingCopy returns up to limit nodes with copy_status == pending and id > afterID, in id order. If nodeType != "", results are filtered by that type. Uses status-keyed ID set for O(pending) instead of O(n) scan.

func (*LevelCache) Put

func (lc *LevelCache) Put(id string, n *db.NodeState)

Put inserts or overwrites the node for id. Caller may pass the same pointer that will be stored. Indexes ByPath, ByParentPath, and status maps are kept in sync.

func (*LevelCache) PutIfAbsent

func (lc *LevelCache) PutIfAbsent(id string, n *db.NodeState) bool

PutIfAbsent inserts n only if id is not already present. Returns true if inserted.

func (*LevelCache) Snapshot

func (lc *LevelCache) Snapshot() map[string]*db.NodeState

Snapshot returns a copy of all node states in this level (for seal flush). Caller must not mutate the map.

func (*LevelCache) UpdateStatus

func (lc *LevelCache) UpdateStatus(id string, traversalStatus, copyStatus string)

UpdateStatus updates traversal_status and optionally copy_status for the node at id. No-op if not present. Status-keyed ID sets are updated so ListPending/ListPendingCopy stay O(pending).

type LevelStats

type LevelStats struct {
	Pending        int64
	Successful     int64
	Failed         int64
	Completed      int64 // tasks completed this round (success + final failure)
	CopyPending    int64
	CopySuccessful int64
	CopyFailed     int64
}

LevelStats holds per-level counters for completion detection (in-memory; persisted at seal). For traversal: Pending/Successful/Failed are traversal status counts. For copy phase (SRC): use Copy* for copy status.

type NodeCache

type NodeCache struct {
	Levels map[int]*LevelCache // depth -> level cache
	Stats  map[int]*LevelStats // depth -> stats for that level
	// contains filtered or unexported fields
}

NodeCache holds per-level caches. Only levels N and N+1 are active; sealed levels are flushed and removed.

func NewNodeCache

func NewNodeCache() *NodeCache

NewNodeCache creates an empty node cache.

func (*NodeCache) DropLevel

func (nc *NodeCache) DropLevel(depth int)

DropLevel removes the level and its stats from memory. Call after sealing.

func (*NodeCache) EnsureLevel

func (nc *NodeCache) EnsureLevel(depth int) *LevelCache

EnsureLevel returns the LevelCache for depth, creating it if needed.

func (*NodeCache) EnsureLevelStats

func (nc *NodeCache) EnsureLevelStats(depth int) *LevelStats

EnsureLevelStats returns the LevelStats for depth, creating if needed (call when syncing from DB or before updating).

func (*NodeCache) GetLevel

func (nc *NodeCache) GetLevel(depth int) *LevelCache

GetLevel returns the LevelCache for depth, or nil if not present.

func (*NodeCache) GetLevelStats

func (nc *NodeCache) GetLevelStats(depth int) *LevelStats

GetLevelStats returns the LevelStats for depth, or nil.

func (*NodeCache) IncrementCompleted

func (nc *NodeCache) IncrementCompleted(depth int)

IncrementCompleted increments the completed count for the level (task finished: success or final failure).

func (*NodeCache) IncrementPending

func (nc *NodeCache) IncrementPending(depth int)

IncrementPending adds one to the pending count for the level (e.g. when adding a new node to the level).

func (*NodeCache) LevelDepths

func (nc *NodeCache) LevelDepths() []int

LevelDepths returns sorted level numbers present in the cache (for iteration over levels).

func (*NodeCache) PromoteLevel

func (nc *NodeCache) PromoteLevel(fromDepth, toDepth int)

PromoteLevel makes the cache at fromDepth (e.g. N+1) become the cache at toDepth (e.g. N), and creates a fresh empty cache at fromDepth. Call after sealing N: flush and drop N, then PromoteLevel(N+1, N) so the former N+1 becomes the new current level.

func (*NodeCache) RecordCopyTransition

func (nc *NodeCache) RecordCopyTransition(depth int, oldStatus, newStatus string)

RecordCopyTransition updates level copy stats for SRC (pending->in_progress on pull; in_progress->successful/failed on complete/fail).

func (*NodeCache) RecordTraversalTransition

func (nc *NodeCache) RecordTraversalTransition(depth int, oldStatus, newStatus string)

RecordTraversalTransition updates level stats for one status change (e.g. pending -> successful).

func (*NodeCache) SetLevelStats

func (nc *NodeCache) SetLevelStats(depth int, pending, successful, failed, completed int64)

SetLevelStats overwrites the stats for depth (e.g. when bootstrapping from DB).

func (*NodeCache) TotalNodeCount

func (nc *NodeCache) TotalNodeCount() int

TotalNodeCount returns the total number of nodes across all levels (for memory pressure checks).

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue maintains round-based task queues for BFS traversal coordination. It handles task leasing, retry logic, and cross-queue task propagation. Hot-path state lives in the per-level caches; sealed state is persisted to DuckDB at round advance.

func NewQueue

func NewQueue(name string, maxRetries int, workerCount int, coordinator *QueueCoordinator) *Queue

NewQueue creates a new Queue instance.

func (*Queue) Add

func (q *Queue) Add(task *TaskBase) bool

Add enqueues a task into the pending buffer. Returns false if task is nil, has empty ID, or is already in progress/pending.

func (*Queue) AddWorker

func (q *Queue) AddWorker(worker Worker)

AddWorker registers a worker with this queue for reference. Workers manage their own lifecycle - this is just for tracking/debugging.

func (*Queue) AdvanceCopyRound

func (q *Queue) AdvanceCopyRound()

AdvanceCopyRound handles copy-specific round advancement logic. Checks for pending tasks matching the current pass and advances to the next applicable round.

func (*Queue) AdvanceTraversalRound

func (q *Queue) AdvanceTraversalRound()

AdvanceTraversalRound handles traversal/retry-specific round advancement logic. For traversal/retry modes, simply increments the round by 1.

func (*Queue) CheckCopyCompletion

func (q *Queue) CheckCopyCompletion(currentRound int, wasFirstPull bool) bool

CheckCopyCompletion checks if the copy phase should switch passes or complete. Returns true if the queue should mark as complete, false otherwise. This is called when advanceToNextRound can't find a next round for the current pass.

func (*Queue) CheckTraversalCompletion

func (q *Queue) CheckTraversalCompletion(currentRound int) bool

CheckTraversalCompletion checks if traversal/retry phase should complete. All traversal-specific completion logic lives here: cache loaded, attempted pull, first pull returned 0, no pending in cache. Returns true if the queue should mark as complete, false otherwise.

func (*Queue) Clear

func (q *Queue) Clear()

Clear removes all tasks from DuckDB and resets in-progress tracking. Note: This is a destructive operation - use with caution.

func (*Queue) Close

func (q *Queue) Close()

Close stops the queue and cleans up resources: stats publishing loop, output buffer, and sets state to Stopped if not already Completed or Stopped.

func (*Queue) CompleteCopyTask

func (q *Queue) CompleteCopyTask(task *TaskBase, executionDelta time.Duration)

CompleteCopyTask handles successful completion of copy tasks. Updates copy status, creates DST node entry, and updates join-lookup mapping.

func (*Queue) CompleteTraversalTask

func (q *Queue) CompleteTraversalTask(task *TaskBase, executionDelta time.Duration)

CompleteTraversalTask handles successful completion of traversal/retry tasks. This includes child discovery, status updates, and buffer operations.

func (*Queue) EnsureRoundExpectedFromStats

func (q *Queue) EnsureRoundExpectedFromStats()

EnsureRoundExpectedFromStats sets Expected for the current round from the stats bucket (O(1) lookup). Call after SetRound (e.g. on init or resume) so Expected reflects actual pending count and survives restarts.

func (*Queue) FailCopyTask

func (q *Queue) FailCopyTask(task *TaskBase, executionDelta time.Duration)

FailCopyTask handles failure of copy tasks. Updates copy status to failed if max retries exceeded, or back to pending if retrying.

func (*Queue) FailTraversalTask

func (q *Queue) FailTraversalTask(task *TaskBase, executionDelta time.Duration)

FailTraversalTask handles failure of traversal/retry tasks. Retries up to maxRetries, then marks as failed.

func (*Queue) GetAverageExecutionTime

func (q *Queue) GetAverageExecutionTime() time.Duration

GetAverageExecutionTime returns the current average task execution time.

func (*Queue) GetBytesTransferredTotal

func (q *Queue) GetBytesTransferredTotal() int64

GetBytesTransferredTotal returns the total bytes transferred during copy phase.

func (*Queue) GetCopyPass

func (q *Queue) GetCopyPass() int

GetCopyPass returns the current copy pass.

func (*Queue) GetExecutionTimeBufferSize

func (q *Queue) GetExecutionTimeBufferSize() int

GetExecutionTimeBufferSize returns the current size of the execution time buffer.

func (*Queue) GetExecutionTimeDeltas

func (q *Queue) GetExecutionTimeDeltas() []time.Duration

GetExecutionTimeDeltas returns a copy of the execution time deltas buffer.

func (*Queue) GetFilesCreatedTotal

func (q *Queue) GetFilesCreatedTotal() int64

GetFilesCreatedTotal returns the total files created during copy phase.

func (*Queue) GetFilesDiscoveredTotal

func (q *Queue) GetFilesDiscoveredTotal() int64

func (*Queue) GetFoldersCreatedTotal

func (q *Queue) GetFoldersCreatedTotal() int64

GetFoldersCreatedTotal returns the total folders created during copy phase.

func (*Queue) GetFoldersDiscoveredTotal

func (q *Queue) GetFoldersDiscoveredTotal() int64

func (*Queue) GetMode

func (q *Queue) GetMode() QueueMode

GetMode returns the current queue mode.

func (*Queue) GetPendingCount

func (q *Queue) GetPendingCount() int

GetPendingCount returns the number of tasks in the pending buffer.

func (*Queue) GetRound

func (q *Queue) GetRound() int

GetRound returns the current BFS round.

func (*Queue) GetRoundStats

func (q *Queue) GetRoundStats(round int) *RoundStats

GetRoundStats returns the statistics for a specific round. Returns nil if the round has no stats yet.

func (*Queue) GetTotalCompleted

func (q *Queue) GetTotalCompleted() int

GetTotalCompleted returns the total number of completed tasks across all rounds.

func (*Queue) GetTotalDiscovered

func (q *Queue) GetTotalDiscovered() int64

GetTotalDiscovered returns the total number of items discovered (files + folders).

func (*Queue) GetTotalFailed

func (q *Queue) GetTotalFailed() int

GetTotalFailed returns the total number of failed tasks across all rounds.

func (*Queue) InProgressCount

func (q *Queue) InProgressCount() int

InProgressCount returns the number of tasks currently being executed.

func (*Queue) InitializeCopyWithContext

func (q *Queue) InitializeCopyWithContext(database *db.DB, srcAdapter, dstAdapter types.FSAdapter, shutdownCtx context.Context)

InitializeCopyWithContext sets up a copy queue with both source and destination adapters. This is specifically for copy mode which requires both adapters.

func (*Queue) InitializeWithContext

func (q *Queue) InitializeWithContext(database *db.DB, adapter types.FSAdapter, shutdownCtx context.Context)

InitializeWithContext sets up the queue with DuckDB, context, and filesystem adapter references. Creates and starts workers immediately - they'll poll for tasks autonomously. shutdownCtx is optional - if provided, workers will check for cancellation and exit on shutdown. For copy mode, InitializeCopyWithContext should be used instead to provide both adapters.

func (*Queue) IsExhausted

func (q *Queue) IsExhausted() bool

IsExhausted returns true if the queue has finished all traversal or has been stopped.

func (*Queue) IsPaused

func (q *Queue) IsPaused() bool

IsPaused returns true if the queue is paused.

func (*Queue) Lease

func (q *Queue) Lease() *TaskBase

Lease attempts to lease a task for execution atomically. Returns nil if no tasks are available, queue is paused, or completed.

func (*Queue) Name

func (q *Queue) Name() string

Name returns the queue's name.

func (*Queue) NodeCache

func (q *Queue) NodeCache() *NodeCache

NodeCache returns this queue's node cache (may be nil if not set).

func (*Queue) OtherNodeCache

func (q *Queue) OtherNodeCache() *NodeCache

OtherNodeCache returns the other queue's node cache for read-only cross-check (may be nil).

func (*Queue) Pause

func (q *Queue) Pause()

Pause pauses the queue (workers will not lease new tasks).

func (*Queue) PullCopyTasks

func (q *Queue) PullCopyTasks(force bool)

PullCopyTasks pulls copy tasks from DuckDB for the current round. Pulls from SRC copy status buckets, filters by pass (folders vs files), and skips round 0. Uses getter/setter methods - no direct mutex access.
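The pass/round filter described above can be restated as a standalone predicate. This is a sketch of the documented rule, not the package's actual pull query:

```go
package main

import "fmt"

// copyTaskEligible models the documented filter: round 0 (the root) is
// skipped, pass 1 copies folders only, pass 2 copies files only.
func copyTaskEligible(pass, round int, isFolder bool) bool {
	if round == 0 {
		return false
	}
	if pass == 1 {
		return isFolder
	}
	return !isFolder
}

func main() {
	fmt.Println(copyTaskEligible(1, 2, true))  // true: folder in the folder pass
	fmt.Println(copyTaskEligible(1, 2, false)) // false: files wait for pass 2
	fmt.Println(copyTaskEligible(2, 0, false)) // false: round 0 is skipped
}
```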

func (*Queue) PullRetryTasks

func (q *Queue) PullRetryTasks(force bool)

PullRetryTasks pulls retry tasks from failed/pending status buckets. Checks maxKnownDepth and scans all known levels up to maxKnownDepth, then uses normal traversal logic for deeper levels. Uses getter/setter methods - no direct mutex access.

func (*Queue) PullTasksIfNeeded

func (q *Queue) PullTasksIfNeeded(force bool)

PullTasksIfNeeded refills the pending buffer when it runs low, dispatching to the traversal, retry, or copy pull path based on the current mode. Pass force to pull regardless of the low-water mark.

func (*Queue) PullTraversalTasks

func (q *Queue) PullTraversalTasks(force bool)

PullTraversalTasks pulls traversal tasks from DuckDB for the current round. Uses getter/setter methods - no direct mutex access.

func (*Queue) RehydrateLevelFromDB

func (q *Queue) RehydrateLevelFromDB(depth int)

RehydrateLevelFromDB loads all nodes at the given depth from the DB into this queue's node cache and syncs level stats. Used on resume so the memory-first path can continue from sealed state without re-pulling from DB on first pull.

func (*Queue) ReportTaskResult

func (q *Queue) ReportTaskResult(task *TaskBase, result TaskExecutionResult)

ReportTaskResult reports the result of a task execution and handles post-processing. This is the event-driven entry point that replaces separate Complete()/Fail() calls. After processing the result, it checks if we need to pull more tasks or advance rounds.

func (*Queue) Resume

func (q *Queue) Resume()

Resume resumes the queue after a pause.

func (*Queue) Run

func (q *Queue) Run()

Run is the main queue coordination loop. It has an outer loop for rounds and an inner loop for each round. The outer loop checks coordinator gates before starting each round (DST only). The inner loop processes tasks until the round is complete.

func (*Queue) SetCopyPass

func (q *Queue) SetCopyPass(pass int)

SetCopyPass sets the current copy pass.

func (*Queue) SetMaxKnownDepth

func (q *Queue) SetMaxKnownDepth(depth int)

SetMaxKnownDepth sets the maximum depth for traversal/copy. Set to -1 to auto-detect.

func (*Queue) SetMode

func (q *Queue) SetMode(mode QueueMode)

SetMode sets the queue mode (traversal, retry, or copy).

func (*Queue) SetNodeCache

func (q *Queue) SetNodeCache(c *NodeCache)

SetNodeCache sets this queue's node cache (read-write). Call before Run() when using memory-first flow.

func (*Queue) SetObserver

func (q *Queue) SetObserver(observer *QueueObserver)

SetObserver registers this queue with an observer for DuckDB stats publishing. The observer will poll this queue directly for statistics.

func (*Queue) SetOtherNodeCache

func (q *Queue) SetOtherNodeCache(c *NodeCache)

SetOtherNodeCache sets the other queue's node cache (read-only for cross-check). Call before Run() when using memory-first flow.

func (*Queue) SetRound

func (q *Queue) SetRound(round int)

SetRound sets the queue's current round. Used for resume operations.

func (*Queue) SetShutdownContext

func (q *Queue) SetShutdownContext(ctx context.Context)

SetShutdownContext sets the shutdown context for the queue.

func (*Queue) SetState

func (q *Queue) SetState(state QueueState)

SetState sets the queue lifecycle state.

func (*Queue) SetStatsChannel

func (q *Queue) SetStatsChannel(ch chan QueueStats)

SetStatsChannel sets the channel for publishing queue statistics for UDP logging. The queue will periodically publish stats to this channel.

func (*Queue) SetTraversalCacheLoaded

func (q *Queue) SetTraversalCacheLoaded(loaded bool)

SetTraversalCacheLoaded sets whether the level cache has been loaded from DB (e.g. after RehydrateLevelFromDB). Until true, CheckTraversalCompletion returns false so the queue does not complete before round 0 is populated. Call after loading cache at startup; retry/sweeps should set true when their cache is ready.

func (*Queue) SetWorkers

func (q *Queue) SetWorkers(workers []Worker)

SetWorkers sets the workers associated with this queue.

func (*Queue) Shutdown

func (q *Queue) Shutdown()

Shutdown stops the stats publishing loop and cleans up resources.

func (*Queue) State

func (q *Queue) State() QueueState

State returns the current queue lifecycle state.

func (*Queue) Stats

func (q *Queue) Stats() QueueStats

Stats returns a snapshot of the queue's current state. Uses only in-memory counters - no database queries.

func (*Queue) TotalTracked

func (q *Queue) TotalTracked() int

TotalTracked returns the total number of tasks across all rounds (pending + in-progress).

type QueueCoordinator

type QueueCoordinator struct {
	// contains filtered or unexported fields
}

QueueCoordinator manages round advancement gates for dual-BFS traversal. It enforces: DST cannot advance to round N until SRC has completed rounds N and N+1; SRC may not run ahead of DST by more than MaxSrcAhead rounds (default 3).

func NewQueueCoordinator

func NewQueueCoordinator() *QueueCoordinator

NewQueueCoordinator creates a new coordinator.

func (*QueueCoordinator) CanDstStartRound

func (c *QueueCoordinator) CanDstStartRound(targetRound int) bool

CanDstStartRound returns true if DST can start processing the specified round. DST can start round N if:

  • SRC has completed traversal entirely (DST can proceed freely), OR
  • SRC has completed rounds N and N+1 (SRC is at round N+2 or higher)

Note: Since DST can now freely advance to a round and then pause, the check is N+2 (if DST wants to start round 4, SRC needs to have completed rounds 4 and 5, so SRC >= 6). Once SRC is completed, DST can proceed at full speed with no restrictions.

func (*QueueCoordinator) CanSrcStartRound

func (c *QueueCoordinator) CanSrcStartRound(srcRound int) bool

CanSrcStartRound returns true if SRC can run the given round: srcRound <= dstRound + maxSrcAhead (or SRC is done).
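Both gates reduce to simple round arithmetic. The following sketch restates the documented rules in standalone form (the real coordinator also holds locks and tracks state internally):

```go
package main

import "fmt"

// canDstStartRound: DST may start round n once SRC is done entirely,
// or once SRC has completed rounds n and n+1 (i.e. SRC round >= n+2).
func canDstStartRound(srcRound, n int, srcDone bool) bool {
	return srcDone || srcRound >= n+2
}

// canSrcStartRound: SRC may run round n while n <= dstRound+maxSrcAhead
// (default maxSrcAhead is 3), or freely once SRC traversal is finished.
func canSrcStartRound(dstRound, n, maxSrcAhead int, srcDone bool) bool {
	return srcDone || n <= dstRound+maxSrcAhead
}

func main() {
	fmt.Println(canDstStartRound(6, 4, false))    // true: SRC completed rounds 4 and 5
	fmt.Println(canDstStartRound(5, 4, false))    // false: round 5 not finished yet
	fmt.Println(canSrcStartRound(1, 5, 3, false)) // false: 5 > 1+3, SRC too far ahead
}
```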

func (*QueueCoordinator) GetRound

func (c *QueueCoordinator) GetRound(which string) int

GetRound returns the current round for SRC or DST.

func (*QueueCoordinator) IsCompleted

func (c *QueueCoordinator) IsCompleted(queueType string) bool

IsCompleted returns true if the specified queue ("src", "dst", or "both") has completed traversal.

func (*QueueCoordinator) MarkCompleted

func (c *QueueCoordinator) MarkCompleted(which string)

MarkCompleted marks SRC or DST as completed based on the argument ("src" or "dst").

func (*QueueCoordinator) SetMaxSrcAhead

func (c *QueueCoordinator) SetMaxSrcAhead(n int)

SetMaxSrcAhead sets the maximum rounds SRC may run ahead of DST (default 3).

func (*QueueCoordinator) UpdateRound

func (c *QueueCoordinator) UpdateRound(which string, round int)

UpdateRound updates the current round for SRC or DST.

func (*QueueCoordinator) WaitSealBackpressure

func (c *QueueCoordinator) WaitSealBackpressure(round int, database *db.DB)

WaitSealBackpressure blocks until the seal buffer has flushed through (round-2) when round >= 2. Flushes pending seal jobs first so we don't block on the buffer's interval timer. Call before advancing to the next round. Pulling/processing from cache is not blocked; only round advancement waits. Pass database from the queue.
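The (round-2) rule is again round arithmetic; a sketch of the condition under which advancement would have to wait (assumed form, tracking only the last round whose seal has flushed):

```go
package main

import "fmt"

// mustWaitForSeal models the documented rule: before advancing past
// round r (r >= 2), the seal for round r-2 must have flushed to the DB.
func mustWaitForSeal(round, lastSealedRound int) bool {
	return round >= 2 && lastSealedRound < round-2
}

func main() {
	fmt.Println(mustWaitForSeal(1, -1)) // false: rounds below 2 never wait
	fmt.Println(mustWaitForSeal(4, 1))  // true: round 2's seal not flushed yet
	fmt.Println(mustWaitForSeal(4, 2))  // false: round 2 has flushed
}
```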

type QueueMode

type QueueMode string

QueueMode represents the operation mode of a queue.

const (
	QueueModeTraversal QueueMode = "traversal" // Normal BFS traversal
	QueueModeRetry     QueueMode = "retry"     // Retry failed tasks sweep
	QueueModeCopy      QueueMode = "copy"      // Copy phase (folders then files)
)

type QueueObserver

type QueueObserver struct {
	// contains filtered or unexported fields
}

QueueObserver collects statistics from queues by polling them directly and publishes them to DuckDB periodically. Similar to QueueCoordinator, but focused on observability rather than coordination.

func NewQueueObserver

func NewQueueObserver(database *db.DB, updateInterval time.Duration) *QueueObserver

NewQueueObserver creates a new observer that will publish stats to DuckDB. updateInterval is how often stats are written to DuckDB (default: 200ms).

func (*QueueObserver) RegisterQueue

func (o *QueueObserver) RegisterQueue(queueName string, queue *Queue)

RegisterQueue registers a queue with the observer. The observer will poll this queue directly for statistics.

func (*QueueObserver) Start

func (o *QueueObserver) Start()

Start begins the observer loop that publishes stats to DuckDB. This is called automatically when the first queue is registered, but can be called manually.

func (*QueueObserver) Stop

func (o *QueueObserver) Stop()

Stop stops the observer loop and cleans up resources. This should only be called once. Calling it multiple times is safe but has no effect.

func (*QueueObserver) UnregisterQueue

func (o *QueueObserver) UnregisterQueue(queueName string)

UnregisterQueue removes a queue from the observer.

type QueueState

type QueueState string

QueueState represents the lifecycle state of a queue.

const (
	QueueStateRunning   QueueState = "running"   // Queue is active and processing
	QueueStatePaused    QueueState = "paused"    // Queue is paused
	QueueStateStopped   QueueState = "stopped"   // Queue is stopped
	QueueStateWaiting   QueueState = "waiting"   // Queue is waiting for coordinator to allow advancement (DST only)
	QueueStateCompleted QueueState = "completed" // Traversal complete (max depth reached)
)

type QueueStateSnapshot

type QueueStateSnapshot struct {
	State              QueueState
	Round              int
	PendingCount       int
	InProgressCount    int
	Pulling            bool
	LastPullWasPartial bool
	FirstPullForRound  bool
	PullLowWM          int
	Database           *db.DB
	Mode               QueueMode
}

QueueStateSnapshot is a snapshot of queue state for use in logic functions; it is produced internally by getStateSnapshot.

type QueueStats

type QueueStats struct {
	Name         string
	Round        int
	Pending      int
	InProgress   int
	TotalTracked int
	Workers      int
}

QueueStats holds current queue statistics, as returned by Stats.

type RetryDstChild

type RetryDstChild struct {
	ID              string
	Depth           int
	TraversalStatus string
}

RetryDstChild holds DST child node meta for retry DST cleanup; populated at pull to avoid per-child DB lookups.

type RetryDstCleanup

type RetryDstCleanup struct {
	DstID        string
	DstDepth     int
	DstOldStatus string
	Children     []RetryDstChild
}

RetryDstCleanup holds DST counterpart and its children meta for SRC folder tasks in retry mode; populated at pull.

type RoundInfo

type RoundInfo struct {
	Round           int       // Round number
	PullCount       int       // Number of pull operations (queries) performed this round
	ItemsYielded    int       // Total items actually returned from DB queries this round (the pull-side counterpart of the completed count)
	ExpectedCount   int       // Expected items from DB (if known)
	TasksCompleted  int       // Successfully completed tasks
	TasksFailed     int       // Failed tasks
	StartTime       time.Time // When this round started
	LastPullTime    time.Time // Timestamp of last pull operation
	AvgTasksPerSec  float64   // Rolling average tasks/sec
	LastPartialPull bool      // Whether the last pull was partial (< batch size)
}

RoundInfo tracks statistics and metadata for a specific BFS round.

type RoundStats

type RoundStats struct {
	Expected  int // Expected tasks for this round (folder children inserted)
	Completed int // Tasks completed in this round (successful + failed)
	Failed    int // Tasks failed in this round
}

RoundStats tracks statistics for a specific round.
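As a sketch of how these counters can drive round completion (the exact condition used by the queue may differ), a round is done once every expected task has been resolved, whether it succeeded or failed:

```go
package main

import "fmt"

// RoundStats mirrors the fields documented above.
type RoundStats struct {
	Expected  int // folder children inserted for this round
	Completed int // successful + failed
	Failed    int
}

// roundDone is a hypothetical helper: all expected tasks resolved.
func roundDone(s RoundStats) bool {
	return s.Completed >= s.Expected
}

func main() {
	s := RoundStats{Expected: 10, Completed: 9, Failed: 2}
	fmt.Println(roundDone(s)) // false: one task still outstanding
	s.Completed = 10
	fmt.Println(roundDone(s)) // true: failures still count toward completion
}
```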

type SrcNodeMeta

type SrcNodeMeta struct {
	Depth      int
	CopyStatus string
}

SrcNodeMeta holds Depth and CopyStatus for an SRC node; used by DST tasks at completion to avoid per-child DB lookups.

type TaskBase

type TaskBase struct {
	ID                  string                 // ULID for internal tracking (database keys)
	Type                string                 // Task type: "src-traversal", "dst-traversal", "upload", etc.
	Folder              types.Folder           // Folder to process (if applicable)
	File                types.File             // File to process (if applicable)
	Locked              bool                   // Whether this task is currently leased by a worker
	Attempts            int                    // Number of execution attempts
	Status              string                 // Execution result: "successful", "failed"
	ExpectedFolders     []types.Folder         // Expected folders (dst tasks only)
	ExpectedFiles       []types.File           // Expected files (dst tasks only)
	ExpectedSrcIDMap    map[string]string      // Map of Type+Name -> SRC node ID for matching (dst tasks only)
	ExpectedSrcNodeMeta map[string]SrcNodeMeta // SRC node Depth/CopyStatus keyed by SRC ID (dst tasks only, populated at pull)
	RetryDstCleanup     *RetryDstCleanup       // DST counterpart + children meta for SRC folder in retry mode (populated at pull)
	DiscoveredChildren  []ChildResult          // Children discovered during execution
	Round               int                    // The round this task belongs to (for buffer coordination)
	LeaseTime           time.Time              // Time when task was leased (for execution time tracking)
	// Copy phase specific fields
	CopyPass         int    // Copy pass number (1 for folders, 2 for files)
	BytesTransferred int64  // Bytes transferred for file copy tasks
	DstParentID      string // Destination parent folder ID for creation
}

TaskBase represents the foundational structure for all task types. Workers lease tasks, mark them Locked, and attempt execution. Tasks are identified by ULID (ID) for internal tracking.

func (*TaskBase) Identifier

func (t *TaskBase) Identifier() string

Identifier returns the unique identifier for this task (absolute path).

func (*TaskBase) IsFile

func (t *TaskBase) IsFile() bool

IsFile returns whether this task represents a file operation.

func (*TaskBase) IsFolder

func (t *TaskBase) IsFolder() bool

IsFolder returns whether this task represents a folder traversal.

func (*TaskBase) LocationPath

func (t *TaskBase) LocationPath() string

LocationPath returns the logical, root-relative path for this task.

type TaskExecutionResult

type TaskExecutionResult string

TaskExecutionResult represents the result of a task execution.

const (
	TaskExecutionResultSuccessful TaskExecutionResult = "successful"
	TaskExecutionResultFailed     TaskExecutionResult = "failed"
)

type TaskResult

type TaskResult struct {
	Task    *TaskBase
	Success bool
	Error   error
	Data    any // Optional result data (e.g., ListResult)
}

TaskResult represents the outcome of a task execution.

type TraversalWorker

type TraversalWorker struct {
	// contains filtered or unexported fields
}

TraversalWorker executes traversal tasks by listing children and recording them to DuckDB. Each worker runs independently in its own goroutine, continuously polling the queue for work.

func NewTraversalWorker

func NewTraversalWorker(
	id string,
	queue *Queue,
	database *db.DB,
	adapter types.FSAdapter,
	queueName string,
	shutdownCtx context.Context,
) *TraversalWorker

NewTraversalWorker creates a worker that executes traversal tasks. shutdownCtx is optional - if provided, the worker will check for cancellation and exit on shutdown.

func (*TraversalWorker) Run

func (w *TraversalWorker) Run()

Run is the main worker loop. It continuously polls the queue for tasks. When a task is found, it leases it, executes it, and reports the result. When no work is available or queue is paused, it briefly sleeps before polling again. When queue is exhausted, the worker exits.
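The poll/lease/execute/report cycle can be sketched against a toy queue (illustrative names; the real worker talks to *Queue, the adapter, and the DB):

```go
package main

import (
	"fmt"
	"time"
)

// toyQueue stands in for *Queue; only lease-or-wait semantics are modeled.
type toyQueue struct{ tasks []string }

func (q *toyQueue) Lease() (string, bool) {
	if len(q.tasks) == 0 {
		return "", false
	}
	t := q.tasks[0]
	q.tasks = q.tasks[1:]
	return t, true
}

func (q *toyQueue) IsExhausted() bool { return len(q.tasks) == 0 }

// runWorker mirrors the loop described above: lease and execute while
// work exists, sleep briefly when idle, exit once the queue is exhausted.
func runWorker(q *toyQueue) []string {
	var done []string
	for {
		if t, ok := q.Lease(); ok {
			done = append(done, t) // "execute" the task and report its result
			continue
		}
		if q.IsExhausted() {
			return done
		}
		time.Sleep(time.Millisecond)
	}
}

func main() {
	q := &toyQueue{tasks: []string{"/a", "/a/b"}}
	fmt.Println(runWorker(q)) // [/a /a/b]
}
```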

type UploadTask

type UploadTask struct {
	TaskBase
	SrcId  string // Source file identifier
	DstId  string // Destination parent folder identifier
	DstCtx types.ServiceContext
}

UploadTask represents a task to upload a file from source to destination.

type Worker

type Worker interface {
	Run() // Main execution loop - polls queue and processes tasks
}

Worker represents a concurrent task executor. Each worker independently polls its queue for work, leases tasks, executes them, and reports results back to the queue and database.
