Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
Kafka is a struct that represents a Kafka producer.
func NewKafka ¶
func NewKafka(logger logger.LoggerInterface, brokers []string) *Kafka
NewKafka initializes a new Kafka struct.
It takes a logger and a list of broker addresses as inputs and returns a pointer to the Kafka struct. It creates a new Kafka producer with the given configuration, and logs a message indicating if the connection is successful. If the connection fails, it logs an error message and exits.
func (*Kafka) GetBrokers ¶
GetBrokers returns a list of the Kafka broker addresses that the producer is connected to.
func (*Kafka) SendMessage ¶
SendMessage sends a message to the given Kafka topic with the given key and value.
It uses the configured SyncProducer to send the message and logs the result of the send operation. If the send operation fails, it returns an error.
func (*Kafka) StartConsumers ¶
func (k *Kafka) StartConsumers(topics []string, groupID string, handler sarama.ConsumerGroupHandler) error
StartConsumers starts a Kafka consumer group to consume messages from the specified topics.
It takes a list of topics, a consumer group ID, and a handler implementing the sarama.ConsumerGroupHandler interface. The method initializes a new Kafka consumer group and begins consuming messages in a background goroutine. If an error occurs during consumption, it retries up to a maximum number of retries with a delay between attempts. Any errors from the consumer group are logged and the function returns an error if the consumer group initialization fails.
type MockProducer ¶
type MockProducer struct {
Messages []*sarama.ProducerMessage
ShouldFail bool
}
func (*MockProducer) Close ¶
func (m *MockProducer) Close() error
func (*MockProducer) SendMessage ¶
func (m *MockProducer) SendMessage(msg *sarama.ProducerMessage) (int32, int64, error)
type SyncProducer ¶
type SyncProducer interface {
SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
Close() error
}
SyncProducer is an interface that represents a Kafka producer.