shelf

package
v1.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2026 License: Apache-2.0 Imports: 46 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RecordEncodeModeOneRow = 1
	RecordEncodeModeFull   = 2
)
View Source
const DefaultShardFreeDuration = 600 // Second

Variables

This section is empty.

Functions

func AllocWalSeq

func AllocWalSeq() uint64

func BuildWalCreatedEventKey added in v1.5.0

func BuildWalCreatedEventKey(info *ShardInfo) string

func DecrInuseWalCount added in v1.5.2

func DecrInuseWalCount()

func EncodeColVal added in v1.4.1

func EncodeColVal(dst []byte, cv *record.ColVal) []byte

func EncodeColValRow added in v1.4.1

func EncodeColValRow(dst []byte, typ int, cv *record.ColVal, rowIdx int) []byte

func EncodeColumnSchema added in v1.4.1

func EncodeColumnSchema(dst []byte, schema *record.Field) []byte

func EncodeRecord added in v1.4.1

func EncodeRecord(dst []byte, rec *record.Record) []byte

func EncodeRecordRow added in v1.4.1

func EncodeRecordRow(dst []byte, rec *record.Record, rowIdx int) []byte

func IncrInuseWalCount added in v1.5.2

func IncrInuseWalCount()

func IsUniqueSorted added in v1.4.1

func IsUniqueSorted[S ~[]E, E cmp.Ordered](x S) bool

func IsWriteLimited added in v1.5.2

func IsWriteLimited() bool

func LZ4CompressBlock

func LZ4CompressBlock(src, dst []byte) ([]byte, error)

func LZ4DecompressBlock

func LZ4DecompressBlock(src, dst []byte) ([]byte, error)

func Open

func Open()

func ReadRecord

func ReadRecord(shardID uint64, mst string, sid uint64, tr *util.TimeRange,
	schema record.Schemas, ascending bool) *record.Record

func SnappyCompressBlock

func SnappyCompressBlock(src, dst []byte) []byte

func SnappyDecompressBlock

func SnappyDecompressBlock(src, dst []byte) ([]byte, error)

func UpdateWriteLimited added in v1.5.2

func UpdateWriteLimited()

Types

type Blob added in v1.4.1

type Blob struct {
	// contains filtered or unexported fields
}

func (*Blob) Data added in v1.5.0

func (b *Blob) Data() []byte

func (*Blob) Done added in v1.4.1

func (b *Blob) Done(err error)

func (*Blob) Error added in v1.4.1

func (b *Blob) Error() error

func (*Blob) Hash added in v1.4.1

func (b *Blob) Hash() uint64

func (*Blob) Instance added in v1.5.0

func (b *Blob) Instance() transport.Codec

func (*Blob) IsEmpty added in v1.4.1

func (b *Blob) IsEmpty() bool

func (*Blob) Iterator added in v1.4.1

func (b *Blob) Iterator() *BlobIterator

func (*Blob) Marshal added in v1.5.0

func (b *Blob) Marshal(buf []byte) ([]byte, error)

func (*Blob) MicroSince added in v1.4.1

func (b *Blob) MicroSince() int64

func (*Blob) Reset added in v1.4.1

func (b *Blob) Reset()

func (*Blob) ResetTime added in v1.4.1

func (b *Blob) ResetTime()

func (*Blob) SetShardID added in v1.4.1

func (b *Blob) SetShardID(id uint64)

func (*Blob) ShardID added in v1.4.1

func (b *Blob) ShardID() uint64

func (*Blob) Size added in v1.5.0

func (b *Blob) Size() int

func (*Blob) TimeRange added in v1.4.1

func (b *Blob) TimeRange() *util.TimeRange

func (*Blob) Unmarshal added in v1.5.0

func (b *Blob) Unmarshal(buf []byte) error

func (*Blob) WriteRecordRow added in v1.4.1

func (b *Blob) WriteRecordRow(seriesKey []byte, rec *record.Record, rowIndex int)

type BlobGroup added in v1.4.1

type BlobGroup struct {
	// contains filtered or unexported fields
}

func NewBlobGroup added in v1.4.1

func NewBlobGroup(size int) (*BlobGroup, func())

func (*BlobGroup) Error added in v1.4.1

func (bg *BlobGroup) Error() error

func (*BlobGroup) GetBlobs added in v1.5.0

func (bg *BlobGroup) GetBlobs() []Blob

func (*BlobGroup) GetRecord added in v1.5.0

func (bg *BlobGroup) GetRecord() *record.Record

func (*BlobGroup) GroupingRow added in v1.4.1

func (bg *BlobGroup) GroupingRow(mst string, seriesKey []byte, rec *record.Record, rowIndex int)

func (*BlobGroup) Init added in v1.4.1

func (bg *BlobGroup) Init(size int)

func (*BlobGroup) Instance added in v1.5.0

func (bg *BlobGroup) Instance() transport.Codec

func (*BlobGroup) Marshal added in v1.5.0

func (bg *BlobGroup) Marshal(buf []byte) ([]byte, error)

func (*BlobGroup) MemSize added in v1.5.0

func (bg *BlobGroup) MemSize() int

func (*BlobGroup) Release added in v1.5.0

func (bg *BlobGroup) Release()

func (*BlobGroup) ResetTime added in v1.4.1

func (bg *BlobGroup) ResetTime()

func (*BlobGroup) SetBlobs added in v1.5.0

func (bg *BlobGroup) SetBlobs(blobs []Blob)

func (*BlobGroup) Size added in v1.5.0

func (bg *BlobGroup) Size() int

func (*BlobGroup) Unmarshal added in v1.5.0

func (bg *BlobGroup) Unmarshal(buf []byte) error

func (*BlobGroup) Wait added in v1.4.1

func (bg *BlobGroup) Wait()

func (*BlobGroup) Walk added in v1.4.1

func (bg *BlobGroup) Walk(fn func(blob *Blob))

type BlobIterator added in v1.4.1

type BlobIterator struct {
	// contains filtered or unexported fields
}

func NewBlobIterator added in v1.4.1

func NewBlobIterator(data []byte) *BlobIterator

func (*BlobIterator) Next added in v1.4.1

func (itr *BlobIterator) Next() ([]byte, []byte, error)

type Index

type Index interface {
	GetSeriesIdBySeriesKeyFromCache([]byte) (uint64, error)
	CreateIndexIfNotExistsBySeries([]byte, []byte, influx.PointTags) (uint64, error)
	GetSeries(sid uint64, buf []byte, condition influxql.Expr, callback func(key *influx.SeriesKey)) error
}

type IndexCreator

type IndexCreator struct {
	// contains filtered or unexported fields
}

func (*IndexCreator) Create

func (c *IndexCreator) Create(wal *Wal)

type IndexCreatorManager added in v1.5.0

type IndexCreatorManager struct {
	// contains filtered or unexported fields
}

func NewIndexCreatorManager added in v1.5.0

func NewIndexCreatorManager() *IndexCreatorManager

func (*IndexCreatorManager) Alloc added in v1.5.0

func (icm *IndexCreatorManager) Alloc(idx Index) *IndexCreator

func (*IndexCreatorManager) Recycle added in v1.5.0

func (icm *IndexCreatorManager) Recycle(creator *IndexCreator)

type Offsets

type Offsets struct {
	// contains filtered or unexported fields
}

func (*Offsets) Add

func (o *Offsets) Add(v int64)

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

func NewProcessor

func NewProcessor(id int, queue *util.Queue[Blob]) *Processor

func (*Processor) Close

func (p *Processor) Close()

func (*Processor) ForceFlush added in v1.4.1

func (p *Processor) ForceFlush(shardID uint64)

func (*Processor) GetShards

func (p *Processor) GetShards(shardID uint64) *Shard

func (*Processor) RegisterShard

func (p *Processor) RegisterShard(shardID uint64, info *ShardInfo)

func (*Processor) Run

func (p *Processor) Run()

func (*Processor) Stop

func (p *Processor) Stop()

func (*Processor) UnRegisterShard

func (p *Processor) UnRegisterShard(id uint64)

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

func NewRunner

func NewRunner() *Runner

func (*Runner) Close

func (r *Runner) Close()

func (*Runner) ForceFlush added in v1.4.1

func (r *Runner) ForceFlush(shardID uint64)

func (*Runner) GetWALs

func (r *Runner) GetWALs(shardID uint64, mst string, tr *util.TimeRange) ([]*Wal, func())

func (*Runner) IndexCreatorManager added in v1.5.0

func (r *Runner) IndexCreatorManager() *IndexCreatorManager

func (*Runner) RegisterShard

func (r *Runner) RegisterShard(id uint64, info *ShardInfo)

func (*Runner) Schedule

func (r *Runner) Schedule(shardID uint64, blob *Blob)

func (*Runner) ScheduleGroup added in v1.4.1

func (r *Runner) ScheduleGroup(shardID uint64, group *BlobGroup) error

func (*Runner) Size

func (r *Runner) Size() int

func (*Runner) UnregisterShard

func (r *Runner) UnregisterShard(id uint64)

type SeriesKeyOffsets

type SeriesKeyOffsets struct {
	// contains filtered or unexported fields
}

func NewSeriesKeyOffsets

func NewSeriesKeyOffsets() *SeriesKeyOffsets

func (*SeriesKeyOffsets) Add

func (o *SeriesKeyOffsets) Add(key []byte, ofs int64)

func (*SeriesKeyOffsets) Len

func (o *SeriesKeyOffsets) Len() int

func (*SeriesKeyOffsets) Pop

func (o *SeriesKeyOffsets) Pop() ([]byte, int64, int64)

type SeriesMap added in v1.5.0

type SeriesMap struct {
	// contains filtered or unexported fields
}

func NewSeriesMap added in v1.5.0

func NewSeriesMap() *SeriesMap

func (*SeriesMap) Add added in v1.5.0

func (s *SeriesMap) Add(mst string, sid uint64)

func (*SeriesMap) GetAalMst added in v1.5.0

func (s *SeriesMap) GetAalMst() []string

func (*SeriesMap) GetSeriesIDs added in v1.5.0

func (s *SeriesMap) GetSeriesIDs(mst string) map[uint64]struct{}

func (*SeriesMap) HasMeasurement added in v1.5.0

func (s *SeriesMap) HasMeasurement(mst string) bool

func (*SeriesMap) IsEmpty added in v1.5.0

func (s *SeriesMap) IsEmpty() bool

type SeriesOffsets

type SeriesOffsets struct {
	// contains filtered or unexported fields
}

func NewSeriesOffsets

func NewSeriesOffsets() *SeriesOffsets

func (*SeriesOffsets) Add

func (o *SeriesOffsets) Add(key uint64, ofs int64)

func (*SeriesOffsets) Get

func (o *SeriesOffsets) Get(sid uint64, dst []int64) []int64

func (*SeriesOffsets) GetAllKeyNoLock

func (o *SeriesOffsets) GetAllKeyNoLock(dst []uint64) []uint64

type Shard

type Shard struct {
	// contains filtered or unexported fields
}

func NewShard

func NewShard(workerID int, info *ShardInfo, freeDuration uint64) *Shard

func (*Shard) ConvertToTSSP added in v1.5.0

func (s *Shard) ConvertToTSSP()

func (*Shard) CreateWal

func (s *Shard) CreateWal() *Wal

func (*Shard) ForceFlush added in v1.4.1

func (s *Shard) ForceFlush()

func (*Shard) Free

func (s *Shard) Free()

func (*Shard) GetWalDir

func (s *Shard) GetWalDir() string

func (*Shard) GetWalReaders

func (s *Shard) GetWalReaders(dst []*Wal, mst string, tr *util.TimeRange) []*Wal

func (*Shard) Load

func (s *Shard) Load()

func (*Shard) Run

func (s *Shard) Run()

func (*Shard) Stop

func (s *Shard) Stop()

func (*Shard) SwitchWal added in v1.5.0

func (s *Shard) SwitchWal(wal *Wal, force bool) *Wal

func (*Shard) SwitchWalIfNeeded added in v1.5.0

func (s *Shard) SwitchWalIfNeeded()

func (*Shard) UpdateWal added in v1.4.1

func (s *Shard) UpdateWal(tr *util.TimeRange) *Wal

func (*Shard) Wait

func (s *Shard) Wait()

func (*Shard) Write

func (s *Shard) Write(wal *Wal, seriesKey, rec []byte) error

type ShardInfo

type ShardInfo struct {
	// contains filtered or unexported fields
}

func NewShardInfo

func NewShardInfo(ident *meta.ShardIdentifier, filePath, walPath string, lock *string,
	tbStore immutable.TablesStore, index Index, options *obs.ObsOptions) *ShardInfo

func (*ShardInfo) Index added in v1.5.0

func (si *ShardInfo) Index() Index

type Wal

type Wal struct {
	util.Reference

	StateSwitching bool
	// contains filtered or unexported fields
}

func NewWal

func NewWal(dir string, lock *string, opt *obs.ObsOptions) *Wal

func (*Wal) AddSeriesOffsets

func (wal *Wal) AddSeriesOffsets(sid uint64, offsets int64)

func (*Wal) AddTargetTSSPFiles added in v1.5.0

func (wal *Wal) AddTargetTSSPFiles(files ...immutable.TSSPFile)

func (*Wal) BackgroundSync

func (wal *Wal) BackgroundSync()

func (*Wal) Clean

func (wal *Wal) Clean()

func (*Wal) EndWrite added in v1.4.1

func (wal *Wal) EndWrite()

func (*Wal) Expired

func (wal *Wal) Expired() bool

func (*Wal) Flush

func (wal *Wal) Flush() error

func (*Wal) ForceUnref added in v1.5.0

func (wal *Wal) ForceUnref()

func (*Wal) FreeMemory added in v1.5.0

func (wal *Wal) FreeMemory()

func (*Wal) GetAllSid

func (wal *Wal) GetAllSid(dst []uint64) []uint64

func (*Wal) HasMeasurement added in v1.5.0

func (wal *Wal) HasMeasurement(mst string) bool

func (*Wal) InMemSize added in v1.5.0

func (wal *Wal) InMemSize() int64

func (*Wal) LoadFromDisk added in v1.5.0

func (wal *Wal) LoadFromDisk() error

func (*Wal) MapSeries added in v1.5.0

func (wal *Wal) MapSeries(mst string, sid uint64)

func (*Wal) MinMaxTime added in v1.5.0

func (wal *Wal) MinMaxTime() (int64, int64, error)

func (*Wal) MustClose

func (wal *Wal) MustClose()

func (*Wal) Name

func (wal *Wal) Name() string

func (*Wal) NeedCreateIndex

func (wal *Wal) NeedCreateIndex() bool

func (*Wal) NeedSwitch added in v1.5.0

func (wal *Wal) NeedSwitch() bool

func (*Wal) Opened

func (wal *Wal) Opened() bool

func (*Wal) Overlaps added in v1.5.0

func (wal *Wal) Overlaps(min, max int64) bool

func (*Wal) PopSeriesKey

func (wal *Wal) PopSeriesKey() ([]byte, int64, int64)

func (*Wal) PreLoad added in v1.5.0

func (wal *Wal) PreLoad(name string)

func (*Wal) ReadBlock

func (wal *Wal) ReadBlock(ctx *WalCtx, ofs int64) ([]byte, error)

func (*Wal) ReadRecord

func (wal *Wal) ReadRecord(ctx *WalCtx, sid uint64, dst *record.Record, keepSchema bool) error

func (*Wal) SizeLimited

func (wal *Wal) SizeLimited() bool

func (*Wal) Sync

func (wal *Wal) Sync() error

func (*Wal) TargetContain added in v1.5.0

func (wal *Wal) TargetContain(files ...immutable.TSSPFile) bool

func (*Wal) UpdateTimeRange added in v1.4.1

func (wal *Wal) UpdateTimeRange(tr *util.TimeRange)

func (*Wal) WriteRecord

func (wal *Wal) WriteRecord(sid uint64, seriesKey []byte, rec []byte) error

func (*Wal) WrittenSize

func (wal *Wal) WrittenSize() int64

type WalBlockHeader

type WalBlockHeader struct {
	// contains filtered or unexported fields
}

func (*WalBlockHeader) Put

func (h *WalBlockHeader) Put(dst []byte)

func (*WalBlockHeader) Set

func (h *WalBlockHeader) Set(size int, mstLen int, typeFlag, compressFlag uint8, sid uint64)

func (*WalBlockHeader) Unmarshal

func (h *WalBlockHeader) Unmarshal(src []byte) error

type WalConsumeReader added in v1.5.0

type WalConsumeReader struct {
	// contains filtered or unexported fields
}

func NewWalConsumeReader added in v1.5.0

func NewWalConsumeReader(wal *Wal, ctx *WalCtx, wantTable string, ofs int64) *WalConsumeReader

func (*WalConsumeReader) ReadBlock added in v1.5.0

func (wcr *WalConsumeReader) ReadBlock() (sid uint64, seriesKey []byte, recData []byte, err error)

func (*WalConsumeReader) Release added in v1.5.0

func (wcr *WalConsumeReader) Release()

type WalCreatedEvent added in v1.5.0

type WalCreatedEvent struct {
	Wal    *Wal
	Info   *ShardInfo
	WorkId int
}

func (*WalCreatedEvent) CreateIterator added in v1.5.0

func (e *WalCreatedEvent) CreateIterator(mst string) record.RecIterator

func (*WalCreatedEvent) Ref added in v1.5.0

func (e *WalCreatedEvent) Ref()

func (*WalCreatedEvent) UnRef added in v1.5.0

func (e *WalCreatedEvent) UnRef()

func (*WalCreatedEvent) UniqueId added in v1.5.0

func (e *WalCreatedEvent) UniqueId() uint64

type WalCtx

type WalCtx struct {
	// contains filtered or unexported fields
}

func NewWalCtx

func NewWalCtx() (*WalCtx, func())

func (*WalCtx) Instance added in v1.4.1

func (ctx *WalCtx) Instance() *WalCtx

func (*WalCtx) MemSize

func (ctx *WalCtx) MemSize() int

func (*WalCtx) Record added in v1.5.0

func (ctx *WalCtx) Record() *record.Record

type WalFile

type WalFile struct {
	WalFileHot
	// contains filtered or unexported fields
}

func NewWalFile

func NewWalFile(name string, lock *string) *WalFile

func (*WalFile) Close

func (wf *WalFile) Close() error

func (*WalFile) Flush

func (wf *WalFile) Flush() error

func (*WalFile) IsClosed added in v1.5.0

func (wf *WalFile) IsClosed() bool

func (*WalFile) LoadIntoMemory

func (wf *WalFile) LoadIntoMemory()

func (*WalFile) Name

func (wf *WalFile) Name() string

func (*WalFile) Open

func (wf *WalFile) Open() error

func (*WalFile) OpenReadonly added in v1.5.0

func (wf *WalFile) OpenReadonly() error

func (*WalFile) Read added in v1.5.0

func (wf *WalFile) Read(dst []byte) (int, error)

func (*WalFile) ReadAt

func (wf *WalFile) ReadAt(dst []byte, ofs int64) (int, error)

func (*WalFile) ReadFull added in v1.5.0

func (wf *WalFile) ReadFull(dst []byte) (int, error)

func (*WalFile) Seek added in v1.5.0

func (wf *WalFile) Seek(offset int64, whence int) (int64, error)

func (*WalFile) Sync

func (wf *WalFile) Sync() error

func (*WalFile) Write

func (wf *WalFile) Write(b []byte) (int, error)

func (*WalFile) WrittenSize

func (wf *WalFile) WrittenSize() int64

type WalFileHot added in v1.5.0

type WalFileHot struct {
	// contains filtered or unexported fields
}

func (*WalFileHot) FreeMemory added in v1.5.0

func (wf *WalFileHot) FreeMemory()

func (*WalFileHot) InMemSize added in v1.5.0

func (wf *WalFileHot) InMemSize() int64

func (*WalFileHot) OpenMemFile added in v1.5.0

func (wf *WalFileHot) OpenMemFile()

type WalIterator added in v1.5.0

type WalIterator struct {
	// contains filtered or unexported fields
}

WalIterator iterator single wal file

func NewWalIterator added in v1.5.0

func NewWalIterator(wal *Wal, mst string, idx Index) *WalIterator

func (*WalIterator) Next added in v1.5.0

func (it *WalIterator) Next() (*record.Record, error)

func (*WalIterator) Release added in v1.5.0

func (it *WalIterator) Release()

type WalIterators added in v1.5.0

type WalIterators struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

WalIterators iterates multiple WAL files

func NewWalIterators added in v1.5.0

func NewWalIterators(walSlice []*Wal, mst string, schema record.Schemas, tagSelected record.Schemas,
	fieldSelected map[string]struct{}, rowLimit int, idx Index) *WalIterators

func (*WalIterators) AddIterator added in v1.5.0

func (its *WalIterators) AddIterator(itr record.RecIterator)

func (*WalIterators) Next added in v1.5.0

func (its *WalIterators) Next() (*record.ConsumeRecord, error)

Next return ConsumeRecord from files of wal

func (*WalIterators) Release added in v1.5.0

func (its *WalIterators) Release()

func (*WalIterators) RemoveFilterColumns added in v1.5.0

func (its *WalIterators) RemoveFilterColumns(rec *record.Record) (*record.Record, error)

RemoveFilterColumns Remove the redundant columns used for filtering

type WalReader added in v1.4.1

type WalReader struct {
	// contains filtered or unexported fields
}

func NewWalReader added in v1.4.1

func NewWalReader(shardID uint64, mst string, tr *util.TimeRange) *WalReader

func (*WalReader) Exclude added in v1.5.0

func (r *WalReader) Exclude(files ...immutable.TSSPFile)

func (*WalReader) Ref added in v1.4.1

func (r *WalReader) Ref()

func (*WalReader) UnRef added in v1.4.1

func (r *WalReader) UnRef()

func (*WalReader) Values added in v1.4.1

func (r *WalReader) Values(_ string, sid uint64, tr util.TimeRange, schema record.Schemas, ascending bool) *record.Record

type WalRecordDecoder added in v1.4.1

type WalRecordDecoder struct {
	// contains filtered or unexported fields
}

func NewWalRecordDecoder added in v1.4.1

func NewWalRecordDecoder() *WalRecordDecoder

func (*WalRecordDecoder) Decode added in v1.4.1

func (c *WalRecordDecoder) Decode(rec *record.Record, buf []byte) error

func (*WalRecordDecoder) DecodeColVal added in v1.4.1

func (c *WalRecordDecoder) DecodeColVal(typ int, col *record.ColVal) error

func (*WalRecordDecoder) DecodeColValOneRowMode added in v1.4.1

func (c *WalRecordDecoder) DecodeColValOneRowMode(typ int, col *record.ColVal)

func (*WalRecordDecoder) DecodeColumnSchema added in v1.4.1

func (c *WalRecordDecoder) DecodeColumnSchema(dst *record.Field)

func (*WalRecordDecoder) Reset added in v1.5.0

func (c *WalRecordDecoder) Reset(buf []byte)

type WalRecordIterator

type WalRecordIterator struct {
	// contains filtered or unexported fields
}

func NewWalRecordIterator added in v1.5.0

func NewWalRecordIterator(wal *Wal) (*WalRecordIterator, func())

func (*WalRecordIterator) Next

func (itr *WalRecordIterator) Next(dst *record.Record) (uint64, error)

func (*WalRecordIterator) SetSeries added in v1.5.0

func (itr *WalRecordIterator) SetSeries(series map[uint64]struct{})

type WriterFlusher

type WriterFlusher interface {
	io.Writer
	Flush() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL