Documentation
¶
Index ¶
- Constants
- func BuildUserID(id int) string
- func NewWorkerManager(cfg Config, grpcClient jobqueue.CompactorClient, ...) (services.Service, error)
- func SchemaPeriodForTable(cfg config.SchemaConfig, tableName string) (config.PeriodConfig, bool)
- func SetupTable(t *testing.T, path string, commonDBsConfig IndexesConfig, ...)
- func SortTablesByRange(tables []string)
- type Chunk
- type CompactedIndex
- type Compactor
- func (c *Compactor) Handler() (string, http.Handler)
- func (c *Compactor) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc)
- func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, ...) (ring.InstanceState, ring.Tokens)
- func (c *Compactor) OnRingInstanceStopping(_ *ring.BasicLifecycler)
- func (c *Compactor) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens)
- func (c *Compactor) RegisterIndexCompactor(indexType string, indexCompactor IndexCompactor)
- func (c *Compactor) TablesManager() TablesManager
- type Config
- type DeleteRequestResponse
- type DeletionJobsConfig
- type IndexCompactor
- type IndexFileConfig
- type IndexRecords
- type IndexSet
- type IndexesConfig
- type JobsConfig
- type Limits
- type MakeEmptyUserIndexSetFunc
- type PerUserIndexesConfig
- type TableCompactor
- type TablesManager
Constants ¶
View Source
const ( HorizontalScalingModeDisabled = "disabled" HorizontalScalingModeMain = "main" HorizontalScalingModeWorker = "worker" )
Variables ¶
This section is empty.
Functions ¶
func BuildUserID ¶
func NewWorkerManager ¶ added in v3.6.0
func NewWorkerManager( cfg Config, grpcClient jobqueue.CompactorClient, schemaConfig config.SchemaConfig, chunkClients map[config.DayTime]client.Client, r prometheus.Registerer, ) (services.Service, error)
func SchemaPeriodForTable ¶
func SchemaPeriodForTable(cfg config.SchemaConfig, tableName string) (config.PeriodConfig, bool)
func SetupTable ¶
func SetupTable(t *testing.T, path string, commonDBsConfig IndexesConfig, perUserDBsConfig PerUserIndexesConfig)
func SortTablesByRange ¶
func SortTablesByRange(tables []string)
Types ¶
type CompactedIndex ¶
type CompactedIndex interface {
// IndexProcessor is used for applying custom retention and processing delete requests.
retention.IndexProcessor
// Cleanup should clean up all the state built during compaction.
// It is typically called at the end or in case of an error.
Cleanup()
// ToIndexFile is used to convert the CompactedIndex to an IndexFile for uploading to the object store.
// Once the IndexFile is uploaded using Index.Reader, the file is closed using Index.Close and removed from disk using Index.Path.
ToIndexFile() (index.Index, error)
}
CompactedIndex is built by TableCompactor for an IndexSet after compaction. It is used for: (1) applying custom retention and processing delete requests using IndexProcessor; (2) uploading the compacted index to storage by converting it to index.Index using ToIndexFile. After all the operations have completed successfully, or in case of failure, Cleanup is called to clean up the state.
type Compactor ¶
type Compactor struct {
services.Service
DeleteRequestsHandler *deletion.DeleteRequestHandler
DeleteRequestsGRPCHandler *deletion.GRPCRequestHandler
JobQueue *jobqueue.Queue
// contains filtered or unexported fields
}
func NewCompactor ¶
func NewCompactor( cfg Config, objectStoreClients map[config.DayTime]client.ObjectClient, deleteStoreClient client.ObjectClient, schemaConfig config.SchemaConfig, limits Limits, indexUpdatePropagationMaxDelay time.Duration, r prometheus.Registerer, metricsNamespace string, ) (*Compactor, error)
func (*Compactor) OnRingInstanceHeartbeat ¶
func (c *Compactor) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc)
func (*Compactor) OnRingInstanceRegister ¶
func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens)
func (*Compactor) OnRingInstanceStopping ¶
func (c *Compactor) OnRingInstanceStopping(_ *ring.BasicLifecycler)
func (*Compactor) OnRingInstanceTokens ¶
func (c *Compactor) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens)
func (*Compactor) RegisterIndexCompactor ¶
func (c *Compactor) RegisterIndexCompactor(indexType string, indexCompactor IndexCompactor)
func (*Compactor) TablesManager ¶ added in v3.6.0
func (c *Compactor) TablesManager() TablesManager
type Config ¶
type Config struct {
WorkingDirectory string `yaml:"working_directory"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
ApplyRetentionInterval time.Duration `yaml:"apply_retention_interval"`
RetentionEnabled bool `yaml:"retention_enabled"`
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
RetentionTableTimeout time.Duration `yaml:"retention_table_timeout"`
RetentionBackoffConfig backoff.Config `yaml:"retention_backoff_config"`
DeleteRequestStore string `yaml:"delete_request_store"`
DeleteRequestStoreKeyPrefix string `yaml:"delete_request_store_key_prefix"`
DeleteRequestStoreDBType string `yaml:"delete_request_store_db_type"`
BackupDeleteRequestStoreDBType string `yaml:"backup_delete_request_store_db_type"`
DeleteBatchSize int `yaml:"delete_batch_size"`
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
DeleteMaxInterval time.Duration `yaml:"delete_max_interval"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
UploadParallelism int `yaml:"upload_parallelism"`
CompactorRing lokiring.RingConfig `` /* 210-byte string literal not displayed */
RunOnce bool `yaml:"_" doc:"hidden"`
TablesToCompact int `yaml:"tables_to_compact"`
SkipLatestNTables int `yaml:"skip_latest_n_tables"`
HorizontalScalingMode string `yaml:"horizontal_scaling_mode"`
WorkerConfig jobqueue.WorkerConfig `yaml:"worker_config"`
JobsConfig JobsConfig `yaml:"jobs_config"`
}
func (*Config) RegisterFlags ¶
RegisterFlags registers flags.
type DeleteRequestResponse ¶ added in v3.5.0
type DeletionJobsConfig ¶ added in v3.6.0
type DeletionJobsConfig struct {
DeletionManifestStorePrefix string `yaml:"deletion_manifest_store_prefix"`
ChunkProcessingConcurrency int `yaml:"chunk_processing_concurrency"`
Timeout time.Duration `yaml:"timeout"`
MaxRetries int `yaml:"max_retries"`
}
func (*DeletionJobsConfig) RegisterFlags ¶ added in v3.6.0
func (c *DeletionJobsConfig) RegisterFlags(f *flag.FlagSet)
func (*DeletionJobsConfig) RegisterFlagsWithPrefix ¶ added in v3.6.0
func (c *DeletionJobsConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
type IndexCompactor ¶
type IndexCompactor interface {
// NewTableCompactor returns a new TableCompactor for compacting a table.
// commonIndexSet refers to the common index files, in other words the multi-tenant index.
// existingUserIndexSet refers to existing user specific index files in the storage.
// makeEmptyUserIndexSetFunc can be used for creating an empty indexSet for a user
// who does not have an index for it in existingUserIndexSet.
// periodConfig holds the PeriodConfig for the table.
NewTableCompactor(
ctx context.Context,
commonIndexSet IndexSet,
existingUserIndexSet map[string]IndexSet,
makeEmptyUserIndexSetFunc MakeEmptyUserIndexSetFunc,
periodConfig config.PeriodConfig,
) TableCompactor
// OpenCompactedIndexFile opens a compressed index file at the given path.
OpenCompactedIndexFile(
ctx context.Context,
path,
tableName,
userID,
workingDir string,
periodConfig config.PeriodConfig,
logger log.Logger,
) (
CompactedIndex,
error,
)
}
type IndexFileConfig ¶
type IndexFileConfig struct {
CompressFile bool
}
type IndexRecords ¶
type IndexRecords struct {
Start, NumRecords int
}
type IndexSet ¶
type IndexSet interface {
GetTableName() string
ListSourceFiles() []storage.IndexFile
GetSourceFile(indexFile storage.IndexFile) (string, error)
GetLogger() log.Logger
GetWorkingDir() string
// SetCompactedIndex sets the CompactedIndex for uploading/applying retention and for making the compactor remove the source files.
// CompactedIndex can be nil only when all the source files in the common index set have been compacted away into per-tenant indexes.
// For a user index set, it returns an error if the CompactedIndex is nil and removeSourceFiles is true, since
// compaction should either create new files or be a no-op if there is nothing to compact.
// There is no need to call SetCompactedIndex if no changes were made to the index for this IndexSet.
SetCompactedIndex(compactedIndex CompactedIndex, removeSourceFiles bool) error
}
type IndexesConfig ¶
type IndexesConfig struct {
NumUnCompactedFiles, NumCompactedFiles int
}
func (IndexesConfig) String ¶
func (c IndexesConfig) String() string
type JobsConfig ¶ added in v3.6.0
type JobsConfig struct {
Deletion DeletionJobsConfig `yaml:"deletion"`
}
func (*JobsConfig) RegisterFlags ¶ added in v3.6.0
func (c *JobsConfig) RegisterFlags(f *flag.FlagSet)
func (*JobsConfig) RegisterFlagsWithPrefix ¶ added in v3.6.0
func (c *JobsConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
type PerUserIndexesConfig ¶
type PerUserIndexesConfig struct {
IndexesConfig
NumUsers int
}
func (PerUserIndexesConfig) String ¶
func (c PerUserIndexesConfig) String() string
type TableCompactor ¶
type TableCompactor interface {
// CompactTable compacts the table.
// After compaction is done successfully, it should set the new/updated CompactedIndex for relevant IndexSets.
CompactTable() (err error)
}
type TablesManager ¶ added in v3.6.0
Source Files
¶
Click to show internal directories.
Click to hide internal directories.