litestream

package module
v0.4.0-beta.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2022 License: Apache-2.0 Imports: 26 Imported by: 10

README

Litestream GitHub release (latest by date) Status GitHub Docker Pulls test

Litestream is a standalone streaming replication tool for SQLite. It runs as a background process and safely replicates changes incrementally to another file or S3. Litestream only communicates with SQLite through the SQLite API so it will not corrupt your database.

If you need support or have ideas for improving Litestream, please join the Litestream Slack or visit the GitHub Discussions. Please visit the Litestream web site for installation instructions and documentation.

If you find this project interesting, please consider starring the project on GitHub.

Acknowledgements

While the Litestream project does not accept external code patches, many of the most valuable contributions are in the forms of testing, feedback, and documentation. These help harden software and streamline usage for other users.

I want to give special thanks to individuals who invest much of their time and energy into the project to help make it better:

Huge thanks to fly.io for their support and for contributing credits for testing and development!

Contribution Policy

Initially, Litestream was closed to outside contributions. The goal was to reduce burnout by limiting the maintenance overhead of reviewing and validating third-party code. However, this policy is overly broad and has prevented small, easily testable patches from being contributed.

Litestream is now open to code contributions for bug fixes only. Features carry a long-term maintenance burden so they will not be accepted at this time. Please submit an issue if you have a feature you'd like to request.

If you find mistakes in the documentation, please submit a fix to the documentation repository.

Documentation

Index

Constants

View Source
const (
	DefaultMonitorDelayInterval = 10 * time.Millisecond
	DefaultCheckpointInterval   = 1 * time.Minute

	DefaultMinCheckpointPageN = 1000
	DefaultMaxCheckpointPageN = 10000
	DefaultShadowRetentionN   = 32
)

Default DB settings.

View Source
const (
	WALHeaderChecksumOffset      = 24
	WALFrameHeaderChecksumOffset = 16
)

SQLite WAL constants

View Source
const (
	MetaDirSuffix = "-litestream"

	WALDirName    = "wal"
	WALExt        = ".wal"
	WALSegmentExt = ".wal.lz4"
	SnapshotExt   = ".snapshot.lz4"

	GenerationNameLen = 16
)

Naming constants.

View Source
const (
	CheckpointModePassive  = "PASSIVE"
	CheckpointModeFull     = "FULL"
	CheckpointModeRestart  = "RESTART"
	CheckpointModeTruncate = "TRUNCATE"
)

SQLite checkpoint modes.

View Source
const (
	// WALHeaderSize is the size of the WAL header, in bytes.
	WALHeaderSize = 32

	// WALFrameHeaderSize is the size of the WAL frame header, in bytes.
	WALFrameHeaderSize = 24

	// WALIndexHeaderSize is the size of the SHM index header, in bytes.
	WALIndexHeaderSize = 136
)
View Source
const (
	StreamRecordTypeSnapshot   = 1
	StreamRecordTypeWALSegment = 2
)
View Source
const (
	DefaultSyncInterval           = 1 * time.Second
	DefaultRetention              = 24 * time.Hour
	DefaultRetentionCheckInterval = 1 * time.Hour
)

Default replica settings.

View Source
const BusyTimeout = 1 * time.Second

BusyTimeout is the timeout to wait for EBUSY from SQLite.

View Source
const DefaultRestoreParallelism = 8

DefaultRestoreParallelism is the default parallelism when downloading WAL files.

View Source
const FileReplicaClientType = "file"

FileReplicaClientType is the client type for file replica clients.

View Source
const MaxIndex = 0x7FFFFFFF

MaxIndex is the maximum possible WAL index. If this index is reached then a new generation will be started.

View Source
const StreamRecordHeaderSize = 0 +
	4 + 4 +

	8 + 8 + 8 + 8 // generation, index, offset, size

Variables

View Source
var (
	ErrDBClosed          = errors.New("database closed")
	ErrNoGeneration      = errors.New("no generation available")
	ErrGenerationChanged = errors.New("generation changed")
	ErrNoSnapshots       = errors.New("no snapshots available")
	ErrNoWALSegments     = errors.New("no wal segments available")
	ErrChecksumMismatch  = errors.New("invalid replica, checksum mismatch")
)

Litestream errors.

View Source
var (
	// LogWriter is the destination writer for all logging.
	LogWriter = os.Stdout

	// LogFlags are the flags passed to log.New().
	LogFlags = 0
)
View Source
var Tracef = func(format string, a ...interface{}) {}

Tracef is used for low-level tracing.

Functions

func ApplyWAL

func ApplyWAL(ctx context.Context, dbPath, walPath string) error

ApplyWAL performs a truncating checkpoint on the given database.

func Checksum

func Checksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32)

Checksum computes a running SQLite checksum over a byte slice.

func ComparePos

func ComparePos(a, b Pos) (int, error)

ComparePos returns -1 if a is less than b, 1 if a is greater than b, and returns 0 if a and b are equal. Only index & offset are compared. Returns an error if generations are not equal.

func FindLatestGeneration

func FindLatestGeneration(ctx context.Context, client ReplicaClient) (generation string, err error)

FindLatestGeneration returns the most recent generation for a client.

func FindMaxIndexByGeneration

func FindMaxIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error)

FindMaxIndexByGeneration returns the last index within a generation. Returns ErrNoSnapshots if no index exists on the replica for the generation.

func FindMaxSnapshotIndexByGeneration

func FindMaxSnapshotIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error)

FindMaxSnapshotIndexByGeneration returns the last snapshot index within a generation. Returns ErrNoSnapshots if no snapshots exist for the generation on the replica.

func FindMaxWALIndexByGeneration

func FindMaxWALIndexByGeneration(ctx context.Context, client ReplicaClient, generation string) (index int, err error)

FindMaxWALIndexByGeneration returns the last WAL index within a generation. Returns ErrNoWALSegments if no segments exist for the generation on the replica.

func FindSnapshotForIndex

func FindSnapshotForIndex(ctx context.Context, client ReplicaClient, generation string, index int) (int, error)

FindSnapshotForIndex returns the highest index for a snapshot within a generation that occurs before a given index.

func FormatIndex

func FormatIndex(index int) string

FormatIndex formats an index as a hex value.

func FormatOffset

func FormatOffset(offset int64) string

FormatOffset formats an offset as a hex value.

func GenerationTimeBounds

func GenerationTimeBounds(ctx context.Context, client ReplicaClient, generation string) (createdAt, updatedAt time.Time, err error)

GenerationTimeBounds returns the creation time & last updated time of a generation. Returns ErrNoSnapshots if no data exists for the generation.

func IsGenerationName

func IsGenerationName(s string) bool

IsGenerationName returns true if s is the correct length and is only lowercase hex characters.

func ParseIndex

func ParseIndex(s string) (int, error)

ParseIndex parses a hex-formatted index into an integer.

func ParseOffset

func ParseOffset(s string) (int64, error)

ParseOffset parses a hex-formatted offset into an integer.

func ReadWALFields

func ReadWALFields(r io.Reader, pageSize int) (salt0, salt1, chksum0, chksum1 uint32, byteOrder binary.ByteOrder, hdr, frame []byte, err error)

ReadWALFields iterates over the header & frames in the WAL data in r. Returns salt, checksum, byte order & the last frame. WAL data must start from the beginning of the WAL header and must end on either the WAL header or at the end of a WAL frame.

func ReplicaClientTimeBounds

func ReplicaClientTimeBounds(ctx context.Context, client ReplicaClient) (min, max time.Time, err error)

ReplicaClientTimeBounds returns time range covered by a replica client across all generations. It scans the time range of all generations and computes the lower and upper bounds of them.

func Restore

func Restore(ctx context.Context, client ReplicaClient, filename, generation string, snapshotIndex, targetIndex int, opt RestoreOptions) (err error)

Restore restores the database to the given index on a generation.

func RestoreSnapshot

func RestoreSnapshot(ctx context.Context, client ReplicaClient, filename, generation string, index int, mode os.FileMode, uid, gid int) error

RestoreSnapshot copies a snapshot from the replica client to a file.

func SnapshotTimeBounds

func SnapshotTimeBounds(ctx context.Context, client ReplicaClient, generation string) (min, max time.Time, err error)

SnapshotTimeBounds returns the minimum and maximum snapshot timestamps within a generation. Returns ErrNoSnapshots if no data exists for the generation.

func WALTimeBounds

func WALTimeBounds(ctx context.Context, client ReplicaClient, generation string) (min, max time.Time, err error)

WALTimeBounds returns the minimum and maximum snapshot timestamps. Returns ErrNoWALSegments if no data exists for the generation.

Types

type BufferedWALSegmentIterator

type BufferedWALSegmentIterator struct {
	// contains filtered or unexported fields
}

func NewBufferedWALSegmentIterator

func NewBufferedWALSegmentIterator(itr WALSegmentIterator) *BufferedWALSegmentIterator

NewBufferedWALSegmentIterator returns a new instance of BufferedWALSegmentIterator.

func (*BufferedWALSegmentIterator) Close

func (itr *BufferedWALSegmentIterator) Close() error

Close closes the underlying iterator.

func (*BufferedWALSegmentIterator) Err

func (itr *BufferedWALSegmentIterator) Err() error

Returns an error that occurred during iteration.

func (*BufferedWALSegmentIterator) Next

func (itr *BufferedWALSegmentIterator) Next() bool

Next returns the next segment. If buffer is full, this call is a no-op.

