posting

package
v25.0.0-custom-qt-impr... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2025 License: Apache-2.0 Imports: 41 Imported by: 2

Documentation

Overview

Package posting takes care of posting lists. It contains logic for mutation layers, merging them with BadgerDB, etc.

Index

Constants

View Source
const (
	// Set means set in mutation layer. It contributes 1 in Length.
	Set uint32 = 0x01
	// Del means delete in mutation layer. It contributes -1 in Length.
	Del uint32 = 0x02
	// Ovr means overwrite in mutation layer. It contributes 0 in Length.
	Ovr uint32 = 0x03

	// BitSchemaPosting signals that the value stores a schema or type.
	BitSchemaPosting byte = 0x01
	// BitDeltaPosting signals that the value stores the delta of a posting list.
	BitDeltaPosting byte = 0x04
	// BitCompletePosting signals that the value stores a complete posting list.
	BitCompletePosting byte = 0x08
	// BitEmptyPosting signals that the value stores an empty posting list.
	BitEmptyPosting byte = 0x10
)

Variables

View Source
var (
	// ErrRetry can be triggered if the posting list got deleted from memory due to a hard commit.
	// In such a case, retry.
	ErrRetry = errors.New("Temporary error. Please retry")
	// ErrNoValue would be returned if no value was found in the posting list.
	ErrNoValue = errors.New("No value found")
	// ErrStopIteration is returned when an iteration is terminated early.
	ErrStopIteration = errors.New("Stop iteration")
)
View Source
var (
	// ErrTsTooOld is returned when a transaction is too old to be applied.
	ErrTsTooOld = errors.Errorf("Transaction is too old")
	// ErrInvalidKey is returned when trying to read a posting list using
	// an invalid key (e.g the key to a single part of a larger multi-part list).
	ErrInvalidKey = errors.Errorf("cannot read posting list using multi-part list key")
	// ErrHighPriorityOp is returned when rollup is cancelled so that operations could start.
	ErrHighPriorityOp = errors.New("Cancelled rollup to make way for high priority operation")

	// IncrRollup is used to batch keys for rollup incrementally.
	IncrRollup = &incrRollupi{
		priorityKeys: make([]*pooledKeys, 2),
	}
)
View Source
var (
	EnableDetailedMetrics bool
)

Functions

func Cleanup

func Cleanup()

Cleanup waits until the closer has finished processing.

func DeleteAll

func DeleteAll() error

DeleteAll deletes all entries in the posting list.

func DeleteAllForNs

func DeleteAllForNs(ns uint64) error

func DeleteData

func DeleteData(ns uint64) error

DeleteData deletes all data for the namespace but leaves types and schema intact.

func DeleteNamespace

func DeleteNamespace(ns uint64) error

DeleteNamespace bans the namespace and deletes its predicates/types from the schema.

func DeletePredicate

func DeletePredicate(ctx context.Context, attr string, ts uint64) error

DeletePredicate deletes all entries and indices for a given predicate.

func FromBackupPostingList

func FromBackupPostingList(bl *pb.BackupPostingList) *pb.PostingList

FromBackupPostingList converts a posting list in the format used for backups to a normal posting list.

func GetConflictKey

func GetConflictKey(pk x.ParsedKey, key []byte, t *pb.DirectedEdge) uint64

func Init

func Init(ps *badger.DB, cacheSize int64, removeOnUpdate bool)

Init initializes the posting lists package, the in memory and dirty list hash.

func MarshalPostingList

func MarshalPostingList(plist *pb.PostingList, alloc *z.Allocator) *bpb.KV

MarshalPostingList returns a KV with the marshalled posting list. The caller SHOULD SET the Key and Version for the returned KV.

func NewPosting

func NewPosting(t *pb.DirectedEdge) *pb.Posting

NewPosting takes the given edge and returns its equivalent representation as a posting.

func NewViLocalCache

func NewViLocalCache(delegate *LocalCache) *viLocalCache

func NewViTxn

func NewViTxn(delegate *Txn) *viTxn

func Oracle

func Oracle() *oracle

Oracle returns the global oracle instance. TODO: Oracle should probably be located in worker package, instead of posting package now that we don't run inSnapshot anymore.

func RemoveCacheFor

func RemoveCacheFor(key []byte)

RemoveCacheFor will delete the list corresponding to the given key.

func ResetCache

func ResetCache()

func SetEnabledDetailedMetrics

func SetEnabledDetailedMetrics(enableMetrics bool)

func TypeID

func TypeID(edge *pb.DirectedEdge) types.TypeID

TypeID returns the typeid of destination vertex

func UpdateMaxCost

func UpdateMaxCost(maxCost int64)

Types

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

type CachePL

type CachePL struct {
	// contains filtered or unexported fields
}

func NewCachePL

func NewCachePL() *CachePL

func (*CachePL) Set

func (c *CachePL) Set(l *List, readTs uint64)

type EqContainer

type EqContainer struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewEqContainer

func NewEqContainer() *EqContainer

func (*EqContainer) Estimate

func (eq *EqContainer) Estimate(key []byte) uint64

func (*EqContainer) InsertRecord

func (eq *EqContainer) InsertRecord(key []byte, count uint64)

type IndexRebuild

type IndexRebuild struct {
	Attr          string
	StartTs       uint64
	OldSchema     *pb.SchemaUpdate
	CurrentSchema *pb.SchemaUpdate
}

IndexRebuild holds the info needed to initiate a rebuild of the indices.

func (*IndexRebuild) BuildData

func (rb *IndexRebuild) BuildData(ctx context.Context) error

BuildData updates data.

func (*IndexRebuild) BuildIndexes

func (rb *IndexRebuild) BuildIndexes(ctx context.Context) error

BuildIndexes builds indexes.

func (*IndexRebuild) DropIndexes

func (rb *IndexRebuild) DropIndexes(ctx context.Context) error

DropIndexes drops the indexes that need to be rebuilt.

func (*IndexRebuild) GetQuerySchema

func (rb *IndexRebuild) GetQuerySchema() *pb.SchemaUpdate

GetQuerySchema returns the schema that can be served while indexes are getting built. Query schema is defined as current schema minus tokens to delete from current schema.

func (*IndexRebuild) NeedIndexRebuild

func (rb *IndexRebuild) NeedIndexRebuild() bool

NeedIndexRebuild returns true if any of the tokenizer, reverse or count indexes need to be rebuilt.

type List

type List struct {
	x.SafeMutex
	// contains filtered or unexported fields
}

List stores the in-memory representation of a posting list.

func GetNew

func GetNew(key []byte, pstore *badger.DB, readTs uint64, readUids bool) (*List, error)

func GetNoStore

func GetNoStore(key []byte, readTs uint64) (rlist *List, err error)

GetNoStore returns the list stored in the key or creates a new one if it doesn't exist. It does not store the list in any cache.

func NewList

func NewList(key []byte, plist *pb.PostingList, minTs uint64) *List

NewList returns a new list with an immutable layer set to plist and the timestamp of the immutable layer set to minTs.

func ReadPostingList

func ReadPostingList(key []byte, it *badger.Iterator) (*List, error)

ReadPostingList constructs the posting list from the disk using the passed iterator. Use forward iterator with allversions enabled in iter options. key would now be owned by the posting list. So, ensure that it isn't reused elsewhere.

func (*List) AddMutationWithIndex

func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error

AddMutationWithIndex is addMutation with support for indexing. It also supports reverse edges.

func (*List) AllUntaggedValues

func (l *List) AllUntaggedValues(readTs uint64) ([]types.Val, error)

AllUntaggedValues returns all the values in the posting list with no language tag.

func (*List) AllValues

func (l *List) AllValues(readTs uint64) ([]types.Val, error)

AllValues returns all the values in the posting list.

func (*List) ApproxLen

func (l *List) ApproxLen() int

ApproxLen returns an approximate count of the UIDs in the posting list.

func (*List) DeepSize

func (l *List) DeepSize() uint64

DeepSize computes the memory taken by a Posting List

func (*List) Facets

func (l *List) Facets(readTs uint64, param *pb.FacetParams, langs []string,
	listType bool) ([]*pb.Facets, error)

Facets gives facets for the posting representing value.

func (*List) FindPosting

func (l *List) FindPosting(readTs uint64, uid uint64) (found bool, pos *pb.Posting, err error)

func (*List) GetLangTags

func (l *List) GetLangTags(readTs uint64) ([]string, error)

GetLangTags finds the language tags of each posting in the list.

func (*List) GetLength

func (l *List) GetLength(readTs uint64) int

func (*List) GetPosting

func (l *List) GetPosting(startTs uint64) *pb.PostingList

func (*List) IsEmpty

func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error)

IsEmpty returns true if there are no uids at the given timestamp after the given UID.

func (*List) Iterate

func (l *List) Iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error

Iterate will allow you to iterate over the mutable and immutable layers of this posting List, while having acquired a read lock. So, please keep this iteration cheap, otherwise mutations would get stuck. The iteration will start after the provided UID. The results would not include this UID. The function will loop until either the posting List is fully iterated, or the provided function returns ErrStopIteration, which will indicate to the function to break out of the iteration.

	pl.Iterate(..., func(p *pb.Posting) error {
		// Use posting p
		return nil // to continue iteration.
		return ErrStopIteration // to break iteration.
	})

func (*List) Length

func (l *List) Length(readTs, afterUid uint64) int

Length iterates over the mutation layer and counts number of elements.

func (*List) PartSplits

func (l *List) PartSplits() []uint64

PartSplits returns an empty array if the list has not been split into multiple parts. Otherwise, it returns an array containing the start UID of each part.

func (*List) PostingFor

func (l *List) PostingFor(readTs uint64, langs []string) (p *pb.Posting, rerr error)

PostingFor returns the posting according to the preferred language list.

func (*List) Postings

func (l *List) Postings(opt ListOptions, postFn func(*pb.Posting) error) error

Postings calls postFn with the postings that are common with UIDs in the opt ListOptions.

func (*List) Rollup

func (l *List) Rollup(alloc *z.Allocator, readTs uint64) ([]*bpb.KV, error)

Rollup performs the rollup process, merging the immutable and mutable layers and outputting the resulting list so it can be written to disk. During this process, the list might be split into multiple lists if the main list or any of the existing parts become too big.

A normal list has the following format: <key> -> <posting list with all the data for this list>

A multi-part list is stored in multiple keys. The keys for the parts will be generated by appending the first UID in the part to the key. The list will have the following format: <key> -> <posting list that includes no postings but a list of each part's start UID> <key, 1> -> <first part of the list with all the data for this part> <key, next start UID> -> <second part of the list with all the data for this part> ... <key, last start UID> -> <last part of the list with all its data>

The first part of a multi-part list always has start UID 1 and will be the last part to be deleted, at which point the entire list will be marked for deletion. As the list grows, existing parts might be split if they become too big.

You can provide a readTs for Rollup. This should either be math.MaxUint64, or it should be a timestamp that was reserved for rollup. This would ensure that we read only till that time. If readTs is provided, then once the rollup is done, we check the maximum timestamp. We store the results at that max timestamp + 1. This mechanism allows us to make sure that:

  • Since we write at max timestamp + 1, we can side step any issues that arise by wal replay.

  • Earlier, one of the solutions was to write at ts + 1. It didn't work as index transactions don't conflict, so they can get committed at consecutive timestamps. This leads to some data being overwritten by rollup.

  • No other transaction happens at readTs. This way we can be sure that we won't overwrite any transaction that happened.

  • Latest data. We wait until readTs - 1, so that we know that we are reading the latest data. If we read stale data, it can cause some old transactions to be deleted.

  • Even though we have reserved readTs for rollup, we don't store the data there. This is done so that the rollup is written as close as possible to actual data. This can cause issues if someone is reading data between two timestamps.

  • Drop operations can cause issues if they are rolled up, since we are storing results at ts + 1 in dgraph.drop.op. When we do a drop op, we delete the relevant data first using a mutation. Then we write a record into dgraph.drop.op. We use this record to figure out if a drop was performed. This helps us during backup, when we want to know if we need to read a given backup or not. A backup which has a drop record would render older backups unnecessary.

    If we rollup the dgraph.drop.op, and store result on ts + 1, it effectively copies the original record into a new location. We want to see if there can be any issues in backup/restore due to this. To ensure that there is no issue in writing on ts + 1, we do the following analysis.

    Analysis is done for drop op, but it would be the same for drop predicate and namespace. Assume that there were two backups, at b1 and b2. We move rollup ts around to see if it can cause any issues. There can be 3 cases:

    1. b1 < ts < b2. In this case, we would have a drop record in b2. This is the same behaviour as if we had written at ts.

    2. b1 = ts < b2. In this case, we would have a drop record in b1, and in b2. Originally, only b1 would have a drop record. With this new approach, b2 would also have a drop record. This is okay because the last entry in b1 is a drop, so it wouldn't have any data to be applied.

    3. b1 < ts < ts + 1 = b2. In this case, we would have both drop records in b2. No issues in this case.

    This proves that writing rollups at ts + 1 would not cause any issues with dgraph.drop.op. The only issue would come if a rollup happens at ts + k. If a backup happens in between ts and ts + k, it could lead to some data being dropped during restore.

func (*List) SetTs

func (l *List) SetTs(readTs uint64)

SetTs allows us to set the transaction timestamp in mutation map. Should be used before the posting list is passed on to the functions that would read the data.

func (*List) StaticValue

func (l *List) StaticValue(readTs uint64) (*pb.PostingList, error)

func (*List) ToBackupPostingList

func (l *List) ToBackupPostingList(
	bl *pb.BackupPostingList, alloc *z.Allocator, buf *z.Buffer) (*bpb.KV, error)

ToBackupPostingList uses rollup to generate a single list with no splits. It's used during backup so that each backed up posting list is stored in a single key.

func (*List) Uids

func (l *List) Uids(opt ListOptions) (*pb.List, error)

Uids returns the UIDs given some query params. We have to apply the filtering before applying (offset, count). WARNING: Calling this function just to get UIDs is expensive

func (*List) Value

func (l *List) Value(readTs uint64) (rval types.Val, rerr error)

Value returns the default value from the posting list. The default value is defined as the value without a language tag. Value cannot be used to read from cache

func (*List) ValueFor

func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr error)

ValueFor returns a value from posting list, according to preferred language list. If list is empty, value without language is returned; if such value is not available, value with smallest UID is returned. If list consists of one or more languages, first available value is returned. If no language from the list matches the values, processing is the same as for empty list.

func (*List) ValueForTag

func (l *List) ValueForTag(readTs uint64, tag string) (rval types.Val, rerr error)

ValueForTag returns the value in the posting list with the given language tag.

func (*List) ValueWithLockHeld

func (l *List) ValueWithLockHeld(readTs uint64) (rval types.Val, rerr error)

type ListOptions

type ListOptions struct {
	ReadTs    uint64
	AfterUid  uint64   // Any UIDs returned must be after this value.
	Intersect *pb.List // Intersect results with this list of UIDs.
	First     int
}

ListOptions is used in List.Uids (in posting) to customize our output list of UIDs, for each posting list. It should be internal to this package.

type LocalCache

type LocalCache struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LocalCache stores a cache of posting lists and deltas. This doesn't sync, so call this only when you don't care about dirty posting lists in memory (for example before populating snapshot) or after calling syncAllMarks.

func NewLocalCache

func NewLocalCache(startTs uint64) *LocalCache

NewLocalCache returns a new LocalCache instance.

func NoCache

func NoCache(startTs uint64) *LocalCache

NoCache returns a new LocalCache instance, which won't cache anything. Useful to pass startTs around.

func (*LocalCache) Find

func (lc *LocalCache) Find(pred []byte, filter func([]byte) bool) (uint64, error)

func (*LocalCache) Get

func (lc *LocalCache) Get(key []byte) (*List, error)

Get retrieves the cached version of the list associated with the given key.

func (*LocalCache) GetFromDelta

func (lc *LocalCache) GetFromDelta(key []byte) (*List, error)

GetFromDelta gets the cached version of the list without reading from disk and only applies the existing deltas. This is used in situations where the posting list will only be modified and not read (e.g adding index mutations).

func (*LocalCache) GetSinglePosting

func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error)

GetSinglePosting retrieves the cached version of the first item in the list associated with the given key. This is used for retrieving the value of a scalar predicate.

func (*LocalCache) GetUids

func (lc *LocalCache) GetUids(key []byte) (*List, error)

func (*LocalCache) SetIfAbsent

func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List

SetIfAbsent adds the list for the specified key to the cache. If a list for the same key already exists, the cache will not be modified and the existing list will be returned instead. This behavior is meant to prevent the goroutines using the cache from ending up with an orphaned version of a list.

func (*LocalCache) UpdateCommitTs

func (lc *LocalCache) UpdateCommitTs(commitTs uint64)

func (*LocalCache) UpdateDeltasAndDiscardLists

func (lc *LocalCache) UpdateDeltasAndDiscardLists()

UpdateDeltasAndDiscardLists updates the delta cache before removing the stored posting lists.

type MemoryLayer

type MemoryLayer struct {
	// contains filtered or unexported fields
}

func (*MemoryLayer) ReadData

func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64, readUids bool) (*List, error)

type MutableLayer

type MutableLayer struct {
	// contains filtered or unexported fields
}

MutableLayer is the structure that stores the mutable layer of the posting list. Every posting list has an immutable layer and a mutable layer. Whenever a posting is added into a list, it's added as deltas into the posting list. As this list of deltas keeps piling up, they are converted into a complete posting list through rollup and stored as the immutable layer. The mutable layer contains all the deltas after the last complete posting list. The mutable layer used to be a map from commitTs to PostingList. Every transaction that starts gets its own copy of a posting list that it stores in the local cache of the txn. Every time we made a copy of the posting list, we had to deep clone the map. If we give the same map by reference, we start seeing concurrent writes and reads into the map, causing issues. With this new MutableLayer struct, we know that committedEntries will not get changed and this can be copied by reference without any issues. This structure makes it much faster to clone the mutable layer.

type Options

type Options struct {
	sync.Mutex

	CommitFraction float64
}

Options contains options for the postings package.

var Config Options

Config stores the posting options of this instance.

type StatContainer

type StatContainer interface {
	InsertRecord([]byte, uint64)
	Estimate([]byte) uint64
}

type StatsHolder

type StatsHolder struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func GetStatsHolder

func GetStatsHolder() *StatsHolder

func NewStatsHolder

func NewStatsHolder() *StatsHolder

func (*StatsHolder) InsertRecord

func (sh *StatsHolder) InsertRecord(pred string, key []byte, count uint64)

func (*StatsHolder) ProcessEqPredicate

func (sh *StatsHolder) ProcessEqPredicate(pred string, key []byte) uint64

type Txn

type Txn struct {
	StartTs uint64

	// Fields which can be changed after init
	sync.Mutex
	// contains filtered or unexported fields
}

Txn represents a transaction.

func NewTxn

func NewTxn(startTs uint64) *Txn

NewTxn returns a new Txn instance.

func (*Txn) CommitToDisk

func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error

CommitToDisk commits a transaction to disk. This function only stores deltas to the commit timestamps. It does not try to generate a state. State generation is done via rollups, which happen when a snapshot is created. Don't call this for schema mutations. Directly commit them.

func (*Txn) FillContext

func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32, isErrored bool)

FillContext updates the given transaction context with data from this transaction.

func (*Txn) Get

func (txn *Txn) Get(key []byte) (*List, error)

Get retrieves the posting list for the given list from the local cache.

func (*Txn) GetFromDelta

func (txn *Txn) GetFromDelta(key []byte) (*List, error)

GetFromDelta retrieves the posting list from delta cache, not from Badger.

func (*Txn) GetScalarList

func (txn *Txn) GetScalarList(key []byte) (*List, error)

func (*Txn) ShouldAbort

func (txn *Txn) ShouldAbort() bool

ShouldAbort returns whether the transaction should be aborted.

func (*Txn) Store

func (txn *Txn) Store(pl *List) *List

Store is used by tests.

func (*Txn) Update

func (txn *Txn) Update()

Update calls UpdateDeltasAndDiscardLists on the local cache.

func (*Txn) UpdateCachedKeys

func (txn *Txn) UpdateCachedKeys(commitTs uint64)

UpdateCachedKeys will delete the cached list by this txn.

type TxnWriter

type TxnWriter struct {
	// contains filtered or unexported fields
}

TxnWriter is in charge of writing transactions to badger.

func NewTxnWriter

func NewTxnWriter(db *badger.DB) *TxnWriter

NewTxnWriter returns a new TxnWriter instance.

func (*TxnWriter) Flush

func (w *TxnWriter) Flush() error

Flush waits until all operations are done and all data is written to disk.

func (*TxnWriter) SetAt

func (w *TxnWriter) SetAt(key, val []byte, meta byte, ts uint64) error

SetAt writes a key-value pair at the given timestamp.

func (*TxnWriter) Wait

func (w *TxnWriter) Wait() error

func (*TxnWriter) Write

func (w *TxnWriter) Write(kvs *pb.KVList) error

Write stores the given key-value pairs in badger.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL