store

package
v0.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2024 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SQLITE_DATABASE_HEADER_STRING = "SQLite format 3\x00"

	SQLITE_DATABASE_HEADER_SIZE = 100
)
View Source
const (
	// CompactionLevelRestoreTarget is the default compaction level used to find an LTX file
	// suitable for restore to specific timestamp.
	//
	// This value corresponds with the granularity that restore requests can see.
	// Levels below CompactionLevelRestoreTarget effectively
	// become used for better write durability (writes after the L1
	// 10s window will be in durable storage) instead of for restorability.
	//
	// We choose a restore target higher than L1 to improve restore speed and usability,
	// since pulling many small L1 (default 10s granularity) files may take a while.
	CompactionLevelRestoreTarget = 2
)
View Source
const (
	MaxCompactionFileN = 100
)

Variables

View Source
var ErrStopIter = errors.New("stop iter")

ErrStopIter indicates that the ongoing iteration should be aborted

Functions

func AttachMetadata

func AttachMetadata(ctx context.Context, client StorageClient, paths []StoragePath, parallelism int) error

AttachMetadata fetches metadata for every path in parallel.

func CompareStoragePath

func CompareStoragePath(a, b *StoragePath) int

CompareStoragePath returns an integer comparing two paths. The result will be 0 if a == b, -1 if a < b, and +1 if a > b.

func ForEachStorageCluster

func ForEachStorageCluster(ctx context.Context, client StorageClient, fn func(cluster string) error) error

ForEachStorageCluster executes fn once for each cluster in storage.

func ForEachStorageDatabase

func ForEachStorageDatabase(ctx context.Context, client StorageClient, fn func(cluster, database string) error) error

ForEachStorageDatabase executes fn once for each database in storage.

func FormatStorageLevelDir

func FormatStorageLevelDir(cluster, database string, level int, sep rune) (_ string, retErr error)

FormatStorageLevelDir formats path to the storage level.

func FormatStoragePath

func FormatStoragePath(p StoragePath, sep rune) (_ string, retErr error)

FormatStoragePath formats path into a string path.

func NewCompactorFromPaths

func NewCompactorFromPaths(ctx context.Context, client StorageClient, paths []StoragePath, w io.Writer) (c *ltx.Compactor, close func() error, retErr error)

NewCompactorFromPaths returns a compactor from a set of storage paths.

func StorageTXIDRange

func StorageTXIDRange(ctx context.Context, client StorageClient, cluster, database string, level int) (min, max ltx.TXID, err error)

StorageTXIDRange returns the minimum & maximum TXID for a compaction level.

Types

type ChangeSet

type ChangeSet map[uint32]struct{}

ChangeSet represents a set of changed database pages.

func NewChangeSetFromPaths

func NewChangeSetFromPaths(ctx context.Context, client StorageClient, paths []StoragePath) (ChangeSet, error)

NewChangeSetFromPaths computes a set of changed pages using the provided LTX files.

func (ChangeSet) Add

func (c ChangeSet) Add(pgno uint32)

Add adds the given page number to the change set

func (ChangeSet) AsSlice

func (c ChangeSet) AsSlice() []uint32

AsSlice returns the contents of the change set as sorted slice

type CompactionLevel

type CompactionLevel struct {
	// The numeric level. Must match the index in the list of levels.
	Level int

	// The frequency that the level is compacted from the previous level.
	Interval time.Duration

	// The duration that files in this level are stored. This should be
	// set higher than the interval of level-1, with some safety margin.
	Retention time.Duration
}

CompactionLevel represents a compaction level. You may want to tweak these values alongside CompactionLevelRestoreTarget, CompactionLevelSnapshot, and CompactionLevelMax.

Level 0 always refers to the on-disk sqlite db. Level 1 through CompactionLevelMax are available for arbitrary configuration. Level 9 always refers to the full database snapshot level.

func (*CompactionLevel) NextCompactionAt

func (lvl *CompactionLevel) NextCompactionAt(now time.Time) time.Time

NextCompactionAt returns the time until the next compaction occurs. Returns the current time if it is exactly a multiple of the level interval.

type CompactionLevels

type CompactionLevels []*CompactionLevel

CompactionLevels represents a sorted slice of non-snapshot compaction levels.

func (CompactionLevels) IsValidLevel

func (a CompactionLevels) IsValidLevel(level int) bool

IsValidLevel returns true if level is a valid compaction level number.

func (CompactionLevels) MaxLevel

func (a CompactionLevels) MaxLevel() int

MaxLevel return the highest non-snapshot compaction level.

func (CompactionLevels) NextLevel

func (a CompactionLevels) NextLevel(level int) int

NextLevel returns the next compaction level. Returns -1 if there is no next level.

func (CompactionLevels) PrevLevel

func (a CompactionLevels) PrevLevel(level int) int

PrevLevel returns the previous compaction level. Returns -1 if there is no previous level.

func (CompactionLevels) Validate

func (a CompactionLevels) Validate() error

Validate returns an error if the levels are invalid.

type CompactionRequest

type CompactionRequest struct {
	Cluster        string
	Database       string
	Level          int
	IdempotencyKey int
}

CompactionRequest represents a request for a database to be compacted at a given level in the future. It contains an idempotency key as it needs to avoid the race condition where a new compaction request occurs while the request is being processed.

type DB

type DB struct {
	ID                int
	Cluster           string
	Name              string
	HWM               ltx.TXID
	TXID              ltx.TXID     // derived field
	PostApplyChecksum ltx.Checksum // derived field
	PageSize          uint32       // derived field
	Commit            uint32       // derived field
	Timestamp         time.Time    // derived field
}

DB represents a database row in the shard.

func (*DB) Pos

func (db *DB) Pos() ltx.Pos

type DBInfo

type DBInfo struct {
	Name            string
	RestorablePaths []StoragePath
}

DBInfo holds basic info about a single database.

type DBTX

type DBTX interface {
	ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
	PrepareContext(context.Context, string) (*sql.Stmt, error)
	QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
	QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}

type Page

type Page struct {
	DBID    int
	Pgno    uint32
	MinTXID ltx.TXID
	MaxTXID ltx.TXID
	Chksum  uint64
	Nonce   []byte
	Tag     []byte
	Data    []byte
}

Page returns a page that is valid between a transaction range.

type StorageClient

type StorageClient interface {
	io.Closer

	// Clusters returns a list of clusters within an org.
	Clusters(ctx context.Context) ([]string, error)

	// Databases returns a list of databases within a cluster.
	Databases(ctx context.Context, cluster string) ([]string, error)

	// Levels returns a list of levels for a database.
	Levels(ctx context.Context, cluster, database string) ([]int, error)

	// Metadata returns the metadata for a given path.
	Metadata(ctx context.Context, path StoragePath) (StorageMetadata, error)

	// OpenFile returns a reader for a specific file. Returns os.NotExist if not found.
	OpenFile(ctx context.Context, path StoragePath) (io.ReadCloser, error)

	// WriteFile writes the contents of r to a path in long-term storage.
	WriteFile(ctx context.Context, path StoragePath, r io.Reader) error

	// DeleteFiles removes one or more files from storage.
	DeleteFiles(ctx context.Context, paths []StoragePath) error

	// Iterator returns an iterator over a given database's compaction level.
	Iterator(ctx context.Context, cluster, db string, level int) (StoragePathIterator, error)
}

StorageClient represents a client for long-term storage.

type StorageMetadata

type StorageMetadata struct {
	PageSize          uint32       // size of the pages in the file
	Commit            uint32       // size of the database after applying, in pages
	Timestamp         time.Time    // timestamp of when the transaction was committed
	PreApplyChecksum  ltx.Checksum // checksum of the database before applying LTX file
	PostApplyChecksum ltx.Checksum // checksum of the database after applying LTX file
}

StorageMetadata represents metadata associated with a StoragePath. This is data that is optionally stored with the path but is not a part of the formatted pathname.

func NewStorageMetadataFromLTX

func NewStorageMetadataFromLTX(header ltx.Header, trailer ltx.Trailer) StorageMetadata

NewStorageMetadataFromLTX returns storage metadata from an LTX file's header & trailer.

func (StorageMetadata) IsZero

func (m StorageMetadata) IsZero() bool

IsZero returns true if all the fields are zero values.

type StoragePath

type StoragePath struct {
	Cluster  string
	Database string
	Level    int
	MinTXID  ltx.TXID
	MaxTXID  ltx.TXID

	Metadata  StorageMetadata // LTX-based metadata
	Size      int64           // file size
	CreatedAt time.Time       // timestamp of when the file was created (determined by storage system)
}

StoragePath represents an path identifier for a file in long-term storage.

func CalcChangesPlan

func CalcChangesPlan(ctx context.Context, client StorageClient, cluster, database string, maxLevel int, txID ltx.TXID) ([]StoragePath, error)

CalcChangesPlan returns a list of storage paths needed to query a list of changed DB pages from the given txID up until the latest available DB TX.

func CalcSnapshotPlan

func CalcSnapshotPlan(ctx context.Context, client StorageClient, cluster, database string, txID ltx.TXID) ([]StoragePath, error)

CalcSnapshotPlan returns a list of storage paths to create a snapshot at the given TXID.

func FindStoragePaths

func FindStoragePaths(ctx context.Context, client StorageClient, cluster, database string, level int, filter func(StoragePath) (bool, error)) ([]StoragePath, error)

FindStoragePaths returns a list of storage paths that match filter.

func MaxStoragePath

func MaxStoragePath(ctx context.Context, client StorageClient, cluster, database string, level int) (StoragePath, error)

MaxStoragePath returns the last storage path for a level. Returns a zero value if no storage files exists in the level.

func NewStoragePath

func NewStoragePath(cluster, database string, level int, minTXID, maxTXID ltx.TXID) StoragePath

NewStoragePath returns a new instance of StoragePath.

func (StoragePath) Equal

func (p StoragePath) Equal(other StoragePath) bool

Equal returns true if the path portion of p equals other.

func (StoragePath) IsZero

func (p StoragePath) IsZero() bool

IsZero returns true if p is the zero value.

func (StoragePath) String

func (p StoragePath) String() string

String returns the string representation of p.

func (*StoragePath) Validate

func (p *StoragePath) Validate() error

Validate returns nil if the path is valid.

type StoragePathIterator

type StoragePathIterator interface {
	io.Closer

	// NextStoragePath returns the next available storage path.
	// Returns io.EOF when no more paths are available.
	NextStoragePath(ctx context.Context) (StoragePath, error)
}

StoragePathIterator iterates over a sorted list of storage paths.

type StoragePathSlice

type StoragePathSlice []StoragePath

StoragePathSlice represents a slice of storage paths.

func (StoragePathSlice) MaxTXID

func (a StoragePathSlice) MaxTXID() ltx.TXID

MaxTXID returns the MaxTXID of the last element in the slice. Returns zero if the slice is empty.

type StoragePathSliceIterator

type StoragePathSliceIterator struct {
	// contains filtered or unexported fields
}

StoragePathSliceIterator iterates over a slice of storage files.

func NewStoragePathSliceIterator

func NewStoragePathSliceIterator(a []StoragePath) *StoragePathSliceIterator

NewStoragePathSliceIterator returns a new instance of StoragePathSliceIterator.

func (*StoragePathSliceIterator) Close

func (itr *StoragePathSliceIterator) Close() error

Close is a no-op.

func (*StoragePathSliceIterator) NextStoragePath

func (itr *StoragePathSliceIterator) NextStoragePath(ctx context.Context) (StoragePath, error)

NextStoragePath returns the next available storage path. Returns io.EOF when no more paths are available.

type Store

type Store struct {
	RemoteClient StorageClient

	Levels CompactionLevels

	Now func() time.Time

	// If true, compactions are run in background goroutines.
	// This is typically disabled for testing purposes.
	CompactionEnabled bool

	// Time between snapshots
	SnapshotInterval time.Duration
	// contains filtered or unexported fields
}

func NewStore

func NewStore(config *lfsb.Config) *Store

func (*Store) CompactDBToLevel

func (s *Store) CompactDBToLevel(ctx context.Context, callerTx *sql.Tx, cluster, database string, dstLevel int) (StoragePath, error)

CompactDBToLevel compacts all transaction files from the next lower level since the last TXID in the level to be compacted. This ensures that higher level compactions line up on TXID with lower levels and makes it easier to compute the list of files needed to perform a restore.

Returns the path of the newly compacted storage path. Returns ENOCOMPACTION if no compaction occurred. Returns EPARTIALCOMPACTION if there were too many files to compact.

func (*Store) DeleteCluster

func (s *Store) DeleteCluster(ctx context.Context, cluster string) error

func (*Store) DropDB

func (s *Store) DropDB(ctx context.Context, cluster, database string) error

DropDB hard deletes the database from the shard database and remote storage.

func (*Store) EnforceRemoteRetention

func (s *Store) EnforceRemoteRetention(ctx context.Context, cluster, database string, level int, maxTXID ltx.TXID) error

EnforceRemoteRetention removes all files before a given TXID that are past the retention period.

func (*Store) FindClusters

func (s *Store) FindClusters(ctx context.Context) ([]string, error)

FindClusters returns a list of all clusters in the store.

func (*Store) FindDBByName

func (s *Store) FindDBByName(ctx context.Context, cluster, name string) (*DB, error)

FindDBByName returns a database entry by cluster/name.

func (*Store) FindDBsByCluster

func (s *Store) FindDBsByCluster(ctx context.Context, cluster string) ([]*DB, error)

func (*Store) FindStoragePathByTimestamp

func (s *Store) FindStoragePathByTimestamp(ctx context.Context, cluster, database string, timestamp time.Time) (StoragePath, error)

func (*Store) FindTXIDByTimestamp

func (s *Store) FindTXIDByTimestamp(ctx context.Context, cluster, database string, timestamp time.Time) (ltx.TXID, error)

FindTXIDByTimestamp returns the TX closest to the given timestamp.

func (*Store) Info

func (s *Store) Info(ctx context.Context, cluster, database string, all bool) (*DBInfo, error)

func (*Store) NextCompactionAt

func (s *Store) NextCompactionAt(level int) time.Time

NextCompactionAt returns the time until the next compaction occurs.

func (*Store) Open

func (s *Store) Open(ctx context.Context) error

func (*Store) Path

func (s *Store) Path() string

Path returns the root data directory.

func (*Store) ProcessCompactions

func (s *Store) ProcessCompactions(ctx context.Context, dstLevel int) error

ProcessCompactions attempts to process all compaction requests at the given level. Any individual compaction failures are logged.

func (*Store) RestoreToTx

func (s *Store) RestoreToTx(ctx context.Context, cluster, database string, txID ltx.TXID) (ltx.TXID, error)

RestoreToTx restores the database to the give TX ID.

func (*Store) StoreDatabase

func (s *Store) StoreDatabase(ctx context.Context, cluster, database string, source io.Reader) (ltx.Pos, error)

StoreDatabase stores a SQLite database read from rd as a single LTX file on top of any existing LTX files.

func (*Store) TempDir

func (s *Store) TempDir() string

TempDir returns a "tmp" directory under the data directory for temporary files.

func (*Store) WriteDatabaseTo

func (s *Store) WriteDatabaseTo(ctx context.Context, cluster, name string, txID ltx.TXID, w io.Writer) error

WriteDatabaseTo writes a database decoded from an LTX snapshot to the given writer.

func (*Store) WriteLTXPageRangeFrom

func (s *Store) WriteLTXPageRangeFrom(ctx context.Context, cluster, database string, minTXID ltx.TXID, w io.Writer) (ltx.Header, ltx.Trailer, error)

WriteLTXPageRangeFrom writes a compacted LTX file for all updated pages since minTXID inclusive. Returns the maximum transaction ID read up to. Returns ENOCOMPACTION if no transactions found.

func (*Store) WriteLocalSnapshotTo

func (s *Store) WriteLocalSnapshotTo(ctx context.Context, cluster, database string, txID ltx.TXID, w io.Writer) error

WriteLocalSnapshotTo attempts to read the snapshot out from the local sqlite store.

func (*Store) WriteRemoteSnapshotTo

func (s *Store) WriteRemoteSnapshotTo(ctx context.Context, cluster, name string, txID ltx.TXID, w io.Writer) error

WriteRemoteSnapshotTo writes a snapshot (from remote storage only) at a given TXID to w.

func (*Store) WriteSnapshotTo

func (s *Store) WriteSnapshotTo(ctx context.Context, cluster, database string, txID ltx.TXID, w io.Writer) error

WriteSnapshotTo writes the snapshot at a given TXID to w.

func (*Store) WriteTx

func (s *Store) WriteTx(ctx context.Context, cluster, database string, r io.Reader, opt *WriteTxOptions) (ltx.Pos, error)

WriteTx appends an LTX file to a database.

type Txn

type Txn struct {
	DBID              int
	MinTXID           ltx.TXID
	MaxTXID           ltx.TXID
	PageSize          uint32
	Commit            uint32
	Timestamp         time.Time
	PreApplyChecksum  ltx.Checksum
	PostApplyChecksum ltx.Checksum
}

Txn represents a row in the "txns" table in the data file.

func (*Txn) PostApplyPos

func (txn *Txn) PostApplyPos() ltx.Pos

PostApplyPos returns the replication position after the txn is applied.

func (*Txn) PreApplyPos

func (txn *Txn) PreApplyPos() ltx.Pos

PreApplyPos returns the replication position before the txn is applied.

type WriteTxOptions

type WriteTxOptions struct {
	AppendSnapshot bool
	Timestamp      time.Time
}

type WriteTxPage

type WriteTxPage struct {
	Pgno uint32
	Data []byte
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL