Documentation
¶
Index ¶
- func MessageToProducerMessage(msg *Message) (producerMessage *sarama.ProducerMessage, err error)
- type BrokerInfo
- type Config
- type Group
- type GroupDescription
- type GroupMemberDescription
- type IService
- type Info
- type LeaveGroupResponse
- type MemberResponse
- type Message
- type MessageHeader
- type OffsetFetchResponse
- type OffsetFetchResponseBlock
- type PartitionMetadata
- type ProducerMessage
- type Service
- func (this_ *Service) Close()
- func (this_ *Service) CreatePartitions(topic string, count int32) (err error)
- func (this_ *Service) CreateTopic(topic string, numPartitions int32, replicationFactor int16) (err error)
- func (this_ *Service) DeleteConsumerGroup(groupId string) (err error)
- func (this_ *Service) DeleteConsumerGroupOffset(group string, topic string, partition int32) (err error)
- func (this_ *Service) DeleteRecords(topic string, partitionOffsets map[int32]int64) (err error)
- func (this_ *Service) DeleteTopic(topic string) (err error)
- func (this_ *Service) DescribeConsumerGroups(groups []string) (res []*GroupDescription, err error)
- func (this_ *Service) DescribeTopics(topics []string) (res []*TopicMetadata, err error)
- func (this_ *Service) GetClient() (res sarama.Client, err error)
- func (this_ *Service) GetOffset(topic string, partitionID int32, time int64) (offset int64, err error)
- func (this_ *Service) GetServers() []string
- func (this_ *Service) GetTopic(topic string, time int64) (res *TopicInfo, err error)
- func (this_ *Service) GetTopics() (res []*TopicInfo, err error)
- func (this_ *Service) Info() (res *Info, err error)
- func (this_ *Service) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (res *OffsetFetchResponse, err error)
- func (this_ *Service) ListConsumerGroups() (res []*Group, err error)
- func (this_ *Service) MarkOffset(groupId string, topic string, partition int32, offset int64) (err error)
- func (this_ *Service) NewSyncProducer() (syncProducer sarama.SyncProducer, err error)
- func (this_ *Service) Partitions(topic string) (partitions []int32, err error)
- func (this_ *Service) Pull(groupId string, topics []string, PullSize int, PullTimeout int, ...) (msgList []*Message, err error)
- func (this_ *Service) Push(msg *Message) (err error)
- func (this_ *Service) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (res *LeaveGroupResponse, err error)
- func (this_ *Service) ResetOffset(groupId string, topic string, partition int32, offset int64) (err error)
- type TopicInfo
- type TopicMetadata
- type TopicPartition
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MessageToProducerMessage ¶
func MessageToProducerMessage(msg *Message) (producerMessage *sarama.ProducerMessage, err error)
Types ¶
type BrokerInfo ¶
type Config ¶
// Config is the Kafka configuration. Every field carries both a JSON and a
// YAML tag.
type Config struct {
// Disabled disables the service: when initializing services, the upper layer
// can check this property and skip initialization if it is configured as true.
Disabled bool `json:"disabled,omitempty" yaml:"disabled,omitempty"`
// Address is the only field that is not tagged omitempty.
// NOTE(review): presumably the Kafka broker address(es); the expected format
// (single host or a list) is not shown here — confirm.
Address string `json:"address" yaml:"address"`
// Username is optional (tagged omitempty).
// NOTE(review): presumably an authentication user name — confirm.
Username string `json:"username,omitempty" yaml:"username,omitempty"`
// Password is optional (tagged omitempty).
// NOTE(review): presumably the password matching Username — confirm.
Password string `json:"password,omitempty" yaml:"password,omitempty"`
// CertPath is optional (tagged omitempty).
// NOTE(review): presumably a path to a certificate file — confirm.
CertPath string `json:"certPath,omitempty" yaml:"certPath,omitempty"`
}
Config is the Kafka configuration.
type GroupDescription ¶
// GroupDescription is the JSON-serializable description of a consumer group.
// It is the element type returned by Service.DescribeConsumerGroups.
type GroupDescription struct {
// Version defines the protocol version to use for encode and decode
Version int16 `json:"version"`
// Err contains the describe error as the KError type.
Err sarama.KError `json:"err"`
// ErrorCode contains the describe error, or 0 if there was no error.
ErrorCode int16 `json:"errorCode"`
// GroupId contains the group ID string.
GroupId string `json:"groupId"`
// State contains the group state string, or the empty string.
State string `json:"state"`
// ProtocolType contains the group protocol type, or the empty string.
ProtocolType string `json:"protocolType"`
// Protocol contains the group protocol data, or the empty string.
Protocol string `json:"protocol"`
// Members contains the group members.
// NOTE(review): the map is presumably keyed by member ID — confirm.
Members map[string]*GroupMemberDescription `json:"members"`
// AuthorizedOperations contains a 32-bit bitfield to represent authorized
// operations for this group.
AuthorizedOperations int32 `json:"authorizedOperations"`
}
type GroupMemberDescription ¶
// GroupMemberDescription is the JSON-serializable description of one member of
// a consumer group. It is the value type of GroupDescription.Members.
type GroupMemberDescription struct {
// Version defines the protocol version to use for encode and decode
Version int16 `json:"version"`
// MemberId contains the member ID assigned by the group coordinator.
MemberId string `json:"memberId"`
// GroupInstanceId contains the unique identifier of the consumer instance
// provided by end user. It is a pointer, so it may be nil.
GroupInstanceId *string `json:"groupInstanceId"`
// ClientId contains the client ID used in the member's latest join group
// request.
ClientId string `json:"clientId"`
// ClientHost contains the client host.
ClientHost string `json:"clientHost"`
// MemberMetadata contains the metadata corresponding to the current group
// protocol in use.
MemberMetadata []byte `json:"memberMetadata"`
// MemberAssignment contains the current assignment provided by the group
// leader.
MemberAssignment []byte `json:"memberAssignment"`
}
type IService ¶
// IService declares the Kafka operations offered by this package: cluster
// information, topic and partition management, consumer-group management,
// offset handling, and pushing/pulling messages.
type IService interface {
// Close closes the Kafka client.
Close()
// Info returns information about Kafka.
Info() (res *Info, err error)
// GetTopics gets the topics.
GetTopics() (res []*TopicInfo, err error)
// GetTopic gets a single topic.
GetTopic(topic string, time int64) (res *TopicInfo, err error)
// Pull pulls messages.
Pull(groupId string, topics []string, PullSize int, PullTimeout int, keyType, valueType string) (msgList []*Message, err error)
// MarkOffset commits an offset.
MarkOffset(groupId string, topic string, partition int32, offset int64) (err error)
// ResetOffset resets an offset.
ResetOffset(groupId string, topic string, partition int32, offset int64) (err error)
// CreatePartitions creates partitions for a topic.
CreatePartitions(topic string, count int32) (err error)
// CreateTopic creates a topic.
CreateTopic(topic string, numPartitions int32, replicationFactor int16) (err error)
// DeleteTopic deletes a topic.
DeleteTopic(topic string) (err error)
// DeleteConsumerGroup deletes a consumer group.
DeleteConsumerGroup(groupId string) (err error)
// DeleteRecords deletes data from a topic.
DeleteRecords(topic string, partitionOffsets map[int32]int64) (err error)
// NewSyncProducer creates a producer.
NewSyncProducer() (syncProducer sarama.SyncProducer, err error)
// Push pushes a message.
Push(msg *Message) (err error)
// GetOffset gets the latest offset of a given partition of a topic.
GetOffset(topic string, partitionID int32, time int64) (offset int64, err error)
// Partitions gets the partitions of a topic.
Partitions(topic string) (partitions []int32, err error)
// ListConsumerGroups lists all consumer groups.
ListConsumerGroups() (res []*Group, err error)
// DescribeConsumerGroups queries the details of consumer groups.
DescribeConsumerGroups(groups []string) (res []*GroupDescription, err error)
// DeleteConsumerGroupOffset deletes a consumer group's offset for a topic partition.
DeleteConsumerGroupOffset(group string, topic string, partition int32) (err error)
// ListConsumerGroupOffsets queries the topic-partition information of a consumer group.
ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (res *OffsetFetchResponse, err error)
// RemoveMemberFromConsumerGroup removes members from a consumer group.
RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (res *LeaveGroupResponse, err error)
// DescribeTopics returns topic metadata.
DescribeTopics(topics []string) (res []*TopicMetadata, err error)
// GetClient gets the Kafka client.
GetClient() (res sarama.Client, err error)
}
type Info ¶
// Info is the result type of Service.Info.
type Info struct {
// Brokers lists the brokers, one BrokerInfo per entry.
Brokers []*BrokerInfo `json:"brokers"`
}
type LeaveGroupResponse ¶
// LeaveGroupResponse is the JSON-serializable result returned by
// Service.RemoveMemberFromConsumerGroup.
type LeaveGroupResponse struct {
// Version is the version number carried by the response.
// NOTE(review): presumably the protocol version, as in the sibling types — confirm.
Version int16 `json:"version"`
// ThrottleTime is the throttle time carried by the response.
// NOTE(review): unit not shown here (presumably milliseconds) — confirm.
ThrottleTime int32 `json:"throttleTime"`
// Err is the response-level error as a sarama.KError.
Err sarama.KError `json:"err"`
// Members holds one MemberResponse per entry.
Members []MemberResponse `json:"members"`
}
type MemberResponse ¶
type Message ¶
// Message is the JSON representation of a Kafka message used by Service.Push
// (input) and Service.Pull (output). All fields are tagged omitempty, so empty
// strings, nil pointers and empty slices are left out of the encoded JSON.
type Message struct {
// KeyType names the type of Key.
// NOTE(review): the accepted values are not shown here — confirm.
KeyType string `json:"keyType,omitempty"`
// Key is the message key, carried as a string.
Key string `json:"key,omitempty"`
// ValueType names the type of Value.
// NOTE(review): the accepted values are not shown here — confirm.
ValueType string `json:"valueType,omitempty"`
// Value is the message value, carried as a string.
Value string `json:"value,omitempty"`
// Topic is the topic name.
Topic string `json:"topic,omitempty"`
// Partition is a pointer so that "not set" (nil) can be told apart from partition 0.
Partition *int32 `json:"partition,omitempty"`
// Offset is a pointer so that "not set" (nil) can be told apart from offset 0.
Offset *int64 `json:"offset,omitempty"`
// Headers holds the message headers.
Headers []MessageHeader `json:"headers,omitempty"`
// Timestamp is optional; nil means no timestamp is set.
Timestamp *time.Time `json:"timestamp,omitempty"`
}
func ConsumerMessageToMessage ¶
func ConsumerMessageToMessage(keyType string, valueType string, consumerMessage *sarama.ConsumerMessage) (msg *Message)
type MessageHeader ¶
type OffsetFetchResponse ¶
type PartitionMetadata ¶
// PartitionMetadata is the JSON-serializable metadata of a single partition.
// It is the element type of TopicMetadata.Partitions.
type PartitionMetadata struct {
// Version defines the protocol version to use for encode and decode
Version int16 `json:"version"`
// Err contains the partition error, or 0 if there was no error.
Err sarama.KError `json:"err"`
// ID contains the partition index.
// NOTE(review): the JSON key is "ID" while every sibling key is lowerCamelCase;
// changing it would alter the serialized output, so it is left as is.
ID int32 `json:"ID"`
// Leader contains the ID of the leader broker.
Leader int32 `json:"leader"`
// LeaderEpoch contains the leader epoch of this partition.
LeaderEpoch int32 `json:"leaderEpoch"`
// Replicas contains the set of all nodes that host this partition.
Replicas []int32 `json:"replicas"`
// Isr contains the set of nodes that are in sync with the leader for this partition.
Isr []int32 `json:"isr"`
// OfflineReplicas contains the set of offline replicas of this partition.
OfflineReplicas []int32 `json:"offlineReplicas"`
}
type ProducerMessage ¶
// ProducerMessage wraps a sarama producer message by embedding it, so the
// embedded value's fields and methods are promoted onto this type.
type ProducerMessage struct {
// The embedded pointer may be nil; accessing promoted fields then panics.
*sarama.ProducerMessage
}
type Service ¶
// Service is the receiver type of this package's Kafka operations (Close, Info,
// GetTopics, Push, Pull, ...). It embeds *Config, so the configuration fields
// are promoted onto the service.
type Service struct {
// Embedded configuration. No other fields are exported.
*Config
}
Service 注册处理器在线信息等
func (*Service) CreatePartitions ¶
func (*Service) CreateTopic ¶
func (*Service) DeleteConsumerGroup ¶
func (*Service) DeleteConsumerGroupOffset ¶
func (*Service) DeleteRecords ¶
func (*Service) DeleteTopic ¶
func (*Service) DescribeConsumerGroups ¶
func (this_ *Service) DescribeConsumerGroups(groups []string) (res []*GroupDescription, err error)
func (*Service) DescribeTopics ¶
func (this_ *Service) DescribeTopics(topics []string) (res []*TopicMetadata, err error)
func (*Service) GetOffset ¶
func (this_ *Service) GetOffset(topic string, partitionID int32, time int64) (offset int64, err error)
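GetOffset queries the cluster to get the most recent available offset at the given time (in milliseconds) on the topic/partition combination. Time should be OffsetOldest for the earliest available offset, OffsetNewest for the offset of the message that will be produced next, or a time.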
func (*Service) GetServers ¶
func (*Service) ListConsumerGroupOffsets ¶
func (*Service) ListConsumerGroups ¶
func (*Service) MarkOffset ¶
func (*Service) NewSyncProducer ¶
func (this_ *Service) NewSyncProducer() (syncProducer sarama.SyncProducer, err error)
NewSyncProducer creates a producer.
func (*Service) Partitions ¶
func (*Service) RemoveMemberFromConsumerGroup ¶
func (this_ *Service) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (res *LeaveGroupResponse, err error)
type TopicInfo ¶
// TopicInfo describes a topic and its partitions. It is the result type of
// Service.GetTopic and the element type returned by Service.GetTopics.
type TopicInfo struct {
// Topic is the topic name.
Topic string `json:"topic"`
// Partitions holds one TopicPartition per entry.
Partitions []*TopicPartition `json:"partitions"`
}
type TopicMetadata ¶
// TopicMetadata is the JSON-serializable metadata of a topic. It is the element
// type returned by Service.DescribeTopics.
type TopicMetadata struct {
// Version defines the protocol version to use for encode and decode
Version int16 `json:"version"`
// Err contains the topic error, or 0 if there was no error.
Err sarama.KError `json:"err"`
// Name contains the topic name.
Name string `json:"name"`
// IsInternal is true if the topic is internal.
IsInternal bool `json:"isInternal"`
// Partitions contains each partition in the topic.
Partitions []*PartitionMetadata `json:"partitions"`
}
type TopicPartition ¶
Click to show internal directories.
Click to hide internal directories.