func (*BufferedWALSegmentIterator) Peek

func (itr *BufferedWALSegmentIterator) Peek() (info WALSegmentInfo, ok bool)

Peek returns the next segment without moving the iterator forward.

func (*BufferedWALSegmentIterator) WALSegment

func (itr *BufferedWALSegmentIterator) WALSegment() WALSegmentInfo

Returns metadata for the currently positioned WAL segment file.

type DB

type DB struct {

	// Client used to receive live, upstream changes. If specified, then
	// DB should be used as read-only as local changes will conflict with
	// upstream changes.
	StreamClient StreamClient

	// Minimum threshold of WAL size, in pages, before a passive checkpoint.
	// A passive checkpoint will attempt a checkpoint but fail if there are
	// active transactions occurring at the same time.
	MinCheckpointPageN int

	// Maximum threshold of WAL size, in pages, before a forced checkpoint.
	// A forced checkpoint will block new transactions and wait for existing
	// transactions to finish before issuing a checkpoint and resetting the WAL.
	//
	// If zero, no checkpoints are forced. This can cause the WAL to grow
	// unbounded if there are always read transactions occurring.
	MaxCheckpointPageN int

	// Number of shadow WAL indexes to retain. This keeps files long enough for
	// live replicas to retrieve the data but allows files to eventually be removed.
	ShadowRetentionN int

	// Time after receiving change notification before reading next WAL segment.
	// Used for batching changes into fewer files instead of every transaction
	// creating its own file.
	MonitorDelayInterval time.Duration

	// Time between automatic checkpoints in the WAL. This is done to allow
	// more fine-grained WAL files so that restores can be performed with
	// better precision.
	CheckpointInterval time.Duration

	// List of replicas for the database.
	// Must be set before calling Open().
	Replicas []*Replica

	Logger *log.Logger
	// contains filtered or unexported fields
}

DB represents a managed instance of a SQLite database in the file system.

func NewDB

func NewDB(path string) *DB

NewDB returns a new instance of DB for a given path.

func (*DB) Checkpoint added in v0.3.0

func (db *DB) Checkpoint(ctx context.Context, mode string) (err error)

Checkpoint performs a checkpoint on the WAL file.

func (*DB) Close

func (db *DB) Close() (err error)

Close flushes outstanding WAL writes to replicas, releases the read lock, and closes the database.

func (*DB) CloseWALSegmentIterator

func (db *DB) CloseWALSegmentIterator(itr *FileWALSegmentIterator) error

CloseWALSegmentIterator removes itr from the list of managed iterators.

func (*DB) CurrentGeneration

func (db *DB) CurrentGeneration() (string, error)

CurrentGeneration returns the name of the generation saved to the "generation" file in the meta data directory. Returns empty string if none exists.

func (*DB) GenerationNamePath

func (db *DB) GenerationNamePath() string

GenerationNamePath returns the path of the name of the current generation.

func (*DB) GenerationPath

func (db *DB) GenerationPath(generation string) string

GenerationPath returns the path of a single generation. Panics if generation is blank.

func (*DB) MetaPath

func (db *DB) MetaPath() string

MetaPath returns the path to the database metadata.

func (*DB) NotifyCh

func (db *DB) NotifyCh() chan<- struct{}

NotifyCh returns a channel that can be used to signal changes in the DB.

func (*DB) Open

func (db *DB) Open() (err error)

Open initializes the background monitoring goroutine.

func (*DB) PageSize

func (db *DB) PageSize() int

PageSize returns the page size of the underlying database. Only valid after database exists & Init() has successfully run.

func (*DB) Path

func (db *DB) Path() string

Path returns the path to the database.

func (*DB) Pos added in v0.2.0

func (db *DB) Pos() Pos

Pos returns the cached position of the database. Returns a zero position if no position has been calculated or if there is no generation.

func (*DB) PositionPath

func (db *DB) PositionPath() string

PositionPath returns the path of the file that stores the current position. This file is only used to communicate state to external processes.

func (*DB) Replica added in v0.2.0

func (db *DB) Replica(name string) *Replica

Replica returns a replica by name.

func (*DB) SHMPath

func (db *DB) SHMPath() string

SHMPath returns the path to the database's shared memory file.

func (*DB) SQLDB added in v0.3.0

func (db *DB) SQLDB() *sql.DB

SQLDB returns a reference to the underlying sql.DB connection.

func (*DB) ShadowWALDir added in v0.2.0

func (db *DB) ShadowWALDir(generation string) string

ShadowWALDir returns the path of the shadow wal directory. Panics if generation is blank.

func (*DB) Sync

func (db *DB) Sync(ctx context.Context) error

Sync copies pending data from the WAL to the shadow WAL.

func (*DB) UpdatedAt

func (db *DB) UpdatedAt() (time.Time, error)

UpdatedAt returns the last modified time of the database or WAL file.

func (*DB) WALPath

func (db *DB) WALPath() string

WALPath returns the path to the database's WAL file.

func (*DB) WALReader

func (db *DB) WALReader(ctx context.Context, generation string, index int) (_ io.ReadCloser, err error)

WALReader returns the entire uncompressed WAL file for a given index.

func (*DB) WALSegmentReader

func (db *DB) WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error)

WALSegmentReader returns a reader for a section of WAL data at the given position. Returns os.ErrNotExist if no matching index/offset is found.

func (*DB) WALSegments

func (db *DB) WALSegments(ctx context.Context, generation string) (*FileWALSegmentIterator, error)

WALSegments returns an iterator over all available WAL files for a generation.

func (*DB) WithFile

func (db *DB) WithFile(fn func(f *os.File) error) error

WithFile executes fn with a file handle for the main database file. On Linux, this is a unique file handle for each call. On non-Linux systems, the file handle is shared because of lock semantics.

type FileReplicaClient

type FileReplicaClient struct {

	// File info
	FileMode os.FileMode
	DirMode  os.FileMode
	Uid, Gid int
	// contains filtered or unexported fields
}

FileReplicaClient is a client for writing snapshots & WAL segments to disk.

func NewFileReplicaClient

func NewFileReplicaClient(path string) *FileReplicaClient

NewFileReplicaClient returns a new instance of FileReplicaClient.

func (*FileReplicaClient) DeleteGeneration

func (c *FileReplicaClient) DeleteGeneration(ctx context.Context, generation string) error

DeleteGeneration deletes all snapshots & WAL segments within a generation.

func (*FileReplicaClient) DeleteSnapshot

func (c *FileReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error

DeleteSnapshot deletes a snapshot with the given generation & index.

func (*FileReplicaClient) DeleteWALSegments

func (c *FileReplicaClient) DeleteWALSegments(ctx context.Context, a []Pos) error

DeleteWALSegments deletes WAL segments at the given positions.

func (*FileReplicaClient) GenerationDir

func (c *FileReplicaClient) GenerationDir(generation string) (string, error)

GenerationDir returns the path to a generation's root directory.

func (*FileReplicaClient) Generations

func (c *FileReplicaClient) Generations(ctx context.Context) ([]string, error)

Generations returns a list of available generation names.

func (*FileReplicaClient) GenerationsDir

func (c *FileReplicaClient) GenerationsDir() (string, error)

GenerationsDir returns the path to a generation root directory.

func (*FileReplicaClient) Path

func (c *FileReplicaClient) Path() string

Path returns the destination path to replicate the database to.

func (*FileReplicaClient) SnapshotPath

func (c *FileReplicaClient) SnapshotPath(generation string, index int) (string, error)

SnapshotPath returns the path to an uncompressed snapshot file.

func (*FileReplicaClient) SnapshotReader

func (c *FileReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)

SnapshotReader returns a reader for snapshot data at the given generation/index. Returns os.ErrNotExist if no matching index is found.

func (*FileReplicaClient) Snapshots

func (c *FileReplicaClient) Snapshots(ctx context.Context, generation string) (SnapshotIterator, error)

Snapshots returns an iterator over all available snapshots for a generation.

func (*FileReplicaClient) SnapshotsDir

func (c *FileReplicaClient) SnapshotsDir(generation string) (string, error)

SnapshotsDir returns the path to a generation's snapshot directory.

func (*FileReplicaClient) Type

func (c *FileReplicaClient) Type() string

Type returns "file" as the client type.

func (*FileReplicaClient) WALDir

func (c *FileReplicaClient) WALDir(generation string) (string, error)

WALDir returns the path to a generation's WAL directory

func (*FileReplicaClient) WALSegmentPath

func (c *FileReplicaClient) WALSegmentPath(generation string, index int, offset int64) (string, error)

WALSegmentPath returns the path to a WAL segment file.

func (*FileReplicaClient) WALSegmentReader

func (c *FileReplicaClient) WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error)

WALSegmentReader returns a reader for a section of WAL data at the given position. Returns os.ErrNotExist if no matching index/offset is found.

func (*FileReplicaClient) WALSegments

func (c *FileReplicaClient) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error)

WALSegments returns an iterator over all available WAL files for a generation.

func (*FileReplicaClient) WriteSnapshot

func (c *FileReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info SnapshotInfo, err error)

WriteSnapshot writes LZ4 compressed data from rd into a file on disk.

func (*FileReplicaClient) WriteWALSegment

func (c *FileReplicaClient) WriteWALSegment(ctx context.Context, pos Pos, rd io.Reader) (info WALSegmentInfo, err error)

WriteWALSegment writes LZ4 compressed data from rd into a file on disk.

type FileWALSegmentIterator

type FileWALSegmentIterator struct {
	// contains filtered or unexported fields
}

func NewFileWALSegmentIterator

func NewFileWALSegmentIterator(dir, generation string, indexes []int) *FileWALSegmentIterator

func (*FileWALSegmentIterator) Append

func (itr *FileWALSegmentIterator) Append(info WALSegmentInfo) error

Append add an additional WAL segment to the end of the iterator. This function expects that info will always be later than all previous infos that the iterator has or has seen.

func (*FileWALSegmentIterator) Close

func (itr *FileWALSegmentIterator) Close() (err error)

func (*FileWALSegmentIterator) Err

func (itr *FileWALSegmentIterator) Err() error

Err returns the first error that occurs on the iterator.

func (*FileWALSegmentIterator) Generation

func (itr *FileWALSegmentIterator) Generation() string

Generation returns the generation this iterator was initialized with.

func (*FileWALSegmentIterator) Indexes

func (itr *FileWALSegmentIterator) Indexes() []int

Indexes returns the pending indexes. Only used for testing.

func (*FileWALSegmentIterator) Next

func (itr *FileWALSegmentIterator) Next() bool

func (*FileWALSegmentIterator) NotifyCh

func (itr *FileWALSegmentIterator) NotifyCh() <-chan struct{}

func (*FileWALSegmentIterator) SetErr

func (itr *FileWALSegmentIterator) SetErr(err error)

SetErr sets the error on the iterator and notifies it of the change.

func (*FileWALSegmentIterator) WALSegment

func (itr *FileWALSegmentIterator) WALSegment() WALSegmentInfo

type Pos

type Pos struct {
	Generation string // generation name
	Index      int    // wal file index
	Offset     int64  // offset within wal file
}

Pos is a position in the WAL for a generation.

func ParsePos

func ParsePos(s string) (Pos, error)

ParsePos parses a position generated by Pos.String().

func (Pos) IsZero

func (p Pos) IsZero() bool

IsZero returns true if p is the zero value.

func (Pos) String

func (p Pos) String() string

String returns a string representation.

func (Pos) Truncate added in v0.3.5

func (p Pos) Truncate() Pos

Truncate returns p with the offset truncated to zero.

type Replica

type Replica struct {

	// Time between syncs with the shadow WAL.
	SyncInterval time.Duration

	// Frequency to create new snapshots.
	SnapshotInterval time.Duration

	// Time to keep snapshots and related WAL files.
	// Database is snapshotted after interval, if needed, and older WAL files are discarded.
	Retention time.Duration

	// Time between checks for retention.
	RetentionCheckInterval time.Duration

	// Time between validation checks.
	ValidationInterval time.Duration

	// If true, replica monitors database for changes automatically.
	// Set to false if replica is being used synchronously (such as in tests).
	MonitorEnabled bool

	Logger *log.Logger
	// contains filtered or unexported fields
}

Replica connects a database to a replication destination via a ReplicaClient. The replica manages periodic synchronization and maintaining the current replica position.

func LatestReplica

func LatestReplica(ctx context.Context, replicas []*Replica) (*Replica, error)

LatestReplica returns the most recently updated replica.

func NewReplica added in v0.3.5

func NewReplica(db *DB, name string, client ReplicaClient) *Replica

func (*Replica) Client added in v0.3.5

func (r *Replica) Client() ReplicaClient

Client returns the client the replica was initialized with.

func (*Replica) Close

func (r *Replica) Close() (err error)

Close will close the DB file descriptor which could release locks on per-process locks (e.g. non-Linux OSes).

func (*Replica) DB added in v0.3.0

func (r *Replica) DB() *DB

DB returns a reference to the database the replica is attached to, if any.

func (*Replica) EnforceRetention added in v0.3.5

func (r *Replica) EnforceRetention(ctx context.Context) (err error)

EnforceRetention forces a new snapshot once the retention interval has passed. Older snapshots and WAL files are then removed.

func (*Replica) GenerationCreatedAt added in v0.3.5

func (r *Replica) GenerationCreatedAt(ctx context.Context, generation string) (time.Time, error)

GenerationCreatedAt returns the earliest creation time of any snapshot. Returns zero time if no snapshots exist.

func (*Replica) Name

func (r *Replica) Name() string

Name returns the name of the replica.

func (*Replica) Pos added in v0.3.5

func (r *Replica) Pos() Pos

Pos returns the current replicated position. Returns a zero value if the current position cannot be determined.

func (*Replica) Snapshot added in v0.3.5

func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error)

Snapshot copies the entire database to the replica path.

func (*Replica) SnapshotIndexAt

func (r *Replica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error)

SnapshotIndexAt returns the highest index for a snapshot within a generation that occurs before timestamp. If timestamp is zero, returns the latest snapshot.

