Documentation
¶
Index ¶
- Constants
- Variables
- func ApplyWAL(ctx context.Context, dbPath, walPath string) error
- func Checksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32)
- func ComparePos(a, b Pos) (int, error)
- func FindLatestGeneration(ctx context.Context, client ReplicaClient) (generation string, err error)
- func FindMaxIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error)
- func FindMaxSnapshotIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error)
- func FindMaxWALIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error)
- func FindSnapshotForIndex(ctx context.Context, client ReplicaClient, generation string, index int) (int, error)
- func FormatIndex(index int) string
- func FormatOffset(offset int64) string
- func GenerationTimeBounds(ctx context.Context, client ReplicaClient, generation string) (createdAt, updatedAt time.Time, err error)
- func IsGenerationName(s string) bool
- func ParseIndex(s string) (int, error)
- func ParseOffset(s string) (int64, error)
- func ReadWALFields(r io.Reader, pageSize int) (salt0, salt1, chksum0, chksum1 uint32, byteOrder binary.ByteOrder, ...)
- func ReplicaClientTimeBounds(ctx context.Context, client ReplicaClient) (min, max time.Time, err error)
- func Restore(ctx context.Context, client ReplicaClient, filename, generation string, ...) (err error)
- func RestoreSnapshot(ctx context.Context, client ReplicaClient, filename, generation string, ...) error
- func SnapshotTimeBounds(ctx context.Context, client ReplicaClient, generation string) (min, max time.Time, err error)
- func WALTimeBounds(ctx context.Context, client ReplicaClient, generation string) (min, max time.Time, err error)
- type BufferedWALSegmentIterator
- type DB
- func (db *DB) Checkpoint(ctx context.Context, mode string) (err error)
- func (db *DB) Close() (err error)
- func (db *DB) CloseWALSegmentIterator(itr *FileWALSegmentIterator) error
- func (db *DB) CurrentGeneration() (string, error)
- func (db *DB) GenerationNamePath() string
- func (db *DB) GenerationPath(generation string) string
- func (db *DB) MetaPath() string
- func (db *DB) NotifyCh() chan<- struct{}
- func (db *DB) Open() (err error)
- func (db *DB) PageSize() int
- func (db *DB) Path() string
- func (db *DB) Pos() Pos
- func (db *DB) PositionPath() string
- func (db *DB) Replica(name string) *Replica
- func (db *DB) SHMPath() string
- func (db *DB) SQLDB() *sql.DB
- func (db *DB) ShadowWALDir(generation string) string
- func (db *DB) Sync(ctx context.Context) error
- func (db *DB) UpdatedAt() (time.Time, error)
- func (db *DB) WALPath() string
- func (db *DB) WALReader(ctx context.Context, generation string, index int) (_ io.ReadCloser, err error)
- func (db *DB) WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error)
- func (db *DB) WALSegments(ctx context.Context, generation string) (*FileWALSegmentIterator, error)
- func (db *DB) WithFile(fn func(f *os.File) error) error
- type FileReplicaClient
- func (c *FileReplicaClient) DeleteGeneration(ctx context.Context, generation string) error
- func (c *FileReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error
- func (c *FileReplicaClient) DeleteWALSegments(ctx context.Context, a []Pos) error
- func (c *FileReplicaClient) GenerationDir(generation string) (string, error)
- func (c *FileReplicaClient) Generations(ctx context.Context) ([]string, error)
- func (c *FileReplicaClient) GenerationsDir() (string, error)
- func (c *FileReplicaClient) Path() string
- func (c *FileReplicaClient) SnapshotPath(generation string, index int) (string, error)
- func (c *FileReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)
- func (c *FileReplicaClient) Snapshots(ctx context.Context, generation string) (SnapshotIterator, error)
- func (c *FileReplicaClient) SnapshotsDir(generation string) (string, error)
- func (c *FileReplicaClient) Type() string
- func (c *FileReplicaClient) WALDir(generation string) (string, error)
- func (c *FileReplicaClient) WALSegmentPath(generation string, index int, offset int64) (string, error)
- func (c *FileReplicaClient) WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error)
- func (c *FileReplicaClient) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error)
- func (c *FileReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info SnapshotInfo, err error)
- func (c *FileReplicaClient) WriteWALSegment(ctx context.Context, pos Pos, rd io.Reader) (info WALSegmentInfo, err error)
- type FileWALSegmentIterator
- func (itr *FileWALSegmentIterator) Append(info WALSegmentInfo) error
- func (itr *FileWALSegmentIterator) Close() (err error)
- func (itr *FileWALSegmentIterator) Err() error
- func (itr *FileWALSegmentIterator) Generation() string
- func (itr *FileWALSegmentIterator) Indexes() []int
- func (itr *FileWALSegmentIterator) Next() bool
- func (itr *FileWALSegmentIterator) NotifyCh() <-chan struct{}
- func (itr *FileWALSegmentIterator) SetErr(err error)
- func (itr *FileWALSegmentIterator) WALSegment() WALSegmentInfo
- type Pos
- type Replica
- func (r *Replica) Client() ReplicaClient
- func (r *Replica) Close() (err error)
- func (r *Replica) DB() *DB
- func (r *Replica) EnforceRetention(ctx context.Context) (err error)
- func (r *Replica) GenerationCreatedAt(ctx context.Context, generation string) (time.Time, error)
- func (r *Replica) Name() string
- func (r *Replica) Pos() Pos
- func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error)
- func (r *Replica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error)
- func (r *Replica) Snapshots(ctx context.Context) ([]SnapshotInfo, error)
- func (r *Replica) Start(ctx context.Context)
- func (r *Replica) Stop()
- func (r *Replica) Sync(ctx context.Context) (err error)
- type ReplicaClient
- type RestoreOptions
- type Server
- type SnapshotInfo
- type SnapshotInfoSlice
- type SnapshotInfoSliceIterator
- type SnapshotIterator
- type StreamClient
- type StreamReader
- type StreamRecordHeader
- type WALDownloader
- type WALInfo
- type WALInfoSlice
- type WALNotFoundError
- type WALSegmentInfo
- type WALSegmentInfoSlice
- type WALSegmentInfoSliceIterator
- type WALSegmentIterator
- type WALWriter
Constants ¶
const ( DefaultMonitorDelayInterval = 10 * time.Millisecond DefaultCheckpointInterval = 1 * time.Minute DefaultMinCheckpointPageN = 1000 DefaultMaxCheckpointPageN = 10000 DefaultShadowRetentionN = 32 )
Default DB settings.
const ( WALHeaderChecksumOffset = 24 WALFrameHeaderChecksumOffset = 16 )
SQLite WAL constants
const ( MetaDirSuffix = "-litestream" WALDirName = "wal" WALExt = ".wal" WALSegmentExt = ".wal.lz4" SnapshotExt = ".snapshot.lz4" GenerationNameLen = 16 )
Naming constants.
const ( CheckpointModePassive = "PASSIVE" CheckpointModeFull = "FULL" CheckpointModeRestart = "RESTART" CheckpointModeTruncate = "TRUNCATE" )
SQLite checkpoint modes.
const ( // WALHeaderSize is the size of the WAL header, in bytes. WALHeaderSize = 32 // WALFrameHeaderSize is the size of the WAL frame header, in bytes. WALFrameHeaderSize = 24 // WALIndexHeaderSize is the size of the SHM index header, in bytes. WALIndexHeaderSize = 136 )
const ( StreamRecordTypeSnapshot = 1 StreamRecordTypeWALSegment = 2 )
const ( DefaultSyncInterval = 1 * time.Second DefaultRetention = 24 * time.Hour DefaultRetentionCheckInterval = 1 * time.Hour )
Default replica settings.
const BusyTimeout = 1 * time.Second
BusyTimeout is the timeout to wait for EBUSY from SQLite.
const DefaultRestoreParallelism = 8
DefaultRestoreParallelism is the default parallelism when downloading WAL files.
const FileReplicaClientType = "file"
FileReplicaClientType is the client type for file replica clients.
const MaxIndex = 0x7FFFFFFF
MaxIndex is the maximum possible WAL index. If this index is reached then a new generation will be started.
const StreamRecordHeaderSize = 0 + 4 + 4 + 8 + 8 + 8 + 8 // type, flags, generation, index, offset, size
Variables ¶
var ( ErrDBClosed = errors.New("database closed") ErrNoGeneration = errors.New("no generation available") ErrGenerationChanged = errors.New("generation changed") ErrNoSnapshots = errors.New("no snapshots available") ErrNoWALSegments = errors.New("no wal segments available") ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch") )
Litestream errors.
var ( // LogWriter is the destination writer for all logging. LogWriter = os.Stdout // LogFlags are the flags passed to log.New(). LogFlags = 0 )
var Tracef = func(format string, a ...interface{}) {}
Tracef is used for low-level tracing.
Functions ¶
func ComparePos ¶
ComparePos returns -1 if a is less than b, 1 if a is greater than b, and returns 0 if a and b are equal. Only index & offset are compared. Returns an error if generations are not equal.
func FindLatestGeneration ¶
func FindLatestGeneration(ctx context.Context, client ReplicaClient) (generation string, err error)
FindLatestGeneration returns the most recent generation for a client.
func FindMaxIndexByGeneration ¶
func FindMaxIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error)
FindMaxIndexByGeneration returns the last index within a generation. Returns ErrNoSnapshots if no index exists on the replica for the generation.
func FindMaxSnapshotIndexByGeneration ¶
func FindMaxSnapshotIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error)
FindMaxSnapshotIndexByGeneration returns the last snapshot index within a generation. Returns ErrNoSnapshots if no snapshots exist for the generation on the replica.
func FindMaxWALIndexByGeneration ¶
func FindMaxWALIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error)
FindMaxWALIndexByGeneration returns the last WAL index within a generation. Returns ErrNoWALSegments if no segments exist for the generation on the replica.
func FindSnapshotForIndex ¶
func FindSnapshotForIndex(ctx context.Context, client ReplicaClient, generation string, index int) (int, error)
FindSnapshotForIndex returns the highest index for a snapshot within a generation that occurs before a given index.
func FormatOffset ¶
FormatOffset formats an offset as a hex value.
func GenerationTimeBounds ¶
func GenerationTimeBounds(ctx context.Context, client ReplicaClient, generation string) (createdAt, updatedAt time.Time, err error)
GenerationTimeBounds returns the creation time & last updated time of a generation. Returns ErrNoSnapshots if no data exists for the generation.
func IsGenerationName ¶
IsGenerationName returns true if s is the correct length and is only lowercase hex characters.
func ParseIndex ¶
ParseIndex parses a hex-formatted index into an integer.
func ParseOffset ¶
ParseOffset parses a hex-formatted offset into an integer.
func ReadWALFields ¶
func ReadWALFields(r io.Reader, pageSize int) (salt0, salt1, chksum0, chksum1 uint32, byteOrder binary.ByteOrder, hdr, frame []byte, err error)
ReadWALFields iterates over the header & frames in the WAL data in r. Returns salt, checksum, byte order & the last frame. WAL data must start from the beginning of the WAL header and must end on either the WAL header or at the end of a WAL frame.
func ReplicaClientTimeBounds ¶
func ReplicaClientTimeBounds(ctx context.Context, client ReplicaClient) (min, max time.Time, err error)
ReplicaClientTimeBounds returns the time range covered by a replica client across all generations. It scans the time range of each generation and computes the overall lower and upper bounds.
func Restore ¶
func Restore(ctx context.Context, client ReplicaClient, filename, generation string, snapshotIndex, targetIndex int, opt RestoreOptions) (err error)
Restore restores the database to the given index on a generation.
func RestoreSnapshot ¶
func RestoreSnapshot(ctx context.Context, client ReplicaClient, filename, generation string, index int, mode os.FileMode, uid, gid int) error
RestoreSnapshot copies a snapshot from the replica client to a file.
func SnapshotTimeBounds ¶
func SnapshotTimeBounds(ctx context.Context, client ReplicaClient, generation string) (min, max time.Time, err error)
SnapshotTimeBounds returns the minimum and maximum snapshot timestamps within a generation. Returns ErrNoSnapshots if no data exists for the generation.
func WALTimeBounds ¶
func WALTimeBounds(ctx context.Context, client ReplicaClient, generation string) (min, max time.Time, err error)
WALTimeBounds returns the minimum and maximum WAL segment timestamps within a generation. Returns ErrNoWALSegments if no data exists for the generation.
Types ¶
type BufferedWALSegmentIterator ¶
type BufferedWALSegmentIterator struct {
// contains filtered or unexported fields
}
func NewBufferedWALSegmentIterator ¶
func NewBufferedWALSegmentIterator(itr WALSegmentIterator) *BufferedWALSegmentIterator
NewBufferedWALSegmentIterator returns a new instance of BufferedWALSegmentIterator.
func (*BufferedWALSegmentIterator) Close ¶
func (itr *BufferedWALSegmentIterator) Close() error
Close closes the underlying iterator.
func (*BufferedWALSegmentIterator) Err ¶
func (itr *BufferedWALSegmentIterator) Err() error
Err returns an error that occurred during iteration.
func (*BufferedWALSegmentIterator) Next ¶
func (itr *BufferedWALSegmentIterator) Next() bool
Next returns the next segment. If buffer is full, this call is a no-op.
func (*BufferedWALSegmentIterator) Peek ¶
func (itr *BufferedWALSegmentIterator) Peek() (info WALSegmentInfo, ok bool)
Peek returns the next segment without moving the iterator forward.
func (*BufferedWALSegmentIterator) WALSegment ¶
func (itr *BufferedWALSegmentIterator) WALSegment() WALSegmentInfo
WALSegment returns metadata for the currently positioned WAL segment file.
type DB ¶
type DB struct {
// Client used to receive live, upstream changes. If specified, then
// DB should be used as read-only as local changes will conflict with
// upstream changes.
StreamClient StreamClient
// Minimum threshold of WAL size, in pages, before a passive checkpoint.
// A passive checkpoint will attempt a checkpoint but fail if there are
// active transactions occurring at the same time.
MinCheckpointPageN int
// Maximum threshold of WAL size, in pages, before a forced checkpoint.
// A forced checkpoint will block new transactions and wait for existing
// transactions to finish before issuing a checkpoint and resetting the WAL.
//
// If zero, no checkpoints are forced. This can cause the WAL to grow
// unbounded if there are always read transactions occurring.
MaxCheckpointPageN int
// Number of shadow WAL indexes to retain. This keeps files long enough for
// live replicas to retrieve the data but allows files to eventually be removed.
ShadowRetentionN int
// Time after receiving change notification before reading next WAL segment.
// Used for batching changes into fewer files instead of every transaction
// creating its own file.
MonitorDelayInterval time.Duration
// Time between automatic checkpoints in the WAL. This is done to allow
// more fine-grained WAL files so that restores can be performed with
// better precision.
CheckpointInterval time.Duration
// List of replicas for the database.
// Must be set before calling Open().
Replicas []*Replica
Logger *log.Logger
// contains filtered or unexported fields
}
DB represents a managed instance of a SQLite database in the file system.
func (*DB) Checkpoint ¶ added in v0.3.0
Checkpoint performs a checkpoint on the WAL file.
func (*DB) Close ¶
Close flushes outstanding WAL writes to replicas, releases the read lock, and closes the database.
func (*DB) CloseWALSegmentIterator ¶
func (db *DB) CloseWALSegmentIterator(itr *FileWALSegmentIterator) error
CloseWALSegmentIterator removes itr from the list of managed iterators.
func (*DB) CurrentGeneration ¶
CurrentGeneration returns the name of the generation saved to the "generation" file in the meta data directory. Returns empty string if none exists.
func (*DB) GenerationNamePath ¶
GenerationNamePath returns the path of the name of the current generation.
func (*DB) GenerationPath ¶
GenerationPath returns the path of a single generation. Panics if generation is blank.
func (*DB) NotifyCh ¶
func (db *DB) NotifyCh() chan<- struct{}
NotifyCh returns a channel that can be used to signal changes in the DB.
func (*DB) PageSize ¶
PageSize returns the page size of the underlying database. Only valid after database exists & Init() has successfully run.
func (*DB) Pos ¶ added in v0.2.0
Pos returns the cached position of the database. Returns a zero position if no position has been calculated or if there is no generation.
func (*DB) PositionPath ¶
PositionPath returns the path of the file that stores the current position. This file is only used to communicate state to external processes.
func (*DB) ShadowWALDir ¶ added in v0.2.0
ShadowWALDir returns the path of the shadow wal directory. Panics if generation is blank.
func (*DB) WALReader ¶
func (db *DB) WALReader(ctx context.Context, generation string, index int) (_ io.ReadCloser, err error)
WALReader returns the entire uncompressed WAL file for a given index.
func (*DB) WALSegmentReader ¶
WALSegmentReader returns a reader for a section of WAL data at the given position. Returns os.ErrNotExist if no matching index/offset is found.
func (*DB) WALSegments ¶
WALSegments returns an iterator over all available WAL files for a generation.
type FileReplicaClient ¶
type FileReplicaClient struct {
// File info
FileMode os.FileMode
DirMode os.FileMode
Uid, Gid int
// contains filtered or unexported fields
}
FileReplicaClient is a client for writing snapshots & WAL segments to disk.
func NewFileReplicaClient ¶
func NewFileReplicaClient(path string) *FileReplicaClient
NewFileReplicaClient returns a new instance of FileReplicaClient.
func (*FileReplicaClient) DeleteGeneration ¶
func (c *FileReplicaClient) DeleteGeneration(ctx context.Context, generation string) error
DeleteGeneration deletes all snapshots & WAL segments within a generation.
func (*FileReplicaClient) DeleteSnapshot ¶
DeleteSnapshot deletes a snapshot with the given generation & index.
func (*FileReplicaClient) DeleteWALSegments ¶
func (c *FileReplicaClient) DeleteWALSegments(ctx context.Context, a []Pos) error
DeleteWALSegments deletes WAL segments at the given positions.
func (*FileReplicaClient) GenerationDir ¶
func (c *FileReplicaClient) GenerationDir(generation string) (string, error)
GenerationDir returns the path to a generation's root directory.
func (*FileReplicaClient) Generations ¶
func (c *FileReplicaClient) Generations(ctx context.Context) ([]string, error)
Generations returns a list of available generation names.
func (*FileReplicaClient) GenerationsDir ¶
func (c *FileReplicaClient) GenerationsDir() (string, error)
GenerationsDir returns the path to the root directory that contains all generations.
func (*FileReplicaClient) Path ¶
func (c *FileReplicaClient) Path() string
Path returns the destination path to replicate the database to.
func (*FileReplicaClient) SnapshotPath ¶
func (c *FileReplicaClient) SnapshotPath(generation string, index int) (string, error)
SnapshotPath returns the path to a snapshot file.
func (*FileReplicaClient) SnapshotReader ¶
func (c *FileReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)
SnapshotReader returns a reader for snapshot data at the given generation/index. Returns os.ErrNotExist if no matching index is found.
func (*FileReplicaClient) Snapshots ¶
func (c *FileReplicaClient) Snapshots(ctx context.Context, generation string) (SnapshotIterator, error)
Snapshots returns an iterator over all available snapshots for a generation.
func (*FileReplicaClient) SnapshotsDir ¶
func (c *FileReplicaClient) SnapshotsDir(generation string) (string, error)
SnapshotsDir returns the path to a generation's snapshot directory.
func (*FileReplicaClient) Type ¶
func (c *FileReplicaClient) Type() string
Type returns "file" as the client type.
func (*FileReplicaClient) WALDir ¶
func (c *FileReplicaClient) WALDir(generation string) (string, error)
WALDir returns the path to a generation's WAL directory.
func (*FileReplicaClient) WALSegmentPath ¶
func (c *FileReplicaClient) WALSegmentPath(generation string, index int, offset int64) (string, error)
WALSegmentPath returns the path to a WAL segment file.
func (*FileReplicaClient) WALSegmentReader ¶
func (c *FileReplicaClient) WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error)
WALSegmentReader returns a reader for a section of WAL data at the given position. Returns os.ErrNotExist if no matching index/offset is found.
func (*FileReplicaClient) WALSegments ¶
func (c *FileReplicaClient) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error)
WALSegments returns an iterator over all available WAL files for a generation.
func (*FileReplicaClient) WriteSnapshot ¶
func (c *FileReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info SnapshotInfo, err error)
WriteSnapshot writes LZ4 compressed data from rd into a file on disk.
func (*FileReplicaClient) WriteWALSegment ¶
func (c *FileReplicaClient) WriteWALSegment(ctx context.Context, pos Pos, rd io.Reader) (info WALSegmentInfo, err error)
WriteWALSegment writes LZ4 compressed data from rd into a file on disk.
type FileWALSegmentIterator ¶
type FileWALSegmentIterator struct {
// contains filtered or unexported fields
}
func NewFileWALSegmentIterator ¶
func NewFileWALSegmentIterator(dir, generation string, indexes []int) *FileWALSegmentIterator
func (*FileWALSegmentIterator) Append ¶
func (itr *FileWALSegmentIterator) Append(info WALSegmentInfo) error
Append adds an additional WAL segment to the end of the iterator. This function expects that info will always be later than all previous infos that the iterator has or has seen.
func (*FileWALSegmentIterator) Close ¶
func (itr *FileWALSegmentIterator) Close() (err error)
func (*FileWALSegmentIterator) Err ¶
func (itr *FileWALSegmentIterator) Err() error
Err returns the first error that occurs on the iterator.
func (*FileWALSegmentIterator) Generation ¶
func (itr *FileWALSegmentIterator) Generation() string
Generation returns the generation this iterator was initialized with.
func (*FileWALSegmentIterator) Indexes ¶
func (itr *FileWALSegmentIterator) Indexes() []int
Indexes returns the pending indexes. Only used for testing.
func (*FileWALSegmentIterator) Next ¶
func (itr *FileWALSegmentIterator) Next() bool
func (*FileWALSegmentIterator) NotifyCh ¶
func (itr *FileWALSegmentIterator) NotifyCh() <-chan struct{}
func (*FileWALSegmentIterator) SetErr ¶
func (itr *FileWALSegmentIterator) SetErr(err error)
SetErr sets the error on the iterator and notifies it of the change.
func (*FileWALSegmentIterator) WALSegment ¶
func (itr *FileWALSegmentIterator) WALSegment() WALSegmentInfo
type Pos ¶
type Pos struct {
Generation string // generation name
Index int // wal file index
Offset int64 // offset within wal file
}
Pos is a position in the WAL for a generation.
type Replica ¶
type Replica struct {
// Time between syncs with the shadow WAL.
SyncInterval time.Duration
// Frequency to create new snapshots.
SnapshotInterval time.Duration
// Time to keep snapshots and related WAL files.
// Database is snapshotted after interval, if needed, and older WAL files are discarded.
Retention time.Duration
// Time between checks for retention.
RetentionCheckInterval time.Duration
// Time between validation checks.
ValidationInterval time.Duration
// If true, replica monitors database for changes automatically.
// Set to false if replica is being used synchronously (such as in tests).
MonitorEnabled bool
Logger *log.Logger
// contains filtered or unexported fields
}
Replica connects a database to a replication destination via a ReplicaClient. The replica manages periodic synchronization and maintaining the current replica position.
func LatestReplica ¶
LatestReplica returns the most recently updated replica.
func NewReplica ¶ added in v0.3.5
func NewReplica(db *DB, name string, client ReplicaClient) *Replica
func (*Replica) Client ¶ added in v0.3.5
func (r *Replica) Client() ReplicaClient
Client returns the client the replica was initialized with.
func (*Replica) Close ¶
Close closes the DB file descriptor, which could release per-process locks (e.g. on non-Linux OSes).
func (*Replica) DB ¶ added in v0.3.0
DB returns a reference to the database the replica is attached to, if any.
func (*Replica) EnforceRetention ¶ added in v0.3.5
EnforceRetention forces a new snapshot once the retention interval has passed. Older snapshots and WAL files are then removed.
func (*Replica) GenerationCreatedAt ¶ added in v0.3.5
GenerationCreatedAt returns the earliest creation time of any snapshot. Returns zero time if no snapshots exist.
func (*Replica) Pos ¶ added in v0.3.5
Pos returns the current replicated position. Returns a zero value if the current position cannot be determined.
func (*Replica) Snapshot ¶ added in v0.3.5
func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error)
Snapshot copies the entire database to the replica path.
func (*Replica) SnapshotIndexAt ¶
func (r *Replica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error)
SnapshotIndexAt returns the highest index for a snapshot within a generation that occurs before timestamp. If timestamp is zero, returns the latest snapshot.
func (*Replica) Snapshots ¶ added in v0.2.0
func (r *Replica) Snapshots(ctx context.Context) ([]SnapshotInfo, error)
Snapshots returns a list of all snapshots across all generations.
type ReplicaClient ¶ added in v0.3.5
type ReplicaClient interface {
// Returns the type of client.
Type() string
// Returns a list of available generations.
Generations(ctx context.Context) ([]string, error)
// Deletes all snapshots & WAL segments within a generation.
DeleteGeneration(ctx context.Context, generation string) error
// Returns an iterator of all snapshots within a generation on the replica.
Snapshots(ctx context.Context, generation string) (SnapshotIterator, error)
// Writes LZ4 compressed snapshot data to the replica at a given index
// within a generation. Returns metadata for the snapshot.
WriteSnapshot(ctx context.Context, generation string, index int, r io.Reader) (SnapshotInfo, error)
// Deletes a snapshot with the given generation & index.
DeleteSnapshot(ctx context.Context, generation string, index int) error
// Returns a reader that contains LZ4 compressed snapshot data for a
// given index within a generation. Returns an os.ErrNotExist error if
// the snapshot does not exist.
SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)
// Returns an iterator of all WAL segments within a generation on the replica.
WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error)
// Writes an LZ4 compressed WAL segment at a given position.
// Returns metadata for the written segment.
WriteWALSegment(ctx context.Context, pos Pos, r io.Reader) (WALSegmentInfo, error)
// Deletes one or more WAL segments at the given positions.
DeleteWALSegments(ctx context.Context, a []Pos) error
// Returns a reader that contains an LZ4 compressed WAL segment at a given
// index/offset within a generation. Returns an os.ErrNotExist error if the
// WAL segment does not exist.
WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error)
}
ReplicaClient represents a client for connecting to a replica.
type RestoreOptions ¶
type RestoreOptions struct {
// File info used for restored snapshot & WAL files.
Mode os.FileMode
Uid, Gid int
// Specifies how many WAL files are downloaded in parallel during restore.
Parallelism int
// Logging settings.
Logger *log.Logger
LogPrefix string
}
RestoreOptions represents options for Restore().
func NewRestoreOptions ¶ added in v0.2.0
func NewRestoreOptions() RestoreOptions
NewRestoreOptions returns a new instance of RestoreOptions with defaults.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server represents the top-level container. It manages databases and routes global file system events.
type SnapshotInfo ¶ added in v0.2.0
SnapshotInfo represents file information about a snapshot.
func FilterSnapshotsAfter ¶ added in v0.3.0
func FilterSnapshotsAfter(a []SnapshotInfo, t time.Time) []SnapshotInfo
FilterSnapshotsAfter returns all snapshots that were created on or after t.
func FindMinSnapshotByGeneration ¶ added in v0.3.0
func FindMinSnapshotByGeneration(a []SnapshotInfo, generation string) *SnapshotInfo
FindMinSnapshotByGeneration finds the snapshot with the lowest index in a generation.
func SliceSnapshotIterator ¶ added in v0.3.5
func SliceSnapshotIterator(itr SnapshotIterator) ([]SnapshotInfo, error)
SliceSnapshotIterator returns all snapshots from an iterator as a slice.
func (*SnapshotInfo) Pos ¶ added in v0.3.5
func (info *SnapshotInfo) Pos() Pos
Pos returns the WAL position when the snapshot was made.
type SnapshotInfoSlice ¶ added in v0.3.5
type SnapshotInfoSlice []SnapshotInfo
SnapshotInfoSlice represents a slice of snapshot metadata.
func (SnapshotInfoSlice) Len ¶ added in v0.3.5
func (a SnapshotInfoSlice) Len() int
func (SnapshotInfoSlice) Less ¶ added in v0.3.5
func (a SnapshotInfoSlice) Less(i, j int) bool
func (SnapshotInfoSlice) Swap ¶ added in v0.3.5
func (a SnapshotInfoSlice) Swap(i, j int)
type SnapshotInfoSliceIterator ¶ added in v0.3.5
type SnapshotInfoSliceIterator struct {
// contains filtered or unexported fields
}
SnapshotInfoSliceIterator represents an iterator for iterating over a slice of snapshots.
func NewSnapshotInfoSliceIterator ¶ added in v0.3.5
func NewSnapshotInfoSliceIterator(a []SnapshotInfo) *SnapshotInfoSliceIterator
NewSnapshotInfoSliceIterator returns a new instance of SnapshotInfoSliceIterator.
func (*SnapshotInfoSliceIterator) Close ¶ added in v0.3.5
func (itr *SnapshotInfoSliceIterator) Close() error
Close always returns nil.
func (*SnapshotInfoSliceIterator) Err ¶ added in v0.3.5
func (itr *SnapshotInfoSliceIterator) Err() error
Err always returns nil.
func (*SnapshotInfoSliceIterator) Next ¶ added in v0.3.5
func (itr *SnapshotInfoSliceIterator) Next() bool
Next moves to the next snapshot. Returns true if another snapshot is available.
func (*SnapshotInfoSliceIterator) Snapshot ¶ added in v0.3.5
func (itr *SnapshotInfoSliceIterator) Snapshot() SnapshotInfo
Snapshot returns the metadata from the currently positioned snapshot.
type SnapshotIterator ¶ added in v0.3.5
type SnapshotIterator interface {
io.Closer
// Prepares the next snapshot for reading with the Snapshot() method.
// Returns true if another snapshot is available. Returns false if no more
// snapshots are available or if an error occurred.
Next() bool
// Returns an error that occurred during iteration.
Err() error
// Returns metadata for the currently positioned snapshot.
Snapshot() SnapshotInfo
}
SnapshotIterator represents an iterator over a collection of snapshot metadata.
type StreamClient ¶
type StreamClient interface {
// Stream returns a reader which contains an optional snapshot followed
// by a series of WAL segments. This stream begins from the given position.
Stream(ctx context.Context, pos Pos) (StreamReader, error)
}
StreamClient represents a client for streaming changes to a replica DB.
type StreamReader ¶
type StreamReader interface {
io.ReadCloser
PageSize() int
Next() (*StreamRecordHeader, error)
}
StreamReader represents a reader that streams snapshot and WAL records.
type StreamRecordHeader ¶
type StreamRecordHeader struct {
Type int
Flags int
Generation string
Index int
Offset int64
Size int64
}
func (*StreamRecordHeader) MarshalBinary ¶
func (hdr *StreamRecordHeader) MarshalBinary() ([]byte, error)
func (*StreamRecordHeader) Pos ¶
func (hdr *StreamRecordHeader) Pos() Pos
func (*StreamRecordHeader) UnmarshalBinary ¶
func (hdr *StreamRecordHeader) UnmarshalBinary(data []byte) error
UnmarshalBinary decodes data into hdr.
type WALDownloader ¶
type WALDownloader struct {
// File info used for downloaded WAL files.
Mode os.FileMode
Uid, Gid int
// Number of downloads occurring in parallel.
Parallelism int
// contains filtered or unexported fields
}
WALDownloader represents a parallel downloader of WAL files from a replica client.
It works on a per-index level so WAL files are always downloaded in their entirety and are not segmented. WAL files are downloaded from minIndex to maxIndex, inclusively, and are written to a path prefix. WAL files are named with the prefix and suffixed with the WAL index. It is the responsibility of the caller to clean up these WAL files.
The purpose of the parallelization is that RTT & WAL apply time can consume much of the restore time so it's useful to download multiple WAL files in the background to minimize the latency. While some WAL indexes may be downloaded out of order, the WALDownloader ensures that Next() always returns the WAL files sequentially.
func NewWALDownloader ¶
func NewWALDownloader(client ReplicaClient, prefix string, generation string, minIndex, maxIndex int) *WALDownloader
NewWALDownloader returns a new instance of WALDownloader.
func (*WALDownloader) Close ¶
func (d *WALDownloader) Close() (err error)
Close cancels all downloads and returns any error that has occurred.
func (*WALDownloader) N ¶
func (d *WALDownloader) N() int
N returns the number of WAL files returned by Next().
type WALInfoSlice ¶ added in v0.3.5
type WALInfoSlice []WALInfo
WALInfoSlice represents a slice of WAL metadata.
func (WALInfoSlice) Len ¶ added in v0.3.5
func (a WALInfoSlice) Len() int
func (WALInfoSlice) Less ¶ added in v0.3.5
func (a WALInfoSlice) Less(i, j int) bool
func (WALInfoSlice) Swap ¶ added in v0.3.5
func (a WALInfoSlice) Swap(i, j int)
type WALNotFoundError ¶
WALNotFoundError is returned by WALDownloader if a WAL index is not found.
func (*WALNotFoundError) Error ¶
func (e *WALNotFoundError) Error() string
Error returns the error string.
type WALSegmentInfo ¶ added in v0.3.5
type WALSegmentInfo struct {
Generation string
Index int
Offset int64
Size int64
CreatedAt time.Time
}
WALSegmentInfo represents file information about a WAL segment file.
func SliceWALSegmentIterator ¶ added in v0.3.5
func SliceWALSegmentIterator(itr WALSegmentIterator) ([]WALSegmentInfo, error)
SliceWALSegmentIterator returns all WAL segment files from an iterator as a slice.
func (*WALSegmentInfo) Pos ¶ added in v0.3.5
func (info *WALSegmentInfo) Pos() Pos
Pos returns the WAL position when the segment was made.
type WALSegmentInfoSlice ¶ added in v0.3.5
type WALSegmentInfoSlice []WALSegmentInfo
WALSegmentInfoSlice represents a slice of WAL segment metadata.
func (WALSegmentInfoSlice) Len ¶ added in v0.3.5
func (a WALSegmentInfoSlice) Len() int
func (WALSegmentInfoSlice) Less ¶ added in v0.3.5
func (a WALSegmentInfoSlice) Less(i, j int) bool
func (WALSegmentInfoSlice) Swap ¶ added in v0.3.5
func (a WALSegmentInfoSlice) Swap(i, j int)
type WALSegmentInfoSliceIterator ¶ added in v0.3.5
type WALSegmentInfoSliceIterator struct {
// contains filtered or unexported fields
}
WALSegmentInfoSliceIterator represents an iterator for iterating over a slice of WAL segments.
func NewWALSegmentInfoSliceIterator ¶ added in v0.3.5
func NewWALSegmentInfoSliceIterator(a []WALSegmentInfo) *WALSegmentInfoSliceIterator
NewWALSegmentInfoSliceIterator returns a new instance of WALSegmentInfoSliceIterator.
func (*WALSegmentInfoSliceIterator) Close ¶ added in v0.3.5
func (itr *WALSegmentInfoSliceIterator) Close() error
Close always returns nil.
func (*WALSegmentInfoSliceIterator) Err ¶ added in v0.3.5
func (itr *WALSegmentInfoSliceIterator) Err() error
Err always returns nil.
func (*WALSegmentInfoSliceIterator) Next ¶ added in v0.3.5
func (itr *WALSegmentInfoSliceIterator) Next() bool
Next moves to the next WAL segment. Returns true if another segment is available.
func (*WALSegmentInfoSliceIterator) WALSegment ¶ added in v0.3.5
func (itr *WALSegmentInfoSliceIterator) WALSegment() WALSegmentInfo
WALSegment returns the metadata from the currently positioned WAL segment.
type WALSegmentIterator ¶ added in v0.3.5
type WALSegmentIterator interface {
io.Closer
// Prepares the next WAL segment for reading with the WALSegment() method.
// Returns true if another WAL is available. Returns false if no more
// WAL files are available or if an error occurred.
Next() bool
// Returns an error that occurred during iteration.
Err() error
// Returns metadata for the currently positioned WAL segment file.
WALSegment() WALSegmentInfo
}
WALSegmentIterator represents an iterator over a collection of WAL segments.
type WALWriter ¶
type WALWriter struct {
Salt0, Salt1 uint32
// contains filtered or unexported fields
}
WALWriter represents a writer to a SQLite WAL file.
func NewWALWriter ¶
NewWALWriter returns a new instance of WALWriter.
func (*WALWriter) WriteFrame ¶
func (*WALWriter) WriteHeader ¶
WriteHeader writes the WAL header to the beginning of the file.