Documentation
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type Config
type Config struct {
    BootstrapServers     string // Comma-separated list of brokers: "broker1:9092,broker2:9092"
    RequiredAcks         int    // e.g. 1 or kafka.RequireAll (see kafka-go docs)
    MaxRetries           int    // Maximum number of retry attempts
    MessageChannelBuffer int    // Size of the message channel buffer
    WorkerPoolSize       int    // Number of worker goroutines to process messages
    BatchSize            int    // Maximum number of messages to batch before sending
    BatchTimeoutMs       int    // Maximum time to wait before sending a batch
}
Config holds configuration options for the Kafka producer.
type Consumer
type Consumer interface {
    // Start begins the consumption loop. It reads messages from the configured topics,
    // applies the provided handler (with built-in retry logic), and commits offsets (if using manual commits).
    // This method blocks until the context is cancelled or an unrecoverable error occurs.
    Start(ctx context.Context, handler func(ctx context.Context, msg *kafka.Message) error) error

    // Close gracefully shuts down the consumer.
    Close() error
}
Consumer defines the interface for consuming Kafka messages.
func NewKafkaConsumer
func NewKafkaConsumer(logger zerolog.Logger, cfg ConsumerConfig, producer Producer, metrics metrics.Metrics) (Consumer, error)
NewKafkaConsumer creates and configures a new Kafka consumer. It splits the comma-separated BootstrapServers into a slice of broker addresses.
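The splitting step described above could look roughly like the following. The helper name splitBrokers is hypothetical, and trimming whitespace and skipping empty entries are assumptions added for robustness; the package's actual implementation is not shown in this documentation.

```go
package main

import (
	"fmt"
	"strings"
)

// splitBrokers is a hypothetical helper that turns a comma-separated
// BootstrapServers value into a slice of broker addresses.
// Trimming spaces and dropping empty entries are assumptions of this sketch.
func splitBrokers(bootstrapServers string) []string {
	var brokers []string
	for _, part := range strings.Split(bootstrapServers, ",") {
		if addr := strings.TrimSpace(part); addr != "" {
			brokers = append(brokers, addr)
		}
	}
	return brokers
}

func main() {
	fmt.Println(splitBrokers("broker1:9092, broker2:9092")) // prints "[broker1:9092 broker2:9092]"
}
```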
type ConsumerConfig
type ConsumerConfig struct {
    BootstrapServers      string        // Comma-separated list of brokers: "broker1:9092,broker2:9092"
    GroupID               string        // Consumer group identifier
    Topic                 string        // Topic to subscribe to
    CommitInterval        time.Duration // If > 0, automatic commits occur at this interval; if 0, manual commits are required.
    MaxProcessingRetries  int           // Number of attempts to process a message before giving up
    DeadLetterTopic       string        // Topic name for the Dead Letter Queue (DLQ)
    ConsumerChannelBuffer int           // Size of the message channel buffer
    ConsumerWorkerPool    int           // Number of worker goroutines to process messages
}
ConsumerConfig holds configuration options for the Kafka consumer.
type KafkaWriter
type KafkaWriter interface {
    WriteMessages(ctx context.Context, msgs ...kafka.Message) error
    Close() error
}
KafkaWriter is an interface for writing messages to Kafka. It is used to abstract the kafka.Writer dependency for testing.
type Producer
type Producer interface {
    // Produce sends a message to the given topic. If sending fails, a retry is scheduled.
    // The returned channel delivers the outcome of the send:
    // nil if the message was produced successfully, or a non-nil error
    // if the message could not be produced after all retries.
    Produce(ctx context.Context, topic string, key []byte, value []byte) <-chan error

    // Close gracefully shuts down the producer.
    // The context is used to enforce a timeout for the operation.
    Close(ctx context.Context) error
}
Producer is the interface for sending Kafka messages.
type Reader
type Reader interface {
    // ReadMessage reads the next message from the topic.
    // This method blocks until a message is available or the context is cancelled.
    // If the context is cancelled, an error is returned.
    ReadMessage(ctx context.Context) (kafka.Message, error)

    // CommitMessages commits the provided messages.
    // This method is used for manual commits.
    // If the context is cancelled, an error is returned.
    CommitMessages(ctx context.Context, msgs ...kafka.Message) error

    // Close gracefully shuts down the reader.
    // This method is used to close the reader when the consumer is shut down.
    Close() error
}
Reader is an interface for reading messages from Kafka. It is used to abstract the kafka.Reader dependency for testing.