func (*Replica) Snapshots added in v0.2.0

func (r *Replica) Snapshots(ctx context.Context) ([]SnapshotInfo, error)

Snapshots returns a list of all snapshots across all generations.

func (*Replica) Start

func (r *Replica) Start(ctx context.Context)

Starts replicating in a background goroutine.

func (*Replica) Stop

func (r *Replica) Stop()

Stop cancels any outstanding replication and blocks until finished.

func (*Replica) Sync added in v0.3.4

func (r *Replica) Sync(ctx context.Context) (err error)

Sync copies new WAL frames from the shadow WAL to the replica client.

type ReplicaClient added in v0.3.5

type ReplicaClient interface {
	// Returns the type of client.
	Type() string

	// Returns a list of available generations.
	Generations(ctx context.Context) ([]string, error)

	// Deletes all snapshots & WAL segments within a generation.
	DeleteGeneration(ctx context.Context, generation string) error

	// Returns an iterator of all snapshots within a generation on the replica.
	Snapshots(ctx context.Context, generation string) (SnapshotIterator, error)

	// Writes LZ4 compressed snapshot data to the replica at a given index
	// within a generation. Returns metadata for the snapshot.
	WriteSnapshot(ctx context.Context, generation string, index int, r io.Reader) (SnapshotInfo, error)

	// Deletes a snapshot with the given generation & index.
	DeleteSnapshot(ctx context.Context, generation string, index int) error

	// Returns a reader that contains LZ4 compressed snapshot data for a
	// given index within a generation. Returns an os.ErrNotFound error if
	// the snapshot does not exist.
	SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)

	// Returns an iterator of all WAL segments within a generation on the replica.
	WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error)

	// Writes an LZ4 compressed WAL segment at a given position.
	// Returns metadata for the written segment.
	WriteWALSegment(ctx context.Context, pos Pos, r io.Reader) (WALSegmentInfo, error)

	// Deletes one or more WAL segments at the given positions.
	DeleteWALSegments(ctx context.Context, a []Pos) error

	// Returns a reader that contains an LZ4 compressed WAL segment at a given
	// index/offset within a generation. Returns an os.ErrNotFound error if the
	// WAL segment does not exist.
	WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error)
}

ReplicaClient represents client to connect to a Replica.

type RestoreOptions

type RestoreOptions struct {
	// File info used for restored snapshot & WAL files.
	Mode     os.FileMode
	Uid, Gid int

	// Specifies how many WAL files are downloaded in parallel during restore.
	Parallelism int

	// Logging settings.
	Logger    *log.Logger
	LogPrefix string
}

RestoreOptions represents options for DB.Restore().

func NewRestoreOptions added in v0.2.0

func NewRestoreOptions() RestoreOptions

NewRestoreOptions returns a new instance of RestoreOptions with defaults.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server represents the top-level container. It manage databases and routes global file system events.

func NewServer

func NewServer() *Server

NewServer returns a new instance of Server.

func (*Server) Close

func (s *Server) Close() (err error)

Close shuts down the server and all databases it manages.

func (*Server) DB

func (s *Server) DB(path string) *DB

DB returns the database with the given path, if it's managed by the server.

func (*Server) DBs

func (s *Server) DBs() []*DB

DBs returns a slice of all databases managed by the server.

func (*Server) Open

func (s *Server) Open() error

Open initializes the server and begins watching for file system events.

func (*Server) Unwatch

func (s *Server) Unwatch(path string) error

Unwatch removes a database path from being managed by the server.

func (*Server) Watch

func (s *Server) Watch(path string, fn func(path string) (*DB, error)) error

Watch adds a database path to be managed by the server.

type SnapshotInfo added in v0.2.0

type SnapshotInfo struct {
	Generation string
	Index      int
	Size       int64
	CreatedAt  time.Time
}

SnapshotInfo represents file information about a snapshot.

func FilterSnapshotsAfter added in v0.3.0

func FilterSnapshotsAfter(a []SnapshotInfo, t time.Time) []SnapshotInfo

FilterSnapshotsAfter returns all snapshots that were created on or after t.

func FindMinSnapshotByGeneration added in v0.3.0

func FindMinSnapshotByGeneration(a []SnapshotInfo, generation string) *SnapshotInfo

FindMinSnapshotByGeneration finds the snapshot with the lowest index in a generation.

func SliceSnapshotIterator added in v0.3.5

func SliceSnapshotIterator(itr SnapshotIterator) ([]SnapshotInfo, error)

SliceSnapshotIterator returns all snapshots from an iterator as a slice.

func (*SnapshotInfo) Pos added in v0.3.5

func (info *SnapshotInfo) Pos() Pos

Pos returns the WAL position when the snapshot was made.

type SnapshotInfoSlice added in v0.3.5

type SnapshotInfoSlice []SnapshotInfo

SnapshotInfoSlice represents a slice of snapshot metadata.

func (SnapshotInfoSlice) Len added in v0.3.5

func (a SnapshotInfoSlice) Len() int

func (SnapshotInfoSlice) Less added in v0.3.5

func (a SnapshotInfoSlice) Less(i, j int) bool

func (SnapshotInfoSlice) Swap added in v0.3.5

func (a SnapshotInfoSlice) Swap(i, j int)

type SnapshotInfoSliceIterator added in v0.3.5

type SnapshotInfoSliceIterator struct {
	// contains filtered or unexported fields
}

SnapshotInfoSliceIterator represents an iterator for iterating over a slice of snapshots.

func NewSnapshotInfoSliceIterator added in v0.3.5

func NewSnapshotInfoSliceIterator(a []SnapshotInfo) *SnapshotInfoSliceIterator

NewSnapshotInfoSliceIterator returns a new instance of SnapshotInfoSliceIterator.

func (*SnapshotInfoSliceIterator) Close added in v0.3.5

func (itr *SnapshotInfoSliceIterator) Close() error

Close always returns nil.

func (*SnapshotInfoSliceIterator) Err added in v0.3.5

func (itr *SnapshotInfoSliceIterator) Err() error

Err always returns nil.

func (*SnapshotInfoSliceIterator) Next added in v0.3.5

func (itr *SnapshotInfoSliceIterator) Next() bool

Next moves to the next snapshot. Returns true if another snapshot is available.

func (*SnapshotInfoSliceIterator) Snapshot added in v0.3.5

func (itr *SnapshotInfoSliceIterator) Snapshot() SnapshotInfo

Snapshot returns the metadata from the currently positioned snapshot.

type SnapshotIterator added in v0.3.5

type SnapshotIterator interface {
	io.Closer

	// Prepares the next snapshot for reading with the Snapshot() method.
	// Returns true if another snapshot is available. Returns false if no more
	// snapshots are available or if an error occurred.
	Next() bool

	// Returns an error that occurred during iteration.
	Err() error

	// Returns metadata for the currently positioned snapshot.
	Snapshot() SnapshotInfo
}

SnapshotIterator represents an iterator over a collection of snapshot metadata.

type StreamClient

type StreamClient interface {
	// Stream returns a reader which contains and optional snapshot followed
	// by a series of WAL segments. This stream begins from the given position.
	Stream(ctx context.Context, pos Pos) (StreamReader, error)
}

StreamClient represents a client for streaming changes to a replica DB.

type StreamReader

type StreamReader interface {
	io.ReadCloser
	PageSize() int
	Next() (*StreamRecordHeader, error)
}

StreamReader represents a reader that streams snapshot and WAL records.

type StreamRecordHeader

type StreamRecordHeader struct {
	Type       int
	Flags      int
	Generation string
	Index      int
	Offset     int64
	Size       int64
}

func (*StreamRecordHeader) MarshalBinary

func (hdr *StreamRecordHeader) MarshalBinary() ([]byte, error)

func (*StreamRecordHeader) Pos

func (hdr *StreamRecordHeader) Pos() Pos

func (*StreamRecordHeader) UnmarshalBinary

func (hdr *StreamRecordHeader) UnmarshalBinary(data []byte) error

UnmarshalBinary from data into hdr.

type WALDownloader

type WALDownloader struct {

	// File info used for downloaded WAL files.
	Mode     os.FileMode
	Uid, Gid int

	// Number of downloads occurring in parallel.
	Parallelism int
	// contains filtered or unexported fields
}

WALDownloader represents a parallel downloader of WAL files from a replica client.

It works on a per-index level so WAL files are always downloaded in their entirety and are not segmented. WAL files are downloaded from minIndex to maxIndex, inclusively, and are written to a path prefix. WAL files are named with the prefix and suffixed with the WAL index. It is the responsibility of the caller to clean up these WAL files.

The purpose of the parallization is that RTT & WAL apply time can consume much of the restore time so it's useful to download multiple WAL files in the background to minimize the latency. While some WAL indexes may be downloaded out of order, the WALDownloader ensures that Next() always returns the WAL files sequentially.

func NewWALDownloader

func NewWALDownloader(client ReplicaClient, prefix string, generation string, minIndex, maxIndex int) *WALDownloader

NewWALDownloader returns a new instance of WALDownloader.

func (*WALDownloader) Close

func (d *WALDownloader) Close() (err error)

Close cancels all downloads and returns any error that has occurred.

func (*WALDownloader) N

func (d *WALDownloader) N() int

N returns the number of WAL files returned by Next().

func (*WALDownloader) Next

func (d *WALDownloader) Next(ctx context.Context) (int, string, error)

Next returns the index & local file path of the next downloaded WAL file.

type WALInfo added in v0.2.0

type WALInfo struct {
	Generation string
	Index      int
	CreatedAt  time.Time
}

WALInfo represents file information about a WAL file.

type WALInfoSlice added in v0.3.5

type WALInfoSlice []WALInfo

WALInfoSlice represents a slice of WAL metadata.

func (WALInfoSlice) Len added in v0.3.5

func (a WALInfoSlice) Len() int

func (WALInfoSlice) Less added in v0.3.5

func (a WALInfoSlice) Less(i, j int) bool

func (WALInfoSlice) Swap added in v0.3.5

func (a WALInfoSlice) Swap(i, j int)

type WALNotFoundError

type WALNotFoundError struct {
	Generation string
	Index      int
}

WALNotFoundError is returned by WALDownloader if an WAL index is not found.

func (*WALNotFoundError) Error

func (e *WALNotFoundError) Error() string

Error returns the error string.

type WALSegmentInfo added in v0.3.5

type WALSegmentInfo struct {
	Generation string
	Index      int
	Offset     int64
	Size       int64
	CreatedAt  time.Time
}

WALSegmentInfo represents file information about a WAL segment file.

func SliceWALSegmentIterator added in v0.3.5

func SliceWALSegmentIterator(itr WALSegmentIterator) ([]WALSegmentInfo, error)

SliceWALSegmentIterator returns all WAL segment files from an iterator as a slice.

func (*WALSegmentInfo) Pos added in v0.3.5

func (info *WALSegmentInfo) Pos() Pos

Pos returns the WAL position when the segment was made.

type WALSegmentInfoSlice added in v0.3.5

type WALSegmentInfoSlice []WALSegmentInfo

WALSegmentInfoSlice represents a slice of WAL segment metadata.

func (WALSegmentInfoSlice) Len added in v0.3.5

func (a WALSegmentInfoSlice) Len() int

func (WALSegmentInfoSlice) Less added in v0.3.5

func (a WALSegmentInfoSlice) Less(i, j int) bool

func (WALSegmentInfoSlice) Swap added in v0.3.5

func (a WALSegmentInfoSlice) Swap(i, j int)

type WALSegmentInfoSliceIterator added in v0.3.5

type WALSegmentInfoSliceIterator struct {
	// contains filtered or unexported fields
}

WALSegmentInfoSliceIterator represents an iterator for iterating over a slice of wal segments.

func NewWALSegmentInfoSliceIterator added in v0.3.5

func NewWALSegmentInfoSliceIterator(a []WALSegmentInfo) *WALSegmentInfoSliceIterator

NewWALSegmentInfoSliceIterator returns a new instance of WALSegmentInfoSliceIterator.

func (*WALSegmentInfoSliceIterator) Close added in v0.3.5

func (itr *WALSegmentInfoSliceIterator) Close() error

Close always returns nil.

func (*WALSegmentInfoSliceIterator) Err added in v0.3.5

Err always returns nil.

func (*WALSegmentInfoSliceIterator) Next added in v0.3.5

func (itr *WALSegmentInfoSliceIterator) Next() bool

Next moves to the next wal segment. Returns true if another segment is available.

func (*WALSegmentInfoSliceIterator) WALSegment added in v0.3.5

func (itr *WALSegmentInfoSliceIterator) WALSegment() WALSegmentInfo

WALSegment returns the metadata from the currently positioned wal segment.

type WALSegmentIterator added in v0.3.5

type WALSegmentIterator interface {
	io.Closer

	// Prepares the next WAL for reading with the WAL() method.
	// Returns true if another WAL is available. Returns false if no more
	// WAL files are available or if an error occurred.
	Next() bool

	// Returns an error that occurred during iteration.
	Err() error

	// Returns metadata for the currently positioned WAL segment file.
	WALSegment() WALSegmentInfo
}

WALSegmentIterator represents an iterator over a collection of WAL segments.

type WALWriter

type WALWriter struct {
	Salt0, Salt1 uint32
	// contains filtered or unexported fields
}

WALWriter represents a writer to a SQLite WAL file.

func NewWALWriter

func NewWALWriter(path string, mode os.FileMode, pageSize int) *WALWriter

NewWALWriter returns a new instance of WALWriter.

func (*WALWriter) Close

func (w *WALWriter) Close() error

Close closes the file handle to the WAL file.

func (*WALWriter) Open

func (w *WALWriter) Open() (err error)

Open opens the file handle to the WAL file.

func (*WALWriter) WriteFrame

func (w *WALWriter) WriteFrame(pgno, commit uint32, data []byte) error

func (*WALWriter) WriteHeader

func (w *WALWriter) WriteHeader() error

WriteHeader writes the WAL header to the beginning of the file.

Directories

Path Synopsis
cmd
litestream command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL