Documentation
¶
Overview ¶
Package aggregate provides point aggregation and tail-sampling helpers.
Index ¶
- Constants
- Variables
- func AlignNextWallTime(t time.Time, align time.Duration) int64
- func HashCombine(seed, hash uint64) uint64
- func HashToken(token string, hash64 uint64) uint64
- func PickTrace(source string, pts []*point.Point, version int64) map[uint64]*DataPacket
- func SampleStdDev(data []float64) (float64, error)
- func SetLogging(log *logger.Logger)
- type Action
- type AggregateRule
- type AggregationAlgo
- func (*AggregationAlgo) Descriptor() ([]byte, []int)
- func (this *AggregationAlgo) Equal(that interface{}) bool
- func (m *AggregationAlgo) GetAddTags() map[string]string
- func (m *AggregationAlgo) GetExpoOpts() *ExpoHistogramOptions
- func (m *AggregationAlgo) GetHistogramOpts() *HistogramOptions
- func (m *AggregationAlgo) GetMethod() string
- func (m *AggregationAlgo) GetOptions() isAggregationAlgo_Options
- func (m *AggregationAlgo) GetQuantileOpts() *QuantileOptions
- func (m *AggregationAlgo) GetSourceField() string
- func (m *AggregationAlgo) GetWindow() int64
- func (this *AggregationAlgo) GoString() string
- func (m *AggregationAlgo) Marshal() (dAtA []byte, err error)
- func (m *AggregationAlgo) MarshalTo(dAtA []byte) (int, error)
- func (m *AggregationAlgo) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*AggregationAlgo) ProtoMessage()
- func (m *AggregationAlgo) Reset()
- func (m *AggregationAlgo) Size() (n int)
- func (this *AggregationAlgo) String() string
- func (m *AggregationAlgo) Unmarshal(dAtA []byte) error
- func (m *AggregationAlgo) XXX_DiscardUnknown()
- func (m *AggregationAlgo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *AggregationAlgo) XXX_Merge(src proto.Message)
- func (*AggregationAlgo) XXX_OneofWrappers() []interface{}
- func (m *AggregationAlgo) XXX_Size() int
- func (m *AggregationAlgo) XXX_Unmarshal(b []byte) error
- type AggregationAlgoConfig
- type AggregationAlgo_ExpoOpts
- func (this *AggregationAlgo_ExpoOpts) Equal(that interface{}) bool
- func (this *AggregationAlgo_ExpoOpts) GoString() string
- func (m *AggregationAlgo_ExpoOpts) MarshalTo(dAtA []byte) (int, error)
- func (m *AggregationAlgo_ExpoOpts) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (m *AggregationAlgo_ExpoOpts) Size() (n int)
- func (this *AggregationAlgo_ExpoOpts) String() string
- type AggregationAlgo_HistogramOpts
- func (this *AggregationAlgo_HistogramOpts) Equal(that interface{}) bool
- func (this *AggregationAlgo_HistogramOpts) GoString() string
- func (m *AggregationAlgo_HistogramOpts) MarshalTo(dAtA []byte) (int, error)
- func (m *AggregationAlgo_HistogramOpts) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (m *AggregationAlgo_HistogramOpts) Size() (n int)
- func (this *AggregationAlgo_HistogramOpts) String() string
- type AggregationAlgo_QuantileOpts
- func (this *AggregationAlgo_QuantileOpts) Equal(that interface{}) bool
- func (this *AggregationAlgo_QuantileOpts) GoString() string
- func (m *AggregationAlgo_QuantileOpts) MarshalTo(dAtA []byte) (int, error)
- func (m *AggregationAlgo_QuantileOpts) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (m *AggregationAlgo_QuantileOpts) Size() (n int)
- func (this *AggregationAlgo_QuantileOpts) String() string
- type AggregationBatch
- func (*AggregationBatch) Descriptor() ([]byte, []int)
- func (this *AggregationBatch) Equal(that interface{}) bool
- func (m *AggregationBatch) GetAggregationOpts() map[string]*AggregationAlgo
- func (m *AggregationBatch) GetConfigHash() uint64
- func (m *AggregationBatch) GetPickKey() uint64
- func (m *AggregationBatch) GetPoints() *point.PBPoints
- func (m *AggregationBatch) GetRawConfig() []byte
- func (m *AggregationBatch) GetRoutingKey() uint64
- func (this *AggregationBatch) GoString() string
- func (m *AggregationBatch) Marshal() (dAtA []byte, err error)
- func (m *AggregationBatch) MarshalTo(dAtA []byte) (int, error)
- func (m *AggregationBatch) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*AggregationBatch) ProtoMessage()
- func (m *AggregationBatch) Reset()
- func (m *AggregationBatch) Size() (n int)
- func (this *AggregationBatch) String() string
- func (m *AggregationBatch) Unmarshal(dAtA []byte) error
- func (m *AggregationBatch) XXX_DiscardUnknown()
- func (m *AggregationBatch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *AggregationBatch) XXX_Merge(src proto.Message)
- func (m *AggregationBatch) XXX_Size() int
- func (m *AggregationBatch) XXX_Unmarshal(b []byte) error
- type AggregatorConfigure
- func (ac *AggregatorConfigure) PickPoints(category string, pts []*point.Point) map[uint64]*Batchs
- func (ac *AggregatorConfigure) SelectPoints(pts []*point.Point) (groups [][]*point.Point)
- func (ac *AggregatorConfigure) Setup() error
- func (ac *AggregatorConfigure) UnmarshalTOML(data interface{}) error
- type AlgoMethod
- type Batchs
- func (*Batchs) Descriptor() ([]byte, []int)
- func (this *Batchs) Equal(that interface{}) bool
- func (m *Batchs) GetBatchs() []*AggregationBatch
- func (m *Batchs) GetPickKey() uint64
- func (this *Batchs) GoString() string
- func (m *Batchs) Marshal() (dAtA []byte, err error)
- func (m *Batchs) MarshalTo(dAtA []byte) (int, error)
- func (m *Batchs) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Batchs) ProtoMessage()
- func (m *Batchs) Reset()
- func (m *Batchs) Size() (n int)
- func (this *Batchs) String() string
- func (m *Batchs) Unmarshal(dAtA []byte) error
- func (m *Batchs) XXX_DiscardUnknown()
- func (m *Batchs) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Batchs) XXX_Merge(src proto.Message)
- func (m *Batchs) XXX_Size() int
- func (m *Batchs) XXX_Unmarshal(b []byte) error
- type BuiltinMetricCfg
- type Cache
- type Calculator
- type DataBatch
- func (*DataBatch) Descriptor() ([]byte, []int)
- func (this *DataBatch) Equal(that interface{}) bool
- func (m *DataBatch) GetCollectorId() string
- func (m *DataBatch) GetPackets() []*DataPacket
- func (this *DataBatch) GoString() string
- func (m *DataBatch) Marshal() (dAtA []byte, err error)
- func (m *DataBatch) MarshalTo(dAtA []byte) (int, error)
- func (m *DataBatch) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*DataBatch) ProtoMessage()
- func (m *DataBatch) Reset()
- func (m *DataBatch) Size() (n int)
- func (this *DataBatch) String() string
- func (m *DataBatch) Unmarshal(dAtA []byte) error
- func (m *DataBatch) XXX_DiscardUnknown()
- func (m *DataBatch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *DataBatch) XXX_Merge(src proto.Message)
- func (m *DataBatch) XXX_Size() int
- func (m *DataBatch) XXX_Unmarshal(b []byte) error
- type DataGroup
- type DataPacket
- func (*DataPacket) Descriptor() ([]byte, []int)
- func (this *DataPacket) Equal(that interface{}) bool
- func (m *DataPacket) GetConfigVersion() int64
- func (m *DataPacket) GetDataType() string
- func (m *DataPacket) GetGroupIdHash() uint64
- func (m *DataPacket) GetGroupKey() string
- func (m *DataPacket) GetHasError() bool
- func (m *DataPacket) GetMaxPointTimeUnixNano() int64
- func (m *DataPacket) GetPointCount() int32
- func (m *DataPacket) GetPointsPayload() []byte
- func (m *DataPacket) GetRawGroupId() string
- func (m *DataPacket) GetSource() string
- func (m *DataPacket) GetToken() string
- func (m *DataPacket) GetTraceEndTimeUnixNano() int64
- func (m *DataPacket) GetTraceStartTimeUnixNano() int64
- func (this *DataPacket) GoString() string
- func (m *DataPacket) Marshal() (dAtA []byte, err error)
- func (m *DataPacket) MarshalTo(dAtA []byte) (int, error)
- func (m *DataPacket) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*DataPacket) ProtoMessage()
- func (m *DataPacket) Reset()
- func (m *DataPacket) Size() (n int)
- func (this *DataPacket) String() string
- func (m *DataPacket) Unmarshal(dAtA []byte) error
- func (packet *DataPacket) WalkRawPBPoints(fn func([]byte) bool) error
- func (m *DataPacket) XXX_DiscardUnknown()
- func (m *DataPacket) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *DataPacket) XXX_Merge(src proto.Message)
- func (m *DataPacket) XXX_Size() int
- func (m *DataPacket) XXX_Unmarshal(b []byte) error
- type DerivedMetric
- type DerivedMetricCollector
- type DerivedMetricDecision
- type DerivedMetricKind
- type DerivedMetricPoints
- type DerivedMetricRecord
- type DerivedMetricStage
- type ExpoHistogramOptions
- func (*ExpoHistogramOptions) Descriptor() ([]byte, []int)
- func (this *ExpoHistogramOptions) Equal(that interface{}) bool
- func (m *ExpoHistogramOptions) GetMaxBuckets() int32
- func (m *ExpoHistogramOptions) GetMaxScale() int32
- func (m *ExpoHistogramOptions) GetRecordMinMax() bool
- func (this *ExpoHistogramOptions) GoString() string
- func (m *ExpoHistogramOptions) Marshal() (dAtA []byte, err error)
- func (m *ExpoHistogramOptions) MarshalTo(dAtA []byte) (int, error)
- func (m *ExpoHistogramOptions) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*ExpoHistogramOptions) ProtoMessage()
- func (m *ExpoHistogramOptions) Reset()
- func (m *ExpoHistogramOptions) Size() (n int)
- func (this *ExpoHistogramOptions) String() string
- func (m *ExpoHistogramOptions) Unmarshal(dAtA []byte) error
- func (m *ExpoHistogramOptions) XXX_DiscardUnknown()
- func (m *ExpoHistogramOptions) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *ExpoHistogramOptions) XXX_Merge(src proto.Message)
- func (m *ExpoHistogramOptions) XXX_Size() int
- func (m *ExpoHistogramOptions) XXX_Unmarshal(b []byte) error
- type GlobalSampler
- func (s *GlobalSampler) AdvanceTime() map[uint64]*DataGroup
- func (s *GlobalSampler) GetLoggingConfig(token string) *LoggingTailSampling
- func (s *GlobalSampler) GetRUMConfig(token string) *RUMTailSampling
- func (s *GlobalSampler) GetTraceConfig(token string) *TraceTailSampling
- func (s *GlobalSampler) Ingest(packet *DataPacket)
- func (s *GlobalSampler) TailSamplingData(dataGroups map[uint64]*DataGroup) map[uint64]*DataPacket
- func (s *GlobalSampler) TailSamplingOutcomes(dataGroups map[uint64]*DataGroup) map[uint64]*TailSamplingOutcome
- func (s *GlobalSampler) UpdateConfig(token string, ts *TailSamplingConfigs) error
- type HistogramOptions
- func (*HistogramOptions) Descriptor() ([]byte, []int)
- func (this *HistogramOptions) Equal(that interface{}) bool
- func (m *HistogramOptions) GetBuckets() []float64
- func (this *HistogramOptions) GoString() string
- func (m *HistogramOptions) Marshal() (dAtA []byte, err error)
- func (m *HistogramOptions) MarshalTo(dAtA []byte) (int, error)
- func (m *HistogramOptions) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*HistogramOptions) ProtoMessage()
- func (m *HistogramOptions) Reset()
- func (m *HistogramOptions) Size() (n int)
- func (this *HistogramOptions) String() string
- func (m *HistogramOptions) Unmarshal(dAtA []byte) error
- func (m *HistogramOptions) XXX_DiscardUnknown()
- func (m *HistogramOptions) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *HistogramOptions) XXX_Merge(src proto.Message)
- func (m *HistogramOptions) XXX_Size() int
- func (m *HistogramOptions) XXX_Unmarshal(b []byte) error
- type LoggingGroupDimension
- type LoggingTailSampling
- type MetricBase
- type PipelineAction
- type PipelineType
- type PointsData
- type QuantileOptions
- func (*QuantileOptions) Descriptor() ([]byte, []int)
- func (this *QuantileOptions) Equal(that interface{}) bool
- func (m *QuantileOptions) GetPercentiles() []float64
- func (this *QuantileOptions) GoString() string
- func (m *QuantileOptions) Marshal() (dAtA []byte, err error)
- func (m *QuantileOptions) MarshalTo(dAtA []byte) (int, error)
- func (m *QuantileOptions) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*QuantileOptions) ProtoMessage()
- func (m *QuantileOptions) Reset()
- func (m *QuantileOptions) Size() (n int)
- func (this *QuantileOptions) String() string
- func (m *QuantileOptions) Unmarshal(dAtA []byte) error
- func (m *QuantileOptions) XXX_DiscardUnknown()
- func (m *QuantileOptions) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *QuantileOptions) XXX_Merge(src proto.Message)
- func (m *QuantileOptions) XXX_Size() int
- func (m *QuantileOptions) XXX_Unmarshal(b []byte) error
- type RUMGroupDimension
- type RUMTailSampling
- type RuleSelector
- type SamplingPipeline
- type Shard
- type TailSamplingBuiltinMetric
- type TailSamplingBuiltinMetrics
- func (ms TailSamplingBuiltinMetrics) OnDecision(packet *DataPacket, decision DerivedMetricDecision) []DerivedMetricRecord
- func (ms TailSamplingBuiltinMetrics) OnIngest(packet *DataPacket) []DerivedMetricRecord
- func (ms TailSamplingBuiltinMetrics) OnPreDecision(packet *DataPacket) []DerivedMetricRecord
- type TailSamplingConfigs
- type TailSamplingOutcome
- type TailSamplingProcessor
- func (r *TailSamplingProcessor) AdvanceTime() map[uint64]*DataGroup
- func (r *TailSamplingProcessor) BuiltinMetrics() TailSamplingBuiltinMetrics
- func (r *TailSamplingProcessor) Collector() *DerivedMetricCollector
- func (r *TailSamplingProcessor) FlushDerivedMetrics(now time.Time) []*DerivedMetricPoints
- func (r *TailSamplingProcessor) IngestPacket(packet *DataPacket)
- func (r *TailSamplingProcessor) RecordDecision(packet *DataPacket, decision DerivedMetricDecision)
- func (r *TailSamplingProcessor) Sampler() *GlobalSampler
- func (r *TailSamplingProcessor) TailSamplingData(dataGroups map[uint64]*DataGroup) map[uint64]*DataPacket
- func (r *TailSamplingProcessor) UpdateConfig(token string, cfg *TailSamplingConfigs) error
- type TraceTailSampling
- type Window
- type Windows
Constants ¶
const (
// actions.
ActionPassThrough = "passthrough"
ActionDrop = "drop"
)
const ( GuanceRoutingKey = "Guance-Routing-Key" GuancePickKey = "Guance-Pick-Key" )
const ( Seed1 = uint64(0x9E3779B97F4A7C15) Seed2 = uint64(0x6A09E667F3BCC908) SeedU32 = uint32(0x7F4A7C15) )
const ( PipelineTypeCondition = "condition" PipelineTypeSampling = "probabilistic" PipelineActionKeep = "keep" PipelineActionDrop = "drop" )
const DefaultDerivedMetricFlushWindow = 30 * time.Second
const TailSamplingDerivedMeasurement = "tail_sampling"
Variables ¶
Functions ¶
func HashCombine ¶
HashCombine combines two uint64 hash values into one; see https://zhuanlan.zhihu.com/p/574573421.
func SampleStdDev ¶
SampleStdDev computes the sample standard deviation (dividing by N-1).
func SetLogging ¶
Types ¶
type AggregateRule ¶
type AggregateRule struct {
Name string `toml:"name" json:"name"`
Selector *RuleSelector `toml:"select" json:"select"`
Groupby []string `toml:"group_by" json:"group_by"`
Algorithms map[string]*AggregationAlgoConfig `toml:"algorithms" json:"algorithms"`
// contains filtered or unexported fields
}
AggregateRule configures a single aggregate rule.
func (*AggregateRule) GroupbyBatch ¶
func (ar *AggregateRule) GroupbyBatch(ac *AggregatorConfigure, pts []*point.Point) (batches []*AggregationBatch)
GroupbyBatch creates aggregation batches from points based on grouping keys.
func (*AggregateRule) GroupbyPoints ¶
GroupbyPoints groups points by their hash value calculated from grouping keys.
func (*AggregateRule) SelectPoints ¶
func (ar *AggregateRule) SelectPoints(pts []*point.Point) []*point.Point
SelectPoints filters points based on the rule's selector criteria.
type AggregationAlgo ¶
type AggregationAlgo struct {
// 1. which algorithm to apply for specific field?
// Use a readable string such as:
// method = "sum"
// method = "count"
// method = "quantiles"
// Supported values are defined by the package-level AlgoMethod constants.
Method string `protobuf:"bytes,1,opt,name=method,proto3" json:"method,omitempty"`
// 2. what is the original field name of the data?
SourceField string `protobuf:"bytes,2,opt,name=source_field,json=sourceField,proto3" json:"source_field,omitempty"`
Window int64 `protobuf:"varint,3,opt,name=window,proto3" json:"window,omitempty"`
// Types that are valid to be assigned to Options:
// *AggregationAlgo_HistogramOpts
// *AggregationAlgo_ExpoOpts
// *AggregationAlgo_QuantileOpts
Options isAggregationAlgo_Options `protobuf_oneof:"options"`
AddTags map[string]string `` /* 170-byte string literal not displayed */
}
func (*AggregationAlgo) Descriptor ¶
func (*AggregationAlgo) Descriptor() ([]byte, []int)
func (*AggregationAlgo) Equal ¶
func (this *AggregationAlgo) Equal(that interface{}) bool
func (*AggregationAlgo) GetAddTags ¶
func (m *AggregationAlgo) GetAddTags() map[string]string
func (*AggregationAlgo) GetExpoOpts ¶
func (m *AggregationAlgo) GetExpoOpts() *ExpoHistogramOptions
func (*AggregationAlgo) GetHistogramOpts ¶
func (m *AggregationAlgo) GetHistogramOpts() *HistogramOptions
func (*AggregationAlgo) GetMethod ¶
func (m *AggregationAlgo) GetMethod() string
func (*AggregationAlgo) GetOptions ¶
func (m *AggregationAlgo) GetOptions() isAggregationAlgo_Options
func (*AggregationAlgo) GetQuantileOpts ¶
func (m *AggregationAlgo) GetQuantileOpts() *QuantileOptions
func (*AggregationAlgo) GetSourceField ¶
func (m *AggregationAlgo) GetSourceField() string
func (*AggregationAlgo) GetWindow ¶
func (m *AggregationAlgo) GetWindow() int64
func (*AggregationAlgo) GoString ¶
func (this *AggregationAlgo) GoString() string
func (*AggregationAlgo) Marshal ¶
func (m *AggregationAlgo) Marshal() (dAtA []byte, err error)
func (*AggregationAlgo) MarshalToSizedBuffer ¶
func (m *AggregationAlgo) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*AggregationAlgo) ProtoMessage ¶
func (*AggregationAlgo) ProtoMessage()
func (*AggregationAlgo) Reset ¶
func (m *AggregationAlgo) Reset()
func (*AggregationAlgo) Size ¶
func (m *AggregationAlgo) Size() (n int)
func (*AggregationAlgo) String ¶
func (this *AggregationAlgo) String() string
func (*AggregationAlgo) Unmarshal ¶
func (m *AggregationAlgo) Unmarshal(dAtA []byte) error
func (*AggregationAlgo) XXX_DiscardUnknown ¶
func (m *AggregationAlgo) XXX_DiscardUnknown()
func (*AggregationAlgo) XXX_Marshal ¶
func (m *AggregationAlgo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*AggregationAlgo) XXX_Merge ¶
func (m *AggregationAlgo) XXX_Merge(src proto.Message)
func (*AggregationAlgo) XXX_OneofWrappers ¶
func (*AggregationAlgo) XXX_OneofWrappers() []interface{}
XXX_OneofWrappers is for the internal use of the proto package.
func (*AggregationAlgo) XXX_Size ¶
func (m *AggregationAlgo) XXX_Size() int
func (*AggregationAlgo) XXX_Unmarshal ¶
func (m *AggregationAlgo) XXX_Unmarshal(b []byte) error
type AggregationAlgoConfig ¶
type AggregationAlgoConfig struct {
Method string `toml:"method" json:"method"`
SourceField string `toml:"source_field" json:"source_field"`
Window int64 `toml:"window" json:"window"`
AddTags map[string]string `toml:"add_tags" json:"add_tags"`
HistogramOpts *HistogramOptions `toml:"histogram_opts" json:"histogram_opts"`
ExpoOpts *ExpoHistogramOptions `toml:"expo_opts" json:"expo_opts"`
QuantileOpts *QuantileOptions `toml:"quantile_opts" json:"quantile_opts"`
}
func (*AggregationAlgoConfig) ToAggregationAlgo ¶
func (cfg *AggregationAlgoConfig) ToAggregationAlgo() *AggregationAlgo
type AggregationAlgo_ExpoOpts ¶
type AggregationAlgo_ExpoOpts struct {
ExpoOpts *ExpoHistogramOptions `protobuf:"bytes,11,opt,name=expo_opts,json=expoOpts,proto3,oneof" json:"expo_opts,omitempty"`
}
func (*AggregationAlgo_ExpoOpts) Equal ¶
func (this *AggregationAlgo_ExpoOpts) Equal(that interface{}) bool
func (*AggregationAlgo_ExpoOpts) GoString ¶
func (this *AggregationAlgo_ExpoOpts) GoString() string
func (*AggregationAlgo_ExpoOpts) MarshalTo ¶
func (m *AggregationAlgo_ExpoOpts) MarshalTo(dAtA []byte) (int, error)
func (*AggregationAlgo_ExpoOpts) MarshalToSizedBuffer ¶
func (m *AggregationAlgo_ExpoOpts) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*AggregationAlgo_ExpoOpts) Size ¶
func (m *AggregationAlgo_ExpoOpts) Size() (n int)
func (*AggregationAlgo_ExpoOpts) String ¶
func (this *AggregationAlgo_ExpoOpts) String() string
type AggregationAlgo_HistogramOpts ¶
type AggregationAlgo_HistogramOpts struct {
HistogramOpts *HistogramOptions `protobuf:"bytes,10,opt,name=histogram_opts,json=histogramOpts,proto3,oneof" json:"histogram_opts,omitempty"`
}
func (*AggregationAlgo_HistogramOpts) Equal ¶
func (this *AggregationAlgo_HistogramOpts) Equal(that interface{}) bool
func (*AggregationAlgo_HistogramOpts) GoString ¶
func (this *AggregationAlgo_HistogramOpts) GoString() string
func (*AggregationAlgo_HistogramOpts) MarshalTo ¶
func (m *AggregationAlgo_HistogramOpts) MarshalTo(dAtA []byte) (int, error)
func (*AggregationAlgo_HistogramOpts) MarshalToSizedBuffer ¶
func (m *AggregationAlgo_HistogramOpts) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*AggregationAlgo_HistogramOpts) Size ¶
func (m *AggregationAlgo_HistogramOpts) Size() (n int)
func (*AggregationAlgo_HistogramOpts) String ¶
func (this *AggregationAlgo_HistogramOpts) String() string
type AggregationAlgo_QuantileOpts ¶
type AggregationAlgo_QuantileOpts struct {
QuantileOpts *QuantileOptions `protobuf:"bytes,12,opt,name=quantile_opts,json=quantileOpts,proto3,oneof" json:"quantile_opts,omitempty"`
}
func (*AggregationAlgo_QuantileOpts) Equal ¶
func (this *AggregationAlgo_QuantileOpts) Equal(that interface{}) bool
func (*AggregationAlgo_QuantileOpts) GoString ¶
func (this *AggregationAlgo_QuantileOpts) GoString() string
func (*AggregationAlgo_QuantileOpts) MarshalTo ¶
func (m *AggregationAlgo_QuantileOpts) MarshalTo(dAtA []byte) (int, error)
func (*AggregationAlgo_QuantileOpts) MarshalToSizedBuffer ¶
func (m *AggregationAlgo_QuantileOpts) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*AggregationAlgo_QuantileOpts) Size ¶
func (m *AggregationAlgo_QuantileOpts) Size() (n int)
func (*AggregationAlgo_QuantileOpts) String ¶
func (this *AggregationAlgo_QuantileOpts) String() string
type AggregationBatch ¶
type AggregationBatch struct {
RoutingKey uint64 `protobuf:"varint,1,opt,name=routing_key,json=routingKey,proto3" json:"routing_key,omitempty"`
ConfigHash uint64 `protobuf:"varint,2,opt,name=config_hash,json=configHash,proto3" json:"config_hash,omitempty"`
// raw TOML configuration
RawConfig []byte `protobuf:"bytes,3,opt,name=raw_config,json=rawConfig,proto3" json:"raw_config,omitempty"`
AggregationOpts map[string]*AggregationAlgo `` /* 194-byte string literal not displayed */
Points *point.PBPoints `protobuf:"bytes,5,opt,name=points,proto3" json:"points,omitempty"`
PickKey uint64 `protobuf:"varint,6,opt,name=pick_key,json=pickKey,proto3" json:"pick_key,omitempty"`
}
func (*AggregationBatch) Descriptor ¶
func (*AggregationBatch) Descriptor() ([]byte, []int)
func (*AggregationBatch) Equal ¶
func (this *AggregationBatch) Equal(that interface{}) bool
func (*AggregationBatch) GetAggregationOpts ¶
func (m *AggregationBatch) GetAggregationOpts() map[string]*AggregationAlgo
func (*AggregationBatch) GetConfigHash ¶
func (m *AggregationBatch) GetConfigHash() uint64
func (*AggregationBatch) GetPickKey ¶
func (m *AggregationBatch) GetPickKey() uint64
func (*AggregationBatch) GetPoints ¶
func (m *AggregationBatch) GetPoints() *point.PBPoints
func (*AggregationBatch) GetRawConfig ¶
func (m *AggregationBatch) GetRawConfig() []byte
func (*AggregationBatch) GetRoutingKey ¶
func (m *AggregationBatch) GetRoutingKey() uint64
func (*AggregationBatch) GoString ¶
func (this *AggregationBatch) GoString() string
func (*AggregationBatch) Marshal ¶
func (m *AggregationBatch) Marshal() (dAtA []byte, err error)
func (*AggregationBatch) MarshalToSizedBuffer ¶
func (m *AggregationBatch) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*AggregationBatch) ProtoMessage ¶
func (*AggregationBatch) ProtoMessage()
func (*AggregationBatch) Reset ¶
func (m *AggregationBatch) Reset()
func (*AggregationBatch) Size ¶
func (m *AggregationBatch) Size() (n int)
func (*AggregationBatch) String ¶
func (this *AggregationBatch) String() string
func (*AggregationBatch) Unmarshal ¶
func (m *AggregationBatch) Unmarshal(dAtA []byte) error
func (*AggregationBatch) XXX_DiscardUnknown ¶
func (m *AggregationBatch) XXX_DiscardUnknown()
func (*AggregationBatch) XXX_Marshal ¶
func (m *AggregationBatch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*AggregationBatch) XXX_Merge ¶
func (m *AggregationBatch) XXX_Merge(src proto.Message)
func (*AggregationBatch) XXX_Size ¶
func (m *AggregationBatch) XXX_Size() int
func (*AggregationBatch) XXX_Unmarshal ¶
func (m *AggregationBatch) XXX_Unmarshal(b []byte) error
type AggregatorConfigure ¶
type AggregatorConfigure struct {
DefaultWindow time.Duration `toml:"default_window" json:"default_window"`
AggregateRules []*AggregateRule `toml:"aggregate_rules" json:"aggregate_rules"`
DefaultAction Action `toml:"action" json:"action"`
DeleteRulesPoint bool `toml:"delete_rules_point" json:"delete_rules_point"`
// contains filtered or unexported fields
}
AggregatorConfigure is the top-level aggregator configuration for a single workspace.
func (*AggregatorConfigure) PickPoints ¶
PickPoints organizes points into batches based on aggregation rules and grouping keys. Points of multiple categories (M, L, RUM, T) can all be selected; the returned data is always of the metric type.
func (*AggregatorConfigure) SelectPoints ¶
func (ac *AggregatorConfigure) SelectPoints(pts []*point.Point) (groups [][]*point.Point)
SelectPoints selects points from the input slice based on aggregate rules.
func (*AggregatorConfigure) Setup ¶
func (ac *AggregatorConfigure) Setup() error
Setup initializes the aggregator configuration, validates rules, and prepares calculators.
func (*AggregatorConfigure) UnmarshalTOML ¶
func (ac *AggregatorConfigure) UnmarshalTOML(data interface{}) error
type AlgoMethod ¶
type AlgoMethod string
const ( METHOD_UNSPECIFIED AlgoMethod = "" SUM AlgoMethod = "sum" AVG AlgoMethod = "avg" COUNT AlgoMethod = "count" MIN AlgoMethod = "min" MAX AlgoMethod = "max" HISTOGRAM AlgoMethod = "histogram" EXPO_HISTOGRAM AlgoMethod = "expo_histogram" STDEV AlgoMethod = "stdev" QUANTILES AlgoMethod = "quantiles" COUNT_DISTINCT AlgoMethod = "count_distinct" LAST AlgoMethod = "last" FIRST AlgoMethod = "first" )
func NormalizeAlgoMethod ¶
func NormalizeAlgoMethod(raw string) AlgoMethod
func (AlgoMethod) String ¶
func (m AlgoMethod) String() string
type Batchs ¶
type Batchs struct {
Batchs []*AggregationBatch `protobuf:"bytes,1,rep,name=batchs,proto3" json:"batchs,omitempty"`
PickKey uint64 `protobuf:"varint,2,opt,name=pick_key,json=pickKey,proto3" json:"pick_key,omitempty"`
}
func (*Batchs) Descriptor ¶
func (*Batchs) GetBatchs ¶
func (m *Batchs) GetBatchs() []*AggregationBatch
func (*Batchs) GetPickKey ¶
func (*Batchs) MarshalToSizedBuffer ¶
func (*Batchs) ProtoMessage ¶
func (*Batchs) ProtoMessage()
func (*Batchs) XXX_DiscardUnknown ¶
func (m *Batchs) XXX_DiscardUnknown()
func (*Batchs) XXX_Marshal ¶
func (*Batchs) XXX_Unmarshal ¶
type BuiltinMetricCfg ¶
type Cache ¶
type Cache struct {
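// One object is created per window, and add operations are applied to that Window; once the tolerance time is reached, the whole Windows entry is deleted from the map.
// key: tolerance time + window time.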
WindowsBuckets map[int64]*Windows
Expired time.Duration
// contains filtered or unexported fields
}
func (*Cache) AddBatch ¶
func (c *Cache) AddBatch(token string, batch *AggregationBatch) (n, expN int)
func (*Cache) AddBatchs ¶
func (c *Cache) AddBatchs(token string, batchs []*AggregationBatch) (n, expN int)
func (*Cache) GetAndSetBucket ¶
func (c *Cache) GetAndSetBucket(exp int64, token string, cal Calculator)
func (*Cache) GetExpWidows ¶
type Calculator ¶
type DataBatch ¶
type DataBatch struct {
CollectorId string `protobuf:"bytes,1,opt,name=collector_id,json=collectorId,proto3" json:"collector_id,omitempty"`
Packets []*DataPacket `protobuf:"bytes,2,rep,name=packets,proto3" json:"packets,omitempty"`
}
DataBatch is a batch of data packets.
func (*DataBatch) Descriptor ¶
func (*DataBatch) GetCollectorId ¶
func (*DataBatch) GetPackets ¶
func (m *DataBatch) GetPackets() []*DataPacket
func (*DataBatch) MarshalToSizedBuffer ¶
func (*DataBatch) ProtoMessage ¶
func (*DataBatch) ProtoMessage()
func (*DataBatch) XXX_DiscardUnknown ¶
func (m *DataBatch) XXX_DiscardUnknown()
func (*DataBatch) XXX_Marshal ¶
func (*DataBatch) XXX_Unmarshal ¶
type DataGroup ¶
type DataGroup struct {
FirstSeen time.Time
ExpiredTime int64
// contains filtered or unexported fields
}
DataGroup is an entry in the time wheel; it holds only the raw binary slices of points, to avoid retaining expanded object graphs for a long time.
type DataPacket ¶
type DataPacket struct {
GroupIdHash uint64 `protobuf:"varint,1,opt,name=group_id_hash,json=groupIdHash,proto3" json:"group_id_hash,omitempty"`
RawGroupId string `protobuf:"bytes,2,opt,name=raw_group_id,json=rawGroupId,proto3" json:"raw_group_id,omitempty"`
Token string `protobuf:"bytes,3,opt,name=token,proto3" json:"token,omitempty"`
Source string `protobuf:"bytes,4,opt,name=source,proto3" json:"source,omitempty"`
DataType string `protobuf:"bytes,5,opt,name=data_type,json=dataType,proto3" json:"data_type,omitempty"`
ConfigVersion int64 `protobuf:"varint,6,opt,name=config_version,json=configVersion,proto3" json:"config_version,omitempty"`
HasError bool `protobuf:"varint,7,opt,name=has_error,json=hasError,proto3" json:"has_error,omitempty"`
GroupKey string `protobuf:"bytes,8,opt,name=group_key,json=groupKey,proto3" json:"group_key,omitempty"`
PointCount int32 `protobuf:"varint,9,opt,name=point_count,json=pointCount,proto3" json:"point_count,omitempty"`
TraceStartTimeUnixNano int64 `` /* 135-byte string literal not displayed */
TraceEndTimeUnixNano int64 `` /* 129-byte string literal not displayed */
PointsPayload []byte `protobuf:"bytes,12,opt,name=points_payload,json=pointsPayload,proto3" json:"points_payload,omitempty"`
MaxPointTimeUnixNano int64 `` /* 129-byte string literal not displayed */
}
DataPacket is a data packet that uses a contiguous PBPoints payload, avoiding repeated decoding/encoding along the tail-sampling path.
func (*DataPacket) Descriptor ¶
func (*DataPacket) Descriptor() ([]byte, []int)
func (*DataPacket) Equal ¶
func (this *DataPacket) Equal(that interface{}) bool
func (*DataPacket) GetConfigVersion ¶
func (m *DataPacket) GetConfigVersion() int64
func (*DataPacket) GetDataType ¶
func (m *DataPacket) GetDataType() string
func (*DataPacket) GetGroupIdHash ¶
func (m *DataPacket) GetGroupIdHash() uint64
func (*DataPacket) GetGroupKey ¶
func (m *DataPacket) GetGroupKey() string
func (*DataPacket) GetHasError ¶
func (m *DataPacket) GetHasError() bool
func (*DataPacket) GetMaxPointTimeUnixNano ¶
func (m *DataPacket) GetMaxPointTimeUnixNano() int64
func (*DataPacket) GetPointCount ¶
func (m *DataPacket) GetPointCount() int32
func (*DataPacket) GetPointsPayload ¶
func (m *DataPacket) GetPointsPayload() []byte
func (*DataPacket) GetRawGroupId ¶
func (m *DataPacket) GetRawGroupId() string
func (*DataPacket) GetSource ¶
func (m *DataPacket) GetSource() string
func (*DataPacket) GetToken ¶
func (m *DataPacket) GetToken() string
func (*DataPacket) GetTraceEndTimeUnixNano ¶
func (m *DataPacket) GetTraceEndTimeUnixNano() int64
func (*DataPacket) GetTraceStartTimeUnixNano ¶
func (m *DataPacket) GetTraceStartTimeUnixNano() int64
func (*DataPacket) GoString ¶
func (this *DataPacket) GoString() string
func (*DataPacket) Marshal ¶
func (m *DataPacket) Marshal() (dAtA []byte, err error)
func (*DataPacket) MarshalToSizedBuffer ¶
func (m *DataPacket) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*DataPacket) ProtoMessage ¶
func (*DataPacket) ProtoMessage()
func (*DataPacket) Reset ¶
func (m *DataPacket) Reset()
func (*DataPacket) Size ¶
func (m *DataPacket) Size() (n int)
func (*DataPacket) String ¶
func (this *DataPacket) String() string
func (*DataPacket) Unmarshal ¶
func (m *DataPacket) Unmarshal(dAtA []byte) error
func (*DataPacket) WalkRawPBPoints ¶
func (packet *DataPacket) WalkRawPBPoints(fn func([]byte) bool) error
func (*DataPacket) XXX_DiscardUnknown ¶
func (m *DataPacket) XXX_DiscardUnknown()
func (*DataPacket) XXX_Marshal ¶
func (m *DataPacket) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*DataPacket) XXX_Merge ¶
func (m *DataPacket) XXX_Merge(src proto.Message)
func (*DataPacket) XXX_Size ¶
func (m *DataPacket) XXX_Size() int
func (*DataPacket) XXX_Unmarshal ¶
func (m *DataPacket) XXX_Unmarshal(b []byte) error
type DerivedMetric ¶
type DerivedMetric struct {
Name string `toml:"name" json:"name"`
Type AlgoMethod `toml:"type" json:"type"`
Condition string `toml:"condition" json:"condition"`
Groupby []string `toml:"group_by" json:"group_by"`
}
type DerivedMetricCollector ¶
type DerivedMetricCollector struct {
// contains filtered or unexported fields
}
func NewDerivedMetricCollector ¶
func NewDerivedMetricCollector(window time.Duration) *DerivedMetricCollector
func (*DerivedMetricCollector) Add ¶
func (c *DerivedMetricCollector) Add(records []DerivedMetricRecord)
func (*DerivedMetricCollector) Flush ¶
func (c *DerivedMetricCollector) Flush(now time.Time) []*DerivedMetricPoints
func (*DerivedMetricCollector) String ¶
func (c *DerivedMetricCollector) String() string
type DerivedMetricDecision ¶
type DerivedMetricDecision string
const ( DerivedMetricDecisionUnknown DerivedMetricDecision = "" DerivedMetricDecisionKept DerivedMetricDecision = "kept" DerivedMetricDecisionDropped DerivedMetricDecision = "dropped" )
type DerivedMetricKind ¶
type DerivedMetricKind string
const ( DerivedMetricKindSum DerivedMetricKind = "sum" DerivedMetricKindHistogram DerivedMetricKind = "histogram" )
type DerivedMetricPoints ¶
type DerivedMetricRecord ¶
type DerivedMetricRecord struct {
Token string
DataType string
MetricName string
Kind DerivedMetricKind
Stage DerivedMetricStage
Decision DerivedMetricDecision
Measurement string
Tags map[string]string
Value float64
Buckets []float64
Time time.Time
}
DerivedMetricRecord is the lightweight event produced by tail-sampling hooks.
type DerivedMetricStage ¶
type DerivedMetricStage string
const ( DerivedMetricStageIngest DerivedMetricStage = "ingest" DerivedMetricStagePreDecision DerivedMetricStage = "pre_decision" DerivedMetricStageDecision DerivedMetricStage = "decision" )
type ExpoHistogramOptions ¶
type ExpoHistogramOptions struct {
MaxScale int32 `protobuf:"varint,1,opt,name=max_scale,json=maxScale,proto3" json:"max_scale,omitempty"`
MaxBuckets int32 `protobuf:"varint,2,opt,name=max_buckets,json=maxBuckets,proto3" json:"max_buckets,omitempty"`
RecordMinMax bool `protobuf:"varint,3,opt,name=record_min_max,json=recordMinMax,proto3" json:"record_min_max,omitempty"`
}
func (*ExpoHistogramOptions) Descriptor ¶
func (*ExpoHistogramOptions) Descriptor() ([]byte, []int)
func (*ExpoHistogramOptions) Equal ¶
func (this *ExpoHistogramOptions) Equal(that interface{}) bool
func (*ExpoHistogramOptions) GetMaxBuckets ¶
func (m *ExpoHistogramOptions) GetMaxBuckets() int32
func (*ExpoHistogramOptions) GetMaxScale ¶
func (m *ExpoHistogramOptions) GetMaxScale() int32
func (*ExpoHistogramOptions) GetRecordMinMax ¶
func (m *ExpoHistogramOptions) GetRecordMinMax() bool
func (*ExpoHistogramOptions) GoString ¶
func (this *ExpoHistogramOptions) GoString() string
func (*ExpoHistogramOptions) Marshal ¶
func (m *ExpoHistogramOptions) Marshal() (dAtA []byte, err error)
func (*ExpoHistogramOptions) MarshalTo ¶
func (m *ExpoHistogramOptions) MarshalTo(dAtA []byte) (int, error)
func (*ExpoHistogramOptions) MarshalToSizedBuffer ¶
func (m *ExpoHistogramOptions) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*ExpoHistogramOptions) ProtoMessage ¶
func (*ExpoHistogramOptions) ProtoMessage()
func (*ExpoHistogramOptions) Reset ¶
func (m *ExpoHistogramOptions) Reset()
func (*ExpoHistogramOptions) Size ¶
func (m *ExpoHistogramOptions) Size() (n int)
func (*ExpoHistogramOptions) String ¶
func (this *ExpoHistogramOptions) String() string
func (*ExpoHistogramOptions) Unmarshal ¶
func (m *ExpoHistogramOptions) Unmarshal(dAtA []byte) error
func (*ExpoHistogramOptions) XXX_DiscardUnknown ¶
func (m *ExpoHistogramOptions) XXX_DiscardUnknown()
func (*ExpoHistogramOptions) XXX_Marshal ¶
func (m *ExpoHistogramOptions) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*ExpoHistogramOptions) XXX_Merge ¶
func (m *ExpoHistogramOptions) XXX_Merge(src proto.Message)
func (*ExpoHistogramOptions) XXX_Size ¶
func (m *ExpoHistogramOptions) XXX_Size() int
func (*ExpoHistogramOptions) XXX_Unmarshal ¶
func (m *ExpoHistogramOptions) XXX_Unmarshal(b []byte) error
type GlobalSampler ¶
type GlobalSampler struct {
// contains filtered or unexported fields
}
GlobalSampler is the global manager.
func NewGlobalSampler ¶
func NewGlobalSampler(shardCount int, waitTime time.Duration) *GlobalSampler
func (*GlobalSampler) AdvanceTime ¶
func (s *GlobalSampler) AdvanceTime() map[uint64]*DataGroup
AdvanceTime advances the time wheel and returns the data that has expired in the current slot.
func (*GlobalSampler) GetLoggingConfig ¶
func (s *GlobalSampler) GetLoggingConfig(token string) *LoggingTailSampling
func (*GlobalSampler) GetRUMConfig ¶
func (s *GlobalSampler) GetRUMConfig(token string) *RUMTailSampling
func (*GlobalSampler) GetTraceConfig ¶
func (s *GlobalSampler) GetTraceConfig(token string) *TraceTailSampling
func (*GlobalSampler) Ingest ¶
func (s *GlobalSampler) Ingest(packet *DataPacket)
func (*GlobalSampler) TailSamplingData ¶
func (s *GlobalSampler) TailSamplingData(dataGroups map[uint64]*DataGroup) map[uint64]*DataPacket
func (*GlobalSampler) TailSamplingOutcomes ¶
func (s *GlobalSampler) TailSamplingOutcomes(dataGroups map[uint64]*DataGroup) map[uint64]*TailSamplingOutcome
func (*GlobalSampler) UpdateConfig ¶
func (s *GlobalSampler) UpdateConfig(token string, ts *TailSamplingConfigs) error
type HistogramOptions ¶
type HistogramOptions struct {
// for explicit histogram (e.g. 10, 50, 100)
Buckets []float64 `protobuf:"fixed64,1,rep,packed,name=buckets,proto3" json:"buckets,omitempty"`
}
func (*HistogramOptions) Descriptor ¶
func (*HistogramOptions) Descriptor() ([]byte, []int)
func (*HistogramOptions) Equal ¶
func (this *HistogramOptions) Equal(that interface{}) bool
func (*HistogramOptions) GetBuckets ¶
func (m *HistogramOptions) GetBuckets() []float64
func (*HistogramOptions) GoString ¶
func (this *HistogramOptions) GoString() string
func (*HistogramOptions) Marshal ¶
func (m *HistogramOptions) Marshal() (dAtA []byte, err error)
func (*HistogramOptions) MarshalToSizedBuffer ¶
func (m *HistogramOptions) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*HistogramOptions) ProtoMessage ¶
func (*HistogramOptions) ProtoMessage()
func (*HistogramOptions) Reset ¶
func (m *HistogramOptions) Reset()
func (*HistogramOptions) Size ¶
func (m *HistogramOptions) Size() (n int)
func (*HistogramOptions) String ¶
func (this *HistogramOptions) String() string
func (*HistogramOptions) Unmarshal ¶
func (m *HistogramOptions) Unmarshal(dAtA []byte) error
func (*HistogramOptions) XXX_DiscardUnknown ¶
func (m *HistogramOptions) XXX_DiscardUnknown()
func (*HistogramOptions) XXX_Marshal ¶
func (m *HistogramOptions) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*HistogramOptions) XXX_Merge ¶
func (m *HistogramOptions) XXX_Merge(src proto.Message)
func (*HistogramOptions) XXX_Size ¶
func (m *HistogramOptions) XXX_Size() int
func (*HistogramOptions) XXX_Unmarshal ¶
func (m *HistogramOptions) XXX_Unmarshal(b []byte) error
type LoggingGroupDimension ¶
type LoggingGroupDimension struct {
// Group key (e.g. user_id, order_id, session_id)
GroupKey string `toml:"group_key" json:"group_key"`
// Sampling pipelines under this group dimension
Pipelines []*SamplingPipeline `toml:"pipelines" json:"pipelines"`
// Derived metrics specific to this group
DerivedMetrics []*DerivedMetric `toml:"derived_metrics" json:"derived_metrics"`
}
func (*LoggingGroupDimension) PickLogging ¶
func (logGroup *LoggingGroupDimension) PickLogging(source string, pts []*point.Point) (map[uint64]*DataPacket, []*point.Point)
type LoggingTailSampling ¶
type LoggingTailSampling struct {
DataTTL time.Duration `toml:"data_ttl" json:"data_ttl"`
Version int64 `toml:"version" json:"version"`
// Built-in metrics configuration; all enabled by default
BuiltinMetrics []*BuiltinMetricCfg `toml:"builtin_metrics" json:"builtin_metrics"`
// Configured per group dimension (no longer a global pipeline)
GroupDimensions []*LoggingGroupDimension `toml:"group_dimensions" json:"group_dimensions"`
}
type MetricBase ¶
type MetricBase struct {
// contains filtered or unexported fields
}
func (*MetricBase) String ¶
func (mb *MetricBase) String() string
type PipelineAction ¶
type PipelineAction string
type PipelineType ¶
type PipelineType string
type PointsData ¶
func WindowsToData ¶
func WindowsToData(ws []*Window) []*PointsData
type QuantileOptions ¶
type QuantileOptions struct {
// The target percentiles (e.g. 0.50, 0.99)
Percentiles []float64 `protobuf:"fixed64,1,rep,packed,name=percentiles,proto3" json:"percentiles,omitempty"`
}
func (*QuantileOptions) Descriptor ¶
func (*QuantileOptions) Descriptor() ([]byte, []int)
func (*QuantileOptions) Equal ¶
func (this *QuantileOptions) Equal(that interface{}) bool
func (*QuantileOptions) GetPercentiles ¶
func (m *QuantileOptions) GetPercentiles() []float64
func (*QuantileOptions) GoString ¶
func (this *QuantileOptions) GoString() string
func (*QuantileOptions) Marshal ¶
func (m *QuantileOptions) Marshal() (dAtA []byte, err error)
func (*QuantileOptions) MarshalToSizedBuffer ¶
func (m *QuantileOptions) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*QuantileOptions) ProtoMessage ¶
func (*QuantileOptions) ProtoMessage()
func (*QuantileOptions) Reset ¶
func (m *QuantileOptions) Reset()
func (*QuantileOptions) Size ¶
func (m *QuantileOptions) Size() (n int)
func (*QuantileOptions) String ¶
func (this *QuantileOptions) String() string
func (*QuantileOptions) Unmarshal ¶
func (m *QuantileOptions) Unmarshal(dAtA []byte) error
func (*QuantileOptions) XXX_DiscardUnknown ¶
func (m *QuantileOptions) XXX_DiscardUnknown()
func (*QuantileOptions) XXX_Marshal ¶
func (m *QuantileOptions) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*QuantileOptions) XXX_Merge ¶
func (m *QuantileOptions) XXX_Merge(src proto.Message)
func (*QuantileOptions) XXX_Size ¶
func (m *QuantileOptions) XXX_Size() int
func (*QuantileOptions) XXX_Unmarshal ¶
func (m *QuantileOptions) XXX_Unmarshal(b []byte) error
type RUMGroupDimension ¶
type RUMGroupDimension struct {
GroupKey string `toml:"group_key" json:"group_key"` // session_id, user_id, page_id
Pipelines []*SamplingPipeline `toml:"pipelines" json:"pipelines"`
DerivedMetrics []*DerivedMetric `toml:"derived_metrics" json:"derived_metrics"`
}
func (*RUMGroupDimension) PickRUM ¶
func (rumGroup *RUMGroupDimension) PickRUM(source string, pts []*point.Point) (map[uint64]*DataPacket, []*point.Point)
type RUMTailSampling ¶
type RUMTailSampling struct {
DataTTL time.Duration `toml:"data_ttl" json:"data_ttl"`
Version int64 `toml:"version" json:"version"`
// Built-in metrics configuration; all enabled by default
BuiltinMetrics []*BuiltinMetricCfg `toml:"builtin_metrics" json:"builtin_metrics"`
// RUM may also have multiple group dimensions
GroupDimensions []*RUMGroupDimension `toml:"group_dimensions" json:"group_dimensions"`
}
RUMTailSampling holds the RUM tail-sampling configuration.
type RuleSelector ¶
type RuleSelector struct {
Category string `toml:"category" json:"category"`
Measurements []string `toml:"measurements" json:"measurements"`
MetricName []string `toml:"metric_name" json:"metric_name"`
Condition string `toml:"condition" json:"condition"`
// contains filtered or unexported fields
}
RuleSelector selects measurements and fields among points.
func (*RuleSelector) Setup ¶
func (rs *RuleSelector) Setup() error
Setup initializes the rule selector with validation and prepares whitelists.
type SamplingPipeline ¶
type SamplingPipeline struct {
Name string `toml:"name" json:"name"`
Type PipelineType `toml:"type" json:"type"`
Condition string `toml:"condition,omitempty" json:"condition,omitempty"`
Action PipelineAction `toml:"action,omitempty" json:"action,omitempty"`
Rate float64 `toml:"rate,omitempty" json:"rate,omitempty"`
HashKeys []string `toml:"hash_keys" json:"hash_keys"`
// contains filtered or unexported fields
}
func (*SamplingPipeline) Apply ¶
func (sp *SamplingPipeline) Apply() error
func (*SamplingPipeline) DoAction ¶
func (sp *SamplingPipeline) DoAction(td *DataPacket) (bool, *DataPacket)
type TailSamplingBuiltinMetric ¶
type TailSamplingBuiltinMetric interface {
Name() string
OnIngest(packet *DataPacket) []DerivedMetricRecord
OnPreDecision(packet *DataPacket) []DerivedMetricRecord
OnDecision(packet *DataPacket, decision DerivedMetricDecision) []DerivedMetricRecord
}
type TailSamplingBuiltinMetrics ¶
type TailSamplingBuiltinMetrics []TailSamplingBuiltinMetric
func DefaultTailSamplingBuiltinMetrics ¶
func DefaultTailSamplingBuiltinMetrics() TailSamplingBuiltinMetrics
func (TailSamplingBuiltinMetrics) OnDecision ¶
func (ms TailSamplingBuiltinMetrics) OnDecision(packet *DataPacket, decision DerivedMetricDecision) []DerivedMetricRecord
func (TailSamplingBuiltinMetrics) OnIngest ¶
func (ms TailSamplingBuiltinMetrics) OnIngest(packet *DataPacket) []DerivedMetricRecord
func (TailSamplingBuiltinMetrics) OnPreDecision ¶
func (ms TailSamplingBuiltinMetrics) OnPreDecision(packet *DataPacket) []DerivedMetricRecord
type TailSamplingConfigs ¶
type TailSamplingConfigs struct {
Tracing *TraceTailSampling `toml:"trace" json:"trace"`
Logging *LoggingTailSampling `toml:"logging" json:"logging"`
RUM *RUMTailSampling `toml:"rum" json:"rum"`
Version int64 `toml:"version" json:"version"`
}
func (*TailSamplingConfigs) Init ¶
func (t *TailSamplingConfigs) Init() error
func (*TailSamplingConfigs) ToString ¶
func (t *TailSamplingConfigs) ToString() string
type TailSamplingOutcome ¶
type TailSamplingOutcome struct {
Packet *DataPacket
SourcePacket *DataPacket
Decision DerivedMetricDecision
}
type TailSamplingProcessor ¶
type TailSamplingProcessor struct {
// contains filtered or unexported fields
}
func NewDefaultTailSamplingProcessor ¶
func NewDefaultTailSamplingProcessor(shardCount int, waitTime time.Duration) *TailSamplingProcessor
func NewTailSamplingProcessor ¶
func NewTailSamplingProcessor( sampler *GlobalSampler, collector *DerivedMetricCollector, metrics TailSamplingBuiltinMetrics, ) *TailSamplingProcessor
func (*TailSamplingProcessor) AdvanceTime ¶
func (r *TailSamplingProcessor) AdvanceTime() map[uint64]*DataGroup
func (*TailSamplingProcessor) BuiltinMetrics ¶
func (r *TailSamplingProcessor) BuiltinMetrics() TailSamplingBuiltinMetrics
func (*TailSamplingProcessor) Collector ¶
func (r *TailSamplingProcessor) Collector() *DerivedMetricCollector
func (*TailSamplingProcessor) FlushDerivedMetrics ¶
func (r *TailSamplingProcessor) FlushDerivedMetrics(now time.Time) []*DerivedMetricPoints
func (*TailSamplingProcessor) IngestPacket ¶
func (r *TailSamplingProcessor) IngestPacket(packet *DataPacket)
func (*TailSamplingProcessor) RecordDecision ¶
func (r *TailSamplingProcessor) RecordDecision(packet *DataPacket, decision DerivedMetricDecision)
func (*TailSamplingProcessor) Sampler ¶
func (r *TailSamplingProcessor) Sampler() *GlobalSampler
func (*TailSamplingProcessor) TailSamplingData ¶
func (r *TailSamplingProcessor) TailSamplingData(dataGroups map[uint64]*DataGroup) map[uint64]*DataPacket
func (*TailSamplingProcessor) UpdateConfig ¶
func (r *TailSamplingProcessor) UpdateConfig(token string, cfg *TailSamplingConfigs) error
type TraceTailSampling ¶
type TraceTailSampling struct {
DataTTL time.Duration `toml:"data_ttl" json:"data_ttl"`
DerivedMetrics []*DerivedMetric `toml:"derived_metrics" json:"derived_metrics"`
BuiltinMetrics []*BuiltinMetricCfg `toml:"builtin_metrics" json:"builtin_metrics"`
Pipelines []*SamplingPipeline `toml:"sampling_pipeline" json:"pipelines"`
Version int64 `toml:"version" json:"version"`
GroupKey string `toml:"group_key" json:"group_key"` // for traces this is fixed to "trace_id"
}
Source Files
¶
- aggr.go
- aggr_config.go
- aggr_selector.go
- aggrbatch.pb.go
- algo_avg.go
- algo_count.go
- algo_count_distinct.go
- algo_first.go
- algo_histogram.go
- algo_last.go
- algo_max.go
- algo_method.go
- algo_min.go
- algo_quantiles.go
- algo_stdev.go
- algo_string.go
- algo_sum.go
- calculator.go
- derived_metric_collector.go
- derived_metric_record.go
- doc.go
- metricbase.go
- point.go
- tail-sampling.go
- tail_sampling_builtin_metrics.go
- tail_sampling_processor.go
- timewheel.go
- tsdata.pb.go
- tsdata_v2.go
- windows.go