Overview ¶
Package xcap provides a utility to capture statistical information about the lifetime of a query.
xcap can be used in two ways, both starting with a Capture:
Standalone: regions and log summaries ¶
Use StartRegion when you only need observation aggregation and structured log output, without creating OTel spans.
// Create a capture to collect observations.
ctx, capture := xcap.NewCapture(ctx, nil)
defer capture.End()
// Create a named region to scope observations.
ctx, region := xcap.StartRegion(ctx, "DataObjScan")
defer region.End()
// Record observations; multiple calls aggregate by statistic.
region.Record(xcap.StatDatasetPagesScanned.Observe(1))
region.Record(xcap.StatDatasetPagesScanned.Observe(1))
// pages.scanned is now 2 (sum aggregation)
// After the capture ends, summarise all observations as log values.
logValues := xcap.SummaryLogValues(capture)
level.Info(logger).Log(logValues...)
With OTel tracing: spans, observations and log summaries ¶
Use StartSpan when you want observations flushed as OTel span attributes in addition to the log summary. StartSpan takes a standard trace.Tracer and returns a Span whose End method writes the aggregated observations as span attributes before ending the span. The observations are also registered with the Capture for log summaries.
// Create a capture to collect observations.
ctx, capture := xcap.NewCapture(ctx, nil)
defer capture.End()
// Start a span — this also creates a linked region.
ctx, span := xcap.StartSpan(ctx, otel.Tracer("engine"), "DataObjScan",
trace.WithAttributes(attribute.Int("num_targets", 5)),
)
defer span.End()
// Deep in the call stack, retrieve the region from context.
region := xcap.RegionFromContext(ctx)
region.Record(xcap.StatDatasetPagesScanned.Observe(1))
region.Record(xcap.StatDatasetPagesScanned.Observe(1))
// When span.End() is called:
// 1. pages.scanned=2 is set as a span attribute.
// 2. The observation is also available via SummaryLogValues(capture).
Index ¶
- Variables
- func ContextWithRegion(ctx context.Context, region *Region) context.Context
- func ContextWithSpan(ctx context.Context, span trace.Span) context.Context
- func SummaryLogValues(capture *Capture) []any
- type AggregatedObservation
- type AggregationType
- type Capture
- func (c *Capture) AddRegion(r *Region)
- func (c *Capture) End()
- func (c *Capture) LinkParent(parent *Region)
- func (c *Capture) MarshalBinary() ([]byte, error)
- func (c *Capture) Regions() []*Region
- func (c *Capture) ToStatsSummary(execTime, queueTime time.Duration, totalEntriesReturned int) stats.Result
- func (c *Capture) UnmarshalBinary(data []byte) error
- type DataType
- type ID
- type Observation
- type Region
- type Span
- type Statistic
- type StatisticFlag
- type StatisticFloat64
- type StatisticInt64
- type StatisticKey
Constants ¶
This section is empty.
Variables ¶
var (
	StatPipelineRowsOut      = NewStatisticInt64("rows.out", AggregationTypeSum)
	StatPipelineReadCalls    = NewStatisticInt64("read.calls", AggregationTypeSum)
	StatPipelineReadDuration = NewStatisticFloat64("read.duration", AggregationTypeSum)
	StatPipelineExecDuration = NewStatisticFloat64("exec.duration", AggregationTypeSum)
)
Common pipeline statistics tracked across executor nodes.
var (
	// Dataset column statistics.
	StatDatasetPrimaryColumns       = NewStatisticInt64("primary.columns", AggregationTypeSum)
	StatDatasetSecondaryColumns     = NewStatisticInt64("secondary.columns", AggregationTypeSum)
	StatDatasetPrimaryColumnPages   = NewStatisticInt64("primary.column.pages", AggregationTypeSum)
	StatDatasetSecondaryColumnPages = NewStatisticInt64("secondary.column.pages", AggregationTypeSum)
	// Dataset row statistics.
	StatDatasetMaxRows           = NewStatisticInt64("rows.max", AggregationTypeSum)
	StatDatasetRowsAfterPruning  = NewStatisticInt64("rows.after.pruning", AggregationTypeSum)
	StatDatasetPrimaryRowsRead   = NewStatisticInt64("primary.rows.read", AggregationTypeSum)
	StatDatasetSecondaryRowsRead = NewStatisticInt64("secondary.rows.read", AggregationTypeSum)
	StatDatasetPrimaryRowBytes   = NewStatisticInt64("primary.row.read.bytes", AggregationTypeSum)
	StatDatasetSecondaryRowBytes = NewStatisticInt64("secondary.row.read.bytes", AggregationTypeSum)
	// Dataset page scan statistics.
	StatDatasetPagesScanned         = NewStatisticInt64("pages.scanned", AggregationTypeSum)
	StatDatasetPagesFoundInCache    = NewStatisticInt64("pages.cache.hit", AggregationTypeSum)
	StatDatasetPageDownloadRequests = NewStatisticInt64("pages.download.requests", AggregationTypeSum)
	StatDatasetPageDownloadTime     = NewStatisticFloat64("pages.download.duration", AggregationTypeSum)
	// Dataset page download byte statistics.
	StatDatasetPrimaryPagesDownloaded           = NewStatisticInt64("primary.pages.downloaded", AggregationTypeSum)
	StatDatasetSecondaryPagesDownloaded         = NewStatisticInt64("secondary.pages.downloaded", AggregationTypeSum)
	StatDatasetPrimaryColumnBytes               = NewStatisticInt64("primary.pages.compressed.bytes", AggregationTypeSum)
	StatDatasetSecondaryColumnBytes             = NewStatisticInt64("secondary.pages.compressed.bytes", AggregationTypeSum)
	StatDatasetPrimaryColumnUncompressedBytes   = NewStatisticInt64("primary.column.uncompressed.bytes", AggregationTypeSum)
	StatDatasetSecondaryColumnUncompressedBytes = NewStatisticInt64("secondary.column.uncompressed.bytes", AggregationTypeSum)
	// Dataset read operation statistics.
	StatDatasetReadCalls = NewStatisticInt64("dataset.read.calls", AggregationTypeSum)
)
var (
	StatRangeIOInputCount     = NewStatisticInt64("input.ranges", AggregationTypeSum)
	StatRangeIOInputSize      = NewStatisticInt64("input.ranges.size.bytes", AggregationTypeSum)
	StatRangeIOOptimizedCount = NewStatisticInt64("optimized.ranges", AggregationTypeSum)
	StatRangeIOOptimizedSize  = NewStatisticInt64("optimized.ranges.size.bytes", AggregationTypeSum)
	StatRangeIOThroughput     = NewStatisticFloat64("optimized.ranges.min.throughput", AggregationTypeMin)
)
Range IO statistics.
var (
	StatBucketGet        = NewStatisticInt64("bucket.get", AggregationTypeSum)
	StatBucketGetRange   = NewStatisticInt64("bucket.getrange", AggregationTypeSum)
	StatBucketIter       = NewStatisticInt64("bucket.iter", AggregationTypeSum)
	StatBucketAttributes = NewStatisticInt64("bucket.attributes", AggregationTypeSum)
)
Bucket operation statistics.
var (
	StatMetastoreIndexObjects            = NewStatisticInt64("metastore.index.objects", AggregationTypeSum)
	StatMetastoreSectionsResolved        = NewStatisticInt64("metastore.sections.resolved", AggregationTypeSum)
	StatMetastoreStreamsRead             = NewStatisticInt64("metastore.sections.streams.read", AggregationTypeSum)
	StatMetastoreStreamsReadTime         = NewStatisticFloat64("metastore.sections.streams.read.duration", AggregationTypeSum)
	StatMetastoreSectionPointersRead     = NewStatisticInt64("metastore.sections.pointers.read", AggregationTypeSum)
	StatMetastoreSectionPointersReadTime = NewStatisticFloat64("metastore.sections.pointers.read.duration", AggregationTypeSum)
)
Metastore statistics.
var (
	StatTaskCount = NewStatisticInt64("task.count", AggregationTypeFirst)
	// Task queue duration (queued to assignment) in seconds.
	StatTaskMaxQueueDuration = NewStatisticFloat64("task.max.queue.duration", AggregationTypeMax)
	// Time from workflow start until last task assignment in seconds.
	StatTaskAssignmentTailDuration = NewStatisticFloat64("task.assignment.tail.duration", AggregationTypeMax)
	// Time spent waiting for task admission (before being queued) in seconds.
	StatTaskAdmissionWaitDuration = NewStatisticFloat64("task.admission.wait.duration", AggregationTypeSum)
)
Task scheduling statistics.
var (
	TaskRecvDuration            = NewStatisticFloat64("task.recv.duration", AggregationTypeSum)
	TaskSendDuration            = NewStatisticFloat64("task.send.duration", AggregationTypeSum)
	TaskRecordsSent             = NewStatisticInt64("task.records.sent", AggregationTypeSum)
	TaskRowsSent                = NewStatisticInt64("task.rows.sent", AggregationTypeSum)
	TaskDrainRecordsReceived    = NewStatisticInt64("task.drain.records.received", AggregationTypeSum)
	TaskBatchingRecordsReceived = NewStatisticInt64("task.batching.records.received", AggregationTypeSum)
	TaskBatchingRowsReceived    = NewStatisticInt64("task.batching.rows.received", AggregationTypeSum)
	TaskBatchingBatchesProduced = NewStatisticInt64("task.batching.batches.produced", AggregationTypeSum)
	TaskBatchingRowsWritten     = NewStatisticInt64("task.batching.rows.written", AggregationTypeSum)
	TaskExternalSourcesCount    = NewStatisticInt64("task.external.sources.count", AggregationTypeFirst)
	TaskExternalSinksCount      = NewStatisticInt64("task.external.sinks.count", AggregationTypeFirst)
)
Task statistics.
var (
StatCompatCollisionFound = NewStatisticFlag("collision.found")
)
ColumnCompat statistics.
var (
	// Track number of predicates applied to enforce delete request filtering.
	StatDeletePredicates = NewStatisticInt64("delete.request.predicates", AggregationTypeFirst)
)
Functions ¶
func ContextWithRegion ¶
func ContextWithRegion(ctx context.Context, region *Region) context.Context
ContextWithRegion returns a new context with the given Region.
func ContextWithSpan ¶
func ContextWithSpan(ctx context.Context, span trace.Span) context.Context
ContextWithSpan injects span into ctx via trace.ContextWithSpan. If span is a *Span with a linked Region, the Region is also injected so that RegionFromContext returns it downstream.
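func SummaryLogValues ¶
func SummaryLogValues(capture *Capture) []any
SummaryLogValues returns the aggregated statistics of a Capture as alternating log keys and values, ready to pass to a structured logger such as level.Info(logger).Log.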
Types ¶
type AggregatedObservation ¶
type AggregatedObservation struct {
Statistic Statistic
Value any
Count int // number of observations aggregated
}
AggregatedObservation holds an aggregated value for a statistic within a region.
func (*AggregatedObservation) Bool ¶
func (a *AggregatedObservation) Bool() bool
func (*AggregatedObservation) Float64 ¶
func (a *AggregatedObservation) Float64() (float64, bool)
func (*AggregatedObservation) Int64 ¶
func (a *AggregatedObservation) Int64() (int64, bool)
func (*AggregatedObservation) Merge ¶
func (a *AggregatedObservation) Merge(other *AggregatedObservation)
Merge aggregates another AggregatedObservation into this one.
func (*AggregatedObservation) Record ¶
func (a *AggregatedObservation) Record(obs Observation)
Record aggregates a new observation into this aggregated observation. It updates the value according to the statistic's aggregation type.
type AggregationType ¶
type AggregationType int
AggregationType specifies how to combine multiple observations of the same statistic.
const (
	// AggregationTypeInvalid represents an invalid/unspecified aggregation type.
	AggregationTypeInvalid AggregationType = iota
	// AggregationTypeSum sums all observations together into a
	// final value.
	AggregationTypeSum
	// AggregationTypeMin uses the smallest value of all observations.
	AggregationTypeMin
	// AggregationTypeMax uses the largest value of all observations.
	AggregationTypeMax
	// AggregationTypeLast uses the last recorded observation value.
	AggregationTypeLast
	// AggregationTypeFirst uses the first recorded observation value.
	AggregationTypeFirst
)
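To make the five aggregation types concrete, here is a minimal, self-contained model of how repeated observations of one int64 statistic might be folded together. The local aggregationType and aggregated types are hypothetical analogues of AggregationType and AggregatedObservation, not the package's implementation. In particular, letting the first observation seed the value for every type (so Min is not pinned at zero) is an assumption.

```go
package main

import "fmt"

// aggregationType is a hypothetical local copy of the xcap.AggregationType
// constants; the ordering matches the documented const block.
type aggregationType int

const (
	aggInvalid aggregationType = iota
	aggSum
	aggMin
	aggMax
	aggLast
	aggFirst
)

// aggregated is a simplified analogue of AggregatedObservation for int64 values.
type aggregated struct {
	agg   aggregationType
	value int64
	count int // number of observations folded in
}

// record folds one observation into a according to its aggregation type.
func (a *aggregated) record(v int64) {
	if a.count == 0 {
		// Assumption: the first observation seeds the value for every type.
		a.value = v
		a.count = 1
		return
	}
	switch a.agg {
	case aggSum:
		a.value += v
	case aggMin:
		if v < a.value {
			a.value = v
		}
	case aggMax:
		if v > a.value {
			a.value = v
		}
	case aggLast:
		a.value = v
	case aggFirst:
		// Keep the value from the first observation.
	}
	a.count++
}

// fold records every value in vals into a fresh aggregate of type t.
func fold(t aggregationType, vals ...int64) aggregated {
	a := aggregated{agg: t}
	for _, v := range vals {
		a.record(v)
	}
	return a
}

func main() {
	names := map[aggregationType]string{
		aggSum: "sum", aggMin: "min", aggMax: "max", aggLast: "last", aggFirst: "first",
	}
	for _, t := range []aggregationType{aggSum, aggMin, aggMax, aggLast, aggFirst} {
		a := fold(t, 4, 1, 7, 3)
		fmt.Printf("%-5s value=%d count=%d\n", names[t], a.value, a.count)
	}
}
```

For the observations 4, 1, 7, 3 this yields 15, 1, 7, 3 and 4 respectively. AggregationTypeFirst suits values reported once per query, such as task.count, where later duplicates should not change the result.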
type Capture ¶
type Capture struct {
// contains filtered or unexported fields
}
Capture captures statistical information about the lifetime of a query.
func CaptureFromContext ¶
CaptureFromContext returns the Capture from the context, or nil if no Capture is present.
func NewCapture ¶
NewCapture creates a new Capture and returns it together with a derived context.Context that carries it, so that CaptureFromContext and StartRegion can find it.
func (*Capture) AddRegion ¶
func (c *Capture) AddRegion(r *Region)
AddRegion adds a region to this capture. It is called when a Region is created.
func (*Capture) End ¶
func (c *Capture) End()
End marks the end of the capture. After End is called, no new Regions can be created from this Capture.
func (*Capture) LinkParent ¶
func (c *Capture) LinkParent(parent *Region)
LinkParent assigns the provided region as the parent to all root regions of the capture.
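The sketch below models the documented Capture lifecycle with hypothetical capture and region types: regions are registered, End blocks further registration, and LinkParent re-parents only root regions. Unlike the real AddRegion, the model's addRegion returns a bool so that refusal after end is visible. Grafting a capture's tree under a region owned elsewhere (for example, one received from another task) is an assumed use, not something the documentation states.

```go
package main

import (
	"fmt"
	"sync"
)

// region and capture are hypothetical stand-ins that model only the
// documented behaviour of Capture.AddRegion, Capture.End and Capture.LinkParent.
type region struct {
	name   string
	parent *region
}

type capture struct {
	mu      sync.Mutex
	ended   bool
	regions []*region
}

// addRegion registers r and reports whether it was accepted.
func (c *capture) addRegion(r *region) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.ended {
		return false
	}
	c.regions = append(c.regions, r)
	return true
}

// end stops the capture from accepting new regions.
func (c *capture) end() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.ended = true
}

// linkParent makes parent the parent of every root (parentless) region,
// leaving regions that already have a parent untouched.
func (c *capture) linkParent(parent *region) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, r := range c.regions {
		if r.parent == nil {
			r.parent = parent
		}
	}
}

func main() {
	c := &capture{}
	root := &region{name: "DataObjScan"}
	child := &region{name: "ReadPages", parent: root}
	c.addRegion(root)
	c.addRegion(child)

	// Graft the capture's tree under a region owned by someone else.
	c.linkParent(&region{name: "Task"})
	fmt.Println(root.parent.name, child.parent.name) // Task DataObjScan

	c.end()
	fmt.Println(c.addRegion(&region{name: "late"})) // false
}
```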
func (*Capture) MarshalBinary ¶
func (c *Capture) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler for Capture. It serializes the Capture to its protobuf representation and returns the binary data.
func (*Capture) ToStatsSummary ¶
func (c *Capture) ToStatsSummary(execTime, queueTime time.Duration, totalEntriesReturned int) stats.Result
ToStatsSummary computes a stats.Result from observations in the capture.
func (*Capture) UnmarshalBinary ¶
func (c *Capture) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler for Capture. It deserializes binary data into a Capture from its protobuf representation.
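MarshalBinary and UnmarshalBinary make a Capture satisfy the standard encoding.BinaryMarshaler and encoding.BinaryUnmarshaler interfaces. The following sketch shows the round-trip contract with a hypothetical counterSet type. Capture uses protobuf, while this model uses a fixed-width little-endian layout to stay standard-library only.

```go
package main

import (
	"encoding"
	"encoding/binary"
	"errors"
	"fmt"
)

// counterSet is a hypothetical type used only to illustrate the interfaces.
type counterSet struct {
	pagesScanned int64
	cacheHits    int64
}

// Compile-time checks that both interfaces are satisfied.
var (
	_ encoding.BinaryMarshaler   = (*counterSet)(nil)
	_ encoding.BinaryUnmarshaler = (*counterSet)(nil)
)

// MarshalBinary encodes both counters as 8-byte little-endian integers.
func (c *counterSet) MarshalBinary() ([]byte, error) {
	buf := make([]byte, 16)
	binary.LittleEndian.PutUint64(buf[0:8], uint64(c.pagesScanned))
	binary.LittleEndian.PutUint64(buf[8:16], uint64(c.cacheHits))
	return buf, nil
}

// UnmarshalBinary decodes data immediately, so it never retains the slice.
func (c *counterSet) UnmarshalBinary(data []byte) error {
	if len(data) != 16 {
		return errors.New("counterSet: want exactly 16 bytes")
	}
	c.pagesScanned = int64(binary.LittleEndian.Uint64(data[0:8]))
	c.cacheHits = int64(binary.LittleEndian.Uint64(data[8:16]))
	return nil
}

func main() {
	in := &counterSet{pagesScanned: 2, cacheHits: 1}
	data, err := in.MarshalBinary()
	if err != nil {
		panic(err)
	}
	var out counterSet
	if err := out.UnmarshalBinary(data); err != nil {
		panic(err)
	}
	fmt.Println(out == *in) // true
}
```

The encoding package requires that UnmarshalBinary copy the input if it wants to keep it after returning; decoding eagerly, as here, sidesteps that.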
type DataType ¶
type DataType int
DataType specifies the data type of a statistic's values.
const (
	// DataTypeInvalid represents an invalid/unspecified data type.
	DataTypeInvalid DataType = iota
	// DataTypeInt64 represents a signed 64-bit integer statistic.
	DataTypeInt64
	// DataTypeFloat64 represents a double precision floating point statistic.
	DataTypeFloat64
	// DataTypeBool represents a boolean statistic (flag).
	DataTypeBool
)
type Observation ¶
type Observation interface {
// contains filtered or unexported methods
}
Observation holds a value for a particular statistic. Observations are created from statistics and then recorded into a Region using Region.Record.
type Region ¶
type Region struct {
// contains filtered or unexported fields
}
Region captures the lifetime of a specific operation within a capture.
A Region may be created standalone (via StartRegion) for observation collection only, or paired with an OTel span via [StartSpan].
func RegionFromContext ¶
RegionFromContext returns the current Region from the context, or nil if no Region is present.
func StartRegion ¶
StartRegion creates a new Region to record observations for a specific operation. It returns the new Region and a context containing the Region.
The Region is registered with the Capture found in the context. If no Capture is found, it returns the original context and a nil region.
StartRegion does not create an OTel span. Use [StartSpan] when both a span and observation aggregation are needed.
func (*Region) End ¶
func (r *Region) End()
End completes the Region. Updates to the Region are ignored after calling End.
func (*Region) Observations ¶
func (r *Region) Observations() []AggregatedObservation
Observations returns all aggregated observations recorded in the region.
func (*Region) Record ¶
func (r *Region) Record(o Observation)
Record records the statistic Observation o into the region. Calling Record multiple times for the same Statistic aggregates values based on the aggregation type of the Statistic.
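The interaction between Record and End can be modelled with a small hypothetical region type that sums int64 observations per statistic name and silently drops updates once ended. The names newRegion, record, end and value are illustrative only. The mutex reflects an assumption that a Region may be recorded from several goroutines; the documentation above does not say whether xcap.Region is safe for concurrent use.

```go
package main

import (
	"fmt"
	"sync"
)

// region is a hypothetical, simplified model of xcap.Region: it sums int64
// observations per statistic name and ignores updates after end.
type region struct {
	mu    sync.Mutex
	ended bool
	sums  map[string]int64
}

func newRegion() *region { return &region{sums: make(map[string]int64)} }

// record adds v to the running sum for stat unless the region has ended.
func (r *region) record(stat string, v int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.ended {
		return // updates after end are ignored
	}
	r.sums[stat] += v
}

// end freezes the region's observations.
func (r *region) end() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.ended = true
}

// value returns the current aggregate for stat (zero if never recorded).
func (r *region) value(stat string) int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.sums[stat]
}

func main() {
	r := newRegion()
	r.record("pages.scanned", 1)
	r.record("pages.scanned", 1)
	r.end()
	r.record("pages.scanned", 1) // ignored: region already ended
	fmt.Println(r.value("pages.scanned")) // 2
}
```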
type Span ¶
Span wraps a trace.Span and an optional Region.
All trace.Span methods are automatically delegated to the inner span via embedding. Only [Span.End] is overridden, to flush the Region's aggregated observations as span attributes before ending the inner span; [Span.Record] is an additional method.
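The embedding technique described above can be shown without the OpenTelemetry dependency. In this sketch, spanLike is a hypothetical, trimmed-down stand-in for trace.Span, recordingSpan stands in for a real OTel span, and wrappedSpan plays the role of xcap.Span: SetAttribute is promoted untouched from the embedded interface, while End is overridden to flush observations first. A nil observations map models the "no linked Region" case.

```go
package main

import "fmt"

// spanLike is a hypothetical, trimmed-down stand-in for trace.Span.
type spanLike interface {
	SetAttribute(key string, value int64)
	End()
}

// recordingSpan logs what happens to it, standing in for an OTel span.
type recordingSpan struct {
	attrs  map[string]int64
	events []string
}

func (s *recordingSpan) SetAttribute(k string, v int64) {
	s.attrs[k] = v
	s.events = append(s.events, "set "+k)
}

func (s *recordingSpan) End() { s.events = append(s.events, "end") }

// wrappedSpan embeds spanLike, so SetAttribute is delegated unchanged;
// only End is overridden.
type wrappedSpan struct {
	spanLike
	observations map[string]int64 // nil models "no linked region"
}

// End flushes observations as attributes, then ends the inner span.
func (w *wrappedSpan) End() {
	for k, v := range w.observations {
		w.spanLike.SetAttribute(k, v)
	}
	w.spanLike.End()
}

func main() {
	inner := &recordingSpan{attrs: map[string]int64{}}
	var s spanLike = &wrappedSpan{
		spanLike:     inner,
		observations: map[string]int64{"pages.scanned": 2},
	}
	s.SetAttribute("num_targets", 5) // promoted: goes straight to inner
	s.End()                          // overridden: flushes, then ends inner
	fmt.Println(inner.events)        // [set num_targets set pages.scanned end]
}
```

The override only takes effect when callers hold the wrapper; code that obtains the inner span directly and ends it would skip the flush.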
func StartSpan ¶
func StartSpan(ctx context.Context, t trace.Tracer, name string, opts ...trace.SpanStartOption) (context.Context, *Span)
StartSpan creates a new OTel span using the given tracer and pairs it with a Region for aggregated observation recording.
The End method of the returned *Span flushes Region observations as span attributes before ending the underlying OTel span.
If a Capture is present in ctx, the Region is registered with it for summary aggregation via SummaryLogValues. If no Capture is found, a span is still created but no Region is attached (observation recording is a no-op).
The Region is stored in the returned context and can be retrieved with RegionFromContext for recording observations.
func (*Span) End ¶
func (s *Span) End(options ...trace.SpanEndOption)
End flushes aggregated observations from the linked Region as span attributes, then ends the underlying OTel span.
If no Region is linked (nil), or the Region has already been ended, End simply ends the inner span.
func (*Span) Record ¶
func (s *Span) Record(observation Observation)
Record records the given observation into the linked Region.
type Statistic ¶
type Statistic interface {
Name() string
DataType() DataType
Aggregation() AggregationType
Key() StatisticKey
}
Statistic is the interface that all statistic types implement.
type StatisticFlag ¶
type StatisticFlag struct {
// contains filtered or unexported fields
}
StatisticFlag is a statistic for bool values.
func NewStatisticFlag ¶
func NewStatisticFlag(name string) *StatisticFlag
NewStatisticFlag creates a new bool statistic (flag) with the given name. Flags always use AggregationTypeMax (true > false).
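Max aggregation over booleans, with false < true, is equivalent to a logical OR: once any observation reports true, later false observations cannot reset the flag. For a flag such as collision.found, a single collision anywhere in a region is enough to set it. The helper maxBool below is a hypothetical illustration; treating "no observations" as false is an assumption.

```go
package main

import "fmt"

// maxBool combines flag observations with max semantics under the
// ordering false < true, which is equivalent to a logical OR.
func maxBool(observations ...bool) bool {
	result := false // assumption: no observations reads as false
	for _, v := range observations {
		if v {
			result = true
		}
	}
	return result
}

func main() {
	fmt.Println(maxBool(false, false))       // false
	fmt.Println(maxBool(false, true, false)) // true
}
```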
func (*StatisticFlag) Aggregation ¶
func (s *StatisticFlag) Aggregation() AggregationType
Aggregation returns the aggregation type for this statistic.
func (*StatisticFlag) DataType ¶
func (s *StatisticFlag) DataType() DataType
DataType returns the data type of the statistic.
func (*StatisticFlag) Key ¶
func (s *StatisticFlag) Key() StatisticKey
Key returns a StatisticKey that uniquely identifies this statistic. Statistics with the same definition will have the same key.
func (*StatisticFlag) Name ¶
func (s *StatisticFlag) Name() string
Name returns the name of the statistic.
func (*StatisticFlag) Observe ¶
func (s *StatisticFlag) Observe(value bool) Observation
Observe creates an observation with a bool value.
type StatisticFloat64 ¶
type StatisticFloat64 struct {
// contains filtered or unexported fields
}
StatisticFloat64 is a statistic for float64 values.
func NewStatisticFloat64 ¶
func NewStatisticFloat64(name string, aggregation AggregationType) *StatisticFloat64
NewStatisticFloat64 creates a new float64 statistic with the given name and aggregation.
func (*StatisticFloat64) Aggregation ¶
func (s *StatisticFloat64) Aggregation() AggregationType
Aggregation returns the aggregation type for this statistic.
func (*StatisticFloat64) DataType ¶
func (s *StatisticFloat64) DataType() DataType
DataType returns the data type of the statistic.
func (*StatisticFloat64) Key ¶
func (s *StatisticFloat64) Key() StatisticKey
Key returns a StatisticKey that uniquely identifies this statistic. Statistics with the same definition will have the same key.
func (*StatisticFloat64) Name ¶
func (s *StatisticFloat64) Name() string
Name returns the name of the statistic.
func (*StatisticFloat64) Observe ¶
func (s *StatisticFloat64) Observe(value float64) Observation
Observe creates an observation with a float64 value.
type StatisticInt64 ¶
type StatisticInt64 struct {
// contains filtered or unexported fields
}
StatisticInt64 is a statistic for int64 values.
func NewStatisticInt64 ¶
func NewStatisticInt64(name string, aggregation AggregationType) *StatisticInt64
NewStatisticInt64 creates a new int64 statistic with the given name and aggregation.
func (*StatisticInt64) Aggregation ¶
func (s *StatisticInt64) Aggregation() AggregationType
Aggregation returns the aggregation type for this statistic.
func (*StatisticInt64) DataType ¶
func (s *StatisticInt64) DataType() DataType
DataType returns the data type of the statistic.
func (*StatisticInt64) Key ¶
func (s *StatisticInt64) Key() StatisticKey
Key returns a StatisticKey that uniquely identifies this statistic. Statistics with the same definition will have the same key.
func (*StatisticInt64) Name ¶
func (s *StatisticInt64) Name() string
Name returns the name of the statistic.
func (*StatisticInt64) Observe ¶
func (s *StatisticInt64) Observe(value int64) Observation
Observe creates an observation with an int64 value.
type StatisticKey ¶
type StatisticKey struct {
Name string
DataType DataType
Aggregation AggregationType
}
StatisticKey is a comparable struct that uniquely identifies a statistic by its definition (name, data type, and aggregation type).
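Because every field of StatisticKey is comparable, two statistics built independently from the same definition produce equal keys and can share a map entry, while changing any field (for example the aggregation type) yields a distinct key. The sketch below uses hypothetical local copies of the types; the numeric constant values follow the documented iota order but are otherwise illustrative.

```go
package main

import "fmt"

// Hypothetical local copies of the xcap enum types.
type dataType int
type aggregationType int

const (
	dataTypeInt64 dataType        = 1
	aggSum        aggregationType = 1
	aggMax        aggregationType = 3
)

// statisticKey has the same fields as xcap.StatisticKey. Every field is
// comparable, so values can be compared with == and used as map keys.
type statisticKey struct {
	Name        string
	DataType    dataType
	Aggregation aggregationType
}

func main() {
	totals := map[statisticKey]int64{}

	// Two keys built independently from the same definition are equal...
	a := statisticKey{Name: "pages.scanned", DataType: dataTypeInt64, Aggregation: aggSum}
	b := statisticKey{Name: "pages.scanned", DataType: dataTypeInt64, Aggregation: aggSum}
	totals[a]++
	totals[b]++

	// ...while the same name with a different aggregation is a distinct key.
	c := statisticKey{Name: "pages.scanned", DataType: dataTypeInt64, Aggregation: aggMax}
	totals[c] += 5

	fmt.Println(a == b, len(totals), totals[a]) // true 2 2
}
```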