distributor

package
v3.7.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2026 License: AGPL-3.0 Imports: 73 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewRateStore

func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l Limits, registerer prometheus.Registerer) *rateStore

Types

type Config

type Config struct {
	// Distributors ring
	DistributorRing RingConfig `yaml:"ring,omitempty"`
	PushWorkerCount int        `yaml:"push_worker_count"`

	// Request parser
	MaxRecvMsgSize      int   `yaml:"max_recv_msg_size"`
	MaxDecompressedSize int64 `yaml:"max_decompressed_size"`

	// RateStore customizes the rate storing used by stream sharding.
	RateStore RateStoreConfig `yaml:"rate_store"`

	// WriteFailuresLoggingCfg customizes write failures logging behavior.
	WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Customize the logging of write failures."`

	OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"`

	// DefaultPolicyStreamMappings contains the default policy stream mappings that are merged with per-tenant mappings.
	DefaultPolicyStreamMappings validation.PolicyStreamMapping `` /* 128-byte string literal not displayed */

	KafkaEnabled              bool `yaml:"kafka_writes_enabled"`
	IngesterEnabled           bool `yaml:"ingester_writes_enabled"`
	IngestLimitsEnabled       bool `yaml:"ingest_limits_enabled"`
	IngestLimitsDryRunEnabled bool `yaml:"ingest_limits_dry_run_enabled"`

	KafkaConfig kafka.Config `yaml:"-"`

	DataObjTeeConfig DataObjTeeConfig `yaml:"dataobj_tee"`
	// contains filtered or unexported fields
}

Config for a Distributor.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(fs *flag.FlagSet)

RegisterFlags registers distributor-related flags.

func (*Config) Validate added in v3.3.0

func (cfg *Config) Validate() error

type DataObjTee added in v3.7.0

type DataObjTee struct {
	// contains filtered or unexported fields
}

DataObjTee is a tee that duplicates streams to the data object topic. It is a temporary solution while we work on segmentation keys.

func NewDataObjTee added in v3.7.0

func NewDataObjTee(
	cfg *DataObjTeeConfig,
	resolver *segmentationPartitionResolver,
	limitsClient *ingestLimits,
	limits Limits,
	kafkaClient *kgo.Client,
	logger log.Logger,
	r prometheus.Registerer,
) (*DataObjTee, error)

NewDataObjTee returns a new DataObjTee.

func (*DataObjTee) Duplicate added in v3.7.0

func (t *DataObjTee) Duplicate(ctx context.Context, tenant string, streams []KeyedStream, pushTracker *PushTracker)

Duplicate implements the Tee interface.

func (*DataObjTee) RateBatcher added in v3.7.0

func (t *DataObjTee) RateBatcher() services.Service

RateBatcher returns the rate batcher service if batching is enabled, nil otherwise. This is used to add the batcher to the distributor's subservices for lifecycle management.

func (*DataObjTee) Register added in v3.7.0

func (t *DataObjTee) Register(_ context.Context, _ string, streams []KeyedStream, pushTracker *PushTracker)

type DataObjTeeConfig added in v3.7.0

type DataObjTeeConfig struct {
	Enabled               bool          `yaml:"enabled"`
	Topic                 string        `yaml:"topic"`
	MaxBufferedBytes      int           `yaml:"max_buffered_bytes"`
	PerPartitionRateBytes int           `yaml:"per_partition_rate_bytes"`
	DebugMetricsEnabled   bool          `yaml:"debug_metrics_enabled"`
	RateBatchWindow       time.Duration `yaml:"rate_batch_window"`
}

func (*DataObjTeeConfig) RegisterFlags added in v3.7.0

func (c *DataObjTeeConfig) RegisterFlags(f *flag.FlagSet)

func (*DataObjTeeConfig) Validate added in v3.7.0

func (c *DataObjTeeConfig) Validate() error

type Distributor

type Distributor struct {
	services.Service

	RequestParserWrapper push.RequestParserWrapper
	// contains filtered or unexported fields
}

Distributor coordinates replication and distribution of log streams.

func New

func New(
	cfg Config,
	ingesterCfg ingester.Config,
	clientCfg ingester_client.Config,
	configs *runtime.TenantConfigs,
	ingestersRing ring.ReadRing,
	partitionRing ring.PartitionRingReader,
	overrides Limits,
	registerer prometheus.Registerer,
	metricsNamespace string,
	tee Tee,
	usageTracker push.UsageTracker,
	limitsFrontendCfg limits_frontend_client.Config,
	limitsFrontendRing ring.ReadRing,
	numMetadataPartitions int,
	dataObjConsumerPartitionRing ring.PartitionRingReader,
	logger log.Logger,
) (*Distributor, error)

New creates a distributor.

func (*Distributor) HealthyInstancesCount

func (d *Distributor) HealthyInstancesCount() int

HealthyInstancesCount implements the ReadLifecycler interface.

We use a ring lifecycler delegate to count the number of members of the ring. The count is then used to enforce rate limiting correctly for each distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES.

func (*Distributor) OTLPPushHandler

func (d *Distributor) OTLPPushHandler(w http.ResponseWriter, r *http.Request)

func (*Distributor) Push

func (*Distributor) PushHandler

func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request)

PushHandler reads a snappy-compressed proto from the HTTP body.

func (*Distributor) PushWithResolver added in v3.5.0

func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRequest, streamResolver *requestScopedStreamResolver, format string) (*logproto.PushResponse, error)

PushWithResolver pushes a set of streams. The returned error is the last one seen.

func (*Distributor) ServeHTTP

func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements the distributor ring status page.

If the rate limiting strategy is local instead of global, no ring is used by the distributor and as such, no ring status is returned from this function.

type FieldDetector added in v3.4.0

type FieldDetector struct {
	// contains filtered or unexported fields
}

type KafkaProducer added in v3.3.0

type KafkaProducer interface {
	ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults
	Close()
}

type KeyedStream

type KeyedStream struct {
	HashKey        uint32
	HashKeyNoShard uint64
	Stream         logproto.Stream
	Policy         string
}

type Limits

type Limits interface {
	retention.Limits
	MaxLineSize(userID string) int
	MaxLineSizeTruncate(userID string) bool
	MaxLineSizeTruncateIdentifier(userID string) string

	MaxLabelNamesPerSeries(userID string) int
	MaxLabelNameLength(userID string) int
	MaxLabelValueLength(userID string) int

	CreationGracePeriod(userID string) time.Duration
	RejectOldSamples(userID string) bool
	RejectOldSamplesMaxAge(userID string) time.Duration

	IncrementDuplicateTimestamps(userID string) bool
	DiscoverServiceName(userID string) []string
	DiscoverGenericFields(userID string) map[string][]string
	DiscoverLogLevels(userID string) bool
	LogLevelFields(userID string) []string
	LogLevelFromJSONMaxDepth(userID string) int

	ShardStreams(userID string) shardstreams.Config
	IngestionRateStrategy() string
	IngestionRateBytes(userID string) float64
	IngestionBurstSizeBytes(userID string) int
	AllowStructuredMetadata(userID string) bool
	MaxStructuredMetadataSize(userID string) int
	MaxStructuredMetadataCount(userID string) int
	OTLPConfig(userID string) push.OTLPConfig

	BlockIngestionUntil(userID string) time.Time
	BlockIngestionStatusCode(userID string) int
	BlockIngestionPolicyUntil(userID string, policy string) time.Time
	EnforcedLabels(userID string) []string
	PolicyEnforcedLabels(userID string, policy string) []string

	IngestionPartitionsTenantShardSize(userID string) int

	SimulatedPushLatency(userID string) time.Duration
}

Limits is an interface for distributor limits and related configuration.

type PushTracker added in v3.7.0

type PushTracker struct {
	// contains filtered or unexported fields
}

TODO: taken from Cortex; see if we can refactor out a usable interface.

type RateBatcherConfig added in v3.7.0

type RateBatcherConfig struct {
	// BatchWindow is the duration to accumulate rate updates before flushing.
	BatchWindow time.Duration
}

RateBatcherConfig contains the configuration for the RateBatcher.

type RateStore

type RateStore interface {
	RateFor(tenantID string, streamHash uint64) (int64, float64)
}

RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.

type RateStoreConfig

type RateStoreConfig struct {
	MaxParallelism           int           `yaml:"max_request_parallelism"`
	StreamRateUpdateInterval time.Duration `yaml:"stream_rate_update_interval"`
	IngesterReqTimeout       time.Duration `yaml:"ingester_request_timeout"`
	Debug                    bool          `yaml:"debug"`
}

func (*RateStoreConfig) RegisterFlagsWithPrefix

func (cfg *RateStoreConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet)

type ReadLifecycler

type ReadLifecycler interface {
	HealthyInstancesCount() int
}

ReadLifecycler represents the read interface to the lifecycler.

type RingConfig

type RingConfig struct {
	KVStore          kv.Config     `yaml:"kvstore"`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"hidden"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
	InstancePort           int      `yaml:"instance_port" doc:"hidden"`
	InstanceAddr           string   `yaml:"instance_addr" doc:"hidden"`
	EnableIPv6             bool     `yaml:"instance_enable_ipv6" doc:"hidden"`

	// Injected internally
	ListenPort int `yaml:"-"`
}

RingConfig masks the ring lifecycler config, which contains many options not really required by the distributors ring. This config is used to strip down the config to the minimum and avoid confusing the user.

func (*RingConfig) RegisterFlags

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure this RingConfig to the given FlagSet.

func (*RingConfig) ToBasicLifecyclerConfig

func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)

ToBasicLifecyclerConfig returns a BasicLifecyclerConfig based on the distributor ring config.

func (*RingConfig) ToRingConfig

func (cfg *RingConfig) ToRingConfig() ring.Config

type ShardTracker

type ShardTracker struct {
	// contains filtered or unexported fields
}

ShardTracker is a data structure that keeps track of the last pushed shard number for a given stream hash. This allows the distributor to evenly shard streams across pushes, even when any given push has fewer entries than the calculated number of shards.

func NewShardTracker

func NewShardTracker() *ShardTracker

func (*ShardTracker) LastShardNum

func (t *ShardTracker) LastShardNum(tenant string, streamHash uint64) int

func (*ShardTracker) SetLastShardNum

func (t *ShardTracker) SetLastShardNum(tenant string, streamHash uint64, shardNum int)

type Tee

type Tee interface {
	Duplicate(ctx context.Context, tenant string, streams []KeyedStream, pushTracker *PushTracker)

	// Register is a prehook to allow Tee's to register its pending streams, allowing distributors to wait for them before concluding a push request.
	// If pending streams are registered, one should make sure `pushTracker.doneWithResult` is invoked for the same number of streams added.
	Register(ctx context.Context, tenant string, streams []KeyedStream, pushTracker *PushTracker)
}

Tee implementations can duplicate the log streams to another endpoint.

func WrapTee

func WrapTee(existing, newTee Tee) Tee

WrapTee wraps a new Tee around an existing Tee.

type TeeErrorCodes added in v3.7.0

type TeeErrorCodes int
const (
	// Since Tee is such a specific part of our write path its error codes start at 1000.
	TeeCouldntSolvePartitionError TeeErrorCodes = 1000
	TeeCouldntEncodeStreamError   TeeErrorCodes = 1001
	TeeCouldntProduceRecordsError TeeErrorCodes = 1002
)

type Validator

type Validator struct {
	Limits
	// contains filtered or unexported fields
}

func NewValidator

func NewValidator(l Limits, t push.UsageTracker) (*Validator, error)

func (Validator) IsAggregatedMetricStream added in v3.5.0

func (v Validator) IsAggregatedMetricStream(ls labels.Labels) bool

func (Validator) IsInternalStream added in v3.6.0

func (v Validator) IsInternalStream(ls labels.Labels) bool

func (Validator) IsPatternStream added in v3.6.0

func (v Validator) IsPatternStream(ls labels.Labels) bool

func (Validator) ShouldBlockIngestion added in v3.2.0

func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time, policy string) (bool, int, string, error)

ShouldBlockIngestion returns whether ingestion should be blocked, until when, and the status code. Priority is: per-tenant block > named policy block > global policy block.

func (Validator) ValidateEntry

func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, entry logproto.Entry, retentionHours string, policy, format string) error

ValidateEntry returns an error if the entry is invalid and reports metrics for invalid entries accordingly.

func (Validator) ValidateLabels

func (v Validator) ValidateLabels(vCtx validationContext, ls labels.Labels, stream logproto.Stream, retentionHours, policy, format string) error

ValidateLabels returns an error if the labels are invalid and if the stream is an aggregated metric stream.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL