package frac

v0.61.2 · Published: Sep 26, 2025 · License: Apache-2.0 · Imports: 46 · Imported by: 0

Documentation

Index

Constants

const DistributionBucket = time.Minute
const DistributionMaxInterval = 24 * time.Hour
const DistributionSpreadThreshold = 10 * time.Minute

Variables

This section is empty.

Functions

func ExtractDocTime

func ExtractDocTime(docRoot *insaneJSON.Root) (time.Time, []string)

ExtractDocTime extracts the timestamp from a document. It searches for one of the supported field names and parses the value using one of the supported formats. If no such field is found, or its value cannot be parsed, it returns time.Now().
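A minimal usage sketch; the insane-json import path (github.com/ozontech/insane-json) and its Spawn/Release/DecodeString API are assumptions based on the insaneJSON identifier above:

root := insaneJSON.Spawn()
defer insaneJSON.Release(root)

if err := root.DecodeString(`{"time":"2025-09-26T10:00:00Z","message":"hi"}`); err != nil {
	panic(err)
}

// Returns the parsed timestamp and the path of the matched field,
// or time.Now() if no supported field was found or parsed.
ts, fieldPath := frac.ExtractDocTime(root)
fmt.Println(ts, fieldPath)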

func IsItBinaryEncodedMetaData

func IsItBinaryEncodedMetaData(b []byte) bool

func PutDocMetasCompressor

func PutDocMetasCompressor(c *DocsMetasCompressor)

Types

type Active

type Active struct {
	Config *Config

	BaseFileName string

	MIDs *UInt64s
	RIDs *UInt64s

	DocBlocks *UInt64s

	TokenList *TokenList

	DocsPositions *DocsPositions
	// contains filtered or unexported fields
}

func NewActive

func NewActive(
	baseFileName string,
	activeIndexer *ActiveIndexer,
	readLimiter *storage.ReadLimiter,
	docsCache *cache.Cache[[]byte],
	sortCache *cache.Cache[[]byte],
	cfg *Config,
) *Active

func (*Active) Append

func (f *Active) Append(docs, metas []byte, wg *sync.WaitGroup) (err error)

Append writes the data to disk and sends the metas to the index workers.
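A hedged usage sketch; treating the WaitGroup as a way to wait for the metas to be indexed is an assumption based on the signature:

var wg sync.WaitGroup
if err := active.Append(docs, metas, &wg); err != nil {
	// handle the write error
}
wg.Wait() // assumed: blocks until the index workers have processed the metas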

func (*Active) AppendIDs

func (f *Active) AppendIDs(ids []seq.ID) []uint32

func (*Active) Contains

func (f *Active) Contains(id seq.MID) bool

func (*Active) DataProvider

func (f *Active) DataProvider(ctx context.Context) (DataProvider, func())

func (*Active) GetAllDocuments

func (f *Active) GetAllDocuments() []uint32

func (*Active) Info

func (f *Active) Info() *Info

func (*Active) IsIntersecting

func (f *Active) IsIntersecting(from, to seq.MID) bool

func (*Active) Offload added in v0.60.0

func (f *Active) Offload(context.Context, storage.Uploader) (bool, error)

Offload for an Active fraction is a no-op.

Since search within an Active fraction is too costly (we would have to replay the whole index in memory), we decided to support offloading only for Sealed fractions.

func (*Active) Release

func (f *Active) Release()

func (*Active) Replay

func (f *Active) Replay(ctx context.Context) error

func (*Active) String

func (f *Active) String() string

func (*Active) Suicide

func (f *Active) Suicide()

func (*Active) UpdateStats

func (f *Active) UpdateStats(minMID, maxMID seq.MID, docCount uint32, sizeCount uint64)

type ActiveIndexer

type ActiveIndexer struct {
	// contains filtered or unexported fields
}

func NewActiveIndexer

func NewActiveIndexer(workerCount, chLen int) *ActiveIndexer

func (*ActiveIndexer) Index

func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, sw *stopwatch.Stopwatch)

func (*ActiveIndexer) Start

func (ai *ActiveIndexer) Start()

func (*ActiveIndexer) Stop

func (ai *ActiveIndexer) Stop()

type ActiveWriter

type ActiveWriter struct {
	// contains filtered or unexported fields
}

func NewActiveWriter

func NewActiveWriter(docsFile, metaFile *os.File, docsOffset, metaOffset int64, skipFsync bool) *ActiveWriter

func (*ActiveWriter) Stop

func (a *ActiveWriter) Stop()

func (*ActiveWriter) Write

func (a *ActiveWriter) Write(docs, meta []byte, sw *stopwatch.Stopwatch) error

type AggLimits

type AggLimits struct {
	MaxFieldTokens     int // MaxFieldTokens is the maximum number of unique AggQuery.Field values to parse.
	MaxGroupTokens     int // MaxGroupTokens is the maximum number of unique AggQuery.GroupBy values.
	MaxTIDsPerFraction int // MaxTIDsPerFraction is the maximum number of tokens per fraction.
}
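An illustrative wiring of these limits into a search configuration (SearchConfig and Config are documented later on this page); the numbers are placeholders, not recommended defaults:

cfg := frac.Config{
	Search: frac.SearchConfig{
		AggLimits: frac.AggLimits{
			MaxFieldTokens:     10000,  // placeholder
			MaxGroupTokens:     1000,   // placeholder
			MaxTIDsPerFraction: 100000, // placeholder
		},
	},
}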

type BlockInfo

type BlockInfo struct {
	Info *Info
}

func (*BlockInfo) Pack

func (b *BlockInfo) Pack(buf []byte) []byte

func (*BlockInfo) Unpack

func (b *BlockInfo) Unpack(data []byte) error

type BlockOffsets added in v0.59.0

type BlockOffsets struct {
	IDsTotal uint32 // TODO: the best place for this field is the Info block
	Offsets  []uint64
}

func (*BlockOffsets) Pack added in v0.59.0

func (b *BlockOffsets) Pack(buf []byte) []byte

func (*BlockOffsets) Unpack added in v0.59.0

func (b *BlockOffsets) Unpack(data []byte) error

type Config

type Config struct {
	Search SearchConfig

	SkipSortDocs bool
	KeepMetaFile bool
}

type DataProvider

type DataProvider interface {
	Fetch([]seq.ID) ([][]byte, error)
	Search(processor.SearchParams) (*seq.QPR, error)
}

type DiskBlocksProducer

type DiskBlocksProducer struct {
	// contains filtered or unexported fields
}

func NewDiskBlocksProducer

func NewDiskBlocksProducer() *DiskBlocksProducer

type DiskBlocksWriter

type DiskBlocksWriter struct {
	// contains filtered or unexported fields
}

func NewSealedBlockWriter

func NewSealedBlockWriter(ws io.WriteSeeker) *DiskBlocksWriter

func (*DiskBlocksWriter) NewBlockFormer

func (w *DiskBlocksWriter) NewBlockFormer(name string, size int) *storage.BlockFormer

func (*DiskBlocksWriter) WriteRegistryBlock

func (w *DiskBlocksWriter) WriteRegistryBlock() error

type DocProvider

type DocProvider struct {
	DocCount int
	Docs     []byte
	Metas    []byte
	// contains filtered or unexported fields
}

func NewDocProvider

func NewDocProvider() *DocProvider

func (*DocProvider) Append

func (dp *DocProvider) Append(doc []byte, docRoot *insaneJSON.Root, id seq.ID, tokens []seq.Token)

func (*DocProvider) Provide

func (dp *DocProvider) Provide() (storage.DocBlock, storage.DocBlock)

func (*DocProvider) TryReset

func (dp *DocProvider) TryReset()

type DocsMetasCompressor

type DocsMetasCompressor struct {
	// contains filtered or unexported fields
}

func GetDocsMetasCompressor

func GetDocsMetasCompressor(docsCompressLevel, metaCompressLevel int) *DocsMetasCompressor

func (*DocsMetasCompressor) CompressDocsAndMetas

func (c *DocsMetasCompressor) CompressDocsAndMetas(docs, meta []byte)

CompressDocsAndMetas prepares the docs and meta blocks for a bulk insert.

func (*DocsMetasCompressor) DocsMetas

func (c *DocsMetasCompressor) DocsMetas() ([]byte, []byte)
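A sketch of the pooled get/compress/put cycle; the compression levels and the raw docs/metas byte slices are illustrative:

c := frac.GetDocsMetasCompressor(3, 3) // zstd levels for docs and metas
defer frac.PutDocMetasCompressor(c)    // return the compressor to its pool

c.CompressDocsAndMetas(docs, metas)
compressedDocs, compressedMetas := c.DocsMetas()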

type DocsPositions

type DocsPositions struct {
	// contains filtered or unexported fields
}

func NewSyncDocsPositions

func NewSyncDocsPositions() *DocsPositions

func (*DocsPositions) Get

func (dp *DocsPositions) Get(id seq.ID) seq.DocPos

func (*DocsPositions) GetSync

func (dp *DocsPositions) GetSync(id seq.ID) seq.DocPos

func (*DocsPositions) SetMultiple

func (dp *DocsPositions) SetMultiple(ids []seq.ID, pos []seq.DocPos) []seq.ID

SetMultiple returns the slice of IDs that were added.
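A small sketch; the duplicate-skipping behavior is inferred from SetMultiple returning only the added IDs:

dp := frac.NewSyncDocsPositions()
added := dp.SetMultiple(ids, positions) // only IDs recorded by this call
if len(added) > 0 {
	pos := dp.GetSync(added[0]) // assumed concurrency-safe variant of Get
	_ = pos
}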

type EmptyDataProvider

type EmptyDataProvider struct{}

func (EmptyDataProvider) Fetch

func (EmptyDataProvider) Fetch([]seq.ID) ([][]byte, error)

func (EmptyDataProvider) Search

func (EmptyDataProvider) Search(params processor.SearchParams) (*seq.QPR, error)

type FileWriter

type FileWriter struct {
	// contains filtered or unexported fields
}

FileWriter optimizes sequential writing and fsync calls for concurrent writers.

The write offset is advanced strictly sequentially using an atomic counter. A request for fsync is then submitted; it waits if an fsync from previous requests is still in progress. During this wait, further fsync requests may arrive, all waiting for the previous call to complete. Once it completes, a single new fsync is performed, after which every waiting request receives the result of that fsync (success or failure).

This results in one fsync system call for several writers performing a write at approximately the same time.
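The same coalescing idea as a self-contained sketch (this is not the package's implementation; names are illustrative, and it assumes the os, sync, and sync/atomic imports):

// groupSyncer coalesces fsync calls: writers that arrive while an fsync is
// in flight wait for it to finish, then share a single new fsync.
type groupSyncer struct {
	f         *os.File
	offset    atomic.Int64 // next write offset, reserved atomically
	mu        sync.Mutex
	cond      *sync.Cond
	completed int   // number of finished fsyncs
	active    bool  // an fsync is currently running
	err       error // result of the most recent fsync
}

func newGroupSyncer(f *os.File, start int64) *groupSyncer {
	gs := &groupSyncer{f: f}
	gs.offset.Store(start)
	gs.cond = sync.NewCond(&gs.mu)
	return gs
}

// Write reserves a strictly sequential offset, writes there, and returns
// once an fsync that started after the write has completed.
func (gs *groupSyncer) Write(data []byte) (int64, error) {
	off := gs.offset.Add(int64(len(data))) - int64(len(data))
	if _, err := gs.f.WriteAt(data, off); err != nil {
		return 0, err
	}
	return off, gs.sync()
}

func (gs *groupSyncer) sync() error {
	gs.mu.Lock()
	defer gs.mu.Unlock()

	target := gs.completed + 1
	if gs.active {
		target++ // the in-flight fsync started before our write; we need the next one
	}
	for gs.completed < target {
		if gs.active {
			gs.cond.Wait() // someone else is running our group's fsync
			continue
		}
		gs.active = true // become the leader of the next group
		gs.mu.Unlock()
		err := gs.f.Sync() // one system call covers every waiting writer
		gs.mu.Lock()
		gs.err, gs.completed, gs.active = err, gs.completed+1, false
		gs.cond.Broadcast()
	}
	return gs.err
}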

func NewFileWriter

func NewFileWriter(ws writeSyncer, offset int64, skipSync bool) *FileWriter

func (*FileWriter) Stop

func (fs *FileWriter) Stop()

func (*FileWriter) Write

func (fs *FileWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error)

type Fraction

type Fraction interface {
	Info() *Info
	IsIntersecting(from seq.MID, to seq.MID) bool
	Contains(mid seq.MID) bool
	DataProvider(context.Context) (DataProvider, func())
	Offload(ctx context.Context, u storage.Uploader) (bool, error)
	Suicide()
}
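A hedged sketch of working with any fraction through this interface; treating the second return value of DataProvider as a release callback is an assumption based on its (DataProvider, func()) shape:

func fetchFromFraction(ctx context.Context, f frac.Fraction, from, to seq.MID, ids []seq.ID) ([][]byte, error) {
	if !f.IsIntersecting(from, to) {
		return nil, nil // the fraction cannot contain IDs in this range
	}
	dp, release := f.DataProvider(ctx)
	defer release() // assumed: frees resources pinned by the provider
	return dp.Fetch(ids)
}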

type IndexCache

type IndexCache struct {
	Registry   *cache.Cache[[]byte]
	MIDs       *cache.Cache[[]byte]
	RIDs       *cache.Cache[[]byte]
	Params     *cache.Cache[seqids.BlockParams]
	Tokens     *cache.Cache[*token.Block]
	TokenTable *cache.Cache[token.Table]
	LIDs       *cache.Cache[*lids.Block]
}

func (*IndexCache) Release

func (s *IndexCache) Release()

type Info

type Info struct {
	Path          string                   `json:"name"`
	Ver           string                   `json:"ver"`
	BinaryDataVer config.BinaryDataVersion `json:"binary_data_ver"`
	DocsTotal     uint32                   `json:"docs_total"`
	DocsOnDisk    uint64                   `json:"docs_on_disk"`  // how much compressed docs data is stored on disk
	DocsRaw       uint64                   `json:"docs_raw"`      // how much raw docs data is appended
	MetaOnDisk    uint64                   `json:"meta_on_disk"`  // how much compressed metadata is stored on disk
	IndexOnDisk   uint64                   `json:"index_on_disk"` // how much compressed index data is stored on disk

	ConstRegularBlockSize int `json:"const_regular_block_size"`
	ConstIDsPerBlock      int `json:"const_ids_per_block"`
	ConstLIDBlockCap      int `json:"const_lid_block_cap"`

	From         seq.MID               `json:"from"`
	To           seq.MID               `json:"to"`
	CreationTime uint64                `json:"creation_time"`
	SealingTime  uint64                `json:"sealing_time"`
	Distribution *seq.MIDsDistribution `json:"distribution"`
}

func NewInfo

func NewInfo(filename string, docsOnDisk, metaOnDisk uint64) *Info

func (*Info) BuildDistribution

func (s *Info) BuildDistribution(ids []seq.ID)

func (*Info) FullSize

func (s *Info) FullSize() uint64

func (*Info) InitEmptyDistribution

func (s *Info) InitEmptyDistribution() bool

func (*Info) IsIntersecting

func (s *Info) IsIntersecting(from, to seq.MID) bool

func (*Info) Name

func (s *Info) Name() string

func (*Info) String

func (s *Info) String() string

type Loader

type Loader struct {
	// contains filtered or unexported fields
}

func (*Loader) Load

func (l *Loader) Load(state *sealedState, info *Info, indexReader *storage.IndexReader)

type MetaData

type MetaData struct {
	ID seq.ID
	// Size of an uncompressed document in bytes.
	Size   uint32
	Tokens []MetaToken
}

func (*MetaData) MarshalBinaryTo

func (m *MetaData) MarshalBinaryTo(b []byte) []byte

func (MetaData) String

func (m MetaData) String() string

String is used in tests for human-readable output.

func (*MetaData) UnmarshalBinary

func (m *MetaData) UnmarshalBinary(b []byte) error

type MetaToken

type MetaToken struct {
	Key   []byte
	Value []byte
}

func (*MetaToken) MarshalBinaryTo

func (m *MetaToken) MarshalBinaryTo(b []byte) []byte

func (MetaToken) String

func (m MetaToken) String() string

String is used in tests for human-readable output.

func (*MetaToken) UnmarshalBinary

func (m *MetaToken) UnmarshalBinary(b []byte) ([]byte, error)
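A sketch of the append-style round trip for both types; the field values are illustrative, and passing nil to MarshalBinaryTo assumes append semantics:

m := frac.MetaData{
	ID:     id,
	Size:   128, // uncompressed document size in bytes
	Tokens: []frac.MetaToken{{Key: []byte("level"), Value: []byte("error")}},
}
buf := m.MarshalBinaryTo(nil) // appends the encoding to the supplied buffer

var out frac.MetaData
if err := out.UnmarshalBinary(buf); err != nil {
	// handle malformed metadata
}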

type PSD

type PSD int

PSD emulates a hard shutdown at different stages of fraction deletion; it is used in tests.

const (
	Off PSD = iota
	HalfRename
	HalfRemove
)

type PreloadedData

type PreloadedData struct {
	// contains filtered or unexported fields
}

func Seal

func Seal(f *Active, params SealParams) (*PreloadedData, error)

type Remote added in v0.60.0

type Remote struct {
	Config *Config

	BaseFileName string
	// contains filtered or unexported fields
}

A Remote fraction is a fraction backed by remote storage.

The structure of a Remote fraction is almost identical to a Sealed one: they share the same on-disk binary layout, access methods, and all other logic. Having a separate Remote type, however, lets us easily distinguish local fractions from remote ones.

func NewRemote added in v0.60.0

func NewRemote(
	ctx context.Context,
	baseFile string,
	readLimiter *storage.ReadLimiter,
	indexCache *IndexCache,
	docsCache *cache.Cache[[]byte],
	info *Info,
	config *Config,
	s3cli *s3.Client,
) *Remote

func (*Remote) Contains added in v0.60.0

func (f *Remote) Contains(mid seq.MID) bool

func (*Remote) DataProvider added in v0.60.0

func (f *Remote) DataProvider(ctx context.Context) (DataProvider, func())

func (*Remote) Info added in v0.60.0

func (f *Remote) Info() *Info

func (*Remote) IsIntersecting added in v0.60.0

func (f *Remote) IsIntersecting(from, to seq.MID) bool

func (*Remote) Offload added in v0.60.0

func (f *Remote) Offload(context.Context, storage.Uploader) (bool, error)

func (*Remote) String added in v0.60.0

func (f *Remote) String() string

func (*Remote) Suicide added in v0.60.0

func (f *Remote) Suicide()

type SealParams

type SealParams struct {
	IDsZstdLevel           int
	LIDsZstdLevel          int
	TokenListZstdLevel     int
	DocsPositionsZstdLevel int
	TokenTableZstdLevel    int

	DocBlocksZstdLevel int // DocBlocksZstdLevel is the zstd compression level for each document block.
	DocBlockSize       int // DocBlockSize is the decompressed payload size of a document block.
}
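An illustrative sealing flow that feeds the result into NewSealedPreloaded (documented below); every compression level is a placeholder, not a recommended default:

params := frac.SealParams{
	IDsZstdLevel:           3,
	LIDsZstdLevel:          3,
	TokenListZstdLevel:     3,
	DocsPositionsZstdLevel: 3,
	TokenTableZstdLevel:    3,
	DocBlocksZstdLevel:     3,
	DocBlockSize:           4 << 20, // 4 MiB of decompressed payload per block
}

preloaded, err := frac.Seal(active, params)
if err != nil {
	// handle sealing error
}
sealed := frac.NewSealedPreloaded(active.BaseFileName, preloaded, rl, indexCache, docsCache, cfg)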

type Sealed

type Sealed struct {
	Config *Config

	BaseFileName string

	// used only in tests
	PartialSuicideMode PSD
	// contains filtered or unexported fields
}

func NewSealed

func NewSealed(
	baseFile string,
	readLimiter *storage.ReadLimiter,
	indexCache *IndexCache,
	docsCache *cache.Cache[[]byte],
	info *Info,
	config *Config,
) *Sealed

func NewSealedPreloaded

func NewSealedPreloaded(
	baseFile string,
	preloaded *PreloadedData,
	rl *storage.ReadLimiter,
	indexCache *IndexCache,
	docsCache *cache.Cache[[]byte],
	config *Config,
) *Sealed

func (*Sealed) Contains

func (f *Sealed) Contains(id seq.MID) bool

func (*Sealed) DataProvider

func (f *Sealed) DataProvider(ctx context.Context) (DataProvider, func())

func (*Sealed) Info

func (f *Sealed) Info() *Info

func (*Sealed) IsIntersecting

func (f *Sealed) IsIntersecting(from, to seq.MID) bool

func (*Sealed) Offload added in v0.60.0

func (f *Sealed) Offload(ctx context.Context, u storage.Uploader) (bool, error)

Offload saves the `.docs` (or `.sdocs`) and `.index` files to remote storage. It does not free any of the occupied resources (neither on disk nor in memory).
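A hedged sketch; reading the returned bool as "the offload took place" is an assumption:

ok, err := sealed.Offload(ctx, uploader) // uploader is any storage.Uploader
if err != nil {
	// handle upload error
}
if ok {
	// assumed: the `.docs`/`.sdocs` and `.index` files are now in remote storage
}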

func (*Sealed) String

func (f *Sealed) String() string

func (*Sealed) Suicide

func (f *Sealed) Suicide()

type SearchConfig

type SearchConfig struct {
	AggLimits AggLimits
}

type SeqIDCmp

type SeqIDCmp struct {
	// contains filtered or unexported fields
}

type TokenLIDs

type TokenLIDs struct {
	// contains filtered or unexported fields
}

func (*TokenLIDs) GetLIDs

func (tl *TokenLIDs) GetLIDs(mids, rids *UInt64s) []uint32

func (*TokenLIDs) PutLIDsInQueue

func (tl *TokenLIDs) PutLIDsInQueue(lids []uint32) int

type TokenList

type TokenList struct {
	FieldTIDs map[string][]uint32
	// contains filtered or unexported fields
}

func NewActiveTokenList

func NewActiveTokenList(workers int) *TokenList

func (*TokenList) Append

func (tl *TokenList) Append(tokens [][]byte, fieldsLengths []int, tokenLIDsPlaces []*TokenLIDs) []*TokenLIDs

func (*TokenList) FindPattern

func (tl *TokenList) FindPattern(ctx context.Context, t parser.Token, tids []uint32) ([]uint32, error)

func (*TokenList) GetAllTokenLIDs

func (tl *TokenList) GetAllTokenLIDs() *TokenLIDs

func (*TokenList) GetFieldSizes

func (tl *TokenList) GetFieldSizes() map[string]uint32

func (*TokenList) GetTIDsByField

func (tl *TokenList) GetTIDsByField(f string) []uint32

func (*TokenList) GetValByTID

func (tl *TokenList) GetValByTID(tid uint32) []byte

func (*TokenList) Provide

func (tl *TokenList) Provide(tid uint32) *TokenLIDs

func (*TokenList) Stop

func (tl *TokenList) Stop()

type UInt64s

type UInt64s struct {
	// contains filtered or unexported fields
}

func NewIDs

func NewIDs() *UInt64s

func (*UInt64s) Append

func (l *UInt64s) Append(val uint64) uint32

func (*UInt64s) GetVals

func (l *UInt64s) GetVals() []uint64

func (*UInt64s) Len

func (l *UInt64s) Len() uint32

Directories

Path Synopsis
sealed
