Documentation
¶
Index ¶
- Constants
- Variables
- func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse
- func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse
- func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam
- func GetCounter(metric, tag string, tagMap map[string]string) (counter string, err error)
- func GetJudges() []string
- func GetSeries(start, end int64, req []SeriesReq) ([]dataobj.QueryData, error)
- func Init(cfg BackendSection)
- func Push2InfluxdbSendQueue(items []*dataobj.MetricValue)
- func Push2JudgeSendQueue(items []*dataobj.MetricValue)
- func Push2OpenTsdbSendQueue(items []*dataobj.MetricValue)
- func Push2TsdbSendQueue(items []*dataobj.MetricValue)
- func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error)
- func RebuildConsistentHashRing(hashRing *ConsistentHashRing, nodes []string, replicas int)
- func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int)
- func Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int)
- func TagMatch(straTags []model.Tag, tag map[string]string) bool
- type BackendSection
- type ClusterNode
- type ConsistentHashRing
- type InfluxClient
- type InfluxdbSection
- type OpenTsdbSection
- type Pool
- type Series
- type SeriesReq
- type SeriesResp
- type Tagkv
Constants ¶
View Source
const (
DefaultSendTaskSleepInterval = time.Millisecond * 50 //默认睡眠间隔为50ms
MaxSendRetry = 10
)
send
View Source
const DefaultSendQueueMaxSize = 102400 //10.24w
Variables ¶
View Source
var (
Config BackendSection
// 服务节点的一致性哈希环 pk -> node
TsdbNodeRing *ConsistentHashRing
// 发送缓存队列 node -> queue_of_data
TsdbQueues = make(map[string]*list.SafeListLimited)
JudgeQueues = cache.SafeJudgeQueue{}
InfluxdbQueue *list.SafeListLimited
OpenTsdbQueue *list.SafeListLimited
// 连接池 node_address -> connection_pool
TsdbConnPools *pools.ConnPools
JudgeConnPools *pools.ConnPools
OpenTsdbConnPoolHelper *pools.OpenTsdbConnPoolHelper
)
View Source
var (
MinStep int //最小上报周期,单位sec
)
Functions ¶
func FetchDataForUI ¶
func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse
func GenQParam ¶
func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam
func GetCounter ¶
func GetCounter(metric, tag string, tagMap map[string]string) (counter string, err error)
func Init ¶
func Init(cfg BackendSection)
func Push2InfluxdbSendQueue ¶ added in v1.4.0
func Push2InfluxdbSendQueue(items []*dataobj.MetricValue)
将原始数据插入到influxdb缓存队列
func Push2JudgeSendQueue ¶
func Push2JudgeSendQueue(items []*dataobj.MetricValue)
func Push2OpenTsdbSendQueue ¶ added in v1.4.0
func Push2OpenTsdbSendQueue(items []*dataobj.MetricValue)
将原始数据插入到opentsdb发送缓存队列
func Push2TsdbSendQueue ¶
func Push2TsdbSendQueue(items []*dataobj.MetricValue)
Push2TsdbSendQueue pushes data to a TSDB instance which depends on the consistent ring.
func QueryOne ¶
func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error)
func RebuildConsistentHashRing ¶
func RebuildConsistentHashRing(hashRing *ConsistentHashRing, nodes []string, replicas int)
func Send2JudgeTask ¶
func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int)
func Send2TsdbTask ¶
func Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int)
Types ¶
type BackendSection ¶
type BackendSection struct {
Enabled bool `yaml:"enabled"`
Batch int `yaml:"batch"`
ConnTimeout int `yaml:"connTimeout"`
CallTimeout int `yaml:"callTimeout"`
WorkerNum int `yaml:"workerNum"`
MaxConns int `yaml:"maxConns"`
MaxIdle int `yaml:"maxIdle"`
IndexTimeout int `yaml:"indexTimeout"`
StraPath string `yaml:"straPath"`
HbsMod string `yaml:"hbsMod"`
Replicas int `yaml:"replicas"`
Cluster map[string]string `yaml:"cluster"`
ClusterList map[string]*ClusterNode `json:"clusterList"`
Influxdb InfluxdbSection `yaml:"influxdb"`
OpenTsdb OpenTsdbSection `yaml:"opentsdb"`
}
type ClusterNode ¶
type ClusterNode struct {
Addrs []string `json:"addrs"`
}
type ConsistentHashRing ¶
func NewConsistentHashRing ¶
func NewConsistentHashRing(replicas int32, nodes []string) *ConsistentHashRing
func (*ConsistentHashRing) GetNode ¶
func (c *ConsistentHashRing) GetNode(pk string) (string, error)
func (*ConsistentHashRing) GetRing ¶
func (c *ConsistentHashRing) GetRing() *consistent.Consistent
func (*ConsistentHashRing) Set ¶
func (c *ConsistentHashRing) Set(r *consistent.Consistent)
type InfluxClient ¶ added in v1.4.0
func NewInfluxdbClient ¶ added in v1.4.0
func NewInfluxdbClient() (*InfluxClient, error)
func (*InfluxClient) Send ¶ added in v1.4.0
func (c *InfluxClient) Send(items []*dataobj.InfluxdbItem) error
type InfluxdbSection ¶ added in v1.4.0
type InfluxdbSection struct {
Enabled bool `yaml:"enabled"`
Batch int `yaml:"batch"`
MaxRetry int `yaml:"maxRetry"`
WorkerNum int `yaml:"workerNum"`
Timeout int `yaml:"timeout"`
Address string `yaml:"address"`
Database string `yaml:"database"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Precision string `yaml:"precision"`
}
type OpenTsdbSection ¶ added in v1.4.0
type OpenTsdbSection struct {
Enabled bool `yaml:"enabled"`
Batch int `yaml:"batch"`
ConnTimeout int `yaml:"connTimeout"`
CallTimeout int `yaml:"callTimeout"`
WorkerNum int `yaml:"workerNum"`
MaxConns int `yaml:"maxConns"`
MaxIdle int `yaml:"maxIdle"`
MaxRetry int `yaml:"maxRetry"`
Address string `yaml:"address"`
}
type Pool ¶
func SelectPoolByPK ¶
type SeriesResp ¶
Click to show internal directories.
Click to hide internal directories.