Documentation
¶
Index ¶
- Constants
- Variables
- func AttachMetadata(ctx context.Context, client StorageClient, paths []StoragePath, ...) error
- func CompareStoragePath(a, b *StoragePath) int
- func ForEachStorageCluster(ctx context.Context, client StorageClient, fn func(cluster string) error) error
- func ForEachStorageDatabase(ctx context.Context, client StorageClient, ...) error
- func FormatStorageLevelDir(cluster, database string, level int, sep rune) (_ string, retErr error)
- func FormatStoragePath(p StoragePath, sep rune) (_ string, retErr error)
- func NewCompactorFromPaths(ctx context.Context, client StorageClient, paths []StoragePath, w io.Writer) (c *ltx.Compactor, close func() error, retErr error)
- func StorageTXIDRange(ctx context.Context, client StorageClient, cluster, database string, level int) (min, max ltx.TXID, err error)
- type ChangeSet
- type CompactionLevel
- type CompactionLevels
- type CompactionRequest
- type DB
- type DBInfo
- type DBTX
- type Page
- type StorageClient
- type StorageMetadata
- type StoragePath
- func CalcChangesPlan(ctx context.Context, client StorageClient, cluster, database string, ...) ([]StoragePath, error)
- func CalcSnapshotPlan(ctx context.Context, client StorageClient, cluster, database string, ...) ([]StoragePath, error)
- func FindStoragePaths(ctx context.Context, client StorageClient, cluster, database string, level int, ...) ([]StoragePath, error)
- func MaxStoragePath(ctx context.Context, client StorageClient, cluster, database string, level int) (StoragePath, error)
- func NewStoragePath(cluster, database string, level int, minTXID, maxTXID ltx.TXID) StoragePath
- type StoragePathIterator
- type StoragePathSlice
- type StoragePathSliceIterator
- type Store
- func (s *Store) CompactDBToLevel(ctx context.Context, callerTx *sql.Tx, cluster, database string, dstLevel int) (StoragePath, error)
- func (s *Store) DeleteCluster(ctx context.Context, cluster string) error
- func (s *Store) DropDB(ctx context.Context, cluster, database string) error
- func (s *Store) EnforceRemoteRetention(ctx context.Context, cluster, database string, level int, maxTXID ltx.TXID) error
- func (s *Store) FindClusters(ctx context.Context) ([]string, error)
- func (s *Store) FindDBByName(ctx context.Context, cluster, name string) (*DB, error)
- func (s *Store) FindDBsByCluster(ctx context.Context, cluster string) ([]*DB, error)
- func (s *Store) FindStoragePathByTimestamp(ctx context.Context, cluster, database string, timestamp time.Time) (StoragePath, error)
- func (s *Store) FindTXIDByTimestamp(ctx context.Context, cluster, database string, timestamp time.Time) (ltx.TXID, error)
- func (s *Store) Info(ctx context.Context, cluster, database string, all bool) (*DBInfo, error)
- func (s *Store) NextCompactionAt(level int) time.Time
- func (s *Store) Open(ctx context.Context) error
- func (s *Store) Path() string
- func (s *Store) ProcessCompactions(ctx context.Context, dstLevel int) error
- func (s *Store) RestoreToTx(ctx context.Context, cluster, database string, txID ltx.TXID) (ltx.TXID, error)
- func (s *Store) StoreDatabase(ctx context.Context, cluster, database string, source io.Reader) (ltx.Pos, error)
- func (s *Store) TempDir() string
- func (s *Store) WriteDatabaseTo(ctx context.Context, cluster, name string, txID ltx.TXID, w io.Writer) error
- func (s *Store) WriteLTXPageRangeFrom(ctx context.Context, cluster, database string, minTXID ltx.TXID, w io.Writer) (ltx.Header, ltx.Trailer, error)
- func (s *Store) WriteLocalSnapshotTo(ctx context.Context, cluster, database string, txID ltx.TXID, w io.Writer) error
- func (s *Store) WriteRemoteSnapshotTo(ctx context.Context, cluster, name string, txID ltx.TXID, w io.Writer) error
- func (s *Store) WriteSnapshotTo(ctx context.Context, cluster, database string, txID ltx.TXID, w io.Writer) error
- func (s *Store) WriteTx(ctx context.Context, cluster, database string, r io.Reader, ...) (ltx.Pos, error)
- type Txn
- type WriteTxOptions
- type WriteTxPage
Constants ¶
const ( SQLITE_DATABASE_HEADER_STRING = "SQLite format 3\x00" SQLITE_DATABASE_HEADER_SIZE = 100 )
const (
// CompactionLevelRestoreTarget is the default compaction level used to find an LTX file
// suitable for restoring to a specific timestamp.
//
// This value corresponds with the granularity that restore requests can see.
// Levels below CompactionLevelRestoreTarget effectively
// become used for better write durability (writes after the L1
// 10s window will be in durable storage) instead of for restorability.
//
// We choose a restore target higher than L1 to improve restore speed and usability,
// since pulling many small L1 (default 10s granularity) files may take a while.
CompactionLevelRestoreTarget = 2
)
const (
MaxCompactionFileN = 100
)
Variables ¶
var ErrStopIter = errors.New("stop iter")
ErrStopIter indicates that the ongoing iteration should be aborted
Functions ¶
func AttachMetadata ¶
func AttachMetadata(ctx context.Context, client StorageClient, paths []StoragePath, parallelism int) error
AttachMetadata fetches metadata for every path in parallel.
func CompareStoragePath ¶
func CompareStoragePath(a, b *StoragePath) int
CompareStoragePath returns an integer comparing two paths. The result will be 0 if a == b, -1 if a < b, and +1 if a > b.
func ForEachStorageCluster ¶
func ForEachStorageCluster(ctx context.Context, client StorageClient, fn func(cluster string) error) error
ForEachStorageCluster executes fn once for each cluster in storage.
func ForEachStorageDatabase ¶
func ForEachStorageDatabase(ctx context.Context, client StorageClient, fn func(cluster, database string) error) error
ForEachStorageDatabase executes fn once for each database in storage.
func FormatStorageLevelDir ¶
FormatStorageLevelDir formats path to the storage level.
func FormatStoragePath ¶
func FormatStoragePath(p StoragePath, sep rune) (_ string, retErr error)
FormatStoragePath formats path into a string path.
func NewCompactorFromPaths ¶
func NewCompactorFromPaths(ctx context.Context, client StorageClient, paths []StoragePath, w io.Writer) (c *ltx.Compactor, close func() error, retErr error)
NewCompactorFromPaths returns a compactor from a set of storage paths.
func StorageTXIDRange ¶
func StorageTXIDRange(ctx context.Context, client StorageClient, cluster, database string, level int) (min, max ltx.TXID, err error)
StorageTXIDRange returns the minimum & maximum TXID for a compaction level.
Types ¶
type ChangeSet ¶
type ChangeSet map[uint32]struct{}
ChangeSet represents a set of changed database pages.
func NewChangeSetFromPaths ¶
func NewChangeSetFromPaths(ctx context.Context, client StorageClient, paths []StoragePath) (ChangeSet, error)
NewChangeSetFromPaths computes a set of changed pages using the provided LTX files.
type CompactionLevel ¶
type CompactionLevel struct {
// The numeric level. Must match the index in the list of levels.
Level int
// The frequency that the level is compacted from the previous level.
Interval time.Duration
// The duration that files in this level are stored. This should be
// set higher than the interval of level-1, with some safety margin.
Retention time.Duration
}
CompactionLevel represents a compaction level. You may want to tweak these values alongside CompactionLevelRestoreTarget, CompactionLevelSnapshot, and CompactionLevelMax.
Level 0 always refers to the on-disk sqlite db. Level 1 through CompactionLevelMax are available for arbitrary configuration. Level 9 always refers to the full database snapshot level.
func (*CompactionLevel) NextCompactionAt ¶
func (lvl *CompactionLevel) NextCompactionAt(now time.Time) time.Time
NextCompactionAt returns the time at which the next compaction occurs. Returns the current time if it is exactly a multiple of the level interval.
type CompactionLevels ¶
type CompactionLevels []*CompactionLevel
CompactionLevels represents a sorted slice of non-snapshot compaction levels.
func (CompactionLevels) IsValidLevel ¶
func (a CompactionLevels) IsValidLevel(level int) bool
IsValidLevel returns true if level is a valid compaction level number.
func (CompactionLevels) MaxLevel ¶
func (a CompactionLevels) MaxLevel() int
MaxLevel returns the highest non-snapshot compaction level.
func (CompactionLevels) NextLevel ¶
func (a CompactionLevels) NextLevel(level int) int
NextLevel returns the next compaction level. Returns -1 if there is no next level.
func (CompactionLevels) PrevLevel ¶
func (a CompactionLevels) PrevLevel(level int) int
PrevLevel returns the previous compaction level. Returns -1 if there is no previous level.
func (CompactionLevels) Validate ¶
func (a CompactionLevels) Validate() error
Validate returns an error if the levels are invalid.
type CompactionRequest ¶
CompactionRequest represents a request for a database to be compacted at a given level in the future. It contains an idempotency key as it needs to avoid the race condition where a new compaction request occurs while the request is being processed.
type DB ¶
type DB struct {
ID int
Cluster string
Name string
HWM ltx.TXID
TXID ltx.TXID // derived field
PostApplyChecksum ltx.Checksum // derived field
PageSize uint32 // derived field
Commit uint32 // derived field
Timestamp time.Time // derived field
}
DB represents a database row in the shard.
type DBInfo ¶
type DBInfo struct {
Name string
RestorablePaths []StoragePath
}
DBInfo holds basic info about a single database.
type Page ¶
type Page struct {
DBID int
Pgno uint32
MinTXID ltx.TXID
MaxTXID ltx.TXID
Chksum uint64
Nonce []byte
Tag []byte
Data []byte
}
Page represents a page that is valid within a transaction range.
type StorageClient ¶
type StorageClient interface {
io.Closer
// Clusters returns a list of clusters within an org.
Clusters(ctx context.Context) ([]string, error)
// Databases returns a list of databases within a cluster.
Databases(ctx context.Context, cluster string) ([]string, error)
// Levels returns a list of levels for a database.
Levels(ctx context.Context, cluster, database string) ([]int, error)
// Metadata returns the metadata for a given path.
Metadata(ctx context.Context, path StoragePath) (StorageMetadata, error)
// OpenFile returns a reader for a specific file. Returns os.ErrNotExist if not found.
OpenFile(ctx context.Context, path StoragePath) (io.ReadCloser, error)
// WriteFile writes the contents of r to a path in long-term storage.
WriteFile(ctx context.Context, path StoragePath, r io.Reader) error
// DeleteFiles removes one or more files from storage.
DeleteFiles(ctx context.Context, paths []StoragePath) error
// Iterator returns an iterator over a given database's compaction level.
Iterator(ctx context.Context, cluster, db string, level int) (StoragePathIterator, error)
}
StorageClient represents a client for long-term storage.
type StorageMetadata ¶
type StorageMetadata struct {
PageSize uint32 // size of the pages in the file
Commit uint32 // size of the database after applying, in pages
Timestamp time.Time // timestamp of when the transaction was committed
PreApplyChecksum ltx.Checksum // checksum of the database before applying LTX file
PostApplyChecksum ltx.Checksum // checksum of the database after applying LTX file
}
StorageMetadata represents metadata associated with a StoragePath. This is data that is optionally stored with the path but is not a part of the formatted pathname.
func NewStorageMetadataFromLTX ¶
func NewStorageMetadataFromLTX(header ltx.Header, trailer ltx.Trailer) StorageMetadata
NewStorageMetadataFromLTX returns storage metadata from an LTX file's header & trailer.
func (StorageMetadata) IsZero ¶
func (m StorageMetadata) IsZero() bool
IsZero returns true if all the fields are zero values.
type StoragePath ¶
type StoragePath struct {
Cluster string
Database string
Level int
MinTXID ltx.TXID
MaxTXID ltx.TXID
Metadata StorageMetadata // LTX-based metadata
Size int64 // file size
CreatedAt time.Time // timestamp of when the file was created (determined by storage system)
}
StoragePath represents a path identifier for a file in long-term storage.
func CalcChangesPlan ¶
func CalcChangesPlan(ctx context.Context, client StorageClient, cluster, database string, maxLevel int, txID ltx.TXID) ([]StoragePath, error)
CalcChangesPlan returns a list of storage paths needed to query a list of changed DB pages from the given txID up until the latest available DB TX.
func CalcSnapshotPlan ¶
func CalcSnapshotPlan(ctx context.Context, client StorageClient, cluster, database string, txID ltx.TXID) ([]StoragePath, error)
CalcSnapshotPlan returns a list of storage paths to create a snapshot at the given TXID.
func FindStoragePaths ¶
func FindStoragePaths(ctx context.Context, client StorageClient, cluster, database string, level int, filter func(StoragePath) (bool, error)) ([]StoragePath, error)
FindStoragePaths returns a list of storage paths that match filter.
func MaxStoragePath ¶
func MaxStoragePath(ctx context.Context, client StorageClient, cluster, database string, level int) (StoragePath, error)
MaxStoragePath returns the last storage path for a level. Returns a zero value if no storage files exist in the level.
func NewStoragePath ¶
func NewStoragePath(cluster, database string, level int, minTXID, maxTXID ltx.TXID) StoragePath
NewStoragePath returns a new instance of StoragePath.
func (StoragePath) Equal ¶
func (p StoragePath) Equal(other StoragePath) bool
Equal returns true if the path portion of p equals other.
func (StoragePath) IsZero ¶
func (p StoragePath) IsZero() bool
IsZero returns true if p is the zero value.
func (StoragePath) String ¶
func (p StoragePath) String() string
String returns the string representation of p.
func (*StoragePath) Validate ¶
func (p *StoragePath) Validate() error
Validate returns nil if the path is valid.
type StoragePathIterator ¶
type StoragePathIterator interface {
io.Closer
// NextStoragePath returns the next available storage path.
// Returns io.EOF when no more paths are available.
NextStoragePath(ctx context.Context) (StoragePath, error)
}
StoragePathIterator iterates over a sorted list of storage paths.
type StoragePathSlice ¶
type StoragePathSlice []StoragePath
StoragePathSlice represents a slice of storage paths.
func (StoragePathSlice) MaxTXID ¶
func (a StoragePathSlice) MaxTXID() ltx.TXID
MaxTXID returns the MaxTXID of the last element in the slice. Returns zero if the slice is empty.
type StoragePathSliceIterator ¶
type StoragePathSliceIterator struct {
// contains filtered or unexported fields
}
StoragePathSliceIterator iterates over a slice of storage files.
func NewStoragePathSliceIterator ¶
func NewStoragePathSliceIterator(a []StoragePath) *StoragePathSliceIterator
NewStoragePathSliceIterator returns a new instance of StoragePathSliceIterator.
func (*StoragePathSliceIterator) Close ¶
func (itr *StoragePathSliceIterator) Close() error
Close is a no-op.
func (*StoragePathSliceIterator) NextStoragePath ¶
func (itr *StoragePathSliceIterator) NextStoragePath(ctx context.Context) (StoragePath, error)
NextStoragePath returns the next available storage path. Returns io.EOF when no more paths are available.
type Store ¶
type Store struct {
RemoteClient StorageClient
Levels CompactionLevels
Now func() time.Time
// If true, compactions are run in background goroutines.
// This is typically disabled for testing purposes.
CompactionEnabled bool
// Time between snapshots
SnapshotInterval time.Duration
// contains filtered or unexported fields
}
func (*Store) CompactDBToLevel ¶
func (s *Store) CompactDBToLevel(ctx context.Context, callerTx *sql.Tx, cluster, database string, dstLevel int) (StoragePath, error)
CompactDBToLevel compacts all transaction files from the next lower level since the last TXID in the level to be compacted. This ensures that higher level compactions line up on TXID with lower levels and makes it easier to compute the list of files needed to perform a restore.
Returns the path of the newly compacted storage path. Returns ENOCOMPACTION if no compaction occurred. Returns EPARTIALCOMPACTION if there were too many files to compact.
func (*Store) DeleteCluster ¶
func (*Store) EnforceRemoteRetention ¶
func (s *Store) EnforceRemoteRetention(ctx context.Context, cluster, database string, level int, maxTXID ltx.TXID) error
EnforceRemoteRetention removes all files before a given TXID that are past the retention period.
func (*Store) FindClusters ¶
FindClusters returns a list of all clusters in the store.
func (*Store) FindDBByName ¶
FindDBByName returns a database entry by cluster/name.
func (*Store) FindDBsByCluster ¶
func (*Store) FindStoragePathByTimestamp ¶
func (*Store) FindTXIDByTimestamp ¶
func (s *Store) FindTXIDByTimestamp(ctx context.Context, cluster, database string, timestamp time.Time) (ltx.TXID, error)
FindTXIDByTimestamp returns the TX closest to the given timestamp.
func (*Store) NextCompactionAt ¶
NextCompactionAt returns the time at which the next compaction occurs.
func (*Store) ProcessCompactions ¶
ProcessCompactions attempts to process all compaction requests at the given level. Any individual compaction failures are logged.
func (*Store) RestoreToTx ¶
func (s *Store) RestoreToTx(ctx context.Context, cluster, database string, txID ltx.TXID) (ltx.TXID, error)
RestoreToTx restores the database to the given TX ID.
func (*Store) StoreDatabase ¶
func (s *Store) StoreDatabase(ctx context.Context, cluster, database string, source io.Reader) (ltx.Pos, error)
StoreDatabase stores a SQLite database read from source as a single LTX file on top of any existing LTX files.
func (*Store) TempDir ¶
TempDir returns a "tmp" directory under the data directory for temporary files.
func (*Store) WriteDatabaseTo ¶
func (s *Store) WriteDatabaseTo(ctx context.Context, cluster, name string, txID ltx.TXID, w io.Writer) error
WriteDatabaseTo writes a database decoded from an LTX snapshot to the given writer.
func (*Store) WriteLTXPageRangeFrom ¶
func (s *Store) WriteLTXPageRangeFrom(ctx context.Context, cluster, database string, minTXID ltx.TXID, w io.Writer) (ltx.Header, ltx.Trailer, error)
WriteLTXPageRangeFrom writes a compacted LTX file for all updated pages since minTXID inclusive. Returns the LTX header and trailer, from which the maximum transaction ID read up to can be determined. Returns ENOCOMPACTION if no transactions are found.
func (*Store) WriteLocalSnapshotTo ¶
func (s *Store) WriteLocalSnapshotTo(ctx context.Context, cluster, database string, txID ltx.TXID, w io.Writer) error
WriteLocalSnapshotTo attempts to read the snapshot out from the local sqlite store.
func (*Store) WriteRemoteSnapshotTo ¶
func (s *Store) WriteRemoteSnapshotTo(ctx context.Context, cluster, name string, txID ltx.TXID, w io.Writer) error
WriteRemoteSnapshotTo writes a snapshot (from remote storage only) at a given TXID to w.
type Txn ¶
type Txn struct {
DBID int
MinTXID ltx.TXID
MaxTXID ltx.TXID
PageSize uint32
Commit uint32
Timestamp time.Time
PreApplyChecksum ltx.Checksum
PostApplyChecksum ltx.Checksum
}
Txn represents a row in the "txns" table in the data file.
func (*Txn) PostApplyPos ¶
PostApplyPos returns the replication position after the txn is applied.
func (*Txn) PreApplyPos ¶
PreApplyPos returns the replication position before the txn is applied.