kafka

package
v0.74.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 28, 2025 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package kafka provides some shared interfaces for the Kafka components.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultConsumerSaramaConfig added in v0.61.2

func DefaultConsumerSaramaConfig(name string, readCommitted bool) (*sarama.Config, error)

DefaultConsumerSaramaConfig creates a Sarama configuration with a client ID derived from the host name and the consumer name.

Types

type Batch

type Batch interface {
	// Messages of the batch.
	Messages() []Message
}

Batch interface for multiple Kafka messages.

func NewBatch

func NewBatch(messages []Message) Batch

NewBatch initializes a new batch of messages and returns an implementation of the kafka Batch interface.

type BatchProcessorFunc

type BatchProcessorFunc func(Batch) error

BatchProcessorFunc defines the function that asynchronously processes a batch of messages.

type Component added in v0.74.0

type Component struct {
	// contains filtered or unexported fields
}

Component is a kafka consumer implementation that processes messages in batches.

func New added in v0.74.0

func New(name, group string, brokers, topics []string, proc BatchProcessorFunc, saramaCfg *sarama.Config, oo ...OptionFunc) (*Component, error)

New initializes a new kafka consumer component with support for functional configuration. The default failure strategy is the ExitStrategy. The default batch size is 1 and the batch timeout is 100ms. The default number of retries is 0 and the retry wait is 0.

func (*Component) Run added in v0.74.0

func (c *Component) Run(ctx context.Context) error

Run starts the consumer processing loop to process messages from Kafka.

type FailStrategy

type FailStrategy int

FailStrategy type definition.

const (
	// ExitStrategy does not commit failed message offsets and exits the application.
	ExitStrategy FailStrategy = iota
	// SkipStrategy commits the offset of messages that failed processing, and continues processing.
	SkipStrategy
)

type Message

type Message interface {
	// Context will contain the context to be used for processing.
	// Each context will have a logger setup which can be used to create a logger from context.
	Context() context.Context
	// Message will contain the raw Kafka message.
	Message() *sarama.ConsumerMessage
	// Span contains the tracing span of this message.
	Span() trace.Span
}

Message interface for wrapping messages that are handled by the kafka component.

func NewMessage

func NewMessage(ctx context.Context, sp trace.Span, msg *sarama.ConsumerMessage) Message

NewMessage initializes a new message which is an implementation of the kafka Message interface.

type OptionFunc added in v0.74.0

type OptionFunc func(*Component) error

OptionFunc definition for configuring the component in a functional way.
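The functional-options pattern behind OptionFunc can be illustrated with a minimal, standard-library-only sketch. The names `settings`, `option`, `withBatchSize` and `newSettings` are hypothetical stand-ins and are not part of this package; the zero-size check is an assumption, and only the default values (batch size 1, batch timeout 100ms, retries 0, retry wait 0) are taken from the documentation of New.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// settings is a hypothetical stand-in for the component's unexported fields.
type settings struct {
	batchSize    uint
	batchTimeout time.Duration
	retries      uint32
	retryWait    time.Duration
}

// option mirrors the shape of OptionFunc: it mutates its target or returns an error.
type option func(*settings) error

// withBatchSize is a hypothetical option. Rejecting zero is an assumption
// made for this sketch, not documented behaviour of WithBatchSize.
func withBatchSize(size uint) option {
	return func(s *settings) error {
		if size == 0 {
			return errors.New("batch size must be greater than zero")
		}
		s.batchSize = size
		return nil
	}
}

// newSettings starts from the defaults documented for New and applies
// the options in order, stopping at the first error.
func newSettings(oo ...option) (*settings, error) {
	s := &settings{batchSize: 1, batchTimeout: 100 * time.Millisecond}
	for _, o := range oo {
		if err := o(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}

func main() {
	s, err := newSettings(withBatchSize(50))
	fmt.Println(s.batchSize, s.batchTimeout, err)
}
```

Because every option returns an error, a constructor built this way can reject an invalid configuration before the consumer starts.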

func WithBatchMessageDeduplication added in v0.74.0

func WithBatchMessageDeduplication() OptionFunc

WithBatchMessageDeduplication enables the deduplication of messages based on the message's key. This implementation does not do additional sorting, but instead relies on the ordering guarantees that Kafka gives within partitions of a topic. Don't use this option if you have changed your producer's partition hashing to behave nondeterministically.
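As an illustration of key-based deduplication, here is a minimal sketch that keeps only the most recent entry for each key within a batch. It is not the package's implementation: the `record` type and `dedupeByKey` function are hypothetical, and "the last message per key wins" is an assumption inferred from the reliance on per-partition ordering.

```go
package main

import "fmt"

// record is a hypothetical stand-in for a consumed Kafka message.
type record struct {
	Key   string
	Value string
}

// dedupeByKey keeps only the last record seen for each key.
// Survivors keep the relative order of their last occurrences.
func dedupeByKey(in []record) []record {
	// First pass: remember the index of the last occurrence of every key.
	last := make(map[string]int, len(in))
	for i, r := range in {
		last[r.Key] = i
	}
	// Second pass: keep a record only if it is the last one for its key.
	out := make([]record, 0, len(last))
	for i, r := range in {
		if last[r.Key] == i {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	batch := []record{
		{Key: "a", Value: "1"},
		{Key: "b", Value: "1"},
		{Key: "a", Value: "2"},
	}
	fmt.Println(dedupeByKey(batch))
}
```

The sketch only produces the intended result if all messages with the same key arrive in production order, which is why a deterministic partitioner matters.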

func WithBatchSize added in v0.74.0

func WithBatchSize(size uint) OptionFunc

WithBatchSize sets the message batch size the component should process at once.

func WithBatchTimeout added in v0.74.0

func WithBatchTimeout(timeout time.Duration) OptionFunc

WithBatchTimeout sets the message batch timeout. If the desired batch size is not reached and the timeout elapses without new messages coming in, the messages in the buffer are processed as a batch.
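The interplay between batch size and batch timeout can be sketched with a plain channel and a timer. This is an illustrative sketch using only the standard library, not the component's actual loop; `collect` is a hypothetical helper, and flushing when the input channel closes is an assumption made so the example terminates.

```go
package main

import (
	"fmt"
	"time"
)

// collect groups values from in into batches of at most size items.
// A partial batch is flushed when timeout passes without a new value
// arriving, or when in is closed.
func collect(in <-chan string, size int, timeout time.Duration) [][]string {
	var batches [][]string
	var buf []string
	flush := func() {
		if len(buf) > 0 {
			batches = append(batches, buf)
			buf = nil
		}
	}
	for {
		select {
		case v, ok := <-in:
			if !ok {
				flush()
				return batches
			}
			buf = append(buf, v)
			if len(buf) >= size {
				flush()
			}
		// time.After is re-evaluated on every loop iteration, so the
		// timeout restarts each time a value is received.
		case <-time.After(timeout):
			flush()
		}
	}
}

func main() {
	in := make(chan string, 5)
	for _, v := range []string{"a", "b", "c", "d", "e"} {
		in <- v
	}
	close(in)
	fmt.Println(collect(in, 2, time.Second))
}
```

A small timeout favours latency because partial batches are flushed quickly, while a large timeout favours throughput because batches are more likely to fill.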

func WithCheckTopic added in v0.74.0

func WithCheckTopic() OptionFunc

WithCheckTopic checks whether the component-configured topics exist in the broker.

func WithCommitSync added in v0.74.0

func WithCommitSync() OptionFunc

WithCommitSync instructs the consumer to commit offsets in a blocking operation after processing every batch of messages.

func WithFailureStrategy added in v0.74.0

func WithFailureStrategy(fs FailStrategy) OptionFunc

WithFailureStrategy sets the strategy the component follows when it encounters an error. The ExitStrategy fails the component; if retries > 0, the component reconnects and retries the failed message. The SkipStrategy skips the message on failure. If a client wants to retry a message before failing, this must be handled in the BatchProcessorFunc.
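Retrying inside the processor function, as suggested above, could look like the following sketch. It uses only the standard library: `processFunc` is a hypothetical stand-in for BatchProcessorFunc (taking a slice of strings instead of a Batch), and `withAttempts` and `errTemporary` are illustrative names that do not exist in this package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTemporary is a sample error used to simulate a transient failure.
var errTemporary = errors.New("temporary failure")

// processFunc is a hypothetical stand-in for BatchProcessorFunc.
type processFunc func(batch []string) error

// withAttempts wraps proc so that a failing batch is tried up to attempts
// times, sleeping wait between tries. It returns the last error if every
// try fails. Values of attempts below 1 are treated as 1.
func withAttempts(proc processFunc, attempts int, wait time.Duration) processFunc {
	if attempts < 1 {
		attempts = 1
	}
	return func(batch []string) error {
		var err error
		for i := 0; i < attempts; i++ {
			if err = proc(batch); err == nil {
				return nil
			}
			// Do not sleep after the final failed try.
			if i < attempts-1 {
				time.Sleep(wait)
			}
		}
		return err
	}
}

func main() {
	calls := 0
	flaky := func(batch []string) error {
		calls++
		if calls < 3 {
			return errTemporary
		}
		return nil
	}
	err := withAttempts(flaky, 5, 10*time.Millisecond)([]string{"m1"})
	fmt.Println(calls, err)
}
```

With a wrapper of this kind, the configured failure strategy only comes into play after the in-process attempts are exhausted.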

func WithNewSessionCallback added in v0.74.0

func WithNewSessionCallback(sessionCallback func(sarama.ConsumerGroupSession) error) OptionFunc

WithNewSessionCallback adds a callback that is invoked when a new consumer group session is created (e.g., rebalancing).

func WithRetries added in v0.74.0

func WithRetries(count uint32) OptionFunc

WithRetries sets the number of times a component should retry in case of an error. These retries are depleted in these cases:

* when there are temporary connection issues
* when a message batch fails to be processed through the user-defined processing function and the failure strategy is set to kafka.ExitStrategy
* for any other reason for which the component needs to reconnect

func WithRetryWait added in v0.74.0

func WithRetryWait(interval time.Duration) OptionFunc

WithRetryWait sets the wait period for the component retry.
