kafka

package
v0.0.0-...-72f28c0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2025 License: AGPL-3.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	BootstrapServers     string // Comma-separated list of brokers: "broker1:9092,broker2:9092"
	RequiredAcks         int    // e.g. 1 or kafka.RequireAll (see kafka-go docs)
	MaxRetries           int    // Maximum number of retry attempts
	MessageChannelBuffer int    // Size of the message channel buffer
	WorkerPoolSize       int    // Number of worker goroutines to process messages
	BatchSize            int    // Maximum number of messages to batch before sending
	BatchTimeoutMs       int    // Maximum time to wait before sending a batch

}

Config holds configuration options for the Kafka producer.

type Consumer

type Consumer interface {
	// Start begins the consumption loop. It reads messages from the configured topics,
	// applies the provided handler (with built-in retry logic), and commits offsets (if using manual commits).
	// This method blocks until the context is cancelled or an unrecoverable error occurs.
	Start(ctx context.Context, handler func(ctx context.Context, msg *kafka.Message) error) error

	// Close gracefully shuts down the consumer.
	Close() error
}

Consumer defines the interface for consuming Kafka messages.

func NewKafkaConsumer

func NewKafkaConsumer(logger zerolog.Logger, cfg ConsumerConfig, producer Producer, metrics metrics.Metrics) (Consumer, error)

NewKafkaConsumer creates and configures a new Kafka consumer. It splits the comma-separated BootstrapServers into a slice of broker addresses.

type ConsumerConfig

type ConsumerConfig struct {
	BootstrapServers      string        // Comma-separated list of brokers: "broker1:9092,broker2:9092"
	GroupID               string        // Consumer group identifier
	Topic                 string        // topic to subscribe to
	CommitInterval        time.Duration // If > 0, automatic commits occur at this interval; if 0, manual commits are required.
	MaxProcessingRetries  int           // Number of attempts to process a message before giving up
	DeadLetterTopic       string        // Topic name for the Dead Letter Queue (DLQ)
	ConsumerChannelBuffer int           // Size of the message channel buffer
	ConsumerWorkerPool    int           // Number of worker goroutines to process messages
}

ConsumerConfig holds configuration options for the Kafka consumer.

type KafkaWriter

type KafkaWriter interface {
	WriteMessages(ctx context.Context, msgs ...kafka.Message) error
	Close() error
}

KafkaWriter is an interface for writing messages to Kafka. It is used to abstract the kafka.Writer dependency for testing.

type Producer

type Producer interface {
	// Produce sends a message to a given topic. If sending fails, a retry is scheduled.
	// The returned channel will receive an error when the message is successfully produced or when an error occurs.
	// The error will be nil if the message is produced successfully.
	// The error will be non-nil if the message could not be produced after all retries
	Produce(ctx context.Context, topic string, key []byte, value []byte) <-chan error

	// Close gracefully shuts down the producer.
	// The context is used to enforce a timeout for the operation.
	Close(ctx context.Context) error
}

Producer is the interface for sending Kafka messages.

func NewKafkaProducer

func NewKafkaProducer(logger zerolog.Logger, cfg Config) (Producer, error)

NewKafkaProducer creates and configures a new Kafka producer.

type Reader

type Reader interface {
	// ReadMessage reads the next message from the topic.
	// This method blocks until a message is available or the context is cancelled.
	// If the context is cancelled, an error is returned.
	ReadMessage(ctx context.Context) (kafka.Message, error)

	// CommitMessages commits the provided messages.
	// This method is used for manual commits.
	// If the context is cancelled, an error is returned.
	CommitMessages(ctx context.Context, msgs ...kafka.Message) error

	// Close gracefully shuts down the reader.
	// This method is used to close the reader when the consumer is shut down.
	Close() error
}

Reader is an interface for reading messages from Kafka. It is used to abstract the kafka.Reader dependency for testing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL