Documentation
Overview ¶
Package kafka provides some shared interfaces for the Kafka components.
Index ¶
- func DefaultConsumerSaramaConfig(name string, readCommitted bool) (*sarama.Config, error)
- type Batch
- type BatchProcessorFunc
- type Component
- type FailStrategy
- type Message
- type OptionFunc
- func WithBatchMessageDeduplication() OptionFunc
- func WithBatchSize(size uint) OptionFunc
- func WithBatchTimeout(timeout time.Duration) OptionFunc
- func WithCheckTopic() OptionFunc
- func WithCommitSync() OptionFunc
- func WithFailureStrategy(fs FailStrategy) OptionFunc
- func WithNewSessionCallback(sessionCallback func(sarama.ConsumerGroupSession) error) OptionFunc
- func WithRetries(count uint32) OptionFunc
- func WithRetryWait(interval time.Duration) OptionFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Batch ¶
type Batch interface {
	// Messages of the batch.
	Messages() []Message
}
Batch is the interface for a batch of multiple Kafka messages.
type BatchProcessorFunc ¶
BatchProcessorFunc defines the function that asynchronously processes a batch of messages.
type Component ¶ added in v0.74.0
type Component struct {
// contains filtered or unexported fields
}
Component is a Kafka consumer implementation that processes messages in batches.
func New ¶ added in v0.74.0
func New(name, group string, brokers, topics []string, proc BatchProcessorFunc, saramaCfg *sarama.Config, oo ...OptionFunc) (*Component, error)
New initializes a new kafka consumer component with support for functional configuration. The default failure strategy is the ExitStrategy. The default batch size is 1 and the batch timeout is 100ms. The default number of retries is 0 and the retry wait is 0.
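The defaults described above can be pictured with a small, standalone model of the functional options pattern. This is only a sketch: `settings`, `option`, `newSettings`, `withBatchSize` and `withRetries` are hypothetical names used for illustration, the zero-size check is an assumption, and none of this is the package's actual internals.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// settings is a hypothetical stand-in for the component's unexported fields.
type settings struct {
	batchSize    uint
	batchTimeout time.Duration
	retries      uint32
	retryWait    time.Duration
}

// option mirrors the general shape of a functional option:
// it changes one setting or rejects an invalid value.
type option func(*settings) error

// withBatchSize overrides the default batch size.
// Rejecting zero is an assumption made for this sketch.
func withBatchSize(size uint) option {
	return func(s *settings) error {
		if size == 0 {
			return errors.New("batch size must be greater than zero")
		}
		s.batchSize = size
		return nil
	}
}

// withRetries overrides the default number of retries.
func withRetries(count uint32) option {
	return func(s *settings) error {
		s.retries = count
		return nil
	}
}

// newSettings starts from the documented defaults (batch size 1,
// batch timeout 100ms, 0 retries, 0 retry wait) and applies the options in order.
func newSettings(opts ...option) (*settings, error) {
	s := &settings{batchSize: 1, batchTimeout: 100 * time.Millisecond}
	for _, opt := range opts {
		if err := opt(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}

func main() {
	s, err := newSettings(withBatchSize(50), withRetries(3))
	if err != nil {
		fmt.Println("invalid option:", err)
		return
	}
	fmt.Printf("%+v\n", *s)
}
```

Options that are not passed leave the corresponding default untouched, which is why calling the constructor without options yields the documented defaults.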
type FailStrategy ¶
type FailStrategy int
FailStrategy type definition.
const (
	// ExitStrategy does not commit failed message offsets and exits the application.
	ExitStrategy FailStrategy = iota
	// SkipStrategy commits the offset of messages that failed processing, and continues processing.
	SkipStrategy
)
type Message ¶
type Message interface {
	// Context will contain the context to be used for processing.
	// Each context will have a logger setup which can be used to create a logger from context.
	Context() context.Context
	// Message will contain the raw Kafka message.
	Message() *sarama.ConsumerMessage
	// Span contains the tracing span of this message.
	Span() trace.Span
}
Message interface for wrapping messages that are handled by the kafka component.
func NewMessage ¶
NewMessage initializes a new message which is an implementation of the kafka Message interface.
type OptionFunc ¶ added in v0.74.0
OptionFunc definition for configuring the component in a functional way.
func WithBatchMessageDeduplication ¶ added in v0.74.0
func WithBatchMessageDeduplication() OptionFunc
WithBatchMessageDeduplication enables the deduplication of messages based on the message's key. This implementation does not do additional sorting, but instead relies on the ordering guarantees that Kafka gives within the partitions of a topic. Do not use this functionality if you have changed your producer's partition hashing behaviour to be nondeterministic.
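As a rough illustration of key-based deduplication, the sketch below keeps only the last message seen for each key while preserving consumption order. The `message` type and `deduplicate` function are hypothetical, and "the newest message per key wins" is an assumption of this sketch rather than a documented guarantee of the package.

```go
package main

import "fmt"

// message is a hypothetical, simplified stand-in for a consumed Kafka message.
type message struct {
	key   string
	value string
}

// deduplicate keeps only the last message for each key.
// The batch is assumed to be in consumption order, so the last
// occurrence of a key is also its most recent message.
func deduplicate(batch []message) []message {
	last := make(map[string]int, len(batch)) // key -> index of its last occurrence
	for i, m := range batch {
		last[m.key] = i
	}
	out := make([]message, 0, len(last))
	for i, m := range batch {
		if last[m.key] == i {
			out = append(out, m)
		}
	}
	return out
}

func main() {
	batch := []message{
		{key: "user-1", value: "created"},
		{key: "user-2", value: "created"},
		{key: "user-1", value: "updated"},
	}
	for _, m := range deduplicate(batch) {
		fmt.Println(m.key, m.value)
	}
}
```

The approach only makes sense when all messages with the same key arrive in order, which is the case when a key always maps to the same partition.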
func WithBatchSize ¶ added in v0.74.0
func WithBatchSize(size uint) OptionFunc
WithBatchSize sets the message batch size the component should process at once.
func WithBatchTimeout ¶ added in v0.74.0
func WithBatchTimeout(timeout time.Duration) OptionFunc
WithBatchTimeout sets the message batch timeout. If the desired batch size has not been reached and the timeout elapses without new messages coming in, the messages in the buffer are processed as a batch.
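The interplay between batch size and batch timeout can be sketched with a channel and a timer. This is a simplified model, not the component's code: `collectBatch` is a hypothetical helper, messages are plain strings, and restarting the timeout after every received message is an assumption drawn from the wording above.

```go
package main

import (
	"fmt"
	"time"
)

// collectBatch reads from in until either size messages have been buffered
// or no new message arrives within timeout, and returns the buffered messages.
func collectBatch(in <-chan string, size int, timeout time.Duration) []string {
	batch := make([]string, 0, size)
	for len(batch) < size {
		select {
		case msg, ok := <-in:
			if !ok {
				return batch // channel closed: flush what we have
			}
			batch = append(batch, msg)
		case <-time.After(timeout): // a fresh timer on every iteration
			return batch // timeout elapsed without a new message
		}
	}
	return batch
}

func main() {
	in := make(chan string, 3)
	for _, m := range []string{"a", "b", "c"} {
		in <- m
	}
	// The first call returns once the batch size is reached.
	fmt.Println(collectBatch(in, 2, 50*time.Millisecond))
	// The second call returns a partial batch after the timeout.
	fmt.Println(collectBatch(in, 2, 50*time.Millisecond))
}
```

A larger batch size improves throughput, while the timeout bounds how long a partially filled batch can wait before it is processed.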
func WithCheckTopic ¶ added in v0.74.0
func WithCheckTopic() OptionFunc
WithCheckTopic checks whether the component-configured topics exist in the broker.
func WithCommitSync ¶ added in v0.74.0
func WithCommitSync() OptionFunc
WithCommitSync instructs the consumer to commit offsets in a blocking operation after processing every batch of messages.
func WithFailureStrategy ¶ added in v0.74.0
func WithFailureStrategy(fs FailStrategy) OptionFunc
WithFailureStrategy sets the strategy the component follows when it encounters an error. The ExitStrategy fails the component; if retries > 0, the component reconnects and retries the failed message. The SkipStrategy skips the message on failure. If a client wants to retry a message before failing, this must be handled in the BatchProcessorFunc.
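The difference between the two strategies comes down to whether the offsets of a failed batch are committed. The following sketch models that decision in isolation. The `FailStrategy` constants mirror the ones documented above, while `handleBatch`, `errProcessing` and the use of plain strings as messages are hypothetical simplifications.

```go
package main

import (
	"errors"
	"fmt"
)

// FailStrategy mirrors the documented type for the purpose of this sketch.
type FailStrategy int

const (
	ExitStrategy FailStrategy = iota
	SkipStrategy
)

// errProcessing is a hypothetical processing error.
var errProcessing = errors.New("processing failed")

// handleBatch runs process and reports whether the batch offsets should be
// committed, plus the error that should stop the consumer, if any.
func handleBatch(batch []string, process func([]string) error, fs FailStrategy) (bool, error) {
	err := process(batch)
	if err == nil {
		return true, nil // success: commit and continue
	}
	if fs == SkipStrategy {
		return true, nil // skip: commit the failed batch and continue
	}
	return false, err // exit: do not commit, surface the error
}

func main() {
	failing := func([]string) error { return errProcessing }
	for _, fs := range []FailStrategy{ExitStrategy, SkipStrategy} {
		commit, err := handleBatch([]string{"m1", "m2"}, failing, fs)
		fmt.Printf("strategy=%d commit=%t err=%v\n", fs, commit, err)
	}
}
```

Because uncommitted offsets are consumed again after a restart or reconnect, the exit path leads to redelivery of the failed batch, whereas the skip path drops it.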
func WithNewSessionCallback ¶ added in v0.74.0
func WithNewSessionCallback(sessionCallback func(sarama.ConsumerGroupSession) error) OptionFunc
WithNewSessionCallback registers a callback that is invoked when a new consumer group session is created (e.g., after a rebalance).
func WithRetries ¶ added in v0.74.0
func WithRetries(count uint32) OptionFunc
WithRetries sets the number of times a component should retry in case of an error. These retries are depleted in the following cases:
- when there are temporary connection issues
- when a message batch fails to be processed through the user-defined processing function and the failure strategy is set to kafka.ExitStrategy
- for any other reason for which the component needs to reconnect.
func WithRetryWait ¶ added in v0.74.0
func WithRetryWait(interval time.Duration) OptionFunc
WithRetryWait sets how long the component waits between retries.
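How the retry count and the retry wait combine can be illustrated with a generic retry loop. This is a sketch under stated assumptions: `runWithRetries` and `errTemporary` are hypothetical, the first attempt is assumed not to count as a retry, and the component's real reconnect logic is not shown.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTemporary is a hypothetical transient error.
var errTemporary = errors.New("temporary failure")

// runWithRetries calls run once and then up to retries more times,
// sleeping for wait before each retry. It returns nil on the first
// success, or the last error once the retries are depleted.
func runWithRetries(run func() error, retries uint32, wait time.Duration) error {
	var err error
	for attempt := uint64(0); attempt <= uint64(retries); attempt++ {
		if attempt > 0 {
			time.Sleep(wait)
		}
		if err = run(); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	calls := 0
	err := runWithRetries(func() error {
		calls++
		if calls < 3 {
			return errTemporary // fail the first two attempts
		}
		return nil
	}, 3, 10*time.Millisecond)
	fmt.Println("calls:", calls, "err:", err)
}
```

With the default of 0 retries, a loop like this makes a single attempt and returns its error immediately.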