Versions in this module Expand all Collapse all v0 v0.0.1 Mar 15, 2022 Changes in this version + const DefaultMaxBatchBytes + const DefaultUpdatePublishChannelBufferSize + const InitResponseTimeout + func CreateTopics(brokerAddress string, dialer *kafka.Dialer, topics []string) error + func DeleteTopics(brokerAddress string, dialer *kafka.Dialer, topics []string) error + func GetGroupId(idElements []string) string + func HasTopics(brokerAddress string, dialer *kafka.Dialer, topics []string) (bool, error) + func IsInitRequestforFinish(command int8) bool + func IsInitRequestforStart(command int8) bool + type InitCommand struct + func NewInitCommand(groupId string) *InitCommand + func (ic *InitCommand) CreateInitRequestForFinish() (*kafka.Message, error) + func (ic *InitCommand) CreateInitRequestforStart() (*kafka.Message, error) + func (ic *InitCommand) CreateInitResponseForCreateTopic(errMessage string) (*kafka.Message, error) + func (ic *InitCommand) CreateInitResponseForNotFindData() (*kafka.Message, error) + func (ic *InitCommand) GetGroupId() string + func (ic *InitCommand) IsInitResponseforCreateTopic(message *kafka.Message) (bool, error) + func (ic *InitCommand) IsInitResponseforNotFindData(message *kafka.Message) (bool, error) + type InitCommandMessage struct + Command int8 + ErrMessage string + GroupId string + func DecodeInitCommand(message *kafka.Message) (*InitCommandMessage, error) + type MessageQueueConfig struct + func GetMessageQueueConfig(dataClient coreclientset.Interface, kubeClient kubernetes.Interface, ...) (*MessageQueueConfig, error) + func NewMessageQueueConfig(brokers []string, user string, password string) *MessageQueueConfig + func (mqc *MessageQueueConfig) CreateSaslDialer() (*kafka.Dialer, error) + func (mqc *MessageQueueConfig) GetBrokers() []string + func (mqc *MessageQueueConfig) GetPassword() string + func (mqc *MessageQueueConfig) GetUser() string + type MessageQueueInitTopics struct + Init string + Request string + Response string + type MessageQueueTopic struct + func NewMessageQueueFileSystemTopic(nameElements []string) *MessageQueueTopic + func NewMessageQueueRdbTopic(nameElements []string) *MessageQueueTopic + func (mqt *MessageQueueTopic) CreateInitTopics() (*MessageQueueInitTopics, error) + func (mqt *MessageQueueTopic) CreateUpdateTopic() (string, error)