Documentation
¶
Index ¶
- Constants
- func GetMaxSegmentsPerWorkerPerWave() int
- func GetOrgIDFromContext(ctx context.Context) (uuid.UUID, bool)
- func IngestExemplarLogsJSONToDuckDB(ctx context.Context, db *sql.DB, tableName string, exemplarData map[string]any) (int, error)
- func PushDownStream[T any](ctx context.Context, worker Worker, request PushDownRequest, ...) (<-chan T, error)
- func SetMaxSegmentsPerWorkerPerWave(value int)
- func StepForQueryDuration(startMs, endMs int64) time.Duration
- func TargetSize(totalSegments, workers int) int
- func ValidateEqualityMatcherRequirement(ast logql.LogAST) error
- func ValidateRangeSelector(ast logql.LogAST, expectedDur time.Duration) error
- func ValidateStreamAttributeRequirement(ast logql.LogAST, streamAttribute string) error
- func WithOrgID(ctx context.Context, orgID uuid.UUID) context.Context
- type APIError
- type APIErrorCode
- type BaseWorkerDiscovery
- func (b *BaseWorkerDiscovery) GetAllWorkers() ([]Worker, error)
- func (b *BaseWorkerDiscovery) GetWorkers() []Worker
- func (b *BaseWorkerDiscovery) GetWorkersForSegments(organizationID uuid.UUID, segmentIDs []int64) ([]SegmentWorkerMapping, error)
- func (b *BaseWorkerDiscovery) IsRunning() bool
- func (b *BaseWorkerDiscovery) SetRunning(running bool)
- func (b *BaseWorkerDiscovery) SetWorkers(workers []Worker)
- type DateIntHours
- type EcsClientInterface
- type EcsWorkerDiscovery
- type EcsWorkerDiscoveryConfig
- type EvalFlow
- type EvalFlowOptions
- type FieldValidationError
- type KubernetesWorkerDiscovery
- type KubernetesWorkerDiscoveryConfig
- type LocalDevDiscovery
- type LokiSeriesResponse
- type PushDownRequest
- type QuerierService
- func (q *QuerierService) EvaluateLogTagNamesQuery(ctx context.Context, orgID uuid.UUID, startTs int64, endTs int64, ...) (<-chan promql.Timestamped, error)
- func (q *QuerierService) EvaluateLogTagValuesQuery(ctx context.Context, orgID uuid.UUID, startTs int64, endTs int64, ...) (<-chan promql.Timestamped, error)
- func (q *QuerierService) EvaluateLogsQuery(ctx context.Context, orgID uuid.UUID, startTs, endTs int64, reverse bool, ...) (<-chan promql.Timestamped, error)
- func (q *QuerierService) EvaluateMetricTagNamesQuery(ctx context.Context, orgID uuid.UUID, startTs int64, endTs int64, ...) (<-chan promql.Timestamped, error)
- func (q *QuerierService) EvaluateMetricTagValuesQuery(ctx context.Context, orgID uuid.UUID, startTs int64, endTs int64, ...) (<-chan promql.TagValue, error)
- func (q *QuerierService) EvaluateMetricsQuery(ctx context.Context, orgID uuid.UUID, startTs, endTs int64, ...) (<-chan map[string]promql.EvalResult, error)
- func (q *QuerierService) EvaluateMetricsSummary(ctx context.Context, orgID uuid.UUID, startTs, endTs int64, ...) ([]SeriesSummary, error)
- func (q *QuerierService) EvaluateSpanTagNamesQuery(ctx context.Context, orgID uuid.UUID, startTs int64, endTs int64, ...) (<-chan promql.Timestamped, error)
- func (q *QuerierService) EvaluateSpanTagValuesQuery(ctx context.Context, orgID uuid.UUID, startTs int64, endTs int64, ...) (<-chan promql.Timestamped, error)
- func (q *QuerierService) EvaluateSpansQuery(ctx context.Context, orgID uuid.UUID, startTs, endTs int64, reverse bool, ...) (<-chan promql.Timestamped, error)
- func (q *QuerierService) Run(doneCtx context.Context) error
- type SegmentGroup
- type SegmentInfo
- type SegmentKey
- type SegmentLookupFunc
- type SegmentWorkerMapping
- type SeriesSummary
- type StageCheck
- type TraceSegmentLookupFunc
- type TrigramQuery
- type ValidateOption
- type ValidateResult
- type Worker
- type WorkerDiscovery
Constants ¶
const (
DefaultLogStep = 10 * time.Second
)
const DefaultMaxSegmentsPerWorkerPerWave = 20
DefaultMaxSegmentsPerWorkerPerWave is the default hard cap per worker per wave.
const (
DefaultSpansStep = 10 * time.Second
)
Variables ¶
This section is empty.
Functions ¶
func GetMaxSegmentsPerWorkerPerWave ¶ added in v1.7.0
func GetMaxSegmentsPerWorkerPerWave() int
GetMaxSegmentsPerWorkerPerWave returns the current max segments per worker per wave setting.
func GetOrgIDFromContext ¶
GetOrgIDFromContext retrieves the organization ID from the context
func IngestExemplarLogsJSONToDuckDB ¶ added in v1.3.4
func IngestExemplarLogsJSONToDuckDB( ctx context.Context, db *sql.DB, tableName string, exemplarData map[string]any, ) (int, error)
IngestExemplarLogsJSONToDuckDB ingests a map[string]any exemplar into DuckDB. It returns the number of rows inserted.
func PushDownStream ¶
func SetMaxSegmentsPerWorkerPerWave ¶ added in v1.7.0
func SetMaxSegmentsPerWorkerPerWave(value int)
SetMaxSegmentsPerWorkerPerWave configures the max segments per worker per wave. If value is <= 0, the default is used.
func StepForQueryDuration ¶
func TargetSize ¶
TargetSize computes the approximate number of segments per SegmentGroup ("wave"). It enforces a hard per-worker-per-wave cap:
waveSegments <= workers * MaxSegmentsPerWorkerPerWave
func ValidateEqualityMatcherRequirement ¶ added in v1.4.7
ValidateEqualityMatcherRequirement checks that the LogQL query has at least one equality matcher
func ValidateRangeSelector ¶ added in v1.4.8
ValidateRangeSelector checks that all range selectors in the LogQL query match the expected duration calculated from the query time range
func ValidateStreamAttributeRequirement ¶ added in v1.4.7
ValidateStreamAttributeRequirement checks that the LogQL query has an equality matcher for the specified stream attribute in all log selectors
Types ¶
type APIError ¶ added in v1.4.2
type APIError struct {
Status int `json:"status"`
Code APIErrorCode `json:"code"`
Message string `json:"message"`
}
type APIErrorCode ¶ added in v1.4.2
type APIErrorCode string
const (
	InvalidJSON           APIErrorCode = "INVALID_JSON"
	ErrInvalidExpr        APIErrorCode = "INVALID_EXPR"
	ValidationFailed      APIErrorCode = "VALIDATION_FAILED"
	ErrCompileError       APIErrorCode = "COMPILE_ERROR"
	ErrRewriteUnsupported APIErrorCode = "REWRITE_UNSUPPORTED"
	ErrInternalError      APIErrorCode = "INTERNAL_ERROR"
	ErrClientClosed       APIErrorCode = "CLIENT_CLOSED"
	ErrDeadlineExceeded   APIErrorCode = "DEADLINE_EXCEEDED"
	ErrForbidden          APIErrorCode = "FORBIDDEN"
	ErrNotFound           APIErrorCode = "NOT_FOUND"
	ErrRateLimited        APIErrorCode = "RATE_LIMITED"
	ErrSSEUnsupported     APIErrorCode = "SSE_UNSUPPORTED"
)
type BaseWorkerDiscovery ¶ added in v1.3.6
type BaseWorkerDiscovery struct {
// contains filtered or unexported fields
}
BaseWorkerDiscovery provides common functionality for worker discovery implementations
func (*BaseWorkerDiscovery) GetAllWorkers ¶ added in v1.3.6
func (b *BaseWorkerDiscovery) GetAllWorkers() ([]Worker, error)
GetAllWorkers returns a copy of all currently discovered workers
func (*BaseWorkerDiscovery) GetWorkers ¶ added in v1.3.6
func (b *BaseWorkerDiscovery) GetWorkers() []Worker
GetWorkers returns a copy of the current workers (for internal use)
func (*BaseWorkerDiscovery) GetWorkersForSegments ¶ added in v1.3.6
func (b *BaseWorkerDiscovery) GetWorkersForSegments(organizationID uuid.UUID, segmentIDs []int64) ([]SegmentWorkerMapping, error)
GetWorkersForSegments returns worker assignments for the given segments using consistent hashing
func (*BaseWorkerDiscovery) IsRunning ¶ added in v1.3.6
func (b *BaseWorkerDiscovery) IsRunning() bool
IsRunning returns whether the discovery is currently running
func (*BaseWorkerDiscovery) SetRunning ¶ added in v1.3.6
func (b *BaseWorkerDiscovery) SetRunning(running bool)
SetRunning sets the running state
func (*BaseWorkerDiscovery) SetWorkers ¶ added in v1.3.6
func (b *BaseWorkerDiscovery) SetWorkers(workers []Worker)
SetWorkers updates the worker list in a thread-safe manner
type DateIntHours ¶
type EcsClientInterface ¶ added in v1.3.6
type EcsClientInterface interface {
ListTasks(ctx context.Context, params *ecs.ListTasksInput, optFns ...func(*ecs.Options)) (*ecs.ListTasksOutput, error)
DescribeTasks(ctx context.Context, params *ecs.DescribeTasksInput, optFns ...func(*ecs.Options)) (*ecs.DescribeTasksOutput, error)
}
EcsClientInterface defines the ECS client methods needed for worker discovery
type EcsWorkerDiscovery ¶ added in v1.3.6
type EcsWorkerDiscovery struct {
BaseWorkerDiscovery
// contains filtered or unexported fields
}
func NewEcsWorkerDiscovery ¶ added in v1.3.6
func NewEcsWorkerDiscovery(cfg EcsWorkerDiscoveryConfig) (*EcsWorkerDiscovery, error)
func NewEcsWorkerDiscoveryWithClient ¶ added in v1.3.6
func NewEcsWorkerDiscoveryWithClient(cfg EcsWorkerDiscoveryConfig, ecsClient EcsClientInterface) (*EcsWorkerDiscovery, error)
func (*EcsWorkerDiscovery) Start ¶ added in v1.3.6
func (e *EcsWorkerDiscovery) Start(ctx context.Context) error
func (*EcsWorkerDiscovery) Stop ¶ added in v1.3.6
func (e *EcsWorkerDiscovery) Stop() error
type EcsWorkerDiscoveryConfig ¶ added in v1.3.6
type EvalFlow ¶
type EvalFlow struct {
// contains filtered or unexported fields
}
EvalFlow connects a stream of SketchInput -> aggregator -> root.Eval, and returns a channel of evaluated results (one map per flushed time-bucket).
func NewEvalFlow ¶
func NewEvalFlow( root promql.ExecNode, leaves []promql.BaseExpr, step time.Duration, opts EvalFlowOptions, ) *EvalFlow
NewEvalFlow builds a flow for a compiled plan. `leaves` are used to build a BaseExpr lookup by ID for the aggregator.
func (*EvalFlow) Run ¶
func (f *EvalFlow) Run( ctx context.Context, in <-chan promql.SketchInput, ) <-chan map[string]promql.EvalResult
Run consumes a globally merged time-sorted stream of SketchInput and produces a channel of evaluated results (one per flushed time-bucket).
type EvalFlowOptions ¶
type EvalFlowOptions struct {
// NumBuffers is the number of time-buckets the aggregator keeps before flushing
// (small ring buffer). 3 is a good default.
NumBuffers int
// OutBuffer is the channel buffer size for result maps.
OutBuffer int
}
EvalFlowOptions tunes buffering / aggregation behavior.
type FieldValidationError ¶ added in v1.4.5
type KubernetesWorkerDiscovery ¶
type KubernetesWorkerDiscovery struct {
BaseWorkerDiscovery
// contains filtered or unexported fields
}
func NewKubernetesWorkerDiscovery ¶
func NewKubernetesWorkerDiscovery(cfg KubernetesWorkerDiscoveryConfig) (*KubernetesWorkerDiscovery, error)
func (*KubernetesWorkerDiscovery) Start ¶
func (k *KubernetesWorkerDiscovery) Start(ctx context.Context) error
func (*KubernetesWorkerDiscovery) Stop ¶
func (k *KubernetesWorkerDiscovery) Stop() error
type KubernetesWorkerDiscoveryConfig ¶
type KubernetesWorkerDiscoveryConfig struct {
Namespace string // REQUIRED (or via POD_NAMESPACE)
WorkerLabelSelector string // REQUIRED, selector applied to Services
WorkerPort int // Fallback when no port is found on the EndpointSlice/Service; defaults to 8080
// If true, include IPv6 endpoints as well.
AllowIPv6 bool
}
type LocalDevDiscovery ¶
type LocalDevDiscovery struct {
}
func NewLocalDevDiscovery ¶
func NewLocalDevDiscovery() *LocalDevDiscovery
func (*LocalDevDiscovery) GetAllWorkers ¶
func (d *LocalDevDiscovery) GetAllWorkers() ([]Worker, error)
func (*LocalDevDiscovery) GetWorkersForSegments ¶
func (d *LocalDevDiscovery) GetWorkersForSegments(organizationID uuid.UUID, segmentIDs []int64) ([]SegmentWorkerMapping, error)
func (*LocalDevDiscovery) Stop ¶
func (d *LocalDevDiscovery) Stop() error
type LokiSeriesResponse ¶ added in v1.6.1
type LokiSeriesResponse struct {
Status string `json:"status"`
Data []map[string]string `json:"data"`
}
LokiSeriesResponse mimics Loki's /loki/api/v1/series response format. This allows clients to reuse existing Loki-compatible code.
type PushDownRequest ¶
type PushDownRequest struct {
OrganizationID uuid.UUID `json:"orgId"`
StartTs int64 `json:"startTs"`
EndTs int64 `json:"endTs"`
Step time.Duration `json:"step"`
Segments []SegmentInfo `json:"segments"`
// dataset specific fields
BaseExpr *promql.BaseExpr `json:"baseExpr"`
LogLeaf *logql.LogLeaf `json:"logLeaf"`
Limit int `json:"limit"`
Reverse bool `json:"reverse"`
Fields []string `json:"fields,omitempty"`
TagName string `json:"tagName"` // Set this to a tag name to get distinct values for that tag
TagNames bool `json:"tagNames"` // Set this to true to get distinct tag names (column names) instead of values
IsSpans bool `json:"isSpans"` // Set this to true for spans queries
IsSummary bool `json:"isSummary"` // Set this to true for summary queries that return DDSketch per series
}
PushDownRequest is sent to a worker.
func (*PushDownRequest) ToOrderString ¶
func (p *PushDownRequest) ToOrderString() string
type QuerierService ¶
type QuerierService struct {
// contains filtered or unexported fields
}
func NewQuerierService ¶
func NewQuerierService(mdb lrdb.StoreFull, workerDiscovery WorkerDiscovery, apiKeyProvider orgapikey.OrganizationAPIKeyProvider) (*QuerierService, error)
func (*QuerierService) EvaluateLogTagNamesQuery ¶ added in v1.9.1
func (q *QuerierService) EvaluateLogTagNamesQuery( ctx context.Context, orgID uuid.UUID, startTs int64, endTs int64, queryPlan logql.LQueryPlan, ) (<-chan promql.Timestamped, error)
EvaluateLogTagNamesQuery queries workers to find distinct tag names (column names) that have at least one non-null value in logs matching the filter criteria. This is used for scoped tag discovery.
func (*QuerierService) EvaluateLogTagValuesQuery ¶
func (q *QuerierService) EvaluateLogTagValuesQuery( ctx context.Context, orgID uuid.UUID, startTs int64, endTs int64, queryPlan logql.LQueryPlan, ) (<-chan promql.Timestamped, error)
func (*QuerierService) EvaluateLogsQuery ¶
func (q *QuerierService) EvaluateLogsQuery( ctx context.Context, orgID uuid.UUID, startTs, endTs int64, reverse bool, limit int, queryPlan logql.LQueryPlan, fields []string, ) (<-chan promql.Timestamped, error)
func (*QuerierService) EvaluateMetricTagNamesQuery ¶ added in v1.9.1
func (q *QuerierService) EvaluateMetricTagNamesQuery( ctx context.Context, orgID uuid.UUID, startTs int64, endTs int64, queryPlan promql.QueryPlan, ) (<-chan promql.Timestamped, error)
EvaluateMetricTagNamesQuery queries workers to find distinct tag names (column names) that have at least one non-null value in metrics matching the filter criteria. This is used for scoped tag discovery.
func (*QuerierService) EvaluateMetricTagValuesQuery ¶
func (*QuerierService) EvaluateMetricsQuery ¶
func (*QuerierService) EvaluateMetricsSummary ¶ added in v1.11.0
func (q *QuerierService) EvaluateMetricsSummary( ctx context.Context, orgID uuid.UUID, startTs, endTs int64, queryPlan promql.QueryPlan, ) ([]SeriesSummary, error)
EvaluateMetricsSummary runs a metrics query and aggregates results into per-series summaries. It first tries the optimized DDSketch-based implementation, falling back to the legacy streaming approach if that fails.
func (*QuerierService) EvaluateSpanTagNamesQuery ¶ added in v1.9.1
func (q *QuerierService) EvaluateSpanTagNamesQuery( ctx context.Context, orgID uuid.UUID, startTs int64, endTs int64, queryPlan logql.LQueryPlan, ) (<-chan promql.Timestamped, error)
EvaluateSpanTagNamesQuery queries workers to find distinct tag names (column names) that have at least one non-null value in spans matching the filter criteria. This is used for scoped tag discovery.
func (*QuerierService) EvaluateSpanTagValuesQuery ¶ added in v1.4.1
func (q *QuerierService) EvaluateSpanTagValuesQuery( ctx context.Context, orgID uuid.UUID, startTs int64, endTs int64, queryPlan logql.LQueryPlan, ) (<-chan promql.Timestamped, error)
func (*QuerierService) EvaluateSpansQuery ¶ added in v1.4.0
func (q *QuerierService) EvaluateSpansQuery( ctx context.Context, orgID uuid.UUID, startTs, endTs int64, reverse bool, limit int, queryPlan logql.LQueryPlan, fields []string, ) (<-chan promql.Timestamped, error)
type SegmentGroup ¶
type SegmentGroup struct {
StartTs int64
EndTs int64
Segments []SegmentInfo
}
func ComputeReplayBatches ¶
func ComputeReplayBatches( segments []SegmentInfo, step time.Duration, queryStartTs, queryEndTs int64, targetSize int, reverseSort bool, ) []SegmentGroup
func ComputeReplayBatchesWithWorkers ¶
func ComputeReplayBatchesWithWorkers( segments []SegmentInfo, step time.Duration, queryStartTs, queryEndTs int64, workers int, reverseSort bool, ) []SegmentGroup
ComputeReplayBatchesWithWorkers is the public entrypoint. It computes a per-group target size from the total segment count and worker count, then delegates.
type SegmentInfo ¶
type SegmentInfo struct {
DateInt int `json:"dateInt"`
Hour string `json:"hour"`
SegmentID int64 `json:"segmentId"`
StartTs int64 `json:"startTs"`
EndTs int64 `json:"endTs"`
EffectiveStartTs int64 `json:"effectiveStartTs"`
EffectiveEndTs int64 `json:"effectiveEndTs"`
ExprID string `json:"exprId"`
OrganizationID uuid.UUID `json:"organizationID"`
InstanceNum int16 `json:"instanceNum"`
Frequency int64 `json:"frequency"`
AggFields []string `json:"aggFields,omitempty"`
}
func (SegmentInfo) Key ¶ added in v1.8.0
func (s SegmentInfo) Key() SegmentKey
Key returns the SegmentKey for this SegmentInfo.
type SegmentKey ¶ added in v1.8.0
SegmentKey is a comparable key for deduplicating segments in maps.
type SegmentLookupFunc ¶
type SegmentLookupFunc func(context.Context, lrdb.ListLogSegmentsForQueryParams) ([]lrdb.ListLogSegmentsForQueryRow, error)
type SegmentWorkerMapping ¶
SegmentWorkerMapping represents the assignment of a segment to a worker
type SeriesSummary ¶ added in v1.11.0
type SeriesSummary struct {
Label string `json:"label"`
Tags map[string]any `json:"tags"`
Min float64 `json:"min"`
Max float64 `json:"max"`
Avg float64 `json:"avg"`
Sum float64 `json:"sum"`
Count int64 `json:"count"`
P50 *float64 `json:"p50,omitempty"`
P90 *float64 `json:"p90,omitempty"`
P95 *float64 `json:"p95,omitempty"`
P99 *float64 `json:"p99,omitempty"`
}
SeriesSummary contains aggregate statistics for a single time series.
type StageCheck ¶ added in v1.4.2
type StageCheck struct {
Name string
SQL string
RowCount int
OK bool
Error error
FieldsExpected []string
MissingFields []string
}
StageCheck summarizes one validation step.
type TraceSegmentLookupFunc ¶ added in v1.4.0
type TraceSegmentLookupFunc func(context.Context, lrdb.ListTraceSegmentsForQueryParams) ([]lrdb.ListTraceSegmentsForQueryRow, error)
type TrigramQuery ¶
type TrigramQuery struct {
Op index.QueryOp
Trigram []string
Sub []*TrigramQuery
// contains filtered or unexported fields
}
func (*TrigramQuery) String ¶ added in v1.7.0
func (t *TrigramQuery) String() string
String returns a string representation of the trigram query for logging
type ValidateOption ¶ added in v1.3.4
type ValidateOption func(*validateConfig)
func WithAggStep ¶ added in v1.3.4
func WithAggStep(step time.Duration) ValidateOption
WithAggStep sets the step used when building aggregate worker SQL; default 10s.
func WithDB ¶ added in v1.3.4
func WithDB(db *sql.DB) ValidateOption
WithDB allows injecting a DB (e.g., for tests). If provided, caller manages its lifecycle.
func WithTable ¶ added in v1.3.4
func WithTable(name string) ValidateOption
WithTable sets the target table name; default "logs".
type ValidateResult ¶ added in v1.3.4
type ValidateResult struct {
WorkerSQL string // the SQL that was executed
StartMillis int64 // resolved start placeholder
EndMillis int64 // resolved end placeholder
InsertedRows int // rows ingested from exemplar
Rows []rowstruct // rows returned from worker SQL
IsAggregate bool // whether query was aggregate path (PromQL rewrite)
}
ValidateResult is what the util returns for assertions in tests and for the API.
func ValidateLogQLAgainstExemplar ¶ added in v1.3.4
func ValidateLogQLAgainstExemplar(ctx context.Context, query string, exemplarData map[string]any, opts ...ValidateOption) (*ValidateResult, error)
type WorkerDiscovery ¶
type WorkerDiscovery interface {
Start(ctx context.Context) error
Stop() error
GetWorkersForSegments(organizationID uuid.UUID, segmentIDs []int64) ([]SegmentWorkerMapping, error)
GetAllWorkers() ([]Worker, error)
}
func CreateWorkerDiscovery ¶
func CreateWorkerDiscovery() (WorkerDiscovery, error)
CreateWorkerDiscovery creates the appropriate WorkerDiscovery implementation based on the EXECUTION_ENVIRONMENT environment variable.
Supported values:
- "local": Creates LocalDevDiscovery for local development
- "kubernetes": Creates KubernetesWorkerDiscovery for Kubernetes environments
- "ecs": Creates EcsWorkerDiscovery for ECS environments
- unset or other values: Returns an error
Source Files
¶
- base_discovery.go
- dates.go
- ddb_harness.go
- discovery.go
- ecs_discovery.go
- eval_flow.go
- kubernetes_discovery.go
- local_dev_discovery.go
- logql_tags_handler.go
- logql_validator.go
- logs_evaluator.go
- logs_series_handler.go
- metrics_evaluator.go
- middleware.go
- promql_tags_handler.go
- promql_validator.go
- querier.go
- querier_service.go
- span_tags_handler.go
- spans_evaluator.go
- sse.go
- stage_wise_validation.go
- tag_values.go
- tag_values_evaluator.go
- time_grouper.go