Documentation
¶
Index ¶
- Constants
- Variables
- func GetVersionDesc(val int64) string
- type Admin
- type AdminConsumerRunningInfo
- type AdminOption
- type ClientVersion
- type ClusterInfo
- type Comparable
- type Connection
- type ConnectionInfo
- type ConsumeFromWhere
- type ConsumeMessageDirectlyResult
- type ConsumeStats
- type ConsumeType
- type ConsumerConfigInfo
- type ConsumerConnection
- type ConsumerConnectionInfo
- type ConsumerGroupRollBackStat
- type DataVersion
- type DeleteSubGroupRequest
- type GetMaxOffsetResponseHeader
- type GroupConsumeInfo
- type GroupList
- type KVTable
- type MessageModel
- type MessageTrack
- type MessageView
- type MqAdmin
- func (a *MqAdmin) BrokerConfig(addr string) (*map[string]string, error)
- func (a *MqAdmin) Close() error
- func (a *MqAdmin) ConsumeMessageDirectly(topic, group, clientId, msgId string) (*ConsumeMessageDirectlyResult, error)
- func (a *MqAdmin) ConsumerCreateOrUpdateRequest(config *ConsumerConfigInfo) (bool, error)
- func (a *MqAdmin) CreateTopic(ctx context.Context, opts ...OptionCreate) error
- func (a *MqAdmin) DeleteSubGroup(request *DeleteSubGroupRequest) bool
- func (a *MqAdmin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error
- func (a *MqAdmin) ExamineSubscriptionGroupConfig(group string) ([]*ConsumerConfigInfo, error)
- func (a *MqAdmin) ExamineTopicStats(topic string) (*TopicStatsTable, error)
- func (a *MqAdmin) FetchAllTopicList(ctx context.Context) (*TopicList, error)
- func (a *MqAdmin) FetchBrokerNameSetBySubscriptionGroup(group string) ([]string, error)
- func (a *MqAdmin) FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error)
- func (a *MqAdmin) GetAllSubscriptionGroup(ctx context.Context, brokerAddr string, timeoutMillis time.Duration) (*SubscriptionGroupWrapper, error)
- func (a *MqAdmin) GetConsumerConnectionInfo(group string) (*ConsumerConnectionInfo, error)
- func (a *MqAdmin) List() (map[string]interface{}, error)
- func (a *MqAdmin) QueryConsumeStatsList(topic, group string) ([]*TopicConsumerInfo, error)
- func (a *MqAdmin) QueryConsumeStatsListByGroup(group string) ([]*TopicConsumerInfo, error)
- func (a *MqAdmin) QueryConsumeStatsListByTopicName(topic string) (map[string]*TopicConsumerInfo, error)
- func (a *MqAdmin) QueryConsumerRunningInfo(group, clientId string, jstack bool) (*AdminConsumerRunningInfo, error)
- func (a *MqAdmin) QueryGroup(group string) (*GroupConsumeInfo, error)
- func (a *MqAdmin) QueryGroupList() ([]*GroupConsumeInfo, error)
- func (a *MqAdmin) QueryMessageByTopic(topic string, begin, end int64) ([]*MessageView, error)
- func (a *MqAdmin) QueryMessageByTopicAndKey(topic, key string) ([]*MessageView, error)
- func (a *MqAdmin) QueryTopicConsumeByWho(topic string) (*GroupList, error)
- func (a *MqAdmin) ResetOffset(request *ResetOffsetRequest) (map[string]*ConsumerGroupRollBackStat, error)
- func (a *MqAdmin) ResetOffsetByTimestamp(topic, group string, timestamp int64, isForce bool) (map[primitive.MessageQueue]int64, error)
- func (a *MqAdmin) ResetOffsetByTimestamp2(topic, group string, timestamp int64, isForce, isC bool) (map[primitive.MessageQueue]int64, error)
- func (a *MqAdmin) ResetOffsetByTimestampOld(topic, group string, timestamp int64, isForce bool) []*RollbackStats
- func (a *MqAdmin) SendTopicMsg(request *SendTopicMessageRequest) (*primitive.SendResult, error)
- func (a *MqAdmin) ViewMessage(topic, msgId string) (map[string]interface{}, error)
- type MqClientApi
- func (c *MqClientApi) ConsumeMessageDirectly(addr, group, clientId, msgId string) (*ConsumeMessageDirectlyResult, error)
- func (c *MqClientApi) CreateSubscriptionGroup(addr string, config *SubscriptionGroupConfig) error
- func (c *MqClientApi) DeleteSubscriptionGroup(addr, group string) error
- func (c *MqClientApi) ExamineConsumeStats(group string) *ConsumeStats
- func (c *MqClientApi) ExamineConsumeStatsWithTopic(group string, topic string) *ConsumeStats
- func (c *MqClientApi) ExamineConsumerConnectionInfo(group string) (*ConsumerConnection, error)
- func (c *MqClientApi) GetBrokerClusterInfo() (*ClusterInfo, error)
- func (c *MqClientApi) GetConsumeStats(addr, group, topic string) (*ConsumeStats, error)
- func (c *MqClientApi) GetConsumeStatsNoTopic(addr, group string) (*ConsumeStats, error)
- func (c *MqClientApi) GetConsumerConnectionList(addr, group string) (*ConsumerConnection, error)
- func (c *MqClientApi) GetConsumerRunningInfo(addr, group, clientId string, jstack bool) (*AdminConsumerRunningInfo, error)
- func (c *MqClientApi) GetMaxOffset(addr, topic string, queueId int) (int64, error)
- func (c *MqClientApi) GetTopicStatsInfo(addr, topic string) (*TopicStatsTable, error)
- func (c *MqClientApi) InvokeBrokerToResetOffset(addr, topic, group string, timestamp int64, isForce bool) (map[primitive.MessageQueue]int64, error)
- func (c *MqClientApi) InvokeBrokerToResetOffset2(addr, topic, group string, timestamp int64, isForce, isC bool) (map[primitive.MessageQueue]int64, error)
- func (c *MqClientApi) QueryTopicConsumeByWho(addr, topic string) (*GroupList, error)
- func (c *MqClientApi) QueryTopicRouteInfo(topic string) (*internal.TopicRouteData, error)
- func (c *MqClientApi) SearchOffset(addr, topic string, queueId int, timestamp int64) (int64, error)
- func (c *MqClientApi) UpdateConsumerOffset(addr string, header *internal.UpdateConsumerOffsetRequestHeader) error
- type MqConsumerApi
- func (c *MqConsumerApi) GetMessageById(msgId string) (*MessageView, error)
- func (c *MqConsumerApi) GetMessageByUniqKey(topic, uniqKey string) (*MessageView, error)
- func (c *MqConsumerApi) QueryMessageByTopic(topic string, begin, end int64) ([]*MessageView, error)
- func (c *MqConsumerApi) QueryMessageByTopicAndKey(topic, key string) ([]*MessageView, error)
- func (c *MqConsumerApi) ViewMessage(topic, msgId string) (map[string]interface{}, error)
- type OffsetWrapper
- type OptionCreate
- func WithBrokerAddrCreate(BrokerAddr string) OptionCreate
- func WithOrder(Order bool) OptionCreate
- func WithPerm(Perm int) OptionCreate
- func WithReadQueueNums(ReadQueueNums int) OptionCreate
- func WithTopicCreate(Topic string) OptionCreate
- func WithTopicFilterType(TopicFilterType string) OptionCreate
- func WithTopicSysFlag(TopicSysFlag int) OptionCreate
- func WithWriteQueueNums(WriteQueueNums int) OptionCreate
- type OptionDelete
- type QueryMessageResponseHeader
- type QueryResult
- type QueueStatInfo
- type RemotingSerializable
- func (r *RemotingSerializable) Decode(data []byte, classOfT interface{}) (interface{}, error)
- func (r *RemotingSerializable) Encode(obj interface{}) ([]byte, error)
- func (r *RemotingSerializable) FromJson(jsonStr string, classOfT interface{}) (interface{}, error)
- func (r *RemotingSerializable) ToJson(obj interface{}, prettyFormat bool) string
- type ResetOffsetRequest
- type RollbackStats
- type SendTopicMessageRequest
- type SortOpts
- type SubscriptionData
- type SubscriptionGroupConfig
- type SubscriptionGroupWrapper
- type TopicConfigCreate
- type TopicConfigDelete
- type TopicConsumerInfo
- type TopicList
- type TopicOffset
- type TopicStatsTable
- type TrackType
Constants ¶
View Source
const ( CONSUMED = TrackType("CONSUMED") CONSUMED_BUT_FILTERED = TrackType("CONSUMED_BUT_FILTERED") PULL = TrackType("PULL") NOT_CONSUME_YET = TrackType("NOT_CONSUME_YET") NOT_ONLINE = TrackType("NOT_ONLINE") UNKNOWN = TrackType("UNKNOWN") )
View Source
const ( CONSUME_ACTIVELY = ConsumeType("CONSUME_ACTIVELY") CONSUME_PASSIVELY = ConsumeType("CONSUME_PASSIVELY") BROADCASTING = MessageModel("BROADCASTING") CLUSTERING = MessageModel("CLUSTERING") CONSUME_FROM_LAST_OFFSET = ConsumeFromWhere("CONSUME_FROM_LAST_OFFSET") CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST = ConsumeFromWhere("CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST") CONSUME_FROM_MIN_OFFSET = ConsumeFromWhere("CONSUME_FROM_MIN_OFFSET") CONSUME_FROM_MAX_OFFSET = ConsumeFromWhere("CONSUME_FROM_MAX_OFFSET") CONSUME_FROM_FIRST_OFFSET = ConsumeFromWhere("CONSUME_FROM_FIRST_OFFSET") CONSUME_FROM_TIMESTAMP = ConsumeFromWhere("CONSUME_FROM_TIMESTAMP") )
Variables ¶
View Source
var CurVersion = "V4_9_6"
Functions ¶
func GetVersionDesc ¶ added in v1.0.5
Types ¶
type Admin ¶
type Admin interface {
CreateTopic(ctx context.Context, opts ...OptionCreate) error
DeleteTopic(ctx context.Context, opts ...OptionDelete) error
GetAllSubscriptionGroup(ctx context.Context, brokerAddr string, timeoutMillis time.Duration) (*SubscriptionGroupWrapper, error)
FetchAllTopicList(ctx context.Context) (*TopicList, error)
QueryTopicRouteInfo(topic string) (*internal.TopicRouteData, error)
QueryConsumersByTopic(topic string) (*internal.TopicRouteData, error)
QueryConsumerConnectsByGroup(ctx context.Context, group string) (*ConsumerConnection, error)
//GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error)
FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error)
Close() error
}
type AdminConsumerRunningInfo ¶ added in v1.0.4
type AdminConsumerRunningInfo struct {
*internal.ConsumerRunningInfo
RemotingSerializable
}
type AdminOption ¶
type AdminOption func(options *adminOptions)
func WithCredentials ¶
func WithCredentials(c primitive.Credentials) AdminOption
func WithNamespace ¶
func WithNamespace(namespace string) AdminOption
WithNamespace sets the namespace of the admin.
func WithResolver ¶
func WithResolver(resolver primitive.NsResolver) AdminOption
WithResolver sets the nameserver resolver used to fetch nameserver addresses.
type ClientVersion ¶
type ClientVersion string
func (*ClientVersion) String ¶ added in v1.0.5
func (c *ClientVersion) String() string
type ClusterInfo ¶
type ClusterInfo struct {
BrokerAddrTable map[string]*internal.BrokerData `json:"brokerAddrTable"`
ClusterAddrTable map[string][]string `json:"clusterAddrTable"`
RemotingSerializable
}
type Comparable ¶ added in v1.0.5
type Comparable interface {
CompareTo(o interface{}) int
}
type Connection ¶
type Connection struct {
ClientId string `json:"clientId"`
ClientAddr string `json:"clientAddr"`
Language remote.LanguageCode `json:"language"`
Version int64 `json:"version"`
}
type ConnectionInfo ¶ added in v1.0.5
type ConnectionInfo struct {
Connection
VersionDesc string `json:"versionDesc"`
}
type ConsumeFromWhere ¶
type ConsumeFromWhere string
type ConsumeMessageDirectlyResult ¶ added in v1.0.7
type ConsumeStats ¶
type ConsumeStats struct {
OffsetTable map[*primitive.MessageQueue]*OffsetWrapper `json:"-"`
ConsumeTps float64 `json:"consumeTps"`
RemotingSerializable
}
func (*ConsumeStats) ComputeTotalDiff ¶
func (c *ConsumeStats) ComputeTotalDiff() int64
type ConsumeType ¶
type ConsumeType string
type ConsumerConfigInfo ¶ added in v1.0.4
type ConsumerConfigInfo struct {
ClusterNameList []string `json:"clusterNameList"`
BrokerNameList []string `json:"brokerNameList"`
SubscriptionGroupConfig SubscriptionGroupConfig `json:"subscriptionGroupConfig"`
}
type ConsumerConnection ¶
type ConsumerConnection struct {
ConnectionSet []Connection `json:"connectionSet"`
SubscriptionTable map[string]SubscriptionData `json:"subscriptionTable"`
ConsumeType ConsumeType `json:"consumeType"`
MessageModel MessageModel `json:"messageModel"`
ConsumeFromWhere ConsumeFromWhere `json:"consumeFromWhere"`
RemotingSerializable
}
func (*ConsumerConnection) ComputeMinVersion ¶
func (c *ConsumerConnection) ComputeMinVersion() int64
type ConsumerConnectionInfo ¶ added in v1.0.5
type ConsumerConnectionInfo struct {
ConnectionSet []*ConnectionInfo `json:"connectionSet"`
*ConsumerConnection
}
type ConsumerGroupRollBackStat ¶ added in v1.0.4
type ConsumerGroupRollBackStat struct {
Status bool `json:"status"`
ErrMsg string `json:"errMsg"`
RollbackStatsList []*RollbackStats `json:"rollbackStatsList"`
}
type DataVersion ¶
type DeleteSubGroupRequest ¶ added in v1.0.4
type GetMaxOffsetResponseHeader ¶ added in v1.0.4
type GetMaxOffsetResponseHeader struct {
Offset int64 `json:"offset"`
RemotingSerializable
}
type GroupConsumeInfo ¶
type GroupConsumeInfo struct {
Group string `json:"group"`
Version string `json:"version"`
Count int `json:"count"`
ConsumeType ConsumeType `json:"consumeType"`
MessageModel MessageModel `json:"messageModel"`
ConsumeTps int `json:"consumeTps"`
DiffTotal int64 `json:"diffTotal"`
}
func (*GroupConsumeInfo) CompareTo ¶ added in v1.0.5
func (g *GroupConsumeInfo) CompareTo(o interface{}) int
type GroupList ¶ added in v1.0.5
type GroupList struct {
GroupList []string `json:"groupList"`
RemotingSerializable
}
type KVTable ¶ added in v1.0.7
type KVTable struct {
Table map[string]string `json:"table"`
RemotingSerializable
}
type MessageModel ¶
type MessageModel string
type MessageTrack ¶ added in v1.0.5
type MessageView ¶ added in v1.0.5
type MessageView struct {
*primitive.MessageExt
MessageBody string `json:"messageBody"`
}
func (*MessageView) CompareTo ¶ added in v1.0.5
func (m *MessageView) CompareTo(o interface{}) int
type MqAdmin ¶
func (*MqAdmin) BrokerConfig ¶ added in v1.0.7
func (*MqAdmin) ConsumeMessageDirectly ¶ added in v1.0.5
func (a *MqAdmin) ConsumeMessageDirectly(topic, group, clientId, msgId string) (*ConsumeMessageDirectlyResult, error)
func (*MqAdmin) ConsumerCreateOrUpdateRequest ¶ added in v1.0.4
func (a *MqAdmin) ConsumerCreateOrUpdateRequest(config *ConsumerConfigInfo) (bool, error)
func (*MqAdmin) CreateTopic ¶
func (a *MqAdmin) CreateTopic(ctx context.Context, opts ...OptionCreate) error
CreateTopic creates a topic. TODO: add an alternative implementation, similar to sarama's, that does not require brokerAddr as input.
func (*MqAdmin) DeleteSubGroup ¶ added in v1.0.4
func (a *MqAdmin) DeleteSubGroup(request *DeleteSubGroupRequest) bool
func (*MqAdmin) DeleteTopic ¶
func (a *MqAdmin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error
DeleteTopic deletes the topic in both the broker and the nameserver.
func (*MqAdmin) ExamineSubscriptionGroupConfig ¶ added in v1.0.4
func (a *MqAdmin) ExamineSubscriptionGroupConfig(group string) ([]*ConsumerConfigInfo, error)
func (*MqAdmin) ExamineTopicStats ¶ added in v1.0.5
func (a *MqAdmin) ExamineTopicStats(topic string) (*TopicStatsTable, error)
func (*MqAdmin) FetchAllTopicList ¶
func (*MqAdmin) FetchBrokerNameSetBySubscriptionGroup ¶ added in v1.0.4
func (*MqAdmin) FetchPublishMessageQueues ¶
func (*MqAdmin) GetAllSubscriptionGroup ¶
func (*MqAdmin) GetConsumerConnectionInfo ¶ added in v1.0.4
func (a *MqAdmin) GetConsumerConnectionInfo(group string) (*ConsumerConnectionInfo, error)
Consumer-related operation interfaces (消费者相关操作接口).
func (*MqAdmin) QueryConsumeStatsList ¶ added in v1.0.4
func (a *MqAdmin) QueryConsumeStatsList(topic, group string) ([]*TopicConsumerInfo, error)
func (*MqAdmin) QueryConsumeStatsListByGroup ¶ added in v1.0.4
func (a *MqAdmin) QueryConsumeStatsListByGroup(group string) ([]*TopicConsumerInfo, error)
func (*MqAdmin) QueryConsumeStatsListByTopicName ¶ added in v1.0.5
func (a *MqAdmin) QueryConsumeStatsListByTopicName(topic string) (map[string]*TopicConsumerInfo, error)
func (*MqAdmin) QueryConsumerRunningInfo ¶ added in v1.0.4
func (a *MqAdmin) QueryConsumerRunningInfo(group, clientId string, jstack bool) (*AdminConsumerRunningInfo, error)
func (*MqAdmin) QueryGroup ¶
func (a *MqAdmin) QueryGroup(group string) (*GroupConsumeInfo, error)
func (*MqAdmin) QueryGroupList ¶
func (a *MqAdmin) QueryGroupList() ([]*GroupConsumeInfo, error)
func (*MqAdmin) QueryMessageByTopic ¶ added in v1.0.5
func (a *MqAdmin) QueryMessageByTopic(topic string, begin, end int64) ([]*MessageView, error)
Message-related operation interfaces (消息相关操作接口).
func (*MqAdmin) QueryMessageByTopicAndKey ¶ added in v1.0.5
func (a *MqAdmin) QueryMessageByTopicAndKey(topic, key string) ([]*MessageView, error)
func (*MqAdmin) QueryTopicConsumeByWho ¶ added in v1.0.5
func (*MqAdmin) ResetOffset ¶ added in v1.0.4
func (a *MqAdmin) ResetOffset(request *ResetOffsetRequest) (map[string]*ConsumerGroupRollBackStat, error)
func (*MqAdmin) ResetOffsetByTimestamp ¶ added in v1.0.4
func (*MqAdmin) ResetOffsetByTimestamp2 ¶ added in v1.0.4
func (*MqAdmin) ResetOffsetByTimestampOld ¶ added in v1.0.4
func (a *MqAdmin) ResetOffsetByTimestampOld(topic, group string, timestamp int64, isForce bool) []*RollbackStats
func (*MqAdmin) SendTopicMsg ¶ added in v1.0.5
func (a *MqAdmin) SendTopicMsg(request *SendTopicMessageRequest) (*primitive.SendResult, error)
type MqClientApi ¶ added in v1.0.4
func GetClientApi ¶ added in v1.0.4
func GetClientApi(cli internal.RMQClient) *MqClientApi
func (*MqClientApi) ConsumeMessageDirectly ¶ added in v1.0.5
func (c *MqClientApi) ConsumeMessageDirectly(addr, group, clientId, msgId string) (*ConsumeMessageDirectlyResult, error)
func (*MqClientApi) CreateSubscriptionGroup ¶ added in v1.0.4
func (c *MqClientApi) CreateSubscriptionGroup(addr string, config *SubscriptionGroupConfig) error
func (*MqClientApi) DeleteSubscriptionGroup ¶ added in v1.0.4
func (c *MqClientApi) DeleteSubscriptionGroup(addr, group string) error
func (*MqClientApi) ExamineConsumeStats ¶ added in v1.0.4
func (c *MqClientApi) ExamineConsumeStats(group string) *ConsumeStats
func (*MqClientApi) ExamineConsumeStatsWithTopic ¶ added in v1.0.4
func (c *MqClientApi) ExamineConsumeStatsWithTopic(group string, topic string) *ConsumeStats
func (*MqClientApi) ExamineConsumerConnectionInfo ¶ added in v1.0.4
func (c *MqClientApi) ExamineConsumerConnectionInfo(group string) (*ConsumerConnection, error)
func (*MqClientApi) GetBrokerClusterInfo ¶ added in v1.0.5
func (c *MqClientApi) GetBrokerClusterInfo() (*ClusterInfo, error)
func (*MqClientApi) GetConsumeStats ¶ added in v1.0.4
func (c *MqClientApi) GetConsumeStats(addr, group, topic string) (*ConsumeStats, error)
func (*MqClientApi) GetConsumeStatsNoTopic ¶ added in v1.0.4
func (c *MqClientApi) GetConsumeStatsNoTopic(addr, group string) (*ConsumeStats, error)
func (*MqClientApi) GetConsumerConnectionList ¶ added in v1.0.5
func (c *MqClientApi) GetConsumerConnectionList(addr, group string) (*ConsumerConnection, error)
func (*MqClientApi) GetConsumerRunningInfo ¶ added in v1.0.5
func (c *MqClientApi) GetConsumerRunningInfo(addr, group, clientId string, jstack bool) (*AdminConsumerRunningInfo, error)
func (*MqClientApi) GetMaxOffset ¶ added in v1.0.4
func (c *MqClientApi) GetMaxOffset(addr, topic string, queueId int) (int64, error)
func (*MqClientApi) GetTopicStatsInfo ¶ added in v1.0.4
func (c *MqClientApi) GetTopicStatsInfo(addr, topic string) (*TopicStatsTable, error)
func (*MqClientApi) InvokeBrokerToResetOffset ¶ added in v1.0.4
func (c *MqClientApi) InvokeBrokerToResetOffset(addr, topic, group string, timestamp int64, isForce bool) (map[primitive.MessageQueue]int64, error)
func (*MqClientApi) InvokeBrokerToResetOffset2 ¶ added in v1.0.4
func (c *MqClientApi) InvokeBrokerToResetOffset2(addr, topic, group string, timestamp int64, isForce, isC bool) (map[primitive.MessageQueue]int64, error)
func (*MqClientApi) QueryTopicConsumeByWho ¶ added in v1.0.5
func (c *MqClientApi) QueryTopicConsumeByWho(addr, topic string) (*GroupList, error)
func (*MqClientApi) QueryTopicRouteInfo ¶ added in v1.0.4
func (c *MqClientApi) QueryTopicRouteInfo(topic string) (*internal.TopicRouteData, error)
func (*MqClientApi) SearchOffset ¶ added in v1.0.4
func (*MqClientApi) UpdateConsumerOffset ¶ added in v1.0.4
func (c *MqClientApi) UpdateConsumerOffset(addr string, header *internal.UpdateConsumerOffsetRequestHeader) error
type MqConsumerApi ¶ added in v1.0.5
type MqConsumerApi struct {
Cli internal.RMQClient
Consumer *consumer.DefaultPullConsumer
}
func GetConsumerApi ¶ added in v1.0.5
func GetConsumerApi(cli internal.RMQClient) *MqConsumerApi
func (*MqConsumerApi) GetMessageById ¶ added in v1.0.6
func (c *MqConsumerApi) GetMessageById(msgId string) (*MessageView, error)
func (*MqConsumerApi) GetMessageByUniqKey ¶ added in v1.0.6
func (c *MqConsumerApi) GetMessageByUniqKey(topic, uniqKey string) (*MessageView, error)
func (*MqConsumerApi) QueryMessageByTopic ¶ added in v1.0.5
func (c *MqConsumerApi) QueryMessageByTopic(topic string, begin, end int64) ([]*MessageView, error)
func (*MqConsumerApi) QueryMessageByTopicAndKey ¶ added in v1.0.5
func (c *MqConsumerApi) QueryMessageByTopicAndKey(topic, key string) ([]*MessageView, error)
func (*MqConsumerApi) ViewMessage ¶ added in v1.0.5
func (c *MqConsumerApi) ViewMessage(topic, msgId string) (map[string]interface{}, error)
type OffsetWrapper ¶
type OptionCreate ¶
type OptionCreate func(*TopicConfigCreate)
func WithBrokerAddrCreate ¶
func WithBrokerAddrCreate(BrokerAddr string) OptionCreate
func WithOrder ¶
func WithOrder(Order bool) OptionCreate
func WithPerm ¶
func WithPerm(Perm int) OptionCreate
func WithReadQueueNums ¶
func WithReadQueueNums(ReadQueueNums int) OptionCreate
func WithTopicCreate ¶
func WithTopicCreate(Topic string) OptionCreate
func WithTopicFilterType ¶
func WithTopicFilterType(TopicFilterType string) OptionCreate
func WithTopicSysFlag ¶
func WithTopicSysFlag(TopicSysFlag int) OptionCreate
func WithWriteQueueNums ¶
func WithWriteQueueNums(WriteQueueNums int) OptionCreate
type OptionDelete ¶
type OptionDelete func(*TopicConfigDelete)
func WithBrokerAddrDelete ¶
func WithBrokerAddrDelete(BrokerAddr string) OptionDelete
func WithClusterName ¶
func WithClusterName(ClusterName string) OptionDelete
func WithNameSrvAddr ¶
func WithNameSrvAddr(NameSrvAddr []string) OptionDelete
func WithTopicDelete ¶
func WithTopicDelete(Topic string) OptionDelete
type QueryMessageResponseHeader ¶ added in v1.0.5
type QueryMessageResponseHeader struct {
IndexLastUpdateTimestamp int64 `json:"indexLastUpdateTimestamp"`
IndexLastUpdatePhyoffset int64 `json:"indexLastUpdatePhyoffset"`
}
func (*QueryMessageResponseHeader) Decode ¶ added in v1.0.5
func (request *QueryMessageResponseHeader) Decode(properties map[string]string)
type QueryResult ¶ added in v1.0.5
type QueryResult struct {
IndexLastUpdateTimestamp int64 `json:"indexLastUpdateTimestamp"`
MessageList []*MessageView `json:"messageList"`
}
type QueueStatInfo ¶ added in v1.0.4
type QueueStatInfo struct {
BrokerName string `json:"brokerName"`
QueueId int `json:"queueId"`
BrokerOffset int64 `json:"brokerOffset"`
ConsumerOffset int64 `json:"consumerOffset"`
ClientInfo string `json:"clientInfo"`
LastTimestamp int64 `json:"lastTimestamp"`
}
func (*QueueStatInfo) FromOffsetTableEntry ¶ added in v1.0.4
func (q *QueueStatInfo) FromOffsetTableEntry(key *primitive.MessageQueue, value *OffsetWrapper)
type RemotingSerializable ¶
type RemotingSerializable struct {
ObjToKey bool `json:"-"`
}
func (*RemotingSerializable) Decode ¶
func (r *RemotingSerializable) Decode(data []byte, classOfT interface{}) (interface{}, error)
func (*RemotingSerializable) Encode ¶
func (r *RemotingSerializable) Encode(obj interface{}) ([]byte, error)
func (*RemotingSerializable) FromJson ¶
func (r *RemotingSerializable) FromJson(jsonStr string, classOfT interface{}) (interface{}, error)
func (*RemotingSerializable) ToJson ¶
func (r *RemotingSerializable) ToJson(obj interface{}, prettyFormat bool) string
type ResetOffsetRequest ¶ added in v1.0.4
type RollbackStats ¶ added in v1.0.4
type SendTopicMessageRequest ¶ added in v1.0.5
type SortOpts ¶ added in v1.0.5
type SortOpts struct {
// contains filtered or unexported fields
}
func InitSortCli ¶ added in v1.0.5
func InitSortCli() *SortOpts
func (*SortOpts) GetData ¶ added in v1.0.5
func (s *SortOpts) GetData() []Comparable
type SubscriptionData ¶
type SubscriptionGroupConfig ¶
type SubscriptionGroupWrapper ¶
type SubscriptionGroupWrapper struct {
SubscriptionGroupTable map[string]SubscriptionGroupConfig
DataVersion DataVersion
RemotingSerializable
}
type TopicConfigCreate ¶
type TopicConfigDelete ¶
type TopicConsumerInfo ¶ added in v1.0.4
type TopicConsumerInfo struct {
Topic string `json:"topic"`
DiffTotal int64 `json:"diffTotal"`
LastTimestamp int64 `json:"lastTimestamp"`
QueueStatInfoList []*QueueStatInfo `json:"queueStatInfoList"`
}
func (*TopicConsumerInfo) AppendQueueStatInfo ¶ added in v1.0.4
func (t *TopicConsumerInfo) AppendQueueStatInfo(queueStat *QueueStatInfo)
type TopicList ¶
type TopicList struct {
TopicList []string `json:"topicList"`
BrokerAddr string `json:"brokerAddr"`
RemotingSerializable
}
type TopicOffset ¶ added in v1.0.4
type TopicStatsTable ¶ added in v1.0.4
type TopicStatsTable struct {
OffsetTable map[*primitive.MessageQueue]*TopicOffset `json:"offsetTable"`
RemotingSerializable
}
Click to show internal directories.
Click to hide internal directories.