Documentation
¶
Index ¶
- Constants
- Variables
- func CalcRestorePlan(ctx context.Context, client ReplicaClient, txID ltx.TXID, timestamp time.Time, ...) ([]*ltx.FileInfo, error)
- func Checksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32)
- func FetchPage(ctx context.Context, client ReplicaClient, level int, ...) (ltx.PageHeader, []byte, error)
- func FetchPageIndex(ctx context.Context, client ReplicaClient, info *ltx.FileInfo) (map[uint32]ltx.PageIndexElem, error)
- func FindLTXFiles(ctx context.Context, client ReplicaClient, level int, useMetadata bool, ...) ([]*ltx.FileInfo, error)
- func LTXDir(root string) string
- func LTXFilePath(root string, level int, minTXID, maxTXID ltx.TXID) string
- func LTXLevelDir(root string, level int) string
- func WALChecksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32)
- type CompactionLevel
- type CompactionLevels
- func (a CompactionLevels) IsValidLevel(level int) bool
- func (a CompactionLevels) Level(level int) (*CompactionLevel, error)
- func (a CompactionLevels) MaxLevel() int
- func (a CompactionLevels) NextLevel(level int) int
- func (a CompactionLevels) PrevLevel(level int) int
- func (a CompactionLevels) Validate() error
- type DB
- func (db *DB) CRC64(ctx context.Context) (uint64, ltx.Pos, error)
- func (db *DB) Checkpoint(ctx context.Context, mode string) (err error)
- func (db *DB) Close(ctx context.Context) (err error)
- func (db *DB) Compact(ctx context.Context, dstLevel int) (*ltx.FileInfo, error)
- func (db *DB) DirInfo() os.FileInfo
- func (db *DB) EnforceRetentionByTXID(ctx context.Context, level int, txID ltx.TXID) (err error)
- func (db *DB) EnforceSnapshotRetention(ctx context.Context, timestamp time.Time) (minSnapshotTXID ltx.TXID, err error)
- func (db *DB) FileInfo() os.FileInfo
- func (db *DB) LTXDir() string
- func (db *DB) LTXLevelDir(level int) string
- func (db *DB) LTXPath(level int, minTXID, maxTXID ltx.TXID) string
- func (db *DB) MaxLTX() (minTXID, maxTXID ltx.TXID, err error)
- func (db *DB) MaxLTXFileInfo(ctx context.Context, level int) (ltx.FileInfo, error)
- func (db *DB) MetaPath() string
- func (db *DB) Notify() <-chan struct{}
- func (db *DB) Open() (err error)
- func (db *DB) PageSize() int
- func (db *DB) Path() string
- func (db *DB) Pos() (ltx.Pos, error)
- func (db *DB) SQLDB() *sql.DB
- func (db *DB) SetMetaPath(path string)
- func (db *DB) Snapshot(ctx context.Context) (*ltx.FileInfo, error)
- func (db *DB) SnapshotReader(ctx context.Context) (ltx.Pos, io.Reader, error)
- func (db *DB) Sync(ctx context.Context) (err error)
- func (db *DB) WALPath() string
- type PrevFrameMismatchError
- type Replica
- func (r *Replica) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (updatedAt time.Time, err error)
- func (r *Replica) CreatedAt(ctx context.Context) (time.Time, error)
- func (r *Replica) DB() *DB
- func (r *Replica) EnforceRetention(ctx context.Context) (err error)
- func (r *Replica) Logger() *slog.Logger
- func (r *Replica) MaxLTXFileInfo(ctx context.Context, level int) (info ltx.FileInfo, err error)
- func (r *Replica) Pos() ltx.Pos
- func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error)
- func (r *Replica) SetPos(pos ltx.Pos)
- func (r *Replica) Start(ctx context.Context) error
- func (r *Replica) Stop(hard bool) (err error)
- func (r *Replica) Sync(ctx context.Context) (err error)
- func (r *Replica) TimeBounds(ctx context.Context) (createdAt, updatedAt time.Time, err error)
- type ReplicaClient
- type RestoreOptions
- type Store
- func (s *Store) Close(ctx context.Context) (err error)
- func (s *Store) CompactDB(ctx context.Context, db *DB, lvl *CompactionLevel) (*ltx.FileInfo, error)
- func (s *Store) DBs() []*DB
- func (s *Store) EnforceSnapshotRetention(ctx context.Context, db *DB) error
- func (s *Store) Open(ctx context.Context) error
- func (s *Store) SnapshotLevel() *CompactionLevel
- type WALReader
- func (r *WALReader) FrameSaltsUntil(ctx context.Context, until [2]uint32) (map[[2]uint32]struct{}, error)
- func (r *WALReader) Offset() int64
- func (r *WALReader) PageMap(ctx context.Context) (m map[uint32]int64, maxOffset int64, commit uint32, err error)
- func (r *WALReader) PageSize() uint32
- func (r *WALReader) ReadFrame(ctx context.Context, data []byte) (pgno, commit uint32, err error)
Constants ¶
const ( DefaultMonitorInterval = 1 * time.Second DefaultCheckpointInterval = 1 * time.Minute DefaultBusyTimeout = 1 * time.Second DefaultMinCheckpointPageN = 1000 DefaultMaxCheckpointPageN = 10000 DefaultTruncatePageN = 500000 )
Default DB settings.
const ( CheckpointModePassive = "PASSIVE" CheckpointModeFull = "FULL" CheckpointModeRestart = "RESTART" CheckpointModeTruncate = "TRUNCATE" )
SQLite checkpoint modes.
const ( WALHeaderChecksumOffset = 24 WALFrameHeaderChecksumOffset = 16 )
SQLite WAL constants.
const ( // WALHeaderSize is the size of the WAL header, in bytes. WALHeaderSize = 32 // WALFrameHeaderSize is the size of the WAL frame header, in bytes. WALFrameHeaderSize = 24 )
const ( DefaultSnapshotInterval = 24 * time.Hour DefaultSnapshotRetention = 24 * time.Hour DefaultRetention = 24 * time.Hour DefaultRetentionCheckInterval = 1 * time.Hour )
Store defaults
const DefaultEstimatedPageIndexSize = 32 * 1024 // 32KB
DefaultEstimatedPageIndexSize is the size that is first fetched when fetching the page index. If the fetch was smaller than the actual page index, another call is made to fetch the rest.
const DefaultRestoreParallelism = 8
DefaultRestoreParallelism is the default parallelism when downloading WAL files.
const (
DefaultSyncInterval = 1 * time.Second
)
Default replica settings.
const (
MetaDirSuffix = "-litestream"
)
Naming constants.
const SnapshotLevel = 9
SnapshotLevel represents the level at which full snapshots are held.
Variables ¶
var ( ErrNoSnapshots = errors.New("no snapshots available") ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch") )
Litestream errors.
var ( // LogWriter is the destination writer for all logging. LogWriter = os.Stdout // LogFlags are the flags passed to log.New(). LogFlags = 0 )
var ( // ErrNoCompaction is returned when no new files are available from the previous level. ErrNoCompaction = errors.New("no compaction") // ErrCompactionTooEarly is returned when a compaction is attempted too soon // since the last compaction time. This is used to prevent frequent // re-compaction when restarting the process. ErrCompactionTooEarly = errors.New("compaction too early") // ErrTxNotAvailable is returned when a transaction does not exist. ErrTxNotAvailable = errors.New("transaction not available") )
var ErrStopIter = errors.New("stop iterator")
Functions ¶
func CalcRestorePlan ¶ added in v0.5.0
func CalcRestorePlan(ctx context.Context, client ReplicaClient, txID ltx.TXID, timestamp time.Time, logger *slog.Logger) ([]*ltx.FileInfo, error)
CalcRestorePlan returns a list of storage paths to restore a snapshot at the given TXID.
func FetchPage ¶ added in v0.5.0
func FetchPage(ctx context.Context, client ReplicaClient, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (ltx.PageHeader, []byte, error)
FetchPage fetches and decodes a single page frame from an LTX file.
func FetchPageIndex ¶ added in v0.5.0
func FetchPageIndex(ctx context.Context, client ReplicaClient, info *ltx.FileInfo) (map[uint32]ltx.PageIndexElem, error)
func FindLTXFiles ¶ added in v0.5.0
func FindLTXFiles(ctx context.Context, client ReplicaClient, level int, useMetadata bool, filter func(*ltx.FileInfo) (bool, error)) ([]*ltx.FileInfo, error)
FindLTXFiles returns a list of files that match filter. The useMetadata parameter is passed through to LTXFiles to control whether accurate timestamps are fetched from metadata. When true (timestamp-based restore), accurate timestamps are required. When false (normal operations), fast timestamps are sufficient.
func LTXFilePath ¶ added in v0.5.0
LTXFilePath returns the path to a single LTX file.
func LTXLevelDir ¶ added in v0.5.0
LTXLevelDir returns the path to an LTX level directory.
Types ¶
type CompactionLevel ¶ added in v0.5.0
type CompactionLevel struct {
// The numeric level. Must match the index in the list of levels.
Level int
// The frequency that the level is compacted from the previous level.
Interval time.Duration
}
CompactionLevel represents a single part of a multi-level compaction. Each level merges LTX files from the previous level into larger time granularities.
func (*CompactionLevel) NextCompactionAt ¶ added in v0.5.0
func (lvl *CompactionLevel) NextCompactionAt(now time.Time) time.Time
NextCompactionAt returns the time at which the next compaction occurs. Returns the current time if it is exactly a multiple of the level interval.
func (*CompactionLevel) PrevCompactionAt ¶ added in v0.5.0
func (lvl *CompactionLevel) PrevCompactionAt(now time.Time) time.Time
PrevCompactionAt returns the time when the last compaction occurred. Returns the current time if it is exactly a multiple of the level interval.
type CompactionLevels ¶ added in v0.5.0
type CompactionLevels []*CompactionLevel
CompactionLevels represents a sorted slice of non-snapshot compaction levels.
func (CompactionLevels) IsValidLevel ¶ added in v0.5.0
func (a CompactionLevels) IsValidLevel(level int) bool
IsValidLevel returns true if level is a valid compaction level number.
func (CompactionLevels) Level ¶ added in v0.5.0
func (a CompactionLevels) Level(level int) (*CompactionLevel, error)
Level returns the compaction level at the given index. Returns an error if the index is a snapshot level or is out of bounds.
func (CompactionLevels) MaxLevel ¶ added in v0.5.0
func (a CompactionLevels) MaxLevel() int
MaxLevel returns the highest non-snapshot compaction level.
func (CompactionLevels) NextLevel ¶ added in v0.5.0
func (a CompactionLevels) NextLevel(level int) int
NextLevel returns the next compaction level. Returns -1 if there is no next level.
func (CompactionLevels) PrevLevel ¶ added in v0.5.0
func (a CompactionLevels) PrevLevel(level int) int
PrevLevel returns the previous compaction level. Returns -1 if there is no previous level.
func (CompactionLevels) Validate ¶ added in v0.5.0
func (a CompactionLevels) Validate() error
Validate returns an error if the levels are invalid.
type DB ¶
type DB struct {
// Minimum threshold of WAL size, in pages, before a passive checkpoint.
// A passive checkpoint will attempt a checkpoint but fail if there are
// active transactions occurring at the same time.
MinCheckpointPageN int
// Maximum threshold of WAL size, in pages, before a forced checkpoint.
// A forced checkpoint will block new transactions and wait for existing
// transactions to finish before issuing a checkpoint and resetting the WAL.
//
// If zero, no checkpoints are forced. This can cause the WAL to grow
// unbounded if there are always read transactions occurring.
MaxCheckpointPageN int
// Threshold of WAL size, in pages, before a forced truncation checkpoint.
// A forced truncation checkpoint will block new transactions and wait for
// existing transactions to finish before issuing a checkpoint and
// truncating the WAL.
//
// If zero, no truncates are forced. This can cause the WAL to grow
// unbounded if there's a sudden spike of changes between other
// checkpoints.
TruncatePageN int
// Time between automatic checkpoints in the WAL. This is done to allow
// more fine-grained WAL files so that restores can be performed with
// better precision.
CheckpointInterval time.Duration
// Frequency at which to perform db sync.
MonitorInterval time.Duration
// The timeout to wait for EBUSY from SQLite.
BusyTimeout time.Duration
// Remote replica for the database.
// Must be set before calling Open().
Replica *Replica
// Where to send log messages, defaults to global slog with database path.
Logger *slog.Logger
// contains filtered or unexported fields
}
DB represents a managed instance of a SQLite database in the file system.
func (*DB) CRC64 ¶ added in v0.3.0
CRC64 returns a CRC-64 ISO checksum of the database and its current position.
This function obtains a read lock so it prevents syncs from occurring until the operation is complete. The database will still be usable but it will be unable to checkpoint during this time.
If dst is set, the database file is copied to that location before checksum.
func (*DB) Checkpoint ¶ added in v0.3.0
Checkpoint performs a checkpoint on the WAL file.
func (*DB) Close ¶
Close flushes outstanding WAL writes to replicas, releases the read lock, and closes the database. Takes a context for final sync.
func (*DB) Compact ¶ added in v0.5.0
Compact performs a compaction of the LTX file at the previous level into dstLevel. Returns metadata for the newly written compaction file. Returns ErrNoCompaction if no new files are available to be compacted.
func (*DB) DirInfo ¶ added in v0.3.5
DirInfo returns the cached file stats for the parent directory of the database file when it was initialized.
func (*DB) EnforceRetentionByTXID ¶ added in v0.5.0
EnforceRetentionByTXID enforces retention so that any LTX files below the target TXID are deleted. Always keeps at least one file.
func (*DB) EnforceSnapshotRetention ¶ added in v0.5.0
func (db *DB) EnforceSnapshotRetention(ctx context.Context, timestamp time.Time) (minSnapshotTXID ltx.TXID, err error)
EnforceSnapshotRetention enforces retention of the snapshot level in the database by timestamp.
func (*DB) FileInfo ¶ added in v0.3.5
FileInfo returns the cached file stats for the database file when it was initialized.
func (*DB) LTXLevelDir ¶ added in v0.5.0
LTXLevelDir returns path of the given LTX compaction level. Panics if level is negative.
func (*DB) LTXPath ¶ added in v0.5.0
LTXPath returns the local path of a single LTX file. Panics if level or either txn ID is negative.
func (*DB) MaxLTXFileInfo ¶ added in v0.5.0
MaxLTXFileInfo returns the metadata for the last LTX file in a level. If cached, it will return the local copy. Otherwise, it fetches from the replica.
func (*DB) Notify ¶
func (db *DB) Notify() <-chan struct{}
Notify returns a channel that closes when the shadow WAL changes.
func (*DB) PageSize ¶
PageSize returns the page size of the underlying database. Only valid after database exists & Init() has successfully run.
func (*DB) SetMetaPath ¶ added in v0.3.10
SetMetaPath sets the path to database metadata.
func (*DB) Snapshot ¶ added in v0.5.0
Snapshot writes a snapshot to the replica for the current position of the database.
func (*DB) SnapshotReader ¶ added in v0.5.0
SnapshotReader returns the current position of the database & a reader that contains a full database snapshot.
type PrevFrameMismatchError ¶ added in v0.5.0
type PrevFrameMismatchError struct {
Err error
}
func (*PrevFrameMismatchError) Error ¶ added in v0.5.0
func (e *PrevFrameMismatchError) Error() string
func (*PrevFrameMismatchError) Unwrap ¶ added in v0.5.0
func (e *PrevFrameMismatchError) Unwrap() error
type Replica ¶
type Replica struct {
// Client used to connect to the remote replica.
Client ReplicaClient
// Time between syncs with the shadow WAL.
SyncInterval time.Duration
// If true, replica monitors database for changes automatically.
// Set to false if replica is being used synchronously (such as in tests).
MonitorEnabled bool
// Encryption identities and recipients
AgeIdentities []age.Identity
AgeRecipients []age.Recipient
// contains filtered or unexported fields
}
Replica connects a database to a replication destination via a ReplicaClient. The replica manages periodic synchronization and maintaining the current replica position.
func NewReplica ¶ added in v0.3.5
func NewReplicaWithClient ¶ added in v0.5.0
func NewReplicaWithClient(db *DB, client ReplicaClient) *Replica
func (*Replica) CalcRestoreTarget ¶ added in v0.3.5
func (r *Replica) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (updatedAt time.Time, err error)
CalcRestoreTarget returns a target time to restore from.
func (*Replica) CreatedAt ¶ added in v0.5.0
CreatedAt returns the earliest creation time of any LTX file. Returns zero time if no LTX files exist.
func (*Replica) DB ¶ added in v0.3.0
DB returns a reference to the database the replica is attached to, if any.
func (*Replica) EnforceRetention ¶ added in v0.3.5
EnforceRetention forces a new snapshot once the retention interval has passed. Older snapshots and WAL files are then removed.
func (*Replica) MaxLTXFileInfo ¶ added in v0.5.0
MaxLTXFileInfo returns metadata about the last LTX file for a given level. Returns nil if no files exist for the level.
func (*Replica) Pos ¶ added in v0.3.5
Pos returns the current replicated position. Returns a zero value if the current position cannot be determined.
func (*Replica) Restore ¶ added in v0.3.5
func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error)
Restore restores the database from a replica based on the options given. This method will restore into opt.OutputPath, if specified, or into the DB's original database path. It can optionally restore from a specific replica or it will automatically choose the best one. Finally, a timestamp can be specified to restore the database to a specific point-in-time.
func (*Replica) Stop ¶
Stop cancels any outstanding replication and blocks until finished.
Performing a hard stop will close the DB file descriptor, which could release per-process locks. Hard stops should only be performed when stopping the entire process.
type ReplicaClient ¶ added in v0.3.5
type ReplicaClient interface {
// Type returns the type of client.
Type() string
// LTXFiles returns an iterator of all LTX files on the replica for a given level.
// If seek is specified, the iterator starts from the given TXID or the next available if not found.
// If useMetadata is true, the iterator fetches accurate timestamps from metadata for timestamp-based restore.
// When false, the iterator uses fast timestamps (LastModified/Created/ModTime) for normal operations.
LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error)
// OpenLTXFile returns a reader that contains an LTX file at a given TXID.
// If offset is specified, the reader will start at the given offset.
// Returns an os.ErrNotExist error if the LTX file does not exist.
OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)
// WriteLTXFile writes an LTX file to the replica.
// Returns metadata for the written file.
WriteLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, r io.Reader) (*ltx.FileInfo, error)
// DeleteLTXFiles deletes one or more LTX files.
DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error
// DeleteAll deletes all files.
DeleteAll(ctx context.Context) error
}
ReplicaClient represents a client used to connect to a Replica.
type RestoreOptions ¶
type RestoreOptions struct {
// Target path to restore into.
// If blank, the original DB path is used.
OutputPath string
// Specific transaction to restore to.
// If zero, TXID is ignored.
TXID ltx.TXID
// Point-in-time to restore database.
// If zero, the database is restored to the most recent state available.
Timestamp time.Time
// Specifies how many WAL files are downloaded in parallel during restore.
Parallelism int
}
RestoreOptions represents options for Replica.Restore().
func NewRestoreOptions ¶ added in v0.2.0
func NewRestoreOptions() RestoreOptions
NewRestoreOptions returns a new instance of RestoreOptions with defaults.
type Store ¶ added in v0.5.0
type Store struct {
// The frequency of snapshots.
SnapshotInterval time.Duration
// The duration of time that snapshots are kept before being deleted.
SnapshotRetention time.Duration
// If true, compaction is run in the background according to compaction levels.
CompactionMonitorEnabled bool
// contains filtered or unexported fields
}
Store represents the top-level container for databases.
It manages async background tasks like compactions so that the system is not overloaded by too many concurrent tasks.
func NewStore ¶ added in v0.5.0
func NewStore(dbs []*DB, levels CompactionLevels) *Store
func (*Store) CompactDB ¶ added in v0.5.0
CompactDB performs a compaction or snapshot for a given database on a single destination level. This function will only proceed if a compaction has not occurred before the last compaction time.
func (*Store) EnforceSnapshotRetention ¶ added in v0.5.0
EnforceSnapshotRetention removes old snapshots by timestamp and then cleans up all lower levels based on minimum snapshot TXID.
func (*Store) SnapshotLevel ¶ added in v0.5.0
func (s *Store) SnapshotLevel() *CompactionLevel
SnapshotLevel returns a pseudo compaction level based on snapshot settings.
type WALReader ¶ added in v0.5.0
type WALReader struct {
// contains filtered or unexported fields
}
WALReader wraps an io.Reader and parses SQLite WAL frames.
This reader verifies the salt & checksum integrity while it reads. It does not enforce transaction boundaries (i.e. it may return uncommitted frames). It is the responsibility of the caller to handle this.
func NewWALReader ¶ added in v0.5.0
NewWALReader returns a new instance of WALReader.
func NewWALReaderWithOffset ¶ added in v0.5.0
func NewWALReaderWithOffset(ctx context.Context, rd io.ReaderAt, offset int64, salt1, salt2 uint32, logger *slog.Logger) (*WALReader, error)
NewWALReaderWithOffset returns a new instance of WALReader at a given offset. Salt must match or else no frames will be returned. Checksum is calculated from the previous page.
func (*WALReader) FrameSaltsUntil ¶ added in v0.5.0
func (r *WALReader) FrameSaltsUntil(ctx context.Context, until [2]uint32) (map[[2]uint32]struct{}, error)
FrameSaltsUntil returns a set of all unique frame salts in the WAL file.
func (*WALReader) Offset ¶ added in v0.5.0
Offset returns the file offset of the last read frame. Returns zero if no frames have been read.
func (*WALReader) PageMap ¶ added in v0.5.0
func (r *WALReader) PageMap(ctx context.Context) (m map[uint32]int64, maxOffset int64, commit uint32, err error)
PageMap reads all committed frames until the end of the file and returns a map of pgno to offset of the latest version of each page. Also returns the max offset of the wal segment read, and the final database size, in pages.