fracmanager

package
v0.61.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2025 License: Apache-2.0 Imports: 42 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrSealingFractionSuicided = errors.New("sealing fraction is suicided")

Functions

func NewFracCacheFromDisk

func NewFracCacheFromDisk(filePath string) *sealedFracCache

func NewLoader

func NewLoader(
	config *Config, fracProvider *fractionProvider,
	fracCache *sealedFracCache,
) *loader

func NewSealedFracCache

func NewSealedFracCache(filePath string) *sealedFracCache

Types

type AsyncSearchRequest

type AsyncSearchRequest struct {
	ID        string `json:"-"`
	Params    processor.SearchParams
	Query     string
	Retention time.Duration
	WithDocs  bool
}

type AsyncSearchStatus added in v0.59.0

type AsyncSearchStatus byte
const (
	AsyncSearchStatusDone AsyncSearchStatus = iota
	AsyncSearchStatusInProgress
	AsyncSearchStatusError
	AsyncSearchStatusCanceled
)

type AsyncSearcher

type AsyncSearcher struct {
	// contains filtered or unexported fields
}

func MustStartAsync

func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracs List) *AsyncSearcher

func (*AsyncSearcher) CancelSearch added in v0.59.0

func (as *AsyncSearcher) CancelSearch(id string)

func (*AsyncSearcher) DeleteSearch added in v0.59.0

func (as *AsyncSearcher) DeleteSearch(id string)

func (*AsyncSearcher) FetchSearchResult

func (*AsyncSearcher) GetAsyncSearchesList added in v0.59.0

func (as *AsyncSearcher) GetAsyncSearchesList(r GetAsyncSearchesListRequest) []*AsyncSearchesListItem

func (*AsyncSearcher) StartSearch

func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs List) error

type AsyncSearcherConfig

type AsyncSearcherConfig struct {
	DataDir string
	Workers int

	MaxSize           int
	MaxSizePerRequest int
}

type AsyncSearchesListItem added in v0.59.0

type AsyncSearchesListItem struct {
	ID     string
	Status AsyncSearchStatus

	StartedAt  time.Time
	ExpiresAt  time.Time
	CanceledAt time.Time

	FracsDone    int
	FracsInQueue int
	DiskUsage    int

	// Search request info
	AggQueries   []processor.AggQuery
	HistInterval uint64
	Query        string
	From         seq.MID
	To           seq.MID
	Retention    time.Duration
	WithDocs     bool
	Size         int64
}

type CacheMaintainer

type CacheMaintainer struct {
	// contains filtered or unexported fields
}

func NewCacheMaintainer

func NewCacheMaintainer(totalCacheSize, sortCacheSize uint64, metrics *CacheMaintainerMetrics) *CacheMaintainer

func (*CacheMaintainer) CreateDocBlockCache

func (cm *CacheMaintainer) CreateDocBlockCache() *cache.Cache[[]byte]

func (*CacheMaintainer) CreateIndexCache

func (cm *CacheMaintainer) CreateIndexCache() *frac.IndexCache

func (*CacheMaintainer) CreateSortDocsCache

func (cm *CacheMaintainer) CreateSortDocsCache() *cache.Cache[[]byte]

func (*CacheMaintainer) Reset

func (cm *CacheMaintainer) Reset()

Reset is used in tests only

func (*CacheMaintainer) RunCleanLoop

func (cm *CacheMaintainer) RunCleanLoop(done <-chan struct{}, cleanupInterval, gcInterval time.Duration) *sync.WaitGroup

type CacheMaintainerMetrics

type CacheMaintainerMetrics struct {
	HitsTotal       *prometheus.CounterVec
	MissTotal       *prometheus.CounterVec
	PanicsTotal     *prometheus.CounterVec
	LockWaitsTotal  *prometheus.CounterVec
	WaitsTotal      *prometheus.CounterVec
	ReattemptsTotal *prometheus.CounterVec
	SizeRead        *prometheus.CounterVec
	SizeOccupied    *prometheus.CounterVec
	SizeReleased    *prometheus.CounterVec
	MapsRecreated   *prometheus.CounterVec
	MissLatency     *prometheus.CounterVec

	Oldest            *prometheus.GaugeVec
	AddBuckets        *prometheus.CounterVec
	DelBuckets        *prometheus.CounterVec
	CleanGenerations  *prometheus.CounterVec
	ChangeGenerations *prometheus.CounterVec
}

func (*CacheMaintainerMetrics) GetCleanerMetrics

func (m *CacheMaintainerMetrics) GetCleanerMetrics(cleanerLabel string) *cache.CleanerMetrics

func (*CacheMaintainerMetrics) GetLayerMetrics

func (m *CacheMaintainerMetrics) GetLayerMetrics(layerName string) *cache.Metrics

type Config

type Config struct {
	DataDir string

	FracSize  uint64
	TotalSize uint64
	CacheSize uint64

	FracLoadLimit     uint64 // how many sealed fractions should fracmanager load, if 0 then loads all
	ShouldReplay      bool
	MaintenanceDelay  time.Duration
	CacheCleanupDelay time.Duration
	CacheGCDelay      time.Duration
	SealParams        frac.SealParams
	SortCacheSize     uint64 // size for docs cache for active fraction
	Fraction          frac.Config

	OffloadingEnabled   bool
	OffloadingForced    bool
	OffloadingRetention time.Duration
}

func FillConfigWithDefault

func FillConfigWithDefault(config *Config) *Config

type FetchSearchResultRequest

type FetchSearchResultRequest struct {
	ID    string
	Limit int
	Order seq.DocsOrder
}

type FetchSearchResultResponse

type FetchSearchResultResponse struct {
	Status     AsyncSearchStatus
	QPR        seq.QPR
	CanceledAt time.Time
	Error      string

	StartedAt time.Time
	ExpiresAt time.Time

	FracsDone    int
	FracsInQueue int
	DiskUsage    int

	// Fields that the seq-db proxy needs to complete the async search response.
	AggQueries   []processor.AggQuery
	HistInterval uint64

	Query     string
	From      seq.MID
	To        seq.MID
	Retention time.Duration
	WithDocs  bool
	Size      int64
}

type Fetcher

type Fetcher struct {
	// contains filtered or unexported fields
}

func NewFetcher

func NewFetcher(maxWorkersNum int) *Fetcher

func (*Fetcher) FetchDocs

func (f *Fetcher) FetchDocs(ctx context.Context, fracs List, ids []seq.IDSource) ([][]byte, error)

type FracManager

type FracManager struct {
	// contains filtered or unexported fields
}

func NewFracManager

func NewFracManager(ctx context.Context, cfg *Config, s3cli *s3.Client) *FracManager

func (*FracManager) Active

func (fm *FracManager) Active() frac.Fraction

func (*FracManager) Append

func (fm *FracManager) Append(ctx context.Context, docs, metas storage.DocBlock) error

func (*FracManager) GetAllFracs

func (fm *FracManager) GetAllFracs() (fracs List)

GetAllFracs returns a list of known fracs (local and remote).

While this list is in use, it may become out of date (fractions may, for example, be deleted). This is a valid situation, because access to the data of these fractions (search and fetch) is performed under a lock (see DataProvider). This way we avoid a race.

Accessing the data of a deleted fraction will simply return an empty result.

func (*FracManager) Load

func (fm *FracManager) Load(ctx context.Context) error

func (*FracManager) Mature

func (fm *FracManager) Mature() bool

func (*FracManager) OffloadForcedForTests added in v0.60.0

func (fm *FracManager) OffloadForcedForTests()

func (*FracManager) OldestCT

func (fm *FracManager) OldestCT() uint64

func (*FracManager) ResetCacheForTests

func (fm *FracManager) ResetCacheForTests()

func (*FracManager) SealForcedForTests

func (fm *FracManager) SealForcedForTests()

func (*FracManager) Start

func (fm *FracManager) Start()

func (*FracManager) Stop

func (fm *FracManager) Stop()

func (*FracManager) WaitIdle

func (fm *FracManager) WaitIdle()

func (*FracManager) Writer

func (fm *FracManager) Writer() *proxyFrac

type FracType added in v0.60.0

type FracType int
const (
	FracTypeLocal FracType = 1 << iota
	FracTypeRemote
)

type GetAsyncSearchesListRequest added in v0.59.0

type GetAsyncSearchesListRequest struct {
	Status *AsyncSearchStatus
	IDs    []string
}

type List

type List []frac.Fraction

func (List) FilterInRange

func (l List) FilterInRange(from, to seq.MID) List

func (List) GetOldestFrac

func (l List) GetOldestFrac() frac.Fraction

func (List) GetTotalSize

func (l List) GetTotalSize() uint64

func (*List) Shift

func (l *List) Shift(n int) []frac.Fraction

func (List) Sort

func (l List) Sort(order seq.DocsOrder)

type MappingProvider

type MappingProvider interface {
	GetMapping() seq.Mapping
}

type Searcher

type Searcher struct {
	// contains filtered or unexported fields
}

func NewSearcher

func NewSearcher(maxWorkersNum int, cfg SearcherCfg) *Searcher

func (*Searcher) SearchDocs

func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params processor.SearchParams) (*seq.QPR, error)

type SearcherCfg

type SearcherCfg struct {
	MaxFractionHits       int // the maximum number of fractions used in the search
	FractionsPerIteration int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL