compactor

package
v3.6.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: AGPL-3.0 Imports: 51 Imported by: 0

Documentation

Index

Constants

View Source
const (
	HorizontalScalingModeDisabled = "disabled"
	HorizontalScalingModeMain     = "main"
	HorizontalScalingModeWorker   = "worker"
)

Variables

This section is empty.

Functions

func BuildUserID

func BuildUserID(id int) string

func NewWorkerManager added in v3.6.0

func NewWorkerManager(
	cfg Config,
	grpcClient jobqueue.CompactorClient,
	schemaConfig config.SchemaConfig,
	chunkClients map[config.DayTime]client.Client,
	r prometheus.Registerer,
) (services.Service, error)

func SchemaPeriodForTable

func SchemaPeriodForTable(cfg config.SchemaConfig, tableName string) (config.PeriodConfig, bool)

func SetupTable

func SetupTable(t *testing.T, path string, commonDBsConfig IndexesConfig, perUserDBsConfig PerUserIndexesConfig)

func SortTablesByRange

func SortTablesByRange(tables []string)

Types

type Chunk added in v3.6.0

type Chunk interface {
	GetFrom() model.Time
	GetThrough() model.Time
	GetFingerprint() uint64
	GetChecksum() uint32
	GetSize() uint32
	GetEntriesCount() uint32
}

type CompactedIndex

type CompactedIndex interface {
	// IndexProcessor is used for applying custom retention and processing delete requests.
	retention.IndexProcessor
	// Cleanup should clean up all the state built during compaction.
	// It is typically called at the end or in case of an error.
	Cleanup()
	// ToIndexFile is used to convert the CompactedIndex to an IndexFile for uploading to the object store.
	// Once the IndexFile is uploaded using Index.Reader, the file is closed using Index.Close and removed from disk using Index.Path.
	ToIndexFile() (index.Index, error)
}

CompactedIndex is built by TableCompactor for IndexSet after compaction. It would be used for: 1. applying custom retention, processing delete requests using IndexProcessor 2. uploading the compacted index to storage by converting it to index.Index using ToIndexFile After all the operations are successfully done or in case of failure, Cleanup would be called to cleanup the state.

type Compactor

type Compactor struct {
	services.Service

	DeleteRequestsHandler     *deletion.DeleteRequestHandler
	DeleteRequestsGRPCHandler *deletion.GRPCRequestHandler

	JobQueue *jobqueue.Queue
	// contains filtered or unexported fields
}

func NewCompactor

func NewCompactor(
	cfg Config,
	objectStoreClients map[config.DayTime]client.ObjectClient,
	deleteStoreClient client.ObjectClient,
	schemaConfig config.SchemaConfig,
	limits Limits,
	indexUpdatePropagationMaxDelay time.Duration,
	r prometheus.Registerer,
	metricsNamespace string,
) (*Compactor, error)

func (*Compactor) Handler added in v3.5.0

func (c *Compactor) Handler() (string, http.Handler)

func (*Compactor) OnRingInstanceHeartbeat

func (c *Compactor) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc)

func (*Compactor) OnRingInstanceRegister

func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens)

func (*Compactor) OnRingInstanceStopping

func (c *Compactor) OnRingInstanceStopping(_ *ring.BasicLifecycler)

func (*Compactor) OnRingInstanceTokens

func (c *Compactor) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens)

func (*Compactor) RegisterIndexCompactor

func (c *Compactor) RegisterIndexCompactor(indexType string, indexCompactor IndexCompactor)

func (*Compactor) TablesManager added in v3.6.0

func (c *Compactor) TablesManager() TablesManager

type Config

type Config struct {
	WorkingDirectory               string                `yaml:"working_directory"`
	CompactionInterval             time.Duration         `yaml:"compaction_interval"`
	ApplyRetentionInterval         time.Duration         `yaml:"apply_retention_interval"`
	RetentionEnabled               bool                  `yaml:"retention_enabled"`
	RetentionDeleteDelay           time.Duration         `yaml:"retention_delete_delay"`
	RetentionDeleteWorkCount       int                   `yaml:"retention_delete_worker_count"`
	RetentionTableTimeout          time.Duration         `yaml:"retention_table_timeout"`
	RetentionBackoffConfig         backoff.Config        `yaml:"retention_backoff_config"`
	DeleteRequestStore             string                `yaml:"delete_request_store"`
	DeleteRequestStoreKeyPrefix    string                `yaml:"delete_request_store_key_prefix"`
	DeleteRequestStoreDBType       string                `yaml:"delete_request_store_db_type"`
	BackupDeleteRequestStoreDBType string                `yaml:"backup_delete_request_store_db_type"`
	DeleteBatchSize                int                   `yaml:"delete_batch_size"`
	DeleteRequestCancelPeriod      time.Duration         `yaml:"delete_request_cancel_period"`
	DeleteMaxInterval              time.Duration         `yaml:"delete_max_interval"`
	MaxCompactionParallelism       int                   `yaml:"max_compaction_parallelism"`
	UploadParallelism              int                   `yaml:"upload_parallelism"`
	CompactorRing                  lokiring.RingConfig   `` /* 210-byte string literal not displayed */
	RunOnce                        bool                  `yaml:"_" doc:"hidden"`
	TablesToCompact                int                   `yaml:"tables_to_compact"`
	SkipLatestNTables              int                   `yaml:"skip_latest_n_tables"`
	HorizontalScalingMode          string                `yaml:"horizontal_scaling_mode"`
	WorkerConfig                   jobqueue.WorkerConfig `yaml:"worker_config"`
	JobsConfig                     JobsConfig            `yaml:"jobs_config"`
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers flags.

func (*Config) Validate

func (cfg *Config) Validate() error

Validate verifies the config does not contain inappropriate values

type DeleteRequestResponse added in v3.5.0

type DeleteRequestResponse struct {
	RequestID string `json:"request_id"`
	StartTime int64  `json:"start_time"`
	EndTime   int64  `json:"end_time"`
	Query     string `json:"query"`
	Status    string `json:"status"`
	CreatedAt int64  `json:"created_at"`
	UserID    string `json:"user_id"`
}

type DeletionJobsConfig added in v3.6.0

type DeletionJobsConfig struct {
	DeletionManifestStorePrefix string        `yaml:"deletion_manifest_store_prefix"`
	ChunkProcessingConcurrency  int           `yaml:"chunk_processing_concurrency"`
	Timeout                     time.Duration `yaml:"timeout"`
	MaxRetries                  int           `yaml:"max_retries"`
}

func (*DeletionJobsConfig) RegisterFlags added in v3.6.0

func (c *DeletionJobsConfig) RegisterFlags(f *flag.FlagSet)

func (*DeletionJobsConfig) RegisterFlagsWithPrefix added in v3.6.0

func (c *DeletionJobsConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

type IndexCompactor

type IndexCompactor interface {
	// NewTableCompactor returns a new TableCompactor for compacting a table.
	// commonIndexSet refers to common index files or in other words multi-tenant index.
	// existingUserIndexSet refers to existing user specific index files in the storage.
	// makeEmptyUserIndexSetFunc can be used for creating an empty indexSet for a user
	// who does not have an index for it in existingUserIndexSet.
	// periodConfig holds the PeriodConfig for the table.
	NewTableCompactor(
		ctx context.Context,
		commonIndexSet IndexSet,
		existingUserIndexSet map[string]IndexSet,
		makeEmptyUserIndexSetFunc MakeEmptyUserIndexSetFunc,
		periodConfig config.PeriodConfig,
	) TableCompactor

	// OpenCompactedIndexFile opens a compressed index file at given path.
	OpenCompactedIndexFile(
		ctx context.Context,
		path,
		tableName,
		userID,
		workingDir string,
		periodConfig config.PeriodConfig,
		logger log.Logger,
	) (
		CompactedIndex,
		error,
	)
}

type IndexFileConfig

type IndexFileConfig struct {
	CompressFile bool
}

type IndexRecords

type IndexRecords struct {
	Start, NumRecords int
}

type IndexSet

type IndexSet interface {
	GetTableName() string
	ListSourceFiles() []storage.IndexFile
	GetSourceFile(indexFile storage.IndexFile) (string, error)
	GetLogger() log.Logger
	GetWorkingDir() string
	// SetCompactedIndex sets the CompactedIndex for upload/applying retention and making the compactor remove the source files.
	// CompactedIndex can be nil only in case of all the source files in common index set being compacted away to per tenant index.
	// It would return an error if the CompactedIndex is nil and removeSourceFiles is true in case of user index set since
	// compaction should either create new files or can be a noop if there is nothing to compact.
	// There is no need to call SetCompactedIndex if no changes were made to the index for this IndexSet.
	SetCompactedIndex(compactedIndex CompactedIndex, removeSourceFiles bool) error
}

type IndexesConfig

type IndexesConfig struct {
	NumUnCompactedFiles, NumCompactedFiles int
}

func (IndexesConfig) String

func (c IndexesConfig) String() string

type JobsConfig added in v3.6.0

type JobsConfig struct {
	Deletion DeletionJobsConfig `yaml:"deletion"`
}

func (*JobsConfig) RegisterFlags added in v3.6.0

func (c *JobsConfig) RegisterFlags(f *flag.FlagSet)

func (*JobsConfig) RegisterFlagsWithPrefix added in v3.6.0

func (c *JobsConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

type Limits

type Limits interface {
	deletion.Limits
	retention.Limits
	DefaultLimits() *validation.Limits
}

type MakeEmptyUserIndexSetFunc

type MakeEmptyUserIndexSetFunc func(userID string) (IndexSet, error)

type PerUserIndexesConfig

type PerUserIndexesConfig struct {
	IndexesConfig
	NumUsers int
}

func (PerUserIndexesConfig) String

func (c PerUserIndexesConfig) String() string

type TableCompactor

type TableCompactor interface {
	// CompactTable compacts the table.
	// After compaction is done successfully, it should set the new/updated CompactedIndex for relevant IndexSets.
	CompactTable() (err error)
}

type TablesManager added in v3.6.0

type TablesManager interface {
	CompactTable(ctx context.Context, tableName string, applyRetention bool) error
	ApplyStorageUpdates(ctx context.Context, iterator deletion.StorageUpdatesIterator) error
	IterateTables(ctx context.Context, callback func(string, deletion.Table) error) (err error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL