aggregate

package
v2.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2026 License: MIT Imports: 24 Imported by: 0

Documentation

Overview

Package aggregate provides point aggregation and tail-sampling helpers.

Index

Constants

View Source
const (
	// actions.
	ActionPassThrough = "passthrough"
	ActionDrop        = "drop"
)
View Source
const (
	GuanceRoutingKey = "Guance-Routing-Key"
	GuancePickKey    = "Guance-Pick-Key"
)
View Source
const (
	Seed1   = uint64(0x9E3779B97F4A7C15)
	Seed2   = uint64(0x6A09E667F3BCC908)
	SeedU32 = uint32(0x7F4A7C15)
)
View Source
const (
	PipelineTypeCondition = "condition"
	PipelineTypeSampling  = "probabilistic"

	PipelineActionKeep = "keep"
	PipelineActionDrop = "drop"
)
View Source
const DefaultDerivedMetricFlushWindow = 30 * time.Second
View Source
const TailSamplingDerivedMeasurement = "tail_sampling"

Variables

View Source
var (
	ErrInvalidLengthAggrbatch        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowAggrbatch          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupAggrbatch = fmt.Errorf("proto: unexpected end of group")
)
View Source
var (
	ErrInvalidLengthTsdata        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowTsdata          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupTsdata = fmt.Errorf("proto: unexpected end of group")
)

Functions

func AlignNextWallTime

func AlignNextWallTime(t time.Time, align time.Duration) int64

func HashCombine

func HashCombine(seed, hash uint64) uint64

HashCombine combines two uint64 hash values into one; see https://zhuanlan.zhihu.com/p/574573421.

func HashToken

func HashToken(token string, hash64 uint64) uint64

func PickTrace

func PickTrace(source string, pts []*point.Point, version int64) map[uint64]*DataPacket

func SampleStdDev

func SampleStdDev(data []float64) (float64, error)

SampleStdDev computes the sample standard deviation (dividing by N-1).

func SetLogging

func SetLogging(log *logger.Logger)

Types

type Action

type Action string

type AggregateRule

type AggregateRule struct {
	Name       string                            `toml:"name" json:"name"`
	Selector   *RuleSelector                     `toml:"select" json:"select"`
	Groupby    []string                          `toml:"group_by" json:"group_by"`
	Algorithms map[string]*AggregationAlgoConfig `toml:"algorithms" json:"algorithms"`
	// contains filtered or unexported fields
}

AggregateRule configures a single aggregation rule.

func (*AggregateRule) GroupbyBatch

func (ar *AggregateRule) GroupbyBatch(ac *AggregatorConfigure, pts []*point.Point) (batches []*AggregationBatch)

GroupbyBatch creates aggregation batches from points based on grouping keys.

func (*AggregateRule) GroupbyPoints

func (ar *AggregateRule) GroupbyPoints(pts []*point.Point) map[uint64][]*point.Point

GroupbyPoints groups points by their hash value calculated from grouping keys.

func (*AggregateRule) SelectPoints

func (ar *AggregateRule) SelectPoints(pts []*point.Point) []*point.Point

SelectPoints filters points based on the rule's selector criteria.

type AggregationAlgo

type AggregationAlgo struct {
	// 1. which algorithm to apply for specific field?
	// Use a readable string such as:
	//   method = "sum"
	//   method = "count"
	//   method = "quantiles"
	// Supported values are defined by the package-level AlgoMethod constants.
	Method string `protobuf:"bytes,1,opt,name=method,proto3" json:"method,omitempty"`
	// 2. what is the original field name in the data?
	SourceField string `protobuf:"bytes,2,opt,name=source_field,json=sourceField,proto3" json:"source_field,omitempty"`
	Window      int64  `protobuf:"varint,3,opt,name=window,proto3" json:"window,omitempty"`
	// Types that are valid to be assigned to Options:
	//	*AggregationAlgo_HistogramOpts
	//	*AggregationAlgo_ExpoOpts
	//	*AggregationAlgo_QuantileOpts
	Options isAggregationAlgo_Options `protobuf_oneof:"options"`
	AddTags map[string]string         `` /* 170-byte string literal not displayed */
}

func (*AggregationAlgo) Descriptor

func (*AggregationAlgo) Descriptor() ([]byte, []int)

func (*AggregationAlgo) Equal

func (this *AggregationAlgo) Equal(that interface{}) bool

func (*AggregationAlgo) GetAddTags

func (m *AggregationAlgo) GetAddTags() map[string]string

func (*AggregationAlgo) GetExpoOpts

func (m *AggregationAlgo) GetExpoOpts() *ExpoHistogramOptions

func (*AggregationAlgo) GetHistogramOpts

func (m *AggregationAlgo) GetHistogramOpts() *HistogramOptions

func (*AggregationAlgo) GetMethod

func (m *AggregationAlgo) GetMethod() string

func (*AggregationAlgo) GetOptions

func (m *AggregationAlgo) GetOptions() isAggregationAlgo_Options

func (*AggregationAlgo) GetQuantileOpts

func (m *AggregationAlgo) GetQuantileOpts() *QuantileOptions

func (*AggregationAlgo) GetSourceField

func (m *AggregationAlgo) GetSourceField() string

func (*AggregationAlgo) GetWindow

func (m *AggregationAlgo) GetWindow() int64

func (*AggregationAlgo) GoString

func (this *AggregationAlgo) GoString() string

func (*AggregationAlgo) Marshal

func (m *AggregationAlgo) Marshal() (dAtA []byte, err error)

func (*AggregationAlgo) MarshalTo

func (m *AggregationAlgo) MarshalTo(dAtA []byte) (int, error)

func (*AggregationAlgo) MarshalToSizedBuffer

func (m *AggregationAlgo) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*AggregationAlgo) ProtoMessage

func (*AggregationAlgo) ProtoMessage()

func (*AggregationAlgo) Reset

func (m *AggregationAlgo) Reset()

func (*AggregationAlgo) Size

func (m *AggregationAlgo) Size() (n int)

func (*AggregationAlgo) String

func (this *AggregationAlgo) String() string

func (*AggregationAlgo) Unmarshal

func (m *AggregationAlgo) Unmarshal(dAtA []byte) error

func (*AggregationAlgo) XXX_DiscardUnknown

func (m *AggregationAlgo) XXX_DiscardUnknown()

func (*AggregationAlgo) XXX_Marshal

func (m *AggregationAlgo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*AggregationAlgo) XXX_Merge

func (m *AggregationAlgo) XXX_Merge(src proto.Message)

func (*AggregationAlgo) XXX_OneofWrappers

func (*AggregationAlgo) XXX_OneofWrappers() []interface{}

XXX_OneofWrappers is for the internal use of the proto package.

func (*AggregationAlgo) XXX_Size

func (m *AggregationAlgo) XXX_Size() int

func (*AggregationAlgo) XXX_Unmarshal

func (m *AggregationAlgo) XXX_Unmarshal(b []byte) error

type AggregationAlgoConfig

type AggregationAlgoConfig struct {
	Method        string                `toml:"method" json:"method"`
	SourceField   string                `toml:"source_field" json:"source_field"`
	Window        int64                 `toml:"window" json:"window"`
	AddTags       map[string]string     `toml:"add_tags" json:"add_tags"`
	HistogramOpts *HistogramOptions     `toml:"histogram_opts" json:"histogram_opts"`
	ExpoOpts      *ExpoHistogramOptions `toml:"expo_opts" json:"expo_opts"`
	QuantileOpts  *QuantileOptions      `toml:"quantile_opts" json:"quantile_opts"`
}

func (*AggregationAlgoConfig) ToAggregationAlgo

func (cfg *AggregationAlgoConfig) ToAggregationAlgo() *AggregationAlgo

type AggregationAlgo_ExpoOpts

type AggregationAlgo_ExpoOpts struct {
	ExpoOpts *ExpoHistogramOptions `protobuf:"bytes,11,opt,name=expo_opts,json=expoOpts,proto3,oneof" json:"expo_opts,omitempty"`
}

func (*AggregationAlgo_ExpoOpts) Equal

func (this *AggregationAlgo_ExpoOpts) Equal(that interface{}) bool

func (*AggregationAlgo_ExpoOpts) GoString

func (this *AggregationAlgo_ExpoOpts) GoString() string

func (*AggregationAlgo_ExpoOpts) MarshalTo

func (m *AggregationAlgo_ExpoOpts) MarshalTo(dAtA []byte) (int, error)

func (*AggregationAlgo_ExpoOpts) MarshalToSizedBuffer

func (m *AggregationAlgo_ExpoOpts) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*AggregationAlgo_ExpoOpts) Size

func (m *AggregationAlgo_ExpoOpts) Size() (n int)

func (*AggregationAlgo_ExpoOpts) String

func (this *AggregationAlgo_ExpoOpts) String() string

type AggregationAlgo_HistogramOpts

type AggregationAlgo_HistogramOpts struct {
	HistogramOpts *HistogramOptions `protobuf:"bytes,10,opt,name=histogram_opts,json=histogramOpts,proto3,oneof" json:"histogram_opts,omitempty"`
}

func (*AggregationAlgo_HistogramOpts) Equal

func (this *AggregationAlgo_HistogramOpts) Equal(that interface{}) bool

func (*AggregationAlgo_HistogramOpts) GoString

func (this *AggregationAlgo_HistogramOpts) GoString() string

func (*AggregationAlgo_HistogramOpts) MarshalTo

func (m *AggregationAlgo_HistogramOpts) MarshalTo(dAtA []byte) (int, error)

func (*AggregationAlgo_HistogramOpts) MarshalToSizedBuffer

func (m *AggregationAlgo_HistogramOpts) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*AggregationAlgo_HistogramOpts) Size

func (m *AggregationAlgo_HistogramOpts) Size() (n int)

func (*AggregationAlgo_HistogramOpts) String

func (this *AggregationAlgo_HistogramOpts) String() string

type AggregationAlgo_QuantileOpts

type AggregationAlgo_QuantileOpts struct {
	QuantileOpts *QuantileOptions `protobuf:"bytes,12,opt,name=quantile_opts,json=quantileOpts,proto3,oneof" json:"quantile_opts,omitempty"`
}

func (*AggregationAlgo_QuantileOpts) Equal

func (this *AggregationAlgo_QuantileOpts) Equal(that interface{}) bool

func (*AggregationAlgo_QuantileOpts) GoString

func (this *AggregationAlgo_QuantileOpts) GoString() string

func (*AggregationAlgo_QuantileOpts) MarshalTo

func (m *AggregationAlgo_QuantileOpts) MarshalTo(dAtA []byte) (int, error)

func (*AggregationAlgo_QuantileOpts) MarshalToSizedBuffer

func (m *AggregationAlgo_QuantileOpts) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*AggregationAlgo_QuantileOpts) Size

func (m *AggregationAlgo_QuantileOpts) Size() (n int)

func (*AggregationAlgo_QuantileOpts) String

func (this *AggregationAlgo_QuantileOpts) String() string

type AggregationBatch

type AggregationBatch struct {
	RoutingKey uint64 `protobuf:"varint,1,opt,name=routing_key,json=routingKey,proto3" json:"routing_key,omitempty"`
	ConfigHash uint64 `protobuf:"varint,2,opt,name=config_hash,json=configHash,proto3" json:"config_hash,omitempty"`
	// raw TOML configuration
	RawConfig       []byte                      `protobuf:"bytes,3,opt,name=raw_config,json=rawConfig,proto3" json:"raw_config,omitempty"`
	AggregationOpts map[string]*AggregationAlgo `` /* 194-byte string literal not displayed */
	Points          *point.PBPoints             `protobuf:"bytes,5,opt,name=points,proto3" json:"points,omitempty"`
	PickKey         uint64                      `protobuf:"varint,6,opt,name=pick_key,json=pickKey,proto3" json:"pick_key,omitempty"`
}

func (*AggregationBatch) Descriptor

func (*AggregationBatch) Descriptor() ([]byte, []int)

func (*AggregationBatch) Equal

func (this *AggregationBatch) Equal(that interface{}) bool

func (*AggregationBatch) GetAggregationOpts

func (m *AggregationBatch) GetAggregationOpts() map[string]*AggregationAlgo

func (*AggregationBatch) GetConfigHash

func (m *AggregationBatch) GetConfigHash() uint64

func (*AggregationBatch) GetPickKey

func (m *AggregationBatch) GetPickKey() uint64

func (*AggregationBatch) GetPoints

func (m *AggregationBatch) GetPoints() *point.PBPoints

func (*AggregationBatch) GetRawConfig

func (m *AggregationBatch) GetRawConfig() []byte

func (*AggregationBatch) GetRoutingKey

func (m *AggregationBatch) GetRoutingKey() uint64

func (*AggregationBatch) GoString

func (this *AggregationBatch) GoString() string

func (*AggregationBatch) Marshal

func (m *AggregationBatch) Marshal() (dAtA []byte, err error)

func (*AggregationBatch) MarshalTo

func (m *AggregationBatch) MarshalTo(dAtA []byte) (int, error)

func (*AggregationBatch) MarshalToSizedBuffer

func (m *AggregationBatch) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*AggregationBatch) ProtoMessage

func (*AggregationBatch) ProtoMessage()

func (*AggregationBatch) Reset

func (m *AggregationBatch) Reset()

func (*AggregationBatch) Size

func (m *AggregationBatch) Size() (n int)

func (*AggregationBatch) String

func (this *AggregationBatch) String() string

func (*AggregationBatch) Unmarshal

func (m *AggregationBatch) Unmarshal(dAtA []byte) error

func (*AggregationBatch) XXX_DiscardUnknown

func (m *AggregationBatch) XXX_DiscardUnknown()

func (*AggregationBatch) XXX_Marshal

func (m *AggregationBatch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*AggregationBatch) XXX_Merge

func (m *AggregationBatch) XXX_Merge(src proto.Message)

func (*AggregationBatch) XXX_Size

func (m *AggregationBatch) XXX_Size() int

func (*AggregationBatch) XXX_Unmarshal

func (m *AggregationBatch) XXX_Unmarshal(b []byte) error

type AggregatorConfigure

type AggregatorConfigure struct {
	DefaultWindow    time.Duration    `toml:"default_window" json:"default_window"`
	AggregateRules   []*AggregateRule `toml:"aggregate_rules" json:"aggregate_rules"`
	DefaultAction    Action           `toml:"action" json:"action"`
	DeleteRulesPoint bool             `toml:"delete_rules_point" json:"delete_rules_point"`
	// contains filtered or unexported fields
}

AggregatorConfigure is the top-level aggregator configuration for a single workspace.

func (*AggregatorConfigure) PickPoints

func (ac *AggregatorConfigure) PickPoints(category string, pts []*point.Point) map[uint64]*Batchs

PickPoints organizes points into batches based on aggregation rules and grouping keys. Data of multiple categories (M, L, RUM, T) can be selected; the returned data is always of metric type.

func (*AggregatorConfigure) SelectPoints

func (ac *AggregatorConfigure) SelectPoints(pts []*point.Point) (groups [][]*point.Point)

SelectPoints selects points from the input slice based on aggregate rules.

func (*AggregatorConfigure) Setup

func (ac *AggregatorConfigure) Setup() error

Setup initializes the aggregator configuration, validates rules, and prepares calculators.

func (*AggregatorConfigure) UnmarshalTOML

func (ac *AggregatorConfigure) UnmarshalTOML(data interface{}) error

type AlgoMethod

type AlgoMethod string
const (
	METHOD_UNSPECIFIED AlgoMethod = ""
	SUM                AlgoMethod = "sum"
	AVG                AlgoMethod = "avg"
	COUNT              AlgoMethod = "count"
	MIN                AlgoMethod = "min"
	MAX                AlgoMethod = "max"
	HISTOGRAM          AlgoMethod = "histogram"
	EXPO_HISTOGRAM     AlgoMethod = "expo_histogram"
	STDEV              AlgoMethod = "stdev"
	QUANTILES          AlgoMethod = "quantiles"
	COUNT_DISTINCT     AlgoMethod = "count_distinct"
	LAST               AlgoMethod = "last"
	FIRST              AlgoMethod = "first"
)

func NormalizeAlgoMethod

func NormalizeAlgoMethod(raw string) AlgoMethod

func (AlgoMethod) String

func (m AlgoMethod) String() string

type Batchs

type Batchs struct {
	Batchs  []*AggregationBatch `protobuf:"bytes,1,rep,name=batchs,proto3" json:"batchs,omitempty"`
	PickKey uint64              `protobuf:"varint,2,opt,name=pick_key,json=pickKey,proto3" json:"pick_key,omitempty"`
}

func (*Batchs) Descriptor

func (*Batchs) Descriptor() ([]byte, []int)

func (*Batchs) Equal

func (this *Batchs) Equal(that interface{}) bool

func (*Batchs) GetBatchs

func (m *Batchs) GetBatchs() []*AggregationBatch

func (*Batchs) GetPickKey

func (m *Batchs) GetPickKey() uint64

func (*Batchs) GoString

func (this *Batchs) GoString() string

func (*Batchs) Marshal

func (m *Batchs) Marshal() (dAtA []byte, err error)

func (*Batchs) MarshalTo

func (m *Batchs) MarshalTo(dAtA []byte) (int, error)

func (*Batchs) MarshalToSizedBuffer

func (m *Batchs) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Batchs) ProtoMessage

func (*Batchs) ProtoMessage()

func (*Batchs) Reset

func (m *Batchs) Reset()

func (*Batchs) Size

func (m *Batchs) Size() (n int)

func (*Batchs) String

func (this *Batchs) String() string

func (*Batchs) Unmarshal

func (m *Batchs) Unmarshal(dAtA []byte) error

func (*Batchs) XXX_DiscardUnknown

func (m *Batchs) XXX_DiscardUnknown()

func (*Batchs) XXX_Marshal

func (m *Batchs) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Batchs) XXX_Merge

func (m *Batchs) XXX_Merge(src proto.Message)

func (*Batchs) XXX_Size

func (m *Batchs) XXX_Size() int

func (*Batchs) XXX_Unmarshal

func (m *Batchs) XXX_Unmarshal(b []byte) error

type BuiltinMetricCfg

type BuiltinMetricCfg struct {
	Name    string `toml:"name" json:"name"`
	Enabled bool   `toml:"enabled" json:"enabled"`
}

type Cache

type Cache struct {

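	// One object is created per window, and add operations are applied to that Window; once the tolerance time is reached, the whole Windows entry is deleted from the map.
	// key: tolerance time + window time.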
	WindowsBuckets map[int64]*Windows

	Expired time.Duration
	// contains filtered or unexported fields
}

func NewCache

func NewCache(exp time.Duration) *Cache

func (*Cache) AddBatch

func (c *Cache) AddBatch(token string, batch *AggregationBatch) (n, expN int)

func (*Cache) AddBatchs

func (c *Cache) AddBatchs(token string, batchs []*AggregationBatch) (n, expN int)

func (*Cache) GetAndSetBucket

func (c *Cache) GetAndSetBucket(exp int64, token string, cal Calculator)

func (*Cache) GetExpWidows

func (c *Cache) GetExpWidows() []*Window

type Calculator

type Calculator interface {
	Add(any)
	Aggr() ([]*point.Point, error)
	Reset()
	Base() *MetricBase
	ToString() string
}

type DataBatch

type DataBatch struct {
	CollectorId string        `protobuf:"bytes,1,opt,name=collector_id,json=collectorId,proto3" json:"collector_id,omitempty"`
	Packets     []*DataPacket `protobuf:"bytes,2,rep,name=packets,proto3" json:"packets,omitempty"`
}

DataBatch is a batch of data packets.

func (*DataBatch) Descriptor

func (*DataBatch) Descriptor() ([]byte, []int)

func (*DataBatch) Equal

func (this *DataBatch) Equal(that interface{}) bool

func (*DataBatch) GetCollectorId

func (m *DataBatch) GetCollectorId() string

func (*DataBatch) GetPackets

func (m *DataBatch) GetPackets() []*DataPacket

func (*DataBatch) GoString

func (this *DataBatch) GoString() string

func (*DataBatch) Marshal

func (m *DataBatch) Marshal() (dAtA []byte, err error)

func (*DataBatch) MarshalTo

func (m *DataBatch) MarshalTo(dAtA []byte) (int, error)

func (*DataBatch) MarshalToSizedBuffer

func (m *DataBatch) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*DataBatch) ProtoMessage

func (*DataBatch) ProtoMessage()

func (*DataBatch) Reset

func (m *DataBatch) Reset()

func (*DataBatch) Size

func (m *DataBatch) Size() (n int)

func (*DataBatch) String

func (this *DataBatch) String() string

func (*DataBatch) Unmarshal

func (m *DataBatch) Unmarshal(dAtA []byte) error

func (*DataBatch) XXX_DiscardUnknown

func (m *DataBatch) XXX_DiscardUnknown()

func (*DataBatch) XXX_Marshal

func (m *DataBatch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*DataBatch) XXX_Merge

func (m *DataBatch) XXX_Merge(src proto.Message)

func (*DataBatch) XXX_Size

func (m *DataBatch) XXX_Size() int

func (*DataBatch) XXX_Unmarshal

func (m *DataBatch) XXX_Unmarshal(b []byte) error

type DataGroup

type DataGroup struct {
	FirstSeen   time.Time
	ExpiredTime int64
	// contains filtered or unexported fields
}

DataGroup is an entry in the time wheel; it holds only the raw binary slices of points, to avoid retaining the expanded object graph for a long time.

func (*DataGroup) Reset

func (dg *DataGroup) Reset()

Reset is the cleanup function.

type DataPacket

type DataPacket struct {
	GroupIdHash            uint64 `protobuf:"varint,1,opt,name=group_id_hash,json=groupIdHash,proto3" json:"group_id_hash,omitempty"`
	RawGroupId             string `protobuf:"bytes,2,opt,name=raw_group_id,json=rawGroupId,proto3" json:"raw_group_id,omitempty"`
	Token                  string `protobuf:"bytes,3,opt,name=token,proto3" json:"token,omitempty"`
	Source                 string `protobuf:"bytes,4,opt,name=source,proto3" json:"source,omitempty"`
	DataType               string `protobuf:"bytes,5,opt,name=data_type,json=dataType,proto3" json:"data_type,omitempty"`
	ConfigVersion          int64  `protobuf:"varint,6,opt,name=config_version,json=configVersion,proto3" json:"config_version,omitempty"`
	HasError               bool   `protobuf:"varint,7,opt,name=has_error,json=hasError,proto3" json:"has_error,omitempty"`
	GroupKey               string `protobuf:"bytes,8,opt,name=group_key,json=groupKey,proto3" json:"group_key,omitempty"`
	PointCount             int32  `protobuf:"varint,9,opt,name=point_count,json=pointCount,proto3" json:"point_count,omitempty"`
	TraceStartTimeUnixNano int64  `` /* 135-byte string literal not displayed */
	TraceEndTimeUnixNano   int64  `` /* 129-byte string literal not displayed */
	PointsPayload          []byte `protobuf:"bytes,12,opt,name=points_payload,json=pointsPayload,proto3" json:"points_payload,omitempty"`
	MaxPointTimeUnixNano   int64  `` /* 129-byte string literal not displayed */
}

DataPacket is a data packet: it uses a contiguous PBPoints payload to avoid repeated decoding/encoding along the tail-sampling path.

func (*DataPacket) Descriptor

func (*DataPacket) Descriptor() ([]byte, []int)

func (*DataPacket) Equal

func (this *DataPacket) Equal(that interface{}) bool

func (*DataPacket) GetConfigVersion

func (m *DataPacket) GetConfigVersion() int64

func (*DataPacket) GetDataType

func (m *DataPacket) GetDataType() string

func (*DataPacket) GetGroupIdHash

func (m *DataPacket) GetGroupIdHash() uint64

func (*DataPacket) GetGroupKey

func (m *DataPacket) GetGroupKey() string

func (*DataPacket) GetHasError

func (m *DataPacket) GetHasError() bool

func (*DataPacket) GetMaxPointTimeUnixNano

func (m *DataPacket) GetMaxPointTimeUnixNano() int64

func (*DataPacket) GetPointCount

func (m *DataPacket) GetPointCount() int32

func (*DataPacket) GetPointsPayload

func (m *DataPacket) GetPointsPayload() []byte

func (*DataPacket) GetRawGroupId

func (m *DataPacket) GetRawGroupId() string

func (*DataPacket) GetSource

func (m *DataPacket) GetSource() string

func (*DataPacket) GetToken

func (m *DataPacket) GetToken() string

func (*DataPacket) GetTraceEndTimeUnixNano

func (m *DataPacket) GetTraceEndTimeUnixNano() int64

func (*DataPacket) GetTraceStartTimeUnixNano

func (m *DataPacket) GetTraceStartTimeUnixNano() int64

func (*DataPacket) GoString

func (this *DataPacket) GoString() string

func (*DataPacket) Marshal

func (m *DataPacket) Marshal() (dAtA []byte, err error)

func (*DataPacket) MarshalTo

func (m *DataPacket) MarshalTo(dAtA []byte) (int, error)

func (*DataPacket) MarshalToSizedBuffer

func (m *DataPacket) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*DataPacket) ProtoMessage

func (*DataPacket) ProtoMessage()

func (*DataPacket) Reset

func (m *DataPacket) Reset()

func (*DataPacket) Size

func (m *DataPacket) Size() (n int)

func (*DataPacket) String

func (this *DataPacket) String() string

func (*DataPacket) Unmarshal

func (m *DataPacket) Unmarshal(dAtA []byte) error

func (*DataPacket) WalkRawPBPoints

func (packet *DataPacket) WalkRawPBPoints(fn func([]byte) bool) error

func (*DataPacket) XXX_DiscardUnknown

func (m *DataPacket) XXX_DiscardUnknown()

func (*DataPacket) XXX_Marshal

func (m *DataPacket) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*DataPacket) XXX_Merge

func (m *DataPacket) XXX_Merge(src proto.Message)

func (*DataPacket) XXX_Size

func (m *DataPacket) XXX_Size() int

func (*DataPacket) XXX_Unmarshal

func (m *DataPacket) XXX_Unmarshal(b []byte) error

type DerivedMetric

type DerivedMetric struct {
	Name      string     `toml:"name" json:"name"`
	Type      AlgoMethod `toml:"type" json:"type"`
	Condition string     `toml:"condition" json:"condition"`
	Groupby   []string   `toml:"group_by" json:"group_by"`
}

type DerivedMetricCollector

type DerivedMetricCollector struct {
	// contains filtered or unexported fields
}

func NewDerivedMetricCollector

func NewDerivedMetricCollector(window time.Duration) *DerivedMetricCollector

func (*DerivedMetricCollector) Add

func (c *DerivedMetricCollector) Add(records []DerivedMetricRecord)

func (*DerivedMetricCollector) Flush

func (*DerivedMetricCollector) String

func (c *DerivedMetricCollector) String() string

type DerivedMetricDecision

type DerivedMetricDecision string
const (
	DerivedMetricDecisionUnknown DerivedMetricDecision = ""
	DerivedMetricDecisionKept    DerivedMetricDecision = "kept"
	DerivedMetricDecisionDropped DerivedMetricDecision = "dropped"
)

type DerivedMetricKind

type DerivedMetricKind string
const (
	DerivedMetricKindSum       DerivedMetricKind = "sum"
	DerivedMetricKindHistogram DerivedMetricKind = "histogram"
)

type DerivedMetricPoints

type DerivedMetricPoints struct {
	Token string
	PTS   []*point.Point
}

type DerivedMetricRecord

type DerivedMetricRecord struct {
	Token       string
	DataType    string
	MetricName  string
	Kind        DerivedMetricKind
	Stage       DerivedMetricStage
	Decision    DerivedMetricDecision
	Measurement string
	Tags        map[string]string
	Value       float64
	Buckets     []float64
	Time        time.Time
}

DerivedMetricRecord is the lightweight event produced by tail-sampling hooks.

type DerivedMetricStage

type DerivedMetricStage string
const (
	DerivedMetricStageIngest      DerivedMetricStage = "ingest"
	DerivedMetricStagePreDecision DerivedMetricStage = "pre_decision"
	DerivedMetricStageDecision    DerivedMetricStage = "decision"
)

type ExpoHistogramOptions

type ExpoHistogramOptions struct {
	MaxScale     int32 `protobuf:"varint,1,opt,name=max_scale,json=maxScale,proto3" json:"max_scale,omitempty"`
	MaxBuckets   int32 `protobuf:"varint,2,opt,name=max_buckets,json=maxBuckets,proto3" json:"max_buckets,omitempty"`
	RecordMinMax bool  `protobuf:"varint,3,opt,name=record_min_max,json=recordMinMax,proto3" json:"record_min_max,omitempty"`
}

func (*ExpoHistogramOptions) Descriptor

func (*ExpoHistogramOptions) Descriptor() ([]byte, []int)

func (*ExpoHistogramOptions) Equal

func (this *ExpoHistogramOptions) Equal(that interface{}) bool

func (*ExpoHistogramOptions) GetMaxBuckets

func (m *ExpoHistogramOptions) GetMaxBuckets() int32

func (*ExpoHistogramOptions) GetMaxScale

func (m *ExpoHistogramOptions) GetMaxScale() int32

func (*ExpoHistogramOptions) GetRecordMinMax

func (m *ExpoHistogramOptions) GetRecordMinMax() bool

func (*ExpoHistogramOptions) GoString

func (this *ExpoHistogramOptions) GoString() string

func (*ExpoHistogramOptions) Marshal

func (m *ExpoHistogramOptions) Marshal() (dAtA []byte, err error)

func (*ExpoHistogramOptions) MarshalTo

func (m *ExpoHistogramOptions) MarshalTo(dAtA []byte) (int, error)

func (*ExpoHistogramOptions) MarshalToSizedBuffer

func (m *ExpoHistogramOptions) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ExpoHistogramOptions) ProtoMessage

func (*ExpoHistogramOptions) ProtoMessage()

func (*ExpoHistogramOptions) Reset

func (m *ExpoHistogramOptions) Reset()

func (*ExpoHistogramOptions) Size

func (m *ExpoHistogramOptions) Size() (n int)

func (*ExpoHistogramOptions) String

func (this *ExpoHistogramOptions) String() string

func (*ExpoHistogramOptions) Unmarshal

func (m *ExpoHistogramOptions) Unmarshal(dAtA []byte) error

func (*ExpoHistogramOptions) XXX_DiscardUnknown

func (m *ExpoHistogramOptions) XXX_DiscardUnknown()

func (*ExpoHistogramOptions) XXX_Marshal

func (m *ExpoHistogramOptions) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ExpoHistogramOptions) XXX_Merge

func (m *ExpoHistogramOptions) XXX_Merge(src proto.Message)

func (*ExpoHistogramOptions) XXX_Size

func (m *ExpoHistogramOptions) XXX_Size() int

func (*ExpoHistogramOptions) XXX_Unmarshal

func (m *ExpoHistogramOptions) XXX_Unmarshal(b []byte) error

type GlobalSampler

type GlobalSampler struct {
	// contains filtered or unexported fields
}

GlobalSampler is the global manager.

func NewGlobalSampler

func NewGlobalSampler(shardCount int, waitTime time.Duration) *GlobalSampler

func (*GlobalSampler) AdvanceTime

func (s *GlobalSampler) AdvanceTime() map[uint64]*DataGroup

AdvanceTime advances the time wheel and returns the data that has expired in the current slot.

func (*GlobalSampler) GetLoggingConfig

func (s *GlobalSampler) GetLoggingConfig(token string) *LoggingTailSampling

func (*GlobalSampler) GetRUMConfig

func (s *GlobalSampler) GetRUMConfig(token string) *RUMTailSampling

func (*GlobalSampler) GetTraceConfig

func (s *GlobalSampler) GetTraceConfig(token string) *TraceTailSampling

func (*GlobalSampler) Ingest

func (s *GlobalSampler) Ingest(packet *DataPacket)

func (*GlobalSampler) TailSamplingData

func (s *GlobalSampler) TailSamplingData(dataGroups map[uint64]*DataGroup) map[uint64]*DataPacket

func (*GlobalSampler) TailSamplingOutcomes

func (s *GlobalSampler) TailSamplingOutcomes(dataGroups map[uint64]*DataGroup) map[uint64]*TailSamplingOutcome

func (*GlobalSampler) UpdateConfig

func (s *GlobalSampler) UpdateConfig(token string, ts *TailSamplingConfigs) error

type HistogramOptions

type HistogramOptions struct {
	// for explicit histogram (e.g. 10, 50, 100)
	Buckets []float64 `protobuf:"fixed64,1,rep,packed,name=buckets,proto3" json:"buckets,omitempty"`
}

func (*HistogramOptions) Descriptor

func (*HistogramOptions) Descriptor() ([]byte, []int)

func (*HistogramOptions) Equal

func (this *HistogramOptions) Equal(that interface{}) bool

func (*HistogramOptions) GetBuckets

func (m *HistogramOptions) GetBuckets() []float64

func (*HistogramOptions) GoString

func (this *HistogramOptions) GoString() string

func (*HistogramOptions) Marshal

func (m *HistogramOptions) Marshal() (dAtA []byte, err error)

func (*HistogramOptions) MarshalTo

func (m *HistogramOptions) MarshalTo(dAtA []byte) (int, error)

func (*HistogramOptions) MarshalToSizedBuffer

func (m *HistogramOptions) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*HistogramOptions) ProtoMessage

func (*HistogramOptions) ProtoMessage()

func (*HistogramOptions) Reset

func (m *HistogramOptions) Reset()

func (*HistogramOptions) Size

func (m *HistogramOptions) Size() (n int)

func (*HistogramOptions) String

func (this *HistogramOptions) String() string

func (*HistogramOptions) Unmarshal

func (m *HistogramOptions) Unmarshal(dAtA []byte) error

func (*HistogramOptions) XXX_DiscardUnknown

func (m *HistogramOptions) XXX_DiscardUnknown()

func (*HistogramOptions) XXX_Marshal

func (m *HistogramOptions) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*HistogramOptions) XXX_Merge

func (m *HistogramOptions) XXX_Merge(src proto.Message)

func (*HistogramOptions) XXX_Size

func (m *HistogramOptions) XXX_Size() int

func (*HistogramOptions) XXX_Unmarshal

func (m *HistogramOptions) XXX_Unmarshal(b []byte) error

type LoggingGroupDimension

type LoggingGroupDimension struct {
	// Grouping key (e.g. user_id, order_id, session_id)
	GroupKey string `toml:"group_key" json:"group_key"`

	// Sampling pipelines under this group dimension
	Pipelines []*SamplingPipeline `toml:"pipelines" json:"pipelines"`

	// Derived metrics specific to this group
	DerivedMetrics []*DerivedMetric `toml:"derived_metrics" json:"derived_metrics"`
}

func (*LoggingGroupDimension) PickLogging

func (logGroup *LoggingGroupDimension) PickLogging(source string, pts []*point.Point) (map[uint64]*DataPacket, []*point.Point)

type LoggingTailSampling

type LoggingTailSampling struct {
	DataTTL time.Duration `toml:"data_ttl" json:"data_ttl"`
	Version int64         `toml:"version" json:"version"`
	// Built-in metric configuration; all enabled by default
	BuiltinMetrics []*BuiltinMetricCfg `toml:"builtin_metrics" json:"builtin_metrics"`

	// Configured per group dimension (no longer global pipelines)
	GroupDimensions []*LoggingGroupDimension `toml:"group_dimensions" json:"group_dimensions"`
}

type MetricBase

type MetricBase struct {
	// contains filtered or unexported fields
}

func (*MetricBase) String

func (mb *MetricBase) String() string

type PipelineAction

type PipelineAction string

type PipelineType

type PipelineType string

type PointsData

type PointsData struct {
	PTS   []*point.Point
	Token string
}

func WindowsToData

func WindowsToData(ws []*Window) []*PointsData

type QuantileOptions

type QuantileOptions struct {
	// The target percentiles (e.g. 0.50, 0.99)
	Percentiles []float64 `protobuf:"fixed64,1,rep,packed,name=percentiles,proto3" json:"percentiles,omitempty"`
}

func (*QuantileOptions) Descriptor

func (*QuantileOptions) Descriptor() ([]byte, []int)

func (*QuantileOptions) Equal

func (this *QuantileOptions) Equal(that interface{}) bool

func (*QuantileOptions) GetPercentiles

func (m *QuantileOptions) GetPercentiles() []float64

func (*QuantileOptions) GoString

func (this *QuantileOptions) GoString() string

func (*QuantileOptions) Marshal

func (m *QuantileOptions) Marshal() (dAtA []byte, err error)

func (*QuantileOptions) MarshalTo

func (m *QuantileOptions) MarshalTo(dAtA []byte) (int, error)

func (*QuantileOptions) MarshalToSizedBuffer

func (m *QuantileOptions) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*QuantileOptions) ProtoMessage

func (*QuantileOptions) ProtoMessage()

func (*QuantileOptions) Reset

func (m *QuantileOptions) Reset()

func (*QuantileOptions) Size

func (m *QuantileOptions) Size() (n int)

func (*QuantileOptions) String

func (this *QuantileOptions) String() string

func (*QuantileOptions) Unmarshal

func (m *QuantileOptions) Unmarshal(dAtA []byte) error

func (*QuantileOptions) XXX_DiscardUnknown

func (m *QuantileOptions) XXX_DiscardUnknown()

func (*QuantileOptions) XXX_Marshal

func (m *QuantileOptions) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*QuantileOptions) XXX_Merge

func (m *QuantileOptions) XXX_Merge(src proto.Message)

func (*QuantileOptions) XXX_Size

func (m *QuantileOptions) XXX_Size() int

func (*QuantileOptions) XXX_Unmarshal

func (m *QuantileOptions) XXX_Unmarshal(b []byte) error

type RUMGroupDimension

// RUMGroupDimension bundles a group key with the sampling pipelines and
// derived metrics configured for that key.
type RUMGroupDimension struct {
	GroupKey       string              `toml:"group_key" json:"group_key"` // session_id, user_id, page_id
	Pipelines      []*SamplingPipeline `toml:"pipelines" json:"pipelines"`
	DerivedMetrics []*DerivedMetric    `toml:"derived_metrics" json:"derived_metrics"`
}

func (*RUMGroupDimension) PickRUM

func (rumGroup *RUMGroupDimension) PickRUM(source string, pts []*point.Point) (map[uint64]*DataPacket, []*point.Point)

type RUMTailSampling

type RUMTailSampling struct {
	DataTTL time.Duration `toml:"data_ttl" json:"data_ttl"`
	Version int64         `toml:"version" json:"version"`
	// BuiltinMetrics configures the built-in metrics; all are enabled by default.
	BuiltinMetrics []*BuiltinMetricCfg `toml:"builtin_metrics" json:"builtin_metrics"`

	// GroupDimensions: RUM may also have multiple group dimensions.
	GroupDimensions []*RUMGroupDimension `toml:"group_dimensions" json:"group_dimensions"`
}

RUMTailSampling holds the RUM tail-sampling configuration.

type RuleSelector

type RuleSelector struct {
	Category     string   `toml:"category" json:"category"`
	Measurements []string `toml:"measurements" json:"measurements"`
	MetricName   []string `toml:"metric_name" json:"metric_name"`
	Condition    string   `toml:"condition" json:"condition"`
	// contains filtered or unexported fields
}

RuleSelector selects measurements and fields among points.

func (*RuleSelector) Setup

func (rs *RuleSelector) Setup() error

Setup initializes the rule selector with validation and prepares whitelists.

type SamplingPipeline

// SamplingPipeline is the configuration of one sampling pipeline: a name, a
// type, and the condition, action, rate and hash keys that go with it.
// NOTE(review): which of Condition/Action/Rate/HashKeys apply presumably depends
// on Type ("condition" vs "probabilistic" per the PipelineType* constants) —
// confirm against Apply and DoAction.
type SamplingPipeline struct {
	Name      string         `toml:"name" json:"name"`
	Type      PipelineType   `toml:"type" json:"type"`
	Condition string         `toml:"condition,omitempty" json:"condition,omitempty"`
	Action    PipelineAction `toml:"action,omitempty" json:"action,omitempty"`
	Rate      float64        `toml:"rate,omitempty" json:"rate,omitempty"`
	HashKeys  []string       `toml:"hash_keys" json:"hash_keys"`
	// contains filtered or unexported fields
}

func (*SamplingPipeline) Apply

func (sp *SamplingPipeline) Apply() error

func (*SamplingPipeline) DoAction

func (sp *SamplingPipeline) DoAction(td *DataPacket) (bool, *DataPacket)

type Shard

type Shard struct {
	// contains filtered or unexported fields
}

Shard defines a segmented bucket.

type TailSamplingBuiltinMetric

// TailSamplingBuiltinMetric is a named metric with three hooks, each of which
// receives a data packet and returns derived-metric records.
// NOTE(review): the hook names suggest they run at ingest, before the sampling
// decision, and once the decision is known — confirm against TailSamplingProcessor.
type TailSamplingBuiltinMetric interface {
	// Name returns a string naming the metric.
	Name() string
	OnIngest(packet *DataPacket) []DerivedMetricRecord
	OnPreDecision(packet *DataPacket) []DerivedMetricRecord
	// OnDecision additionally receives the decision for the packet.
	OnDecision(packet *DataPacket, decision DerivedMetricDecision) []DerivedMetricRecord
}

type TailSamplingBuiltinMetrics

// TailSamplingBuiltinMetrics is a slice of TailSamplingBuiltinMetric that
// itself has OnIngest, OnPreDecision and OnDecision methods.
type TailSamplingBuiltinMetrics []TailSamplingBuiltinMetric

func DefaultTailSamplingBuiltinMetrics

func DefaultTailSamplingBuiltinMetrics() TailSamplingBuiltinMetrics

func (TailSamplingBuiltinMetrics) OnDecision

func (TailSamplingBuiltinMetrics) OnIngest

func (TailSamplingBuiltinMetrics) OnPreDecision

func (ms TailSamplingBuiltinMetrics) OnPreDecision(packet *DataPacket) []DerivedMetricRecord

type TailSamplingConfigs

// TailSamplingConfigs groups the trace, logging and RUM tail-sampling
// configurations together with a version number.
type TailSamplingConfigs struct {
	Tracing *TraceTailSampling   `toml:"trace" json:"trace"`
	Logging *LoggingTailSampling `toml:"logging" json:"logging"`
	RUM     *RUMTailSampling     `toml:"rum" json:"rum"`
	Version int64                `toml:"version" json:"version"`
}

func (*TailSamplingConfigs) Init

func (t *TailSamplingConfigs) Init() error

func (*TailSamplingConfigs) ToString

func (t *TailSamplingConfigs) ToString() string

type TailSamplingOutcome

// TailSamplingOutcome holds two data packets and a decision.
// NOTE(review): presumably SourcePacket is the packet before sampling and Packet
// the result after the decision was applied — confirm against the producer.
type TailSamplingOutcome struct {
	Packet       *DataPacket
	SourcePacket *DataPacket
	Decision     DerivedMetricDecision
}

type TailSamplingProcessor

type TailSamplingProcessor struct {
	// contains filtered or unexported fields
}

func NewDefaultTailSamplingProcessor

func NewDefaultTailSamplingProcessor(shardCount int, waitTime time.Duration) *TailSamplingProcessor

func NewTailSamplingProcessor

func NewTailSamplingProcessor(
	sampler *GlobalSampler,
	collector *DerivedMetricCollector,
	metrics TailSamplingBuiltinMetrics,
) *TailSamplingProcessor

func (*TailSamplingProcessor) AdvanceTime

func (r *TailSamplingProcessor) AdvanceTime() map[uint64]*DataGroup

func (*TailSamplingProcessor) BuiltinMetrics

func (*TailSamplingProcessor) Collector

func (*TailSamplingProcessor) FlushDerivedMetrics

func (r *TailSamplingProcessor) FlushDerivedMetrics(now time.Time) []*DerivedMetricPoints

func (*TailSamplingProcessor) IngestPacket

func (r *TailSamplingProcessor) IngestPacket(packet *DataPacket)

func (*TailSamplingProcessor) RecordDecision

func (r *TailSamplingProcessor) RecordDecision(packet *DataPacket, decision DerivedMetricDecision)

func (*TailSamplingProcessor) Sampler

func (r *TailSamplingProcessor) Sampler() *GlobalSampler

func (*TailSamplingProcessor) TailSamplingData

func (r *TailSamplingProcessor) TailSamplingData(dataGroups map[uint64]*DataGroup) map[uint64]*DataPacket

func (*TailSamplingProcessor) UpdateConfig

func (r *TailSamplingProcessor) UpdateConfig(token string, cfg *TailSamplingConfigs) error

type TraceTailSampling

// TraceTailSampling is the trace tail-sampling configuration.
// NOTE(review): Pipelines is tagged toml:"sampling_pipeline" but json:"pipelines",
// while LoggingGroupDimension and RUMGroupDimension use "pipelines" for both —
// confirm the differing TOML key is intentional.
type TraceTailSampling struct {
	DataTTL        time.Duration       `toml:"data_ttl" json:"data_ttl"`
	DerivedMetrics []*DerivedMetric    `toml:"derived_metrics" json:"derived_metrics"`
	BuiltinMetrics []*BuiltinMetricCfg `toml:"builtin_metrics" json:"builtin_metrics"`
	Pipelines      []*SamplingPipeline `toml:"sampling_pipeline" json:"pipelines"`
	Version        int64               `toml:"version" json:"version"`

	GroupKey string `toml:"group_key" json:"group_key"` // for traces this is fixed to "trace_id"
}

type Window

type Window struct {
	Token string // unique identifier of the user
	// contains filtered or unexported fields
}

func (*Window) AddCal

func (w *Window) AddCal(cal Calculator)

func (*Window) Reset

func (w *Window) Reset()

Reset should be called before the object is put back into the pool.

type Windows

// Windows holds a list of Window values plus a token-to-index lookup table.
type Windows struct {

	// An ID table is kept so that the window holding a user's data can be located quickly.
	// token -> Window ID
	IDs map[string]int
	// WindowID -> Window
	WS []*Window
	// contains filtered or unexported fields
}

func (*Windows) AddCal

func (ws *Windows) AddCal(token string, cal Calculator)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL