Documentation
¶
Overview ¶
Package posting takes care of posting lists. It contains logic for mutation layers, merging them with BadgerDB, etc.
Index ¶
- Constants
- Variables
- func Cleanup()
- func DeleteAll() error
- func DeleteAllForNs(ns uint64) error
- func DeleteData(ns uint64) error
- func DeleteNamespace(ns uint64) error
- func DeletePredicate(ctx context.Context, attr string, ts uint64) error
- func FromBackupPostingList(bl *pb.BackupPostingList) *pb.PostingList
- func GetConflictKey(pk x.ParsedKey, key []byte, t *pb.DirectedEdge) uint64
- func Init(ps *badger.DB, cacheSize int64, removeOnUpdate bool)
- func MarshalPostingList(plist *pb.PostingList, alloc *z.Allocator) *bpb.KV
- func NewPosting(t *pb.DirectedEdge) *pb.Posting
- func NewViLocalCache(delegate *LocalCache) *viLocalCache
- func NewViTxn(delegate *Txn) *viTxn
- func Oracle() *oracle
- func RemoveCacheFor(key []byte)
- func ResetCache()
- func SetEnabledDetailedMetrics(enableMetrics bool)
- func TypeID(edge *pb.DirectedEdge) types.TypeID
- func UpdateMaxCost(maxCost int64)
- type Cache
- type CachePL
- type EqContainer
- type IndexRebuild
- type List
- func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error
- func (l *List) AllUntaggedValues(readTs uint64) ([]types.Val, error)
- func (l *List) AllValues(readTs uint64) ([]types.Val, error)
- func (l *List) ApproxLen() int
- func (l *List) DeepSize() uint64
- func (l *List) Facets(readTs uint64, param *pb.FacetParams, langs []string, listType bool) ([]*pb.Facets, error)
- func (l *List) FindPosting(readTs uint64, uid uint64) (found bool, pos *pb.Posting, err error)
- func (l *List) GetLangTags(readTs uint64) ([]string, error)
- func (l *List) GetLength(readTs uint64) int
- func (l *List) GetPosting(startTs uint64) *pb.PostingList
- func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error)
- func (l *List) Iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error
- func (l *List) Length(readTs, afterUid uint64) int
- func (l *List) PartSplits() []uint64
- func (l *List) PostingFor(readTs uint64, langs []string) (p *pb.Posting, rerr error)
- func (l *List) Postings(opt ListOptions, postFn func(*pb.Posting) error) error
- func (l *List) Rollup(alloc *z.Allocator, readTs uint64) ([]*bpb.KV, error)
- func (l *List) SetTs(readTs uint64)
- func (l *List) StaticValue(readTs uint64) (*pb.PostingList, error)
- func (l *List) ToBackupPostingList(bl *pb.BackupPostingList, alloc *z.Allocator, buf *z.Buffer) (*bpb.KV, error)
- func (l *List) Uids(opt ListOptions) (*pb.List, error)
- func (l *List) Value(readTs uint64) (rval types.Val, rerr error)
- func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr error)
- func (l *List) ValueForTag(readTs uint64, tag string) (rval types.Val, rerr error)
- func (l *List) ValueWithLockHeld(readTs uint64) (rval types.Val, rerr error)
- type ListOptions
- type LocalCache
- func (lc *LocalCache) Find(pred []byte, filter func([]byte) bool) (uint64, error)
- func (lc *LocalCache) Get(key []byte) (*List, error)
- func (lc *LocalCache) GetFromDelta(key []byte) (*List, error)
- func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error)
- func (lc *LocalCache) GetUids(key []byte) (*List, error)
- func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List
- func (lc *LocalCache) UpdateCommitTs(commitTs uint64)
- func (lc *LocalCache) UpdateDeltasAndDiscardLists()
- type MemoryLayer
- type MutableLayer
- type Options
- type StatContainer
- type StatsHolder
- type Txn
- func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error
- func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32, isErrored bool)
- func (txn *Txn) Get(key []byte) (*List, error)
- func (txn *Txn) GetFromDelta(key []byte) (*List, error)
- func (txn *Txn) GetScalarList(key []byte) (*List, error)
- func (txn *Txn) ShouldAbort() bool
- func (txn *Txn) Store(pl *List) *List
- func (txn *Txn) Update()
- func (txn *Txn) UpdateCachedKeys(commitTs uint64)
- type TxnWriter
Constants ¶
const ( // Set means set in mutation layer. It contributes 1 in Length. Set uint32 = 0x01 // Del means delete in mutation layer. It contributes -1 in Length. Del uint32 = 0x02 // Ovr means overwrite in mutation layer. It contributes 0 in Length. Ovr uint32 = 0x03 // BitSchemaPosting signals that the value stores a schema or type. BitSchemaPosting byte = 0x01 // BitDeltaPosting signals that the value stores the delta of a posting list. BitDeltaPosting byte = 0x04 // BitCompletePosting signals that the value stores a complete posting list. BitCompletePosting byte = 0x08 // BitEmptyPosting signals that the value stores an empty posting list. BitEmptyPosting byte = 0x10 )
Variables ¶
var ( // ErrRetry can be triggered if the posting list got deleted from memory due to a hard commit. // In such a case, retry. ErrRetry = errors.New("Temporary error. Please retry") // ErrNoValue would be returned if no value was found in the posting list. ErrNoValue = errors.New("No value found") // ErrStopIteration is returned when an iteration is terminated early. ErrStopIteration = errors.New("Stop iteration") )
var ( // ErrTsTooOld is returned when a transaction is too old to be applied. ErrTsTooOld = errors.Errorf("Transaction is too old") // ErrInvalidKey is returned when trying to read a posting list using // an invalid key (e.g the key to a single part of a larger multi-part list). ErrInvalidKey = errors.Errorf("cannot read posting list using multi-part list key") // ErrHighPriorityOp is returned when rollup is cancelled so that operations could start. ErrHighPriorityOp = errors.New("Cancelled rollup to make way for high priority operation") // IncrRollup is used to batch keys for rollup incrementally. IncrRollup = &incrRollupi{ priorityKeys: make([]*pooledKeys, 2), } )
var (
EnableDetailedMetrics bool
)
Functions ¶
func DeleteAllForNs ¶
func DeleteData ¶
DeleteData deletes all data for the namespace but leaves types and schema intact.
func DeleteNamespace ¶
DeleteNamespace bans the namespace and deletes its predicates/types from the schema.
func DeletePredicate ¶
DeletePredicate deletes all entries and indices for a given predicate.
func FromBackupPostingList ¶
func FromBackupPostingList(bl *pb.BackupPostingList) *pb.PostingList
FromBackupPostingList converts a posting list in the format used for backups to a normal posting list.
func GetConflictKey ¶
func MarshalPostingList ¶
MarshalPostingList returns a KV with the marshalled posting list. The caller SHOULD SET the Key and Version for the returned KV.
func NewPosting ¶
func NewPosting(t *pb.DirectedEdge) *pb.Posting
NewPosting takes the given edge and returns its equivalent representation as a posting.
func NewViLocalCache ¶
func NewViLocalCache(delegate *LocalCache) *viLocalCache
func Oracle ¶
func Oracle() *oracle
Oracle returns the global oracle instance. TODO: Oracle should probably be located in worker package, instead of posting package now that we don't run inSnapshot anymore.
func RemoveCacheFor ¶
func RemoveCacheFor(key []byte)
RemoveCacheFor will delete the list corresponding to the given key.
func ResetCache ¶
func ResetCache()
func SetEnabledDetailedMetrics ¶
func SetEnabledDetailedMetrics(enableMetrics bool)
func TypeID ¶
func TypeID(edge *pb.DirectedEdge) types.TypeID
TypeID returns the typeid of destination vertex
func UpdateMaxCost ¶
func UpdateMaxCost(maxCost int64)
Types ¶
type CachePL ¶
type CachePL struct {
// contains filtered or unexported fields
}
func NewCachePL ¶
func NewCachePL() *CachePL
type EqContainer ¶
func NewEqContainer ¶
func NewEqContainer() *EqContainer
func (*EqContainer) Estimate ¶
func (eq *EqContainer) Estimate(key []byte) uint64
func (*EqContainer) InsertRecord ¶
func (eq *EqContainer) InsertRecord(key []byte, count uint64)
type IndexRebuild ¶
type IndexRebuild struct {
Attr string
StartTs uint64
OldSchema *pb.SchemaUpdate
CurrentSchema *pb.SchemaUpdate
}
IndexRebuild holds the info needed to initiate a rebuild of the indices.
func (*IndexRebuild) BuildData ¶
func (rb *IndexRebuild) BuildData(ctx context.Context) error
BuildData updates data.
func (*IndexRebuild) BuildIndexes ¶
func (rb *IndexRebuild) BuildIndexes(ctx context.Context) error
BuildIndexes builds indexes.
func (*IndexRebuild) DropIndexes ¶
func (rb *IndexRebuild) DropIndexes(ctx context.Context) error
DropIndexes drops the indexes that need to be rebuilt.
func (*IndexRebuild) GetQuerySchema ¶
func (rb *IndexRebuild) GetQuerySchema() *pb.SchemaUpdate
GetQuerySchema returns the schema that can be served while indexes are getting built. Query schema is defined as current schema minus tokens to delete from current schema.
func (*IndexRebuild) NeedIndexRebuild ¶
func (rb *IndexRebuild) NeedIndexRebuild() bool
NeedIndexRebuild returns true if any of the tokenizer, reverse or count indexes need to be rebuilt.
type List ¶
List stores the in-memory representation of a posting list.
func GetNoStore ¶
GetNoStore returns the list stored in the key or creates a new one if it doesn't exist. It does not store the list in any cache.
func NewList ¶
func NewList(key []byte, plist *pb.PostingList, minTs uint64) *List
NewList returns a new list with an immutable layer set to plist and the timestamp of the immutable layer set to minTs.
func ReadPostingList ¶
ReadPostingList constructs the posting list from the disk using the passed iterator. Use forward iterator with allversions enabled in iter options. key would now be owned by the posting list. So, ensure that it isn't reused elsewhere.
func (*List) AddMutationWithIndex ¶
AddMutationWithIndex is addMutation with support for indexing. It also supports reverse edges.
func (*List) AllUntaggedValues ¶
AllUntaggedValues returns all the values in the posting list with no language tag.
func (*List) Facets ¶
func (l *List) Facets(readTs uint64, param *pb.FacetParams, langs []string, listType bool) ([]*pb.Facets, error)
Facets gives facets for the posting representing value.
func (*List) FindPosting ¶
func (*List) GetLangTags ¶
GetLangTags finds the language tags of each posting in the list.
func (*List) GetPosting ¶
func (l *List) GetPosting(startTs uint64) *pb.PostingList
func (*List) IsEmpty ¶
IsEmpty returns true if there are no uids at the given timestamp after the given UID.
func (*List) Iterate ¶
Iterate will allow you to iterate over the mutable and immutable layers of this posting List, while having acquired a read lock. So, please keep this iteration cheap, otherwise mutations would get stuck. The iteration will start after the provided UID. The results would not include this uid. The function will loop until either the posting List is fully iterated, or you return ErrStopIteration from the provided function, which will indicate to the function to break out of the iteration.
pl.Iterate(..., func(p *pb.Posting) error {
// Use posting p
return nil // to continue iteration.
return ErrStopIteration // to break iteration.
})
func (*List) PartSplits ¶
PartSplits returns an empty array if the list has not been split into multiple parts. Otherwise, it returns an array containing the start UID of each part.
func (*List) PostingFor ¶
PostingFor returns the posting according to the preferred language list.
func (*List) Postings ¶
Postings calls postFn with the postings that are common with UIDs in the opt ListOptions.
func (*List) Rollup ¶
Rollup performs the rollup process, merging the immutable and mutable layers and outputting the resulting list so it can be written to disk. During this process, the list might be split into multiple lists if the main list or any of the existing parts become too big.
A normal list has the following format: <key> -> <posting list with all the data for this list>
A multi-part list is stored in multiple keys. The keys for the parts will be generated by appending the first UID in the part to the key. The list will have the following format: <key> -> <posting list that includes no postings but a list of each part's start UID> <key, 1> -> <first part of the list with all the data for this part> <key, next start UID> -> <second part of the list with all the data for this part> ... <key, last start UID> -> <last part of the list with all its data>
The first part of a multi-part list always has start UID 1 and will be the last part to be deleted, at which point the entire list will be marked for deletion. As the list grows, existing parts might be split if they become too big.
You can provide a readTs for Rollup. This should either be math.MaxUint64, or it should be a timestamp that was reserved for rollup. This would ensure that we read only till that time. If read ts is provided, once the rollup is done, we check the maximum timestamp. We store the results at that max timestamp + 1. This mechanism allows us to make sure that:
Since we write at max timestamp + 1, we can side step any issues that arise by wal replay.
Earlier, one of the solutions was to write at ts + 1. It didn't work as index transactions don't conflict so they can get committed at consecutive timestamps. This leads to some data being overwritten by rollup.
No other transaction happens at readTs. This way we can be sure that we won't overwrite any transaction that happened.
Latest data. We wait until readTs - 1, so that we know that we are reading the latest data. If we read stale data, it can cause some old transactions to be deleted.
Even though we have reserved readTs for rollup, we don't store the data there. This is done so that the rollup is written as close as possible to actual data. This can cause issues if someone is reading data between two timestamps.
Drop operations can cause issues if they are rolled up, since we are storing results at ts + 1 in dgraph.drop.op. When we do drop op, we delete the relevant data first using a mutation. Then we write a record into the dgraph.drop.op. We use this record to figure out if a drop was performed. This helps us during backup, when we want to know if we need to read a given backup or not. A backup, which has a drop record, would render older backups unnecessary.
If we rollup the dgraph.drop.op, and store result on ts + 1, it effectively copies the original record into a new location. We want to see if there can be any issues in backup/restore due to this. To ensure that there is no issue in writing on ts + 1, we do the following analysis.
Analysis is done for drop op, but it would be the same for drop predicate and namespace. Assume that there were two backups, at b1 and b2. We move rollup ts around to see if it can cause any issues. There can be 3 cases:
1. b1 < ts < b2. In this case, we would have a drop record in b2. This is the same behaviour as if we had written at ts.
2. b1 = ts < b2. In this case, we would have a drop record in b1, and in b2. Originally, only b1 would have a drop record. With this new approach, b2 would also have a drop record. This is okay because last entry in b1 is drop, so it wouldn't have any data to be applied.
3. b1 < ts < ts + 1 = b2. In this case, we would have both drop records in b2. No issues in this case.
This proves that writing rollups at ts + 1 would not cause any issues with dgraph.drop.op. The only issue would come if a rollup happens at ts + k. If a backup happens in between ts and ts + k, it could lead to some data being dropped during restore.
func (*List) SetTs ¶
SetTs allows us to set the transaction timestamp in mutation map. Should be used before the posting list is passed on to the functions that would read the data.
func (*List) StaticValue ¶
func (l *List) StaticValue(readTs uint64) (*pb.PostingList, error)
func (*List) ToBackupPostingList ¶
func (l *List) ToBackupPostingList( bl *pb.BackupPostingList, alloc *z.Allocator, buf *z.Buffer) (*bpb.KV, error)
ToBackupPostingList uses rollup to generate a single list with no splits. It's used during backup so that each backed up posting list is stored in a single key.
func (*List) Uids ¶
func (l *List) Uids(opt ListOptions) (*pb.List, error)
Uids returns the UIDs given some query params. We have to apply the filtering before applying (offset, count). WARNING: Calling this function just to get UIDs is expensive
func (*List) Value ¶
Value returns the default value from the posting list. The default value is defined as the value without a language tag. Value cannot be used to read from cache
func (*List) ValueFor ¶
ValueFor returns a value from posting list, according to preferred language list. If list is empty, value without language is returned; if such value is not available, value with smallest UID is returned. If list consists of one or more languages, first available value is returned. If no language from the list matches the values, processing is the same as for empty list.
func (*List) ValueForTag ¶
ValueForTag returns the value in the posting list with the given language tag.
type ListOptions ¶
type ListOptions struct {
ReadTs uint64
AfterUid uint64 // Any UIDs returned must be after this value.
Intersect *pb.List // Intersect results with this list of UIDs.
First int
}
ListOptions is used in List.Uids (in posting) to customize our output list of UIDs, for each posting list. It should be internal to this package.
type LocalCache ¶
LocalCache stores a cache of posting lists and deltas. This doesn't sync, so call this only when you don't care about dirty posting lists in memory (for example, before populating snapshot) or after calling syncAllMarks.
func NewLocalCache ¶
func NewLocalCache(startTs uint64) *LocalCache
NewLocalCache returns a new LocalCache instance.
func NoCache ¶
func NoCache(startTs uint64) *LocalCache
NoCache returns a new LocalCache instance, which won't cache anything. Useful to pass startTs around.
func (*LocalCache) Get ¶
func (lc *LocalCache) Get(key []byte) (*List, error)
Get retrieves the cached version of the list associated with the given key.
func (*LocalCache) GetFromDelta ¶
func (lc *LocalCache) GetFromDelta(key []byte) (*List, error)
GetFromDelta gets the cached version of the list without reading from disk and only applies the existing deltas. This is used in situations where the posting list will only be modified and not read (e.g adding index mutations).
func (*LocalCache) GetSinglePosting ¶
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error)
GetSinglePosting retrieves the cached version of the first item in the list associated with the given key. This is used for retrieving the value of scalar predicates.
func (*LocalCache) SetIfAbsent ¶
func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List
SetIfAbsent adds the list for the specified key to the cache. If a list for the same key already exists, the cache will not be modified and the existing list will be returned instead. This behavior is meant to prevent the goroutines using the cache from ending up with an orphaned version of a list.
func (*LocalCache) UpdateCommitTs ¶
func (lc *LocalCache) UpdateCommitTs(commitTs uint64)
func (*LocalCache) UpdateDeltasAndDiscardLists ¶
func (lc *LocalCache) UpdateDeltasAndDiscardLists()
UpdateDeltasAndDiscardLists updates the delta cache before removing the stored posting lists.
type MemoryLayer ¶
type MemoryLayer struct {
// contains filtered or unexported fields
}
type MutableLayer ¶
type MutableLayer struct {
// contains filtered or unexported fields
}
MutableLayer is the structure that stores the mutable layer of the posting list. Every posting list has an immutable layer and a mutable layer. Whenever a posting is added to a list, it is added as a delta to the posting list. As these deltas pile up, they are converted into a complete posting list through rollup and stored as the immutable layer. The mutable layer contains all the deltas after the last complete posting list. The mutable layer used to be a map from commitTs to PostingList. Every transaction that starts gets its own copy of a posting list, which it stores in the local cache of the txn. Every time we made a copy of the posting list, we had to deep clone the map. If we gave the same map by reference, we would start seeing concurrent writes and reads into the map, causing issues. With this new MutableLayer struct, we know that committedEntries will not get changed, so it can be copied by reference without any issues. This structure makes it much faster to clone the mutable layer.
type Options ¶
Options contains options for the postings package.
var Config Options
Config stores the posting options of this instance.
type StatContainer ¶
type StatsHolder ¶
func GetStatsHolder ¶
func GetStatsHolder() *StatsHolder
func NewStatsHolder ¶
func NewStatsHolder() *StatsHolder
func (*StatsHolder) InsertRecord ¶
func (sh *StatsHolder) InsertRecord(pred string, key []byte, count uint64)
func (*StatsHolder) ProcessEqPredicate ¶
func (sh *StatsHolder) ProcessEqPredicate(pred string, key []byte) uint64
type Txn ¶
type Txn struct {
StartTs uint64
// Fields which can be changed after init
sync.Mutex
// contains filtered or unexported fields
}
Txn represents a transaction.
func (*Txn) CommitToDisk ¶
CommitToDisk commits a transaction to disk. This function only stores deltas to the commit timestamps. It does not try to generate a state. State generation is done via rollups, which happen when a snapshot is created. Don't call this for schema mutations. Directly commit them.
func (*Txn) FillContext ¶
func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32, isErrored bool)
FillContext updates the given transaction context with data from this transaction.
func (*Txn) GetFromDelta ¶
GetFromDelta retrieves the posting list from delta cache, not from Badger.
func (*Txn) ShouldAbort ¶
ShouldAbort returns whether the transaction should be aborted.
func (*Txn) Update ¶
func (txn *Txn) Update()
Update calls UpdateDeltasAndDiscardLists on the local cache.
func (*Txn) UpdateCachedKeys ¶
UpdateCachedKeys will delete the cached list by this txn.
type TxnWriter ¶
type TxnWriter struct {
// contains filtered or unexported fields
}
TxnWriter is in charge of writing transactions to badger.
func NewTxnWriter ¶
func NewTxnWriter(db *badger.DB) *TxnWriter
NewTxnWriter returns a new TxnWriter instance.
func (*TxnWriter) Flush ¶
Flush waits until all operations are done and all data is written to disk.