push

package
v3.7.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2026 License: AGPL-3.0 Imports: 42 Imported by: 1

Documentation

Index

Constants

View Source
const (
	OTLPSeverityNumber = "severity_number"
	OTLPSeverityText   = "severity_text"
)
View Source
const (
	LabelServiceName = "service_name"
	ServiceUnknown   = "unknown_service"
)

Variables

View Source
var (
	ErrAllLogsFiltered     = errors.New("all logs lines filtered during parsing")
	ErrRequestBodyTooLarge = errors.New("request body too large")
)

Functions

func CalculateStreamsStats added in v3.6.0

func CalculateStreamsStats(ctx context.Context, userID string, req *logproto.PushRequest, streamResolver StreamResolver, tenantConfigs *runtime.TenantConfigs, pushStats *Stats) error

CalculateStreamsStats modifies pushStats with statistics about all the streams from req.

func HTTPError added in v3.4.0

func HTTPError(w http.ResponseWriter, errorStr string, code int, _ log.Logger)

func OTLPError added in v3.4.0

func OTLPError(w http.ResponseWriter, errorStr string, code int, logger log.Logger)

OTLPError writes an OTLP-compliant error response to the given http.ResponseWriter.

According to the OTLP spec: https://opentelemetry.io/docs/specs/otlp/#failures-1 Re. the error response format > If the processing of the request fails, the server MUST respond with appropriate HTTP 4xx or HTTP 5xx status code. > The response body for all HTTP 4xx and HTTP 5xx responses MUST be a Protobuf-encoded Status message that describes the problem. > This specification does not use Status.code field and the server MAY omit Status.code field. > The clients are not expected to alter their behavior based on Status.code field but MAY record it for troubleshooting purposes. > The Status.message field SHOULD contain a developer-facing error message as defined in Status message schema.

Re. retryable errors > The requests that receive a response status code listed in following table SHOULD be retried. > All other 4xx or 5xx response status codes MUST NOT be retried > 429 Too Many Requests > 502 Bad Gateway > 503 Service Unavailable > 504 Gateway Timeout In loki, we expect clients to retry on 500 errors, so we map 500 errors to 503.

func RetentionPeriodToString

func RetentionPeriodToString(retentionPeriod time.Duration) string

Types

type Action

type Action string

Action is the action to be performed on OTLP Resource Attribute.

const (
	// IndexLabel stores a Resource Attribute as a label in index to identify streams.
	IndexLabel Action = "index_label"
	// StructuredMetadata stores an Attribute as Structured Metadata with each log entry.
	StructuredMetadata Action = "structured_metadata"
	// Drop drops Attributes for which the Attribute name does match the regex.
	Drop Action = "drop"
)

type AttributesConfig

type AttributesConfig struct {
	Action     Action         `` /* 251-byte string literal not displayed */
	Attributes []string       `` /* 147-byte string literal not displayed */
	Regex      relabel.Regexp `yaml:"regex" json:"regex" doc:"description=Regex to choose attributes to configure how to store them or drop them altogether"`
}

func (AttributesConfig) MarshalJSON added in v3.7.0

func (c AttributesConfig) MarshalJSON() ([]byte, error)

func (AttributesConfig) MarshalYAML added in v3.7.0

func (c AttributesConfig) MarshalYAML() (any, error)

func (*AttributesConfig) UnmarshalYAML

func (c *AttributesConfig) UnmarshalYAML(unmarshal func(interface{}) error) error

type EmptyLimits

type EmptyLimits struct{}

func (EmptyLimits) DiscoverServiceName added in v3.2.0

func (EmptyLimits) DiscoverServiceName(string) []string

func (EmptyLimits) OTLPConfig

func (EmptyLimits) OTLPConfig(string) OTLPConfig

func (EmptyLimits) PolicyFor added in v3.5.0

func (EmptyLimits) PolicyFor(_ string, _ labels.Labels) string

type ErrorWriter added in v3.4.0

type ErrorWriter func(w http.ResponseWriter, errorStr string, code int, logger log.Logger)

type GlobalOTLPConfig

type GlobalOTLPConfig struct {
	DefaultOTLPResourceAttributesAsIndexLabels []string `yaml:"default_resource_attributes_as_index_labels"`
}

func (*GlobalOTLPConfig) RegisterFlags

func (cfg *GlobalOTLPConfig) RegisterFlags(fs *flag.FlagSet)

RegisterFlags registers distributor-related flags.

type Limits

type Limits interface {
	OTLPConfig(userID string) OTLPConfig
	DiscoverServiceName(userID string) []string
}

type OTLPConfig

type OTLPConfig struct {
	ResourceAttributes  ResourceAttributesConfig `` /* 206-byte string literal not displayed */
	ScopeAttributes     []AttributesConfig       `` /* 181-byte string literal not displayed */
	LogAttributes       []AttributesConfig       `` /* 191-byte string literal not displayed */
	SeverityTextAsLabel bool                     `` /* 266-byte string literal not displayed */
}

func DefaultOTLPConfig

func DefaultOTLPConfig(cfg GlobalOTLPConfig) OTLPConfig

func (*OTLPConfig) ActionForLogAttribute

func (c *OTLPConfig) ActionForLogAttribute(attribute string) Action

func (*OTLPConfig) ActionForResourceAttribute

func (c *OTLPConfig) ActionForResourceAttribute(attribute string) Action

func (*OTLPConfig) ActionForScopeAttribute

func (c *OTLPConfig) ActionForScopeAttribute(attribute string) Action

func (*OTLPConfig) ApplyGlobalOTLPConfig

func (c *OTLPConfig) ApplyGlobalOTLPConfig(config GlobalOTLPConfig)

ApplyGlobalOTLPConfig applies global otlp config, specifically DefaultOTLPResourceAttributesAsIndexLabels for the start.

func (*OTLPConfig) Validate

func (c *OTLPConfig) Validate() error

type PolicyWithRetentionWithBytes added in v3.5.0

type PolicyWithRetentionWithBytes map[string]map[time.Duration]int64

type RequestParser

type RequestParser func(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, maxDecompressedSize int64, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error)

type RequestParserWrapper

type RequestParserWrapper func(inner RequestParser) RequestParser

type ResourceAttributesConfig

type ResourceAttributesConfig struct {
	IgnoreDefaults   bool               `` /* 314-byte string literal not displayed */
	AttributesConfig []AttributesConfig `yaml:"attributes_config,omitempty" json:"attributes_config,omitempty"`
}

type Stats

type Stats struct {
	Errs           []error
	PolicyNumLines map[string]int64

	// LogLinesBytes holds the total size of all log lines, per policy per retention. Used in billing.
	LogLinesBytes PolicyWithRetentionWithBytes

	// StructuredMetadataBytes holds the size of the original structured metadata (but after it was enriched by OLTP
	// parser) per policy per retention. Used in billing.
	StructuredMetadataBytes PolicyWithRetentionWithBytes

	// ResourceAndSourceMetadataLabels holds structured metadata that was added by OLTP parser (scope and resource attributes)
	ResourceAndSourceMetadataLabels map[string]map[time.Duration]push.LabelsAdapter

	// StreamLabelsSize holds the total size of stream labels after sanitization (empty labels removed and
	// non-meaningful whitespaces removed). Not used in billing.
	StreamLabelsSize int64

	MostRecentEntryTimestamp          time.Time
	MostRecentEntryTimestampPerStream map[string]time.Time

	// StreamSizeBytes holds the total size of log lines and structured metadata. Is used only when logPushRequestStreams is true.
	StreamSizeBytes map[string]int64

	HashOfAllStreams uint64
	ContentType      string
	ContentEncoding  string

	BodySize int64
	// Extra is a place for a wrapped parser to record any interesting stats as key-value pairs to be logged
	Extra []any

	HasInternalStreams bool // True if any of the streams has aggregated metrics or is a pattern stream
}

func NewPushStats added in v3.5.0

func NewPushStats() *Stats

func ParseLokiRequest

func ParseLokiRequest(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, maxDecompressedSize int64, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error)

func ParseOTLPRequest

func ParseOTLPRequest(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, maxDecompressedSize int64, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error)

func ParseRequest

func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, maxDecompressedSize int64, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, pushRequestParser RequestParser, tracker UsageTracker, streamResolver StreamResolver, presumedAgentIP, format string) (*logproto.PushRequest, *Stats, error)

type StreamResolver added in v3.5.0

type StreamResolver interface {
	RetentionPeriodFor(lbs labels.Labels) time.Duration
	RetentionHoursFor(lbs labels.Labels) string
	PolicyFor(ctx context.Context, lbs labels.Labels) string
}

StreamResolver is a request-scoped interface that provides retention period and policy for a given stream. The values returned by the resolver will not chance thought the handling of the request

type TenantsRetention

type TenantsRetention interface {
	RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration
}

type UsageTracker

type UsageTracker interface {

	// ReceivedBytesAdd records ingested bytes by tenant, retention period and labels.
	ReceivedBytesAdd(ctx context.Context, tenant string, retentionPeriod time.Duration, labels labels.Labels, value float64, format string)

	// DiscardedBytesAdd records discarded bytes by tenant and labels.
	DiscardedBytesAdd(ctx context.Context, tenant, reason string, labels labels.Labels, value float64, format string)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL