Documentation ¶
Overview ¶
Package sync orchestrates data synchronization from PeeringDB into the local SQLite database using the ent ORM.
Index ¶
- Variables
- func GetCursor(ctx context.Context, db *sql.DB, objType string) (time.Time, error)
- func GetLastSuccessfulFullSyncTime(ctx context.Context, db *sql.DB) (time.Time, error)
- func GetLastSuccessfulSyncTime(ctx context.Context, db *sql.DB) (time.Time, error)
- func GetMaxUpdated(ctx context.Context, db *sql.DB, table string) (time.Time, error)
- func InitStatusTable(ctx context.Context, db *sql.DB) error
- func InitialObjectCounts(ctx context.Context, db *sql.DB) (map[string]int64, error)
- func ReapStaleRunningRows(ctx context.Context, db *sql.DB) (int, error)
- func RecordSyncComplete(ctx context.Context, db *sql.DB, id int64, status Status) error
- func RecordSyncStart(ctx context.Context, db *sql.DB, startedAt time.Time, mode string) (int64, error)
- func StepOrder() []string
- func UpsertCursor(ctx context.Context, tx *ent.Tx, objType string, lastSyncAt time.Time, ...) error
- type Status
- type Worker
- func (w *Worker) HasCompletedSync() bool
- func (w *Worker) SetRetryBackoffs(backoffs []time.Duration)
- func (w *Worker) StartScheduler(ctx context.Context, interval time.Duration)
- func (w *Worker) Sync(ctx context.Context, mode config.SyncMode) error
- func (w *Worker) SyncWithRetry(ctx context.Context, mode config.SyncMode) error
- type WorkerConfig
Constants ¶
This section is empty.
Variables ¶
var ErrSyncMemoryLimitExceeded = errors.New("sync aborted: memory limit exceeded")
ErrSyncMemoryLimitExceeded is returned by Worker.Sync when runtime.ReadMemStats reports HeapAlloc above WorkerConfig.SyncMemoryLimit after the Phase A fetch pass completes. The sync aborts without opening the ent transaction; the running mutex is released on return and the next scheduled retry proceeds normally after the Phase A scratch batches are reclaimed by GC.
Commit F (Plan 54-03) adds this guardrail as defense-in-depth against PeeringDB data growth that exceeds the 400 MB bench-harness baseline at runtime (e.g. if netixlan doubles between benchmarks and production). Callers detect the sentinel via errors.Is per GO-ERR-2.
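A minimal caller-side sketch of the errors.Is check; the import paths, logger wiring, and the runOnce helper are illustrative assumptions, not part of this package:

package example

import (
	"context"
	"errors"
	"log/slog"

	"example.com/internal/config"       // hypothetical import path
	pdbsync "example.com/internal/sync" // hypothetical import path
)

// runOnce distinguishes the memory-limit abort from other sync failures via
// errors.Is on the exported sentinel (GO-ERR-2).
func runOnce(ctx context.Context, w *pdbsync.Worker, mode config.SyncMode, logger *slog.Logger) {
	if err := w.SyncWithRetry(ctx, mode); err != nil {
		if errors.Is(err, pdbsync.ErrSyncMemoryLimitExceeded) {
			// Heap exceeded WorkerConfig.SyncMemoryLimit after the Phase A
			// fetch pass; no transaction was opened. The next scheduled
			// cycle retries normally once the scratch batches are GC'd.
			logger.Warn("sync skipped: memory limit exceeded", "err", err)
			return
		}
		logger.Error("sync failed", "err", err)
	}
}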
Functions ¶
func GetCursor ¶ added in v1.2.0
DEPRECATED v1.18.10 (260428-mu0): cursor is now derived from MAX(updated) per table via internal/sync/cursor.go GetMaxUpdated. Slated for removal in v1.19. Do not call from new code. The sync_cursors table CREATE TABLE in InitStatusTable is preserved for rollback safety; existing rows are ignored by the production sync path.
GetCursor returns the last sync timestamp for a type. Returns zero time only if no cursor row exists for the type.
v1.18.3: dropped the prior `AND last_status = 'success'` filter. The cursor is a high-water-mark timestamp; failure observability lives in the separate sync_status table. Coupling cursor reads to last_status caused subtle "all cursors zero after a failed cycle" surprises that could trigger expensive re-fetches (and was load-bearing in the v1.18.2 bootstrap regression). The last_status column is preserved for stored data compatibility and any future dashboard queries.
func GetLastSuccessfulFullSyncTime ¶ added in v1.18.10
GetLastSuccessfulFullSyncTime returns the completion time of the most recent successful FULL sync, or zero time if no full sync has been recorded. Used by the PDBPLUS_FULL_SYNC_INTERVAL escape hatch in syncFetchPass to force a periodic bare-list refetch — defends against pathological upstream cross-row inconsistency where a since= response includes row R' (updated=M) but is missing earlier row R (updated < M); R is permanently missed under any since-based design.
Added in 260428-mu0.
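A sketch of the decision that consumes this helper; the chooseMode name and its wiring are assumptions (the real check lives in syncFetchPass and may differ):

package example

import (
	"context"
	"database/sql"
	"time"

	pdbsync "example.com/internal/sync" // hypothetical import path
)

// chooseMode sketches the escape-hatch decision: force a bare-list refetch
// when no full sync has ever completed, or when the most recent one is older
// than the configured FullSyncInterval; otherwise stay incremental.
func chooseMode(ctx context.Context, db *sql.DB, fullSyncInterval time.Duration) (string, error) {
	if fullSyncInterval <= 0 {
		return "incremental", nil // escape hatch disabled
	}
	lastFull, err := pdbsync.GetLastSuccessfulFullSyncTime(ctx, db)
	if err != nil {
		return "", err
	}
	if lastFull.IsZero() || time.Since(lastFull) >= fullSyncInterval {
		return "full", nil // recorded as "full" by RecordSyncStart
	}
	return "incremental", nil
}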
func GetLastSuccessfulSyncTime ¶ added in v1.5.0
GetLastSuccessfulSyncTime returns the completion time of the most recent successful sync, or zero time if no successful sync has been recorded.
func GetMaxUpdated ¶ added in v1.18.10
GetMaxUpdated returns the maximum `updated` timestamp across all rows in the given table, or zero time if the table is empty (NULL).
The cursor is derived from MAX(updated) on each sync cycle rather than persisted in a sync_cursors table. This works because:
- The `updated` column is indexed on all 13 entity tables (`index.Fields("updated")` in every ent/schema/<type>.go).
- PeeringDB's `?since=N` is inclusive (`updated >= since` per internal/pdbcompat/filter.go applySince), so re-fetching the boundary row each cycle is idempotent (the Phase 75 skip-on-unchanged predicate turns the OnConflict UPDATE into a no-op).
- Empty table → NULL → zero time → caller falls through to the full bare-list path (existing stageOneTypeToScratch behaviour preserved).
- Tombstone rows (status='deleted') still count toward MAX(updated) because their `updated` reflects the upstream deletion event.
Implementation note: the query uses `ORDER BY updated DESC LIMIT 1` instead of `MAX(updated)` because modernc.org/sqlite only auto-parses TEXT → time.Time when the result column has a declared type of DATE / DATETIME / TIMESTAMP (see modernc.org/sqlite/rows.go:171-176). Aggregate expressions like MAX(...) drop the decltype, so the driver returns the raw stored string ("2026-04-28 12:00:00 +0000 UTC" — Go time.String() format, since the DSN does not pin _time_format). The ORDER-BY-LIMIT-1 form is index-backed (every entity has `index.Fields("updated")`) so the plan is identical: a single index seek.
Pathological-cross-row-inconsistency caveat: if upstream serves a response where row R' (updated=M) is present but row R (updated < M) is missing, R is permanently missed under any since-based design. The PDBPLUS_FULL_SYNC_INTERVAL escape hatch (Task 2) defends against this.
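A sketch of the query shape described in the implementation note; the table name is assumed to come from the fixed step list (never user input), and the real cursor.go may differ in detail:

package example

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"time"
)

// getMaxUpdated sketches the ORDER-BY-LIMIT-1 form. The bare `updated`
// column keeps its DATETIME decltype, so modernc.org/sqlite hands back a
// time.Time; MAX(updated) would drop the decltype and return the raw string.
func getMaxUpdated(ctx context.Context, db *sql.DB, table string) (time.Time, error) {
	// table is one of the 13 fixed entity table names, never user input.
	q := fmt.Sprintf("SELECT updated FROM %s ORDER BY updated DESC LIMIT 1", table)
	var updated time.Time
	err := db.QueryRowContext(ctx, q).Scan(&updated)
	if errors.Is(err, sql.ErrNoRows) {
		return time.Time{}, nil // empty table: caller falls back to the bare-list path
	}
	return updated, err
}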
func InitStatusTable ¶
InitStatusTable creates the sync_status and sync_cursors tables if they don't exist. These are not ent-managed entities; they store operational metadata via raw SQL.
260428-mu0: a `mode TEXT NOT NULL DEFAULT 'incremental'` column is added to sync_status. Fresh databases get the column via a future CREATE TABLE adjustment (kept off the schema literal here for rollback simplicity); existing databases get it via an idempotent ALTER TABLE probed against pragma_table_info. GetLastSuccessfulFullSyncTime reads the column to implement the PDBPLUS_FULL_SYNC_INTERVAL escape hatch.
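A sketch of the idempotent-ALTER pattern described above; the probe shape and statement text are assumptions, not copied from InitStatusTable:

package example

import (
	"context"
	"database/sql"
)

// addModeColumnIfMissing probes pragma_table_info for the column and only
// issues the ALTER TABLE when it is absent, so the migration is safe to run
// on every startup against both fresh and pre-existing databases.
func addModeColumnIfMissing(ctx context.Context, db *sql.DB) error {
	var n int
	err := db.QueryRowContext(ctx,
		`SELECT COUNT(*) FROM pragma_table_info('sync_status') WHERE name = 'mode'`,
	).Scan(&n)
	if err != nil {
		return err
	}
	if n > 0 {
		return nil // column already present
	}
	_, err = db.ExecContext(ctx,
		`ALTER TABLE sync_status ADD COLUMN mode TEXT NOT NULL DEFAULT 'incremental'`)
	return err
}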
func InitialObjectCounts ¶ added in v1.18.0
InitialObjectCounts runs a one-shot UNION ALL COUNT(*) against each of the 13 PeeringDB entity tables and returns the result keyed by PeeringDB type name. The keys match those produced by syncSteps() so the same atomic cache can be primed by either the startup path (this helper) or the OnSyncComplete callback.
Implements OBS-01 D-01: synchronous startup population so the pdbplus_data_type_count gauge reports correct values within 30s of process start instead of holding zeros until the first sync cycle completes (~15 min default, ~1h on unauthenticated instances).
Cost: a single SQL UNION ALL across 13 tables; ~1ms on a primed LiteFS DB. Replaces the prior 13 sequential ent Count() calls (~15-20ms in aggregate). Counts include all rows regardless of status (matching the existing OnSyncComplete cache contract — "raw upserted-row count from the latest sync cycle"). Phase 68 tombstones (status="deleted") are rows the dashboard wants to see in "Total Objects" until tombstone GC ships (SEED-004 dormant). If a future requirement wants live-only counts, that's a separate metric.
Privacy: raw SQL bypasses ent's Privacy policy entirely (no Privacy Hook fires on db.QueryContext). The COUNT(*) sees every physical row regardless of privacy tier — symmetric with the OnSyncComplete writer (which runs under privacy.DecisionContext(ctx, privacy.Allow)).
Phase 75 OBS-01 D-01 history: this function previously elevated ctx to TierUsers via privctx.WithTier to keep Poc.Policy from filtering visible!="Public" rows. Without it, the cross-writer disagreement on POC counts caused the pdbplus_data_type_count{type="poc"} 2x/0.5x oscillation visible on the Grafana "Object Counts Over Time" panel: replicas (which only ever ran InitialObjectCounts) held the public-only count P while the primary's cache flipped between T ≈ 2P (just after a full sync) and tiny incremental deltas, and max by(type) across the 8-instance fleet alternated between T and P accordingly. 260428-eda CHANGE 6 retires the tier elevation entirely: raw SQL achieves the same row-set without going through ent privacy at all (a COUNT bypass is intentional and safe). See .planning/debug/poc-count-doubling-halving.md for the full incident analysis.
Errors are returned wrapped with the type name so an operator can see which table failed; partial results are NOT returned — a single failure aborts the whole call to keep the contract simple.
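A sketch of the query shape, truncated to three branches with assumed table names; the real helper covers all 13 tables and keys the results the same way as syncSteps():

package example

import (
	"context"
	"database/sql"
	"fmt"
)

// Three branches shown for brevity; the table names here are illustrative.
const countsQuery = `
SELECT 'net'      AS type, COUNT(*) AS n FROM nets        UNION ALL
SELECT 'ix'       AS type, COUNT(*) AS n FROM ixs         UNION ALL
SELECT 'netixlan' AS type, COUNT(*) AS n FROM net_ix_lans`

func initialObjectCounts(ctx context.Context, db *sql.DB) (map[string]int64, error) {
	rows, err := db.QueryContext(ctx, countsQuery)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	out := make(map[string]int64, 13)
	for rows.Next() {
		var typ string
		var n int64
		if err := rows.Scan(&typ, &n); err != nil {
			return nil, fmt.Errorf("%s: %w", typ, err) // name which branch failed
		}
		out[typ] = n
	}
	return out, rows.Err()
}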
func ReapStaleRunningRows ¶ added in v1.14.0
ReapStaleRunningRows transitions any sync_status rows stuck in "running" state to "failed" with an explanatory error message. Call from startup on the primary machine — a row can get stuck in "running" if a previous process was killed mid-sync (e.g. rolling deploy terminated the primary before the sync commit/rollback path ran). The Worker.running atomic is reset on process start, so no future sync is blocked by these rows; the transition is purely cosmetic so /ui/about and /readyz queries stop seeing phantom "running" syncs that will never complete.
Safe to call concurrently with live sync workers because the Consul lease guarantees only one primary at a time. A legitimate in-flight "running" row would be replaced by its own RecordSyncComplete call (latest write wins); in practice the reap runs BEFORE the first sync worker tick so there's no real overlap window.
Returns the number of rows transitioned.
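The transition is effectively a single UPDATE; a sketch of its shape, with the column and message text assumed:

package example

import (
	"context"
	"database/sql"
)

// reapStaleRunningRows flips any row still marked "running" (left behind by a
// process killed mid-sync) to "failed" and reports how many were transitioned.
func reapStaleRunningRows(ctx context.Context, db *sql.DB) (int, error) {
	res, err := db.ExecContext(ctx, `
		UPDATE sync_status
		   SET status = 'failed',
		       error_message = 'reaped at startup: previous process exited mid-sync'
		 WHERE status = 'running'`)
	if err != nil {
		return 0, err
	}
	n, err := res.RowsAffected()
	return int(n), err
}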
func RecordSyncComplete ¶
RecordSyncComplete updates the sync status row with results.
func RecordSyncStart ¶
func RecordSyncStart(ctx context.Context, db *sql.DB, startedAt time.Time, mode string) (int64, error)
RecordSyncStart inserts a new running sync status row and returns its ID.
260428-mu0: mode is "full" or "incremental" — persisted in the sync_status.mode column so GetLastSuccessfulFullSyncTime can find the most recent full-sync completion (used by the PDBPLUS_FULL_SYNC_INTERVAL escape hatch). The mode parameter MUST reflect the cycle's effective behaviour, not the configured default — a forced bare-list refetch should be recorded as "full".
func StepOrder ¶ added in v1.18.2
func StepOrder() []string
StepOrder returns a copy of the canonical 13-name sync step ordering. Out-of-package callers (cmd/loadtest sync mode) use it as the parity reference; mutating the returned slice is safe.
func UpsertCursor ¶ added in v1.2.0
func UpsertCursor(ctx context.Context, tx *ent.Tx, objType string, lastSyncAt time.Time, status string) error
DEPRECATED v1.18.10 (260428-mu0): cursor is now derived from MAX(updated) per table via internal/sync/cursor.go GetMaxUpdated. Slated for removal in v1.19. Do not call from new code. The sync_cursors table CREATE TABLE in InitStatusTable is preserved for rollback safety; existing rows are ignored by the production sync path.
UpsertCursor updates or inserts the sync cursor for a type.
260428-eda CHANGE 2: called WITHIN the main sync transaction (via *ent.Tx) so cursor writes commit atomically with their corresponding ent upserts. This closes the prior gap where cursor writes were 13 separate post-commit *sql.DB Exec calls — each one a LiteFS-replicated commit — and removes the failure window where ent upserts were durable but the cursor advance was not (resulting in re-fetching already-applied rows on the next cycle).
D-19 atomicity: sync_status (the outcome record) remains a separate raw-SQL Exec because it must reflect the OUTCOME of the tx (success/failure/error message) — that's correct. Cursors describe DATA STATE and belong inside the data tx.
Failure-mode shift: a cursor-write failure now rolls back the entire upsert tx (including any FK-backfill HTTP work that already happened inside it). This is the CORRECT semantic — cursor IS data state and a divergence between upserts-committed and cursor-not-advanced is the very bug being fixed. The OTel span attribute pdbplus.sync.cursor_write_caused_rollback is set true on the sync root span when a rollback was caused by cursor write failure (B3). SyncWithRetry handles transient failures by re-running the cycle.
Types ¶
type Status ¶ added in v1.2.0
type Status struct {
	LastSyncAt   time.Time
	Duration     time.Duration
	ObjectCounts map[string]int // type -> count
	Status       string         // "success", "failed", "running"
	ErrorMessage string         // empty on success
}
Status represents the result of a sync operation.
func GetLastCompletedStatus ¶ added in v1.14.0
GetLastCompletedStatus returns the most recent non-running sync status row (either "success" or "failed"). Used by /readyz to fall back past an in-flight "running" row so the health check reflects the most recent outcome — whether success or failure. Returns nil if no completed sync has ever been recorded.
NOTE: this is the right answer for /readyz (which wants to know "what's the most recent outcome?" and reports unhealthy on failed) but NOT for UI freshness displays (which want "when was the last known-good data?"). UI surfaces should use GetLastSuccessfulStatus instead.
func GetLastStatus ¶ added in v1.2.0
GetLastStatus returns the most recent sync status. Returns nil if no sync has been recorded.
func GetLastSuccessfulStatus ¶ added in v1.14.0
GetLastSuccessfulStatus returns the most recent successful sync status row. This is the right answer for UI surfaces that want to display "when was the last known-good data?" — it skips past any in-flight "running" rows AND any "failed" rows to find the most recent row with status="success". Returns nil if no successful sync has ever been recorded.
Unlike GetLastSuccessfulSyncTime (which returns only the timestamp for ETag seeding), this returns the full Status struct including object counts and duration for display purposes.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker orchestrates PeeringDB data synchronization.
func NewWorker ¶
func NewWorker(pdbClient *peeringdb.Client, entClient *ent.Client, db *sql.DB, cfg WorkerConfig, logger *slog.Logger) *Worker
NewWorker creates a new sync worker. If cfg.IsPrimary is nil, it defaults to always-primary for backward compatibility (local dev, tests without explicit primary config).
func (*Worker) HasCompletedSync ¶
HasCompletedSync reports whether at least one successful sync has completed. Used for 503 behavior per D-30.
func (*Worker) SetRetryBackoffs ¶
SetRetryBackoffs overrides the default retry backoff durations. Intended for testing.
func (*Worker) StartScheduler ¶
StartScheduler runs the sync scheduler on all instances per D-22. On primary nodes it executes sync cycles; on replicas it waits for promotion. Role changes are detected dynamically at each scheduler wakeup via w.config.IsPrimary(). The scheduler stops when ctx is cancelled per CC-2.
Scheduling anchor: the next sync is scheduled at lastCompletion + interval, not at processStart + N*interval. This matters across restarts — a rolling deploy mid-interval would otherwise delay the next sync by up to a full interval (the ticker would re-anchor on process start). Concretely:
- Fresh DB (no prior successful sync) on a primary: run a full sync immediately, then schedule the next at now+interval.
- Warm start on a primary with a recent lastSync: wait until lastSync+interval; if that is already in the past, the first iteration fires immediately.
- Replica: wake every interval to check for promotion. Matches the heartbeat cadence of the pre-rewrite ticker-based design.
After each cycle — success or failure — the next sync is scheduled at time.Now()+interval. A slower-than-expected sync therefore does NOT shorten the following window, and a failed sync gives PeeringDB a full interval to recover before the next external-facing retry.
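A simplified sketch of the anchoring rule (promotion checks, full-vs-incremental selection, and logging omitted); import paths and the schedulerLoop helper are assumptions:

package example

import (
	"context"
	"database/sql"
	"time"

	"example.com/internal/config"       // hypothetical import path
	pdbsync "example.com/internal/sync" // hypothetical import path
)

// schedulerLoop sketches the anchoring: the first wait derives from the
// persisted last successful sync (so a restart mid-interval does not
// re-anchor the clock), and each later wait starts from cycle completion.
func schedulerLoop(ctx context.Context, w *pdbsync.Worker, db *sql.DB, interval time.Duration, mode config.SyncMode) {
	lastSync, _ := pdbsync.GetLastSuccessfulSyncTime(ctx, db)
	next := lastSync.Add(interval) // zero lastSync is far in the past, so fire immediately
	for {
		wait := time.Until(next)
		if wait < 0 {
			wait = 0
		}
		select {
		case <-ctx.Done():
			return // CC-2: stop when ctx is cancelled
		case <-time.After(wait):
		}
		_ = w.SyncWithRetry(ctx, mode) // success or failure, re-anchor on completion
		next = time.Now().Add(interval)
	}
}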
func (*Worker) SyncWithRetry ¶
SyncWithRetry calls Sync and retries on failure with exponential backoff per D-21.
Rate-limit short-circuit: when the wrapped error is a *peeringdb.RateLimitError, the retry ladder is skipped entirely. PeeringDB's unauthenticated quota is 1 request per distinct query-string per hour, and all three default backoffs (30s, 2m, 8m) fall well inside that window — every retry within the window is guaranteed to 429 again AND consumes another slot against the hourly quota. Returning immediately lets the hourly scheduler retry naturally on its next tick (1h interval ≥ most Retry-After values we've observed).
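A sketch of the ladder with the rate-limit short-circuit, assuming only the documented shape of *peeringdb.RateLimitError; the production method additionally records spans and metrics:

package example

import (
	"context"
	"errors"
	"time"

	"example.com/internal/peeringdb" // hypothetical import path
)

// syncWithRetry sketches D-21: retry with the configured backoffs, but abort
// the ladder on a rate-limit error, since every retry inside the hourly quota
// window would 429 again and burn another slot; recovery is left to the next
// scheduler tick.
func syncWithRetry(ctx context.Context, sync func(context.Context) error, backoffs []time.Duration) error {
	err := sync(ctx)
	for _, backoff := range backoffs {
		if err == nil {
			return nil
		}
		var rle *peeringdb.RateLimitError
		if errors.As(err, &rle) {
			return err // short-circuit: skip the remaining backoffs
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
		err = sync(ctx)
	}
	return err
}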
type WorkerConfig ¶
type WorkerConfig struct {
	IsPrimary func() bool // live primary detection; nil defaults to always-primary
	SyncMode  config.SyncMode
	// OnSyncComplete is called after a successful sync with the worker's
	// ctx and the completion timestamp. The timestamp is the same value
	// persisted into the sync_status row by recordSuccess, so downstream
	// consumers (e.g. the caching middleware ETag setter wired in
	// cmd/peeringdb-plus/main.go for PERF-07) stay in lock-step with the
	// database without an extra round-trip.
	//
	// Quick task 260427-ojm: the per-cycle upsert-count map (the old
	// `counts map[string]int` arg) was removed. It was the wrong value to
	// feed into the pdbplus_data_type_count gauge cache — for incremental
	// syncs it was a delta, and for Poc it never agreed with the
	// privacy-filtered Count(ctx) used at startup ("doubling-halving").
	// Consumers should run pdbsync.InitialObjectCounts(ctx, db) on
	// the supplied ctx if they want live row counts, or query
	// sync_status (which still persists the raw upsert deltas) if they
	// want cycle telemetry.
	OnSyncComplete func(ctx context.Context, syncTime time.Time)
	// SyncMemoryLimit is the peak Go heap ceiling (bytes) checked
	// after Phase A fetch completes and before the ent.Tx opens. If
	// runtime.MemStats.HeapAlloc exceeds this value, Sync aborts with
	// ErrSyncMemoryLimitExceeded. Zero disables the guardrail. Wired
	// from config.Config.SyncMemoryLimit by main.go. Commit F default
	// is 400 MB (matches the DEBT-03 bench regression gate).
	SyncMemoryLimit int64
	// HeapWarnBytes is the peak Go heap threshold (bytes) above which
	// the end-of-sync-cycle emitter fires slog.Warn("heap threshold
	// crossed", ...). The OTel span attr pdbplus.sync.peak_heap_bytes is
	// attached regardless. Zero disables only the Warn (not the attr).
	// Wired from config.Config.HeapWarnBytes by main.go.
	//
	// SEED-001 escalation signal: sustained breach triggers the
	// incremental-sync evaluation path documented in
	// .planning/seeds/SEED-001-incremental-sync-evaluation.md.
	HeapWarnBytes int64
	// RSSWarnBytes is the peak OS RSS threshold (bytes) above which
	// the emitter fires slog.Warn. Read from /proc/self/status VmHWM on
	// Linux; skipped on other OSes (the RSS attr is then omitted — it
	// is not set to zero). Zero disables only the Warn.
	RSSWarnBytes int64
	// FKBackfillMaxRequestsPerCycle caps the number of underlying HTTP
	// requests issued by FK-backfill per sync cycle. v1.18.5 semantic
	// shift: the previous cap counted ROWS (per-row in 260428-2zl,
	// nominally per-row but batched in 260428-5xt — a weak circuit
	// breaker once batching collapsed N rows into 1 request). The cap
	// now directly bounds upstream HTTP traffic, the surface protected
	// by upstream's API_THROTTLE_REPEATED_REQUEST and our local rate
	// limiter. Default 20 (PDBPLUS_FK_BACKFILL_MAX_REQUESTS_PER_CYCLE)
	// — at 1 req/sec auth, ≈20s of upstream pressure max per cycle.
	// 0 disables backfill entirely (drop-on-miss behavior, preserved
	// as an operator escape-hatch).
	FKBackfillMaxRequestsPerCycle int
	// FKBackfillTimeout is the per-cycle wall-clock budget for FK
	// backfill HTTP activity. v1.18.3: added because backfill calls
	// happen inside the sync transaction; without a deadline a cascade
	// of slow / rate-limited backfills could hold the tx open for tens
	// of minutes, stalling LiteFS replication. After the deadline,
	// fkBackfillParent short-circuits to drop-on-miss so the rest of
	// the sync (bulk fetches + upserts) can commit and the next cycle
	// picks up where we left off. Default 5 minutes
	// (PDBPLUS_FK_BACKFILL_TIMEOUT). Zero or negative disables the
	// deadline (only the cap applies).
	FKBackfillTimeout time.Duration
	// FullSyncInterval is the interval after which a sync cycle forces
	// a full bare-list refetch of every type, regardless of the
	// per-table MAX(updated) cursor. Defends against pathological
	// upstream cross-row inconsistency where a `?since=` response
	// includes row R' (updated=M) but is missing earlier row R
	// (updated < M); R is permanently missed under any since-based
	// design without periodic full refetch. Wired from
	// PDBPLUS_FULL_SYNC_INTERVAL (default 24h). Zero disables the
	// escape hatch (only the per-cycle MAX(updated) cursor applies).
	// 260428-mu0.
	FullSyncInterval time.Duration
}
WorkerConfig holds configuration for the sync worker.
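A hedged wiring sketch of how a caller might populate the struct; every import path, parameter, and value outside this package (the primary-detection closure, the completion callback, the thresholds for the warn fields) is a placeholder, not the project's actual main.go:

package example

import (
	"context"
	"database/sql"
	"log/slog"
	"time"

	"example.com/ent"                   // hypothetical import path
	"example.com/internal/config"       // hypothetical import path
	"example.com/internal/peeringdb"    // hypothetical import path
	pdbsync "example.com/internal/sync" // hypothetical import path
)

// newSyncWorker shows one plausible wiring; real values come from config/env.
func newSyncWorker(pdbClient *peeringdb.Client, entClient *ent.Client, db *sql.DB,
	isPrimary func() bool, mode config.SyncMode,
	onDone func(context.Context, time.Time), logger *slog.Logger) *pdbsync.Worker {

	cfg := pdbsync.WorkerConfig{
		IsPrimary:                     isPrimary, // nil would mean always-primary
		SyncMode:                      mode,
		OnSyncComplete:                onDone,          // e.g. the PERF-07 ETag setter
		SyncMemoryLimit:               400 << 20,       // Commit F default: 400 MB
		HeapWarnBytes:                 300 << 20,       // illustrative threshold
		RSSWarnBytes:                  600 << 20,       // illustrative threshold
		FKBackfillMaxRequestsPerCycle: 20,              // documented default
		FKBackfillTimeout:             5 * time.Minute, // documented default
		FullSyncInterval:              24 * time.Hour,  // documented default
	}
	return pdbsync.NewWorker(pdbClient, entClient, db, cfg, logger)
}

The caller would then start the scheduler on every instance with w.StartScheduler(ctx, interval); primary-vs-replica behaviour is handled inside the scheduler per D-22.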