ingester

package
v1.21.0-rc.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2026 License: Apache-2.0 Imports: 72 Imported by: 1

Documentation

Index

Constants

View Source
const (
	// RingKey is the key under which we store the ingesters ring in the KVStore.
	RingKey = "ring"
)

Variables

View Source
var UserStatsTmpl *template.Template

Functions

func AllUserStatsRender added in v1.19.0

func AllUserStatsRender(w http.ResponseWriter, r *http.Request, stats []UserIDStats, rf, queriedIngesterNum int)

AllUserStatsRender render data for all users or return in json format.

Types

type ActiveQueriedSeries

type ActiveQueriedSeries struct {
	// contains filtered or unexported fields
}

ActiveQueriedSeries tracks unique queried series using time-windowed HyperLogLog. It maintains multiple HyperLogLog sketches in a circular buffer, one per time window. It can track up to the maximum configured window duration and query for specific window durations.

func NewActiveQueriedSeries

func NewActiveQueriedSeries(windowsToQuery []time.Duration, windowDuration time.Duration, sampleRate float64, logger log.Logger) *ActiveQueriedSeries

NewActiveQueriedSeries creates a new ActiveQueriedSeries tracker.

func (*ActiveQueriedSeries) GetSeriesQueried

func (a *ActiveQueriedSeries) GetSeriesQueried(now time.Time, queryWindow time.Duration) (uint64, error)

GetSeriesQueried returns the estimated cardinality of active queried series by merging all non-expired windows within the specified time range. If queryWindow is 0, it uses the full tracking period. This method uses caching to efficiently merge only new windows when possible.

func (*ActiveQueriedSeries) Purge

func (a *ActiveQueriedSeries) Purge(now time.Time)

Purge rotates expired windows and clears them.

func (*ActiveQueriedSeries) SampleRequest

func (a *ActiveQueriedSeries) SampleRequest() bool

SampleRequest returns whether this request should be sampled based on sampling. This should be called before collecting hashes to avoid unnecessary work. Uses the global rand source which is safe for concurrent use, avoiding the data race that occurs when multiple goroutines access a shared *rand.Rand.

func (*ActiveQueriedSeries) UpdateSeriesBatch

func (a *ActiveQueriedSeries) UpdateSeriesBatch(hashes []uint64, now time.Time)

UpdateSeriesBatch adds multiple series hashes to the current active window in a single batch. This is more efficient than calling UpdateSeries multiple times as it: - Only acquires the lock once - Only rotates windows once Note: This method should be called from the centralized worker goroutines. Sampling should be checked before calling this method.

type ActiveQueriedSeriesService

type ActiveQueriedSeriesService struct {
	*services.BasicService
	// contains filtered or unexported fields
}

ActiveQueriedSeriesService manages centralized worker goroutines for processing active queried series updates. It implements the services.Service interface to handle lifecycle management.

func NewActiveQueriedSeriesService

func NewActiveQueriedSeriesService(logger log.Logger, registerer prometheus.Registerer) *ActiveQueriedSeriesService

NewActiveQueriedSeriesService creates a new ActiveQueriedSeriesService service.

func (*ActiveQueriedSeriesService) UpdateSeriesBatch

func (m *ActiveQueriedSeriesService) UpdateSeriesBatch(activeQueriedSeries *ActiveQueriedSeries, hashes []uint64, now time.Time, userID string)

UpdateSeriesBatch sends an update to the update channel for processing. This method is non-blocking and will drop updates if the channel is full.

type ActiveSeries added in v1.5.0

type ActiveSeries struct {
	// contains filtered or unexported fields
}

ActiveSeries is keeping track of recently active series for a single tenant.

func NewActiveSeries added in v1.5.0

func NewActiveSeries() *ActiveSeries

func (*ActiveSeries) Active added in v1.5.0

func (c *ActiveSeries) Active() int

func (*ActiveSeries) ActiveNativeHistogram added in v1.20.0

func (c *ActiveSeries) ActiveNativeHistogram() int

func (*ActiveSeries) Purge added in v1.5.0

func (c *ActiveSeries) Purge(keepUntil time.Time)

Purge removes expired entries from the cache. This function should be called periodically to avoid memory leaks.

func (*ActiveSeries) UpdateSeries added in v1.5.0

