storegateway

package
v0.30.0-rc.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2022 License: Apache-2.0 Imports: 47 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RingKey is the key under which we store the store gateways ring in the KVStore.
	RingKey = "store-gateway"

	// RingNameForServer is the name of the ring used by the store gateway server.
	// Note that it has the same value as RingKey.
	RingNameForServer = "store-gateway"

	// RingNameForClient is the name of the ring used by the store gateway client (we need
	// a different name to avoid clashing Prometheus metrics when running in single-binary).
	RingNameForClient = "store-gateway-client"

	// RingNumTokens is a fixed, safe default used instead of exposing a config option
	// to the user, in order to simplify the config.
	// NOTE(review): presumably the number of tokens each store-gateway instance
	// registers in the ring — confirm at the usage site.
	RingNumTokens = 512
)

Variables

View Source
var (
	// BlocksOwnerSync is the operation used to check the authoritative owners of a block
	// (replicas included). It is built with the JOINING, ACTIVE and LEAVING instance states.
	BlocksOwnerSync = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, func(s ring.InstanceState) bool {
		// The predicate is true only for LEAVING instances.
		// NOTE(review): presumably this asks the ring to extend the replica set past a
		// LEAVING instance — confirm against the ring.NewOp documentation.
		return s == ring.LEAVING
	})

	// BlocksOwnerRead is the operation used to check the authoritative owners of a block
	// (replicas included) that are available for queries (a store-gateway is available for
	// queries only when ACTIVE). No predicate is supplied (nil).
	BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

	// BlocksRead is the operation run by the querier to query blocks via the store-gateway.
	// Like BlocksOwnerRead it is built with the ACTIVE state only, but it also supplies a predicate.
	BlocksRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
		// The predicate is true for every state other than ACTIVE.
		// NOTE(review): presumably this lets the lookup move on to the next instance when
		// the owner is not ACTIVE — confirm against the ring.NewOp documentation.
		return s != ring.ACTIVE
	})
)

Functions

func GetShuffleShardingSubring

func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing

GetShuffleShardingSubring returns the subring to be used for a given user. This function should be used both by store-gateway and querier in order to guarantee the same logic is used.

func NewShardingMetadataFilterAdapter

func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter

Types

type BucketIndexMetadataFetcher

type BucketIndexMetadataFetcher struct {
	// contains filtered or unexported fields
}

BucketIndexMetadataFetcher is a Thanos MetadataFetcher implementation leveraging the Cortex bucket index.

func NewBucketIndexMetadataFetcher

func NewBucketIndexMetadataFetcher(
	userID string,
	bkt objstore.Bucket,
	strategy ShardingStrategy,
	cfgProvider bucket.TenantConfigProvider,
	logger log.Logger,
	reg prometheus.Registerer,
	filters []block.MetadataFilter,
) *BucketIndexMetadataFetcher

func (*BucketIndexMetadataFetcher) Fetch

func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)

Fetch implements block.MetadataFetcher. Not goroutine-safe.

func (*BucketIndexMetadataFetcher) UpdateOnChange

func (f *BucketIndexMetadataFetcher) UpdateOnChange(callback func([]metadata.Meta, error))

type BucketStoreMetrics

type BucketStoreMetrics struct {
	// contains filtered or unexported fields
}

BucketStoreMetrics aggregates metrics exported by Thanos Bucket Store and re-exports those aggregates as Cortex metrics.

func NewBucketStoreMetrics

func NewBucketStoreMetrics() *BucketStoreMetrics

func (*BucketStoreMetrics) AddUserRegistry

func (m *BucketStoreMetrics) AddUserRegistry(user string, reg *prometheus.Registry)

func (*BucketStoreMetrics) Collect

func (m *BucketStoreMetrics) Collect(out chan<- prometheus.Metric)

func (*BucketStoreMetrics) Describe

func (m *BucketStoreMetrics) Describe(out chan<- *prometheus.Desc)

func (*BucketStoreMetrics) RemoveUserRegistry

func (m *BucketStoreMetrics) RemoveUserRegistry(user string)

type BucketStores

type BucketStores struct {
	// contains filtered or unexported fields
}

BucketStores is a multi-tenant wrapper of Thanos BucketStore.

func NewBucketStores

func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error)

NewBucketStores makes a new BucketStores.

func (*BucketStores) InitialSync

func (u *BucketStores) InitialSync(ctx context.Context) error

InitialSync does an initial synchronization of blocks for all users.

func (*BucketStores) LabelNames

LabelNames implements the Storegateway proto service.

func (*BucketStores) LabelValues

LabelValues implements the Storegateway proto service.

func (*BucketStores) Series

Series makes a series request to the underlying user bucket store.

func (*BucketStores) SyncBlocks

func (u *BucketStores) SyncBlocks(ctx context.Context) error

SyncBlocks synchronizes the stores state with the Bucket store for every user.

type Config

type Config struct {
	// ShardingEnabled and ShardingStrategy map to the sharding_enabled and
	// sharding_strategy YAML keys; the struct tag of ShardingRing is elided in this rendering.
	// NOTE(review): ShardingStrategy presumably selects among the sharding strategies
	// defined in this package — confirm the accepted values against Validate.
	ShardingEnabled  bool       `yaml:"sharding_enabled"`
	ShardingRing     RingConfig `` /* 127-byte string literal not displayed */
	ShardingStrategy string     `yaml:"sharding_strategy"`
}

Config holds the store gateway config.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers the Config flags.

func (*Config) Validate

func (cfg *Config) Validate(limits validation.Limits) error

Validate the Config.

type DefaultShardingStrategy

type DefaultShardingStrategy struct {
	// contains filtered or unexported fields
}

DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways. Not goroutine-safe.

func NewDefaultShardingStrategy

func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Logger) *DefaultShardingStrategy

NewDefaultShardingStrategy creates DefaultShardingStrategy.

func (*DefaultShardingStrategy) FilterBlocks

func (s *DefaultShardingStrategy) FilterBlocks(_ context.Context, _ string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error

FilterBlocks implements ShardingStrategy.

func (*DefaultShardingStrategy) FilterUsers

func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string

FilterUsers implements ShardingStrategy.

type IgnoreDeletionMarkFilter

type IgnoreDeletionMarkFilter struct {
	// contains filtered or unexported fields
}

IgnoreDeletionMarkFilter is like the Thanos IgnoreDeletionMarkFilter, but it also implements the MetadataFilterWithBucketIndex interface.

func NewIgnoreDeletionMarkFilter

func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration, concurrency int) *IgnoreDeletionMarkFilter

NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter.

func (*IgnoreDeletionMarkFilter) DeletionMarkBlocks

func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.DeletionMark

DeletionMarkBlocks returns blocks that were marked for deletion.

func (*IgnoreDeletionMarkFilter) Filter

func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error

Filter implements block.MetadataFilter.

func (*IgnoreDeletionMarkFilter) FilterWithBucketIndex

func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, metas map[ulid.ULID]*metadata.Meta, idx *bucketindex.Index, synced block.GaugeVec) error

FilterWithBucketIndex implements MetadataFilterWithBucketIndex.

type MetadataFetcherMetrics

type MetadataFetcherMetrics struct {
	// contains filtered or unexported fields
}

MetadataFetcherMetrics aggregates metrics exported by the Thanos MetaFetcher and re-exports those aggregates as Cortex metrics.

func NewMetadataFetcherMetrics

func NewMetadataFetcherMetrics() *MetadataFetcherMetrics

func (*MetadataFetcherMetrics) AddUserRegistry

func (m *MetadataFetcherMetrics) AddUserRegistry(user string, reg *prometheus.Registry)

func (*MetadataFetcherMetrics) Collect

func (m *MetadataFetcherMetrics) Collect(out chan<- prometheus.Metric)

func (*MetadataFetcherMetrics) Describe

func (m *MetadataFetcherMetrics) Describe(out chan<- *prometheus.Desc)

func (*MetadataFetcherMetrics) RemoveUserRegistry

func (m *MetadataFetcherMetrics) RemoveUserRegistry(user string)

type MetadataFilterWithBucketIndex

// MetadataFilterWithBucketIndex is implemented by metadata filters that can take the
// bucket index as an additional input when filtering block metas.
type MetadataFilterWithBucketIndex interface {
	// FilterWithBucketIndex is like Thanos MetadataFilter.Filter() but it provides in input the bucket index too.
	FilterWithBucketIndex(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, idx *bucketindex.Index, synced block.GaugeVec) error
}

type NoShardingStrategy

type NoShardingStrategy struct{}

NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out.

func NewNoShardingStrategy

func NewNoShardingStrategy() *NoShardingStrategy

func (*NoShardingStrategy) FilterBlocks

func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]struct{}, _ block.GaugeVec) error

func (*NoShardingStrategy) FilterUsers

func (s *NoShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string

type ReplicaLabelRemover

type ReplicaLabelRemover struct {
	// contains filtered or unexported fields
}

ReplicaLabelRemover is a BaseFetcher modifier that modifies the external labels of existing blocks: it removes the given replica labels from the metadata of blocks that have them.

func NewReplicaLabelRemover

func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string) *ReplicaLabelRemover

