frac

package
v0.63.2
Published: Dec 15, 2025 License: Apache-2.0 Imports: 44 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Active

type Active struct {
	Config *Config

	BaseFileName string

	MIDs *UInt64s
	RIDs *UInt64s

	DocBlocks *UInt64s

	TokenList *TokenList

	DocsPositions *DocsPositions
	// contains filtered or unexported fields
}

func NewActive

func NewActive(
	baseFileName string,
	activeIndexer *ActiveIndexer,
	readLimiter *storage.ReadLimiter,
	docsCache *cache.Cache[[]byte],
	sortCache *cache.Cache[[]byte],
	cfg *Config,
) *Active

func (*Active) Append

func (f *Active) Append(docs, metas []byte, wg *sync.WaitGroup) (err error)

Append writes the docs to disk and sends the metas to the index workers.
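
A minimal usage sketch follows, assuming an already-constructed *Active (see NewActive) and pre-encoded docs and metas buffers; imports and the module's package paths are omitted. Whether the WaitGroup lets the caller wait for the index workers to finish the batch is an assumption based on the signature, not documented behavior.

func appendBatch(active *frac.Active, docs, metas []byte) error {
	var wg sync.WaitGroup
	if err := active.Append(docs, metas, &wg); err != nil {
		return err
	}
	// Assumption: the WaitGroup is completed by the index workers once the
	// metas of this batch have been indexed.
	wg.Wait()
	return nil
}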

func (*Active) AppendIDs

func (f *Active) AppendIDs(ids []seq.ID) []uint32

func (*Active) Contains

func (f *Active) Contains(id seq.MID) bool

func (*Active) DataProvider

func (f *Active) DataProvider(ctx context.Context) (*activeDataProvider, func())

func (*Active) Fetch added in v0.62.1

func (f *Active) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error)

func (*Active) GetAllDocuments

func (f *Active) GetAllDocuments() []uint32

func (*Active) Info

func (f *Active) Info() *common.Info

func (*Active) IsIntersecting

func (f *Active) IsIntersecting(from, to seq.MID) bool

func (*Active) Offload added in v0.60.0

func (f *Active) Offload(context.Context, storage.Uploader) (bool, error)

Offload is a no-op for the Active fraction.

Since search within an Active fraction is too costly (the whole index has to be replayed in memory), offloading is supported only for Sealed fractions.

func (*Active) Release

func (f *Active) Release()

func (*Active) Replay

func (f *Active) Replay(ctx context.Context) error

func (*Active) Search added in v0.62.1

func (f *Active) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error)

func (*Active) String

func (f *Active) String() string

func (*Active) Suicide

func (f *Active) Suicide()

func (*Active) UpdateStats

func (f *Active) UpdateStats(minMID, maxMID seq.MID, docCount uint32, sizeCount uint64)

type ActiveIndexer

type ActiveIndexer struct {
	// contains filtered or unexported fields
}

func NewActiveIndexer

func NewActiveIndexer(workerCount, chLen int) *ActiveIndexer

func (*ActiveIndexer) Index

func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, sw *stopwatch.Stopwatch)

func (*ActiveIndexer) Start

func (ai *ActiveIndexer) Start()

func (*ActiveIndexer) Stop

func (ai *ActiveIndexer) Stop()

type ActiveSealingSource added in v0.62.0

type ActiveSealingSource struct {
	// contains filtered or unexported fields
}

ActiveSealingSource transforms data from in-memory (frac.Active) storage into a format suitable for disk writing during index creation.

The main purpose of this type is to provide access to sorted data through a set of iterators that allow data to be processed sequentially, in size-limited blocks, for disk writing:

  • TokenBlocks() - iterator for token blocks, sorted by fields and values
  • Fields() - iterator for sorted fields with maximum TIDs
  • IDsBlocks() - iterator for document ID blocks and their positions
  • TokenLIDs() - iterator for LID lists for each token
  • Docs() - iterator for documents themselves with duplicate handling

All iterators work with pre-sorted data and return information in an order optimal for creating disk index structures.
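
The sketch below shows one way a sealing routine might drain the source; the call order, the sealSketch helper name, and the block sizes are illustrative assumptions rather than the package's actual sealing procedure. Imports and the module's package paths are omitted, and ranging over the iterators requires Go 1.23+.

func sealSketch(active *frac.Active, params common.SealParams) error {
	src, err := frac.NewActiveSealingSource(active, params)
	if err != nil {
		return err
	}
	// Sort documents and write them to disk in compressed form first.
	if err := src.SortDocs(); err != nil {
		return err
	}
	for field, maxTID := range src.Fields() { // fields in lexicographic order
		_, _ = field, maxTID // write the field table entry
	}
	for block := range src.TokenBlocks(64 << 10) { // up to 64 KiB of tokens per block
		_ = block // write the token block
	}
	for ids, positions := range src.IDsBlocks(4096) { // 4096 IDs per block
		_, _ = ids, positions // write the ID block and document positions
	}
	for lids := range src.TokenLIDs() { // LID list per token, renumbered after sorting
		_ = lids // write the LID list
	}
	return src.LastError() // iterator failures are reported here
}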

func NewActiveSealingSource added in v0.62.0

func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSealingSource, error)

NewActiveSealingSource creates a new data source for sealing based on an active in-memory index.

func (*ActiveSealingSource) BlocksOffsets added in v0.62.0

func (src *ActiveSealingSource) BlocksOffsets() []uint64

BlocksOffsets returns document block offsets.

func (*ActiveSealingSource) Docs added in v0.62.0

func (src *ActiveSealingSource) Docs() iter.Seq2[seq.ID, []byte]

Docs returns an iterator for documents with their IDs. Handles duplicate IDs (for nested indexes).

func (*ActiveSealingSource) Fields added in v0.62.0

func (src *ActiveSealingSource) Fields() iter.Seq2[string, uint32]

Fields returns an iterator for sorted fields and their maximum TIDs. Fields are sorted lexicographically, ensuring predictable order when building disk index structures.

func (*ActiveSealingSource) IDsBlocks added in v0.62.0

func (src *ActiveSealingSource) IDsBlocks(blockSize int) iter.Seq2[[]seq.ID, []seq.DocPos]

IDsBlocks returns an iterator for document ID blocks and their corresponding positions. IDs are sorted. The block size is controlled by the blockSize parameter to balance performance and memory usage.

func (*ActiveSealingSource) Info added in v0.62.0

func (src *ActiveSealingSource) Info() *common.Info

Info returns index metadata information.

func (*ActiveSealingSource) LastError added in v0.62.0

func (src *ActiveSealingSource) LastError() error

LastError returns the last error that occurred during processing.

func (*ActiveSealingSource) SortDocs added in v0.62.0

func (src *ActiveSealingSource) SortDocs() error

SortDocs sorts documents and writes them in compressed form to disk. Creates a temporary file that is then renamed to the final one.

func (*ActiveSealingSource) TokenBlocks added in v0.62.0

func (src *ActiveSealingSource) TokenBlocks(blockSize int) iter.Seq[[][]byte]

TokenBlocks returns an iterator for token blocks for disk writing. Tokens are pre-sorted: first by fields, then lexicographically within each field. Each block contains up to blockSize bytes of data for efficient writing.

func (*ActiveSealingSource) TokenLIDs added in v0.62.0

func (src *ActiveSealingSource) TokenLIDs() iter.Seq[[]uint32]

TokenLIDs returns an iterator for LID lists for each token. LIDs are converted to new numbering after document sorting. Each iterator call returns a list of documents containing a specific token, in sorted order.

type ActiveWriter

type ActiveWriter struct {
	// contains filtered or unexported fields
}

func NewActiveWriter

func NewActiveWriter(docsFile, metaFile *os.File, docsOffset, metaOffset int64, skipFsync bool) *ActiveWriter

func (*ActiveWriter) Stop

func (a *ActiveWriter) Stop()

func (*ActiveWriter) Write

func (a *ActiveWriter) Write(docs, meta []byte, sw *stopwatch.Stopwatch) error

type AggLimits

type AggLimits struct {
	MaxFieldTokens     int // MaxFieldTokens is the maximum number of unique AggQuery.Field values to parse.
	MaxGroupTokens     int // MaxGroupTokens is the maximum number of unique AggQuery.GroupBy values.
	MaxTIDsPerFraction int // MaxTIDsPerFraction is the maximum number of tokens per fraction.
}

type Config

type Config struct {
	Search SearchConfig

	SkipSortDocs bool
	KeepMetaFile bool
}
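
A short sketch of filling Config follows; the limit values are purely illustrative and not the package's defaults.

cfg := &frac.Config{
	Search: frac.SearchConfig{
		AggLimits: frac.AggLimits{
			MaxFieldTokens:     100_000, // illustrative limits, not defaults
			MaxGroupTokens:     10_000,
			MaxTIDsPerFraction: 1_000_000,
		},
	},
	SkipSortDocs: false,
	KeepMetaFile: false,
}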

type DocsPositions

type DocsPositions struct {
	// contains filtered or unexported fields
}

func NewSyncDocsPositions

func NewSyncDocsPositions() *DocsPositions

func (*DocsPositions) Get

func (dp *DocsPositions) Get(id seq.ID) seq.DocPos

func (*DocsPositions) GetSync

func (dp *DocsPositions) GetSync(id seq.ID) seq.DocPos

func (*DocsPositions) SetMultiple

func (dp *DocsPositions) SetMultiple(ids []seq.ID, pos []seq.DocPos) []seq.ID

SetMultiple returns a slice of the IDs that were added.
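
A small sketch, assuming ids ([]seq.ID) and positions ([]seq.DocPos) are parallel slices and that, per the comment above, the returned slice contains only IDs that were not previously present:

dp := frac.NewSyncDocsPositions()
added := dp.SetMultiple(ids, positions)
fmt.Printf("recorded %d new ids out of %d\n", len(added), len(ids))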

type Empty added in v0.62.1

type Empty struct {
	// contains filtered or unexported fields
}

func (Empty) Contains added in v0.62.1

func (Empty) Contains(mid seq.MID) bool

func (Empty) Fetch added in v0.62.1

func (Empty) Fetch(context.Context, []seq.ID) ([][]byte, error)

func (Empty) Info added in v0.62.1

func (e Empty) Info() *common.Info

func (Empty) IsIntersecting added in v0.62.1

func (Empty) IsIntersecting(seq.MID, seq.MID) bool

func (Empty) Offload added in v0.62.1

func (Empty) Offload(ctx context.Context, u storage.Uploader) (bool, error)

func (Empty) Search added in v0.62.1

func (Empty) Search(_ context.Context, params processor.SearchParams) (*seq.QPR, error)

func (Empty) Suicide added in v0.62.1

func (Empty) Suicide()

type FileWriter

type FileWriter struct {
	// contains filtered or unexported fields
}

FileWriter optimizes sequential writing and fsync calls for concurrent writers.

The write offset is calculated strictly sequentially using atomic operations. A request for fsync is then submitted. If an fsync triggered by previous requests is still in progress, the request waits; during this wait, further fsync requests may arrive that also wait for the previous one to complete. Once it finishes, a new fsync is performed, after which all waiting requests receive a response reporting whether the fsync succeeded.

As a result, several writers that write at approximately the same time share a single fsync system call.
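
The sketch below illustrates the batched-fsync pattern described above in isolation; it is not the FileWriter implementation, and the groupSyncer name and channel-based queuing are assumptions. It relies only on the standard os and sync packages.

type groupSyncer struct {
	mu      sync.Mutex
	file    *os.File
	syncing bool         // true while the fsync goroutine is running
	waiters []chan error // requests queued for the next fsync
}

// Sync registers the caller for the next fsync and blocks until that fsync
// (shared with any other concurrent callers) has completed.
func (g *groupSyncer) Sync() error {
	ch := make(chan error, 1)
	g.mu.Lock()
	g.waiters = append(g.waiters, ch)
	if !g.syncing {
		g.syncing = true
		go g.run()
	}
	g.mu.Unlock()
	return <-ch
}

// run performs fsyncs in batches: each os.File.Sync call answers every
// request that was queued before it started.
func (g *groupSyncer) run() {
	for {
		g.mu.Lock()
		batch := g.waiters
		g.waiters = nil
		if len(batch) == 0 {
			g.syncing = false
			g.mu.Unlock()
			return
		}
		g.mu.Unlock()

		err := g.file.Sync() // one system call serves the whole batch
		for _, ch := range batch {
			ch <- err
		}
	}
}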

func NewFileWriter

func NewFileWriter(ws writeSyncer, offset int64, skipSync bool) *FileWriter

func (*FileWriter) Stop

func (fs *FileWriter) Stop()

func (*FileWriter) Write

func (fs *FileWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error)

type Fraction

type Fraction interface {
	Info() *common.Info
	IsIntersecting(from seq.MID, to seq.MID) bool
	Contains(mid seq.MID) bool
	Fetch(context.Context, []seq.ID) ([][]byte, error)
	Search(context.Context, processor.SearchParams) (*seq.QPR, error)
	Offload(ctx context.Context, u storage.Uploader) (bool, error)
	Suicide()
}
var EmptyFraction Fraction = Empty{
	// contains filtered or unexported fields
}
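
A minimal sketch of consuming the Fraction interface, assuming a caller that already has a query window (from, to) and SearchParams; the searchFractions helper is illustrative, and imports are omitted.

func searchFractions(ctx context.Context, fracs []frac.Fraction, from, to seq.MID, params processor.SearchParams) ([]*seq.QPR, error) {
	var results []*seq.QPR
	for _, f := range fracs {
		if !f.IsIntersecting(from, to) {
			continue // the fraction holds no data in the requested MID range
		}
		qpr, err := f.Search(ctx, params)
		if err != nil {
			return nil, err
		}
		results = append(results, qpr)
	}
	return results, nil
}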

type IndexCache

type IndexCache struct {
	Registry   *cache.Cache[[]byte]
	MIDs       *cache.Cache[[]byte]
	RIDs       *cache.Cache[[]byte]
	Params     *cache.Cache[seqids.BlockParams]
	Tokens     *cache.Cache[*token.Block]
	TokenTable *cache.Cache[token.Table]
	LIDs       *cache.Cache[*lids.Block]
}

func (*IndexCache) Release

func (s *IndexCache) Release()

type Loader

type Loader struct {
	// contains filtered or unexported fields
}

func (*Loader) Load

func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, indexReader *storage.IndexReader)

type PSD

type PSD int // emulates hard shutdown on different stages of fraction deletion, used for tests
const (
	Off PSD = iota
	HalfRename
	HalfRemove
)

type Remote added in v0.60.0

type Remote struct {
	Config *Config

	BaseFileName string
	// contains filtered or unexported fields
}

A Remote fraction is a fraction backed by remote storage.

The structure of a Remote fraction is almost identical to that of a Sealed one: they share the same on-disk binary layout, access methods, and other logic, but a separate Remote type makes it easy to distinguish local fractions from remote ones.

func NewRemote added in v0.60.0

func NewRemote(
	ctx context.Context,
	baseFile string,
	readLimiter *storage.ReadLimiter,
	indexCache *IndexCache,
	docsCache *cache.Cache[[]byte],
	info *common.Info,
	config *Config,
	s3cli *s3.Client,
) *Remote

func (*Remote) Contains added in v0.60.0

func (f *Remote) Contains(mid seq.MID) bool

func (*Remote) DataProvider added in v0.60.0

func (f *Remote) DataProvider(ctx context.Context) (*sealedDataProvider, func())

func (*Remote) Fetch added in v0.62.1

func (f *Remote) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error)

func (*Remote) Info added in v0.60.0

func (f *Remote) Info() *common.Info

func (*Remote) IsIntersecting added in v0.60.0

func (f *Remote) IsIntersecting(from, to seq.MID) bool

func (*Remote) Offload added in v0.60.0

func (f *Remote) Offload(context.Context, storage.Uploader) (bool, error)

func (*Remote) Search added in v0.62.1

func (f *Remote) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error)

func (*Remote) String added in v0.60.0

func (f *Remote) String() string

func (*Remote) Suicide added in v0.60.0

func (f *Remote) Suicide()

type Sealed

type Sealed struct {
	Config *Config

	BaseFileName string

	// for testing only
	PartialSuicideMode PSD
	// contains filtered or unexported fields
}

func NewSealed

func NewSealed(
	baseFile string,
	readLimiter *storage.ReadLimiter,
	indexCache *IndexCache,
	docsCache *cache.Cache[[]byte],
	info *common.Info,
	config *Config,
) *Sealed

func NewSealedPreloaded

func NewSealedPreloaded(
	baseFile string,
	preloaded *sealed.PreloadedData,
	rl *storage.ReadLimiter,
	indexCache *IndexCache,
	docsCache *cache.Cache[[]byte],
	config *Config,
) *Sealed

func (*Sealed) Contains

func (f *Sealed) Contains(id seq.MID) bool

func (*Sealed) DataProvider

func (f *Sealed) DataProvider(ctx context.Context) (*sealedDataProvider, func())

func (*Sealed) Fetch added in v0.62.1

func (f *Sealed) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error)

func (*Sealed) Info

func (f *Sealed) Info() *common.Info

func (*Sealed) IsIntersecting

func (f *Sealed) IsIntersecting(from, to seq.MID) bool

func (*Sealed) Offload added in v0.60.0

func (f *Sealed) Offload(ctx context.Context, u storage.Uploader) (bool, error)

Offload saves the `.docs` (or `.sdocs`) and `.index` files to remote storage. It does not free any of the occupied space (neither on disk nor in memory).

func (*Sealed) Search added in v0.62.1

func (f *Sealed) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error)

func (*Sealed) String

func (f *Sealed) String() string

func (*Sealed) Suicide

func (f *Sealed) Suicide()

type SearchConfig

type SearchConfig struct {
	AggLimits AggLimits
}

type SeqIDCmp

type SeqIDCmp struct {
	// contains filtered or unexported fields
}

type TokenLIDs

type TokenLIDs struct {
	// contains filtered or unexported fields
}

func (*TokenLIDs) GetLIDs

func (tl *TokenLIDs) GetLIDs(mids, rids *UInt64s) []uint32

func (*TokenLIDs) PutLIDsInQueue

func (tl *TokenLIDs) PutLIDsInQueue(lids []uint32) int

type TokenList

type TokenList struct {
	FieldTIDs map[string][]uint32
	// contains filtered or unexported fields
}

func NewActiveTokenList

func NewActiveTokenList(workers int) *TokenList

func (*TokenList) Append

func (tl *TokenList) Append(tokens [][]byte, fieldsLengths []int, tokenLIDsPlaces []*TokenLIDs) []*TokenLIDs

func (*TokenList) FindPattern

func (tl *TokenList) FindPattern(ctx context.Context, t parser.Token) ([]uint32, error)

func (*TokenList) GetAllTokenLIDs

func (tl *TokenList) GetAllTokenLIDs() *TokenLIDs

func (*TokenList) GetFieldSizes

func (tl *TokenList) GetFieldSizes() map[string]uint32

func (*TokenList) GetTIDsByField

func (tl *TokenList) GetTIDsByField(f string) []uint32

func (*TokenList) GetValByTID

func (tl *TokenList) GetValByTID(tid uint32) []byte

func (*TokenList) Provide

func (tl *TokenList) Provide(tid uint32) *TokenLIDs

func (*TokenList) Stop

func (tl *TokenList) Stop()

type UInt64s

type UInt64s struct {
	// contains filtered or unexported fields
}

func NewIDs

func NewIDs() *UInt64s

func (*UInt64s) Append

func (l *UInt64s) Append(val uint64) uint32

func (*UInt64s) GetVals

func (l *UInt64s) GetVals() []uint64

func (*UInt64s) Len

func (l *UInt64s) Len() uint32
