Documentation
¶
Index ¶
- func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l Limits, ...) *rateStore
- type Config
- type DataObjTee
- type DataObjTeeConfig
- type Distributor
- func (d *Distributor) HealthyInstancesCount() int
- func (d *Distributor) OTLPPushHandler(w http.ResponseWriter, r *http.Request)
- func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
- func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request)
- func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRequest, ...) (*logproto.PushResponse, error)
- func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request)
- type FieldDetector
- type KafkaProducer
- type KeyedStream
- type Limits
- type PushTracker
- type RateBatcherConfig
- type RateStore
- type RateStoreConfig
- type ReadLifecycler
- type RingConfig
- type ShardTracker
- type Tee
- type TeeErrorCodes
- type Validator
- func (v Validator) IsAggregatedMetricStream(ls labels.Labels) bool
- func (v Validator) IsInternalStream(ls labels.Labels) bool
- func (v Validator) IsPatternStream(ls labels.Labels) bool
- func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time, policy string) (bool, int, string, error)
- func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, ...) error
- func (v Validator) ValidateLabels(vCtx validationContext, ls labels.Labels, stream logproto.Stream, ...) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewRateStore ¶
func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l Limits, registerer prometheus.Registerer) *rateStore
Types ¶
type Config ¶
type Config struct {
// Distributors ring
DistributorRing RingConfig `yaml:"ring,omitempty"`
PushWorkerCount int `yaml:"push_worker_count"`
// Request parser
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
MaxDecompressedSize int64 `yaml:"max_decompressed_size"`
// RateStore customizes the rate storing used by stream sharding.
RateStore RateStoreConfig `yaml:"rate_store"`
// WriteFailuresLogging customizes write failures logging behavior.
WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Customize the logging of write failures."`
OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"`
// DefaultPolicyStreamMappings contains the default policy stream mappings that are merged with per-tenant mappings.
DefaultPolicyStreamMappings validation.PolicyStreamMapping `` /* 128-byte string literal not displayed */
KafkaEnabled bool `yaml:"kafka_writes_enabled"`
IngesterEnabled bool `yaml:"ingester_writes_enabled"`
IngestLimitsEnabled bool `yaml:"ingest_limits_enabled"`
IngestLimitsDryRunEnabled bool `yaml:"ingest_limits_dry_run_enabled"`
KafkaConfig kafka.Config `yaml:"-"`
DataObjTeeConfig DataObjTeeConfig `yaml:"dataobj_tee"`
// contains filtered or unexported fields
}
Config for a Distributor.
func (*Config) RegisterFlags ¶
RegisterFlags registers distributor-related flags.
type DataObjTee ¶ added in v3.7.0
type DataObjTee struct {
// contains filtered or unexported fields
}
DataObjTee is a tee that duplicates streams to the data object topic. It is a temporary solution while we work on segmentation keys.
func NewDataObjTee ¶ added in v3.7.0
func NewDataObjTee( cfg *DataObjTeeConfig, resolver *segmentationPartitionResolver, limitsClient *ingestLimits, limits Limits, kafkaClient *kgo.Client, logger log.Logger, r prometheus.Registerer, ) (*DataObjTee, error)
NewDataObjTee returns a new DataObjTee.
func (*DataObjTee) Duplicate ¶ added in v3.7.0
func (t *DataObjTee) Duplicate(ctx context.Context, tenant string, streams []KeyedStream, pushTracker *PushTracker)
Duplicate implements the Tee interface.
func (*DataObjTee) RateBatcher ¶ added in v3.7.0
func (t *DataObjTee) RateBatcher() services.Service
RateBatcher returns the rate batcher service if batching is enabled, nil otherwise. This is used to add the batcher to the distributor's subservices for lifecycle management.
func (*DataObjTee) Register ¶ added in v3.7.0
func (t *DataObjTee) Register(_ context.Context, _ string, streams []KeyedStream, pushTracker *PushTracker)
type DataObjTeeConfig ¶ added in v3.7.0
type DataObjTeeConfig struct {
Enabled bool `yaml:"enabled"`
Topic string `yaml:"topic"`
MaxBufferedBytes int `yaml:"max_buffered_bytes"`
PerPartitionRateBytes int `yaml:"per_partition_rate_bytes"`
DebugMetricsEnabled bool `yaml:"debug_metrics_enabled"`
RateBatchWindow time.Duration `yaml:"rate_batch_window"`
}
func (*DataObjTeeConfig) RegisterFlags ¶ added in v3.7.0
func (c *DataObjTeeConfig) RegisterFlags(f *flag.FlagSet)
func (*DataObjTeeConfig) Validate ¶ added in v3.7.0
func (c *DataObjTeeConfig) Validate() error
type Distributor ¶
type Distributor struct {
services.Service
RequestParserWrapper push.RequestParserWrapper
// contains filtered or unexported fields
}
Distributor coordinates replication and distribution of log streams.
func New ¶
func New( cfg Config, ingesterCfg ingester.Config, clientCfg ingester_client.Config, configs *runtime.TenantConfigs, ingestersRing ring.ReadRing, partitionRing ring.PartitionRingReader, overrides Limits, registerer prometheus.Registerer, metricsNamespace string, tee Tee, usageTracker push.UsageTracker, limitsFrontendCfg limits_frontend_client.Config, limitsFrontendRing ring.ReadRing, numMetadataPartitions int, dataObjConsumerPartitionRing ring.PartitionRingReader, logger log.Logger, ) (*Distributor, error)
New creates a distributor.
func (*Distributor) HealthyInstancesCount ¶
func (d *Distributor) HealthyInstancesCount() int
HealthyInstancesCount implements the ReadLifecycler interface.
We use a ring lifecycler delegate to count the number of members of the ring. The count is then used to enforce rate limiting correctly for each distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES.
func (*Distributor) OTLPPushHandler ¶
func (d *Distributor) OTLPPushHandler(w http.ResponseWriter, r *http.Request)
func (*Distributor) Push ¶
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
func (*Distributor) PushHandler ¶
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request)
PushHandler reads a snappy-compressed proto from the HTTP body.
func (*Distributor) PushWithResolver ¶ added in v3.5.0
func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRequest, streamResolver *requestScopedStreamResolver, format string) (*logproto.PushResponse, error)
PushWithResolver pushes a set of streams. The returned error is the last one seen.
func (*Distributor) ServeHTTP ¶
func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements the distributor ring status page.
If the rate limiting strategy is local instead of global, no ring is used by the distributor and as such, no ring status is returned from this function.
type FieldDetector ¶ added in v3.4.0
type FieldDetector struct {
// contains filtered or unexported fields
}
type KafkaProducer ¶ added in v3.3.0
type KeyedStream ¶
type Limits ¶
type Limits interface {
retention.Limits
MaxLineSize(userID string) int
MaxLineSizeTruncate(userID string) bool
MaxLineSizeTruncateIdentifier(userID string) string
MaxLabelNamesPerSeries(userID string) int
MaxLabelNameLength(userID string) int
MaxLabelValueLength(userID string) int
CreationGracePeriod(userID string) time.Duration
RejectOldSamples(userID string) bool
RejectOldSamplesMaxAge(userID string) time.Duration
IncrementDuplicateTimestamps(userID string) bool
DiscoverServiceName(userID string) []string
DiscoverGenericFields(userID string) map[string][]string
DiscoverLogLevels(userID string) bool
LogLevelFields(userID string) []string
LogLevelFromJSONMaxDepth(userID string) int
ShardStreams(userID string) shardstreams.Config
IngestionRateStrategy() string
IngestionRateBytes(userID string) float64
IngestionBurstSizeBytes(userID string) int
AllowStructuredMetadata(userID string) bool
MaxStructuredMetadataSize(userID string) int
MaxStructuredMetadataCount(userID string) int
OTLPConfig(userID string) push.OTLPConfig
BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
BlockIngestionPolicyUntil(userID string, policy string) time.Time
EnforcedLabels(userID string) []string
PolicyEnforcedLabels(userID string, policy string) []string
IngestionPartitionsTenantShardSize(userID string) int
SimulatedPushLatency(userID string) time.Duration
}
Limits is an interface for distributor limits/related configs
type PushTracker ¶ added in v3.7.0
type PushTracker struct {
// contains filtered or unexported fields
}
TODO: taken from Cortex; see if we can refactor out a usable interface.
type RateBatcherConfig ¶ added in v3.7.0
type RateBatcherConfig struct {
// BatchWindow is the duration to accumulate rate updates before flushing.
BatchWindow time.Duration
}
RateBatcherConfig contains the configuration for the RateBatcher.
type RateStore ¶
RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.
type RateStoreConfig ¶
type RateStoreConfig struct {
MaxParallelism int `yaml:"max_request_parallelism"`
StreamRateUpdateInterval time.Duration `yaml:"stream_rate_update_interval"`
IngesterReqTimeout time.Duration `yaml:"ingester_request_timeout"`
Debug bool `yaml:"debug"`
}
func (*RateStoreConfig) RegisterFlagsWithPrefix ¶
func (cfg *RateStoreConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet)
type ReadLifecycler ¶
type ReadLifecycler interface {
HealthyInstancesCount() int
}
ReadLifecycler represents the read interface to the lifecycler.
type RingConfig ¶
type RingConfig struct {
KVStore kv.Config `yaml:"kvstore"`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
// Instance details
InstanceID string `yaml:"instance_id" doc:"hidden"`
InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
InstancePort int `yaml:"instance_port" doc:"hidden"`
InstanceAddr string `yaml:"instance_addr" doc:"hidden"`
EnableIPv6 bool `yaml:"instance_enable_ipv6" doc:"hidden"`
// Injected internally
ListenPort int `yaml:"-"`
}
RingConfig masks the ring lifecycler config which contains many options not really required by the distributors ring. This config is used to strip down the config to the minimum, and avoid confusion to the user.
func (*RingConfig) RegisterFlags ¶
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to configure this to the given FlagSet.
func (*RingConfig) ToBasicLifecyclerConfig ¶
func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)
ToBasicLifecyclerConfig returns a BasicLifecyclerConfig based on the distributor ring config.
func (*RingConfig) ToRingConfig ¶
func (cfg *RingConfig) ToRingConfig() ring.Config
type ShardTracker ¶
type ShardTracker struct {
// contains filtered or unexported fields
}
ShardTracker is a data structure to keep track of the last pushed shard number for a given stream hash. This allows the distributor to evenly shard streams across pushes even when any given push has fewer entries than the calculated number of shards.
func NewShardTracker ¶
func NewShardTracker() *ShardTracker
func (*ShardTracker) LastShardNum ¶
func (t *ShardTracker) LastShardNum(tenant string, streamHash uint64) int
func (*ShardTracker) SetLastShardNum ¶
func (t *ShardTracker) SetLastShardNum(tenant string, streamHash uint64, shardNum int)
type Tee ¶
type Tee interface {
Duplicate(ctx context.Context, tenant string, streams []KeyedStream, pushTracker *PushTracker)
// Register is a prehook to allow Tees to register their pending streams, allowing distributors to wait for them before concluding a push request.
// If pending streams are registered, one should make sure `pushTracker.doneWithResult` is invoked for the same number of streams added.
Register(ctx context.Context, tenant string, streams []KeyedStream, pushTracker *PushTracker)
}
Tee implementations can duplicate the log streams to another endpoint.
type TeeErrorCodes ¶ added in v3.7.0
type TeeErrorCodes int
const ( // Since Tee is such a specific part of our write path its error codes start at 1000. TeeCouldntSolvePartitionError TeeErrorCodes = 1000 TeeCouldntEncodeStreamError TeeErrorCodes = 1001 TeeCouldntProduceRecordsError TeeErrorCodes = 1002 )
type Validator ¶
type Validator struct {
Limits
// contains filtered or unexported fields
}
func NewValidator ¶
func NewValidator(l Limits, t push.UsageTracker) (*Validator, error)
func (Validator) IsAggregatedMetricStream ¶ added in v3.5.0
func (Validator) IsInternalStream ¶ added in v3.6.0
func (Validator) IsPatternStream ¶ added in v3.6.0
func (Validator) ShouldBlockIngestion ¶ added in v3.2.0
func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time, policy string) (bool, int, string, error)
ShouldBlockIngestion returns whether ingestion should be blocked, until when and the status code. Priority is: per-tenant block > named policy block > global policy block.
func (Validator) ValidateEntry ¶
func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, entry logproto.Entry, retentionHours string, policy, format string) error
ValidateEntry returns an error if the entry is invalid and reports metrics for invalid entries accordingly.