NewReplicaLabelRemover creates a ReplicaLabelRemover.

func (*ReplicaLabelRemover) Filter

Filter modifies the external labels of existing blocks: it removes the given replica labels from the metadata of blocks that have them.

type RingConfig

type RingConfig struct {
	// Ring-wide settings. All but KVStore show an explicit YAML key; the KVStore
	// struct tag is elided in this rendering.
	KVStore              kv.Config     `` /* 206-byte string literal not displayed */
	HeartbeatPeriod      time.Duration `yaml:"heartbeat_period"`
	HeartbeatTimeout     time.Duration `yaml:"heartbeat_timeout"`
	ReplicationFactor    int           `yaml:"replication_factor"`
	TokensFilePath       string        `yaml:"tokens_file_path"`
	ZoneAwarenessEnabled bool          `yaml:"zone_awareness_enabled"`

	// Wait ring stability: minimum and maximum durations.
	// NOTE(review): presumably these bound how long the store-gateway waits for the
	// ring to become stable — confirm where they are consumed.
	WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"`
	WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration"`

	// Instance details.
	// NOTE(review): fields tagged doc:"hidden" are presumably omitted from the
	// generated configuration reference — TODO confirm.
	InstanceID             string   `yaml:"instance_id" doc:"hidden"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names"`
	InstancePort           int      `yaml:"instance_port" doc:"hidden"`
	InstanceAddr           string   `yaml:"instance_addr" doc:"hidden"`
	InstanceZone           string   `yaml:"instance_availability_zone"`

	// Injected internally: the yaml:"-" tag keeps these fields out of the YAML config.
	ListenPort      int           `yaml:"-"`
	RingCheckPeriod time.Duration `yaml:"-"`
}

RingConfig masks the ring lifecycler config, which contains many options not really required by the store gateways ring. This config is used to strip down the config to the minimum and avoid confusing the user.

func (*RingConfig) RegisterFlags

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure this to the given FlagSet.

func (*RingConfig) ToLifecyclerConfig

func (cfg *RingConfig) ToLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)

func (*RingConfig) ToRingConfig

func (cfg *RingConfig) ToRingConfig() ring.Config

type ShardingLimits

type ShardingLimits interface {
	// StoreGatewayTenantShardSize returns an integer limit for the given userID.
	// NOTE(review): presumably the number of store-gateway instances in the tenant's
	// shuffle shard — confirm, including the meaning of a zero or negative value.
	StoreGatewayTenantShardSize(userID string) int
}

ShardingLimits is the interface that should be implemented by the limits provider, limiting the scope of the limits to the ones required by sharding strategies.

type ShardingStrategy

// ShardingStrategy decides which users and which of their blocks should be loaded
// by a store-gateway.
type ShardingStrategy interface {
	// FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs
	// that should be synced by the store-gateway.
	FilterUsers(ctx context.Context, userIDs []string) []string

	// FilterBlocks filters metas in-place keeping only blocks that should be loaded by the store-gateway.
	// The provided loaded map contains blocks which have been previously returned by this function and
	// are now loaded or loading in the store-gateway.
	FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error
}

type ShuffleShardingStrategy

type ShuffleShardingStrategy struct {
	// contains filtered or unexported fields
}

ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, where each tenant's blocks are sharded across a subset of store-gateway instances.

func NewShuffleShardingStrategy

func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy

NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.

func (*ShuffleShardingStrategy) FilterBlocks

func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error

FilterBlocks implements ShardingStrategy.

func (*ShuffleShardingStrategy) FilterUsers

func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string

FilterUsers implements ShardingStrategy.

type StoreGateway

type StoreGateway struct {
	services.Service
	// contains filtered or unexported fields
}

StoreGateway is the Cortex service responsible for exposing an API over the bucket where blocks are stored, supporting blocks sharding and replication across a pool of store gateway instances (optional).

func NewStoreGateway

func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error)

func (*StoreGateway) LabelNames

LabelNames implements the Storegateway proto service.

func (*StoreGateway) LabelValues

LabelValues implements the Storegateway proto service.

func (*StoreGateway) OnRingInstanceHeartbeat

func (g *StoreGateway) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc)

func (*StoreGateway) OnRingInstanceRegister

func (g *StoreGateway) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens)

func (*StoreGateway) OnRingInstanceStopping

func (g *StoreGateway) OnRingInstanceStopping(_ *ring.BasicLifecycler)

func (*StoreGateway) OnRingInstanceTokens

func (g *StoreGateway) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens)

func (*StoreGateway) RingHandler

func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)

func (*StoreGateway) Series

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL