db package v0.1.4

Published: Mar 21, 2026 License: LGPL-2.1 Imports: 12 Imported by: 0

README
db Package

The db package is the persistence layer for the Migration Engine. It owns the database file, schema, and all read/write operations. The queue uses a memory-first flow: NodeCache is required. Hot-path updates go to in-memory level caches; the DB is written only at seal (round advance) via bulk append and stats snapshot.


Overview

  • DB (db.go): Opens DuckDB (file or :memory:), creates schema, holds one *sql.DB and transaction runners. Exposes seal API SealLevel(...) for persistence; AddNodeDeletions for retry DST cleanup; Checkpoint for durability. When Options.SealBuffer is non-nil, seal writes go through SealBuffer (async); otherwise seal is synchronous.
  • SealBuffer (seal_buffer.go): Buffers seal jobs (table, depth, nodes, stats); flushes to src_nodes/dst_nodes and stats tables on interval, row/batch threshold, and Stop/Flush. Used for write-ahead-log behavior at round seal without blocking the queue.
  • Writer (writer.go): Used inside RunUpdateWriterTx. AppenderInsert bulk-inserts nodes into live tables; WriteLevelStatsSnapshot writes per-depth stats (traversal and, for SRC, copy).
  • Schema (schema.go): DDL for live tables (src_nodes, dst_nodes, src_stats, dst_stats, stats, logs, queue_stats, task_errors).
  • Queries (queries.go): Read-only helpers: node by id/path, root, children, keyset lists by depth (traversal and copy), subtree counts, stats, batch lookups, ListDstBatchWithSrcChildren. All use the main DB connection.
  • Constants (constants.go): Traversal and copy status values, node types.
  • Types (nodestate.go): NodeState, NodeMeta, InsertOperation, FetchResult, DeterministicNodeID.
  • Logs (logs.go): LogBuffer batches log entries and flushes to the logs table via Writer (time-based, batch-size, and manual Flush/Stop). Unchanged from pre-seal-buffer design.
  • Seeding (seeding.go): InsertRootNode, BootstrapRootStats, BatchInsertNodes for initial setup.
  • Stats (stats.go): Stats key helpers, GetStatsCount, GetStatsCountAtDepth, GetCopyCountAtDepth, GetMaxDepth, GetPendingTraversalCountAtDepthFromLive, GetStatsBreakdown, queue stats getters.
  • Indexes (indexes.go): EnsureNodeTableIndexes for path and parent_path on node tables (mutable status columns are intentionally left unindexed; see EnsureNodeTableIndexes).

Tables

Table        Purpose
src_nodes    Live source tree (path, depth, traversal_status, copy_status). Written at seal via bulk append.
dst_nodes    Live destination tree. Same write path as SRC.
src_stats    Per-depth counts (traversal and copy). Written at seal from in-memory snapshot.
dst_stats    Per-depth traversal status counts.
stats        Global key/count (e.g. completed counts).
logs         Log entries (from LogBuffer).
queue_stats  Queue metrics JSON per queue key.
task_errors  Task error records (phase, node_id, message, etc.).

Write Paths

  1. Cache + seal (only path)
    NodeCache is required. Queue holds per-level caches (NodeCache / LevelCache). Task completion updates cache only. At seal (round advance), the queue calls SealLevel(...). When a SealBuffer is configured (Options.SealBuffer != nil), the payload is enqueued and written asynchronously (flush on interval, row/job threshold, or DB.Close); otherwise the write is synchronous in one transaction.
  2. Transactional updates
    RunUpdateWriterTx(fn) runs fn(Writer) in a single transaction. Serialized with other writes via writeMu. Used for seal, deletes (AddNodeDeletions), logs, queue_stats, and test setup.
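The cache-first flow in path 1 can be sketched as a level cache that absorbs task completions in memory and is drained into one seal payload (nodes plus per-status counts) at round advance. Names here are illustrative stand-ins, not the package's real types:

```go
package main

import "fmt"

// levelCache sketches the hot path: task completion updates memory only;
// seal() produces the per-status counts that become the stats snapshot
// persisted by SealLevel.
type levelCache struct {
	status map[string]string // nodeID -> traversal status
}

func (c *levelCache) complete(nodeID, status string) { c.status[nodeID] = status }

// seal drains the cache into per-status counts for the stats snapshot.
func (c *levelCache) seal() map[string]int64 {
	counts := map[string]int64{}
	for _, s := range c.status {
		counts[s]++
	}
	return counts
}

func main() {
	c := &levelCache{status: map[string]string{}}
	c.complete("n1", "successful")
	c.complete("n2", "failed")
	c.complete("n3", "successful")
	fmt.Println(c.seal()["successful"]) // 2
}
```

The point of the design is that a node's status can change many times between seals at memory cost only; the DB sees a single consolidated write per level.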

Read Paths

  • Pulls: ListNodesByDepthKeyset, ListNodesCopyKeyset, ListDstBatchWithSrcChildren, GetNodeByID, GetNodeByPath, etc. use GetDB() / GetDBForPulls(queueType) (single connection).
  • Stats: GetStatsCount, GetStatsCountAtDepth, GetCopyCountAtDepth, GetMaxDepth, GetStatsBreakdown, etc. read from src_stats / dst_stats and live tables as needed.
  • Other: CountSubtree, GetAllLevels, BatchGetNodeMeta, BatchGetNodesByID, and similar helpers all query the same DuckDB connection.
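The keyset helpers above page "ordered by id, after afterID, limit rows". A minimal simulation of that cursor semantics over an in-memory sorted id list (the real queries add depth and status filters in SQL):

```go
package main

import (
	"fmt"
	"sort"
)

// listAfter sketches keyset pagination: return up to limit ids strictly
// after the cursor afterID, in id order. An empty cursor starts at the top.
func listAfter(ids []string, afterID string, limit int) []string {
	sort.Strings(ids)
	i := sort.SearchStrings(ids, afterID)
	if i < len(ids) && ids[i] == afterID {
		i++ // strictly after the cursor
	}
	end := i + limit
	if end > len(ids) {
		end = len(ids)
	}
	return ids[i:end]
}

func main() {
	ids := []string{"n1", "n2", "n3", "n4"}
	fmt.Println(listAfter(ids, "n2", 2)) // [n3 n4]
}
```

Unlike OFFSET pagination, the cursor stays stable as earlier rows change, which is why the docs stress resetting it on round advance and after seal.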

Concurrency and Checkpoint

  • Single connection: conn.SetMaxOpenConns(1) so all operations share one connection; no cross-connection CHECKPOINT issues.
  • Writes: Guarded by writeMu; RunUpdateWriterTx and RunAppenderWriterTx are serialized. Seal buffer flushes and log buffer flushes use the same mutex. Checkpoint is separately guarded by checkpointMu (call at root seeding and round advancement only).
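The writeMu pattern can be sketched as a runner that executes each write closure under one mutex, so work on the single connection never interleaves. This is an illustrative shape, not the package's code:

```go
package main

import (
	"fmt"
	"sync"
)

// serialWriter sketches the writeMu discipline: seal flushes, log flushes,
// and transactional updates all run their closure while holding one mutex.
type serialWriter struct {
	mu sync.Mutex
	n  int // stands in for shared connection state
}

func (w *serialWriter) run(fn func()) {
	w.mu.Lock()
	defer w.mu.Unlock()
	fn()
}

func main() {
	w := &serialWriter{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			w.run(func() { w.n++ }) // safe: serialized by the mutex
		}()
	}
	wg.Wait()
	fmt.Println(w.n) // 100
}
```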

File Layout

pkg/db/
├── db.go        # DB open/close, schema init, SealLevel, AddNodeDeletions, checkpoint, transaction runners, optional SealBuffer
├── seal_buffer.go # SealBuffer: async seal jobs, flush on interval/row/job threshold and Stop
├── writer.go    # Writer: AppenderInsert, WriteLevelStatsSnapshot, stats, deletes, logs, task_errors, queue_stats
├── schema.go    # DDL for node, stats, logs, queue_stats, task_errors, migrations
├── queries.go   # All read queries (nodes, keysets, counts, batch lookups)
├── constants.go # Status and node-type constants
├── nodestate.go # NodeState, InsertOperation, FetchResult, DeterministicNodeID
├── logs.go      # LogBuffer → logs table (time + batch + manual flush)
├── seeding.go   # InsertRootNode, BootstrapRootStats, BatchInsertNodes
├── stats.go     # Stats key helpers and stats/queue_stats read APIs
└── indexes.go   # EnsureNodeTableIndexes (path, parent_path)

Integration

  • Queue layer (pkg/queue): Uses this package for reads (keysets, stats), seal (SealLevel), retry DST cleanup (AddNodeDeletions), and resume rehydration (RehydrateLevelFromDB). NodeCache is required; hot-path writes go to cache only.
  • Migration (pkg/migration): Opens the DB via db.Open or receives an existing *db.DB; inspects status and runs verification against the same DB. Log service and config (pkg/configs) can supply log address and optional buffer settings.

Summary

  • Single connection: One *sql.DB; all reads and writes go through it.
  • Cache + seal only: DB writes are the bulk append and stats snapshot at seal (SealLevel), plus AddNodeDeletions for retry cleanup and transactional updates for logs, queue_stats, and setup. The former buffers.go (removed) served the old staging/node-merge path; SealBuffer and LogBuffer now provide buffered appender writes for seal and logs only (no staging), with time-based, count-based, and manual flush.
  • Transactional updates: Seal (sync or via buffer flush), stats, deletes, and logging go through RunUpdateWriterTx(Writer).

Documentation

Index

Constants

View Source
const (
	StatusPending    = "pending"
	StatusSuccessful = "successful"
	StatusFailed     = "failed"
	StatusNotOnSrc   = "not_on_src" // DST only
	StatusExcluded   = "excluded"
)

Traversal status values for the live node tables. Stats keys use: pending, successful, failed, not_on_src (DST only).

View Source
const (
	CopyStatusPending    = "pending"
	CopyStatusInProgress = "in_progress" // transient; not stored in stats
	CopyStatusSuccessful = "successful"
	CopyStatusFailed     = "failed"
	CopyStatusSkipped    = "skipped"
)

Copy status values (src_nodes only). Stats keys use: pending, successful, failed (no in_progress in stats).

View Source
const (
	NodeTypeFolder = "folder"
	NodeTypeFile   = "file"
)

Node type values.

View Source
const StatsKeyCompleted = "completed"

StatsKeyCompleted is the stats key for completed count at a depth (written at seal).

View Source
const StatsKeyExpected = "expected"

StatsKeyExpected is the stats key for expected count at a depth (set at round start).

View Source
const (

	// TableMigrations is the migrations lifecycle table name (used by schema and migration store).
	TableMigrations = "migrations"
)

Variables

This section is empty.

Functions

func BatchGetChildrenIDsByParentIDs

func BatchGetChildrenIDsByParentIDs(d *DB, table string, parentIDs []string) (map[string][]string, error)

BatchGetChildrenIDsByParentIDs returns map[parentID][]childID for the given table and parent ids.

func BatchGetDstIDsFromSrcIDs

func BatchGetDstIDsFromSrcIDs(d *DB, srcIDs []string) (map[string]string, error)

BatchGetDstIDsFromSrcIDs returns map[srcID]dstID by resolving SRC id->path then DST path->id in two queries (no per-item reads).
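The two-step resolution can be sketched with maps standing in for the two batch SQL queries; the point is two set lookups (SRC id -> path, then DST path -> id) instead of a per-item read:

```go
package main

import "fmt"

// dstIDsFromSrcIDs sketches BatchGetDstIDsFromSrcIDs: resolve SRC id to path,
// then DST path to id. The maps stand in for the two batch query results.
func dstIDsFromSrcIDs(srcIDToPath, dstPathToID map[string]string, srcIDs []string) map[string]string {
	out := make(map[string]string, len(srcIDs))
	for _, id := range srcIDs {
		if path, ok := srcIDToPath[id]; ok {
			if dstID, ok := dstPathToID[path]; ok {
				out[id] = dstID // join is by path
			}
		}
	}
	return out
}

func main() {
	src := map[string]string{"s1": "/a"}
	dst := map[string]string{"/a": "d1"}
	fmt.Println(dstIDsFromSrcIDs(src, dst, []string{"s1"})["s1"]) // d1
}
```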

func BatchGetNodeMeta

func BatchGetNodeMeta(d *DB, table string, ids []string) (map[string]NodeMeta, error)

BatchGetNodeMeta returns meta (id, depth, type, traversal_status, copy_status) for the given ids.

func BatchGetNodesByID

func BatchGetNodesByID(d *DB, table string, ids []string) (map[string]*NodeState, error)

BatchGetNodesByID returns nodes by id for the given table in one query. Missing ids are omitted from the map.

func BatchInsertNodes

func BatchInsertNodes(d *DB, ops []InsertOperation) error

BatchInsertNodes inserts nodes via the writer. Uses RunUpdateWriterTx.

func BootstrapRootStats

func BootstrapRootStats(d *DB) error

BootstrapRootStats writes (depth=0, key=traversal/pending, count=1) for SRC and DST so the queue sees pending work at round 0 after roots are seeded. Call once after inserting root nodes.

func CountExcluded

func CountExcluded(d *DB, table string) (int, error)

CountExcluded returns the number of nodes with excluded = true in the given table.

func CountExcludedInSubtree

func CountExcludedInSubtree(d *DB, table, rootPath string) (int, error)

CountExcludedInSubtree returns the number of excluded nodes in the subtree at rootPath. Single SQL query.

func CountNodes

func CountNodes(d *DB, table string) (int, error)

CountNodes returns the total number of nodes in the given table (src_nodes or dst_nodes). Live table count, not from stats. Uses pull conn so we see appender-written data.

func DeterministicNodeID

func DeterministicNodeID(queueType, nodeType, path string) string

DeterministicNodeID returns a stable id from (queueType, nodeType, path) for race-safe deduplication.
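One plausible construction for a stable id from the tuple, assuming a content hash (the real function's hash choice is not documented here, so treat this purely as a sketch):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// deterministicNodeID sketches a stable id derived from (queueType, nodeType,
// path). SHA-256 over a NUL-delimited tuple is an assumption, not necessarily
// what the package uses; any collision-resistant hash gives race-safe dedup.
func deterministicNodeID(queueType, nodeType, path string) string {
	h := sha256.Sum256([]byte(queueType + "\x00" + nodeType + "\x00" + path))
	return hex.EncodeToString(h[:16]) // 128 bits is ample for dedup
}

func main() {
	fmt.Println(deterministicNodeID("SRC", "file", "/a/b.txt"))
}
```

Because the id is a pure function of its inputs, two workers discovering the same node independently compute the same id, so duplicate inserts are detectable without coordination.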

func EnsureNodeTableIndexes

func EnsureNodeTableIndexes(db *DB, table string) error

EnsureNodeTableIndexes creates stable lookup indexes on the given node table (e.g. "src_nodes", "dst_nodes") for path and parent_path.

Mutable status columns are intentionally left unindexed to avoid high write amplification during traversal/copy status updates.

Idempotent for create/drop operations.

func GenerateLogID

func GenerateLogID() int64

GenerateLogID returns a unique log id (monotonic in practice).

func GetAllLevels

func GetAllLevels(d *DB, table string) ([]int, error)

GetAllLevels returns distinct depth values for the table, sorted.

func GetChildrenIDsByParentID

func GetChildrenIDsByParentID(d *DB, table, parentID string, limit int) ([]string, error)

GetChildrenIDsByParentID returns child ids for the given parent_id (up to limit).

func GetDstIDFromSrcID

func GetDstIDFromSrcID(d *DB, srcParentID string) (string, error)

GetDstIDFromSrcID returns the DST node id that has the same path as the given SRC node id (join by path).

func GetDstIDToSrcPath

func GetDstIDToSrcPath(d *DB, dstIDs []string) (map[string]string, error)

GetDstIDToSrcPath returns, for each DST id, its path (the join key, i.e. the corresponding SRC path).

func GetSrcChildrenGroupedByParentPath

func GetSrcChildrenGroupedByParentPath(d *DB, parentPaths []string) (map[string][]*NodeState, error)

GetSrcChildrenGroupedByParentPath returns SRC nodes grouped by parent_path. Used to batch-load expected children for DST folder tasks (join by parent_path = dst.path). Deprecated: prefer ListDstBatchWithSrcChildren for keyset-based DST pull + SRC children in one query.

func InsertRootNode

func InsertRootNode(d *DB, table string, state *NodeState) error

InsertRootNode inserts the root node (path "/", depth 0) into the given table. Uses appender conn so pulls (same conn via GetDBForPulls) see the root immediately.

func NodeStateAppendRowArgs

func NodeStateAppendRowArgs(n *NodeState) []interface{}

NodeStateAppendRowArgs returns the column values for one NodeState in table order (id, service_id, parent_id, parent_service_id, path, parent_path, type, size, mtime, depth, traversal_status, copy_status, excluded, errors) for use with duckdb.Appender.AppendRow.

func StatsKeyCopyStatus

func StatsKeyCopyStatus(status string) string

StatsKeyCopyStatus returns the src_stats key for copy status (src_nodes only). Valid statuses: pending, successful, failed.

func StatsKeyTraversalStatus

func StatsKeyTraversalStatus(status string) string

StatsKeyTraversalStatus returns the src_stats/dst_stats key for traversal status. Valid statuses: pending, successful, failed, not_on_src (DST only).
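The key shapes are suggested elsewhere in these docs ("key=traversal/pending" in BootstrapRootStats, "copy/* keys" in WriteLevelStatsSnapshot), so a sketch of the helpers might look like this; the exact prefix/separator in the real package is an assumption:

```go
package main

import "fmt"

// Sketch of the stats key helpers. The "traversal/" and "copy/" prefixes are
// inferred from the surrounding docs; treat the exact format as illustrative.
func statsKeyTraversalStatus(status string) string { return "traversal/" + status }
func statsKeyCopyStatus(status string) string      { return "copy/" + status }

func main() {
	fmt.Println(statsKeyTraversalStatus("pending")) // traversal/pending
}
```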

Types

type BatchInsertOperation

type BatchInsertOperation struct {
	Operations []InsertOperation
}

BatchInsertOperation is a batch of node inserts.

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB is the DuckDB-backed database handle. Single physical connection for all DB operations (schema, bulk append at seal, pulls, checkpoint).

func Open

func Open(opts Options) (*DB, error)

Open opens a DuckDB database at the given path and creates schema if missing.

func (*DB) AddNodeDeletion

func (db *DB) AddNodeDeletion(table, nodeID string) error

AddNodeDeletion deletes a node immediately (retry DST cleanup).

func (*DB) AddNodeDeletions

func (db *DB) AddNodeDeletions(deletions []NodeDeletion) error

AddNodeDeletions deletes multiple nodes.

func (*DB) Checkpoint

func (db *DB) Checkpoint() error

Checkpoint runs CHECKPOINT on the main conn, guarded by checkpointMu. Call at root seeding and round advancement only. Uses a single connection; running CHECKPOINT on multiple connections causes "Could not remove file X.wal: No such file or directory".

func (*DB) Close

func (db *DB) Close() error

Close closes the database connection. Stops the seal buffer first (flushing any pending jobs).

func (*DB) FlushSealBuffer

func (db *DB) FlushSealBuffer() error

FlushSealBuffer drains pending seal jobs to the DB. Call before backpressure wait so we don't block on the flush interval.

func (*DB) GetAllQueueStats

func (db *DB) GetAllQueueStats() (map[string][]byte, error)

GetAllQueueStats returns all queue stats from queue_stats table.

func (*DB) GetCopyCountAtDepth

func (db *DB) GetCopyCountAtDepth(depth int, nodeType string, copyStatus string) (int64, error)

GetCopyCountAtDepth returns the count of nodes in src_nodes at the given depth and copy_status; if nodeType != "", filters by type (e.g. "folder" or "file").

func (*DB) GetDB

func (db *DB) GetDB() (*sql.DB, error)

GetDB returns the underlying *sql.DB for read-only queries (main conn).

func (*DB) GetDBForPulls

func (db *DB) GetDBForPulls(queueType string) (*sql.DB, error)

GetDBForPulls returns the main connection for pull queries. queueType is "SRC" or "DST" (both use same conn).

func (*DB) GetMaxDepth

func (db *DB) GetMaxDepth(table string) (int, error)

GetMaxDepth returns the maximum depth present in the stats table for the given table ("SRC" or "DST"). Used as stop condition for retry sweep.

func (*DB) GetPendingTraversalCountAtDepthFromLive

func (db *DB) GetPendingTraversalCountAtDepthFromLive(table string, depth int) (int64, error)

GetPendingTraversalCountAtDepthFromLive returns the count of nodes at the given depth with traversal_status = 'pending' from the live nodes table. Use when advancing to a new round (stats for that depth may not exist yet).

func (*DB) GetQueueStats

func (db *DB) GetQueueStats(queueKey string) ([]byte, error)

GetQueueStats returns the metrics JSON for the queue key from queue_stats table.

func (*DB) GetStatsBreakdown

func (db *DB) GetStatsBreakdown(table string) ([]StatsRow, error)

GetStatsBreakdown returns all (depth, key, count) rows for the table so callers can see e.g. "level 4 has X pending". Order: depth, key.

func (*DB) GetStatsCount

func (db *DB) GetStatsCount(table, key string) (int64, error)

GetStatsCount returns the total count for the given key across all depths from src_stats or dst_stats (table = "SRC" or "DST"). E.g. "all pending items total in src_nodes" = GetStatsCount("SRC", StatsKeyTraversalStatus("pending")).

func (*DB) GetStatsCountAtDepth

func (db *DB) GetStatsCountAtDepth(table string, depth int, key string) (int64, error)

GetStatsCountAtDepth returns the count for (depth, key) from src_stats or dst_stats.

func (*DB) Path

func (db *DB) Path() string

Path returns the database file path (or ":memory:").

func (*DB) RunAppenderWriterTx

func (db *DB) RunAppenderWriterTx(queueType string, fn func(w *Writer) error) error

RunAppenderWriterTx runs fn inside a transaction on the main connection.

func (*DB) RunUpdateWriterTx

func (db *DB) RunUpdateWriterTx(fn func(w *Writer) error) error

RunUpdateWriterTx runs fn inside a transaction on the main connection.

func (*DB) RunWithConn

func (db *DB) RunWithConn(ctx context.Context, fn func(conn *sql.Conn) error) error

RunWithConn runs fn with the single DB connection while holding writeMu. Used by the seal buffer for appender + stats. Caller must not retain conn after fn returns.

func (*DB) SealLevel

func (db *DB) SealLevel(table string, depth int, nodes []*NodeState, pending, successful, failed, completed int64, copyP, copyS, copyF int64) error

SealLevel persists a sealed level from memory cache to the DB (bulk append + stats snapshot). copyP/copyS/copyF are used for SRC copy stats when >= 0. Payload is enqueued to the seal buffer, which writes and checkpoints asynchronously.

func (*DB) SealLevelDepth0

func (db *DB) SealLevelDepth0(table string, nodes []*NodeState, pending, successful, failed, completed int64, copyP, copyS, copyF int64) error

SealLevelDepth0 updates existing root row(s) at depth 0 and writes stats. Root rows are seeded up-front, so depth 0 uses update semantics instead of appender insert.

func (*DB) WaitUntilSealFlushedThrough

func (db *DB) WaitUntilSealFlushedThrough(depth int)

WaitUntilSealFlushedThrough blocks until the seal buffer has written at least the given depth (for backpressure: don't run more than one round ahead of flushed state).

type FetchResult

type FetchResult struct {
	Key   string
	State *NodeState
}

FetchResult is one row from a keyset list (id + full state).

func ListDstBatchWithSrcChildren

func ListDstBatchWithSrcChildren(d *DB, depth int, afterID string, limit int, traversalStatus string) ([]FetchResult, map[string][]*NodeState, error)

ListDstBatchWithSrcChildren returns the next batch of DST nodes at depth (keyset afterID, limit) and their SRC children (join by parent_path = d.path) in one query. Optional traversalStatus filter (e.g. StatusPending). Returns DST rows as []FetchResult and per-DST-ID SRC children as map[string][]*NodeState. Cursor must be round-scoped; reset on round advance, mode switch, and after seal. Uses DST pull conn so pulls see the same data as writes.
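The join semantics (src.parent_path = dst.path, grouped per DST id) can be sketched in memory; the structs are illustrative stand-ins for the real NodeState:

```go
package main

import "fmt"

// node is an illustrative stand-in for NodeState.
type node struct {
	id, path, parentPath string
}

// groupSrcChildren sketches the ListDstBatchWithSrcChildren join: SRC rows are
// grouped by parent_path and attached to the DST node whose path matches.
func groupSrcChildren(dstBatch, srcNodes []node) map[string][]node {
	byParent := map[string][]node{}
	for _, s := range srcNodes {
		byParent[s.parentPath] = append(byParent[s.parentPath], s)
	}
	out := map[string][]node{}
	for _, d := range dstBatch {
		out[d.id] = byParent[d.path] // join: src.parent_path = dst.path
	}
	return out
}

func main() {
	dst := []node{{id: "d1", path: "/a"}}
	src := []node{{id: "s1", path: "/a/x", parentPath: "/a"}}
	fmt.Println(len(groupSrcChildren(dst, src)["d1"])) // 1
}
```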

func ListNodesByDepthKeyset

func ListNodesByDepthKeyset(d *DB, table string, depth int, afterID, statusFilter string, limit int) ([]FetchResult, error)

ListNodesByDepthKeyset returns nodes at the given depth, ordered by id, after afterID, limit rows. If statusFilter is non-empty, only rows with traversal_status = statusFilter are returned (e.g. StatusPending). Uses pull conn so pulls see the same data as writes (roots, node inserts).

func ListNodesCopyKeyset

func ListNodesCopyKeyset(d *DB, depth int, nodeType, afterID string, limit int) ([]FetchResult, error)

ListNodesCopyKeyset returns src_nodes at depth for copy phase with copy_status = 'pending', ordered by id, after afterID, limit. Optional nodeType filter (folder/file or "" for both). Uses pull conn so pulls see the same data as writes.

type InsertOperation

type InsertOperation struct {
	QueueType string // "SRC" or "DST"
	Level     int    // depth
	Status    string // initial traversal_status
	State     *NodeState
}

InsertOperation represents a single node insert in a batch.

type LogBuffer

type LogBuffer struct {
	// contains filtered or unexported fields
}

LogBuffer buffers log entries and flushes them to the DB logs table. Back-pressure: at 2x batch size, Add blocks until a flush completes. Flushing guard: only one in-flight flush; getAndClearIfReady returns nil if already flushing.
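The batch-size trigger can be sketched as follows (the real buffer also flushes on a timer, blocks Add at 2x batch size until a flush completes, and guards against concurrent flushes; this shows only the size-triggered path, with illustrative names):

```go
package main

import "fmt"

// logBuffer sketches batch-size-triggered flushing: entries accumulate in
// memory and are written as one batch once batchSize is reached.
type logBuffer struct {
	batchSize int
	buf       []string
	flushed   [][]string // stands in for writes to the logs table
}

func (lb *logBuffer) add(entry string) {
	lb.buf = append(lb.buf, entry)
	if len(lb.buf) >= lb.batchSize {
		lb.flush()
	}
}

func (lb *logBuffer) flush() {
	if len(lb.buf) == 0 {
		return // nothing pending; manual flush is a no-op
	}
	batch := lb.buf
	lb.buf = nil
	lb.flushed = append(lb.flushed, batch)
}

func main() {
	lb := &logBuffer{batchSize: 2}
	lb.add("a")
	lb.add("b") // reaches batch size, triggers a flush
	lb.add("c")
	lb.flush() // manual flush drains the remainder
	fmt.Println(len(lb.flushed)) // 2
}
```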

func NewLogBuffer

func NewLogBuffer(db *DB, batchSize int, interval time.Duration) *LogBuffer

NewLogBuffer creates a log buffer that flushes to the main DB's logs table.

func (*LogBuffer) Add

func (lb *LogBuffer) Add(e LogEntry)

Add adds an entry to the buffer.

func (*LogBuffer) Flush

func (lb *LogBuffer) Flush()

Flush writes buffered entries to the logs table.

func (*LogBuffer) Stop

func (lb *LogBuffer) Stop()

Stop stops the flush loop. Does not close the DB.

type LogEntry

type LogEntry struct {
	ID        int64
	Timestamp string
	Level     string
	Entity    string
	EntityID  string
	Message   string
	Queue     string
}

LogEntry is a single log line for persistence.

type NodeDeletion

type NodeDeletion struct {
	Table  string
	NodeID string
}

NodeDeletion represents a node delete (retry DST cleanup).

type NodeMeta

type NodeMeta struct {
	ID              string
	Depth           int
	Type            string
	TraversalStatus string
	CopyStatus      string
}

NodeMeta is a subset of NodeState for batch lookups.

type NodeState

type NodeState struct {
	ID              string // Internal ULID-like id (deterministic from queueType, nodeType, path)
	ServiceID       string // FS id
	ParentID        string // Parent's internal id
	ParentServiceID string
	Path            string // Join key with other table
	ParentPath      string // Join key for children
	Name            string // Display name (for task/UI)
	Type            string // "folder" or "file"
	Size            int64
	MTime           string
	Depth           int
	TraversalStatus string // pending, successful, failed, not_on_src (dst)
	CopyStatus      string // pending, in_progress, successful, failed (src)
	Excluded        bool
	Errors          string // JSON placeholder for log refs
	Status          string // Alias for TraversalStatus (used by queue taskToNodeState)
	SrcID           string // Optional: corresponding SRC node id (join is by path; used during seeding for DST root)
}

NodeState is the in-memory representation of a row in src_nodes or dst_nodes. Path and parent_path are the join keys between SRC and DST.

func GetChildrenByParentID

func GetChildrenByParentID(d *DB, table, parentID string, limit int) ([]*NodeState, error)

GetChildrenByParentID returns up to limit children with the given parent_id.

func GetChildrenByParentPath

func GetChildrenByParentPath(d *DB, table, parentPath string, limit int) ([]*NodeState, error)

GetChildrenByParentPath returns up to limit children with the given parent_path.

func GetNodeByID

func GetNodeByID(d *DB, table, id string) (*NodeState, error)

GetNodeByID returns the node by id from the given table. Uses pull conn so we see appender-written data.

func GetNodeByPath

func GetNodeByPath(d *DB, table, path string) (*NodeState, error)

GetNodeByPath returns the node by path from the given table. Uses pull conn so we see appender-written data.

func GetRootNode

func GetRootNode(d *DB, table string) (id string, state *NodeState, ok bool)

GetRootNode returns the root node (path = '/') from the given table.

type Options

type Options struct {
	Path       string             // Path to DuckDB file (e.g. "migration.duckdb")
	SealBuffer *SealBufferOptions // Optional overrides for seal buffer; nil uses defaults. Seal buffer is always created.
}

Options configures DB open behavior.

func DefaultOptions

func DefaultOptions() Options

DefaultOptions returns default options.

type SealBuffer

type SealBuffer struct {
	// contains filtered or unexported fields
}

SealBuffer buffers seal jobs and flushes them to the DB asynchronously. Flush triggers: interval timer, row count threshold, and Stop/ForceFlush (or backpressure sync).

func NewSealBuffer

func NewSealBuffer(db *DB, opts SealBufferOptions) *SealBuffer

NewSealBuffer creates a seal buffer and starts its flush loop.

func (*SealBuffer) Add

func (sb *SealBuffer) Add(table string, depth int, nodes []*NodeState, pending, successful, failed, completed, copyP, copyS, copyF int64)

Add enqueues a seal job. Blocks if queue size (total node count) would exceed hardCap until a flush completes.

func (*SealBuffer) Flush

func (sb *SealBuffer) Flush() error

Flush drains queued jobs and writes them to the DB using DuckDB's Appender API (nodes) then a transaction (stats), then checkpoint.

func (*SealBuffer) LastFlushedDepth

func (sb *SealBuffer) LastFlushedDepth() int

LastFlushedDepth returns the maximum depth that has been written to the DB by a completed Flush. -1 until first flush.

func (*SealBuffer) Stop

func (sb *SealBuffer) Stop()

Stop stops the flush loop and flushes any remaining jobs.

func (*SealBuffer) WaitUntilFlushedThrough

func (sb *SealBuffer) WaitUntilFlushedThrough(depth int)

WaitUntilFlushedThrough blocks until at least the given depth has been written by a completed Flush.
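The flushed-depth watermark behaves like a monotonic condition-variable gate; a minimal sketch with illustrative names (the real SealBuffer tracks this internally):

```go
package main

import (
	"fmt"
	"sync"
)

// flushGate sketches WaitUntilFlushedThrough: waiters block until a completed
// flush has covered their depth, which keeps the queue from running more than
// one round ahead of durable state.
type flushGate struct {
	mu          sync.Mutex
	cond        *sync.Cond
	lastFlushed int
}

func newFlushGate() *flushGate {
	g := &flushGate{lastFlushed: -1} // -1 until first flush
	g.cond = sync.NewCond(&g.mu)
	return g
}

func (g *flushGate) markFlushed(depth int) {
	g.mu.Lock()
	if depth > g.lastFlushed {
		g.lastFlushed = depth // watermark only moves forward
	}
	g.mu.Unlock()
	g.cond.Broadcast()
}

func (g *flushGate) waitUntilFlushedThrough(depth int) {
	g.mu.Lock()
	for g.lastFlushed < depth {
		g.cond.Wait()
	}
	g.mu.Unlock()
}

func main() {
	g := newFlushGate()
	done := make(chan struct{})
	go func() {
		g.waitUntilFlushedThrough(2)
		close(done)
	}()
	g.markFlushed(3) // a flush covering depth 3 releases the waiter
	<-done
	fmt.Println(g.lastFlushed) // 3
}
```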

type SealBufferOptions

type SealBufferOptions struct {
	FlushInterval time.Duration
	RowThreshold  int
	HardCap       int
}

SealBufferOptions configures the seal buffer. Zero value uses defaults.

type SealJob

type SealJob struct {
	Table      string
	Depth      int
	Nodes      []*NodeState
	Pending    int64
	Successful int64
	Failed     int64
	Completed  int64
	CopyP      int64
	CopyS      int64
	CopyF      int64
}

SealJob is one sealed level's payload: table, depth, nodes, and stats.

type StatsRow

type StatsRow struct {
	Depth int
	Key   string
	Count int64
}

StatsRow is one row from src_stats or dst_stats (depth, key, count). For breakdown by level.

type StatusUpdateOperation

type StatusUpdateOperation struct {
	QueueType string
	Level     int
	OldStatus string
	NewStatus string
	NodeID    string
}

StatusUpdateOperation represents a traversal status transition (e.g. pending → successful).

type SubtreeStats

type SubtreeStats struct {
	TotalNodes   int
	TotalFolders int
	TotalFiles   int
	MaxDepth     int
}

SubtreeStats holds aggregate counts for a subtree (path = rootPath OR path LIKE rootPath || '/%').

func CountSubtree

func CountSubtree(d *DB, table, rootPath string) (SubtreeStats, error)

CountSubtree returns aggregate counts for the subtree at rootPath (path = rootPath OR path LIKE rootPath || '/%'; for rootPath "/" uses path LIKE '/%'). Single SQL query, no DFS.
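The subtree predicate described here (path = rootPath OR path LIKE rootPath || '/%', with "/" special-cased) translates to a simple string check; a sketch:

```go
package main

import (
	"fmt"
	"strings"
)

// inSubtree mirrors the SQL predicate used by CountSubtree/DeleteSubtree:
// path = rootPath OR path LIKE rootPath || '/%'; for rootPath "/" everything
// under '/' matches.
func inSubtree(path, rootPath string) bool {
	if rootPath == "/" {
		return strings.HasPrefix(path, "/")
	}
	return path == rootPath || strings.HasPrefix(path, rootPath+"/")
}

func main() {
	fmt.Println(inSubtree("/a/b", "/a")) // true: /a/b is under /a
}
```

The "/" appended before the prefix check is what keeps sibling paths like "/ab" from matching the "/a" subtree.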

type WriteOperation

type WriteOperation interface {
	// contains filtered or unexported methods
}

WriteOperation is an operation that can be buffered and flushed via the writer.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is the write handle for DuckDB. Used inside RunUpdateWriterTx.

func (*Writer) AppenderInsert

func (w *Writer) AppenderInsert(table string, nodes []*NodeState) error

AppenderInsert inserts nodes into src_nodes or dst_nodes (batch INSERT). Used when not using DuckDB Appender path; for Appender path use RunAppenderWriterTx.

func (*Writer) DeleteNode

func (w *Writer) DeleteNode(table, nodeID string) error

DeleteNode deletes the node from the given table (for retry DST cleanup).

func (*Writer) DeleteSubtree

func (w *Writer) DeleteSubtree(table, rootPath string) error

DeleteSubtree deletes all nodes in the subtree at rootPath (path = rootPath OR path LIKE rootPath || '/%'; for rootPath "/" uses path LIKE '/%'), then recomputes stats for each affected depth. Table is "SRC" or "DST".

func (*Writer) InsertLog

func (w *Writer) InsertLog(id int64, level, message, component, entity, entityID, queue string) error

InsertLog inserts a row into logs. id must be unique (e.g. from GenerateLogID).

func (*Writer) RecomputeStatsForDepth

func (w *Writer) RecomputeStatsForDepth(table string, depth int) error

RecomputeStatsForDepth deletes stats for the given table and depth, then writes fresh counts from the live nodes table at that depth (by traversal_status, and for SRC by copy_status). Call after direct writes that affect node counts at that depth (e.g. SetNodeTraversalStatus, DeleteSubtree).
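The recompute approach (count live rows fresh rather than patch counters) can be sketched in memory, with a slice standing in for the live nodes table:

```go
package main

import "fmt"

// liveNode is an illustrative stand-in for a row in the live nodes table.
type liveNode struct {
	depth  int
	status string
}

// recomputeStatsForDepth sketches the rebuild: after a direct write, stats for
// a depth are derived by counting live rows, never by incremental adjustment.
func recomputeStatsForDepth(nodes []liveNode, depth int) map[string]int64 {
	counts := map[string]int64{}
	for _, n := range nodes {
		if n.depth == depth {
			counts[n.status]++ // fresh count from the live table
		}
	}
	return counts
}

func main() {
	nodes := []liveNode{{1, "pending"}, {1, "successful"}, {2, "pending"}}
	fmt.Println(recomputeStatsForDepth(nodes, 1)["pending"]) // 1
}
```

Recomputing from the source of truth avoids drift when a direct write changes an unknown number of rows (e.g. DeleteSubtree).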

func (*Writer) RecordTaskError

func (w *Writer) RecordTaskError(queueType, phase, nodeID, message string, attempts int, path string) error

RecordTaskError inserts a row into task_errors.

func (*Writer) SealDepth0

func (w *Writer) SealDepth0(table string, nodes []*NodeState, pending, successful, failed, completed int64, copyPending, copySuccessful, copyFailed int64) error

SealDepth0 updates existing depth-0 nodes (seeded roots) and writes the depth-0 stats snapshot. Unlike SealLevel insert flow, this does not append rows; it updates existing root rows.

func (*Writer) SetNodeCopyStatus

func (w *Writer) SetNodeCopyStatus(table, nodeID, status string) error

SetNodeCopyStatus updates a SRC node's copy_status on the live table and recomputes stats for that depth.

func (*Writer) SetNodeExcluded

func (w *Writer) SetNodeExcluded(table, nodeID string, excluded bool) error

SetNodeExcluded updates a node's excluded flag on the live table. No stats update (schema has no excluded key in stats).

func (*Writer) SetNodeTraversalStatus

func (w *Writer) SetNodeTraversalStatus(table, nodeID, status string) error

SetNodeTraversalStatus updates a node's traversal_status on the live table and recomputes stats for that depth. For test setup only; the normal flow updates the cache and persists at seal. Table is "SRC" or "DST".

func (*Writer) SetStatsCountForDepth

func (w *Writer) SetStatsCountForDepth(table string, depth int, key string, count int64) error

SetStatsCountForDepth sets (depth, key, count) in src_stats or dst_stats. Must be called inside RunUpdateWriterTx.

func (*Writer) WriteLevelStatsSnapshot

func (w *Writer) WriteLevelStatsSnapshot(table string, depth int, pending, successful, failed, completed int64, copyPending, copySuccessful, copyFailed int64) error

WriteLevelStatsSnapshot writes per-depth stats for a sealed level (traversal counts + completed). If copyPending >= 0 and table is SRC, also writes copy/* keys.

func (*Writer) WriteQueueStats

func (w *Writer) WriteQueueStats(queueKey, metricsJSON string) error

WriteQueueStats upserts queue metrics JSON into queue_stats.
