Documentation
¶
Index ¶
- Constants
- type Config
- type Evictable
- type Evictor
- type IngestLimits
- func (s *IngestLimits) CheckReady(ctx context.Context) error
- func (s *IngestLimits) Collect(m chan<- prometheus.Metric)
- func (s *IngestLimits) Describe(descs chan<- *prometheus.Desc)
- func (s *IngestLimits) Flush()
- func (s *IngestLimits) GetAssignedPartitions(_ context.Context, _ *logproto.GetAssignedPartitionsRequest) (*logproto.GetAssignedPartitionsResponse, error)
- func (s *IngestLimits) GetStreamUsage(_ context.Context, req *logproto.GetStreamUsageRequest) (*logproto.GetStreamUsageResponse, error)
- func (s *IngestLimits) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (s *IngestLimits) TransferOut(_ context.Context) error
- type PartitionManager
- func (m *PartitionManager) Assign(_ context.Context, _ *kgo.Client, partitions map[string][]int32)
- func (m *PartitionManager) Has(partitionID int32) bool
- func (m *PartitionManager) List() map[int32]int64
- func (m *PartitionManager) Remove(_ context.Context, _ *kgo.Client, partitions map[string][]int32)
Constants ¶
const ( // Ring RingKey = "ingest-limits" RingName = "ingest-limits" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Enabled enables the ingest limits service.
Enabled bool `yaml:"enabled"`
// WindowSize defines the time window for which stream metadata is considered active.
// Stream metadata older than WindowSize will be evicted from the metadata map.
WindowSize time.Duration `yaml:"window_size"`
// RateWindow defines the time window for rate calculation.
// This should match the window used in Prometheus rate() queries for consistency,
// when using the `loki_ingest_limits_ingested_bytes_total` metric.
// Defaults to 5 minutes if not specified.
RateWindow time.Duration `yaml:"rate_window"`
// BucketDuration defines the granularity of time buckets used for sliding window rate calculation.
// Smaller buckets provide more precise rate tracking but require more memory.
// Defaults to 1 minute if not specified.
BucketDuration time.Duration `yaml:"bucket_duration"`
// LifecyclerConfig is the config to build a ring lifecycler.
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
KafkaConfig kafka.Config `yaml:"-"`
// The number of partitions for the Kafka topic used to read and write stream metadata.
// It is fixed, not a maximum.
NumPartitions int `yaml:"num_partitions"`
}
Config represents the configuration for the ingest limits service.
func (*Config) RegisterFlags ¶
type Evictor ¶
type Evictor struct {
// contains filtered or unexported fields
}
Evictor runs scheduled evictions.
type IngestLimits ¶
IngestLimits is a service that manages stream metadata limits.
func NewIngestLimits ¶
func NewIngestLimits(cfg Config, logger log.Logger, reg prometheus.Registerer) (*IngestLimits, error)
NewIngestLimits creates a new IngestLimits service. It initializes the metadata map and sets up a Kafka client The client is configured to consume stream metadata from a dedicated topic with the metadata suffix.
func (*IngestLimits) CheckReady ¶
func (s *IngestLimits) CheckReady(ctx context.Context) error
func (*IngestLimits) Collect ¶
func (s *IngestLimits) Collect(m chan<- prometheus.Metric)
func (*IngestLimits) Describe ¶
func (s *IngestLimits) Describe(descs chan<- *prometheus.Desc)
func (*IngestLimits) Flush ¶
func (s *IngestLimits) Flush()
Flush implements ring.FlushTransferer. It transfers state to another ingest limits instance.
func (*IngestLimits) GetAssignedPartitions ¶
func (s *IngestLimits) GetAssignedPartitions(_ context.Context, _ *logproto.GetAssignedPartitionsRequest) (*logproto.GetAssignedPartitionsResponse, error)
GetAssignedPartitions implements the logproto.IngestLimitsServer interface. It returns the partitions that the tenant is assigned to and the instance still owns.
func (*IngestLimits) GetStreamUsage ¶
func (s *IngestLimits) GetStreamUsage(_ context.Context, req *logproto.GetStreamUsageRequest) (*logproto.GetStreamUsageResponse, error)
GetStreamUsage implements the logproto.IngestLimitsServer interface. It returns the number of active streams for a tenant and the status of requested streams.
func (*IngestLimits) ServeHTTP ¶
func (s *IngestLimits) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements the http.Handler interface. It returns the current stream counts and status per tenant as a JSON response.
func (*IngestLimits) TransferOut ¶
func (s *IngestLimits) TransferOut(_ context.Context) error
TransferOut implements ring.FlushTransferer. It transfers state to another ingest limits instance.
type PartitionManager ¶
type PartitionManager struct {
// contains filtered or unexported fields
}
PartitionManager keeps track of the partitions assigned and for each partition a timestamp of when it was last updated.
func NewPartitionManager ¶
func NewPartitionManager(logger log.Logger) *PartitionManager
NewPartitionManager returns a new PartitionManager.
func (*PartitionManager) Assign ¶
Assign assigns the partitions and sets the last updated timestamp for each partition to the current time.
func (*PartitionManager) Has ¶
func (m *PartitionManager) Has(partitionID int32) bool
Has returns true if the partition is assigned, otherwise false.
func (*PartitionManager) List ¶
func (m *PartitionManager) List() map[int32]int64
List returns a map of all assigned partitions and their last updated timestamps.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package client provides gRPC client implementation for limits service.
|
Package client provides gRPC client implementation for limits service. |
|
Package frontend contains provides a frontend service for ingest limits.
|
Package frontend contains provides a frontend service for ingest limits. |
|
client
Package client provides gRPC client implementation for limits-frontend.
|
Package client provides gRPC client implementation for limits-frontend. |