kafka

package
v1.0.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2025 License: MIT Imports: 5 Imported by: 0

README ¶

📦 Package kafka

Source Path: ./pkg/kafka

🧩 Types

Kafka
type Kafka struct {
	logger logger.LoggerInterface
	producer sarama.SyncProducer
	brokers []string
}
Methods
GetBrokers

GetBrokers returns a list of the Kafka broker addresses that the producer is connected to.

func (k *Kafka) GetBrokers() []string
SendMessage

SendMessage sends a message to the given Kafka topic with the given key and value.

It uses the configured SyncProducer to send the message and logs the result of the send operation. If the send operation fails, it returns an error.

func (k *Kafka) SendMessage(topic string, key string, value []byte) error
StartConsumers

StartConsumers starts a Kafka consumer group to consume messages from the specified topics.

It takes a list of topics, a consumer group ID, and a handler implementing the sarama.ConsumerGroupHandler interface. The method initializes a new Kafka consumer group and begins consuming messages in a background goroutine. If an error occurs during consumption, it retries up to a maximum number of retries with a delay between attempts. Any errors from the consumer group are logged and the function returns an error if the consumer group initialization fails.

func (k *Kafka) StartConsumers(topics []string, groupID string, handler sarama.ConsumerGroupHandler) error

Documentation ¶

Index ¶

Constants ¶

This section is empty.

Variables ¶

This section is empty.

Functions ¶

This section is empty.

Types ¶

type Kafka ¶

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka is a struct that represents a Kafka producer.

func NewKafka ¶

func NewKafka(logger logger.LoggerInterface, brokers []string) *Kafka

NewKafka initializes a new Kafka struct.

It takes a logger and a list of broker addresses as inputs and returns a pointer to the Kafka struct. It creates a new Kafka producer with the given configuration, and logs a message indicating if the connection is successful. If the connection fails, it logs an error message and exits.

func (*Kafka) GetBrokers ¶

func (k *Kafka) GetBrokers() []string

GetBrokers returns a list of the Kafka broker addresses that the producer is connected to.

func (*Kafka) SendMessage ¶

func (k *Kafka) SendMessage(topic string, key string, value []byte) error

SendMessage sends a message to the given Kafka topic with the given key and value.

It uses the configured SyncProducer to send the message and logs the result of the send operation. If the send operation fails, it returns an error.

func (*Kafka) StartConsumers ¶

func (k *Kafka) StartConsumers(topics []string, groupID string, handler sarama.ConsumerGroupHandler) error

StartConsumers starts a Kafka consumer group to consume messages from the specified topics.

It takes a list of topics, a consumer group ID, and a handler implementing the sarama.ConsumerGroupHandler interface. The method initializes a new Kafka consumer group and begins consuming messages in a background goroutine. If an error occurs during consumption, it retries up to a maximum number of retries with a delay between attempts. Any errors from the consumer group are logged and the function returns an error if the consumer group initialization fails.

type MockProducer ¶

type MockProducer struct {
	Messages   []*sarama.ProducerMessage
	ShouldFail bool
}

func (*MockProducer) Close ¶

func (m *MockProducer) Close() error

func (*MockProducer) SendMessage ¶

func (m *MockProducer) SendMessage(msg *sarama.ProducerMessage) (int32, int64, error)

type SyncProducer ¶

type SyncProducer interface {
	SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
	Close() error
}

SyncProducer is an interface that represents a Kafka producer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL