Documentation
¶
Index ¶
- Constants
- func BatchGetChildrenIDsByParentIDs(d *DB, table string, parentIDs []string) (map[string][]string, error)
- func BatchGetDstIDsFromSrcIDs(d *DB, srcIDs []string) (map[string]string, error)
- func BatchGetNodeMeta(d *DB, table string, ids []string) (map[string]NodeMeta, error)
- func BatchGetNodesByID(d *DB, table string, ids []string) (map[string]*NodeState, error)
- func BatchInsertNodes(d *DB, ops []InsertOperation) error
- func BootstrapRootStats(d *DB) error
- func CountExcluded(d *DB, table string) (int, error)
- func CountExcludedInSubtree(d *DB, table, rootPath string) (int, error)
- func CountNodes(d *DB, table string) (int, error)
- func DeterministicNodeID(queueType, nodeType, path string) string
- func EnsureNodeTableIndexes(db *DB, table string) error
- func GenerateLogID() int64
- func GetAllLevels(d *DB, table string) ([]int, error)
- func GetChildrenIDsByParentID(d *DB, table, parentID string, limit int) ([]string, error)
- func GetDstIDFromSrcID(d *DB, srcParentID string) (string, error)
- func GetDstIDToSrcPath(d *DB, dstIDs []string) (map[string]string, error)
- func GetSrcChildrenGroupedByParentPath(d *DB, parentPaths []string) (map[string][]*NodeState, error)
- func InsertRootNode(d *DB, table string, state *NodeState) error
- func NodeStateAppendRowArgs(n *NodeState) []interface{}
- func StatsKeyCopyStatus(status string) string
- func StatsKeyTraversalStatus(status string) string
- type BatchInsertOperation
- type DB
- func (db *DB) AddNodeDeletion(table, nodeID string) error
- func (db *DB) AddNodeDeletions(deletions []NodeDeletion) error
- func (db *DB) Checkpoint() error
- func (db *DB) Close() error
- func (db *DB) FlushSealBuffer() error
- func (db *DB) GetAllQueueStats() (map[string][]byte, error)
- func (db *DB) GetCopyCountAtDepth(depth int, nodeType string, copyStatus string) (int64, error)
- func (db *DB) GetDB() (*sql.DB, error)
- func (db *DB) GetDBForPulls(queueType string) (*sql.DB, error)
- func (db *DB) GetMaxDepth(table string) (int, error)
- func (db *DB) GetPendingTraversalCountAtDepthFromLive(table string, depth int) (int64, error)
- func (db *DB) GetQueueStats(queueKey string) ([]byte, error)
- func (db *DB) GetStatsBreakdown(table string) ([]StatsRow, error)
- func (db *DB) GetStatsCount(table, key string) (int64, error)
- func (db *DB) GetStatsCountAtDepth(table string, depth int, key string) (int64, error)
- func (db *DB) Path() string
- func (db *DB) RunAppenderWriterTx(queueType string, fn func(w *Writer) error) error
- func (db *DB) RunUpdateWriterTx(fn func(w *Writer) error) error
- func (db *DB) RunWithConn(ctx context.Context, fn func(conn *sql.Conn) error) error
- func (db *DB) SealLevel(table string, depth int, nodes []*NodeState, ...) error
- func (db *DB) SealLevelDepth0(table string, nodes []*NodeState, pending, successful, failed, completed int64, ...) error
- func (db *DB) WaitUntilSealFlushedThrough(depth int)
- type FetchResult
- func ListDstBatchWithSrcChildren(d *DB, depth int, afterID string, limit int, traversalStatus string) ([]FetchResult, map[string][]*NodeState, error)
- func ListNodesByDepthKeyset(d *DB, table string, depth int, afterID, statusFilter string, limit int) ([]FetchResult, error)
- func ListNodesCopyKeyset(d *DB, depth int, nodeType, afterID string, limit int) ([]FetchResult, error)
- type InsertOperation
- type LogBuffer
- type LogEntry
- type NodeDeletion
- type NodeMeta
- type NodeState
- func GetChildrenByParentID(d *DB, table, parentID string, limit int) ([]*NodeState, error)
- func GetChildrenByParentPath(d *DB, table, parentPath string, limit int) ([]*NodeState, error)
- func GetNodeByID(d *DB, table, id string) (*NodeState, error)
- func GetNodeByPath(d *DB, table, path string) (*NodeState, error)
- func GetRootNode(d *DB, table string) (id string, state *NodeState, ok bool)
- type Options
- type SealBuffer
- type SealBufferOptions
- type SealJob
- type StatsRow
- type StatusUpdateOperation
- type SubtreeStats
- type WriteOperation
- type Writer
- func (w *Writer) AppenderInsert(table string, nodes []*NodeState) error
- func (w *Writer) DeleteNode(table, nodeID string) error
- func (w *Writer) DeleteSubtree(table, rootPath string) error
- func (w *Writer) InsertLog(id int64, level, message, component, entity, entityID, queue string) error
- func (w *Writer) RecomputeStatsForDepth(table string, depth int) error
- func (w *Writer) RecordTaskError(queueType, phase, nodeID, message string, attempts int, path string) error
- func (w *Writer) SealDepth0(table string, nodes []*NodeState, pending, successful, failed, completed int64, ...) error
- func (w *Writer) SetNodeCopyStatus(table, nodeID, status string) error
- func (w *Writer) SetNodeExcluded(table, nodeID string, excluded bool) error
- func (w *Writer) SetNodeTraversalStatus(table, nodeID, status string) error
- func (w *Writer) SetStatsCountForDepth(table string, depth int, key string, count int64) error
- func (w *Writer) WriteLevelStatsSnapshot(table string, depth int, pending, successful, failed, completed int64, ...) error
- func (w *Writer) WriteQueueStats(queueKey, metricsJSON string) error
Constants ¶
const ( StatusPending = "pending" StatusSuccessful = "successful" StatusFailed = "failed" StatusNotOnSrc = "not_on_src" // DST only StatusExcluded = "excluded" )
Traversal status values (live table and staging). Stats keys use: pending, successful, failed, not_on_src (DST only).
const ( CopyStatusPending = "pending" CopyStatusInProgress = "in_progress" // transient; not stored in stats CopyStatusSuccessful = "successful" CopyStatusFailed = "failed" CopyStatusSkipped = "skipped" )
Copy status values (src_nodes only). Stats keys use: pending, successful, failed (no in_progress in stats).
const ( NodeTypeFolder = "folder" NodeTypeFile = "file" )
Node type values.
const StatsKeyCompleted = "completed"
StatsKeyCompleted is the stats key for completed count at a depth (written at seal).
const StatsKeyExpected = "expected"
StatsKeyExpected is the stats key for expected count at a depth (set at round start).
const (
// TableMigrations is the migrations lifecycle table name (used by schema and migration store).
TableMigrations = "migrations"
)
Variables ¶
This section is empty.
Functions ¶
func BatchGetChildrenIDsByParentIDs ¶
func BatchGetChildrenIDsByParentIDs(d *DB, table string, parentIDs []string) (map[string][]string, error)
BatchGetChildrenIDsByParentIDs returns map[parentID][]childID for the given table and parent ids.
func BatchGetDstIDsFromSrcIDs ¶
BatchGetDstIDsFromSrcIDs returns map[srcID]dstID by resolving SRC id->path then DST path->id in two queries (no per-item reads).
func BatchGetNodeMeta ¶
BatchGetNodeMeta returns meta (id, depth, type, traversal_status, copy_status) for the given ids.
func BatchGetNodesByID ¶
BatchGetNodesByID returns nodes by id for the given table in one query. Missing ids are omitted from the map.
func BatchInsertNodes ¶
func BatchInsertNodes(d *DB, ops []InsertOperation) error
BatchInsertNodes inserts nodes via the writer. Uses RunUpdateWriterTx.
func BootstrapRootStats ¶
BootstrapRootStats writes (depth=0, key=traversal/pending, count=1) for SRC and DST so the queue sees pending work at round 0 after roots are seeded. Call once after inserting root nodes.
func CountExcluded ¶
CountExcluded returns the number of nodes with excluded = true in the given table.
func CountExcludedInSubtree ¶
CountExcludedInSubtree returns the number of excluded nodes in the subtree at rootPath. Single SQL query.
func CountNodes ¶
CountNodes returns the total number of nodes in the given table (src_nodes or dst_nodes). Live table count, not from stats. Uses pull conn so we see appender-written data.
func DeterministicNodeID ¶
DeterministicNodeID returns a stable id from (queueType, nodeType, path) for race-safe deduplication.
func EnsureNodeTableIndexes ¶
EnsureNodeTableIndexes creates stable lookup indexes on the given node table (e.g. "src_nodes", "dst_nodes") for path and parent_path.
Mutable status columns are intentionally left unindexed to avoid high write amplification during traversal/copy status updates.
Idempotent for create/drop operations.
func GenerateLogID ¶
func GenerateLogID() int64
GenerateLogID returns a unique log id (monotonic in practice).
func GetAllLevels ¶
GetAllLevels returns distinct depth values for the table, sorted.
func GetChildrenIDsByParentID ¶
GetChildrenIDsByParentID returns child ids for the given parent_id (up to limit).
func GetDstIDFromSrcID ¶
GetDstIDFromSrcID returns the DST node id that has the same path as the given SRC node id (join by path).
func GetDstIDToSrcPath ¶
GetDstIDToSrcPath returns for each DST id the path (which is the join key = "src path").
func GetSrcChildrenGroupedByParentPath ¶
func GetSrcChildrenGroupedByParentPath(d *DB, parentPaths []string) (map[string][]*NodeState, error)
GetSrcChildrenGroupedByParentPath returns SRC nodes grouped by parent_path. Used to batch-load expected children for DST folder tasks (join by parent_path = dst.path). Deprecated: prefer ListDstBatchWithSrcChildren for keyset-based DST pull + SRC children in one query.
func InsertRootNode ¶
InsertRootNode inserts the root node (path "/", depth 0) into the given table. Uses appender conn so pulls (same conn via GetDBForPulls) see the root immediately.
func NodeStateAppendRowArgs ¶
func NodeStateAppendRowArgs(n *NodeState) []interface{}
NodeStateAppendRowArgs returns the column values for one NodeState in table order (id, service_id, parent_id, parent_service_id, path, parent_path, type, size, mtime, depth, traversal_status, copy_status, excluded, errors) for use with duckdb.Appender.AppendRow.
func StatsKeyCopyStatus ¶
StatsKeyCopyStatus returns the src_stats key for copy status (src_nodes only). Valid statuses: pending, successful, failed.
func StatsKeyTraversalStatus ¶
StatsKeyTraversalStatus returns the src_stats/dst_stats key for traversal status. Valid statuses: pending, successful, failed, not_on_src (DST only).
Types ¶
type BatchInsertOperation ¶
type BatchInsertOperation struct {
Operations []InsertOperation
}
BatchInsertOperation is a batch of node inserts.
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB is the DuckDB-backed database handle. Single physical connection for all DB operations (schema, bulk append at seal, pulls, checkpoint).
func (*DB) AddNodeDeletion ¶
AddNodeDeletion deletes a node immediately (retry DST cleanup).
func (*DB) AddNodeDeletions ¶
func (db *DB) AddNodeDeletions(deletions []NodeDeletion) error
AddNodeDeletions deletes multiple nodes.
func (*DB) Checkpoint ¶
Checkpoint runs CHECKPOINT on the main conn, guarded by checkpointMu. Call at root seeding and round advancement only. Uses a single connection; running CHECKPOINT on multiple connections causes "Could not remove file X.wal: No such file or directory".
func (*DB) Close ¶
Close closes the database connection. Stops the seal buffer first (flushing any pending jobs).
func (*DB) FlushSealBuffer ¶
FlushSealBuffer drains pending seal jobs to the DB. Call before backpressure wait so we don't block on the flush interval.
func (*DB) GetAllQueueStats ¶
GetAllQueueStats returns all queue stats from queue_stats table.
func (*DB) GetCopyCountAtDepth ¶
GetCopyCountAtDepth returns the count of nodes in src_nodes at the given depth and copy_status; if nodeType != "", filters by type (e.g. "folder" or "file").
func (*DB) GetDBForPulls ¶
GetDBForPulls returns the main connection for pull queries. queueType is "SRC" or "DST" (both use same conn).
func (*DB) GetMaxDepth ¶
GetMaxDepth returns the maximum depth present in the stats table for the given table ("SRC" or "DST"). Used as stop condition for retry sweep.
func (*DB) GetPendingTraversalCountAtDepthFromLive ¶
GetPendingTraversalCountAtDepthFromLive returns the count of nodes at the given depth with traversal_status = 'pending' from the live nodes table. Use when advancing to a new round (stats for that depth may not exist yet).
func (*DB) GetQueueStats ¶
GetQueueStats returns the metrics JSON for the queue key from queue_stats table.
func (*DB) GetStatsBreakdown ¶
GetStatsBreakdown returns all (depth, key, count) rows for the table so callers can see e.g. "level 4 has X pending". Order: depth, key.
func (*DB) GetStatsCount ¶
GetStatsCount returns the total count for the given key across all depths from src_stats or dst_stats (table = "SRC" or "DST"). E.g. "all pending items total in src_nodes" = GetStatsCount("SRC", StatsKeyTraversalStatus("pending")).
func (*DB) GetStatsCountAtDepth ¶
GetStatsCountAtDepth returns the count for (depth, key) from src_stats or dst_stats.
func (*DB) RunAppenderWriterTx ¶
RunAppenderWriterTx runs fn inside a transaction on the main connection.
func (*DB) RunUpdateWriterTx ¶
RunUpdateWriterTx runs fn inside a transaction on the main connection.
func (*DB) RunWithConn ¶
RunWithConn runs fn with the single DB connection while holding writeMu. Used by the seal buffer for appender + stats. Caller must not retain conn after fn returns.
func (*DB) SealLevel ¶
func (db *DB) SealLevel(table string, depth int, nodes []*NodeState, pending, successful, failed, completed int64, copyP, copyS, copyF int64) error
SealLevel persists a sealed level from memory cache to the DB (bulk append + stats snapshot). copyP/copyS/copyF are used for SRC copy stats when >= 0. Payload is enqueued to the seal buffer, which writes and checkpoints asynchronously.
func (*DB) SealLevelDepth0 ¶
func (db *DB) SealLevelDepth0(table string, nodes []*NodeState, pending, successful, failed, completed int64, copyP, copyS, copyF int64) error
SealLevelDepth0 updates existing root row(s) at depth 0 and writes stats. Root rows are seeded up-front, so depth 0 uses update semantics instead of appender insert.
func (*DB) WaitUntilSealFlushedThrough ¶
WaitUntilSealFlushedThrough blocks until the seal buffer has written at least the given depth (for backpressure: don't run more than one round ahead of flushed state).
type FetchResult ¶
FetchResult is one row from a keyset list (id + full state).
func ListDstBatchWithSrcChildren ¶
func ListDstBatchWithSrcChildren(d *DB, depth int, afterID string, limit int, traversalStatus string) ([]FetchResult, map[string][]*NodeState, error)
ListDstBatchWithSrcChildren returns the next batch of DST nodes at depth (keyset afterID, limit) and their SRC children (join by parent_path = d.path) in one query. Optional traversalStatus filter (e.g. StatusPending). Returns DST rows as []FetchResult and per-DST-ID SRC children as map[string][]*NodeState. Cursor must be round-scoped; reset on round advance, mode switch, and after seal. Uses DST pull conn so pulls see the same data as writes.
func ListNodesByDepthKeyset ¶
func ListNodesByDepthKeyset(d *DB, table string, depth int, afterID, statusFilter string, limit int) ([]FetchResult, error)
ListNodesByDepthKeyset returns nodes at the given depth, ordered by id, after afterID, limit rows. If statusFilter is non-empty, only rows with traversal_status = statusFilter are returned (e.g. StatusPending). Uses pull conn so pulls see the same data as writes (roots, node inserts).
func ListNodesCopyKeyset ¶
func ListNodesCopyKeyset(d *DB, depth int, nodeType, afterID string, limit int) ([]FetchResult, error)
ListNodesCopyKeyset returns src_nodes at depth for copy phase with copy_status = 'pending', ordered by id, after afterID, limit. Optional nodeType filter (folder/file or "" for both). Uses pull conn so pulls see the same data as writes.
type InsertOperation ¶
type InsertOperation struct {
QueueType string // "SRC" or "DST"
Level int // depth
Status string // initial traversal_status
State *NodeState
}
InsertOperation represents a single node insert in a batch.
type LogBuffer ¶
type LogBuffer struct {
// contains filtered or unexported fields
}
LogBuffer buffers log entries and flushes them to the DB logs table. Back-pressure: at 2x batch size, Add blocks until a flush completes. Flushing guard: only one in-flight flush; getAndClearIfReady returns nil if already flushing.
func NewLogBuffer ¶
NewLogBuffer creates a log buffer that flushes to the main DB's logs table.
type LogEntry ¶
type LogEntry struct {
ID int64
Timestamp string
Level string
Entity string
EntityID string
Message string
Queue string
}
LogEntry is a single log line for persistence.
type NodeDeletion ¶
NodeDeletion represents a node delete (retry DST cleanup).
type NodeState ¶
type NodeState struct {
ID string // Internal ULID-like id (deterministic from queueType, nodeType, path)
ServiceID string // FS id
ParentID string // Parent's internal id
ParentServiceID string
Path string // Join key with other table
ParentPath string // Join key for children
Name string // Display name (for task/UI)
Type string // "folder" or "file"
Size int64
MTime string
Depth int
TraversalStatus string // pending, successful, failed, not_on_src (dst)
CopyStatus string // pending, in_progress, successful, failed (src)
Excluded bool
Errors string // JSON placeholder for log refs
Status string // Alias for TraversalStatus (used by queue taskToNodeState)
SrcID string // Optional: corresponding SRC node id (join is by path; used during seeding for DST root)
}
NodeState is the in-memory representation of a row in src_nodes or dst_nodes. Path and parent_path are the join keys between SRC and DST.
func GetChildrenByParentID ¶
GetChildrenByParentID returns up to limit children with the given parent_id.
func GetChildrenByParentPath ¶
GetChildrenByParentPath returns up to limit children with the given parent_path.
func GetNodeByID ¶
GetNodeByID returns the node by id from the given table. Uses pull conn so we see appender-written data.
func GetNodeByPath ¶
GetNodeByPath returns the node by path from the given table. Uses pull conn so we see appender-written data.
type Options ¶
type Options struct {
Path string // Path to DuckDB file (e.g. "migration.duckdb")
SealBuffer *SealBufferOptions // Optional overrides for seal buffer; nil uses defaults. Seal buffer is always created.
}
Options configures DB open behavior.
type SealBuffer ¶
type SealBuffer struct {
// contains filtered or unexported fields
}
SealBuffer buffers seal jobs and flushes them to the DB asynchronously. Flush triggers: interval timer, row count threshold, and Stop/ForceFlush (or backpressure sync).
func NewSealBuffer ¶
func NewSealBuffer(db *DB, opts SealBufferOptions) *SealBuffer
NewSealBuffer creates a seal buffer and starts its flush loop.
func (*SealBuffer) Add ¶
func (sb *SealBuffer) Add(table string, depth int, nodes []*NodeState, pending, successful, failed, completed, copyP, copyS, copyF int64)
Add enqueues a seal job. Blocks if queue size (total node count) would exceed hardCap until a flush completes.
func (*SealBuffer) Flush ¶
func (sb *SealBuffer) Flush() error
Flush drains queued jobs and writes them to the DB using DuckDB's Appender API (nodes) then a transaction (stats), then checkpoint.
func (*SealBuffer) LastFlushedDepth ¶
func (sb *SealBuffer) LastFlushedDepth() int
LastFlushedDepth returns the maximum depth that has been written to the DB by a completed Flush. -1 until first flush.
func (*SealBuffer) Stop ¶
func (sb *SealBuffer) Stop()
Stop stops the flush loop and flushes any remaining jobs.
func (*SealBuffer) WaitUntilFlushedThrough ¶
func (sb *SealBuffer) WaitUntilFlushedThrough(depth int)
WaitUntilFlushedThrough blocks until at least the given depth has been written by a completed Flush.
type SealBufferOptions ¶
SealBufferOptions configures the seal buffer. Zero value uses defaults.
type SealJob ¶
type SealJob struct {
Table string
Depth int
Nodes []*NodeState
Pending int64
Successful int64
Failed int64
Completed int64
CopyP int64
CopyS int64
CopyF int64
}
SealJob is one sealed level's payload: table, depth, nodes, and stats.
type StatsRow ¶
StatsRow is one row from src_stats or dst_stats (depth, key, count). For breakdown by level.
type StatusUpdateOperation ¶
type StatusUpdateOperation struct {
QueueType string
Level int
OldStatus string
NewStatus string
NodeID string
}
StatusUpdateOperation represents a traversal status transition (e.g. pending → successful).
type SubtreeStats ¶
SubtreeStats holds aggregate counts for a subtree (path = rootPath OR path LIKE rootPath || '/%').
func CountSubtree ¶
func CountSubtree(d *DB, table, rootPath string) (SubtreeStats, error)
CountSubtree returns aggregate counts for the subtree at rootPath (path = rootPath OR path LIKE rootPath || '/%'; for rootPath "/" uses path LIKE '/%'). Single SQL query, no DFS.
type WriteOperation ¶
type WriteOperation interface {
// contains filtered or unexported methods
}
WriteOperation is an operation that can be buffered and flushed via the writer.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer is the write handle for DuckDB. Used inside RunUpdateWriterTx.
func (*Writer) AppenderInsert ¶
AppenderInsert inserts nodes into src_nodes or dst_nodes (batch INSERT). Used when not using DuckDB Appender path; for Appender path use RunAppenderWriterTx.
func (*Writer) DeleteNode ¶
DeleteNode deletes the node from the given table (for retry DST cleanup).
func (*Writer) DeleteSubtree ¶
DeleteSubtree deletes all nodes in the subtree at rootPath (path = rootPath OR path LIKE rootPath || '/%'; for rootPath "/" uses path LIKE '/%'), then recomputes stats for each affected depth. Table is "SRC" or "DST".
func (*Writer) InsertLog ¶
func (w *Writer) InsertLog(id int64, level, message, component, entity, entityID, queue string) error
InsertLog inserts a row into logs. id must be unique (e.g. from GenerateLogID).
func (*Writer) RecomputeStatsForDepth ¶
RecomputeStatsForDepth deletes stats for the given table and depth, then writes fresh counts from the live nodes table at that depth (by traversal_status, and for SRC by copy_status). Call after direct writes that affect node counts at that depth (e.g. SetNodeTraversalStatus, DeleteSubtree).
func (*Writer) RecordTaskError ¶
func (w *Writer) RecordTaskError(queueType, phase, nodeID, message string, attempts int, path string) error
RecordTaskError inserts a row into task_errors.
func (*Writer) SealDepth0 ¶
func (w *Writer) SealDepth0(table string, nodes []*NodeState, pending, successful, failed, completed int64, copyPending, copySuccessful, copyFailed int64) error
SealDepth0 updates existing depth-0 nodes (seeded roots) and writes the depth-0 stats snapshot. Unlike SealLevel insert flow, this does not append rows; it updates existing root rows.
func (*Writer) SetNodeCopyStatus ¶
SetNodeCopyStatus updates a SRC node's copy_status on the live table and recomputes stats for that depth.
func (*Writer) SetNodeExcluded ¶
SetNodeExcluded updates a node's excluded flag on the live table. No stats update (schema has no excluded key in stats).
func (*Writer) SetNodeTraversalStatus ¶
SetNodeTraversalStatus updates a node's traversal_status on the live table and recomputes stats for that depth. For test setup only; normal flow uses staging. Table is "SRC" or "DST".
func (*Writer) SetStatsCountForDepth ¶
SetStatsCountForDepth sets (depth, key, count) in src_stats or dst_stats. Must be called inside RunUpdateWriterTx.
func (*Writer) WriteLevelStatsSnapshot ¶
func (w *Writer) WriteLevelStatsSnapshot(table string, depth int, pending, successful, failed, completed int64, copyPending, copySuccessful, copyFailed int64) error
WriteLevelStatsSnapshot writes per-depth stats for a sealed level (traversal counts + completed). If copyPending >= 0 and table is SRC, also writes copy/* keys.
func (*Writer) WriteQueueStats ¶
WriteQueueStats upserts queue metrics JSON into queue_stats.