replicate

package module
v0.7.0
Published: Apr 10, 2026 License: Apache-2.0 Imports: 32 Imported by: 0

README

Hanzo Replicate

Hanzo Replicate is a standalone disaster recovery tool for SQLite. It runs as a background process and safely replicates changes incrementally to another file or to S3. Replicate communicates with SQLite only through the SQLite API, so it will not corrupt your database.

Module: github.com/hanzoai/replicate

Docker

ghcr.io/hanzoai/replicate:latest

Features

  • WAL streaming to S3 (continuous incremental replication)
  • End-to-end encryption via luxfi/age v1.4.0 (X25519, PQ upgrade path via X-Wing/ML-KEM-768)
  • Compatible with hanzoai/s3 (MinIO)
  • Init container restore + sidecar replication pattern for K8s

Encryption

Replicated data is encrypted before leaving the process using luxfi/age. Configure encryption in replicate.yml:

dbs:
  - path: /data/db.sqlite
    replicas:
      - type: s3
        bucket: my-bucket
        path: service/pod-0
        endpoint: s3.local:9000
        age-identities: /secrets/age-identity.txt
        age-recipients: /secrets/age-recipients.txt

Files on S3 are stored as .zap.age (age-encrypted ZAP binary format).

Usage

replicate replicate [arguments]
replicate restore [arguments] DB_PATH
replicate version

License

See LICENSE.

Documentation

Index

Constants

const (
	DefaultMonitorInterval      = 1 * time.Second
	DefaultCheckpointInterval   = 1 * time.Minute
	DefaultBusyTimeout          = 1 * time.Second
	DefaultMinCheckpointPageN   = 1000
	DefaultTruncatePageN        = 121359 // ~500MB with 4KB page size
	DefaultShutdownSyncTimeout  = 30 * time.Second
	DefaultShutdownSyncInterval = 500 * time.Millisecond

	// Sync error backoff configuration.
	// When sync errors occur repeatedly (e.g., disk full), backoff doubles each time.
	DefaultSyncBackoffMax = 5 * time.Minute  // Maximum backoff between retries
	SyncErrorLogInterval  = 30 * time.Second // Rate-limit repeated error logging
)

Default DB settings.

const (
	DefaultHeartbeatInterval = 5 * time.Minute
	DefaultHeartbeatTimeout  = 30 * time.Second
	MinHeartbeatInterval     = 1 * time.Minute
)
const (
	LogKeySystem    = "system"
	LogKeySubsystem = "subsystem"
	LogKeyDB        = "db"
)
const (
	LogSystemStore  = "store"
	LogSystemServer = "server"
)
const (
	LogSubsystemCompactor = "compactor"
	LogSubsystemWALReader = "wal-reader"
)
const (
	CheckpointModePassive  = "PASSIVE"
	CheckpointModeFull     = "FULL"
	CheckpointModeRestart  = "RESTART"
	CheckpointModeTruncate = "TRUNCATE"
)

SQLite checkpoint modes.

const (
	WALHeaderChecksumOffset      = 24
	WALFrameHeaderChecksumOffset = 16
)

SQLite WAL constants.

const (
	// WALHeaderSize is the size of the WAL header, in bytes.
	WALHeaderSize = 32

	// WALFrameHeaderSize is the size of the WAL frame header, in bytes.
	WALFrameHeaderSize = 24
)
const (
	DefaultSnapshotInterval  = 24 * time.Hour
	DefaultSnapshotRetention = 24 * time.Hour

	DefaultRetention              = 24 * time.Hour
	DefaultRetentionCheckInterval = 1 * time.Hour

	// DefaultL0Retention is the default time that L0 files are kept around
	// after they have been compacted into L1 files.
	DefaultL0Retention = 5 * time.Minute
	// DefaultL0RetentionCheckInterval controls how frequently L0 retention is
	// enforced. This interval should be more frequent than the L1 compaction
	// interval so that VFS read replicas have time to observe new files.
	DefaultL0RetentionCheckInterval = 15 * time.Second

	// DefaultHeartbeatCheckInterval controls how frequently the heartbeat
	// monitor checks if heartbeat pings should be sent.
	DefaultHeartbeatCheckInterval = 15 * time.Second

	// DefaultDBInitTimeout is the maximum time to wait for a database to be
	// initialized (page size known) before logging a warning.
	DefaultDBInitTimeout = 30 * time.Second
)

Store defaults

const (
	GenerationsDirV3 = "generations"
	SnapshotsDirV3   = "snapshots"
	WALDirV3         = "wal"
)

v0.3.x path constants.

const DefaultEstimatedPageIndexSize = 32 * 1024 // 32KB

DefaultEstimatedPageIndexSize is the number of bytes initially fetched when reading the page index. If the actual page index is larger than this estimate, a second call fetches the remainder.

const DefaultFollowInterval = 1 * time.Second

DefaultFollowInterval is the default polling interval for follow mode.

const DefaultRestoreParallelism = 8

DefaultRestoreParallelism is the default parallelism when downloading WAL files.

const (
	DefaultSyncInterval = 1 * time.Second
)

Default replica settings.

const (
	MetaDirSuffix = "-replicate"
)

Naming constants.

const SnapshotLevel = 9

SnapshotLevel represents the level at which full snapshots are held.

Variables

var (
	ErrNoSnapshots      = errors.New("no snapshots available")
	ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch")
	ErrLTXCorrupted     = errors.New("ltx file corrupted")
	ErrLTXMissing       = errors.New("ltx file missing")
)

Replicate errors.

var (
	// LogWriter is the destination writer for all logging.
	LogWriter = os.Stdout

	// LogFlags are the flags passed to log.New().
	LogFlags = 0
)
var (
	// ErrNoCompaction is returned when no new files are available from the previous level.
	ErrNoCompaction = errors.New("no compaction")

	// ErrCompactionTooEarly is returned when a compaction is attempted too soon
	// since the last compaction time. This is used to prevent frequent
	// re-compaction when restarting the process.
	ErrCompactionTooEarly = errors.New("compaction too early")

	// ErrTxNotAvailable is returned when a transaction does not exist.
	ErrTxNotAvailable = errors.New("transaction not available")

	// ErrDBNotReady is a sentinel for errors.Is() compatibility.
	ErrDBNotReady = &DBNotReadyError{}

	// ErrShutdownInterrupted is returned when the shutdown sync retry loop
	// is interrupted by a done channel signal (e.g., second Ctrl+C).
	ErrShutdownInterrupted = errors.New("shutdown sync interrupted")

	ErrDatabaseNotFound = errors.New("database not found")
	ErrDatabaseNotOpen  = errors.New("database not open")
)
var DefaultCompactionLevels = CompactionLevels{
	{Level: 0, Interval: 0},
	{Level: 1, Interval: 30 * time.Second},
	{Level: 2, Interval: 5 * time.Minute},
	{Level: 3, Interval: time.Hour},
}

DefaultCompactionLevels provides the canonical default compaction configuration. Level 0 is raw LTX files, higher levels compact at increasing intervals. These values are also used by cmd/replicate DefaultConfig().

var ErrLeaseNotHeld = errors.New("lease not held")
var ErrStopIter = errors.New("stop iterator")

Functions

func AutoReplicate

func AutoReplicate(dbPath string) func()

AutoReplicate is the one-line SDK entry point for any Go app. It starts streaming E2E PQ-encrypted replication of a SQLite database to S3.

Usage:

stop := replicate.AutoReplicate("/app/data/data.db")
defer stop()

Reads all config from env vars:

REPLICATE_S3_ENDPOINT  — S3 endpoint (required, no-op if empty)
REPLICATE_S3_BUCKET    — bucket name (default: "replicate")
REPLICATE_S3_PATH      — key prefix (default: hostname)
REPLICATE_S3_REGION    — S3 region (default: "us-central1")
REPLICATE_AGE_RECIPIENT — age public key for PQ encryption
REPLICATE_AGE_IDENTITY  — age private key for restore/decrypt
REPLICATE_SYNC_INTERVAL — WAL sync interval (default: "1s")

Returns a stop function that gracefully shuts down replication. Returns a no-op function if REPLICATE_S3_ENDPOINT is not set.

func BoolQueryValue

func BoolQueryValue(query url.Values, keys ...string) (value bool, ok bool)

BoolQueryValue returns a boolean value from URL query parameters. It checks multiple keys in order and returns the value and whether it was set.

func CalcRestorePlan

func CalcRestorePlan(ctx context.Context, client ReplicaClient, txID ltx.TXID, timestamp time.Time, logger *slog.Logger) ([]*ltx.FileInfo, error)

CalcRestorePlan returns a list of storage paths to restore a snapshot at the given TXID.

func Checksum

func Checksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32)

Checksum computes a running SQLite checksum over a byte slice.

func CleanReplicaURLPath

func CleanReplicaURLPath(p string) string

CleanReplicaURLPath cleans a URL path for use in replica storage.

func EnsureEndpointScheme

func EnsureEndpointScheme(endpoint string) (string, bool)

EnsureEndpointScheme ensures an endpoint has an HTTP(S) scheme. For local endpoints (localhost, private IPs), it defaults to http://. For all other endpoints (cloud providers), it defaults to https://. Returns the endpoint with scheme and a boolean indicating if a scheme was added.

func FetchLTXHeader

func FetchLTXHeader(ctx context.Context, client ReplicaClient, info *ltx.FileInfo) (ltx.Header, error)

FetchLTXHeader reads & returns the LTX header for the given file info.

func FetchPage

func FetchPage(ctx context.Context, client ReplicaClient, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (ltx.PageHeader, []byte, error)

FetchPage fetches and decodes a single page frame from an LTX file.

func FetchPageIndex

func FetchPageIndex(ctx context.Context, client ReplicaClient, info *ltx.FileInfo) (map[uint32]ltx.PageIndexElem, error)

func FindLTXFiles

func FindLTXFiles(ctx context.Context, client ReplicaClient, level int, useMetadata bool, filter func(*ltx.FileInfo) (bool, error)) ([]*ltx.FileInfo, error)

FindLTXFiles returns a list of files that match filter. The useMetadata parameter is passed through to LTXFiles to control whether accurate timestamps are fetched from metadata. When true (timestamp-based restore), accurate timestamps are required. When false (normal operations), fast timestamps are sufficient.

func FormatSnapshotFilenameV3

func FormatSnapshotFilenameV3(index int) string

FormatSnapshotFilenameV3 returns the filename for a v0.3.x snapshot. Format: {index:08x}.snapshot.lz4

func FormatWALSegmentFilenameV3

func FormatWALSegmentFilenameV3(index int, offset int64) string

FormatWALSegmentFilenameV3 returns the filename for a v0.3.x WAL segment. Format: {index:08x}_{offset:08x}.wal.lz4
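
Both documented formats reduce to fmt.Sprintf with zero-padded hex fields; this is a sketch of the naming scheme, not the package's own helpers:

```go
package main

import "fmt"

// snapshotFilenameV3 renders "{index:08x}.snapshot.lz4".
func snapshotFilenameV3(index int) string {
	return fmt.Sprintf("%08x.snapshot.lz4", index)
}

// walSegmentFilenameV3 renders "{index:08x}_{offset:08x}.wal.lz4".
func walSegmentFilenameV3(index int, offset int64) string {
	return fmt.Sprintf("%08x_%08x.wal.lz4", index, offset)
}

func main() {
	fmt.Println(snapshotFilenameV3(255))       // 000000ff.snapshot.lz4
	fmt.Println(walSegmentFilenameV3(1, 4096)) // 00000001_00001000.wal.lz4
}
```

Fixed-width hex names sort lexicographically in the same order as their numeric values, which is what makes listing a bucket prefix equivalent to iterating segments in order.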

func GenerationPathV3

func GenerationPathV3(root, generation string) string

GenerationPathV3 returns the path to a specific generation.

func GenerationsPathV3

func GenerationsPathV3(root string) string

GenerationsPathV3 returns the path to the generations directory.

func IsBackblazeEndpoint

func IsBackblazeEndpoint(endpoint string) bool

IsBackblazeEndpoint returns true if the endpoint is Backblaze B2.

func IsCloudflareR2Endpoint

func IsCloudflareR2Endpoint(endpoint string) bool

IsCloudflareR2Endpoint returns true if the endpoint is Cloudflare R2.

func IsDigitalOceanEndpoint

func IsDigitalOceanEndpoint(endpoint string) bool

IsDigitalOceanEndpoint returns true if the endpoint is Digital Ocean Spaces.

func IsFilebaseEndpoint

func IsFilebaseEndpoint(endpoint string) bool

IsFilebaseEndpoint returns true if the endpoint is Filebase.

func IsGenerationIDV3

func IsGenerationIDV3(s string) bool

IsGenerationIDV3 returns true if s is a valid v0.3.x generation ID (16 hex chars).

func IsHetznerEndpoint

func IsHetznerEndpoint(endpoint string) bool

IsHetznerEndpoint returns true if the endpoint is Hetzner object storage service.

func IsLocalEndpoint

func IsLocalEndpoint(endpoint string) bool

IsLocalEndpoint returns true if the endpoint appears to be a local development endpoint (localhost, 127.0.0.1, or private network addresses). These endpoints typically use HTTP instead of HTTPS.
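
One plausible implementation of that check, using the standard library's loopback and RFC 1918 classification; the package's actual rules may differ in detail:

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// isLocalEndpoint reports whether an endpoint looks like a local
// development target: "localhost", a loopback address, or a private
// network address. (Illustrative sketch, not the package source.)
func isLocalEndpoint(endpoint string) bool {
	host := endpoint
	if i := strings.Index(host, "://"); i >= 0 {
		host = host[i+3:] // strip scheme
	}
	if h, _, err := net.SplitHostPort(host); err == nil {
		host = h // strip port
	}
	if host == "localhost" {
		return true
	}
	ip := net.ParseIP(host)
	return ip != nil && (ip.IsLoopback() || ip.IsPrivate())
}

func main() {
	fmt.Println(isLocalEndpoint("localhost:9000"))    // true
	fmt.Println(isLocalEndpoint("192.168.1.10:9000")) // true
	fmt.Println(isLocalEndpoint("s3.amazonaws.com"))  // false
}
```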

func IsMinIOEndpoint

func IsMinIOEndpoint(endpoint string) bool

IsMinIOEndpoint returns true if the endpoint appears to be MinIO or similar (a custom endpoint with a port number that is not a known cloud provider).

func IsScalewayEndpoint

func IsScalewayEndpoint(endpoint string) bool

IsScalewayEndpoint returns true if the endpoint is Scaleway Object Storage.

func IsSupabaseEndpoint

func IsSupabaseEndpoint(endpoint string) bool

IsSupabaseEndpoint returns true if the endpoint is Supabase Storage S3.

func IsTigrisEndpoint

func IsTigrisEndpoint(endpoint string) bool

IsTigrisEndpoint returns true if the endpoint is the Tigris object storage service.

func IsURL

func IsURL(s string) bool

func LTXDir

func LTXDir(root string) string

LTXDir returns the path to an LTX directory.

func LTXFilePath

func LTXFilePath(root string, level int, minTXID, maxTXID ltx.TXID) string

LTXFilePath returns the path to a single LTX file.

func LTXLevelDir

func LTXLevelDir(root string, level int) string

LTXLevelDir returns the path to an LTX level directory.

func ParseReplicaURL

func ParseReplicaURL(s string) (scheme, host, urlPath string, err error)

ParseReplicaURL parses a replica URL and returns the scheme, host, and path.

func ParseReplicaURLWithQuery

func ParseReplicaURLWithQuery(s string) (scheme, host, urlPath string, query url.Values, userinfo *url.Userinfo, err error)

ParseReplicaURLWithQuery parses a replica URL and returns query parameters and userinfo.

func ParseSnapshotFilenameV3

func ParseSnapshotFilenameV3(filename string) (index int, err error)

ParseSnapshotFilenameV3 parses a v0.3.x snapshot filename and returns its index. Returns an error if the filename does not match the expected format.

func ParseWALSegmentFilenameV3

func ParseWALSegmentFilenameV3(filename string) (index int, offset int64, err error)

ParseWALSegmentFilenameV3 parses a v0.3.x WAL segment filename. Returns the WAL index and byte offset, or an error if the filename is invalid.

func ReadTXIDFile

func ReadTXIDFile(outputPath string) (ltx.TXID, error)

ReadTXIDFile reads the TXID from a sidecar file at <outputPath>-txid. Returns 0, nil if the file does not exist (first run).

func RegionFromS3ARN

func RegionFromS3ARN(arn string) string

RegionFromS3ARN extracts the region from an S3 ARN.

func RegisterReplicaClientFactory

func RegisterReplicaClientFactory(scheme string, factory ReplicaClientFactory)

RegisterReplicaClientFactory registers a factory function for creating replica clients for a given URL scheme. This is typically called from init() functions in backend packages.

func ReplicaTypeFromURL

func ReplicaTypeFromURL(rawURL string) string

ReplicaTypeFromURL returns the replica type from a URL string. Returns empty string if the URL is invalid or has no scheme.

func SnapshotPathV3

func SnapshotPathV3(root, generation string, index int) string

SnapshotPathV3 returns the full path to a v0.3.x snapshot file.

func SnapshotsPathV3

func SnapshotsPathV3(root, generation string) string

SnapshotsPathV3 returns the path to snapshots within a generation.

func TXIDPath

func TXIDPath(outputPath string) string

TXIDPath returns the path to the TXID sidecar file for the given database path. Uses -txid suffix to match SQLite's naming convention for associated files (-wal, -shm).

func WALChecksum

func WALChecksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32)

WALChecksum computes a running SQLite WAL checksum over a byte slice.

func WALPathV3

func WALPathV3(root, generation string) string

WALPathV3 returns the path to WAL segments within a generation.

func WALSegmentPathV3

func WALSegmentPathV3(root, generation string, index int, offset int64) string

WALSegmentPathV3 returns the full path to a v0.3.x WAL segment file.

func WriteTXIDFile

func WriteTXIDFile(outputPath string, txid ltx.TXID) error

WriteTXIDFile atomically writes a TXID to a sidecar file at <outputPath>-txid. Uses temp-file + fsync + rename for crash safety.

Types

type CompactionLevel

type CompactionLevel struct {
	// The numeric level. Must match the index in the list of levels.
	Level int

	// The frequency that the level is compacted from the previous level.
	Interval time.Duration
}

CompactionLevel represents a single part of a multi-level compaction. Each level merges LTX files from the previous level into larger time granularities.

func (*CompactionLevel) NextCompactionAt

func (lvl *CompactionLevel) NextCompactionAt(now time.Time) time.Time

NextCompactionAt returns the time at which the next compaction occurs. Returns the current time if it is exactly a multiple of the level interval.

func (*CompactionLevel) PrevCompactionAt

func (lvl *CompactionLevel) PrevCompactionAt(now time.Time) time.Time

PrevCompactionAt returns the time when the last compaction occurred. Returns the current time if it is exactly a multiple of the level interval.

type CompactionLevels

type CompactionLevels []*CompactionLevel

CompactionLevels represents a sorted slice of non-snapshot compaction levels.

func (CompactionLevels) IsValidLevel

func (a CompactionLevels) IsValidLevel(level int) bool

IsValidLevel returns true if level is a valid compaction level number.

func (CompactionLevels) Level

func (a CompactionLevels) Level(level int) (*CompactionLevel, error)

Level returns the compaction level at the given index. Returns an error if the index is a snapshot level or is out of bounds.

func (CompactionLevels) MaxLevel

func (a CompactionLevels) MaxLevel() int

MaxLevel returns the highest non-snapshot compaction level.

func (CompactionLevels) NextLevel

func (a CompactionLevels) NextLevel(level int) int

NextLevel returns the next compaction level. Returns -1 if there is no next level.

func (CompactionLevels) PrevLevel

func (a CompactionLevels) PrevLevel(level int) int

PrevLevel returns the previous compaction level. Returns -1 if there is no previous level.

func (CompactionLevels) Validate

func (a CompactionLevels) Validate() error

Validate returns an error if the levels are invalid.

type Compactor

type Compactor struct {

	// VerifyCompaction enables post-compaction TXID consistency verification.
	// When enabled, verifies that files at the destination level have
	// contiguous TXID ranges after each compaction. Disabled by default.
	VerifyCompaction bool

	// RetentionEnabled controls whether Replicate actively deletes old files
	// during retention enforcement. When false, cloud provider lifecycle
	// policies handle retention instead. Local file cleanup still occurs.
	RetentionEnabled bool

	// CompactionVerifyErrorCounter is incremented when post-compaction
	// verification fails. Optional; if nil, no metric is recorded.
	CompactionVerifyErrorCounter prometheus.Counter

	// LocalFileOpener optionally opens a local LTX file for compaction.
	// If nil or returns os.ErrNotExist, falls back to remote.
	// This is used by DB to prefer local files over remote for consistency.
	LocalFileOpener func(level int, minTXID, maxTXID ltx.TXID) (io.ReadCloser, error)

	// LocalFileDeleter optionally deletes local LTX files after retention.
	// If nil, only remote files are deleted.
	LocalFileDeleter func(level int, minTXID, maxTXID ltx.TXID) error

	// CacheGetter optionally retrieves cached MaxLTXFileInfo for a level.
	// If nil, max file info is always fetched from remote.
	CacheGetter func(level int) (*ltx.FileInfo, bool)

	// CacheSetter optionally stores MaxLTXFileInfo for a level.
	// If nil, max file info is not cached.
	CacheSetter func(level int, info *ltx.FileInfo)

	// Age encryption for compaction. When set, reads are decrypted and
	// writes are encrypted so compacted files remain encrypted at rest.
	AgeIdentities []age.Identity
	AgeRecipients []age.Recipient
	// contains filtered or unexported fields
}

Compactor handles compaction and retention for LTX files. It operates solely through the ReplicaClient interface, making it suitable for both DB (with local file caching) and VFS (remote-only).

func NewCompactor

func NewCompactor(client ReplicaClient, logger *slog.Logger) *Compactor

NewCompactor creates a new Compactor with the given client and logger.

func (*Compactor) Compact

func (c *Compactor) Compact(ctx context.Context, dstLevel int) (*ltx.FileInfo, error)

Compact compacts source level files into the destination level. Returns ErrNoCompaction if there are no files to compact.

func (*Compactor) EnforceL0Retention

func (c *Compactor) EnforceL0Retention(ctx context.Context, retention time.Duration) error

EnforceL0Retention retains L0 files based on L1 compaction progress and time. Files are only deleted if they have been compacted into L1 AND are older than retention. This ensures contiguous L0 coverage for VFS reads.

func (*Compactor) EnforceRetentionByTXID

func (c *Compactor) EnforceRetentionByTXID(ctx context.Context, level int, txID ltx.TXID) error

EnforceRetentionByTXID deletes files at the given level with maxTXID below the target. Always keeps at least one file.

func (*Compactor) EnforceSnapshotRetention

func (c *Compactor) EnforceSnapshotRetention(ctx context.Context, retention time.Duration) (ltx.TXID, error)

EnforceSnapshotRetention enforces retention of snapshot level files by timestamp. Files older than the retention duration are deleted (except the newest is always kept). Returns the minimum snapshot TXID still retained (useful for cascading retention to lower levels).

func (*Compactor) MaxLTXFileInfo

func (c *Compactor) MaxLTXFileInfo(ctx context.Context, level int) (ltx.FileInfo, error)

MaxLTXFileInfo returns metadata for the last LTX file in a level. Uses cache if available, otherwise fetches from remote.

func (*Compactor) VerifyLevelConsistency

func (c *Compactor) VerifyLevelConsistency(ctx context.Context, level int) error

VerifyLevelConsistency checks that LTX files at the given level have contiguous TXID ranges (prevMaxTXID + 1 == currMinTXID for consecutive files). Returns an error describing any gaps or overlaps found.
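
The contiguity invariant can be sketched over plain TXID ranges (txRange is a hypothetical stand-in for the relevant ltx.FileInfo fields):

```go
package main

import "fmt"

// txRange is a simplified stand-in for an LTX file's TXID range.
type txRange struct{ min, max uint64 }

// verifyContiguous applies the documented invariant: for consecutive
// files, prevMaxTXID + 1 must equal currMinTXID. Anything lower is an
// overlap; anything higher is a gap.
func verifyContiguous(files []txRange) error {
	for i := 1; i < len(files); i++ {
		prev, curr := files[i-1], files[i]
		switch {
		case curr.min == prev.max+1:
			// contiguous; keep scanning
		case curr.min <= prev.max:
			return fmt.Errorf("overlap: %d-%d followed by %d-%d",
				prev.min, prev.max, curr.min, curr.max)
		default:
			return fmt.Errorf("gap: %d-%d followed by %d-%d",
				prev.min, prev.max, curr.min, curr.max)
		}
	}
	return nil
}

func main() {
	ok := []txRange{{1, 10}, {11, 25}, {26, 30}}
	bad := []txRange{{1, 10}, {12, 25}}
	fmt.Println(verifyContiguous(ok))  // <nil>
	fmt.Println(verifyContiguous(bad)) // gap: 1-10 followed by 12-25
}
```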

type DB

type DB struct {
	Done <-chan struct{}

	// Minimum threshold of WAL size, in pages, before a passive checkpoint.
	// A passive checkpoint will attempt a checkpoint but fail if there are
	// active transactions occurring at the same time.
	//
	// Uses PASSIVE checkpoint mode (non-blocking). Keeps WAL size manageable
	// for faster restores. Default: 1000 pages (~4MB with 4KB page size).
	MinCheckpointPageN int

	// Threshold of WAL size, in pages, before a forced truncation checkpoint.
	// A forced truncation checkpoint will block new transactions and wait for
	// existing transactions to finish before issuing a checkpoint and
	// truncating the WAL.
	//
	// Uses TRUNCATE checkpoint mode (blocking). Prevents unbounded WAL growth
	// from long-lived read transactions. Default: 121359 pages (~500MB with 4KB
	// page size). Set to 0 to disable forced truncation (use with caution as
	// WAL can grow unbounded if read transactions prevent checkpointing).
	TruncatePageN int

	// Time between automatic checkpoints in the WAL. This is done to allow
	// more fine-grained WAL files so that restores can be performed with
	// better precision.
	//
	// Uses PASSIVE checkpoint mode (non-blocking). Default: 1 minute.
	// Set to 0 to disable time-based checkpoints.
	CheckpointInterval time.Duration

	// Frequency at which to perform db sync.
	MonitorInterval time.Duration

	// Timeout to wait when SQLite reports the database is busy (SQLITE_BUSY).
	BusyTimeout time.Duration

	// Minimum time to retain L0 files after they have been compacted into L1.
	L0Retention time.Duration

	// VerifyCompaction enables post-compaction TXID consistency verification.
	// When enabled, verifies that files at the destination level have
	// contiguous TXID ranges after each compaction.
	VerifyCompaction bool

	// RetentionEnabled controls whether Replicate actively deletes old files
	// during retention enforcement. When false, cloud provider lifecycle
	// policies handle retention instead. Local file cleanup still occurs.
	RetentionEnabled bool

	// Remote replica for the database.
	// Must be set before calling Open().
	Replica *Replica

	// Shutdown sync retry settings.
	// ShutdownSyncTimeout is the total time to retry syncing on shutdown.
	// ShutdownSyncInterval is the time between retry attempts.
	ShutdownSyncTimeout  time.Duration
	ShutdownSyncInterval time.Duration

	// Where to send log messages, defaults to global slog with database path.
	Logger *slog.Logger
	// contains filtered or unexported fields
}

DB represents a managed instance of a SQLite database in the file system.

Checkpoint Strategy: Replicate uses a progressive 3-tier checkpoint approach to balance WAL size management with write availability:

  1. MinCheckpointPageN (PASSIVE): Non-blocking checkpoint at ~1k pages (~4MB). Attempts checkpoint but allows concurrent readers/writers.

  2. CheckpointInterval (PASSIVE): Time-based non-blocking checkpoint. Ensures regular checkpointing even with low write volume.

  3. TruncatePageN (TRUNCATE): Blocking checkpoint at ~121k pages (~500MB). Emergency brake for runaway WAL growth. Can block writes while waiting for long-lived read transactions. Configurable/disableable.

The RESTART checkpoint mode was permanently removed due to production issues with indefinite write blocking (issue #724). All checkpoints now use either PASSIVE (non-blocking) or TRUNCATE (emergency only) modes.

func NewDB

func NewDB(path string) *DB

NewDB returns a new instance of DB for a given path.

func (*DB) CRC64

func (db *DB) CRC64(ctx context.Context) (uint64, ltx.Pos, error)

CRC64 returns a CRC-64 ISO checksum of the database and its current position.

This function obtains a read lock so it prevents syncs from occurring until the operation is complete. The database will still be usable but it will be unable to checkpoint during this time.

func (*DB) Checkpoint

func (db *DB) Checkpoint(ctx context.Context, mode string) (err error)

Checkpoint performs a checkpoint on the WAL file.

func (*DB) Close

func (db *DB) Close(ctx context.Context) (err error)

Close flushes outstanding WAL writes to replicas, releases the read lock, and closes the database. If Done is set, closing it interrupts the shutdown sync retry loop and cancels any in-flight sync attempt.

func (*DB) Compact

func (db *DB) Compact(ctx context.Context, dstLevel int) (*ltx.FileInfo, error)

Compact performs a compaction of the LTX file at the previous level into dstLevel. Returns metadata for the newly written compaction file. Returns ErrNoCompaction if no new files are available to be compacted.

func (*DB) DirInfo

func (db *DB) DirInfo() os.FileInfo

DirInfo returns the cached file stats for the parent directory of the database file when it was initialized.

func (*DB) EnforceL0RetentionByTime

func (db *DB) EnforceL0RetentionByTime(ctx context.Context) error

EnforceL0RetentionByTime retains L0 files until they have been compacted into L1 and have existed for at least L0Retention.

func (*DB) EnforceRetentionByTXID

func (db *DB) EnforceRetentionByTXID(ctx context.Context, level int, txID ltx.TXID) (err error)

EnforceRetentionByTXID enforces retention so that any LTX files below the target TXID are deleted. Always keeps at least one file.

func (*DB) EnforceSnapshotRetention

func (db *DB) EnforceSnapshotRetention(ctx context.Context, timestamp time.Time) (minSnapshotTXID ltx.TXID, err error)

EnforceSnapshotRetention enforces retention of the snapshot level in the database by timestamp.

func (*DB) EnsureExists

func (db *DB) EnsureExists(ctx context.Context) error

EnsureExists restores the database from the configured replica if the local database file does not exist. If no backup is available, it returns nil and a fresh database will be created on Open(). Must be called before Open().

func (*DB) FileInfo

func (db *DB) FileInfo() os.FileInfo

FileInfo returns the cached file stats for the database file when it was initialized.

func (*DB) IsOpen

func (db *DB) IsOpen() bool

IsOpen returns true if the database has been opened.

func (*DB) LTXDir

func (db *DB) LTXDir() string

LTXDir returns path of the root LTX directory.

func (*DB) LTXLevelDir

func (db *DB) LTXLevelDir(level int) string

LTXLevelDir returns path of the given LTX compaction level. Panics if level is negative.

func (*DB) LTXPath

func (db *DB) LTXPath(level int, minTXID, maxTXID ltx.TXID) string

LTXPath returns the local path of a single LTX file. Panics if level or either txn ID is negative.

func (*DB) LastSuccessfulSyncAt

func (db *DB) LastSuccessfulSyncAt() time.Time

LastSuccessfulSyncAt returns the time of the last successful sync.

func (*DB) MaxLTX

func (db *DB) MaxLTX() (minTXID, maxTXID ltx.TXID, err error)

MaxLTX returns the last LTX file written to level 0.

func (*DB) MaxLTXFileInfo

func (db *DB) MaxLTXFileInfo(ctx context.Context, level int) (ltx.FileInfo, error)

MaxLTXFileInfo returns the metadata for the last LTX file in a level. If cached, the local copy is returned. Otherwise, it fetches from the replica.

func (*DB) MetaPath

func (db *DB) MetaPath() string

MetaPath returns the path to the database metadata.

func (*DB) Notify

func (db *DB) Notify() <-chan struct{}

Notify returns a channel that closes when the shadow WAL changes.

func (*DB) Open

func (db *DB) Open() (err error)

Open initializes the background monitoring goroutine.

func (*DB) PageSize

func (db *DB) PageSize() int

PageSize returns the page size of the underlying database. Only valid after the database exists and Init() has run successfully.

func (*DB) Path

func (db *DB) Path() string

Path returns the path to the database.

func (*DB) Pos

func (db *DB) Pos() (ltx.Pos, error)

Pos returns the current replication position of the database. The result is cached and invalidated when L0 LTX files change.

func (*DB) RecordSuccessfulSync

func (db *DB) RecordSuccessfulSync()

RecordSuccessfulSync marks the current time as a successful sync. Used by heartbeat monitoring to determine if a ping should be sent.

func (*DB) ResetLocalState

func (db *DB) ResetLocalState(ctx context.Context) error

ResetLocalState removes local LTX files, forcing a fresh snapshot on next sync. This is useful for recovering from corrupted or missing LTX files. The database file itself is not modified.

func (*DB) SQLDB

func (db *DB) SQLDB() *sql.DB

SQLDB returns a reference to the underlying sql.DB connection.

func (*DB) SetLogger

func (db *DB) SetLogger(logger *slog.Logger)

SetLogger updates the database logger and propagates to subsystems.

func (*DB) SetMetaPath

func (db *DB) SetMetaPath(path string)

SetMetaPath sets the path to database metadata.

func (*DB) Snapshot

func (db *DB) Snapshot(ctx context.Context) (*ltx.FileInfo, error)

Snapshot writes a snapshot to the replica for the current position of the database.

func (*DB) SnapshotReader

func (db *DB) SnapshotReader(ctx context.Context) (ltx.Pos, io.Reader, error)

SnapshotReader returns the current position of the database & a reader that contains a full database snapshot.

func (*DB) Sync

func (db *DB) Sync(ctx context.Context) (err error)

Sync copies pending data from the WAL to the shadow WAL.

func (*DB) SyncAndWait

func (db *DB) SyncAndWait(ctx context.Context) error

SyncAndWait performs a full sync: WAL to LTX files, then LTX files to remote replica. Blocks until both stages complete.

func (*DB) SyncStatus

func (db *DB) SyncStatus(ctx context.Context) (SyncStatus, error)

SyncStatus returns the current replication status of the database, comparing the local transaction position against the remote replica position. The remote position is queried from the replica storage, so this method may perform I/O.

func (*DB) WALPath

func (db *DB) WALPath() string

WALPath returns the path to the database's WAL file.

type DBNotReadyError

type DBNotReadyError struct {
	Reason string
}

DBNotReadyError is returned when an operation is attempted before the database has been initialized (e.g., page size not yet known).

func (*DBNotReadyError) Error

func (e *DBNotReadyError) Error() string

func (*DBNotReadyError) Is

func (e *DBNotReadyError) Is(target error) bool

type DatabaseSummary

type DatabaseSummary struct {
	Path   string `json:"path"`
	Status string `json:"status"`

	// LastSyncAt is the timestamp of the last successful replica sync.
	// This reflects when data was last successfully uploaded to the replica
	// storage backend, not just when the local WAL was processed.
	LastSyncAt *time.Time `json:"last_sync_at,omitempty"`
}

DatabaseSummary contains summary information about a database.

type ErrorResponse

type ErrorResponse struct {
	Error   string      `json:"error"`
	Details interface{} `json:"details,omitempty"`
}

ErrorResponse is returned when an error occurs.

type HeartbeatClient

type HeartbeatClient struct {
	URL      string
	Interval time.Duration
	Timeout  time.Duration
	// contains filtered or unexported fields
}

func NewHeartbeatClient

func NewHeartbeatClient(url string, interval time.Duration) *HeartbeatClient

func (*HeartbeatClient) LastPingAt

func (c *HeartbeatClient) LastPingAt() time.Time

func (*HeartbeatClient) Ping

func (c *HeartbeatClient) Ping(ctx context.Context) error

func (*HeartbeatClient) RecordPing

func (c *HeartbeatClient) RecordPing()

func (*HeartbeatClient) ShouldPing

func (c *HeartbeatClient) ShouldPing() bool

type InfoResponse

type InfoResponse struct {
	Version       string    `json:"version"`
	PID           int       `json:"pid"`
	UptimeSeconds int64     `json:"uptime_seconds"`
	StartedAt     time.Time `json:"started_at"`
	DatabaseCount int       `json:"database_count"`
}

InfoResponse is the response body for the /info endpoint.

type IntegrityCheckMode

type IntegrityCheckMode int

IntegrityCheckMode specifies the level of integrity checking after restore.

const (
	IntegrityCheckNone IntegrityCheckMode = iota
	IntegrityCheckQuick
	IntegrityCheckFull
)

type LTXError

type LTXError struct {
	Op      string // Operation that failed (e.g., "open", "read", "validate")
	Path    string // File path
	Level   int    // LTX level (0 = L0, etc.)
	MinTXID uint64 // Minimum transaction ID
	MaxTXID uint64 // Maximum transaction ID
	Err     error  // Underlying error
	Hint    string // Recovery hint for users
}

LTXError provides detailed context for LTX file errors with recovery hints.

func NewLTXError

func NewLTXError(op, path string, level int, minTXID, maxTXID uint64, err error) *LTXError

NewLTXError creates a new LTX error with appropriate hints based on the error type.

func (*LTXError) Error

func (e *LTXError) Error() string

func (*LTXError) IsAutoRecoverable

func (e *LTXError) IsAutoRecoverable() bool

IsAutoRecoverable reports whether the underlying error indicates local state corruption that can be fixed by resetting and re-downloading from remote. Returns false for transient OS errors (EMFILE, EIO, EACCES) that should be retried with backoff instead.
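A sketch of how IsAutoRecoverable might gate recovery after a failed sync; the handler function and its policy are illustrative, not part of the package.

```go
package main

import (
	"context"
	"errors"

	replicate "github.com/hanzoai/replicate"
)

// handleSyncError resets local state only for errors flagged as
// auto-recoverable; transient OS errors are returned to the caller,
// which should retry with backoff instead.
func handleSyncError(ctx context.Context, db *replicate.DB, err error) error {
	var ltxErr *replicate.LTXError
	if errors.As(err, &ltxErr) && ltxErr.IsAutoRecoverable() {
		// Local LTX state is corrupt: reset it to force a fresh snapshot on next sync.
		return db.ResetLocalState(ctx)
	}
	return err
}
```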

func (*LTXError) Unwrap

func (e *LTXError) Unwrap() error

type Lease

type Lease struct {
	Generation int64     `json:"generation"`
	ExpiresAt  time.Time `json:"expires_at"`
	Owner      string    `json:"owner,omitempty"`
	ETag       string    `json:"-"`
}

func (*Lease) IsExpired

func (l *Lease) IsExpired() bool

func (*Lease) TTL

func (l *Lease) TTL() time.Duration

type LeaseExistsError

type LeaseExistsError struct {
	Owner     string
	ExpiresAt time.Time
}

func (*LeaseExistsError) Error

func (e *LeaseExistsError) Error() string

type Leaser

type Leaser interface {
	Type() string
	AcquireLease(ctx context.Context) (*Lease, error)
	RenewLease(ctx context.Context, lease *Lease) (*Lease, error)
	ReleaseLease(ctx context.Context, lease *Lease) error
}
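A sketch of a lease-holding loop against any Leaser implementation. Renewing at half the TTL is a common convention assumed here, not a requirement of the interface.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	replicate "github.com/hanzoai/replicate"
)

// holdLease acquires a lease and renews it at half its TTL until the
// context is canceled, then releases it.
func holdLease(ctx context.Context, l replicate.Leaser) error {
	lease, err := l.AcquireLease(ctx)
	if err != nil {
		var exists *replicate.LeaseExistsError
		if errors.As(err, &exists) {
			return fmt.Errorf("lease held by %q until %s", exists.Owner, exists.ExpiresAt)
		}
		return err
	}
	// Release with a fresh context so cancellation does not skip cleanup.
	defer l.ReleaseLease(context.Background(), lease)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(lease.TTL() / 2):
			if lease, err = l.RenewLease(ctx, lease); err != nil {
				return err
			}
		}
	}
}
```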

type ListResponse

type ListResponse struct {
	Databases []DatabaseSummary `json:"databases"`
}

ListResponse is the response body for the /list endpoint.

type PosV3

type PosV3 struct {
	Generation string // 16-char hex string
	Index      int    // WAL index
	Offset     int64  // Offset within WAL segment
}

PosV3 represents a position in a v0.3.x backup.

func (PosV3) IsZero

func (p PosV3) IsZero() bool

IsZero returns true if the position is the zero value.

func (PosV3) String

func (p PosV3) String() string

String returns a string representation of the position.

type PrevFrameMismatchError

type PrevFrameMismatchError struct {
	Err error
}

func (*PrevFrameMismatchError) Error

func (e *PrevFrameMismatchError) Error() string

func (*PrevFrameMismatchError) Unwrap

func (e *PrevFrameMismatchError) Unwrap() error

type RegisterDatabaseRequest

type RegisterDatabaseRequest struct {
	Path       string `json:"path"`
	ReplicaURL string `json:"replica_url"`
}

RegisterDatabaseRequest is the request body for the /register endpoint.

type RegisterDatabaseResponse

type RegisterDatabaseResponse struct {
	Status string `json:"status"`
	Path   string `json:"path"`
}

RegisterDatabaseResponse is the response body for the /register endpoint.

type Replica

type Replica struct {

	// Client used to connect to the remote replica.
	Client ReplicaClient

	// Time between syncs with the shadow WAL.
	SyncInterval time.Duration

	// If true, replica monitors database for changes automatically.
	// Set to false if replica is being used synchronously (such as in tests).
	MonitorEnabled bool

	// If true, automatically reset local state when LTX errors are detected.
	// This allows recovery from corrupted/missing LTX files by resetting
	// the position file and removing local LTX files, forcing a fresh sync.
	// Disabled by default to prevent silent data loss scenarios.
	AutoRecoverEnabled bool

	// Encryption identities and recipients for age (X25519) E2E encryption.
	// AgeRecipients: public keys used to encrypt data on write (sync, snapshot).
	// AgeIdentities: private keys used to decrypt data on read (restore, calcPos).
	AgeIdentities []age.Identity
	AgeRecipients []age.Recipient
	// contains filtered or unexported fields
}

Replica connects a database to a replication destination via a ReplicaClient. The replica manages periodic synchronization and maintains the current replica position.

func NewReplica

func NewReplica(db *DB) *Replica

func NewReplicaWithClient

func NewReplicaWithClient(db *DB, client ReplicaClient) *Replica

func (*Replica) CalcRestoreTarget

func (r *Replica) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (updatedAt time.Time, err error)

CalcRestoreTarget returns the target time to restore from.

func (*Replica) CreatedAt

func (r *Replica) CreatedAt(ctx context.Context) (time.Time, error)

CreatedAt returns the earliest creation time of any LTX file. Returns zero time if no LTX files exist.

func (*Replica) DB

func (r *Replica) DB() *DB

DB returns a reference to the database the replica is attached to, if any.

func (*Replica) DecryptionEnabled

func (r *Replica) DecryptionEnabled() bool

DecryptionEnabled returns true if age decryption is configured for reads.

func (*Replica) EncryptionEnabled

func (r *Replica) EncryptionEnabled() bool

EncryptionEnabled returns true if age encryption is configured for writes.

func (*Replica) EnforceRetention

func (r *Replica) EnforceRetention(ctx context.Context) (err error)

EnforceRetention forces a new snapshot once the retention interval has passed. Older snapshots and WAL files are then removed.

func (*Replica) Logger

func (r *Replica) Logger() *slog.Logger

Logger returns the DB sub-logger for this replica.

func (*Replica) MaxLTXFileInfo

func (r *Replica) MaxLTXFileInfo(ctx context.Context, level int) (info ltx.FileInfo, err error)

MaxLTXFileInfo returns metadata about the last LTX file for a given level. Returns a zero value if no files exist for the level.

func (*Replica) OpenLTXFile

func (r *Replica) OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)

OpenLTXFile opens a remote LTX file, decrypting it if age identities are set. This should be used instead of Client.OpenLTXFile when the caller needs plaintext data (all Replica read paths).

func (*Replica) Pos

func (r *Replica) Pos() ltx.Pos

Pos returns the current replicated position. Returns a zero value if the current position cannot be determined.

func (*Replica) Restore

func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error)

Restore restores the database from a replica based on the options given. This method will restore into opt.OutputPath, if specified, or into the DB's original database path. It can optionally restore from a specific replica, or it will automatically choose the best one. Finally, a timestamp can be specified to restore the database to a specific point-in-time.

When the replica contains both v0.3.x and LTX format backups, this method compares snapshots from both formats and uses whichever has the better backup: - With timestamp: uses the format with the most recent snapshot before timestamp - Without timestamp: uses the format with the most recent backup overall
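A sketch of a point-in-time restore using the options documented below; the output path is a placeholder.

```go
package main

import (
	"context"
	"time"

	replicate "github.com/hanzoai/replicate"
)

// restoreAsOf restores the database as of the given timestamp to a
// separate output path and runs a full integrity check afterwards.
func restoreAsOf(ctx context.Context, r *replicate.Replica, ts time.Time) error {
	opt := replicate.NewRestoreOptions()
	opt.OutputPath = "/tmp/restored.sqlite" // placeholder path
	opt.Timestamp = ts
	opt.IntegrityCheck = replicate.IntegrityCheckFull
	return r.Restore(ctx, opt)
}
```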

func (*Replica) RestoreV3

func (r *Replica) RestoreV3(ctx context.Context, opt RestoreOptions) error

RestoreV3 restores from a v0.3.x format backup.

func (*Replica) SetPos

func (r *Replica) SetPos(pos ltx.Pos)

SetPos sets the current replicated position.

func (*Replica) Start

func (r *Replica) Start(ctx context.Context) error

Start begins replicating in a background goroutine.

func (*Replica) Stop

func (r *Replica) Stop(hard bool) (err error)

Stop cancels any outstanding replication and blocks until finished.

Performing a hard stop closes the DB file descriptor, which can release per-process locks. Hard stops should only be performed when stopping the entire process.

func (*Replica) Sync

func (r *Replica) Sync(ctx context.Context) (err error)

Sync copies new WAL frames from the shadow WAL to the replica client. Only one Sync can run at a time to prevent concurrent uploads of the same file.

func (*Replica) TimeBounds

func (r *Replica) TimeBounds(ctx context.Context) (createdAt, updatedAt time.Time, err error)

TimeBounds returns the creation time & last updated time. Returns zero time if no LTX files exist.

func (*Replica) TimeBoundsV3

func (r *Replica) TimeBoundsV3(ctx context.Context, client ReplicaClientV3) (createdAt, updatedAt time.Time, err error)

TimeBoundsV3 returns the time bounds of v0.3.x backups. Returns zero times if no v0.3.x backups exist.

func (*Replica) ValidateLevel

func (r *Replica) ValidateLevel(ctx context.Context, level int) ([]ValidationError, error)

ValidateLevel checks that LTX files at the given level are sorted and contiguous. Returns a slice of validation errors (empty if valid).

func (*Replica) WriteLTXFile

func (r *Replica) WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, rd io.Reader) (*ltx.FileInfo, error)

WriteLTXFile writes an LTX file to the replica, encrypting it if age recipients are set. This should be used instead of Client.WriteLTXFile when the caller provides plaintext data (all Replica write paths).

type ReplicaClient

type ReplicaClient interface {
	// Type returns the type of client.
	Type() string

	// Init initializes the replica client connection.
	// This may establish connections, validate configuration, etc.
	// Implementations should be idempotent (no-op if already initialized).
	Init(ctx context.Context) error

	// LTXFiles returns an iterator of all LTX files on the replica for a given level.
	// If seek is specified, the iterator starts from the given TXID, or from the next available TXID if not found.
	// If useMetadata is true, the iterator fetches accurate timestamps from metadata for timestamp-based restore.
	// When false, the iterator uses fast timestamps (LastModified/Created/ModTime) for normal operations.
	LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error)

	// OpenLTXFile returns a reader that contains an LTX file at a given TXID.
	// If seek is specified, the reader will start at the given offset.
	// Returns an os.ErrNotExist error if the LTX file does not exist.
	OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)

	// WriteLTXFile writes an LTX file to the replica.
	// Returns metadata for the written file.
	WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, r io.Reader) (*ltx.FileInfo, error)

	// DeleteLTXFiles deletes one or more LTX files.
	DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error

	// DeleteAll deletes all files.
	DeleteAll(ctx context.Context) error

	// SetLogger sets the logger for the client.
	SetLogger(logger *slog.Logger)
}

ReplicaClient represents a client used to connect to a replica.

func NewReplicaClientFromURL

func NewReplicaClientFromURL(rawURL string) (ReplicaClient, error)

NewReplicaClientFromURL creates a new ReplicaClient from a URL string. The URL scheme determines which backend is used (s3, gs, abs, file, etc.).
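A sketch of wiring a URL-constructed client to a database; the bucket and key prefix in the URL are placeholders.

```go
package main

import (
	replicate "github.com/hanzoai/replicate"
)

// openS3Replica builds a replica client from a URL and attaches it to db.
func openS3Replica(db *replicate.DB) (*replicate.Replica, error) {
	client, err := replicate.NewReplicaClientFromURL("s3://my-bucket/service/pod-0")
	if err != nil {
		return nil, err
	}
	return replicate.NewReplicaWithClient(db, client), nil
}
```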

type ReplicaClientFactory

type ReplicaClientFactory func(scheme, host, urlPath string, query url.Values, userinfo *url.Userinfo) (ReplicaClient, error)

ReplicaClientFactory is a function that creates a ReplicaClient from URL components. The userinfo parameter contains credentials from the URL (e.g., user:pass@host).

type ReplicaClientV3

type ReplicaClientV3 interface {
	// GenerationsV3 returns a list of generation IDs in the replica.
	// Returns an empty slice if no v0.3.x backups exist.
	// Generation IDs are sorted in ascending order.
	GenerationsV3(ctx context.Context) ([]string, error)

	// SnapshotsV3 returns snapshots for a generation, sorted by index.
	// Returns an empty slice if no snapshots exist.
	SnapshotsV3(ctx context.Context, generation string) ([]SnapshotInfoV3, error)

	// WALSegmentsV3 returns WAL segments for a generation, sorted by index then offset.
	// Returns an empty slice if no WAL segments exist.
	WALSegmentsV3(ctx context.Context, generation string) ([]WALSegmentInfoV3, error)

	// OpenSnapshotV3 opens a v0.3.x snapshot for reading.
	// The returned reader provides LZ4-decompressed data.
	OpenSnapshotV3(ctx context.Context, generation string, index int) (io.ReadCloser, error)

	// OpenWALSegmentV3 opens a v0.3.x WAL segment for reading.
	// The returned reader provides LZ4-decompressed data.
	OpenWALSegmentV3(ctx context.Context, generation string, index int, offset int64) (io.ReadCloser, error)
}

ReplicaClientV3 reads v0.3.x backup data. ReplicaClient implementations that support v0.3.x restore should implement this interface.

type RestoreOptions

type RestoreOptions struct {
	// Target path to restore into.
	// If blank, the original DB path is used.
	OutputPath string

	// Specific transaction to restore to.
	// If zero, TXID is ignored.
	TXID ltx.TXID

	// Point-in-time to restore database.
	// If zero, the database is restored to the most recent state available.
	Timestamp time.Time

	// Specifies how many WAL files are downloaded in parallel during restore.
	Parallelism int

	// Follow enables continuous restore mode, polling for new LTX files
	// and applying them to the restored database. Similar to tail -f.
	Follow bool

	// FollowInterval specifies how often to poll for new LTX files in follow mode.
	FollowInterval time.Duration

	// IntegrityCheck specifies the level of integrity checking after restore.
	// Zero value (IntegrityCheckNone) skips the check for backward compatibility.
	IntegrityCheck IntegrityCheckMode
}

RestoreOptions represents options for DB.Restore().

func NewRestoreOptions

func NewRestoreOptions() RestoreOptions

NewRestoreOptions returns a new instance of RestoreOptions with defaults.

type Server

type Server struct {

	// SocketPath is the path to the Unix socket.
	SocketPath string

	// SocketPerms is the file permissions for the socket.
	SocketPerms uint32

	// PathExpander optionally expands paths (e.g., ~ expansion).
	// If nil, paths are used as-is.
	PathExpander func(string) (string, error)

	// Version is the version string to report in /info.
	Version string
	// contains filtered or unexported fields
}

Server manages runtime control via Unix socket using HTTP.

func NewServer

func NewServer(store *Store) *Server

NewServer creates a new Server instance.

func (*Server) Close

func (s *Server) Close() error

Close gracefully shuts down the control server.

func (*Server) Start

func (s *Server) Start() error

Start begins listening for control connections.

type SnapshotInfoV3

type SnapshotInfoV3 struct {
	Generation string
	Index      int
	Size       int64
	CreatedAt  time.Time
}

SnapshotInfoV3 contains metadata about a v0.3.x snapshot.

func (SnapshotInfoV3) Pos

func (info SnapshotInfoV3) Pos() PosV3

Pos returns the position of this snapshot.

type SocketConfig

type SocketConfig struct {
	Enabled     bool   `yaml:"enabled"`
	Path        string `yaml:"path"`
	Permissions uint32 `yaml:"permissions"`
}

SocketConfig configures the Unix socket for control commands.

func DefaultSocketConfig

func DefaultSocketConfig() SocketConfig

DefaultSocketConfig returns the default socket configuration.

type StartRequest

type StartRequest struct {
	Path    string `json:"path"`
	Timeout int    `json:"timeout,omitempty"`
}

StartRequest is the request body for the /start endpoint.

type StartResponse

type StartResponse struct {
	Status string `json:"status"`
	Path   string `json:"path"`
}

StartResponse is the response body for the /start endpoint.

type StopRequest

type StopRequest struct {
	Path    string `json:"path"`
	Timeout int    `json:"timeout,omitempty"`
}

StopRequest is the request body for the /stop endpoint.

type StopResponse

type StopResponse struct {
	Status string `json:"status"`
	Path   string `json:"path"`
}

StopResponse is the response body for the /stop endpoint.

type Store

type Store struct {

	// The frequency of snapshots.
	SnapshotInterval time.Duration
	// The duration of time that snapshots are kept before being deleted.
	SnapshotRetention time.Duration

	// The duration that L0 files are kept after being compacted into L1.
	L0Retention time.Duration
	// How often to check for expired L0 files.
	L0RetentionCheckInterval time.Duration

	// If true, compaction is run in the background according to compaction levels.
	CompactionMonitorEnabled bool

	// If true, verify TXID consistency at destination level after each compaction.
	VerifyCompaction bool

	// RetentionEnabled controls whether Replicate actively deletes old files
	// during retention enforcement. When false, cloud provider lifecycle
	// policies handle retention instead. Local file cleanup still occurs.
	RetentionEnabled bool

	// Shutdown sync retry settings.
	ShutdownSyncTimeout  time.Duration
	ShutdownSyncInterval time.Duration

	// How often to check if heartbeat pings should be sent.
	HeartbeatCheckInterval time.Duration

	// Heartbeat client for health check pings. Sends pings only when
	// all databases have synced successfully within the heartbeat interval.
	Heartbeat *HeartbeatClient

	// How often to run validation checks. Zero disables periodic validation.
	ValidationInterval time.Duration

	Logger *slog.Logger
	// contains filtered or unexported fields
}

Store represents the top-level container for databases.

It manages async background tasks like compactions so that the system is not overloaded by too many concurrent tasks.

func NewStore

func NewStore(dbs []*DB, levels CompactionLevels) *Store
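A sketch of wiring databases into a Store and starting it. Construction of the *replicate.DB and CompactionLevels values is not covered by this index, so they are taken as inputs; the interval values are example policy, not defaults.

```go
package main

import (
	"context"
	"time"

	replicate "github.com/hanzoai/replicate"
)

// runStore opens a Store for the given database and blocks until the
// context is canceled, then closes the store.
func runStore(ctx context.Context, db *replicate.DB, levels replicate.CompactionLevels) error {
	store := replicate.NewStore([]*replicate.DB{db}, levels)
	store.SnapshotInterval = 24 * time.Hour      // example policy
	store.SnapshotRetention = 7 * 24 * time.Hour // example policy
	if err := store.Open(ctx); err != nil {
		return err
	}
	defer store.Close(context.Background())
	<-ctx.Done()
	return ctx.Err()
}
```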

func (*Store) Close

func (s *Store) Close(ctx context.Context) (err error)

func (*Store) CompactDB

func (s *Store) CompactDB(ctx context.Context, db *DB, lvl *CompactionLevel) (*ltx.FileInfo, error)

CompactDB performs a compaction or snapshot for a given database on a single destination level. This function will only proceed if a compaction has not already run since the level's last scheduled compaction time.

func (*Store) DBs

func (s *Store) DBs() []*DB

func (*Store) DisableDB

func (s *Store) DisableDB(ctx context.Context, path string) error

DisableDB stops replication for a database.

func (*Store) EnableDB

func (s *Store) EnableDB(ctx context.Context, path string) error

EnableDB starts replication for a registered database. The context is checked for cancellation before opening. Note: db.Open() itself does not support cancellation.

func (*Store) EnforceSnapshotRetention

func (s *Store) EnforceSnapshotRetention(ctx context.Context, db *DB) error

EnforceSnapshotRetention removes old snapshots by timestamp and then cleans up all lower levels based on minimum snapshot TXID.

func (*Store) FindDB

func (s *Store) FindDB(path string) *DB

FindDB returns the database with the given path.

func (*Store) Open

func (s *Store) Open(ctx context.Context) error

func (*Store) RegisterDB

func (s *Store) RegisterDB(db *DB) error

RegisterDB registers a new database with the store and starts monitoring it.

func (*Store) SetDone

func (s *Store) SetDone(done <-chan struct{})

SetDone sets the done channel used for interrupt handling during shutdown and propagates it to all managed databases.

func (*Store) SetL0Retention

func (s *Store) SetL0Retention(d time.Duration)

SetL0Retention updates the retention window for L0 files and propagates it to all managed databases.

func (*Store) SetRetentionEnabled

func (s *Store) SetRetentionEnabled(v bool)

func (*Store) SetShutdownSyncInterval

func (s *Store) SetShutdownSyncInterval(d time.Duration)

SetShutdownSyncInterval updates the shutdown sync interval and propagates it to all managed databases.

func (*Store) SetShutdownSyncTimeout

func (s *Store) SetShutdownSyncTimeout(d time.Duration)

SetShutdownSyncTimeout updates the shutdown sync timeout and propagates it to all managed databases.

func (*Store) SetVerifyCompaction

func (s *Store) SetVerifyCompaction(v bool)

SetVerifyCompaction updates the verify compaction flag and propagates it to all managed databases.

func (*Store) SnapshotLevel

func (s *Store) SnapshotLevel() *CompactionLevel

SnapshotLevel returns a pseudo compaction level based on snapshot settings.

func (*Store) SyncDB

func (s *Store) SyncDB(ctx context.Context, path string, wait bool) (SyncDBResult, error)

SyncDB forces an immediate sync for a database. If wait is true, blocks until both WAL-to-LTX and LTX-to-remote sync complete. If wait is false, only performs the WAL-to-LTX sync and lets the replica monitor handle upload. The timeout is best-effort as internal lock acquisition is not context-aware.

func (*Store) UnregisterDB

func (s *Store) UnregisterDB(ctx context.Context, path string) error

UnregisterDB stops monitoring the database at the provided path and closes it.

func (*Store) Validate

func (s *Store) Validate(ctx context.Context) (*ValidationResult, error)

Validate checks LTX file consistency across all databases and levels. SnapshotLevel (9) is excluded since snapshots are not contiguous.

type SyncDBResult

type SyncDBResult struct {
	TXID           uint64
	ReplicatedTXID uint64
	Changed        bool
}

SyncDBResult holds the result of a sync operation.

type SyncRequest

type SyncRequest struct {
	Path    string `json:"path"`
	Wait    bool   `json:"wait,omitempty"`
	Timeout int    `json:"timeout,omitempty"`
}

SyncRequest is the request body for the /sync endpoint.

type SyncResponse

type SyncResponse struct {
	Status         string `json:"status"`
	Path           string `json:"path"`
	TXID           uint64 `json:"txid"`
	ReplicatedTXID uint64 `json:"replicated_txid"`
}

SyncResponse is the response body for the /sync endpoint.

type SyncStatus

type SyncStatus struct {
	LocalTXID  ltx.TXID
	RemoteTXID ltx.TXID
	InSync     bool
}

SyncStatus represents the current replication state of the database.

type TXIDResponse

type TXIDResponse struct {
	TXID uint64 `json:"txid"`
}

TXIDResponse is the response body for the /txid endpoint.

type UnregisterDatabaseRequest

type UnregisterDatabaseRequest struct {
	Path    string `json:"path"`
	Timeout int    `json:"timeout,omitempty"`
}

UnregisterDatabaseRequest is the request body for the /unregister endpoint.

type UnregisterDatabaseResponse

type UnregisterDatabaseResponse struct {
	Status string `json:"status"`
	Path   string `json:"path"`
}

UnregisterDatabaseResponse is the response body for the /unregister endpoint.

type ValidationError

type ValidationError struct {
	Level    int           // compaction level
	Type     string        // "gap", "overlap", or "unsorted"
	Message  string        // human-readable description
	PrevFile *ltx.FileInfo // previous file
	CurrFile *ltx.FileInfo // current file that caused error
}

ValidationError represents a single validation issue.

type ValidationResult

type ValidationResult struct {
	Valid  bool              // true if no errors found
	Errors []ValidationError // all errors found
}

ValidationResult holds the result of validating a replica's LTX files.

type WALReader

type WALReader struct {
	// contains filtered or unexported fields
}

WALReader wraps an io.ReaderAt and parses SQLite WAL frames.

This reader verifies the salt & checksum integrity while it reads. It does not enforce transaction boundaries (i.e. it may return uncommitted frames). It is the responsibility of the caller to handle this.

func NewWALReader

func NewWALReader(rd io.ReaderAt, logger *slog.Logger) (*WALReader, error)

NewWALReader returns a new instance of WALReader.

func NewWALReaderWithOffset

func NewWALReaderWithOffset(ctx context.Context, rd io.ReaderAt, offset int64, salt1, salt2 uint32, logger *slog.Logger) (*WALReader, error)

NewWALReaderWithOffset returns a new instance of WALReader at a given offset. The salt must match or no frames will be returned. The checksum is calculated from the previous page.

func (*WALReader) FrameSaltsUntil

func (r *WALReader) FrameSaltsUntil(ctx context.Context, until [2]uint32) (map[[2]uint32]struct{}, error)

FrameSaltsUntil returns the set of unique frame salts in the WAL file, reading frames until the given salt pair is reached.

func (*WALReader) Offset

func (r *WALReader) Offset() int64

Offset returns the file offset of the last read frame. Returns zero if no frames have been read.

func (*WALReader) PageMap

func (r *WALReader) PageMap(ctx context.Context) (m map[uint32]int64, maxOffset int64, commit uint32, err error)

PageMap reads all committed frames to the end of the file and returns a map of pgno to the offset of the latest version of each page. It also returns the maximum offset of the WAL segment read and the final database size, in pages.

func (*WALReader) PageSize

func (r *WALReader) PageSize() uint32

PageSize returns the page size from the header. Must call ReadHeader() first.

func (*WALReader) ReadFrame

func (r *WALReader) ReadFrame(ctx context.Context, data []byte) (pgno, commit uint32, err error)

ReadFrame reads the next frame from the WAL and returns the page number and the commit size (the database size in pages for commit frames, zero otherwise). Returns io.EOF at the end of the valid WAL.
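A sketch of draining the WAL with ReadFrame; the helper name is illustrative. Note the buffer must be page-size bytes, and PageSize requires that the header has been read.

```go
package main

import (
	"context"
	"io"

	replicate "github.com/hanzoai/replicate"
)

// countFrames reads frames until the end of the valid WAL and returns how
// many frames (committed or not) were present.
func countFrames(ctx context.Context, r *replicate.WALReader) (int, error) {
	buf := make([]byte, r.PageSize())
	n := 0
	for {
		if _, _, err := r.ReadFrame(ctx, buf); err == io.EOF {
			return n, nil
		} else if err != nil {
			return n, err
		}
		n++
	}
}
```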

type WALSegmentInfoV3

type WALSegmentInfoV3 struct {
	Generation string
	Index      int
	Offset     int64
	Size       int64
	CreatedAt  time.Time
}

WALSegmentInfoV3 contains metadata about a v0.3.x WAL segment.

func (WALSegmentInfoV3) Pos

func (info WALSegmentInfoV3) Pos() PosV3

Pos returns the position of this WAL segment.

Directories

Path Synopsis
_examples
library/basic command
Example: Basic Replicate Library Usage
library/s3 command
Example: Replicate Library Usage with S3 and Restore-on-Startup
cmd
replicate command
replicate-test command
