Documentation
¶
Index ¶
- type AccumulationInfo
- type GroupConfig
- type GroupInfo
- type Metadata
- func (m *Metadata) Accumulation(queue, group string) (int64, int64, error)
- func (m *Metadata) AddGroup(group string, queue string, write bool, read bool, url string, ips []string) error
- func (m *Metadata) AddQueue(queue string, idcs []string) error
- func (m *Metadata) Close()
- func (m *Metadata) DelQueue(queue string) error
- func (m *Metadata) DeleteGroup(group string, queue string) error
- func (m *Metadata) ExistGroup(queue, group string) bool
- func (m *Metadata) ExistQueue(queue string) bool
- func (m *Metadata) GetBrokerAddrsByIdc(idcs ...string) map[string][]string
- func (m *Metadata) GetGroupConfig(group string, queue string) (*GroupConfig, error)
- func (m *Metadata) GetGroupMap() map[string][]string
- func (m *Metadata) GetProxyConfigByID(id int) (string, error)
- func (m *Metadata) GetQueueConfig(queue string) *QueueConfig
- func (m *Metadata) GetQueueInfo(queues ...string) ([]*QueueInfo, error)
- func (m *Metadata) GetQueueMap() map[string][]string
- func (m *Metadata) GetQueues() (queues []string)
- func (m *Metadata) LoadMetrics() ([]byte, error)
- func (m *Metadata) LocalManager() *kafka.Manager
- func (m *Metadata) Proxys() (map[string]string, error)
- func (m *Metadata) RefreshMetadata() error
- func (m *Metadata) RegisterService(id int, data string) error
- func (m *Metadata) ResetOffset(queue string, group string, time int64) error
- func (m *Metadata) SaveMetrics(data string) error
- func (m *Metadata) UpdateGroupConfig(group string, queue string, write bool, read bool, url string, ips []string) error
- type Queue
- type QueueConfig
- type QueueInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AccumulationInfo ¶
type GroupConfig ¶
type GroupConfig struct {
Group string `json:"group,omitempty"`
Queue string `json:"queue,omitempty"`
Write bool `json:"write"`
Read bool `json:"read"`
Url string `json:"url"`
Ips []string `json:"ips"`
}
func (*GroupConfig) Load ¶
func (c *GroupConfig) Load(data []byte) error
func (*GroupConfig) String ¶
func (c *GroupConfig) String() string
type GroupInfo ¶
type GroupInfo struct {
Group string `json:"group"`
Queues []*GroupConfig `json:"queues,omitempty"`
}
type Metadata ¶
type Metadata struct {
// contains filtered or unexported fields
}
func NewMetadata ¶
return a new metadata instance
func (*Metadata) Accumulation ¶
func (*Metadata) AddGroup ¶
func (m *Metadata) AddGroup(group string, queue string, write bool, read bool, url string, ips []string) error
add a group to given queue
func (*Metadata) DeleteGroup ¶
delete given group
func (*Metadata) ExistGroup ¶
Test a group exist
func (*Metadata) GetBrokerAddrsByIdc ¶
func (*Metadata) GetGroupConfig ¶
func (m *Metadata) GetGroupConfig(group string, queue string) (*GroupConfig, error)
func (*Metadata) GetGroupMap ¶
Get queue names of per group
func (*Metadata) GetProxyConfigByID ¶
Get a proxy's config
func (*Metadata) GetQueueConfig ¶
func (m *Metadata) GetQueueConfig(queue string) *QueueConfig
没有深拷贝,目前貌似不需要
func (*Metadata) GetQueueInfo ¶
TODO 回头修改HTTP API时同时修改返回的数据结构,能够最大化简化逻辑
func (*Metadata) GetQueueMap ¶
Get group names of per queue
func (*Metadata) LoadMetrics ¶
func (*Metadata) LocalManager ¶
return local IDC kafka manager
func (*Metadata) RefreshMetadata ¶
refresh metadata from zookeeper
func (*Metadata) RegisterService ¶
register service to zookeeper
func (*Metadata) ResetOffset ¶
reset given queue-group's offset by time
func (*Metadata) SaveMetrics ¶
type Queue ¶
type Queue interface {
Create(queue string, idcs []string) error
Update(queue string) error
Delete(queue string) error
Lookup(queue string, group string) ([]*QueueInfo, error)
AddGroup(group string, queue string, write bool, read bool, url string, ips []string) error
UpdateGroup(group string, queue string, write bool, read bool, url string, ips []string) error
DeleteGroup(group string, queue string) error
LookupGroup(group string) ([]*GroupInfo, error)
GetSingleGroup(group string, queue string) (*GroupConfig, error)
SendMessage(queue string, group string, data []byte, flag uint64) (id string, err error)
RecvMessage(queue string, group string) (id string, data []byte, flag uint64, err error)
AckMessage(queue string, group string, id string) error
AccumulationStatus() ([]AccumulationInfo, error)
Proxys() (map[string]string, error)
GetProxyConfigByID(id int) (string, error)
UpTime() int64
Version() string
Close()
}
type QueueConfig ¶
type QueueConfig struct {
Queue string `json:"queue"`
Ctime int64 `json:"ctime"`
Length int64 `json:"length"`
Groups map[string]GroupConfig `json:"groups,omitempty"`
Idcs []string `json:"idcs,omitempty"`
}
func (*QueueConfig) Parse ¶
func (q *QueueConfig) Parse(data []byte) error
func (*QueueConfig) String ¶
func (q *QueueConfig) String() string
Click to show internal directories.
Click to hide internal directories.