litefs

Version: v0.2.0-beta3
Published: Sep 21, 2022 License: Apache-2.0 Imports: 22 Imported by: 1

README

LiteFS

LiteFS is a FUSE-based file system for replicating SQLite databases across a cluster of machines. It works as a passthrough file system that intercepts writes to SQLite databases in order to detect transaction boundaries and record changes on a per-transaction level in LTX files.

This project is actively maintained but is currently in an alpha state. The file system and replication are functional but testing & hardening are needed to make it production-ready. This repository is open source in order to collect feedback and ideas for how to make SQLite replication better.

Example usage

The following tutorial demonstrates how to use LiteFS to replicate a SQLite database across two live systems: a primary system and a secondary system.

The primary system can write to its local SQLite database, and LiteFS will replicate the changes to the secondary system. The secondary system will be a read-only replica of the primary. It won't be able to make changes, but if the primary system becomes unavailable, the secondary system will take over as the primary node.

Install dependencies

First, install the required packages on each system where you'll be running LiteFS.

apt install fuse3 libfuse-dev sqlite3 consul wget tar

Install LiteFS

Next, download and install the LiteFS binary on each system:

ARCH="amd64" # Change to your system's architecture.
VERSION="0.1.0" # Change to the latest LiteFS version
pushd $(mktemp --directory) && \
  wget "https://github.com/superfly/litefs/releases/download/v${VERSION}/litefs-v${VERSION}-linux-${ARCH}.tar.gz" && \
  tar xvf "litefs-v${VERSION}-linux-${ARCH}.tar.gz" && \
  sudo mv litefs /usr/local/bin && \
popd

Start Consul on primary node

LiteFS uses Consul for leader election.

On your primary node, run Consul in development mode:

consul agent -client "::" -dev

Configure your primary node

First, choose the location for your SQLite database on your primary node:

# Choose where you want to store your SQLite database.
export DB_FILE="${HOME}/litefs-demo1/data.db"

# Make sure parent directory exists.
export DB_DIR="$(dirname "${DB_FILE}")"
mkdir --parents "${DB_DIR}"

Then, choose the hostname for your primary node.

# Leave as-is if both nodes are on the same network. Otherwise, change to a
# domain name or IP that other nodes can use to connect with this node.
export HOSTNAME="$(hostname --fqdn)"

Then, choose the port on which LiteFS will listen for connections from other LiteFS nodes.

export LITEFS_PORT="20202"

Then, specify the URL of the Consul server. If it's running on the same node as your primary node, you can leave it as the default value:

export CONSUL_URL="http://localhost:8500"

Finally, create your LiteFS config file. Create a file called litefs.yml with the following contents:

mount-dir: "${DB_DIR}"

http:
  addr: ":${LITEFS_PORT}"

consul:
  url: "${CONSUL_URL}"
  advertise-url: "http://${HOSTNAME}:${LITEFS_PORT}"

LiteFS expands environment variables in its config file, so you can leave the environment variable names in the file, and LiteFS will expand them at runtime.
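To illustrate what this expansion does to the config text, here is a minimal sketch that uses Go's standard os.Expand. It is not LiteFS's own loader, which is not shown in this document; expandConfig is a hypothetical helper, and the lookup function is injected only so the example does not depend on the real environment.

```go
package main

import (
	"fmt"
	"os"
)

// expandConfig replaces ${VAR} references in raw with the values returned by
// lookup. It is a hypothetical helper, not part of LiteFS.
func expandConfig(raw string, lookup func(string) string) string {
	return os.Expand(raw, lookup)
}

func main() {
	raw := "mount-dir: \"${DB_DIR}\"\n\nhttp:\n  addr: \":${LITEFS_PORT}\"\n"

	// os.Getenv returns an empty string for unset variables, so export
	// DB_DIR and LITEFS_PORT first, as in the steps above.
	fmt.Print(expandConfig(raw, os.Getenv))
}
```

If a variable is not exported, the sketch substitutes an empty string, so it is worth checking the expanded values before launching.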

For more details on LiteFS's configuration options, see the example config.

Launch primary LiteFS node

Now that your configuration is done, it's time to launch LiteFS:

litefs -config litefs.yml

If everything worked, you should see a message like this:

primary lease acquired, advertising as http://bert.localdomain:20202
LiteFS mounted to: /home/user/litefs-demo1
http server listening on: http://bert:20202
stream connected

Configure your secondary node

Now that we have your primary node up and running, it's time to create a secondary LiteFS node.

First, choose the directory that LiteFS will use to replicate SQLite databases:

export DB_DIR="${HOME}/litefs-demo2"
mkdir --parents "${DB_DIR}"

Then, choose the hostname through which other nodes can connect to this node.

# Leave as-is if both nodes are on the same network. Otherwise, change to a
# domain name or IP that other nodes can use to connect with this node.
export HOSTNAME="$(hostname --fqdn)"

Then, choose the port on which LiteFS will listen for connections from other LiteFS nodes.

export LITEFS_PORT="30303"

Then, specify the URL of the Consul server on the primary node:

# Change to the domain name or IP of your primary node.
export PRIMARY_NODE="bert"
export CONSUL_URL="http://${PRIMARY_NODE}:8500"

Finally, create your litefs.yml config file with the following contents:

mount-dir: "${DB_DIR}"

http:
  addr: ":${LITEFS_PORT}"

consul:
  url: "${CONSUL_URL}"
  advertise-url: "http://${HOSTNAME}:${LITEFS_PORT}"

Launch secondary LiteFS node

Just as you did with your first node, it's time to run LiteFS on your secondary node:

litefs -config litefs.yml

If everything worked, you should see a message indicating that your secondary node has successfully connected to the Consul server and your primary LiteFS node:

initializing consul: key=http://bert:8500 url=litefs/primary advertise-url=http://ernie.localdomain:
LiteFS mounted to: /home/user/litefs-demo2
http server listening on: http://localhost:30303
existing primary found (http://bert.localdomain:20202), connecting as replica

Replicating data across nodes

Now that both nodes are running, you're ready to see LiteFS in action.

Go back to your primary node, and start a new terminal session. Run the following commands to create a new SQLite table and populate it with some data.

# Use the same database file you specified above.
DB_FILE="${HOME}/litefs-demo1/data.db"

sqlite3 "${DB_FILE}" "CREATE TABLE movies (title TEXT, rating INT)"
sqlite3 "${DB_FILE}" "INSERT INTO movies (title, rating) VALUES ('The Jerk', 10)"
sqlite3 "${DB_FILE}" "INSERT INTO movies VALUES ('Election', 9)"

Now, switch to your secondary node and see if LiteFS replicated the data:

# Use the same database file you specified above.
DB_FILE="${HOME}/litefs-demo2/data.db"

sqlite3 "${DB_FILE}" 'SELECT * FROM movies'

You should see that LiteFS has replicated data from your primary node onto your secondary node:

The Jerk|10
Election|9

Failing over to your secondary node

From your secondary node, try adding some data to the database:

$ sqlite3 "${DB_FILE}" "INSERT INTO movies VALUES ('Chairman of the Board', 1)"
Error: unable to open database file

Whoops. Your secondary node failed when it tried to add data. What's going on?

This is by design. To ensure the integrity of the data, only one node can act as a writer. All the other nodes are read-only, and they will see an error if they attempt to write to the database.

If the primary node becomes unavailable, Consul will appoint a new primary node. To see how that works, return to the LiteFS session on your primary node, and use Ctrl+C to kill the process.

You should see this output:

signal received, litefs shutting down
stream disconnected
exiting primary, destroying lease

Now, go back to your secondary node, and try the INSERT query again:

sqlite3 "${DB_FILE}" "INSERT INTO movies VALUES ('Chairman of the Board', 1)"

This time, the INSERT worked because your secondary node has taken over as your primary.

You can see the new row in the database:

sqlite3 "${DB_FILE}" "SELECT * FROM movies"
The Jerk|10
Election|9
Chairman of the Board|1

Caveats

If litefs does not exit cleanly, you may need to run umount manually to unmount the file system before remounting it:

umount -f /path/to/mnt

litefs will not unmount cleanly if a SQLite connection is open, so be sure to close your application or sqlite3 sessions before unmounting.

Architecture

The LiteFS system is composed of 3 major parts:

  1. FUSE file system: intercepts file system calls to record transactions.
  2. Leader election: currently implemented by Consul using sessions.
  3. HTTP server: provides an API for replica nodes to receive changes.

Lite Transaction Files (LTX)

Each transaction in SQLite is simply a collection of one or more pages to be written. This is done safely by the rollback journal or the write-ahead log (WAL) within a SQLite database.

An LTX file is an additional packaging format for these change sets. Unlike the journal or the WAL, the LTX file is optimized for use in a replication system by utilizing the following:

  • Checksumming across the LTX file to ensure consistency.
  • Rolling checksum of the entire database on every transaction.
  • Sorted pages for efficient compactions to ensure fast recovery time.
  • Page-level encryption (future work)
  • Transactional event data (future work)

Each LTX file is associated with an autoincrementing transaction ID (TXID) so that replicas can know their position relative to the primary node. This TXID is also associated with a rolling checksum of the entire database to ensure that the database is never corrupted if a split brain occurs. Please see the Guarantees section to understand how async replication and split brain work.

File system

The FUSE-based file system allows the user to mount LiteFS to a directory. On the primary node in the cluster, this lets LiteFS intercept write transactions through the file system interface, transparently to both the application and SQLite.

For replica nodes, the file system adds protections by ensuring databases are not writeable. The file system also provides information about the current primary node to the application via the .primary file.

In SQLite, write transactions work by copying pages out to the rollback journal, updating pages in the database file, and then deleting the rollback journal when complete. LiteFS passes all of these file system calls through to the underlying files; however, it intercepts the journal deletion at the end of the transaction to convert the updated pages into an LTX file.

Currently, LiteFS only supports the SQLite rollback journal but it will support WAL mode and possibly wal2 in the future.

Leader election

Because LiteFS is meant to be used in ephemeral deployments such as Fly.io or Kubernetes, it cannot use a distributed consensus algorithm that requires strong membership such as Raft. Instead, it delegates leader election to Consul sessions and uses a time-based lease system.

Distributed leases work by obtaining a lock on a key within Consul which guarantees that only one node can be the primary at any given time. This lease has a time-to-live (TTL) which is automatically renewed by the primary as long as it is alive. If the primary shuts down cleanly, the lease is destroyed and another node can immediately become the new primary. If the primary dies unexpectedly then the TTL must expire before a new node will become primary.
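The lease life cycle described above can be sketched with a small in-memory model. This is not the Consul-backed implementation: leaseTable, fakeClock, and demoTTL are hypothetical names introduced for illustration, and the error values only mirror the names of the package's ErrPrimaryExists and ErrLeaseExpired.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errPrimaryExists = errors.New("primary exists")
	errLeaseExpired  = errors.New("lease expired")
)

// demoTTL is an arbitrary TTL chosen for this example.
const demoTTL = 10 * time.Second

// fakeClock is a manually advanced clock that keeps the example deterministic.
type fakeClock struct{ now time.Time }

func (c *fakeClock) advance(d time.Duration) { c.now = c.now.Add(d) }

// leaseTable is a hypothetical in-memory stand-in for one lock key in Consul.
type leaseTable struct {
	clock     *fakeClock
	holder    string
	expiresAt time.Time
}

// held reports whether some node holds an unexpired lease.
func (t *leaseTable) held() bool {
	return t.holder != "" && t.clock.now.Before(t.expiresAt)
}

// acquire grants the lease to node if it is free or its TTL has lapsed.
func (t *leaseTable) acquire(node string) error {
	if t.held() {
		return errPrimaryExists
	}
	t.holder, t.expiresAt = node, t.clock.now.Add(demoTTL)
	return nil
}

// renew resets the TTL, but only for the current, unexpired holder.
func (t *leaseTable) renew(node string) error {
	if !t.held() || t.holder != node {
		return errLeaseExpired
	}
	t.expiresAt = t.clock.now.Add(demoTTL)
	return nil
}

// release destroys the lease on clean shutdown so that another node can
// become primary immediately.
func (t *leaseTable) release(node string) {
	if t.holder == node {
		t.holder = ""
	}
}

func main() {
	clock := &fakeClock{}
	lt := &leaseTable{clock: clock}

	fmt.Println("bert acquires:", lt.acquire("bert"))
	fmt.Println("ernie acquires:", lt.acquire("ernie"))

	// bert dies without releasing, so ernie must wait out the TTL.
	clock.advance(demoTTL)
	fmt.Println("ernie acquires after TTL:", lt.acquire("ernie"))
}
```

The difference between release and waiting for expiry is the difference between the clean-shutdown and unexpected-death cases in the paragraph above.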

Since LiteFS uses async replication, replica nodes may be at different replication positions; however, whichever node becomes primary dictates the state of the database. This means that replicas which are further ahead could lose some transactions. See the Guarantees section below for more information.

HTTP server

Replica nodes communicate with the primary node over HTTP. When they connect to the primary node, they specify their replication position, which is their transaction ID and a rolling checksum of the entire database. The primary node will then begin sending transaction data to the replica starting from that position. If the primary no longer has that transaction position available, it will resend a snapshot of the current database and begin replicating transactions from there.

Guarantees

LiteFS is intended to provide easy, live, asynchronous replication across ephemeral nodes in a cluster. This approach makes trade-offs as compared with simpler disaster recovery tools such as Litestream and more complex but strongly-consistent tools such as rqlite.

As with any async replication system, there's a window of time where transactions are only durable on the primary node and have not been replicated to a replica node. A catastrophic crash on the primary would cause these transactions to be lost. Typically, this window is subsecond as transactions can quickly be shuttled from the primary to the replicas.

Synchronous replication and time-bounded asynchronous replication are planned for future versions of LiteFS.

Ensuring consistency during split brain

Because LiteFS uses async replication, there is the potential that a primary could receive writes but be unable to replicate them during a network partition. If the primary node loses its leader status and later connects to the new leader, its database state will have diverged from the new leader. If it naively began applying transactions from the new leader, it could corrupt its database state.

Instead, LiteFS utilizes a rolling checksum which represents a checksum of the entire database at every transaction. When the old primary node connects to the new primary node, it will see that its checksum is different even though its transaction ID could be the same. At this point, it will resnapshot the database from the new primary to ensure consistency.
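The decision a reconnecting node faces can be sketched as a comparison of positions. This is an illustrative model only: the pos struct mirrors the fields of the package's Pos type, while needsSnapshot and the history map (TXID to checksum after that transaction, for positions the primary can still serve) are assumptions made for the example.

```go
package main

import "fmt"

// pos mirrors the fields of the package's Pos type.
type pos struct {
	TXID              uint64
	PostApplyChecksum uint64
}

// needsSnapshot reports whether a connecting node must discard its state and
// fetch a fresh snapshot. history is a hypothetical view of the primary: it
// maps each TXID the primary can still resume from to the rolling checksum of
// the database after that transaction.
func needsSnapshot(node pos, history map[uint64]uint64) bool {
	sum, ok := history[node.TXID]
	if !ok {
		return true // the primary no longer has this position available
	}
	// Same TXID but a different checksum means the databases diverged.
	return sum != node.PostApplyChecksum
}

func main() {
	history := map[uint64]uint64{5: 0xaaaa, 6: 0xbbbb}

	fmt.Println(needsSnapshot(pos{TXID: 6, PostApplyChecksum: 0xbbbb}, history)) // in sync
	fmt.Println(needsSnapshot(pos{TXID: 6, PostApplyChecksum: 0xcccc}, history)) // diverged
	fmt.Println(needsSnapshot(pos{TXID: 2, PostApplyChecksum: 0x1111}, history)) // too old
}
```

Comparing the TXID alone would miss the second case, which is exactly the split-brain scenario described above.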

Rolling checksum implementation

The rolling checksum is implemented by combining checksums of every page together. When a page is written, LiteFS will compute the CRC64 of the page number and the page data and XOR them into the rolling checksum. It will also compute this same page checksum for the old page data and XOR that value out of the rolling checksum.

This approach gives us strong guarantees about the exact byte contents of the database at every transaction and it is fast to compute. As XOR is associative and commutative, the checksum can also be recomputed from scratch on a raw database file to verify consistency.
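A minimal sketch of this scheme follows, using Go's hash/crc64 package. The ISO polynomial and the big-endian encoding of the page number are assumptions made for the example (the text above only says "CRC64"), and the function names are hypothetical.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc64"
)

// The polynomial is an assumption; the README does not name one.
var table = crc64.MakeTable(crc64.ISO)

// pageChecksum hashes the page number followed by the page data.
func pageChecksum(pgno uint32, data []byte) uint64 {
	h := crc64.New(table)
	var buf [4]byte
	binary.BigEndian.PutUint32(buf[:], pgno)
	h.Write(buf[:])
	h.Write(data)
	return h.Sum64()
}

// updateChecksum XORs the old page out of sum and the new page in.
// Pass nil for oldData when the page did not exist before.
func updateChecksum(sum uint64, pgno uint32, oldData, newData []byte) uint64 {
	if oldData != nil {
		sum ^= pageChecksum(pgno, oldData)
	}
	return sum ^ pageChecksum(pgno, newData)
}

// fullChecksum recomputes the checksum from scratch over all pages.
// Page numbers start at 1, as in SQLite.
func fullChecksum(pages [][]byte) uint64 {
	var sum uint64
	for i, p := range pages {
		sum ^= pageChecksum(uint32(i+1), p)
	}
	return sum
}

func main() {
	pages := [][]byte{[]byte("page one"), []byte("page two")}
	sum := fullChecksum(pages)

	// Overwrite page 2 and update the checksum incrementally.
	newData := []byte("page 2!!")
	sum = updateChecksum(sum, 2, pages[1], newData)
	pages[1] = newData

	// The incremental result matches a full recomputation.
	fmt.Println(sum == fullChecksum(pages))
}
```

Because each write touches only the affected page's checksum, the cost of an update is proportional to the pages written rather than to the size of the database.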

Contributing

LiteFS contributions work a little differently from most GitHub projects. If you have a small bug fix or typo fix, please PR directly to this repository.

If you would like to contribute a feature, please follow these steps:

  1. Discuss the feature in an issue on this GitHub repository.
  2. Create a pull request to your fork of the repository.
  3. Post a link to your pull request in the issue for consideration.

This project has a roadmap and features are added and tested in a certain order. Additionally, it's likely that code style, implementation details, and test coverage will need to be tweaked, so it's easier for me to grab your implementation as a starting point when implementing a feature.

Documentation

Constants

const (
	SQLITE_DATABASE_HEADER_STRING = "SQLite format 3\x00"

	// Magic header string that identifies a SQLite journal header.
	// https://www.sqlite.org/fileformat.html#the_rollback_journal
	SQLITE_JOURNAL_HEADER_STRING = "\xd9\xd5\x05\xf9\x20\xa1\x63\xd7"

	// Location of the database size, in pages, in the main database file.
	SQLITE_DATABASE_SIZE_OFFSET = 28
)
const (
	LockTypePending  = 0x40000000
	LockTypeReserved = 0x40000001
	LockTypeShared   = 0x40000002
)
const (
	WALHeaderSize      = 32
	WALFrameHeaderSize = 24
	WALIndexHeaderSize = 136
)

SQLite constants

const (
	PENDING_BYTE  = 0x40000000
	RESERVED_BYTE = (PENDING_BYTE + 1)
	SHARED_FIRST  = (PENDING_BYTE + 2)
	SHARED_SIZE   = 510
)

SQLite rollback journal lock constants.

const (
	WAL_WRITE_LOCK   = 120
	WAL_CKPT_LOCK    = 121
	WAL_RECOVER_LOCK = 122
	WAL_READ_LOCK0   = 123
	WAL_READ_LOCK1   = 124
	WAL_READ_LOCK2   = 125
	WAL_READ_LOCK3   = 126
	WAL_READ_LOCK4   = 127
)

SQLite WAL lock constants.

const (
	F_OFD_GETLK  = 36
	F_OFD_SETLK  = 37
	F_OFD_SETLKW = 38
)

Open file description lock constants.

const (
	JournalModeDelete   = "DELETE"
	JournalModeTruncate = "TRUNCATE"
	JournalModePersist  = "PERSIST"
	JournalModeWAL      = "WAL"
)
const (
	FileTypeNone = FileType(iota)
	FileTypeDatabase
	FileTypeJournal
	FileTypeWAL
	FileTypeSHM
	FileTypePos
)

Database file types.

const (
	StreamFrameTypeLTX   = StreamFrameType(1)
	StreamFrameTypeReady = StreamFrameType(2)
)
const (
	RWMutexStateUnlocked = iota
	RWMutexStateShared
	RWMutexStateExclusive
)
const (
	DefaultRetentionDuration        = 1 * time.Minute
	DefaultRetentionMonitorInterval = 1 * time.Minute
)

Default store settings.

const IDLength = 24

IDLength is the length of a node ID, in bytes.

const RWMutexInterval = 10 * time.Microsecond

RWMutexInterval is the time between reattempting lock acquisition.

Variables

var (
	ErrDatabaseNotFound = fmt.Errorf("database not found")
	ErrDatabaseExists   = fmt.Errorf("database already exists")

	ErrNoPrimary     = errors.New("no primary")
	ErrPrimaryExists = errors.New("primary exists")
	ErrLeaseExpired  = errors.New("lease expired")

	ErrReadOnlyReplica = fmt.Errorf("read only replica")
)

LiteFS errors

Functions

func TrimName

func TrimName(name string) string

TrimName removes "-journal", "-shm" or "-wal" from the given name.
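As a rough illustration of the behavior described, the sketch below strips the three SQLite sidecar suffixes so that a journal, shared-memory, or WAL file name maps back to its database name. trimName is a hypothetical stand-in written for this example, not the package's source.

```go
package main

import (
	"fmt"
	"strings"
)

// trimName removes a trailing "-journal", "-shm" or "-wal" from name.
// It is a stand-in written for illustration, not the package's TrimName.
func trimName(name string) string {
	for _, suffix := range []string{"-journal", "-shm", "-wal"} {
		if strings.HasSuffix(name, suffix) {
			return strings.TrimSuffix(name, suffix)
		}
	}
	return name
}

func main() {
	for _, name := range []string{"data.db", "data.db-journal", "data.db-wal", "data.db-shm"} {
		fmt.Printf("%s -> %s\n", name, trimName(name))
	}
}
```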

func WriteStreamFrame

func WriteStreamFrame(w io.Writer, f StreamFrame) error

WriteStreamFrame writes the stream type & frame to the writer.

Types

type Client

type Client interface {
	// Stream starts a long-running connection to stream changes from another node.
	Stream(ctx context.Context, rawurl string, id string, posMap map[string]Pos) (io.ReadCloser, error)
}

Client represents a client for connecting to other LiteFS nodes.

type DB

type DB struct {

	// Returns the current time. Used for mocking time in tests.
	Now func() time.Time
	// contains filtered or unexported fields
}

DB represents a SQLite database.

func NewDB

func NewDB(store *Store, name string, path string) *DB

NewDB returns a new instance of DB.

func (*DB) ApplyLTX added in v0.2.0

func (db *DB) ApplyLTX(ctx context.Context, path string) error

ApplyLTX applies an LTX file to the database.

func (*DB) CommitJournal

func (db *DB) CommitJournal(mode JournalMode) error

CommitJournal deletes the journal file which commits or rolls back the transaction.

func (*DB) CreateJournal

func (db *DB) CreateJournal() (*os.File, error)

CreateJournal creates a new journal file on disk.

func (*DB) DatabasePath added in v0.1.1

func (db *DB) DatabasePath() string

DatabasePath returns the path to the underlying database file.

func (*DB) EnforceRetention added in v0.2.0

func (db *DB) EnforceRetention(ctx context.Context, minTime time.Time) error

EnforceRetention removes all LTX files created before minTime.
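The retention rule can be sketched with the standard library alone. This is a simplified model, not the package's implementation: removeOlderThan and demo are hypothetical helpers, the file names are made up, and using the file modification time as the creation time is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// removeOlderThan deletes regular files in dir whose modification time is
// before minTime and returns the names it removed.
func removeOlderThan(dir string, minTime time.Time) ([]string, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	var removed []string
	for _, ent := range entries {
		info, err := ent.Info()
		if err != nil {
			return removed, err
		}
		if info.IsDir() || !info.ModTime().Before(minTime) {
			continue
		}
		if err := os.Remove(filepath.Join(dir, ent.Name())); err != nil {
			return removed, err
		}
		removed = append(removed, ent.Name())
	}
	return removed, nil
}

// demo builds a temporary directory with one stale and one fresh file and
// applies a one-minute retention window, matching DefaultRetentionDuration.
func demo() ([]string, error) {
	dir, err := os.MkdirTemp("", "ltx-retention")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)

	now := time.Now()
	ages := map[string]time.Duration{"old.ltx": 2 * time.Minute, "new.ltx": 0}
	for name, age := range ages {
		path := filepath.Join(dir, name)
		if err := os.WriteFile(path, []byte("x"), 0o644); err != nil {
			return nil, err
		}
		mtime := now.Add(-age)
		if err := os.Chtimes(path, mtime, mtime); err != nil {
			return nil, err
		}
	}
	return removeOlderThan(dir, now.Add(-time.Minute))
}

func main() {
	removed, err := demo()
	fmt.Println(removed, err)
}
```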

func (*DB) GuardSet added in v0.2.0

func (db *DB) GuardSet() *GuardSet

GuardSet returns a set of guards that can control locking for a single lock owner.

func (*DB) InWriteTx

func (db *DB) InWriteTx() bool

InWriteTx returns true if the RESERVED lock is held exclusively.

func (*DB) JournalPath added in v0.1.1

func (db *DB) JournalPath() string

JournalPath returns the path to the underlying journal file.

func (*DB) LTXDir

func (db *DB) LTXDir() string

LTXDir returns the path to the directory of LTX transaction files.

func (*DB) LTXPath

func (db *DB) LTXPath(minTXID, maxTXID uint64) string

LTXPath returns the path of an LTX file.

func (*DB) Name

func (db *DB) Name() string

Name returns the name of the database.

func (*DB) Open

func (db *DB) Open() error

Open initializes the database from files in its data directory.

func (*DB) OpenLTXFile

func (db *DB) OpenLTXFile(txID uint64) (*os.File, error)

OpenLTXFile returns a file handle to an LTX file that contains the given TXID.

func (*DB) PageSize added in v0.2.0

func (db *DB) PageSize() uint32

PageSize returns the page size of the underlying database.

func (*DB) Path

func (db *DB) Path() string

Path returns the path of the database's data directory.

func (*DB) PendingLock

func (db *DB) PendingLock() *RWMutex

func (*DB) Pos

func (db *DB) Pos() Pos

Pos returns the current transaction position of the database.

func (*DB) ReadLTXDir added in v0.2.0

func (db *DB) ReadLTXDir() ([]fs.DirEntry, error)

ReadLTXDir returns DirEntry for every LTX file.

func (*DB) ReservedLock

func (db *DB) ReservedLock() *RWMutex

func (*DB) SharedLock

func (db *DB) SharedLock() *RWMutex

func (*DB) Store added in v0.2.0

func (db *DB) Store() *Store

Store returns the store that the database is a member of.

func (*DB) TXID

func (db *DB) TXID() uint64

TXID returns the current transaction ID.

func (*DB) WriteDatabase

func (db *DB) WriteDatabase(f *os.File, data []byte, offset int64) error

WriteDatabase writes data to the main database file.

func (*DB) WriteJournal

func (db *DB) WriteJournal(f *os.File, data []byte, offset int64) error

WriteJournal writes data to the rollback journal file.

func (*DB) WriteSnapshotTo added in v0.2.0

func (db *DB) WriteSnapshotTo(ctx context.Context, dst io.Writer) (header ltx.Header, trailer ltx.Trailer, err error)

WriteSnapshotTo writes an LTX snapshot to dst.

type FileType

type FileType int

FileType represents a type of SQLite file.

func (FileType) IsValid

func (t FileType) IsValid() bool

IsValid returns true if t is a valid file type.

type GuardSet added in v0.2.0

type GuardSet struct {
	// contains filtered or unexported fields
}

GuardSet represents a set of mutex guards held on database locks by a single owner.

func (*GuardSet) Guard added in v0.2.0

func (s *GuardSet) Guard(lockType LockType) *RWMutexGuard

Guard returns a guard by lock type. Panic on invalid lock type.

func (*GuardSet) Unlock added in v0.2.0

func (s *GuardSet) Unlock()

Unlock unlocks all the guards in the reverse of the order in which SQLite acquires them.

type Invalidator added in v0.1.1

type Invalidator interface {
	InvalidateDB(db *DB, offset, size int64) error
	InvalidatePos(db *DB) error
}

Invalidator is a callback for the store to use to invalidate the kernel page cache.

type JournalMode

type JournalMode string

JournalMode represents a SQLite journal mode.

type LTXStreamFrame

type LTXStreamFrame struct {
	Name string // database name
}

func (*LTXStreamFrame) ReadFrom

func (f *LTXStreamFrame) ReadFrom(r io.Reader) (int64, error)

func (*LTXStreamFrame) Type

func (*LTXStreamFrame) Type() StreamFrameType

Type returns the type of stream frame.

func (*LTXStreamFrame) WriteTo

func (f *LTXStreamFrame) WriteTo(w io.Writer) (int64, error)

type Lease

type Lease interface {
	RenewedAt() time.Time
	TTL() time.Duration

	// Renew attempts to reset the TTL on the lease.
	// Returns ErrLeaseExpired if the lease has expired or was deleted.
	Renew(ctx context.Context) error

	// Close attempts to remove the lease from the server.
	Close() error
}

Lease represents an acquired lease from a Leaser.

type Leaser

type Leaser interface {
	io.Closer

	AdvertiseURL() string

	// Acquire attempts to acquire the lease to become the primary.
	Acquire(ctx context.Context) (Lease, error)

	// PrimaryInfo attempts to read the current primary data.
	// Returns ErrNoPrimary if no primary currently has the lease.
	PrimaryInfo(ctx context.Context) (PrimaryInfo, error)
}

Leaser represents an API for obtaining a lease for leader election.

type LockType

type LockType int

LockType represents a SQLite lock type.

func ParseLockRange

func ParseLockRange(start, end uint64) []LockType

ParseLockRange returns a list of SQLite locks that are within a range.
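A sketch of how a byte range from a file-lock request can be mapped to SQLite's rollback journal locks, using the lock byte offsets listed in the constants above. locksInRange is hypothetical; treating the range as inclusive and matching the shared lock on any overlap with its 510-byte region are assumptions, and the package's ParseLockRange may differ.

```go
package main

import "fmt"

// Lock byte offsets, as listed in the package constants.
const (
	pendingByte  = 0x40000000
	reservedByte = pendingByte + 1
	sharedFirst  = pendingByte + 2
	sharedSize   = 510
)

// locksInRange returns the names of the SQLite locks whose bytes overlap the
// inclusive byte range [start, end].
func locksInRange(start, end uint64) []string {
	var locks []string
	if start <= pendingByte && pendingByte <= end {
		locks = append(locks, "PENDING")
	}
	if start <= reservedByte && reservedByte <= end {
		locks = append(locks, "RESERVED")
	}
	// The shared lock covers sharedSize bytes starting at sharedFirst.
	if start <= sharedFirst+sharedSize-1 && sharedFirst <= end {
		locks = append(locks, "SHARED")
	}
	return locks
}

func main() {
	fmt.Println(locksInRange(pendingByte, pendingByte))
	fmt.Println(locksInRange(sharedFirst, sharedFirst+sharedSize-1))
	fmt.Println(locksInRange(pendingByte, sharedFirst))
}
```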

type Pos

type Pos struct {
	TXID              uint64
	PostApplyChecksum uint64
}

Pos represents the transactional position of a database.

func (Pos) IsZero

func (p Pos) IsZero() bool

IsZero returns true if the position is empty.

func (Pos) String added in v0.2.0

func (p Pos) String() string

String returns a string representation of the position.

type PrimaryInfo added in v0.2.0

type PrimaryInfo struct {
	Hostname     string `json:"hostname"`
	AdvertiseURL string `json:"advertise-url"`
}

PrimaryInfo is the JSON object stored in the Consul lease value.

func (*PrimaryInfo) Clone added in v0.2.0

func (info *PrimaryInfo) Clone() *PrimaryInfo

Clone returns a copy of info.

type RWMutex

type RWMutex struct {
	// contains filtered or unexported fields
}

RWMutex is a reader/writer mutual exclusion lock. It wraps the sync package to provide additional capabilities such as lock upgrades & downgrades. It only supports TryLock() & TryRLock() as that is what's supported by our FUSE file system.

func (*RWMutex) Guard added in v0.2.0

func (rw *RWMutex) Guard() RWMutexGuard

Guard returns an unlocked guard for the mutex.

func (*RWMutex) State

func (rw *RWMutex) State() RWMutexState

State returns whether the mutex has an exclusive lock, one or more shared locks, or if the mutex is unlocked.

type RWMutexGuard

type RWMutexGuard struct {
	// contains filtered or unexported fields
}

RWMutexGuard is a reference to a mutex. Locking, unlocking, upgrading, & downgrading operations are all performed via the guard instead of directly on the RWMutex itself as this works similarly to how POSIX locks work.

func (*RWMutexGuard) CanLock

func (g *RWMutexGuard) CanLock() bool

CanLock returns true if the guard can become an exclusive lock.

func (*RWMutexGuard) CanRLock added in v0.2.0

func (g *RWMutexGuard) CanRLock() bool

CanRLock returns true if the guard can become a shared lock.

func (*RWMutexGuard) Lock added in v0.2.0

func (g *RWMutexGuard) Lock(ctx context.Context) error

Lock attempts to obtain an exclusive lock for the guard. Returns an error if ctx is done.

func (*RWMutexGuard) RLock

func (g *RWMutexGuard) RLock(ctx context.Context) error

RLock attempts to obtain a shared lock for the guard. Returns an error if ctx is done.

func (*RWMutexGuard) TryLock

func (g *RWMutexGuard) TryLock() bool

TryLock upgrades the lock from a shared lock to an exclusive lock. This is a no-op if the lock is already an exclusive lock.

func (*RWMutexGuard) TryRLock added in v0.2.0

func (g *RWMutexGuard) TryRLock() bool

TryRLock attempts to obtain a shared lock on the mutex for the guard. This will upgrade an unlocked guard and downgrade an exclusive guard. Shared guards are a no-op.

func (*RWMutexGuard) Unlock

func (g *RWMutexGuard) Unlock()

Unlock unlocks the underlying mutex.
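The guard model described in this section, where each owner holds a handle that can be upgraded or downgraded without blocking, can be sketched as follows. This is a simplified illustration and not the package's implementation: all names are hypothetical, and only the non-blocking Try operations are modeled.

```go
package main

import (
	"fmt"
	"sync"
)

type state int

const (
	unlocked state = iota
	shared
	exclusive
)

// rwMutex tracks the number of shared holders and the exclusive holder, if any.
type rwMutex struct {
	mu      sync.Mutex
	sharedN int
	excl    *guard
}

// guard is one owner's handle on the mutex, similar to a POSIX lock owner.
type guard struct {
	rw    *rwMutex
	state state
}

func (rw *rwMutex) newGuard() *guard { return &guard{rw: rw} }

// tryRLock moves the guard to shared: it upgrades an unlocked guard,
// downgrades an exclusive one, and is a no-op for a shared one.
func (g *guard) tryRLock() bool {
	g.rw.mu.Lock()
	defer g.rw.mu.Unlock()
	switch g.state {
	case shared:
		return true
	case exclusive:
		g.rw.excl = nil
	default:
		if g.rw.excl != nil {
			return false
		}
	}
	g.rw.sharedN++
	g.state = shared
	return true
}

// tryLock moves the guard to exclusive if no other guard holds any lock.
func (g *guard) tryLock() bool {
	g.rw.mu.Lock()
	defer g.rw.mu.Unlock()
	switch g.state {
	case exclusive:
		return true
	case shared:
		if g.rw.sharedN > 1 {
			return false // another guard still holds a shared lock
		}
		g.rw.sharedN--
	default:
		if g.rw.excl != nil || g.rw.sharedN > 0 {
			return false
		}
	}
	g.rw.excl = g
	g.state = exclusive
	return true
}

// unlock releases whatever the guard currently holds.
func (g *guard) unlock() {
	g.rw.mu.Lock()
	defer g.rw.mu.Unlock()
	switch g.state {
	case shared:
		g.rw.sharedN--
	case exclusive:
		g.rw.excl = nil
	}
	g.state = unlocked
}

func main() {
	rw := &rwMutex{}
	a, b := rw.newGuard(), rw.newGuard()
	fmt.Println(a.tryRLock(), b.tryRLock()) // both readers succeed
	fmt.Println(a.tryLock())                // upgrade blocked by b
	b.unlock()
	fmt.Println(a.tryLock()) // upgrade succeeds once b is gone
}
```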

type RWMutexState

type RWMutexState int

RWMutexState represents the lock state of an RWMutex or RWMutexGuard.

func (RWMutexState) String added in v0.2.0

func (s RWMutexState) String() string

String returns the string representation of the state.

type ReadyStreamFrame added in v0.2.0

type ReadyStreamFrame struct {
}

func (*ReadyStreamFrame) ReadFrom added in v0.2.0

func (f *ReadyStreamFrame) ReadFrom(r io.Reader) (int64, error)

func (*ReadyStreamFrame) Type added in v0.2.0

func (*ReadyStreamFrame) Type() StreamFrameType

Type returns the type of stream frame.

func (*ReadyStreamFrame) WriteTo added in v0.2.0

func (f *ReadyStreamFrame) WriteTo(w io.Writer) (int64, error)

type StaticLease added in v0.2.0

type StaticLease struct {
	// contains filtered or unexported fields
}

StaticLease represents a lease for a fixed primary.

func (*StaticLease) Close added in v0.2.0

func (l *StaticLease) Close() error

func (*StaticLease) Renew added in v0.2.0

func (l *StaticLease) Renew(ctx context.Context) error

Renew is a no-op.

func (*StaticLease) RenewedAt added in v0.2.0

func (l *StaticLease) RenewedAt() time.Time

RenewedAt returns the Unix epoch in UTC.

func (*StaticLease) TTL added in v0.2.0

func (l *StaticLease) TTL() time.Duration

TTL returns the duration until the lease expires which is a time well into the future.

type StaticLeaser added in v0.2.0

type StaticLeaser struct {
	// contains filtered or unexported fields
}

StaticLeaser always returns a lease to a static primary.

func NewStaticLeaser added in v0.2.0

func NewStaticLeaser(isPrimary bool, hostname, advertiseURL string) *StaticLeaser

NewStaticLeaser returns a new instance of StaticLeaser.

func (*StaticLeaser) Acquire added in v0.2.0

func (l *StaticLeaser) Acquire(ctx context.Context) (Lease, error)

Acquire returns a lease if this node is the static primary. Otherwise returns ErrPrimaryExists.

func (*StaticLeaser) AdvertiseURL added in v0.2.0

func (l *StaticLeaser) AdvertiseURL() string

AdvertiseURL returns the primary URL if this is the primary. Otherwise returns blank.

func (*StaticLeaser) Close added in v0.2.0

func (l *StaticLeaser) Close() (err error)

Close is a no-op.

func (*StaticLeaser) IsPrimary added in v0.2.0

func (l *StaticLeaser) IsPrimary() bool

IsPrimary returns true if the current node is the primary.

func (*StaticLeaser) PrimaryInfo added in v0.2.0

func (l *StaticLeaser) PrimaryInfo(ctx context.Context) (PrimaryInfo, error)

PrimaryInfo returns the primary's info. Returns ErrNoPrimary if the node is the primary.

type Store

type Store struct {

	// Client used to connect to other LiteFS instances.
	Client Client

	// Leaser manages the lease that controls leader election.
	Leaser Leaser

	// Length of time to retain LTX files.
	RetentionDuration        time.Duration
	RetentionMonitorInterval time.Duration

	// Callback to notify kernel of file changes.
	Invalidator Invalidator

	// If true, computes and verifies the checksum of the entire database
	// after every transaction. Should only be used during testing.
	StrictVerify bool
	// contains filtered or unexported fields
}

Store represents a collection of databases.

func NewStore

func NewStore(path string, candidate bool) *Store

NewStore returns a new instance of Store.

func (*Store) Candidate added in v0.2.0

func (s *Store) Candidate() bool

Candidate returns true if store is eligible to be the primary.

func (*Store) Close

func (s *Store) Close() error

Close signals for the store to shut down.

func (*Store) CreateDB

func (s *Store) CreateDB(name string) (*DB, *os.File, error)

CreateDB creates a new database with the given name. The returned file handle must be closed by the caller. Returns an error if a database with the same name already exists.

func (*Store) CreateDBIfNotExists added in v0.2.0

func (s *Store) CreateDBIfNotExists(name string) (*DB, error)

CreateDBIfNotExists creates an empty database with the given name.

func (*Store) DB

func (s *Store) DB(name string) *DB

DB returns a database by name. Returns nil if the database does not exist.

func (*Store) DBDir

func (s *Store) DBDir() string

DBDir returns the folder that stores all databases.

func (*Store) DBPath added in v0.2.0

func (s *Store) DBPath(name string) string

DBPath returns the folder that stores a single database.

func (*Store) DBs

func (s *Store) DBs() []*DB

DBs returns a list of databases.

func (*Store) EnforceRetention added in v0.2.0

func (s *Store) EnforceRetention(ctx context.Context) (err error)

EnforceRetention enforces retention of LTX files on all databases.

func (*Store) ID added in v0.2.0

func (s *Store) ID() string

ID returns the unique identifier for this instance. Available after Open(). Persistent across restarts if underlying storage is persistent.

func (*Store) IsPrimary

func (s *Store) IsPrimary() bool

IsPrimary returns true if store has a lease to be the primary.

func (*Store) MarkDirty

func (s *Store) MarkDirty(name string)

MarkDirty marks a database dirty on all subscribers.

func (*Store) Open

func (s *Store) Open() error

Open initializes the store based on files in the data directory.

func (*Store) Path

func (s *Store) Path() string

Path returns underlying data directory.

func (*Store) PosMap

func (s *Store) PosMap() map[string]Pos

PosMap returns a map of databases and their transactional position.

func (*Store) PrimaryCtx added in v0.2.0

func (s *Store) PrimaryCtx(ctx context.Context) context.Context

PrimaryCtx wraps ctx with another context that will cancel when no longer primary.

func (*Store) PrimaryInfo added in v0.2.0

func (s *Store) PrimaryInfo() *PrimaryInfo

PrimaryInfo returns info about the current primary.

func (*Store) ReadyCh added in v0.2.0

func (s *Store) ReadyCh() chan struct{}

ReadyCh returns a channel that is closed once the store has become primary or once it has connected to the primary.

func (*Store) Subscribe

func (s *Store) Subscribe() *Subscriber

Subscribe creates a new subscriber for store changes.

func (*Store) Unsubscribe

func (s *Store) Unsubscribe(sub *Subscriber)

Unsubscribe removes a subscriber from the store.

type StoreVar added in v0.2.0

type StoreVar Store

func (*StoreVar) String added in v0.2.0

func (v *StoreVar) String() string

type StreamFrame

type StreamFrame interface {
	io.ReaderFrom
	io.WriterTo
	Type() StreamFrameType
}

func ReadStreamFrame

func ReadStreamFrame(r io.Reader) (StreamFrame, error)

ReadStreamFrame reads the stream type & frame from the reader.
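The pairing of a frame type with a frame body can be sketched as a type tag followed by a payload. The wire layout below (big-endian uint32 type, then a length-prefixed database name) is assumed purely for illustration and is not documented here; writeLTXFrame, readLTXFrame, and roundTrip are hypothetical helpers.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// frameTypeLTX matches the value of StreamFrameTypeLTX in the constants.
const frameTypeLTX uint32 = 1

// writeLTXFrame writes the type tag, then a length-prefixed database name.
// The layout is an assumption made for this sketch.
func writeLTXFrame(w io.Writer, name string) error {
	if err := binary.Write(w, binary.BigEndian, frameTypeLTX); err != nil {
		return err
	}
	if err := binary.Write(w, binary.BigEndian, uint32(len(name))); err != nil {
		return err
	}
	_, err := io.WriteString(w, name)
	return err
}

// readLTXFrame reads back a frame written by writeLTXFrame.
func readLTXFrame(r io.Reader) (string, error) {
	var typ, n uint32
	if err := binary.Read(r, binary.BigEndian, &typ); err != nil {
		return "", err
	}
	if typ != frameTypeLTX {
		return "", fmt.Errorf("unexpected frame type: %d", typ)
	}
	if err := binary.Read(r, binary.BigEndian, &n); err != nil {
		return "", err
	}
	buf := make([]byte, n)
	if _, err := io.ReadFull(r, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

// roundTrip encodes name into a buffer and decodes it again.
func roundTrip(name string) (string, error) {
	var buf bytes.Buffer
	if err := writeLTXFrame(&buf, name); err != nil {
		return "", err
	}
	return readLTXFrame(&buf)
}

func main() {
	name, err := roundTrip("data.db")
	fmt.Println(name, err)
}
```

Reading the type tag first lets the reader pick the right frame struct before decoding the rest, which is the role the StreamFrameType values play.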

type StreamFrameType

type StreamFrameType uint32

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber subscribes to changes to databases in the store.

It implements a set of "dirty" databases instead of a channel of all events as clients can be slow and we don't want to cause channels to back up. It is the responsibility of the caller to determine the state changes which is usually just checking the position of the client versus the store's database.

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close removes the subscriber from the store.

func (*Subscriber) DirtySet

func (s *Subscriber) DirtySet() map[string]struct{}

DirtySet returns a set of database IDs that have changed since the last call to DirtySet(). This call clears the set.

func (*Subscriber) MarkDirty

func (s *Subscriber) MarkDirty(name string)

MarkDirty marks a database ID as dirty.

func (*Subscriber) NotifyCh

func (s *Subscriber) NotifyCh() <-chan struct{}

NotifyCh returns a channel that receives a value when the dirty set has changed.
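The "dirty set plus notification channel" design described for Subscriber can be sketched as follows. This is an illustrative model with hypothetical names, not the package's code: a one-slot buffered channel lets the writer signal without ever blocking on a slow reader, and repeated changes to the same database coalesce into one set entry.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber collects the names of changed databases until the reader asks.
type subscriber struct {
	mu       sync.Mutex
	dirty    map[string]struct{}
	notifyCh chan struct{}
}

func newSubscriber() *subscriber {
	return &subscriber{
		dirty:    make(map[string]struct{}),
		notifyCh: make(chan struct{}, 1),
	}
}

// markDirty records name and wakes the reader without blocking the writer.
func (s *subscriber) markDirty(name string) {
	s.mu.Lock()
	s.dirty[name] = struct{}{}
	s.mu.Unlock()

	select {
	case s.notifyCh <- struct{}{}:
	default: // a notification is already pending
	}
}

// dirtySet returns the names marked since the last call and clears the set.
func (s *subscriber) dirtySet() map[string]struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := s.dirty
	s.dirty = make(map[string]struct{})
	return out
}

func main() {
	s := newSubscriber()
	s.markDirty("data.db")
	s.markDirty("data.db") // coalesces with the previous mark
	s.markDirty("other.db")

	<-s.notifyCh // one wake-up covers all three marks
	for name := range s.dirtySet() {
		fmt.Println("changed:", name)
	}
}
```

After waking, the reader compares its own position for each dirty database with the store's position to decide what to send, as the description above notes.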

Directories

Path        Synopsis
cmd/litefs  command (go:build linux)
