Documentation
¶
Index ¶
- Constants
- Variables
- func Init(cfg BackendSection) error
- func RegisterDataSource(pluginId string, datasource DataSource)
- func RegisterPushEndpoint(pluginId string, push PushEndpoint)
- type BackendSection
- type DataSource
- type KafkaData
- type KafkaPushEndpoint
- type KafkaSection
- type KfClient
- type OpenTsdbPushEndpoint
- type OpenTsdbSection
- type PushEndpoint
Constants ¶
View Source
const ( DefaultSendTaskSleepInterval = time.Millisecond * 50 //默认睡眠间隔为50ms DefaultSendQueueMaxSize = 102400 //10.24w MaxSendRetry = 10 )
send
Variables ¶
View Source
var (
MinStep int //最小上报周期,单位sec
)
View Source
var (
StraPath string
)
Functions ¶
func Init ¶
func Init(cfg BackendSection) error
func RegisterDataSource ¶
func RegisterDataSource(pluginId string, datasource DataSource)
func RegisterPushEndpoint ¶
func RegisterPushEndpoint(pluginId string, push PushEndpoint)
Types ¶
type BackendSection ¶
type BackendSection struct {
DataSource string `yaml:"datasource"`
StraPath string `yaml:"straPath"`
M3db m3db.M3dbSection `yaml:"m3db"`
Tsdb tsdb.TsdbSection `yaml:"tsdb"`
Influxdb influxdb.InfluxdbSection `yaml:"influxdb"`
OpenTsdb OpenTsdbSection `yaml:"opentsdb"`
Kafka KafkaSection `yaml:"kafka"`
}
type DataSource ¶
type DataSource interface {
PushEndpoint
// query data for judge
QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse
// query data for ui
QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse
// query metrics & tags
QueryMetrics(recv dataobj.EndpointsRecv) *dataobj.MetricResp
QueryTagPairs(recv dataobj.EndpointMetricRecv) []dataobj.IndexTagkvResp
QueryIndexByClude(recv []dataobj.CludeRecv) []dataobj.XcludeResp
QueryIndexByFullTags(recv []dataobj.IndexByFullTagsRecv) ([]dataobj.IndexByFullTagsResp, int)
// tsdb instance
GetInstance(metric, endpoint string, tags map[string]string) []string
}
func GetDataSourceFor ¶
func GetDataSourceFor(pluginId string) (DataSource, error)
get backend datasource (pluginId == "" for default datasource)
type KafkaPushEndpoint ¶
type KafkaPushEndpoint struct {
// config
Section KafkaSection
// 发送缓存队列 node -> queue_of_data
KafkaQueue chan KafkaData
}
func (*KafkaPushEndpoint) Init ¶
func (kafka *KafkaPushEndpoint) Init()
func (*KafkaPushEndpoint) Push2Queue ¶
func (kafka *KafkaPushEndpoint) Push2Queue(items []*dataobj.MetricValue)
type KafkaSection ¶
type KafkaSection struct {
Enabled bool `yaml:"enabled"`
Name string `yaml:"name"`
Topic string `yaml:"topic"`
BrokersPeers string `yaml:"brokersPeers"`
ConnTimeout int `yaml:"connTimeout"`
CallTimeout int `yaml:"callTimeout"`
MaxRetry int `yaml:"maxRetry"`
KeepAlive int64 `yaml:"keepAlive"`
SaslUser string `yaml:"saslUser"`
SaslPasswd string `yaml:"saslPasswd"`
}
type KfClient ¶
type KfClient struct {
Topic string
BrokersPeers []string
// contains filtered or unexported fields
}
func NewKfClient ¶
func NewKfClient(c KafkaSection) (kafkaSender *KfClient, err error)
type OpenTsdbPushEndpoint ¶
type OpenTsdbPushEndpoint struct {
// config
Section OpenTsdbSection
OpenTsdbConnPoolHelper *pools.OpenTsdbConnPoolHelper
// 发送缓存队列 node -> queue_of_data
OpenTsdbQueue *list.SafeListLimited
}
func (*OpenTsdbPushEndpoint) Init ¶
func (opentsdb *OpenTsdbPushEndpoint) Init()
func (*OpenTsdbPushEndpoint) Push2Queue ¶
func (opentsdb *OpenTsdbPushEndpoint) Push2Queue(items []*dataobj.MetricValue)
将原始数据入到tsdb发送缓存队列
type OpenTsdbSection ¶
type OpenTsdbSection struct {
Enabled bool `yaml:"enabled"`
Name string `yaml:"name"`
Batch int `yaml:"batch"`
ConnTimeout int `yaml:"connTimeout"`
CallTimeout int `yaml:"callTimeout"`
WorkerNum int `yaml:"workerNum"`
MaxConns int `yaml:"maxConns"`
MaxIdle int `yaml:"maxIdle"`
MaxRetry int `yaml:"maxRetry"`
Address string `yaml:"address"`
}
type PushEndpoint ¶
type PushEndpoint interface {
// push data
Push2Queue(items []*dataobj.MetricValue)
}
Click to show internal directories.
Click to hide internal directories.