queryapi

package
v1.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2026 License: AGPL-3.0 Imports: 61 Imported by: 0

README

Key Invariants

  1. Groups are time-disjoint and chronological by index. We rely on ComputeReplayBatchesWithWorkers(...) to produce non-overlapping windows and we use the order they are produced as the coordinator index.
  2. Per-leaf start shifting: For each leaf: • baseStart = startTs - RangeMsFromRange(leaf.Range) (if any) • effStart = baseStart - offsetMs(leaf.Offset) • effEnd = endTs - offsetMs(leaf.Offset) This avoids gaps on the left edge for range functions while respecting offsets.
  3. Within a group we merge multiple worker streams by timestamp; across groups we concatenate by index (no global merge).

Stages

            +----------------------------+
            | EvaluateMetricsQuery       |
            | (entry)                    |
            +-------------+--------------+
                          |
                          v
              enumerateAndLaunchGroups
                          |
                          v
            +----------------------------+      (N groups; bounded by sem)
            | launchOneGroup (per group) |-----> per-worker pushdowns
            +----------------------------+                  |
                          |                                 v
                          |                      MergeSorted(worker chans)
                          |                                 |
                          +--------- register (idx) --------+
                                            |
                                            v
                              runOrderedCoordinator (concat by idx)
                                            |
                                            v
                                      EvalFlow.Run
                                            |
                                            v
                                    streamed EvalResult

Plumbing & Concurrency

• maxParallel = max(3, numWorkers) → prevents underutilization on tiny clusters. • sem := make(chan struct{}, maxParallel) bounds group goroutines. • groupRegs := chan groupReg is a registry; each launched group sends one (idx, start, end, mergedChan) to it as soon as ready, not after completion.

Enumerate & Launch

• For each leaf: • Parse offset, compute baseStart using the leaf’s range, then get effStart/effEnd. • Look up segments per hour (dateIntHoursRange), stamp ExprID. • ComputeReplayBatchesWithWorkers(segments, step, effStart, effEnd, /workers/ cap(sem), reverse=true) We intentionally pass cap(sem) as the “workers” hint so batch sizing roughly matches how many groups can run concurrently. • For each group: • Reserve the semaphore. • Launch launchOneGroup.

launchOneGroup (per group)

• Worker assignment via GetWorkersForSegments. • For each worker, call metricsPushDown → returns a channel of SketchInput. • Wrap each channel in shiftTimestamps if the leaf had an offset. • Local merge of worker channels: promql.MergeSorted(ctx, 1024, /reverse/ false, /limit/ 0, ...). • Register immediately: send groupReg{idx, startTs, endTs, mergedCh} to groupRegs.

If anything fails: we still register a closed/empty channel so the coordinator doesn’t stall.

Ordered Coordinator

• runOrderedCoordinator concatenates groups strictly by ascending idx: • Buffers out-of-order registrations in pending[idx]. • When it has want (starting at 0), it drains that group’s channel fully to coordinated. • Then it increments want and repeats. • Assumption: groups are time-disjoint and idx order matches chronology. If your grouping ever overlaps, switch back to a final MergeSorted across groups.

EvalFlow

• Single EvalFlow.Run(ctx, coordinated) consumes the concatenated stream.

Ordering & Streaming Guarantees

• Within a group: strictly timestamp-sorted (due to MergeSorted). • Across groups: streamed as groups finish registration but emitted in idx order. Practically, you start seeing results as soon as idx=0 is registered and begins producing—even while later groups are still downloading/processing. • No global time sort across groups—by design—for lowest latency and fewer buffers. This is why non-overlap of groups is important.

Range & Offset Handling (Why we mutate per leaf)

• PromQL range functions need previous buckets. If we started each leaf at the global startTs, we’d drop early coverage. • We therefore compute baseStart = startTs - range(leaf) per leaf, not mutate the global startTs. • Then apply the offset to produce effStart/effEnd used for segment discovery.

Backpressure & Buffers

• Channels: • Worker→group merge buffers: 1024 • Coordinator output: 4096 • EvalFlow output to caller: 1024 • If downstream (client) is slow, backpressure propagates and naturally slows ingestion.

Cancellation & Errors

• All goroutines select on ctx.Done(). • On failure, a closed empty channel is registered so the coordinator can advance. • EvalFlow loop closes out cleanly on completion or cancellation.

Tuning Knobs

• Parallelism: computeMaxParallel(len(workers)) → tweak rule if needed. • Per-group merge buffer: MergeSorted(..., 1024, ...). • Coordinator buffer: coordinated := make(chan promql.SketchInput, 4096). • Batch sizing in ComputeReplayBatchesWithWorkers (internally uses a target size derived from segment count & worker hint).

When to Add a Final Merge

Add a final MergeSorted across all group streams if any is true: • Groups could overlap in time. • You must guarantee globally time-sorted emission across the entire query range. • You’re willing to trade slightly higher latency & memory for ordering guarantees.

(Our current implementation assumes disjoint groups and prioritizes streaming latency.)

Documentation

Index

Constants

View Source
const (
	DefaultLogStep = 10 * time.Second
)
View Source
const DefaultMaxSegmentsPerWorkerPerWave = 20

DefaultMaxSegmentsPerWorkerPerWave is the default hard cap per worker per wave.

View Source
const (
	DefaultSpansStep = 10 * time.Second
)

Variables

This section is empty.

Functions

func GetMaxSegmentsPerWorkerPerWave added in v1.7.0

func GetMaxSegmentsPerWorkerPerWave() int

GetMaxSegmentsPerWorkerPerWave returns the current max segments per worker per wave setting.

func GetOrgIDFromContext

func GetOrgIDFromContext(ctx context.Context) (uuid.UUID, bool)

GetOrgIDFromContext retrieves the organization ID from the context

func IngestExemplarLogsJSONToDuckDB added in v1.3.4

func IngestExemplarLogsJSONToDuckDB(
	ctx context.Context,
	db *sql.DB,
	tableName string,
	exemplarData map[string]any,
) (int, error)

IngestExemplarLogsJSONToDuckDB ingests a map[string]any exemplar into DuckDB. It returns the number of rows inserted.

func PushDownStream

func PushDownStream[T any](
	ctx context.Context,
	worker Worker,
	request PushDownRequest,
	parse func(typ string, data json.RawMessage) (T, bool, error),
) (<-chan T, error)

func SetMaxSegmentsPerWorkerPerWave added in v1.7.0

func SetMaxSegmentsPerWorkerPerWave(value int)

SetMaxSegmentsPerWorkerPerWave configures the max segments per worker per wave. If value is <= 0, the default is used.

func StepForQueryDuration

func StepForQueryDuration(startMs, endMs int64) time.Duration

func TargetSize

func TargetSize(totalSegments, workers int) int

TargetSize computes the approximate number of segments per SegmentGroup ("wave"). It enforces a hard per-worker-per-wave cap:

waveSegments <= workers * MaxSegmentsPerWorkerPerWave

func ValidateEqualityMatcherRequirement added in v1.4.7

func ValidateEqualityMatcherRequirement(ast logql.LogAST) error

ValidateEqualityMatcherRequirement checks that the LogQL query has at least one equality matcher

func ValidateRangeSelector added in v1.4.8

func ValidateRangeSelector(ast logql.LogAST, expectedDur time.Duration) error

ValidateRangeSelector checks that all range selectors in the LogQL query match the expected duration calculated from the query time range

func ValidateStreamAttributeRequirement added in v1.4.7

func ValidateStreamAttributeRequirement(ast logql.LogAST, streamAttribute string) error

ValidateStreamAttributeRequirement checks that the LogQL query has an equality matcher for the specified stream attribute in all log selectors

func WithOrgID

func WithOrgID(ctx context.Context, orgID uuid.UUID) context.Context

WithOrgID returns a new context with the organization ID stored in it

Types

type APIError added in v1.4.2

type APIError struct {
	Status  int          `json:"status"`
	Code    APIErrorCode `json:"code"`
	Message string       `json:"message"`
}

type APIErrorCode added in v1.4.2

type APIErrorCode string
const (
	InvalidJSON           APIErrorCode = "INVALID_JSON"
	ErrInvalidExpr        APIErrorCode = "INVALID_EXPR"
	ValidationFailed      APIErrorCode = "VALIDATION_FAILED"
	ErrCompileError       APIErrorCode = "COMPILE_ERROR"
	ErrRewriteUnsupported APIErrorCode = "REWRITE_UNSUPPORTED"
	ErrInternalError      APIErrorCode = "INTERNAL_ERROR"
	ErrClientClosed       APIErrorCode = "CLIENT_CLOSED"
	ErrDeadlineExceeded   APIErrorCode = "DEADLINE_EXCEEDED"
	ErrServiceUnavailable APIErrorCode = "SERVICE_UNAVAILABLE"
	ErrForbidden          APIErrorCode = "FORBIDDEN"
	ErrUnauthorized       APIErrorCode = "UNAUTHORIZED"
	ErrNotFound           APIErrorCode = "NOT_FOUND"
	ErrRateLimited        APIErrorCode = "RATE_LIMITED"
	ErrSSEUnsupported     APIErrorCode = "SSE_UNSUPPORTED"
)

type BaseWorkerDiscovery added in v1.3.6

type BaseWorkerDiscovery struct {
	// contains filtered or unexported fields
}

BaseWorkerDiscovery provides common functionality for worker discovery implementations

func (*BaseWorkerDiscovery) GetAllWorkers added in v1.3.6

func (b *BaseWorkerDiscovery) GetAllWorkers() ([]Worker, error)

GetAllWorkers returns a copy of all currently discovered workers

func (*BaseWorkerDiscovery) GetWorkers added in v1.3.6

func (b *BaseWorkerDiscovery) GetWorkers() []Worker

GetWorkers returns a copy of the current workers (for internal use)

func (*BaseWorkerDiscovery) GetWorkersForSegments added in v1.3.6

func (b *BaseWorkerDiscovery) GetWorkersForSegments(organizationID uuid.UUID, segmentIDs []int64) ([]SegmentWorkerMapping, error)

GetWorkersForSegments returns worker assignments for the given segments using consistent hashing

func (*BaseWorkerDiscovery) IsRunning added in v1.3.6

func (b *BaseWorkerDiscovery) IsRunning() bool

IsRunning returns whether the discovery is currently running

func (*BaseWorkerDiscovery) SetRunning added in v1.3.6

func (b *BaseWorkerDiscovery) SetRunning(running bool)

SetRunning sets the running state

func (*BaseWorkerDiscovery) SetWorkers added in v1.3.6

func (b *BaseWorkerDiscovery) SetWorkers(workers []Worker)

SetWorkers updates the worker list in a thread-safe manner

type DateIntHours

type DateIntHours struct {
	DateInt int      // e.g. 20250814
	Hours   []string // "00".."23"
}

type EcsClientInterface added in v1.3.6

type EcsClientInterface interface {
	ListTasks(ctx context.Context, params *ecs.ListTasksInput, optFns ...func(*ecs.Options)) (*ecs.ListTasksOutput, error)
	DescribeTasks(ctx context.Context, params *ecs.DescribeTasksInput, optFns ...func(*ecs.Options)) (*ecs.DescribeTasksOutput, error)
}

EcsClientInterface defines the ECS client methods needed for worker discovery

type EcsWorkerDiscovery added in v1.3.6

type EcsWorkerDiscovery struct {
	BaseWorkerDiscovery
	// contains filtered or unexported fields
}

func NewEcsWorkerDiscovery added in v1.3.6

func NewEcsWorkerDiscovery(cfg EcsWorkerDiscoveryConfig) (*EcsWorkerDiscovery, error)

func NewEcsWorkerDiscoveryWithClient added in v1.3.6

func NewEcsWorkerDiscoveryWithClient(cfg EcsWorkerDiscoveryConfig, ecsClient EcsClientInterface) (*EcsWorkerDiscovery, error)

func (*EcsWorkerDiscovery) Start added in v1.3.6

func (e *EcsWorkerDiscovery) Start(ctx context.Context) error

func (*EcsWorkerDiscovery) Stop added in v1.3.6

func (e *EcsWorkerDiscovery) Stop() error

type EcsWorkerDiscoveryConfig added in v1.3.6

type EcsWorkerDiscoveryConfig struct {
	ServiceName string        // ECS service name for workers
	ClusterName string        // ECS cluster name
	WorkerPort  int           // Port workers listen on (default 8081)
	Interval    time.Duration // Polling interval (default 10s)
}

type EvalFlow

type EvalFlow struct {
	// contains filtered or unexported fields
}

EvalFlow connects a stream of SketchInput -> aggregator -> root.Eval, and returns a channel of evaluated results (one map per flushed time-bucket).

func NewEvalFlow

func NewEvalFlow(
	root promql.ExecNode,
	leaves []promql.BaseExpr,
	step time.Duration,
	opts EvalFlowOptions,
) *EvalFlow

NewEvalFlow builds a flow for a compiled plan. `leaves` are used to build a BaseExpr lookup by ID for the aggregator.

func (*EvalFlow) Run

func (f *EvalFlow) Run(
	ctx context.Context,
	in <-chan promql.SketchInput,
) <-chan map[string]promql.EvalResult

Run consumes a globally merged time-sorted stream of SketchInput and produces a channel of evaluated results (one per flushed time-bucket).

type EvalFlowOptions

type EvalFlowOptions struct {
	// NumBuffers is the number of time-buckets the aggregator keeps before flushing
	// (small ring buffer). 3 is a good default.
	NumBuffers int
	// OutBuffer is the channel buffer size for result maps.
	OutBuffer int
}

EvalFlowOptions tunes buffering / aggregation behavior.

type FieldValidationError added in v1.4.5

type FieldValidationError struct {
	Field   string `json:"field"`
	Context string `json:"context"` // "label_matcher", "group_by", "metric_name", etc.
	Message string `json:"message"`
}

type KubernetesWorkerDiscovery

type KubernetesWorkerDiscovery struct {
	BaseWorkerDiscovery
	// contains filtered or unexported fields
}

func (*KubernetesWorkerDiscovery) Start

func (*KubernetesWorkerDiscovery) Stop

func (k *KubernetesWorkerDiscovery) Stop() error

type KubernetesWorkerDiscoveryConfig

type KubernetesWorkerDiscoveryConfig struct {
	Namespace           string // REQUIRED (or via POD_NAMESPACE)
	WorkerLabelSelector string // REQUIRED, selector applied to Services
	WorkerPort          int    // Fallback when no port found on ES/Service; defaults to 8080
	// If true, include IPv6 endpoints as well.
	AllowIPv6 bool
}

type LocalDevDiscovery

type LocalDevDiscovery struct {
}

func NewLocalDevDiscovery

func NewLocalDevDiscovery() *LocalDevDiscovery

func (*LocalDevDiscovery) GetAllWorkers

func (d *LocalDevDiscovery) GetAllWorkers() ([]Worker, error)

func (*LocalDevDiscovery) GetWorkersForSegments

func (d *LocalDevDiscovery) GetWorkersForSegments(organizationID uuid.UUID, segmentIDs []int64) ([]SegmentWorkerMapping, error)

func (*LocalDevDiscovery) Start

func (d *LocalDevDiscovery) Start(ctx context.Context) error

func (*LocalDevDiscovery) Stop

func (d *LocalDevDiscovery) Stop() error

type LokiSeriesResponse added in v1.6.1

type LokiSeriesResponse struct {
	Status string              `json:"status"`
	Data   []map[string]string `json:"data"`
}

LokiSeriesResponse mimics Loki's /loki/api/v1/series response format. This allows clients to reuse existing Loki-compatible code.

type PushDownRequest

type PushDownRequest struct {
	OrganizationID uuid.UUID     `json:"orgId"`
	StartTs        int64         `json:"startTs"`
	EndTs          int64         `json:"endTs"`
	Step           time.Duration `json:"step"`
	Segments       []SegmentInfo `json:"segments"`

	// dataset specific fields
	BaseExpr *promql.BaseExpr `json:"baseExpr"`
	LogLeaf  *logql.LogLeaf   `json:"logLeaf"`
	Limit    int              `json:"limit"`
	Reverse  bool             `json:"reverse"`
	Fields   []string         `json:"fields,omitempty"`

	TagName  string `json:"tagName"`  // Set this to a tag name to get distinct values for that tag
	TagNames bool   `json:"tagNames"` // Set this to true to get distinct tag names (column names) instead of values
	IsSpans  bool   `json:"isSpans"`  // Set this to true for spans queries

	IsSummary bool `json:"isSummary"` // Set this to true for summary queries that return DDSketch per series
}

PushDownRequest is sent to a worker.

func (*PushDownRequest) ToOrderString

func (p *PushDownRequest) ToOrderString() string

type QuerierService

type QuerierService struct {
	// contains filtered or unexported fields
}

func NewQuerierService

func NewQuerierService(mdb lrdb.StoreFull, workerDiscovery WorkerDiscovery, apiKeyProvider orgapikey.OrganizationAPIKeyProvider) (*QuerierService, error)

func (*QuerierService) EvaluateLogTagNamesQuery added in v1.9.1

func (q *QuerierService) EvaluateLogTagNamesQuery(
	ctx context.Context,
	orgID uuid.UUID,
	startTs int64,
	endTs int64,
	queryPlan logql.LQueryPlan,
) (<-chan promql.Timestamped, error)

EvaluateLogTagNamesQuery queries workers to find distinct tag names (column names) that have at least one non-null value in logs matching the filter criteria. This is used for scoped tag discovery.

func (*QuerierService) EvaluateLogTagValuesQuery

func (q *QuerierService) EvaluateLogTagValuesQuery(
	ctx context.Context,
	orgID uuid.UUID,
	startTs int64,
	endTs int64,
	queryPlan logql.LQueryPlan,
) (<-chan promql.Timestamped, error)

func (*QuerierService) EvaluateLogsQuery

func (q *QuerierService) EvaluateLogsQuery(
	ctx context.Context,
	orgID uuid.UUID,
	startTs, endTs int64,
	reverse bool,
	limit int,
	queryPlan logql.LQueryPlan,
	fields []string,
) (<-chan promql.Timestamped, error)

func (*QuerierService) EvaluateMetricTagNamesQuery added in v1.9.1

func (q *QuerierService) EvaluateMetricTagNamesQuery(
	ctx context.Context,
	orgID uuid.UUID,
	startTs int64,
	endTs int64,
	queryPlan promql.QueryPlan,
) (<-chan promql.Timestamped, error)

EvaluateMetricTagNamesQuery queries workers to find distinct tag names (column names) that have at least one non-null value in metrics matching the filter criteria. This is used for scoped tag discovery.

func (*QuerierService) EvaluateMetricTagValuesQuery

func (q *QuerierService) EvaluateMetricTagValuesQuery(
	ctx context.Context,
	orgID uuid.UUID,
	startTs int64,
	endTs int64,
	queryPlan promql.QueryPlan,
) (<-chan promql.TagValue, error)

func (*QuerierService) EvaluateMetricsQuery

func (q *QuerierService) EvaluateMetricsQuery(
	ctx context.Context,
	orgID uuid.UUID,
	startTs, endTs int64,
	queryPlan promql.QueryPlan,
) (<-chan map[string]promql.EvalResult, error)

func (*QuerierService) EvaluateMetricsSummary added in v1.11.0

func (q *QuerierService) EvaluateMetricsSummary(
	ctx context.Context,
	orgID uuid.UUID,
	startTs, endTs int64,
	queryPlan promql.QueryPlan,
) ([]SeriesSummary, error)

EvaluateMetricsSummary runs a metrics query and aggregates results into per-series summaries. It first tries the optimized DDSketch-based implementation, falling back to the legacy streaming approach if that fails.

func (*QuerierService) EvaluateSpanTagNamesQuery added in v1.9.1

func (q *QuerierService) EvaluateSpanTagNamesQuery(
	ctx context.Context,
	orgID uuid.UUID,
	startTs int64,
	endTs int64,
	queryPlan logql.LQueryPlan,
) (<-chan promql.Timestamped, error)

EvaluateSpanTagNamesQuery queries workers to find distinct tag names (column names) that have at least one non-null value in spans matching the filter criteria. This is used for scoped tag discovery.

func (*QuerierService) EvaluateSpanTagValuesQuery added in v1.4.1

func (q *QuerierService) EvaluateSpanTagValuesQuery(
	ctx context.Context,
	orgID uuid.UUID,
	startTs int64,
	endTs int64,
	queryPlan logql.LQueryPlan,
) (<-chan promql.Timestamped, error)

func (*QuerierService) EvaluateSpansQuery added in v1.4.0

func (q *QuerierService) EvaluateSpansQuery(
	ctx context.Context,
	orgID uuid.UUID,
	startTs, endTs int64,
	reverse bool,
	limit int,
	queryPlan logql.LQueryPlan,
	fields []string,
) (<-chan promql.Timestamped, error)

func (*QuerierService) Run

func (q *QuerierService) Run(doneCtx context.Context) error

type SegmentGroup

type SegmentGroup struct {
	StartTs  int64
	EndTs    int64
	Segments []SegmentInfo
}

func ComputeReplayBatches

func ComputeReplayBatches(
	segments []SegmentInfo,
	step time.Duration,
	queryStartTs, queryEndTs int64,
	targetSize int,
	reverseSort bool,
) []SegmentGroup

func ComputeReplayBatchesWithWorkers

func ComputeReplayBatchesWithWorkers(
	segments []SegmentInfo,
	step time.Duration,
	queryStartTs, queryEndTs int64,
	workers int,
	reverseSort bool,
) []SegmentGroup

ComputeReplayBatchesWithWorkers public entrypoint. Computes a per-group target size from total #segments and worker count, then delegates.

type SegmentInfo

type SegmentInfo struct {
	DateInt          int       `json:"dateInt"`
	Hour             string    `json:"hour"`
	SegmentID        int64     `json:"segmentId"`
	StartTs          int64     `json:"startTs"`
	EndTs            int64     `json:"endTs"`
	EffectiveStartTs int64     `json:"effectiveStartTs"`
	EffectiveEndTs   int64     `json:"effectiveEndTs"`
	ExprID           string    `json:"exprId"`
	OrganizationID   uuid.UUID `json:"organizationID"`
	InstanceNum      int16     `json:"instanceNum"`
	Frequency        int64     `json:"frequency"`
	AggFields        []string  `json:"aggFields,omitempty"`
}

func (SegmentInfo) Key added in v1.8.0

func (s SegmentInfo) Key() SegmentKey

Key returns the SegmentKey for this SegmentInfo.

type SegmentKey added in v1.8.0

type SegmentKey struct {
	DateInt        int
	SegmentID      int64
	InstanceNum    int16
	OrganizationID uuid.UUID
}

SegmentKey is a comparable key for deduplicating segments in maps.

type SegmentWorkerMapping

type SegmentWorkerMapping struct {
	SegmentID int64
	Worker    Worker
}

SegmentWorkerMapping represents the assignment of a segment to a worker

type SeriesSummary added in v1.11.0

type SeriesSummary struct {
	Label string         `json:"label"`
	Tags  map[string]any `json:"tags"`
	Min   float64        `json:"min"`
	Max   float64        `json:"max"`
	Avg   float64        `json:"avg"`
	Sum   float64        `json:"sum"`
	Count int64          `json:"count"`
	P50   *float64       `json:"p50,omitempty"`
	P90   *float64       `json:"p90,omitempty"`
	P95   *float64       `json:"p95,omitempty"`
	P99   *float64       `json:"p99,omitempty"`
}

SeriesSummary contains aggregate statistics for a single time series.

type StageCheck added in v1.4.2

type StageCheck struct {
	Name           string
	SQL            string
	RowCount       int
	OK             bool
	Error          error
	FieldsExpected []string
	MissingFields  []string
}

StageCheck summarizes one validation step.

type TrigramQuery

type TrigramQuery struct {
	Op      index.QueryOp
	Trigram []string
	Sub     []*TrigramQuery
	// contains filtered or unexported fields
}

func (*TrigramQuery) String added in v1.7.0

func (t *TrigramQuery) String() string

String returns a string representation of the trigram query for logging

type ValidateOption added in v1.3.4

type ValidateOption func(*validateConfig)

func WithAggStep added in v1.3.4

func WithAggStep(step time.Duration) ValidateOption

WithAggStep sets the step used when building aggregate worker SQL; default 10s.

func WithDB added in v1.3.4

func WithDB(db *sql.DB) ValidateOption

WithDB allows injecting a DB (e.g., for tests). If provided, caller manages its lifecycle.

func WithTable added in v1.3.4

func WithTable(name string) ValidateOption

WithTable sets the target table name; default "logs".

type ValidateResult added in v1.3.4

type ValidateResult struct {
	WorkerSQL    string      // the SQL that was executed
	StartMillis  int64       // resolved start placeholder
	EndMillis    int64       // resolved end placeholder
	InsertedRows int         // rows ingested from exemplar
	Rows         []rowstruct // rows returned from worker SQL
	IsAggregate  bool        // whether query was aggregate path (PromQL rewrite)
}

ValidateResult is what the util returns for assertions in tests and for the API.

func ValidateLogQLAgainstExemplar added in v1.3.4

func ValidateLogQLAgainstExemplar(ctx context.Context, query string, exemplarData map[string]any, opts ...ValidateOption) (*ValidateResult, error)

type Worker

type Worker struct {
	IP   string
	Port int
}

Worker represents a discovered worker instance

type WorkerDiscovery

type WorkerDiscovery interface {
	Start(ctx context.Context) error
	Stop() error
	GetWorkersForSegments(organizationID uuid.UUID, segmentIDs []int64) ([]SegmentWorkerMapping, error)
	GetAllWorkers() ([]Worker, error)
}

func CreateWorkerDiscovery

func CreateWorkerDiscovery() (WorkerDiscovery, error)

CreateWorkerDiscovery creates the appropriate WorkerDiscovery implementation based on the EXECUTION_ENVIRONMENT environment variable.

Supported values:

  • "local": Creates LocalDevDiscovery for local development
  • "kubernetes": Creates KubernetesWorkerDiscovery for Kubernetes environments
  • "ecs": Creates EcsWorkerDiscovery for ECS environments
  • unset or other values: Returns an error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL