xcap

package
v3.7.2
Warning

This package is not in the latest version of its module.

Published: May 13, 2026 License: AGPL-3.0 Imports: 15 Imported by: 0

Documentation

Overview

Package xcap provides a utility to capture statistical information about the lifetime of a query.

xcap can be used in two ways, both starting with a Capture:

Standalone: regions and log summaries

Use StartRegion when you only need observation aggregation and structured log output, without creating OTel spans.

// Create a capture to collect observations.
ctx, capture := xcap.NewCapture(ctx, nil)

// Create a named region to scope observations.
ctx, region := xcap.StartRegion(ctx, "DataObjScan")

// Record observations; multiple calls aggregate by statistic.
region.Record(xcap.StatDatasetPagesScanned.Observe(1))
region.Record(xcap.StatDatasetPagesScanned.Observe(1))
// pages.scanned is now 2 (sum aggregation).

// End the region, then the capture, before summarising.
region.End()
capture.End()

// Once the capture has ended, summarise all observations as log values.
logValues := xcap.SummaryLogValues(capture)
level.Info(logger).Log(logValues...)

With OTel tracing: spans, observations and log summaries

Use StartSpan when you want observations flushed as OTel span attributes in addition to the log summary. StartSpan takes a standard trace.Tracer and returns a Span whose End method writes the aggregated observations as span attributes before ending the span. The observations are also registered with the Capture for log summaries.

// Create a capture to collect observations.
ctx, capture := xcap.NewCapture(ctx, nil)
defer capture.End()

// Start a span — this also creates a linked region.
ctx, span := xcap.StartSpan(ctx, otel.Tracer("engine"), "DataObjScan",
    trace.WithAttributes(attribute.Int("num_targets", 5)),
)
defer span.End()

// Deep in the call stack, retrieve the region from context.
region := xcap.RegionFromContext(ctx)
region.Record(xcap.StatDatasetPagesScanned.Observe(1))
region.Record(xcap.StatDatasetPagesScanned.Observe(1))
// When span.End() is called:
//   1. pages.scanned=2 is set as a span attribute.
//   2. The observation is also available via SummaryLogValues(capture).

Index

Constants

This section is empty.

Variables

var (
	StatPipelineRowsOut      = NewStatisticInt64("rows.out", AggregationTypeSum)
	StatPipelineReadCalls    = NewStatisticInt64("read.calls", AggregationTypeSum)
	StatPipelineReadDuration = NewStatisticFloat64("read.duration", AggregationTypeSum)
	StatPipelineExecDuration = NewStatisticFloat64("exec.duration", AggregationTypeSum)
)

Common pipeline statistics tracked across executor nodes.

var (
	// Dataset column statistics.
	StatDatasetPrimaryColumns       = NewStatisticInt64("primary.columns", AggregationTypeSum)
	StatDatasetSecondaryColumns     = NewStatisticInt64("secondary.columns", AggregationTypeSum)
	StatDatasetPrimaryColumnPages   = NewStatisticInt64("primary.column.pages", AggregationTypeSum)
	StatDatasetSecondaryColumnPages = NewStatisticInt64("secondary.column.pages", AggregationTypeSum)

	// Dataset row statistics.
	StatDatasetMaxRows           = NewStatisticInt64("rows.max", AggregationTypeSum)
	StatDatasetRowsAfterPruning  = NewStatisticInt64("rows.after.pruning", AggregationTypeSum)
	StatDatasetPrimaryRowsRead   = NewStatisticInt64("primary.rows.read", AggregationTypeSum)
	StatDatasetSecondaryRowsRead = NewStatisticInt64("secondary.rows.read", AggregationTypeSum)
	StatDatasetPrimaryRowBytes   = NewStatisticInt64("primary.row.read.bytes", AggregationTypeSum)
	StatDatasetSecondaryRowBytes = NewStatisticInt64("secondary.row.read.bytes", AggregationTypeSum)

	// Dataset page scan statistics.
	StatDatasetPagesScanned         = NewStatisticInt64("pages.scanned", AggregationTypeSum)
	StatDatasetPagesFoundInCache    = NewStatisticInt64("pages.cache.hit", AggregationTypeSum)
	StatDatasetPageDownloadRequests = NewStatisticInt64("pages.download.requests", AggregationTypeSum)
	StatDatasetPageDownloadTime     = NewStatisticFloat64("pages.download.duration", AggregationTypeSum)

	// Dataset page download byte statistics.
	StatDatasetPrimaryPagesDownloaded           = NewStatisticInt64("primary.pages.downloaded", AggregationTypeSum)
	StatDatasetSecondaryPagesDownloaded         = NewStatisticInt64("secondary.pages.downloaded", AggregationTypeSum)
	StatDatasetPrimaryColumnBytes               = NewStatisticInt64("primary.pages.compressed.bytes", AggregationTypeSum)
	StatDatasetSecondaryColumnBytes             = NewStatisticInt64("secondary.pages.compressed.bytes", AggregationTypeSum)
	StatDatasetPrimaryColumnUncompressedBytes   = NewStatisticInt64("primary.column.uncompressed.bytes", AggregationTypeSum)
	StatDatasetSecondaryColumnUncompressedBytes = NewStatisticInt64("secondary.column.uncompressed.bytes", AggregationTypeSum)

	// Dataset read operation statistics.
	StatDatasetReadCalls = NewStatisticInt64("dataset.read.calls", AggregationTypeSum)
)
var (
	StatRangeIOInputCount     = NewStatisticInt64("input.ranges", AggregationTypeSum)
	StatRangeIOInputSize      = NewStatisticInt64("input.ranges.size.bytes", AggregationTypeSum)
	StatRangeIOOptimizedCount = NewStatisticInt64("optimized.ranges", AggregationTypeSum)
	StatRangeIOOptimizedSize  = NewStatisticInt64("optimized.ranges.size.bytes", AggregationTypeSum)
	StatRangeIOThroughput     = NewStatisticFloat64("optimized.ranges.min.throughput", AggregationTypeMin)
)

Range IO statistics.

var (
	StatBucketGet        = NewStatisticInt64("bucket.get", AggregationTypeSum)
	StatBucketGetRange   = NewStatisticInt64("bucket.getrange", AggregationTypeSum)
	StatBucketIter       = NewStatisticInt64("bucket.iter", AggregationTypeSum)
	StatBucketAttributes = NewStatisticInt64("bucket.attributes", AggregationTypeSum)
)

Bucket operation statistics.

var (
	StatMetastoreIndexObjects            = NewStatisticInt64("metastore.index.objects", AggregationTypeSum)
	StatMetastoreSectionsResolved        = NewStatisticInt64("metastore.sections.resolved", AggregationTypeSum)
	StatMetastoreStreamsRead             = NewStatisticInt64("metastore.sections.streams.read", AggregationTypeSum)
	StatMetastoreStreamsReadTime         = NewStatisticFloat64("metastore.sections.streams.read.duration", AggregationTypeSum)
	StatMetastoreSectionPointersRead     = NewStatisticInt64("metastore.sections.pointers.read", AggregationTypeSum)
	StatMetastoreSectionPointersReadTime = NewStatisticFloat64("metastore.sections.pointers.read.duration", AggregationTypeSum)
)

Metastore statistics.

var (
	StatTaskCount = NewStatisticInt64("task.count", AggregationTypeFirst)

	// Task queue duration (queued to assignment) in seconds.
	StatTaskMaxQueueDuration = NewStatisticFloat64("task.max.queue.duration", AggregationTypeMax)

	// Time from workflow start until last task assignment in seconds.
	StatTaskAssignmentTailDuration = NewStatisticFloat64("task.assignment.tail.duration", AggregationTypeMax)

	// Time spent waiting for task admission (before being queued) in seconds.
	StatTaskAdmissionWaitDuration = NewStatisticFloat64("task.admission.wait.duration", AggregationTypeSum)
)

Task scheduling statistics.

var (
	TaskRecvDuration            = NewStatisticFloat64("task.recv.duration", AggregationTypeSum)
	TaskSendDuration            = NewStatisticFloat64("task.send.duration", AggregationTypeSum)
	TaskRecordsSent             = NewStatisticInt64("task.records.sent", AggregationTypeSum)
	TaskRowsSent                = NewStatisticInt64("task.rows.sent", AggregationTypeSum)
	TaskDrainRecordsReceived    = NewStatisticInt64("task.drain.records.received", AggregationTypeSum)
	TaskBatchingRecordsReceived = NewStatisticInt64("task.batching.records.received", AggregationTypeSum)
	TaskBatchingRowsReceived    = NewStatisticInt64("task.batching.rows.received", AggregationTypeSum)
	TaskBatchingBatchesProduced = NewStatisticInt64("task.batching.batches.produced", AggregationTypeSum)
	TaskBatchingRowsWritten     = NewStatisticInt64("task.batching.rows.written", AggregationTypeSum)
	TaskExternalSourcesCount    = NewStatisticInt64("task.external.sources.count", AggregationTypeFirst)
	TaskExternalSinksCount      = NewStatisticInt64("task.external.sinks.count", AggregationTypeFirst)
)

Task statistics.

var (
	StatCompatCollisionFound = NewStatisticFlag("collision.found")
)

ColumnCompat statistics.

var (
	// Track number of predicates applied to enforce delete request filtering.
	StatDeletePredicates = NewStatisticInt64("delete.request.predicates", AggregationTypeFirst)
)

Functions

func ContextWithRegion

func ContextWithRegion(ctx context.Context, region *Region) context.Context

ContextWithRegion returns a new context with the given Region.

func ContextWithSpan

func ContextWithSpan(ctx context.Context, span trace.Span) context.Context

ContextWithSpan injects span into ctx via trace.ContextWithSpan. If span is a *Span with a linked Region, the Region is also injected so that RegionFromContext returns it downstream.

func SummaryLogValues

func SummaryLogValues(capture *Capture) []any

SummaryLogValues returns the aggregated statistics of a Capture as a slice of log values, suitable for emitting as a single structured log line.

Types

type AggregatedObservation

type AggregatedObservation struct {
	Statistic Statistic
	Value     any
	Count     int // number of observations aggregated
}

AggregatedObservation holds an aggregated value for a statistic within a region.

func (*AggregatedObservation) Bool

func (a *AggregatedObservation) Bool() bool

func (*AggregatedObservation) Float64

func (a *AggregatedObservation) Float64() (float64, bool)

func (*AggregatedObservation) Int64

func (a *AggregatedObservation) Int64() (int64, bool)

func (*AggregatedObservation) Merge

Merge aggregates another AggregatedObservation into this one.

func (*AggregatedObservation) Record

func (a *AggregatedObservation) Record(obs Observation)

Record aggregates a new observation into this aggregated observation. It updates the value according to the statistic's aggregation type.

type AggregationType

type AggregationType int

AggregationType specifies how to combine multiple observations of the same statistic.

const (
	// AggregationTypeInvalid represents an invalid/unspecified aggregation type.
	AggregationTypeInvalid AggregationType = iota
	// AggregationTypeSum sums all observations together into a
	// final value.
	AggregationTypeSum
	// AggregationTypeMin uses the smallest value of all observations.
	AggregationTypeMin
	// AggregationTypeMax uses the largest value of all observations.
	AggregationTypeMax
	// AggregationTypeLast uses the last recorded observation value.
	AggregationTypeLast
	// AggregationTypeFirst uses the first recorded observation value.
	AggregationTypeFirst
)
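As an illustration of these semantics, the sketch below folds a sequence of int64 observations under each aggregation type. It is a hypothetical, stdlib-only model: AggregationType is redeclared locally and the aggregate function is not part of xcap, whose real logic lives in AggregatedObservation.Record and may differ in details such as how an invalid type is handled.

```go
package main

import "fmt"

// AggregationType is a local mirror of the xcap constants, for illustration only.
type AggregationType int

const (
	AggregationTypeInvalid AggregationType = iota
	AggregationTypeSum
	AggregationTypeMin
	AggregationTypeMax
	AggregationTypeLast
	AggregationTypeFirst
)

// aggregate is a hypothetical helper that folds int64 observations, in
// recording order, according to agg. It reports false for an empty input
// or an unknown/invalid aggregation type.
func aggregate(agg AggregationType, values []int64) (int64, bool) {
	if len(values) == 0 || agg < AggregationTypeSum || agg > AggregationTypeFirst {
		return 0, false
	}
	result := values[0]
	for _, v := range values[1:] {
		switch agg {
		case AggregationTypeSum:
			result += v
		case AggregationTypeMin:
			if v < result {
				result = v
			}
		case AggregationTypeMax:
			if v > result {
				result = v
			}
		case AggregationTypeLast:
			result = v
		case AggregationTypeFirst:
			// Keep the first value; later observations are ignored.
		}
	}
	return result, true
}

func main() {
	obs := []int64{3, 1, 4, 1, 5}
	cases := []struct {
		name string
		agg  AggregationType
	}{
		{"sum", AggregationTypeSum},
		{"min", AggregationTypeMin},
		{"max", AggregationTypeMax},
		{"last", AggregationTypeLast},
		{"first", AggregationTypeFirst},
	}
	for _, c := range cases {
		v, _ := aggregate(c.agg, obs)
		fmt.Printf("%s=%d\n", c.name, v)
	}
	// Prints sum=14, min=1, max=5, last=5, first=3 on separate lines.
}
```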

type Capture

type Capture struct {
	// contains filtered or unexported fields
}

Capture captures statistical information about the lifetime of a query.

func CaptureFromContext

func CaptureFromContext(ctx context.Context) *Capture

CaptureFromContext returns the Capture from the context, or nil if no Capture is present.

func NewCapture

func NewCapture(ctx context.Context, attributes []attribute.KeyValue) (context.Context, *Capture)

NewCapture creates a new Capture, attaches it to the provided context.Context, and returns the derived context together with the Capture.

func (*Capture) AddRegion

func (c *Capture) AddRegion(r *Region)

AddRegion adds a region to this capture. This is called by Region when it is created.

func (*Capture) End

func (c *Capture) End()

End marks the end of the capture. After End is called, no new Regions can be created from this Capture.

func (*Capture) LinkParent

func (c *Capture) LinkParent(parent *Region)

LinkParent assigns the provided region as the parent to all root regions of the capture.

func (*Capture) MarshalBinary

func (c *Capture) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler for Capture. It serializes the Capture to its protobuf representation and returns the binary data.

func (*Capture) Regions

func (c *Capture) Regions() []*Region

Regions returns all regions in this capture.

func (*Capture) ToStatsSummary

func (c *Capture) ToStatsSummary(execTime, queueTime time.Duration, totalEntriesReturned int) stats.Result

ToStatsSummary computes a stats.Result from observations in the capture.

func (*Capture) UnmarshalBinary

func (c *Capture) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler for Capture. It deserializes binary data into a Capture from its protobuf representation.

type DataType

type DataType int

DataType specifies the data type of a statistic's values.

const (
	// DataTypeInvalid represents an invalid/unspecified data type.
	DataTypeInvalid DataType = iota
	// DataTypeInt64 represents a signed 64-bit integer statistic.
	DataTypeInt64
	// DataTypeFloat64 represents a double precision floating point statistic.
	DataTypeFloat64
	// DataTypeBool represents a boolean statistic (flag).
	DataTypeBool
)

type ID

type ID = identifier

ID is an exported alias for identifier.

func NewID

func NewID() ID

NewID returns a new random ID. The ID is guaranteed to be non-zero. This is the exported version of newID for use in other packages.
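A guaranteed non-zero random ID is commonly produced by drawing random bytes and retrying on the all-zero value, which leaves zero free to mean "unset". The sketch below assumes an 8-byte identifier and uses crypto/rand; the id type, isZero, and newID are hypothetical, since this page does not document the size or representation of identifier.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// id is a hypothetical 8-byte identifier.
type id [8]byte

// isZero reports whether every byte of the ID is zero.
func (i id) isZero() bool { return i == id{} }

// String renders the ID as lowercase hex.
func (i id) String() string { return hex.EncodeToString(i[:]) }

// newID fills an id with random bytes, retrying until the result is non-zero.
func newID() id {
	for {
		var i id
		if _, err := rand.Read(i[:]); err != nil {
			panic(err) // a failing system RNG is treated as unrecoverable
		}
		if !i.isZero() {
			return i
		}
	}
}

func main() {
	fmt.Println(newID())
}
```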

type Observation

type Observation interface {
	// contains filtered or unexported methods
}

Observation holds a value for a particular statistic. Observations are created from statistics and then recorded into a Region using Region.Record.

type Region

type Region struct {
	// contains filtered or unexported fields
}

Region captures the lifetime of a specific operation within a capture.

A Region may be created standalone (via StartRegion) for observation collection only, or paired with an OTel span via StartSpan.

func RegionFromContext

func RegionFromContext(ctx context.Context) *Region

RegionFromContext returns the current Region from the context, or nil if no Region is present.

func StartRegion

func StartRegion(ctx context.Context, name string) (context.Context, *Region)

StartRegion creates a new Region to record observations for a specific operation. It returns a context containing the Region, along with the Region itself.

The Region is registered with the Capture found in the context. If no Capture is found, it returns the original context and a nil region.

StartRegion does not create an OTel span. Use StartSpan when both a span and observation aggregation are needed.

func (*Region) End

func (r *Region) End()

End completes the Region. Updates to the Region are ignored after calling End.

func (*Region) Observations

func (r *Region) Observations() []AggregatedObservation

Observations returns all aggregated observations recorded in the region.

func (*Region) Record

func (r *Region) Record(o Observation)

Record records the statistic Observation o into the region. Calling Record multiple times for the same Statistic aggregates values based on the aggregation type of the Statistic.
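Per-Statistic aggregation can be modelled as a map keyed by a comparable statistic key, where each entry tracks the aggregated value and a count (compare the Count field of AggregatedObservation). This hypothetical sketch handles only sum aggregation and uses plain string fields in place of DataType and AggregationType; it is not the package's implementation.

```go
package main

import "fmt"

// statisticKey loosely mirrors xcap.StatisticKey, simplified to strings.
type statisticKey struct {
	Name        string
	DataType    string
	Aggregation string
}

// aggregated loosely mirrors xcap.AggregatedObservation for int64 sums.
type aggregated struct {
	Value int64
	Count int
}

// region is a hypothetical stand-in that aggregates sum statistics by key.
type region struct {
	obs map[statisticKey]*aggregated
}

func newRegion() *region {
	return &region{obs: make(map[statisticKey]*aggregated)}
}

// record adds v to the aggregate for k, creating the entry on first use.
func (r *region) record(k statisticKey, v int64) {
	a, ok := r.obs[k]
	if !ok {
		a = &aggregated{}
		r.obs[k] = a
	}
	a.Value += v
	a.Count++
}

func main() {
	pagesScanned := statisticKey{Name: "pages.scanned", DataType: "int64", Aggregation: "sum"}
	r := newRegion()
	r.record(pagesScanned, 1)
	r.record(pagesScanned, 1)
	a := r.obs[pagesScanned]
	fmt.Println(a.Value, a.Count) // 2 2
}
```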

type Span

type Span struct {
	trace.Span
	// contains filtered or unexported fields
}

Span wraps a trace.Span and an optional Region.

All trace.Span methods are automatically delegated to the inner span via embedding. Only End is overridden, to flush the Region's aggregated observations as span attributes before ending the inner span.

func StartSpan

func StartSpan(ctx context.Context, t trace.Tracer, name string, opts ...trace.SpanStartOption) (context.Context, *Span)

StartSpan creates a new OTel span using the given tracer and pairs it with a Region for aggregated observation recording.

The returned *Span embeds the underlying trace.Span; its End method flushes Region observations as span attributes before ending the OTel span.

If a Capture is present in ctx, the Region is registered with it for summary aggregation via SummaryLogValues. If no Capture is found, a span is still created but no Region is attached (observation recording is a no-op).

The Region is stored in the returned context and can be retrieved with RegionFromContext for recording observations.

func (*Span) End

func (s *Span) End(options ...trace.SpanEndOption)

End flushes aggregated observations from the linked Region as span attributes, then ends the underlying OTel span.

If no Region is linked (nil), or the Region has already been ended, End simply ends the inner span.
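The Span design relies on Go struct embedding: methods of the embedded interface are promoted to the outer type, and defining End on the outer type shadows the inner one. The sketch below models that pattern with a hypothetical spanLike interface instead of trace.Span, so it runs without OTel; fakeSpan, wrappedSpan, and the observations map are illustrative only, and the "Region already ended" case is not modelled.

```go
package main

import "fmt"

// spanLike is a hypothetical, minimal stand-in for trace.Span.
type spanLike interface {
	Name() string
	SetAttribute(key string, value int64)
	End()
}

// fakeSpan records the order of calls so flush-before-end is observable.
type fakeSpan struct {
	name  string
	attrs map[string]int64
	calls []string
}

func (s *fakeSpan) Name() string { return s.name }

func (s *fakeSpan) SetAttribute(key string, value int64) {
	s.attrs[key] = value
	s.calls = append(s.calls, "set "+key)
}

func (s *fakeSpan) End() { s.calls = append(s.calls, "end") }

// wrappedSpan embeds spanLike, so Name and SetAttribute are promoted
// unchanged; only End is redefined. A nil observations map plays the
// role of "no linked Region".
type wrappedSpan struct {
	spanLike
	observations map[string]int64
}

// End writes each observation as an attribute, then ends the inner span.
// Ranging over a nil map does nothing, so the span is simply ended.
func (w *wrappedSpan) End() {
	for key, value := range w.observations {
		w.spanLike.SetAttribute(key, value)
	}
	w.spanLike.End()
}

func main() {
	inner := &fakeSpan{name: "DataObjScan", attrs: map[string]int64{}}
	span := &wrappedSpan{spanLike: inner, observations: map[string]int64{"pages.scanned": 2}}

	span.End()
	fmt.Println(span.Name(), inner.attrs["pages.scanned"], inner.calls)
	// Prints: DataObjScan 2 [set pages.scanned end]
}
```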

func (*Span) Record

func (s *Span) Record(observation Observation)

Record records the given observation into the linked Region.

func (*Span) Region

func (s *Span) Region() *Region

Region returns the linked Region, or nil if no Region is attached.

type Statistic

type Statistic interface {
	Name() string
	DataType() DataType
	Aggregation() AggregationType
	Key() StatisticKey
}

Statistic is the interface that all statistic types implement.

type StatisticFlag

type StatisticFlag struct {
	// contains filtered or unexported fields
}

StatisticFlag is a statistic for bool values.

func NewStatisticFlag

func NewStatisticFlag(name string) *StatisticFlag

NewStatisticFlag creates a new bool statistic (flag) with the given name. Flags always use AggregationTypeMax (true > false).
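Because true is treated as greater than false, max aggregation over flag observations behaves like a logical OR: once any observation is true, the aggregate stays true. A hypothetical one-function sketch (maxBool is not part of xcap):

```go
package main

import "fmt"

// maxBool aggregates flag observations with max semantics (false < true).
// The result is true if any observation is true, i.e. a logical OR;
// with no observations it returns false.
func maxBool(observations ...bool) bool {
	result := false
	for _, o := range observations {
		result = result || o
	}
	return result
}

func main() {
	fmt.Println(maxBool(false, false))       // false
	fmt.Println(maxBool(false, true, false)) // true
}
```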

func (*StatisticFlag) Aggregation

func (s *StatisticFlag) Aggregation() AggregationType

Aggregation returns the aggregation type for this statistic.

func (*StatisticFlag) DataType

func (s *StatisticFlag) DataType() DataType

DataType returns the data type of the statistic.

func (*StatisticFlag) Key

func (s *StatisticFlag) Key() StatisticKey

Key returns a StatisticKey that uniquely identifies this statistic. Statistics with the same definition will have the same key.

func (*StatisticFlag) Name

func (s *StatisticFlag) Name() string

Name returns the name of the statistic.

func (*StatisticFlag) Observe

func (s *StatisticFlag) Observe(value bool) Observation

Observe creates an observation with a bool value.

type StatisticFloat64

type StatisticFloat64 struct {
	// contains filtered or unexported fields
}

StatisticFloat64 is a statistic for float64 values.

func NewStatisticFloat64

func NewStatisticFloat64(name string, aggregation AggregationType) *StatisticFloat64

NewStatisticFloat64 creates a new float64 statistic with the given name and aggregation.

func (*StatisticFloat64) Aggregation

func (s *StatisticFloat64) Aggregation() AggregationType

Aggregation returns the aggregation type for this statistic.

func (*StatisticFloat64) DataType

func (s *StatisticFloat64) DataType() DataType

DataType returns the data type of the statistic.

func (*StatisticFloat64) Key

func (s *StatisticFloat64) Key() StatisticKey

Key returns a StatisticKey that uniquely identifies this statistic. Statistics with the same definition will have the same key.

func (*StatisticFloat64) Name

func (s *StatisticFloat64) Name() string

Name returns the name of the statistic.

func (*StatisticFloat64) Observe

func (s *StatisticFloat64) Observe(value float64) Observation

Observe creates an observation with a float64 value.

type StatisticInt64

type StatisticInt64 struct {
	// contains filtered or unexported fields
}

StatisticInt64 is a statistic for int64 values.

func NewStatisticInt64

func NewStatisticInt64(name string, aggregation AggregationType) *StatisticInt64

NewStatisticInt64 creates a new int64 statistic with the given name and aggregation.

func (*StatisticInt64) Aggregation

func (s *StatisticInt64) Aggregation() AggregationType

Aggregation returns the aggregation type for this statistic.

func (*StatisticInt64) DataType

func (s *StatisticInt64) DataType() DataType

DataType returns the data type of the statistic.

func (*StatisticInt64) Key

func (s *StatisticInt64) Key() StatisticKey

Key returns a StatisticKey that uniquely identifies this statistic. Statistics with the same definition will have the same key.

func (*StatisticInt64) Name

func (s *StatisticInt64) Name() string

Name returns the name of the statistic.

func (*StatisticInt64) Observe

func (s *StatisticInt64) Observe(value int64) Observation

Observe creates an observation with an int64 value.

type StatisticKey

type StatisticKey struct {
	Name        string
	DataType    DataType
	Aggregation AggregationType
}

StatisticKey is a comparable struct that uniquely identifies a statistic by its definition (name, data type, and aggregation type).

Directories

Path Synopsis
internal