func (c *ActiveSeries) UpdateSeries(series labels.Labels, hash uint64, now time.Time, nativeHistogram bool, labelsCopy func(labels.Labels) labels.Labels)

Updates series timestamp to 'now'. Function is called to make a copy of labels if entry doesn't exist yet.

type Config

type Config struct {
	LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler"`

	// Config for metadata purging.
	MetadataRetainPeriod time.Duration `yaml:"metadata_retain_period"`

	RateUpdatePeriod            time.Duration `yaml:"rate_update_period"`
	UserTSDBConfigsUpdatePeriod time.Duration `yaml:"user_tsdb_configs_update_period"`

	ActiveSeriesMetricsEnabled      bool          `yaml:"active_series_metrics_enabled"`
	ActiveSeriesMetricsUpdatePeriod time.Duration `yaml:"active_series_metrics_update_period"`
	ActiveSeriesMetricsIdleTimeout  time.Duration `yaml:"active_series_metrics_idle_timeout"`

	ActiveQueriedSeriesMetricsEnabled        bool                     `yaml:"active_queried_series_metrics_enabled"`
	ActiveQueriedSeriesMetricsUpdatePeriod   time.Duration            `yaml:"active_queried_series_metrics_update_period"`
	ActiveQueriedSeriesMetricsWindowDuration time.Duration            `yaml:"active_queried_series_metrics_window_duration"`
	ActiveQueriedSeriesMetricsSampleRate     float64                  `yaml:"active_queried_series_metrics_sample_rate"`
	ActiveQueriedSeriesMetricsWindows        cortex_tsdb.DurationList `yaml:"active_queried_series_metrics_windows"`

	// Use blocks storage.
	BlocksStorageConfig cortex_tsdb.BlocksStorageConfig `yaml:"-"`

	// UploadCompactedBlocksEnabled enables uploading compacted blocks.
	UploadCompactedBlocksEnabled bool `yaml:"upload_compacted_blocks_enabled"`

	// Injected at runtime and read from the distributor config, required
	// to accurately apply global limits.
	DistributorShardingStrategy string `yaml:"-"`
	DistributorShardByAllLabels bool   `yaml:"-"`

	// Injected at runtime and read from querier config.
	QueryIngestersWithin time.Duration `yaml:"-"`

	DefaultLimits    InstanceLimits         `yaml:"instance_limits"`
	InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

	IgnoreSeriesLimitForMetricNames string `yaml:"ignore_series_limit_for_metric_names"`

	// For admin contact details
	AdminLimitMessage string `yaml:"admin_limit_message"`

	LabelsStringInterningEnabled bool `yaml:"labels_string_interning_enabled"`

	// DisableChunkTrimming allows to disable trimming of matching series chunks based on query Start and End time.
	// When disabled, the result may contain samples outside the queried time range but Select() performances
	// may be improved.
	DisableChunkTrimming bool `yaml:"disable_chunk_trimming"`

	// Maximum number of entries in the matchers cache. 0 to disable.
	MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`

	// If enabled, the metadata API returns all metadata regardless of the limits.
	SkipMetadataLimits bool `yaml:"skip_metadata_limits"`

	// When enabled, matchers with low selectivity are applied lazily during series scanning
	// instead of being used for postings selection.
	EnableMatcherOptimization bool `yaml:"enable_matcher_optimization"`

	// Enable regex matcher limits and metrics collection for unoptimized regex queries.
	// When enabled, the ingester will track pattern length, label cardinality, and total value length
	// for unoptimized regex matchers, and enforce per-tenant limits if configured.
	EnableRegexMatcherLimits bool `yaml:"enable_regex_matcher_limits"`

	QueryProtection configs.QueryProtection `yaml:"query_protection"`
	// contains filtered or unexported fields
}

Config for an Ingester.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet

func (*Config) Validate added in v1.17.0

func (cfg *Config) Validate(monitoredResources flagext.StringSliceCSV) error

type Ingester

type Ingester struct {
	*services.BasicService

	// Prometheus block storage
	TSDBState TSDBState
	// contains filtered or unexported fields
}

Ingester deals with "in flight" chunks. Based on Prometheus 1.x MemorySeriesStorage.

func New

func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger, resourceMonitor *resource.Monitor) (*Ingester, error)

New returns a new Ingester that uses Cortex block storage instead of chunks storage.

func NewForFlusher added in v1.0.0

func NewForFlusher(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error)

NewForFlusher constructs a new Ingester to be used by flusher target. Compared to the 'New' method:

  • Always replays the WAL.
  • Does not start the lifecycler.

this is a special version of ingester used by Flusher. This ingester is not ingesting anything, its only purpose is to react on Flush method and flush all opened TSDBs when called.

func (*Ingester) AllUserStats

AllUserStats returns ingestion statistics for all users known to this ingester.

func (*Ingester) AllUserStatsHandler added in v1.19.0

func (i *Ingester) AllUserStatsHandler(w http.ResponseWriter, r *http.Request)

AllUserStatsHandler shows stats for all users.

func (*Ingester) CheckReady added in v0.7.0

func (i *Ingester) CheckReady(ctx context.Context) error

CheckReady is the readiness handler used to indicate to k8s when the ingesters are ready for the addition or removal of another ingester.

func (*Ingester) Flush

func (i *Ingester) Flush()

Flush triggers a flush of all the chunks and closes the flush queues. Called from the Lifecycler as part of the ingester shutdown.

func (*Ingester) FlushHandler

func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request)

FlushHandler triggers a flush of all in memory chunks. Mainly used for local testing.

func (*Ingester) LabelNames

func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (resp *client.LabelNamesResponse, err error)

LabelNames return all the label names.

func (*Ingester) LabelNamesStream added in v1.13.0

func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) (err error)

LabelNamesStream return all the label names.

func (*Ingester) LabelValues

func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (resp *client.LabelValuesResponse, err error)

LabelValues returns all label values that are associated with a given label name.

func (*Ingester) LabelValuesStream added in v1.13.0

func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) (err error)

LabelValuesStream returns all label values that are associated with a given label name.

func (*Ingester) MetricsForLabelMatchers

func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (result *client.MetricsForLabelMatchersResponse, err error)

MetricsForLabelMatchers returns all the metrics which match a set of matchers.

func (*Ingester) MetricsForLabelMatchersStream added in v1.13.0

func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatchersRequest, stream client.Ingester_MetricsForLabelMatchersStreamServer) (err error)

func (*Ingester) MetricsMetadata added in v1.1.0

func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) (resp *client.MetricsMetadataResponse, err error)

MetricsMetadata returns all the metric metadata of a user.

func (*Ingester) ModeHandler added in v1.19.0

func (i *Ingester) ModeHandler(w http.ResponseWriter, r *http.Request)

ModeHandler Change mode of ingester.

func (*Ingester) Push

Push adds metrics to a block

func (*Ingester) PushStream added in v1.20.0

func (i *Ingester) PushStream(srv client.Ingester_PushStreamServer) error

func (*Ingester) QueryExemplars added in v1.10.0

func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (resp *client.ExemplarQueryResponse, err error)

QueryExemplars implements service.IngesterServer

func (*Ingester) QueryStream

func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) (err error)

QueryStream implements service.IngesterServer Streams metrics from a TSDB. This implements the client.IngesterServer interface

func (*Ingester) RenewTokenHandler added in v1.18.0

func (i *Ingester) RenewTokenHandler(w http.ResponseWriter, r *http.Request)

func (*Ingester) ShutdownHandler added in v0.4.0

func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request)

ShutdownHandler triggers the following set of operations in order:

  • Change the state of ring to stop accepting writes.
  • Flush all the chunks.

func (*Ingester) UserStats

UserStats returns ingestion statistics for the current user.

type InstanceLimits added in v1.9.0

type InstanceLimits struct {
	MaxIngestionRate         float64 `yaml:"max_ingestion_rate"`
	MaxInMemoryTenants       int64   `yaml:"max_tenants"`
	MaxInMemorySeries        int64   `yaml:"max_series"`
	MaxInflightPushRequests  int64   `yaml:"max_inflight_push_requests"`
	MaxInflightQueryRequests int64   `yaml:"max_inflight_query_requests"`
}

InstanceLimits describes limits used by ingester. Reaching any of these will result in error response to the call.

func (*InstanceLimits) RegisterFlagsWithPrefix added in v1.20.0

func (cfg *InstanceLimits) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string)

func (*InstanceLimits) UnmarshalYAML added in v1.9.0

func (l *InstanceLimits) UnmarshalYAML(unmarshal func(any) error) error

UnmarshalYAML implements the yaml.Unmarshaler interface. If give

type Limiter added in v1.1.0

type Limiter struct {
	AdminLimitMessage string
	// contains filtered or unexported fields
}

Limiter implements primitives to get the maximum number of series an ingester can handle for a specific tenant

func NewLimiter added in v1.1.0

func NewLimiter(
	limits *validation.Overrides,
	ring RingCount,
	shardingStrategy string,
	shardByAllLabels bool,
	replicationFactor int,
	zoneAwarenessEnabled bool,
	AdminLimitMessage string,
) *Limiter

NewLimiter makes a new in-memory series limiter

func (*Limiter) AssertMaxMetadataPerMetric added in v1.1.0

func (l *Limiter) AssertMaxMetadataPerMetric(userID string, metadata int) error

AssertMaxMetadataPerMetric limit has not been reached compared to the current number of metadata per metric in input and returns an error if so.

func (*Limiter) AssertMaxMetricsWithMetadataPerUser added in v1.1.0

func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int) error

AssertMaxMetricsWithMetadataPerUser limit has not been reached compared to the current number of metrics with metadata in input and returns an error if so.

func (*Limiter) AssertMaxNativeHistogramSeriesPerUser added in v1.20.0

func (l *Limiter) AssertMaxNativeHistogramSeriesPerUser(userID string, series int) error

AssertMaxNativeHistogramSeriesPerUser limit has not been reached compared to the current number of native histogram series in input and returns an error if so.

func (*Limiter) AssertMaxSeriesPerLabelSet added in v1.18.0

func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error)) error

AssertMaxSeriesPerLabelSet limit has not been reached compared to the current number of metrics with metadata in input and returns an error if so.

func (*Limiter) AssertMaxSeriesPerMetric added in v1.1.0

func (l *Limiter) AssertMaxSeriesPerMetric(userID string, series int) error

AssertMaxSeriesPerMetric limit has not been reached compared to the current number of series in input and returns an error if so.

func (*Limiter) AssertMaxSeriesPerUser added in v1.1.0

func (l *Limiter) AssertMaxSeriesPerUser(userID string, series int) error

AssertMaxSeriesPerUser limit has not been reached compared to the current number of series in input and returns an error if so.

func (*Limiter) FormatError added in v1.9.0

func (l *Limiter) FormatError(userID string, err error, lbls labels.Labels) error

FormatError returns the input error enriched with the actual limits for the given user. It acts as pass-through if the input error is unknown.

type RingCount added in v0.4.0

type RingCount interface {
	HealthyInstancesCount() int
	ZonesCount() int
}

RingCount is the interface exposed by a ring implementation which allows to count members

type Shipper added in v0.7.0

type Shipper interface {
	Sync(ctx context.Context) (uploaded int, err error)
}

Shipper interface is used to have an easy way to mock it in tests.

type TSDBState added in v0.4.0

type TSDBState struct {
	// contains filtered or unexported fields
}

TSDBState holds data structures used by the TSDB storage engine

type UserIDStats added in v1.19.0

type UserIDStats struct {
	UserID string `json:"userID"`
	UserStats
}

UserIDStats models ingestion statistics for one user, including the user ID

type UserStats added in v1.19.0

type UserStats struct {
	IngestionRate     float64 `json:"ingestionRate"`
	NumSeries         uint64  `json:"numSeries"`
	APIIngestionRate  float64 `json:"APIIngestionRate"`
	RuleIngestionRate float64 `json:"RuleIngestionRate"`
	ActiveSeries      uint64  `json:"activeSeries"`
	LoadedBlocks      uint64  `json:"loadedBlocks"`
	QueriedIngesters  uint64  `json:"queriedIngesters"`
}

UserStats models ingestion statistics for one user.

type UserStatsByTimeseries added in v1.19.0

type UserStatsByTimeseries []UserIDStats

func (UserStatsByTimeseries) Len added in v1.19.0

func (s UserStatsByTimeseries) Len() int

func (UserStatsByTimeseries) Less added in v1.19.0

func (s UserStatsByTimeseries) Less(i, j int) bool

func (UserStatsByTimeseries) Swap added in v1.19.0

func (s UserStatsByTimeseries) Swap(i, j int)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL