Documentation
¶
Index ¶
- Constants
- Variables
- func CalculateStreamsStats(ctx context.Context, userID string, req *logproto.PushRequest, ...) error
- func HTTPError(w http.ResponseWriter, errorStr string, code int, _ log.Logger)
- func OTLPError(w http.ResponseWriter, errorStr string, code int, logger log.Logger)
- func RetentionPeriodToString(retentionPeriod time.Duration) string
- type Action
- type AttributesConfig
- type EmptyLimits
- type ErrorWriter
- type GlobalOTLPConfig
- type Limits
- type OTLPConfig
- func (c *OTLPConfig) ActionForLogAttribute(attribute string) Action
- func (c *OTLPConfig) ActionForResourceAttribute(attribute string) Action
- func (c *OTLPConfig) ActionForScopeAttribute(attribute string) Action
- func (c *OTLPConfig) ApplyGlobalOTLPConfig(config GlobalOTLPConfig)
- func (c *OTLPConfig) Validate() error
- type PolicyWithRetentionWithBytes
- type RequestParser
- type RequestParserWrapper
- type ResourceAttributesConfig
- type Stats
- func NewPushStats() *Stats
- func ParseLokiRequest(userID string, r *http.Request, limits Limits, ...) (*logproto.PushRequest, *Stats, error)
- func ParseOTLPRequest(userID string, r *http.Request, limits Limits, ...) (*logproto.PushRequest, *Stats, error)
- func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, ...) (*logproto.PushRequest, *Stats, error)
- type StreamResolver
- type TenantsRetention
- type UsageTracker
Constants ¶
const ( OTLPSeverityNumber = "severity_number" OTLPSeverityText = "severity_text" )
const ( LabelServiceName = "service_name" ServiceUnknown = "unknown_service" )
Variables ¶
var ( ErrAllLogsFiltered = errors.New("all logs lines filtered during parsing") ErrRequestBodyTooLarge = errors.New("request body too large") )
Functions ¶
func CalculateStreamsStats ¶ added in v3.6.0
func CalculateStreamsStats(ctx context.Context, userID string, req *logproto.PushRequest, streamResolver StreamResolver, tenantConfigs *runtime.TenantConfigs, pushStats *Stats) error
CalculateStreamsStats modifies pushStats with statistics about all the streams from req.
func OTLPError ¶ added in v3.4.0
OTLPError writes an OTLP-compliant error response to the given http.ResponseWriter.
According to the OTLP spec: https://opentelemetry.io/docs/specs/otlp/#failures-1 Re. the error response format > If the processing of the request fails, the server MUST respond with appropriate HTTP 4xx or HTTP 5xx status code. > The response body for all HTTP 4xx and HTTP 5xx responses MUST be a Protobuf-encoded Status message that describes the problem. > This specification does not use Status.code field and the server MAY omit Status.code field. > The clients are not expected to alter their behavior based on Status.code field but MAY record it for troubleshooting purposes. > The Status.message field SHOULD contain a developer-facing error message as defined in Status message schema.
Re. retryable errors > The requests that receive a response status code listed in following table SHOULD be retried. > All other 4xx or 5xx response status codes MUST NOT be retried > 429 Too Many Requests > 502 Bad Gateway > 503 Service Unavailable > 504 Gateway Timeout In loki, we expect clients to retry on 500 errors, so we map 500 errors to 503.
func RetentionPeriodToString ¶
Types ¶
type Action ¶
type Action string
Action is the action to be performed on OTLP Resource Attribute.
const ( // IndexLabel stores a Resource Attribute as a label in index to identify streams. IndexLabel Action = "index_label" // StructuredMetadata stores an Attribute as Structured Metadata with each log entry. StructuredMetadata Action = "structured_metadata" // Drop drops Attributes for which the Attribute name matches the regex. Drop Action = "drop" )
type AttributesConfig ¶
type AttributesConfig struct {
Action Action `` /* 251-byte string literal not displayed */
Attributes []string `` /* 147-byte string literal not displayed */
Regex relabel.Regexp `yaml:"regex" json:"regex" doc:"description=Regex to choose attributes to configure how to store them or drop them altogether"`
}
func (AttributesConfig) MarshalJSON ¶ added in v3.7.0
func (c AttributesConfig) MarshalJSON() ([]byte, error)
func (AttributesConfig) MarshalYAML ¶ added in v3.7.0
func (c AttributesConfig) MarshalYAML() (any, error)
func (*AttributesConfig) UnmarshalYAML ¶
func (c *AttributesConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
type EmptyLimits ¶
type EmptyLimits struct{}
func (EmptyLimits) DiscoverServiceName ¶ added in v3.2.0
func (EmptyLimits) DiscoverServiceName(string) []string
func (EmptyLimits) OTLPConfig ¶
func (EmptyLimits) OTLPConfig(string) OTLPConfig
type ErrorWriter ¶ added in v3.4.0
type GlobalOTLPConfig ¶
type GlobalOTLPConfig struct {
DefaultOTLPResourceAttributesAsIndexLabels []string `yaml:"default_resource_attributes_as_index_labels"`
}
func (*GlobalOTLPConfig) RegisterFlags ¶
func (cfg *GlobalOTLPConfig) RegisterFlags(fs *flag.FlagSet)
RegisterFlags registers distributor-related flags.
type Limits ¶
type Limits interface {
OTLPConfig(userID string) OTLPConfig
DiscoverServiceName(userID string) []string
}
type OTLPConfig ¶
type OTLPConfig struct {
ResourceAttributes ResourceAttributesConfig `` /* 206-byte string literal not displayed */
ScopeAttributes []AttributesConfig `` /* 181-byte string literal not displayed */
LogAttributes []AttributesConfig `` /* 191-byte string literal not displayed */
SeverityTextAsLabel bool `` /* 266-byte string literal not displayed */
}
func DefaultOTLPConfig ¶
func DefaultOTLPConfig(cfg GlobalOTLPConfig) OTLPConfig
func (*OTLPConfig) ActionForLogAttribute ¶
func (c *OTLPConfig) ActionForLogAttribute(attribute string) Action
func (*OTLPConfig) ActionForResourceAttribute ¶
func (c *OTLPConfig) ActionForResourceAttribute(attribute string) Action
func (*OTLPConfig) ActionForScopeAttribute ¶
func (c *OTLPConfig) ActionForScopeAttribute(attribute string) Action
func (*OTLPConfig) ApplyGlobalOTLPConfig ¶
func (c *OTLPConfig) ApplyGlobalOTLPConfig(config GlobalOTLPConfig)
ApplyGlobalOTLPConfig applies the global OTLP config; for now, that is only DefaultOTLPResourceAttributesAsIndexLabels.
func (*OTLPConfig) Validate ¶
func (c *OTLPConfig) Validate() error
type PolicyWithRetentionWithBytes ¶ added in v3.5.0
type RequestParser ¶
type RequestParser func(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, maxDecompressedSize int64, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error)
type RequestParserWrapper ¶
type RequestParserWrapper func(inner RequestParser) RequestParser
type ResourceAttributesConfig ¶
type ResourceAttributesConfig struct {
IgnoreDefaults bool `` /* 314-byte string literal not displayed */
AttributesConfig []AttributesConfig `yaml:"attributes_config,omitempty" json:"attributes_config,omitempty"`
}
type Stats ¶
type Stats struct {
Errs []error
PolicyNumLines map[string]int64
// LogLinesBytes holds the total size of all log lines, per policy per retention. Used in billing.
LogLinesBytes PolicyWithRetentionWithBytes
// StructuredMetadataBytes holds the size of the original structured metadata (but after it was enriched by OTLP
// parser) per policy per retention. Used in billing.
StructuredMetadataBytes PolicyWithRetentionWithBytes
// ResourceAndSourceMetadataLabels holds structured metadata that was added by the OTLP parser (scope and resource attributes).
ResourceAndSourceMetadataLabels map[string]map[time.Duration]push.LabelsAdapter
// StreamLabelsSize holds the total size of stream labels after sanitization (empty labels removed and
// non-meaningful whitespaces removed). Not used in billing.
StreamLabelsSize int64
MostRecentEntryTimestamp time.Time
MostRecentEntryTimestampPerStream map[string]time.Time
// StreamSizeBytes holds the total size of log lines and structured metadata. Is used only when logPushRequestStreams is true.
StreamSizeBytes map[string]int64
HashOfAllStreams uint64
ContentType string
ContentEncoding string
BodySize int64
// Extra is a place for a wrapped parser to record any interesting stats as key-value pairs to be logged
Extra []any
HasInternalStreams bool // True if any of the streams has aggregated metrics or is a pattern stream
}
func NewPushStats ¶ added in v3.5.0
func NewPushStats() *Stats
func ParseLokiRequest ¶
func ParseLokiRequest(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, maxDecompressedSize int64, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error)
func ParseOTLPRequest ¶
func ParseOTLPRequest(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, maxDecompressedSize int64, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error)
func ParseRequest ¶
func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, maxDecompressedSize int64, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, pushRequestParser RequestParser, tracker UsageTracker, streamResolver StreamResolver, presumedAgentIP, format string) (*logproto.PushRequest, *Stats, error)
type StreamResolver ¶ added in v3.5.0
type StreamResolver interface {
RetentionPeriodFor(lbs labels.Labels) time.Duration
RetentionHoursFor(lbs labels.Labels) string
PolicyFor(ctx context.Context, lbs labels.Labels) string
}
StreamResolver is a request-scoped interface that provides retention period and policy for a given stream. The values returned by the resolver will not change throughout the handling of the request.
type TenantsRetention ¶
type UsageTracker ¶
type UsageTracker interface {
// ReceivedBytesAdd records ingested bytes by tenant, retention period and labels.
ReceivedBytesAdd(ctx context.Context, tenant string, retentionPeriod time.Duration, labels labels.Labels, value float64, format string)
// DiscardedBytesAdd records discarded bytes by tenant and labels.
DiscardedBytesAdd(ctx context.Context, tenant, reason string, labels labels.Labels, value float64, format string)
}