Documentation
¶
Overview ¶
Added by vincent.zhang on 2017-09-25 to support sending data to Kafka.
Index ¶
- Constants
- Variables
- func DestroyConnPools()
- func Push2GraphSendQueue(items []*cmodel.MetaData)
- func Push2JudgeSendQueue(items []*cmodel.MetaData)
- func Push2KafkaLogSendQueue(items []*LogMetricItem)
- func Push2KafkaSendQueue(items []*cmodel.MetaData)
- func Push2TsdbSendQueue(items []*cmodel.MetaData)
- func Start()
- type KafkaItem
- type KafkaProducer
- type KafkaProducerPool
- type LogMetricItem
Constants ¶
View Source
// Periods of the package's cron loops.
//
// The original inline comments claimed defaults of 1s and 300s, which did not
// match the declared values; the comments below state the actual values.
const (
	// DefaultProcCronPeriod is the period of ProcCron: 5 seconds.
	DefaultProcCronPeriod = 5 * time.Second
	// DefaultLogCronPeriod is the period of LogCron: 3600 seconds (one hour).
	DefaultLogCronPeriod = 3600 * time.Second
)
View Source
// DefaultSendQueueMaxSize is 102400 (i.e. 10.24 x 10^4).
// NOTE(review): by its name this is the default capacity of a send queue;
// the code that consumes it is not visible here.
const DefaultSendQueueMaxSize = 102400
View Source
// DefaultSendTaskSleepInterval is the default sleep interval: 50ms.
const DefaultSendTaskSleepInterval = 50 * time.Millisecond
send
Variables ¶
View Source
var ( JudgeNodeRing *rings.ConsistentHashNodeRing GraphNodeRing *rings.ConsistentHashNodeRing )
服务节点的一致性哈希环 pk -> node
View Source
var ( TsdbQueue *nlist.SafeListLimited JudgeQueues = make(map[string]*nlist.SafeListLimited) GraphQueues = make(map[string]*nlist.SafeListLimited) //added by vincent.zhang for sending to kafka KafkaQueue *nlist.SafeListLimited KafkaLogQueue *nlist.SafeListLimited // added by qimin.xu KafkaFilterQueues = make(map[string]*nlist.SafeListLimited) )
发送缓存队列 node -> queue_of_data
View Source
var ( JudgeConnPools *backend.SafeRpcConnPools TsdbConnPoolHelper *backend.TsdbConnPoolHelper GraphConnPools *backend.SafeRpcConnPools )
连接池 node_address -> connection_pool
View Source
// MinStep is the minimum reporting period, in seconds. It is declared without
// an initializer, so it starts at zero.
var MinStep int
默认参数
Functions ¶
func DestroyConnPools ¶
func DestroyConnPools()
func Push2GraphSendQueue ¶
将数据 打入 某个Graph的发送缓存队列, 具体是哪一个Graph 由一致性哈希 决定
func Push2JudgeSendQueue ¶
将数据 打入 某个Judge的发送缓存队列, 具体是哪一个Judge 由一致性哈希 决定
func Push2KafkaLogSendQueue ¶
func Push2KafkaLogSendQueue(items []*LogMetricItem)
将原始数据入到kafka Log发送缓存队列
Types ¶
type KafkaItem ¶
// KafkaItem is one metric record in the shape used for sending to Kafka
// (added by vincent.zhang). Every field is exported and carries a lower-case
// JSON key via its struct tag.
type KafkaItem struct {
Endpoint string `json:"endpoint"`
Metric string `json:"metric"`
// Tags is a single string here, whereas LogMetricItem carries a
// map[string]string. NOTE(review): the string format is not visible in this
// view; confirm against the code that fills it.
Tags string `json:"tags"`
// NOTE(review): unit not shown here (presumably Unix seconds); confirm.
Timestamp int64 `json:"timestamp"`
Value float64 `json:"value"`
// NOTE(review): presumably the data-source type of the metric; confirm.
DsType string `json:"dstype"`
// NOTE(review): presumably the reporting period in seconds (cf. MinStep); confirm.
Step int `json:"step"`
}
Added by vincent.zhang for sending to Kafka. 转化为kafka格式 (converts to the Kafka format).
func (*KafkaItem) KafkaString ¶
type KafkaProducer ¶
// KafkaProducer is the producer type returned by NewKafkaProducer and handed
// out by KafkaProducerPool.Fetch. It exposes Name, Run, LogRun, Close and
// AsyncClose methods; all of its fields are unexported.
type KafkaProducer struct {
// contains filtered or unexported fields
}
func NewKafkaProducer ¶
func NewKafkaProducer(name string, address []string) (*KafkaProducer, error)
func (*KafkaProducer) AsyncClose ¶
func (this *KafkaProducer) AsyncClose()
func (*KafkaProducer) Close ¶
func (this *KafkaProducer) Close() error
func (*KafkaProducer) LogRun ¶
func (this *KafkaProducer) LogRun()
added by vincent.zhang for sending string log to kafka
func (*KafkaProducer) Name ¶
func (this *KafkaProducer) Name() string
func (*KafkaProducer) Run ¶
func (this *KafkaProducer) Run()
type KafkaProducerPool ¶
// KafkaProducerPool is a pool of *KafkaProducer values. It is created with
// NewKafkaProducerPool(name, address, maxProducers, maxIdle) and used through
// its Fetch, Release, ForceClose, Destroy and Proc methods.
//
// It embeds sync.RWMutex, so a KafkaProducerPool must not be copied after
// first use; always pass it by pointer.
type KafkaProducerPool struct {
sync.RWMutex
Name string
Address []string
// MaxProducers and MaxIdle mirror the maxProducers and maxIdle arguments of
// NewKafkaProducerPool. NOTE(review): by their names these look like the cap
// on producers and on idle producers; the enforcing code is not visible here.
MaxProducers int32
MaxIdle int32
// NOTE(review): the meaning of Cnt is not visible in this view; confirm
// against the pool implementation.
Cnt int64
// New has the same signature as NewKafkaProducer.
// NOTE(review): presumably the factory the pool calls to create a producer; confirm.
New func(name string, address []string) (*KafkaProducer, error)
// contains filtered or unexported fields
}
Kafka_Producer_Pool
func NewKafkaProducerPool ¶
func NewKafkaProducerPool(name string, address []string, maxProducers int32, maxIdle int32) *KafkaProducerPool
func (*KafkaProducerPool) Destroy ¶
func (this *KafkaProducerPool) Destroy()
func (*KafkaProducerPool) Fetch ¶
func (this *KafkaProducerPool) Fetch() (*KafkaProducer, error)
func (*KafkaProducerPool) ForceClose ¶
func (this *KafkaProducerPool) ForceClose(producer *KafkaProducer)
func (*KafkaProducerPool) Proc ¶
func (this *KafkaProducerPool) Proc() string
func (*KafkaProducerPool) Release ¶
func (this *KafkaProducerPool) Release(producer *KafkaProducer)
type LogMetricItem ¶
// LogMetricItem is the item type accepted by Push2KafkaLogSendQueue. It has
// the same JSON keys as KafkaItem except that it has no dstype field, Tags is
// a map rather than a string, and Value is a string rather than a float64.
type LogMetricItem struct {
Endpoint string `json:"endpoint"`
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
// NOTE(review): unit not shown here (presumably Unix seconds); confirm.
Timestamp int64 `json:"timestamp"`
// Value is a string here, unlike KafkaItem.Value (float64).
Value string `json:"value"`
// NOTE(review): presumably the reporting period in seconds (cf. MinStep); confirm.
Step int `json:"step"`
}
func (*LogMetricItem) KafkaString ¶
func (this *LogMetricItem) KafkaString() (s string)
Click to show internal directories.
Click to hide internal directories.