watch

package
v0.1.0
Published: Mar 1, 2026 License: MIT Imports: 7 Imported by: 0

README

watch — reactive change detection for SQLite

watch provides a generic poll → detect → debounce → action loop. All hot-reload systems in the ecosystem (connectivity, channels, mcprt, dbsync) use it as their change-detection backbone.

┌──────────┐   tick   ┌──────────┐  changed?  ┌──────────┐  debounce  ┌────────┐
│  Ticker  │─────────►│ Detector │───────────►│ Debounce │───────────►│ Action │
│ (1s def) │          │ (PRAGMA) │            │  (opt.)  │            │ reload │
└──────────┘          └──────────┘            └──────────┘            └────────┘

Quick start

w := watch.New(db, watch.Options{
    Interval: 200 * time.Millisecond,
    Debounce: 500 * time.Millisecond,
})
go w.OnChange(ctx, func() error {
    return service.Reload(ctx, db)
})

Built-in detectors

Detector                       Source               Use case
PragmaDataVersion              PRAGMA data_version  Cross-connection writes (default)
PragmaUserVersion              PRAGMA user_version  Application-controlled versioning
MaxColumnDetector(table, col)  MAX(column)          Timestamp / auto-increment columns

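PRAGMA data_version returns a value that changes whenever another connection commits a write to the database, which makes it well suited to detecting external changes without triggers. Commits made on the same connection that reads the pragma do not change the value.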

WaitForVersion

WaitForVersion blocks until the observed version reaches a target. Useful for tests and synchronization between producer and consumer.

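// Writer bumps user_version after inserting config.
if _, err := db.Exec("PRAGMA user_version = 42"); err != nil {
    return err
}

// Reader waits for version 42 (watcher uses PragmaUserVersion, OnChange running).
if err := w.WaitForVersion(ctx, 42); err != nil {
    return err
}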

Observability

stats := w.Stats()
// stats.Checks          — total polls
// stats.ChangesDetected — version changes seen
// stats.Reloads         — successful action calls
// stats.AvgReloadTime   — mean action duration
// stats.Errors          — detector or action errors

Exported API

Symbol             Description
Watcher            Poll loop with debounce and stats
New(db, opts)      Create watcher (does not start polling)
Options            Interval, Debounce, Detector, Logger
ChangeDetector     func(ctx, db) (int64, error)
PragmaDataVersion  Default detector
PragmaUserVersion  App-controlled detector
MaxColumnDetector  Factory for MAX(col) detectors
Stats              Point-in-time counters

Documentation

Overview

Package watch provides a generic "poll SQLite, detect change, debounce, reload" loop. It standardises the reactive pattern used across the chassis so that every consumer gets consistent intervals, debounce windows, and observability for free.

Typical usage:

w := watch.New(db, watch.Options{Interval: 200*time.Millisecond, Debounce: 500*time.Millisecond})
go w.OnChange(ctx, func() error { return service.Reload() })

Functions

func PragmaDataVersion

func PragmaDataVersion(ctx context.Context, db *sql.DB) (int64, error)

PragmaDataVersion uses PRAGMA data_version, whose value changes whenever another connection commits a write to the same database file. It detects cross-process and cross-connection mutations, which makes it a good fit for hot reload.

func PragmaUserVersion

func PragmaUserVersion(ctx context.Context, db *sql.DB) (int64, error)

PragmaUserVersion uses PRAGMA user_version, which is an application-controlled integer. Callers must bump it explicitly after writes. Useful when you want deterministic version numbers (e.g. for WaitForVersion).

Types

type ChangeDetector

type ChangeDetector func(ctx context.Context, db *sql.DB) (int64, error)

ChangeDetector reads a version token from the database. Two calls that return different values mean "something changed". The concrete type is deliberately int64 — it maps naturally to PRAGMA data_version, PRAGMA user_version, or a MAX(updated_at) query.

func MaxColumnDetector

func MaxColumnDetector(table, column string) ChangeDetector

MaxColumnDetector returns a ChangeDetector that polls MAX(column) on a table. Handy for tables with a monotonically increasing column, such as an updated_at timestamp or an auto-increment ID. Table and column names are quoted to prevent SQL injection.
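
The documentation says identifiers are quoted but does not show how. The following is a minimal sketch of one common approach, assuming SQLite's double-quote identifier syntax with embedded quotes doubled. The names quoteIdent and maxColumnQuery are hypothetical, and the COALESCE wrapper (so that an empty table yields 0 rather than NULL) is an assumption, not confirmed behaviour of the package.

```go
package main

import "strings"

// quoteIdent wraps a SQLite identifier in double quotes and doubles any
// embedded double quote, so the name cannot terminate the quoted identifier.
// Hypothetical helper; the package's actual quoting code is not documented.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// maxColumnQuery builds a polling query for a MAX(column) style detector.
// COALESCE maps the NULL that MAX returns for an empty table to 0; this is
// an assumption made for the sketch.
func maxColumnQuery(table, column string) string {
	return "SELECT COALESCE(MAX(" + quoteIdent(column) + "), 0) FROM " + quoteIdent(table)
}
```

Under these assumptions, maxColumnQuery("config", "updated_at") would produce a query whose single integer result can be returned directly as the version token.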

type Options

type Options struct {
	// Interval is the polling frequency. Default: 1s.
	Interval time.Duration
	// Debounce is the quiet period after a change is detected before the
	// action fires. If more changes arrive during the window the timer
	// resets. 0 means fire immediately. Default: 0.
	Debounce time.Duration
	// Detector overrides the default PragmaDataVersion detector.
	Detector ChangeDetector
	// Logger overrides the default slog logger.
	Logger *slog.Logger
}

Options tunes the watcher behaviour.
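
The debounce rule described in the Options comments (each new change resets the quiet period, and 0 fires immediately) can be modelled as a small state machine. This is only a sketch under stated assumptions: it is driven by explicit clock readings at each poll rather than by a timer, and the debouncer type and Observe method are hypothetical names. The real watcher may well be implemented differently.

```go
package main

import "time"

// debouncer models the quiet-period rule with explicit clock readings so the
// logic can be followed without real timers. All names are hypothetical.
type debouncer struct {
	window  time.Duration // quiet period; 0 means fire immediately
	pending bool          // a change is waiting to be acted on
	last    time.Duration // clock reading of the most recent change
}

// Observe is called once per poll with the elapsed time since start and
// whether the detector reported a change. It reports whether the action
// should fire on this poll.
func (d *debouncer) Observe(now time.Duration, changed bool) bool {
	if changed {
		d.pending = true
		d.last = now // every new change restarts the quiet period
	}
	if d.pending && now-d.last >= d.window {
		d.pending = false
		return true
	}
	return false
}
```

With a 200ms interval and a 500ms window, changes seen at 0ms and 200ms would lead to a single action at the 800ms poll, the first poll at which 500ms have passed since the latest change.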

type Stats

type Stats struct {
	Checks          int64         `json:"checks"`
	ChangesDetected int64         `json:"changes_detected"`
	Errors          int64         `json:"errors"`
	Reloads         int64         `json:"reloads"`
	AvgReloadTime   time.Duration `json:"avg_reload_time"`
}

Stats are point-in-time counters.
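
Because AvgReloadTime is a time.Duration, the standard encoding/json package emits it as an integer number of nanoseconds. The sketch below mirrors the documented fields and tags in a local statsSnapshot type (a hypothetical stand-in) to show the resulting shape, assuming the package defines no custom marshaller for Stats.

```go
package main

import (
	"encoding/json"
	"time"
)

// statsSnapshot mirrors the documented Stats fields and JSON tags so the
// encoding can be shown without importing the package. Hypothetical type.
type statsSnapshot struct {
	Checks          int64         `json:"checks"`
	ChangesDetected int64         `json:"changes_detected"`
	Errors          int64         `json:"errors"`
	Reloads         int64         `json:"reloads"`
	AvgReloadTime   time.Duration `json:"avg_reload_time"`
}

// encodeStats renders a snapshot as JSON. time.Duration is an int64 count of
// nanoseconds, so avg_reload_time is emitted as a plain number.
func encodeStats(s statsSnapshot) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}
```

A consumer of such JSON would need to divide avg_reload_time by 1e6 to obtain milliseconds.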

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher polls a SQLite database for changes and runs an action when a change is detected. It is safe for concurrent use.

func New

func New(db *sql.DB, opts Options) *Watcher

New creates a Watcher. Call OnChange to start the loop.

func (*Watcher) OnChange

func (w *Watcher) OnChange(ctx context.Context, action func() error)

OnChange blocks until ctx is cancelled, polling at opts.Interval. When the detector reports a version change and the debounce window passes without further changes, action is called.

If action returns an error the version is NOT advanced — the action will be retried on the next poll cycle.
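
The retry rule can be illustrated with a single poll step. This is a simplified sketch that ignores debouncing; pollOnce and errReload are hypothetical names, not part of the package.

```go
package main

import "errors"

// errReload is a sample failure used to exercise the retry path.
var errReload = errors.New("reload failed")

// pollOnce compares the freshly read version with the last processed one and
// runs action when they differ. It returns the version to remember for the
// next poll. Hypothetical helper illustrating the documented retry rule.
func pollOnce(last, current int64, action func() error) (int64, error) {
	if current == last {
		return last, nil // nothing changed
	}
	if err := action(); err != nil {
		return last, err // keep the old version so the next poll retries
	}
	return current, nil // advance only after a successful action
}
```

Since the remembered version stays behind after a failure, the next poll sees the same difference and calls action again, which is the behaviour the paragraph above describes.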

func (*Watcher) Stats

func (w *Watcher) Stats() Stats

Stats returns the current counters.

func (*Watcher) Version

func (w *Watcher) Version() int64

Version returns the last observed version token.

func (*Watcher) WaitForVersion

func (w *Watcher) WaitForVersion(ctx context.Context, target int64) error

WaitForVersion blocks until the watcher has observed and successfully processed (action returned nil) a version >= target, or ctx expires.
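
One way to build this kind of "wait until version >= target" primitive is a mutex-guarded counter with a channel that is closed on every advance. The sketch below is an illustration only: versionGate, Advance, Wait, and waitWithTimeout are hypothetical names, and nothing here is taken from the package's implementation. In this model, Advance would be called after an action returns nil.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// versionGate tracks the last successfully processed version and lets
// callers block until it reaches a target. Hypothetical type.
type versionGate struct {
	mu      sync.Mutex
	version int64
	changed chan struct{} // closed and replaced on every advance
}

func newVersionGate() *versionGate {
	return &versionGate{changed: make(chan struct{})}
}

// Advance records a newly processed version and wakes all waiters.
func (g *versionGate) Advance(v int64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if v <= g.version {
		return // never move backwards
	}
	g.version = v
	close(g.changed)
	g.changed = make(chan struct{})
}

// Wait blocks until the processed version is >= target or ctx is done.
func (g *versionGate) Wait(ctx context.Context, target int64) error {
	for {
		g.mu.Lock()
		v, ch := g.version, g.changed
		g.mu.Unlock()
		if v >= target {
			return nil
		}
		select {
		case <-ch: // version advanced; re-check
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// waitWithTimeout is a convenience wrapper that bounds the wait.
func waitWithTimeout(g *versionGate, target int64, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return g.Wait(ctx, target)
}
```

In tests, bounding the wait with a timeout as in waitWithTimeout keeps a missed version bump from hanging the test run.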
