Documentation
¶
Index ¶
- Constants
- Variables
- func AllUserStatsRender(w http.ResponseWriter, r *http.Request, stats []UserIDStats, ...)
- type ActiveQueriedSeries
- type ActiveQueriedSeriesService
- type ActiveSeries
- type Config
- type Ingester
- func (i *Ingester) AllUserStats(_ context.Context, _ *client.UserStatsRequest) (*client.UsersStatsResponse, error)
- func (i *Ingester) AllUserStatsHandler(w http.ResponseWriter, r *http.Request)
- func (i *Ingester) CheckReady(ctx context.Context) error
- func (i *Ingester) Flush()
- func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request)
- func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (resp *client.LabelNamesResponse, err error)
- func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) (err error)
- func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (resp *client.LabelValuesResponse, err error)
- func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) (err error)
- func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (result *client.MetricsForLabelMatchersResponse, err error)
- func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatchersRequest, ...) (err error)
- func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) (resp *client.MetricsMetadataResponse, err error)
- func (i *Ingester) ModeHandler(w http.ResponseWriter, r *http.Request)
- func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
- func (i *Ingester) PushStream(srv client.Ingester_PushStreamServer) error
- func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (resp *client.ExemplarQueryResponse, err error)
- func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) (err error)
- func (i *Ingester) RenewTokenHandler(w http.ResponseWriter, r *http.Request)
- func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request)
- func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error)
- type InstanceLimits
- type Limiter
- func (l *Limiter) AssertMaxMetadataPerMetric(userID string, metadata int) error
- func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int) error
- func (l *Limiter) AssertMaxNativeHistogramSeriesPerUser(userID string, series int) error
- func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, ...) error
- func (l *Limiter) AssertMaxSeriesPerMetric(userID string, series int) error
- func (l *Limiter) AssertMaxSeriesPerUser(userID string, series int) error
- func (l *Limiter) FormatError(userID string, err error, lbls labels.Labels) error
- type RingCount
- type Shipper
- type TSDBState
- type UserIDStats
- type UserStats
- type UserStatsByTimeseries
Constants ¶
const (
// RingKey is the key under which we store the ingesters ring in the KVStore.
RingKey = "ring"
)
Variables ¶
var UserStatsTmpl *template.Template
Functions ¶
func AllUserStatsRender ¶ added in v1.19.0
func AllUserStatsRender(w http.ResponseWriter, r *http.Request, stats []UserIDStats, rf, queriedIngesterNum int)
AllUserStatsRender render data for all users or return in json format.
Types ¶
type ActiveQueriedSeries ¶
type ActiveQueriedSeries struct {
// contains filtered or unexported fields
}
ActiveQueriedSeries tracks unique queried series using time-windowed HyperLogLog. It maintains multiple HyperLogLog sketches in a circular buffer, one per time window. It can track up to the maximum configured window duration and query for specific window durations.
func NewActiveQueriedSeries ¶
func NewActiveQueriedSeries(windowsToQuery []time.Duration, windowDuration time.Duration, sampleRate float64, logger log.Logger) *ActiveQueriedSeries
NewActiveQueriedSeries creates a new ActiveQueriedSeries tracker.
func (*ActiveQueriedSeries) GetSeriesQueried ¶
func (a *ActiveQueriedSeries) GetSeriesQueried(now time.Time, queryWindow time.Duration) (uint64, error)
GetSeriesQueried returns the estimated cardinality of active queried series by merging all non-expired windows within the specified time range. If queryWindow is 0, it uses the full tracking period. This method uses caching to efficiently merge only new windows when possible.
func (*ActiveQueriedSeries) Purge ¶
func (a *ActiveQueriedSeries) Purge(now time.Time)
Purge rotates expired windows and clears them.
func (*ActiveQueriedSeries) SampleRequest ¶
func (a *ActiveQueriedSeries) SampleRequest() bool
SampleRequest returns whether this request should be sampled based on sampling. This should be called before collecting hashes to avoid unnecessary work. Uses the global rand source which is safe for concurrent use, avoiding the data race that occurs when multiple goroutines access a shared *rand.Rand.
func (*ActiveQueriedSeries) UpdateSeriesBatch ¶
func (a *ActiveQueriedSeries) UpdateSeriesBatch(hashes []uint64, now time.Time)
UpdateSeriesBatch adds multiple series hashes to the current active window in a single batch. This is more efficient than calling UpdateSeries multiple times as it: - Only acquires the lock once - Only rotates windows once Note: This method should be called from the centralized worker goroutines. Sampling should be checked before calling this method.
type ActiveQueriedSeriesService ¶
type ActiveQueriedSeriesService struct {
*services.BasicService
// contains filtered or unexported fields
}
ActiveQueriedSeriesService manages centralized worker goroutines for processing active queried series updates. It implements the services.Service interface to handle lifecycle management.
func NewActiveQueriedSeriesService ¶
func NewActiveQueriedSeriesService(logger log.Logger, registerer prometheus.Registerer) *ActiveQueriedSeriesService
NewActiveQueriedSeriesService creates a new ActiveQueriedSeriesService service.
func (*ActiveQueriedSeriesService) UpdateSeriesBatch ¶
func (m *ActiveQueriedSeriesService) UpdateSeriesBatch(activeQueriedSeries *ActiveQueriedSeries, hashes []uint64, now time.Time, userID string)
UpdateSeriesBatch sends an update to the update channel for processing. This method is non-blocking and will drop updates if the channel is full.
type ActiveSeries ¶ added in v1.5.0
type ActiveSeries struct {
// contains filtered or unexported fields
}
ActiveSeries is keeping track of recently active series for a single tenant.
func NewActiveSeries ¶ added in v1.5.0
func NewActiveSeries() *ActiveSeries
func (*ActiveSeries) Active ¶ added in v1.5.0
func (c *ActiveSeries) Active() int
func (*ActiveSeries) ActiveNativeHistogram ¶ added in v1.20.0
func (c *ActiveSeries) ActiveNativeHistogram() int
func (*ActiveSeries) Purge ¶ added in v1.5.0
func (c *ActiveSeries) Purge(keepUntil time.Time)
Purge removes expired entries from the cache. This function should be called periodically to avoid memory leaks.
func (*ActiveSeries) UpdateSeries ¶ added in v1.5.0
func (c *ActiveSeries) UpdateSeries(series labels.Labels, hash uint64, now time.Time, nativeHistogram bool, labelsCopy func(labels.Labels) labels.Labels)
Updates series timestamp to 'now'. Function is called to make a copy of labels if entry doesn't exist yet.
type Config ¶
type Config struct {
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler"`
// Config for metadata purging.
MetadataRetainPeriod time.Duration `yaml:"metadata_retain_period"`
RateUpdatePeriod time.Duration `yaml:"rate_update_period"`
UserTSDBConfigsUpdatePeriod time.Duration `yaml:"user_tsdb_configs_update_period"`
ActiveSeriesMetricsEnabled bool `yaml:"active_series_metrics_enabled"`
ActiveSeriesMetricsUpdatePeriod time.Duration `yaml:"active_series_metrics_update_period"`
ActiveSeriesMetricsIdleTimeout time.Duration `yaml:"active_series_metrics_idle_timeout"`
ActiveQueriedSeriesMetricsEnabled bool `yaml:"active_queried_series_metrics_enabled"`
ActiveQueriedSeriesMetricsUpdatePeriod time.Duration `yaml:"active_queried_series_metrics_update_period"`
ActiveQueriedSeriesMetricsWindowDuration time.Duration `yaml:"active_queried_series_metrics_window_duration"`
ActiveQueriedSeriesMetricsSampleRate float64 `yaml:"active_queried_series_metrics_sample_rate"`
ActiveQueriedSeriesMetricsWindows cortex_tsdb.DurationList `yaml:"active_queried_series_metrics_windows"`
// Use blocks storage.
BlocksStorageConfig cortex_tsdb.BlocksStorageConfig `yaml:"-"`
// UploadCompactedBlocksEnabled enables uploading compacted blocks.
UploadCompactedBlocksEnabled bool `yaml:"upload_compacted_blocks_enabled"`
// Injected at runtime and read from the distributor config, required
// to accurately apply global limits.
DistributorShardingStrategy string `yaml:"-"`
DistributorShardByAllLabels bool `yaml:"-"`
// Injected at runtime and read from querier config.
QueryIngestersWithin time.Duration `yaml:"-"`
DefaultLimits InstanceLimits `yaml:"instance_limits"`
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`
IgnoreSeriesLimitForMetricNames string `yaml:"ignore_series_limit_for_metric_names"`
// For admin contact details
AdminLimitMessage string `yaml:"admin_limit_message"`
LabelsStringInterningEnabled bool `yaml:"labels_string_interning_enabled"`
// DisableChunkTrimming allows to disable trimming of matching series chunks based on query Start and End time.
// When disabled, the result may contain samples outside the queried time range but Select() performances
// may be improved.
DisableChunkTrimming bool `yaml:"disable_chunk_trimming"`
// Maximum number of entries in the matchers cache. 0 to disable.
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
// If enabled, the metadata API returns all metadata regardless of the limits.
SkipMetadataLimits bool `yaml:"skip_metadata_limits"`
// When enabled, matchers with low selectivity are applied lazily during series scanning
// instead of being used for postings selection.
EnableMatcherOptimization bool `yaml:"enable_matcher_optimization"`
// Enable regex matcher limits and metrics collection for unoptimized regex queries.
// When enabled, the ingester will track pattern length, label cardinality, and total value length
// for unoptimized regex matchers, and enforce per-tenant limits if configured.
EnableRegexMatcherLimits bool `yaml:"enable_regex_matcher_limits"`
QueryProtection configs.QueryProtection `yaml:"query_protection"`
// contains filtered or unexported fields
}
Config for an Ingester.
func (*Config) RegisterFlags ¶
RegisterFlags adds the flags required to config this to the given FlagSet
type Ingester ¶
type Ingester struct {
*services.BasicService
// Prometheus block storage
TSDBState TSDBState
// contains filtered or unexported fields
}
Ingester deals with "in flight" chunks. Based on Prometheus 1.x MemorySeriesStorage.
func New ¶
func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger, resourceMonitor *resource.Monitor) (*Ingester, error)
New returns a new Ingester that uses Cortex block storage instead of chunks storage.
func NewForFlusher ¶ added in v1.0.0
func NewForFlusher(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error)
NewForFlusher constructs a new Ingester to be used by flusher target. Compared to the 'New' method:
- Always replays the WAL.
- Does not start the lifecycler.
this is a special version of ingester used by Flusher. This ingester is not ingesting anything, its only purpose is to react on Flush method and flush all opened TSDBs when called.
func (*Ingester) AllUserStats ¶
func (i *Ingester) AllUserStats(_ context.Context, _ *client.UserStatsRequest) (*client.UsersStatsResponse, error)
AllUserStats returns ingestion statistics for all users known to this ingester.
func (*Ingester) AllUserStatsHandler ¶ added in v1.19.0
func (i *Ingester) AllUserStatsHandler(w http.ResponseWriter, r *http.Request)
AllUserStatsHandler shows stats for all users.
func (*Ingester) CheckReady ¶ added in v0.7.0
CheckReady is the readiness handler used to indicate to k8s when the ingesters are ready for the addition or removal of another ingester.
func (*Ingester) Flush ¶
func (i *Ingester) Flush()
Flush triggers a flush of all the chunks and closes the flush queues. Called from the Lifecycler as part of the ingester shutdown.
func (*Ingester) FlushHandler ¶
func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request)
FlushHandler triggers a flush of all in memory chunks. Mainly used for local testing.
func (*Ingester) LabelNames ¶
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (resp *client.LabelNamesResponse, err error)
LabelNames return all the label names.
func (*Ingester) LabelNamesStream ¶ added in v1.13.0
func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) (err error)
LabelNamesStream return all the label names.
func (*Ingester) LabelValues ¶
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (resp *client.LabelValuesResponse, err error)
LabelValues returns all label values that are associated with a given label name.
func (*Ingester) LabelValuesStream ¶ added in v1.13.0
func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) (err error)
LabelValuesStream returns all label values that are associated with a given label name.
func (*Ingester) MetricsForLabelMatchers ¶
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (result *client.MetricsForLabelMatchersResponse, err error)
MetricsForLabelMatchers returns all the metrics which match a set of matchers.
func (*Ingester) MetricsForLabelMatchersStream ¶ added in v1.13.0
func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatchersRequest, stream client.Ingester_MetricsForLabelMatchersStreamServer) (err error)
func (*Ingester) MetricsMetadata ¶ added in v1.1.0
func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) (resp *client.MetricsMetadataResponse, err error)
MetricsMetadata returns all the metric metadata of a user.
func (*Ingester) ModeHandler ¶ added in v1.19.0
func (i *Ingester) ModeHandler(w http.ResponseWriter, r *http.Request)
ModeHandler Change mode of ingester.
func (*Ingester) Push ¶
func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
Push adds metrics to a block
func (*Ingester) PushStream ¶ added in v1.20.0
func (i *Ingester) PushStream(srv client.Ingester_PushStreamServer) error
func (*Ingester) QueryExemplars ¶ added in v1.10.0
func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (resp *client.ExemplarQueryResponse, err error)
QueryExemplars implements service.IngesterServer
func (*Ingester) QueryStream ¶
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) (err error)
QueryStream implements service.IngesterServer Streams metrics from a TSDB. This implements the client.IngesterServer interface
func (*Ingester) RenewTokenHandler ¶ added in v1.18.0
func (i *Ingester) RenewTokenHandler(w http.ResponseWriter, r *http.Request)
func (*Ingester) ShutdownHandler ¶ added in v0.4.0
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request)
ShutdownHandler triggers the following set of operations in order:
- Change the state of ring to stop accepting writes.
- Flush all the chunks.
func (*Ingester) UserStats ¶
func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error)
UserStats returns ingestion statistics for the current user.
type InstanceLimits ¶ added in v1.9.0
type InstanceLimits struct {
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
MaxInMemoryTenants int64 `yaml:"max_tenants"`
MaxInMemorySeries int64 `yaml:"max_series"`
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
MaxInflightQueryRequests int64 `yaml:"max_inflight_query_requests"`
}
InstanceLimits describes limits used by ingester. Reaching any of these will result in error response to the call.
func (*InstanceLimits) RegisterFlagsWithPrefix ¶ added in v1.20.0
func (cfg *InstanceLimits) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string)
func (*InstanceLimits) UnmarshalYAML ¶ added in v1.9.0
func (l *InstanceLimits) UnmarshalYAML(unmarshal func(any) error) error
UnmarshalYAML implements the yaml.Unmarshaler interface. If give
type Limiter ¶ added in v1.1.0
type Limiter struct {
AdminLimitMessage string
// contains filtered or unexported fields
}
Limiter implements primitives to get the maximum number of series an ingester can handle for a specific tenant
func NewLimiter ¶ added in v1.1.0
func NewLimiter( limits *validation.Overrides, ring RingCount, shardingStrategy string, shardByAllLabels bool, replicationFactor int, zoneAwarenessEnabled bool, AdminLimitMessage string, ) *Limiter
NewLimiter makes a new in-memory series limiter
func (*Limiter) AssertMaxMetadataPerMetric ¶ added in v1.1.0
AssertMaxMetadataPerMetric limit has not been reached compared to the current number of metadata per metric in input and returns an error if so.
func (*Limiter) AssertMaxMetricsWithMetadataPerUser ¶ added in v1.1.0
AssertMaxMetricsWithMetadataPerUser limit has not been reached compared to the current number of metrics with metadata in input and returns an error if so.
func (*Limiter) AssertMaxNativeHistogramSeriesPerUser ¶ added in v1.20.0
AssertMaxNativeHistogramSeriesPerUser limit has not been reached compared to the current number of native histogram series in input and returns an error if so.
func (*Limiter) AssertMaxSeriesPerLabelSet ¶ added in v1.18.0
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error)) error
AssertMaxSeriesPerLabelSet limit has not been reached compared to the current number of metrics with metadata in input and returns an error if so.
func (*Limiter) AssertMaxSeriesPerMetric ¶ added in v1.1.0
AssertMaxSeriesPerMetric limit has not been reached compared to the current number of series in input and returns an error if so.
func (*Limiter) AssertMaxSeriesPerUser ¶ added in v1.1.0
AssertMaxSeriesPerUser limit has not been reached compared to the current number of series in input and returns an error if so.
type RingCount ¶ added in v0.4.0
RingCount is the interface exposed by a ring implementation which allows to count members
type TSDBState ¶ added in v0.4.0
type TSDBState struct {
// contains filtered or unexported fields
}
TSDBState holds data structures used by the TSDB storage engine
type UserIDStats ¶ added in v1.19.0
UserIDStats models ingestion statistics for one user, including the user ID
type UserStats ¶ added in v1.19.0
type UserStats struct {
IngestionRate float64 `json:"ingestionRate"`
NumSeries uint64 `json:"numSeries"`
APIIngestionRate float64 `json:"APIIngestionRate"`
RuleIngestionRate float64 `json:"RuleIngestionRate"`
ActiveSeries uint64 `json:"activeSeries"`
LoadedBlocks uint64 `json:"loadedBlocks"`
QueriedIngesters uint64 `json:"queriedIngesters"`
}
UserStats models ingestion statistics for one user.
type UserStatsByTimeseries ¶ added in v1.19.0
type UserStatsByTimeseries []UserIDStats
func (UserStatsByTimeseries) Len ¶ added in v1.19.0
func (s UserStatsByTimeseries) Len() int
func (UserStatsByTimeseries) Less ¶ added in v1.19.0
func (s UserStatsByTimeseries) Less(i, j int) bool
func (UserStatsByTimeseries) Swap ¶ added in v1.19.0
func (s UserStatsByTimeseries) Swap(i, j int)