kafka

package module
v1.22.1
Published: Jan 13, 2026 License: BSD-2-Clause Imports: 29 Imported by: 5

README

Kafka Library

A production-ready Kafka abstraction library for Go, built on top of IBM's Sarama client. Provides a clean interface for Kafka operations while adding essential features like metrics, batch processing, transaction support, and comprehensive message handling patterns.

Features

  • 🚀 Production Ready: Built for high-throughput, low-latency applications
  • 🔧 Interface-Driven Architecture: Composition-friendly with extensive interface support
  • 📊 Built-in Metrics: Prometheus integration throughout all components
  • 🔄 Batch Processing: Efficient batch message handling with configurable parameters
  • 🔒 Transaction Support: Atomic message processing with transaction decorators
  • 🎯 Flexible Patterns: Function types, decorators, and composition patterns
  • 🛡️ TLS Support: Built-in TLS configuration for secure connections
  • Offset Management: Multiple offset tracking strategies and fallback behavior

Installation

go get github.com/bborbe/kafka

Quick Start

Basic Producer
package main

import (
    "context"
    "log"

    "github.com/bborbe/kafka"
    "github.com/IBM/sarama"
)

func main() {
    ctx := context.Background()
    brokers := kafka.ParseBrokersFromString("localhost:9092")

    // Create producer with optional Sarama config
    producer, err := kafka.NewSyncProducer(ctx, brokers)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // Send message
    msg := &sarama.ProducerMessage{
        Topic: "my-topic",
        Value: sarama.StringEncoder("Hello Kafka!"),
    }

    partition, offset, err := producer.SendMessage(ctx, msg)
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Message sent to partition %d at offset %d", partition, offset)
}
JSON Message Sender
package main

import (
    "context"
    "log"

    "github.com/bborbe/kafka"
    liblog "github.com/bborbe/log"
)

// EventValue is the payload type for your data. JSONSender values must
// implement the Value interface, so the type is declared at package level
// with a Validate method.
type EventValue struct {
    UserID    int    `json:"user_id"`
    Action    string `json:"action"`
    Timestamp string `json:"timestamp"`
}

// Validate implements the validation required by the Value interface.
func (e EventValue) Validate(ctx context.Context) error {
    return nil
}

func main() {
    ctx := context.Background()
    brokers := kafka.ParseBrokersFromString("localhost:9092")

    // Create producer
    producer, err := kafka.NewSyncProducer(ctx, brokers)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // Create JSON sender with producer and log sampler factory
    logSamplerFactory := liblog.NewLogSamplerFactory()
    sender := kafka.NewJSONSender(producer, logSamplerFactory)

    // Create and send update
    value := EventValue{
        UserID:    123,
        Action:    "login",
        Timestamp: "2023-01-01T00:00:00Z",
    }

    key := kafka.NewKey("user-123")
    err = sender.SendUpdate(ctx, kafka.Topic("events"), key, value)
    if err != nil {
        log.Fatal(err)
    }
}
Basic Consumer
package main

import (
    "context"
    "log"

    "github.com/bborbe/kafka"
    liblog "github.com/bborbe/log"
    "github.com/IBM/sarama"
)

func main() {
    ctx := context.Background()
    brokers := kafka.ParseBrokersFromString("localhost:9092")

    // Create Sarama client
    config := sarama.NewConfig()
    config.Version = sarama.V2_6_0_0
    saramaClient, err := sarama.NewClient(brokers.Hosts(), config)
    if err != nil {
        log.Fatal(err)
    }
    defer saramaClient.Close()

    // Create message handler
    handler := kafka.MessageHandlerFunc(func(ctx context.Context, msg *sarama.ConsumerMessage) error {
        log.Printf("Received message: %s", string(msg.Value))
        return nil
    })

    // Create consumer with log sampler factory
    logSamplerFactory := liblog.NewLogSamplerFactory()
    consumer := kafka.NewSimpleConsumer(
        saramaClient,
        kafka.Topic("my-topic"),
        kafka.OffsetNewest,
        handler,
        logSamplerFactory,
    )

    // Start consuming
    if err := consumer.Consume(ctx); err != nil {
        log.Fatal(err)
    }
}

Advanced Usage

Batch Processing
// Wrap your existing handler so it satisfies MessageHandlerBatch
batchHandler := kafka.NewMessageHandlerBatch(handler)
// Optionally decorate with NewMessageHandlerBatchDelay to wait between
// consumes so more messages accumulate per batch.

// Use with NewSimpleConsumerBatch for batch processing
consumer := kafka.NewSimpleConsumerBatch(
    saramaClient,
    kafka.Topic("my-topic"),
    kafka.OffsetNewest,
    batchHandler,
    kafka.BatchSize(100), // process up to 100 messages at once
    logSamplerFactory,
)
Metrics Integration
// Create a metrics collector (implements all metric interfaces)
metrics := kafka.NewMetrics()

// Wrap producer with metrics
producer = kafka.NewSyncProducerMetrics(producer)

// Wrap message handler with metrics
handler = kafka.NewMessageHandlerMetrics(handler, metrics)
Transaction Support
// Implement a handler that processes each message inside a transaction
txHandler := kafka.MessageHandlerTxFunc(func(ctx context.Context, tx libkv.Tx, msg *sarama.ConsumerMessage) error {
    // process message within the transaction
    return nil
})

// Wrap it so every message runs in a database update transaction (db is your libkv.DB)
handler := kafka.NewMessageTxUpdate(db, txHandler)

consumer := kafka.NewSimpleConsumer(
    saramaClient,
    kafka.Topic("my-topic"),
    kafka.OffsetNewest,
    handler,
    logSamplerFactory,
)
Performance Tuning

For high-throughput scenarios, customize the Sarama configuration when creating the client provider:

package main

import (
    "context"
    "log"
    "time"

    "github.com/bborbe/kafka"
    "github.com/bborbe/errors"
    "github.com/IBM/sarama"
)

func main() {
    ctx := context.Background()
    brokers := kafka.ParseBrokersFromString("localhost:9092")

    // Create high-performance client provider with tuned configuration
    saramaClientProvider, err := kafka.NewSaramaClientProviderByType(
        ctx,
        kafka.SaramaClientProviderTypeReused,
        brokers,
        func(config *sarama.Config) {
            config.Consumer.MaxWaitTime = 1000 * time.Millisecond  // Increase from 500ms for larger batches
            config.Consumer.Fetch.Default = 10 * 1024 * 1024       // 10MB (from 1MB default)
            config.Consumer.Fetch.Max = 50 * 1024 * 1024           // 50MB max fetch size
            config.ChannelBufferSize = 1000                         // Buffer 1000 messages (from 256 default)
        },
    )
    if err != nil {
        log.Fatal(errors.Wrapf(ctx, err, "create sarama client provider failed"))
    }
    defer saramaClientProvider.Close()

    // Use the provider with your consumer...
}

Configuration Parameters:

  • Consumer.MaxWaitTime: Maximum time broker waits before responding (higher = larger batches but higher latency; default: 500ms)
  • Consumer.Fetch.Default: Target data to fetch per request (higher = fewer requests, better throughput; default: 1MB)
  • Consumer.Fetch.Max: Maximum data broker can return (protects against excessive memory usage; default: varies)
  • ChannelBufferSize: Internal channel buffer size (higher = better throughput under load, more memory; default: 256)

Trade-offs: This configuration optimizes for throughput over latency. Larger fetch sizes, longer wait times, and bigger buffers increase memory usage and per-message latency but significantly improve overall throughput. Use for high-volume scenarios where batch processing efficiency matters more than individual message latency.

Architecture

Core Interfaces
  • Consumer - Basic message consumption interface
  • SyncProducer - Synchronous message production interface
  • MessageHandler - Message processing logic interface
  • OffsetManager - Offset tracking strategies interface
Design Patterns
  • Interface-Driven: All major components are interface-based for easy composition and testing
  • Decorator Pattern: Wrap components with additional functionality (metrics, transactions, filtering)
  • Function Types: Functional programming support via ConsumerFunc, MessageHandlerFunc
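
A short sketch of how these patterns compose (handler names and logSamplerFactory as in the Quick Start examples; process is a placeholder for your own logic):

// Function type: use a plain function as a MessageHandler
handler := kafka.MessageHandlerFunc(func(ctx context.Context, msg *sarama.ConsumerMessage) error {
    return process(msg)
})

// Decorator pattern: layer metrics and error-skipping around the handler
var wrapped kafka.MessageHandler = handler
wrapped = kafka.NewMessageHandlerMetrics(wrapped, kafka.NewMetrics())
wrapped = kafka.NewMessageHandlerSkipErrors(wrapped, logSamplerFactory)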

Testing

This library uses Ginkgo and Gomega for testing.

# Run all tests
make test

# Run tests with verbose output
ginkgo -v

# Run specific test
go test -run TestSpecific

Development

# Install dependencies and run all checks
make precommit

# Format code
make format

# Generate mocks
make generate

# Run linting and security checks
make check

License

This project is licensed under the BSD 2-Clause License - see the LICENSE file for details.

Documentation

Overview

Package kafka provides a production-ready Kafka abstraction library built on top of IBM's Sarama client.

This package offers a clean interface for Kafka operations while adding essential features like metrics integration, batch processing, transaction support, and comprehensive message handling patterns.

The library follows interface-driven architecture principles, making it composition-friendly with extensive interface support for testing and modularity.

Key Features:

  • Interface-driven design with Consumer, SyncProducer, MessageHandler interfaces
  • Built-in Prometheus metrics integration
  • Batch processing with configurable parameters and delays
  • Transaction support with atomic message processing
  • TLS support for secure connections
  • Multiple offset management strategies
  • Decorator patterns for extending functionality
  • Function types for functional programming support

Basic usage example:

ctx := context.Background()
brokers := kafka.ParseBrokersFromString("localhost:9092")

// Create producer
producer, err := kafka.NewSyncProducer(ctx, brokers)
if err != nil {
	log.Fatal(err)
}
defer producer.Close()

// Send message
msg := &sarama.ProducerMessage{
	Topic: "my-topic",
	Value: sarama.StringEncoder("Hello Kafka!"),
}

partition, offset, err := producer.SendMessage(ctx, msg)
if err != nil {
	log.Fatal(err)
}

Index

Constants

const DefaultOffsetStoreBucket = "offset-store"

const OffsetNewest = Offset(sarama.OffsetNewest)

OffsetNewest represents the most recent offset available for a partition.

const OffsetOldest = Offset(sarama.OffsetOldest)

OffsetOldest represents the oldest offset available for a partition.

const OutOfRangeErrorMessage = "The requested offset is outside the range of offsets maintained by the server for the given topic/partition"

OutOfRangeErrorMessage defines the error message returned when the requested offset is outside the range of offsets maintained by the server.

Variables

var AvailableSaramaClientProviderTypes = SaramaClientProviderTypes{
	SaramaClientProviderTypeReused,
	SaramaClientProviderTypeNew,
	SaramaClientProviderTypePool,
}

AvailableSaramaClientProviderTypes contains all valid SaramaClientProviderType values.

var ClosedError = ErrClosed

ClosedError is deprecated: Use ErrClosed instead.

var DefaultSaramaClientPoolOptions = SaramaClientPoolOptions{
	MaxPoolSize:        10,
	HealthCheckTimeout: 5 * time.Second,
}

DefaultSaramaClientPoolOptions holds the default pool configuration.

var ErrClosed = stderrors.New("closed")

ErrClosed is returned when operations are attempted on a closed offset manager.

Functions

func ConsumerGroupOffsets

func ConsumerGroupOffsets(
	ctx context.Context,
	saramaClient sarama.Client,
	offsetManager OffsetManager,
	topic Topic,
) (map[Partition]Offset, error)

ConsumerGroupOffsets retrieves the next offset for each partition of the specified topic using the provided offset manager. It returns a map of partitions to their corresponding next offsets.

func CreatePartitionConsumer

func CreatePartitionConsumer(
	ctx context.Context,
	consumerFromClient sarama.Consumer,
	metricsConsumer MetricsPartitionConsumer,
	topic Topic,
	partition Partition,
	fallbackOffset Offset,
	nextOffset Offset,
) (sarama.PartitionConsumer, error)

CreatePartitionConsumer creates a partition consumer, falling back to the initial offset when an out-of-range error occurs.

func CreateSaramaConfig

func CreateSaramaConfig(
	ctx context.Context,
	brokers Brokers,
	opts ...SaramaConfigOptions,
) (*sarama.Config, error)

CreateSaramaConfig creates a new Sarama configuration with default settings and applies optional modifications. It configures producers for high durability, consumers for oldest offset consumption, and enables TLS if brokers use TLS schema.
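
For example, brokers declared with the tls schema cause CreateSaramaConfig to enable TLS (a sketch; host names are placeholders):

brokers := kafka.ParseBrokersFromString("tls://kafka-1:9093,tls://kafka-2:9093")

config, err := kafka.CreateSaramaConfig(ctx, brokers)
if err != nil {
	return err
}
// The returned config has TLS enabled because the brokers use the tls schema.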

func GetRealOffset

func GetRealOffset(
	ctx context.Context,
	saramaClient sarama.Client,
	topic Topic,
	offsets map[Partition]Offset,
) (map[Partition]Offset, error)

GetRealOffset resolves the special newest/oldest offset values to their concrete offsets.

func GzipDecoder added in v1.16.0

func GzipDecoder(
	ctx context.Context,
	compressedData []byte,
) ([]byte, error)

GzipDecoder decompresses gzip-compressed data. It takes compressed data as a byte slice, decompresses it using gzip, and returns the decompressed data as a byte slice. Returns an error if the data is nil, not valid gzip format, or decompression fails.
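
As an illustration, compression and decompression form a round trip (a sketch; the produced message and the consumed message are assumed from surrounding producer/consumer code):

// Producer side: compress the payload before sending.
encoder, err := kafka.NewGzipEncoder(ctx, []byte(`{"hello":"world"}`))
if err != nil {
	return err
}
msg := &sarama.ProducerMessage{
	Topic: "my-topic",
	Value: encoder, // sarama.ByteEncoder carrying the gzip bytes
}

// Consumer side: decompress the received message value.
plain, err := kafka.GzipDecoder(ctx, consumedMessage.Value)
if err != nil {
	return err
}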

func IsBrokenPipeError

func IsBrokenPipeError(err error) bool

IsBrokenPipeError checks if the given error represents a broken pipe error. It returns true if the error message ends with "broken pipe".

func NewGzipEncoder added in v1.16.0

func NewGzipEncoder(
	ctx context.Context,
	value []byte,
) (sarama.ByteEncoder, error)

NewGzipEncoder compresses a byte slice using gzip compression with default compression level. It returns the compressed data as a sarama.ByteEncoder. Returns an error if compression fails.

func NewGzipEncoderWithLevel added in v1.16.0

func NewGzipEncoderWithLevel(
	ctx context.Context,
	value []byte,
	level int,
) (sarama.ByteEncoder, error)

NewGzipEncoderWithLevel compresses a byte slice using gzip compression with a specified compression level. It returns the compressed data as a sarama.ByteEncoder. The level parameter should be one of: gzip.NoCompression, gzip.BestSpeed, gzip.BestCompression, or gzip.DefaultCompression. Returns an error if the compression level is invalid or compression fails.

func NewJSONEncoder added in v1.18.0

func NewJSONEncoder(ctx context.Context, value interface{}) (sarama.Encoder, error)

NewJSONEncoder creates a new JSON encoder that marshals the given value into a Sarama encoder.

func NewJsonEncoder deprecated

func NewJsonEncoder(ctx context.Context, value interface{}) (sarama.Encoder, error)

NewJsonEncoder creates a new JSON encoder that marshals the given value into a Sarama encoder.

Deprecated: Use NewJSONEncoder instead.

func NewOffsetManagerHandler

func NewOffsetManagerHandler(
	offsetManager OffsetManager,
	cancel context.CancelFunc,
) libhttp.WithError

NewOffsetManagerHandler creates an HTTP handler for managing Kafka consumer offsets.

func NewTLSConfig

func NewTLSConfig(
	ctx context.Context,
	clientCertFile, clientKeyFile, caCertFile string,
) (*tls.Config, error)

NewTLSConfig creates a TLS configuration for secure Kafka connections using client certificates and CA.

Types

type BatchSize

type BatchSize int

BatchSize represents the number of messages to process in a single batch operation.

func (BatchSize) Int

func (s BatchSize) Int() int

Int returns the batch size as an int value.

func (BatchSize) Int64

func (s BatchSize) Int64() int64

Int64 returns the batch size as an int64 value.

func (BatchSize) String

func (s BatchSize) String() string

func (BatchSize) Validate

func (s BatchSize) Validate(ctx context.Context) error

Validate ensures the batch size is greater than zero.

type Broker

type Broker string

Broker represents a Kafka broker address with schema and host information.

func ParseBroker

func ParseBroker(value string) Broker

ParseBroker parses a string value into a Broker, adding a default plain schema if none is specified.

func (Broker) Host

func (b Broker) Host() string

Host extracts and returns the host portion of the broker address.

func (Broker) MarshalText added in v1.17.2

func (b Broker) MarshalText() ([]byte, error)

MarshalText implements encoding.TextMarshaler for Broker.

func (Broker) Schema

func (b Broker) Schema() BrokerSchema

Schema extracts and returns the schema portion of the broker address.

func (Broker) String

func (b Broker) String() string

String returns the string representation of the Broker.

func (*Broker) UnmarshalText added in v1.17.0

func (b *Broker) UnmarshalText(text []byte) error

UnmarshalText implements encoding.TextUnmarshaler for Broker.

type BrokerSchema

type BrokerSchema string

BrokerSchema represents the connection schema type for a Kafka broker (plain or tls).

const (
	// PlainSchema represents a plain text connection to Kafka broker.
	PlainSchema BrokerSchema = "plain"
	// TLSSchema represents a TLS-encrypted connection to Kafka broker.
	TLSSchema BrokerSchema = "tls"
)

func (BrokerSchema) String

func (s BrokerSchema) String() string

String returns the string representation of the BrokerSchema.

type BrokerSchemas

type BrokerSchemas []BrokerSchema

BrokerSchemas represents a slice of BrokerSchema values.

func (BrokerSchemas) Contains

func (s BrokerSchemas) Contains(schema BrokerSchema) bool

Contains checks if the given schema exists in the BrokerSchemas slice.

type Brokers

type Brokers []Broker

Brokers represents a collection of Kafka broker addresses.

func ParseBrokers

func ParseBrokers(values []string) Brokers

ParseBrokers converts a slice of string broker addresses into a Brokers slice.

func ParseBrokersFromString

func ParseBrokersFromString(value string) Brokers

ParseBrokersFromString parses a comma-separated string of broker addresses into a Brokers slice.
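
For example (schemas default to plain when omitted):

brokers := kafka.ParseBrokersFromString("localhost:9092,tls://kafka-2:9093")

// Host addresses without schema, e.g. for sarama.NewClient:
hosts := brokers.Hosts() // []string{"localhost:9092", "kafka-2:9093"}

// Schema per broker; the first broker got the default plain schema.
schemas := brokers.Schemas()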

func (Brokers) Hosts

func (b Brokers) Hosts() []string

Hosts returns a slice of all broker host addresses from the Brokers collection.

func (Brokers) MarshalText added in v1.17.2

func (b Brokers) MarshalText() ([]byte, error)

MarshalText implements encoding.TextMarshaler for Brokers.

func (Brokers) Schemas

func (b Brokers) Schemas() BrokerSchemas

Schemas returns a slice of all broker schemas from the Brokers collection.

func (Brokers) String

func (b Brokers) String() string

String returns a comma-separated string representation of all brokers.

func (Brokers) Strings

func (b Brokers) Strings() []string

Strings returns a slice of string representations of all brokers.

func (*Brokers) UnmarshalText added in v1.17.0

func (b *Brokers) UnmarshalText(text []byte) error

UnmarshalText implements encoding.TextUnmarshaler for Brokers.

type Consumer

type Consumer interface {
	// Consume starts consuming messages and blocks until the context is cancelled or an error occurs.
	Consume(ctx context.Context) error
}

Consumer defines the interface for consuming messages from Kafka.

func NewOffsetConsumer

func NewOffsetConsumer(
	saramaClient sarama.Client,
	topic Topic,
	offsetManager OffsetManager,
	messageHandler MessageHandler,
	logSamplerFactory log.SamplerFactory,
	options ...func(*ConsumerOptions),
) Consumer

NewOffsetConsumer creates a new offset-based consumer that processes messages one at a time.

func NewOffsetConsumerBatch

func NewOffsetConsumerBatch(
	saramaClient sarama.Client,
	topic Topic,
	offsetManager OffsetManager,
	messageHandlerBatch MessageHandlerBatch,
	batchSize BatchSize,
	logSamplerFactory log.SamplerFactory,
	options ...func(*ConsumerOptions),
) Consumer

NewOffsetConsumerBatch creates a new offset-based consumer that processes messages in batches.

func NewOffsetConsumerBatchWithProvider added in v1.19.0

func NewOffsetConsumerBatchWithProvider(
	saramaClientProvider SaramaClientProvider,
	topic Topic,
	offsetManager OffsetManager,
	messageHandlerBatch MessageHandlerBatch,
	batchSize BatchSize,
	logSamplerFactory log.SamplerFactory,
	options ...func(*ConsumerOptions),
) Consumer

NewOffsetConsumerBatchWithProvider creates a new offset-based consumer that processes messages in batches.

func NewOffsetConsumerHighwaterMarks

func NewOffsetConsumerHighwaterMarks(
	saramaClient sarama.Client,
	topic Topic,
	offsetManager OffsetManager,
	messageHandler MessageHandler,
	trigger run.Fire,
	logSamplerFactory log.SamplerFactory,
	options ...func(*ConsumerOptions),
) Consumer

NewOffsetConsumerHighwaterMarks creates a consumer that processes messages up to high watermarks.

func NewOffsetConsumerHighwaterMarksBatch

func NewOffsetConsumerHighwaterMarksBatch(
	saramaClient sarama.Client,
	topic Topic,
	offsetManager OffsetManager,
	messageHandlerBatch MessageHandlerBatch,
	batchSize BatchSize,
	trigger run.Fire,
	logSamplerFactory log.SamplerFactory,
	options ...func(*ConsumerOptions),
) Consumer

NewOffsetConsumerHighwaterMarksBatch creates a batch consumer that processes messages up to high watermarks.

func NewOffsetConsumerHighwaterMarksBatchWithProvider added in v1.19.0

func NewOffsetConsumerHighwaterMarksBatchWithProvider(
	saramaClientProvider SaramaClientProvider,
	topic Topic,
	offsetManager OffsetManager,
	messageHandlerBatch MessageHandlerBatch,
	batchSize BatchSize,
	trigger run.Fire,
	logSamplerFactory log.SamplerFactory,
	options ...func(*ConsumerOptions),
) Consumer

NewOffsetConsumerHighwaterMarksBatchWithProvider creates a batch consumer that processes messages up to high watermarks.

func NewOffsetConsumerHighwaterMarksWithProvider added in v1.19.0

func NewOffsetConsumerHighwaterMarksWithProvider(
	saramaClientProvider SaramaClientProvider,
	topic Topic,
	offsetManager OffsetManager,
	messageHandler MessageHandler,
	trigger run.Fire,
	logSamplerFactory log.SamplerFactory,
	options ...func(*ConsumerOptions),
) Consumer

NewOffsetConsumerHighwaterMarksWithProvider creates a consumer that processes messages up to high watermarks.

func NewOffsetConsumerWithProvider added in v1.19.0

func NewOffsetConsumerWithProvider(
	saramaClientProvider SaramaClientProvider,
	topic Topic,
	offsetManager OffsetManager,
	messageHandler MessageHandler,
	logSamplerFactory log.SamplerFactory,
	options ...func(*ConsumerOptions),
) Consumer

NewOffsetConsumerWithProvider creates a new offset-based consumer that processes messages one at a time.

func NewSimpleConsumer

func NewSimpleConsumer(
	saramaClient SaramaClient,
	topic Topic,
	initalOffset Offset,
	messageHandler MessageHandler,
	logSamplerFactory log.SamplerFactory,
	options ...func(*ConsumerOptions),
) Consumer

NewSimpleConsumer creates a new simple consumer that processes messages individually. It wraps the provided MessageHandler in a batch handler with batch size 1.

func NewSimpleConsumerBatch

func NewSimpleConsumerBatch(
	saramaClient SaramaClient,
	topic Topic,
	initalOffset Offset,
	messageHandler MessageHandlerBatch,
	batchSize BatchSize,
	logSamplerFactory log.SamplerFactory,
	options ...func(*ConsumerOptions),
) Consumer

NewSimpleConsumerBatch creates a new simple consumer that processes messages in batches. It uses a simple offset manager with the provided initial offset for both initial and fallback scenarios.

type ConsumerErrorHandler

type ConsumerErrorHandler interface {
	HandleError(err *sarama.ConsumerError) error
}

ConsumerErrorHandler defines an interface for handling Kafka consumer errors.

func NewConsumerErrorHandler

func NewConsumerErrorHandler(
	metricsConsumer MetricsConsumer,
) ConsumerErrorHandler

NewConsumerErrorHandler creates a new ConsumerErrorHandler that logs errors and updates metrics.

type ConsumerErrorHandlerFunc

type ConsumerErrorHandlerFunc func(err *sarama.ConsumerError) error

ConsumerErrorHandlerFunc is a function type that implements ConsumerErrorHandler.

func (ConsumerErrorHandlerFunc) HandleError

func (c ConsumerErrorHandlerFunc) HandleError(err *sarama.ConsumerError) error

HandleError implements the ConsumerErrorHandler interface for ConsumerErrorHandlerFunc.

type ConsumerFunc

type ConsumerFunc func(ctx context.Context) error

ConsumerFunc is a function type that implements the Consumer interface.

func (ConsumerFunc) Consume

func (c ConsumerFunc) Consume(ctx context.Context) error

Consume implements the Consumer interface for ConsumerFunc.

type ConsumerOptions

type ConsumerOptions struct {
	TargetLag int64
	Delay     libtime.Duration
}

ConsumerOptions configures optional parameters for offset consumers.

type Entries

type Entries []Entry

Entries represents a collection of Entry items for batch operations.

type Entry

type Entry struct {
	Topic   Topic                 `json:"topic"`
	Headers []sarama.RecordHeader `json:"headers"`
	Key     Key                   `json:"key"`
	Value   Value                 `json:"value"`
}

Entry represents a single Kafka message entry with topic, headers, key and value.

type Filter

type Filter[KEY ~[]byte | ~string, OBJECT any] interface {
	// Filtered returns true if the object should be filtered out.
	Filtered(ctx context.Context, key KEY, object OBJECT) (bool, error)
}

Filter defines an interface for filtering objects based on key and object content.

type FilterFunc

type FilterFunc[KEY ~[]byte | ~string, OBJECT any] func(ctx context.Context, key KEY, object OBJECT) (bool, error)

FilterFunc is a function type that implements the Filter interface.

func (FilterFunc[KEY, OBJECT]) Filtered

func (f FilterFunc[KEY, OBJECT]) Filtered(
	ctx context.Context,
	key KEY,
	object OBJECT,
) (bool, error)

Filtered implements the Filter interface.

type FilterTx

type FilterTx[KEY ~[]byte | ~string, OBJECT any] interface {
	// Filtered returns true if the object should be filtered out.
	Filtered(ctx context.Context, tx libkv.Tx, key KEY, object OBJECT) (bool, error)
}

FilterTx defines an interface for filtering objects within a transaction context.

type FilterTxFunc

type FilterTxFunc[KEY ~[]byte | ~string, OBJECT any] func(ctx context.Context, tx libkv.Tx, key KEY, object OBJECT) (bool, error)

FilterTxFunc is a function type that implements the FilterTx interface.

func (FilterTxFunc[KEY, OBJECT]) Filtered

func (f FilterTxFunc[KEY, OBJECT]) Filtered(
	ctx context.Context,
	tx libkv.Tx,
	key KEY,
	object OBJECT,
) (bool, error)

Filtered implements the FilterTx interface.

type Group

type Group string

Group represents a Kafka consumer group identifier.

func (Group) String

func (t Group) String() string
type Header

type Header map[string][]string

Header represents HTTP-style headers for Kafka messages with support for multiple values per key.

func ParseHeader

func ParseHeader(saramaHeaders []*sarama.RecordHeader) Header

ParseHeader converts Sarama record headers into a Header map.

func (Header) Add

func (h Header) Add(key string, value string)

Add appends a value to the header key, supporting multiple values per key.

func (Header) AsSaramaHeaders

func (h Header) AsSaramaHeaders() []sarama.RecordHeader

AsSaramaHeaders converts the Header into Sarama record headers format.

func (Header) Get

func (h Header) Get(key string) string

Get returns the first value for the given header key, or empty string if not found.

func (Header) Remove

func (h Header) Remove(key string)

Remove deletes all values for the given header key.

func (Header) Set

func (h Header) Set(key string, values []string)

Set replaces all values for the given header key.
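
A small sketch of Header usage on both sides (the consumed message is assumed):

h := kafka.Header{}
h.Add("trace-id", "abc-123")
h.Set("source", []string{"billing"})

msg := &sarama.ProducerMessage{
	Topic:   "my-topic",
	Headers: h.AsSaramaHeaders(),
}

// Consumer side: turn sarama record headers back into a Header map.
received := kafka.ParseHeader(consumedMessage.Headers)
traceID := received.Get("trace-id")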

type HighwaterMarkProvider

type HighwaterMarkProvider interface {
	HighWaterMark(ctx context.Context, topic Topic, partition Partition) (*Offset, error)
}

HighwaterMarkProvider provides methods to retrieve high water mark offsets from Kafka topics.

func NewHighwaterMarkProvider

func NewHighwaterMarkProvider(
	saramaClient sarama.Client,
) HighwaterMarkProvider

NewHighwaterMarkProvider creates a new HighwaterMarkProvider using the provided Sarama client.

type JSONSender

type JSONSender interface {
	SendUpdate(
		ctx context.Context,
		topic Topic,
		key Key,
		value Value,
		headers ...sarama.RecordHeader,
	) error
	SendUpdates(ctx context.Context, entries Entries) error
	SendDelete(ctx context.Context, topic Topic, key Key, headers ...sarama.RecordHeader) error
	SendDeletes(ctx context.Context, entries Entries) error
}

JSONSender provides methods for sending JSON-encoded messages to Kafka topics.

func NewJSONSender

func NewJSONSender(
	producer SyncProducer,
	logSamplerFactory log.SamplerFactory,
	optionsFns ...func(options *JSONSenderOptions),
) JSONSender

NewJSONSender creates a new JSONSender with the provided producer and options.

func NewJsonSender deprecated added in v1.14.1

func NewJsonSender(
	producer SyncProducer,
	logSamplerFactory log.SamplerFactory,
	optionsFns ...func(options *JSONSenderOptions),
) JSONSender

NewJsonSender creates a new JSONSender with the provided producer and options.

Deprecated: Use NewJSONSender instead.

type JSONSenderOptions

type JSONSenderOptions struct {
	ValidationDisabled bool
}

JSONSenderOptions defines configuration options for JSONSender behavior.

type JsonSender deprecated added in v1.14.1

type JsonSender = JSONSender

JsonSender provides methods for sending JSON-encoded messages to Kafka topics.

Deprecated: Use JSONSender instead.

type JsonSenderOptions deprecated added in v1.14.1

type JsonSenderOptions = JSONSenderOptions

JsonSenderOptions defines configuration options for JsonSender behavior.

Deprecated: Use JSONSenderOptions instead.

type Key

type Key interface {
	Bytes() []byte
	String() string
}

Key represents a Kafka message key that can be converted to bytes or string.

func NewKey

func NewKey[K ~[]byte | ~string](value K) Key

NewKey creates a new Key from a byte slice or string value.

func ParseKey

func ParseKey(ctx context.Context, value interface{}) (*Key, error)

ParseKey parses an interface value into a Key by converting it to a string.

type Keys

type Keys []Key

Keys represents a collection of Key items.

type MessageHanderList

type MessageHanderList []MessageHandler

MessageHanderList is a list of MessageHandler that executes handlers sequentially.

func (MessageHanderList) ConsumeMessage

func (m MessageHanderList) ConsumeMessage(ctx context.Context, msg *sarama.ConsumerMessage) error

ConsumeMessage executes all handlers in the list sequentially for a single message.

type MessageHanderTxList

type MessageHanderTxList []MessageHandlerTx

MessageHanderTxList is a list of MessageHandlerTx that executes handlers sequentially within a transaction.

func (MessageHanderTxList) ConsumeMessage

func (m MessageHanderTxList) ConsumeMessage(
	ctx context.Context,
	tx libkv.Tx,
	msg *sarama.ConsumerMessage,
) error

ConsumeMessage executes all transaction handlers in the list sequentially for a single message.

type MessageHandler

type MessageHandler interface {
	// ConsumeMessage processes a single Kafka message and returns an error if processing fails.
	ConsumeMessage(ctx context.Context, msg *sarama.ConsumerMessage) error
}

MessageHandler defines the interface for processing individual Kafka messages.

func NewMessageHandlerMetrics

func NewMessageHandlerMetrics(
	messageHandler MessageHandler,
	metrics MetricsMessageHandler,
) MessageHandler

NewMessageHandlerMetrics is a MessageHandler adapter that creates Prometheus metrics for started, completed, and failed messages.

func NewMessageHandlerSkipErrors

func NewMessageHandlerSkipErrors(
	handler MessageHandler,
	logSamplerFactory log.SamplerFactory,
) MessageHandler

NewMessageHandlerSkipErrors creates a message handler that logs and skips errors.

func NewMessageHandlerUpdate

func NewMessageHandlerUpdate[KEY ~[]byte | ~string, OBJECT any](
	updateHandler UpdaterHandler[KEY, OBJECT],
) MessageHandler

NewMessageHandlerUpdate creates a generic message handler that processes JSON messages for CRUD operations.
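
A sketch of a typed updater wired into a message handler (the User type and store are illustrative placeholders for your own types and storage):

type User struct {
	Name string `json:"name"`
}

updater := kafka.UpdaterHandlerFunc[string, User](
	func(ctx context.Context, key string, user User) error {
		return store.Set(ctx, key, user) // upsert on update messages
	},
	func(ctx context.Context, key string) error {
		return store.Delete(ctx, key) // remove on delete messages
	},
)

handler := kafka.NewMessageHandlerUpdate[string, User](updater)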

func NewMessageTxUpdate

func NewMessageTxUpdate(db libkv.DB, messageHandlerTx MessageHandlerTx) MessageHandler

NewMessageTxUpdate creates a message handler that executes within a database update transaction.

func NewMessageTxView

func NewMessageTxView(db libkv.DB, messageHandlerTx MessageHandlerTx) MessageHandler

NewMessageTxView creates a read-only message handler that executes within a database view transaction.

func NewOffsetTriggerMessageHandler

func NewOffsetTriggerMessageHandler(
	triggerOffsets PartitionOffsets,
	topic Topic,
	trigger run.Fire,
) MessageHandler

NewOffsetTriggerMessageHandler returns a message handler that fires the given trigger once all given offsets are reached.

type MessageHandlerBatch

type MessageHandlerBatch interface {
	ConsumeMessages(ctx context.Context, messages []*sarama.ConsumerMessage) error
}

MessageHandlerBatch defines an interface for processing batches of Kafka messages.

func NewMessageHandlerBatch

func NewMessageHandlerBatch(messageHandler MessageHandler) MessageHandlerBatch

NewMessageHandlerBatch creates a new batch message handler that processes messages individually using the provided MessageHandler.

func NewMessageHandlerBatchDelay

func NewMessageHandlerBatchDelay(
	messageHandlerBatch MessageHandlerBatch,
	waiterDuration libtime.WaiterDuration,
	delay libtime.Duration,
) MessageHandlerBatch

NewMessageHandlerBatchDelay returns a MessageHandlerBatch that sleeps for the given duration after each consume; this allows the consumer to accumulate more messages per consume.

func NewMessageHandlerBatchMetrics

func NewMessageHandlerBatchMetrics(
	messageHandler MessageHandlerBatch,
	metrics MetricsMessageHandler,
) MessageHandlerBatch

NewMessageHandlerBatchMetrics is a MessageHandlerBatch adapter that creates Prometheus metrics for started, completed, and failed messages.

func NewMessageHandlerBatchSkipErrors

func NewMessageHandlerBatchSkipErrors(
	handler MessageHandlerBatch,
	logSamplerFactory log.SamplerFactory,
) MessageHandlerBatch

NewMessageHandlerBatchSkipErrors creates a message handler that logs and skips errors.

func NewMessageHandlerBatchTxUpdate

func NewMessageHandlerBatchTxUpdate(
	db libkv.DB,
	messageHandler MessageHandlerBatchTx,
) MessageHandlerBatch

NewMessageHandlerBatchTxUpdate creates a batch handler that executes within a database update transaction.

func NewMessageHandlerBatchTxView

func NewMessageHandlerBatchTxView(
	db libkv.DB,
	messageHandler MessageHandlerBatchTx,
) MessageHandlerBatch

NewMessageHandlerBatchTxView creates a read-only batch handler that executes within a database view transaction.

type MessageHandlerBatchFunc

type MessageHandlerBatchFunc func(ctx context.Context, messages []*sarama.ConsumerMessage) error

MessageHandlerBatchFunc is a function type that implements MessageHandlerBatch interface.

func (MessageHandlerBatchFunc) ConsumeMessages

func (b MessageHandlerBatchFunc) ConsumeMessages(
	ctx context.Context,
	messages []*sarama.ConsumerMessage,
) error

ConsumeMessages implements the MessageHandlerBatch interface.

type MessageHandlerBatchList

type MessageHandlerBatchList []MessageHandlerBatch

MessageHandlerBatchList is a list of MessageHandlerBatch that executes handlers sequentially.

func (MessageHandlerBatchList) ConsumeMessages

func (m MessageHandlerBatchList) ConsumeMessages(
	ctx context.Context,
	messages []*sarama.ConsumerMessage,
) error

ConsumeMessages executes all handlers in the list sequentially.

type MessageHandlerBatchTx

type MessageHandlerBatchTx interface {
	ConsumeMessages(ctx context.Context, tx libkv.Tx, messages []*sarama.ConsumerMessage) error
}

MessageHandlerBatchTx defines the interface for handling batch messages within a transaction context.

func NewMessageHandlerBatchTx

func NewMessageHandlerBatchTx(messageHandler MessageHandlerTx) MessageHandlerBatchTx

NewMessageHandlerBatchTx creates a batch transaction handler from a single message transaction handler.

func NewMessageHandlerBatchTxMetrics

func NewMessageHandlerBatchTxMetrics(
	messageHandler MessageHandlerBatchTx,
	metrics MetricsMessageHandler,
) MessageHandlerBatchTx

NewMessageHandlerBatchTxMetrics is a MessageHandlerBatchTx adapter that creates Prometheus metrics for started, completed, and failed messages.

func NewMessageHandlerBatchTxSkipErrors

func NewMessageHandlerBatchTxSkipErrors(
	handler MessageHandlerBatchTx,
	logSamplerFactory log.SamplerFactory,
) MessageHandlerBatchTx

NewMessageHandlerBatchTxSkipErrors creates a transaction batch message handler that logs and skips errors.

type MessageHandlerBatchTxFunc

type MessageHandlerBatchTxFunc func(ctx context.Context, tx libkv.Tx, messages []*sarama.ConsumerMessage) error

MessageHandlerBatchTxFunc is a function type that implements MessageHandlerBatchTx interface.

func (MessageHandlerBatchTxFunc) ConsumeMessages

func (b MessageHandlerBatchTxFunc) ConsumeMessages(
	ctx context.Context,
	tx libkv.Tx,
	messages []*sarama.ConsumerMessage,
) error

ConsumeMessages implements the MessageHandlerBatchTx interface.

type MessageHandlerBatchTxList

type MessageHandlerBatchTxList []MessageHandlerBatchTx

MessageHandlerBatchTxList is a list of MessageHandlerBatchTx that executes handlers sequentially within a transaction.

func (MessageHandlerBatchTxList) ConsumeMessages

func (m MessageHandlerBatchTxList) ConsumeMessages(
	ctx context.Context,
	tx libkv.Tx,
	messages []*sarama.ConsumerMessage,
) error

ConsumeMessages executes all transaction handlers in the list sequentially.

type MessageHandlerFunc

type MessageHandlerFunc func(ctx context.Context, msg *sarama.ConsumerMessage) error

MessageHandlerFunc allows a function to be used as a MessageHandler.

func (MessageHandlerFunc) ConsumeMessage

func (m MessageHandlerFunc) ConsumeMessage(ctx context.Context, msg *sarama.ConsumerMessage) error

ConsumeMessage forwards to the function.

type MessageHandlerTx

type MessageHandlerTx interface {
	ConsumeMessage(ctx context.Context, tx libkv.Tx, msg *sarama.ConsumerMessage) error
}

MessageHandlerTx defines the interface for handling messages within a transaction context.

func NewMessageHandlerTxMetrics

func NewMessageHandlerTxMetrics(
	messageHandler MessageHandlerTx,
	metrics MetricsMessageHandler,
) MessageHandlerTx

NewMessageHandlerTxMetrics is a MessageHandlerTx adapter that creates Prometheus metrics for started, completed, and failed messages.

func NewMessageHandlerTxSkipErrors

func NewMessageHandlerTxSkipErrors(
	handler MessageHandlerTx,
	logSamplerFactory log.SamplerFactory,
) MessageHandlerTx

NewMessageHandlerTxSkipErrors creates a transaction message handler that logs and skips errors.

func NewMessageHandlerTxUpdate

func NewMessageHandlerTxUpdate[KEY ~[]byte | ~string, OBJECT any](
	updateHandlerTx UpdaterHandlerTx[KEY, OBJECT],
) MessageHandlerTx

NewMessageHandlerTxUpdate creates a generic transaction message handler that processes JSON messages for CRUD operations.

type MessageHandlerTxFunc

type MessageHandlerTxFunc func(ctx context.Context, tx libkv.Tx, msg *sarama.ConsumerMessage) error

MessageHandlerTxFunc allows a function to be used as a MessageHandlerTx.

func (MessageHandlerTxFunc) ConsumeMessage

func (m MessageHandlerTxFunc) ConsumeMessage(
	ctx context.Context,
	tx libkv.Tx,
	msg *sarama.ConsumerMessage,
) error

ConsumeMessage forwards to the function.

type Metadata

type Metadata string

Metadata represents a string-based metadata identifier used in Kafka operations.

const DefaultMetadata Metadata = "offsetmanager"

DefaultMetadata defines the default metadata value used for offset manager operations.

func (Metadata) String

func (m Metadata) String() string

type Metrics

type Metrics interface {
	MetricsConsumer
	MetricsMessageHandler
	MetricsPartitionConsumer
	MetricsSyncProducer
}

Metrics provides a comprehensive interface for collecting Kafka-related metrics. It combines all metric collection interfaces for consumers, producers, and message handlers.

func NewMetrics

func NewMetrics() Metrics

NewMetrics creates a new Metrics implementation that collects Prometheus metrics for Kafka operations including consumer, producer, and message handler activities.

type MetricsConsumer

type MetricsConsumer interface {
	CurrentOffset(topic Topic, partition Partition, offset Offset)
	HighWaterMarkOffset(topic Topic, partition Partition, offset Offset)
	ErrorCounterInc(topic Topic, partition Partition)
}

MetricsConsumer provides metrics collection methods for Kafka consumer operations. It tracks offset positions, high watermarks, and consumer errors.

type MetricsMessageHandler

type MetricsMessageHandler interface {
	MessageHandlerTotalCounterInc(topic Topic, partition Partition)
	MessageHandlerSuccessCounterInc(topic Topic, partition Partition)
	MessageHandlerFailureCounterInc(topic Topic, partition Partition)
	MessageHandlerDurationMeasure(topic Topic, partition Partition, duration time.Duration)
}

MetricsMessageHandler provides metrics collection methods for message handler operations. It tracks total processed messages, success/failure rates, and processing durations.

type MetricsPartitionConsumer

type MetricsPartitionConsumer interface {
	ConsumePartitionCreateOutOfRangeErrorInitialize(topic Topic, partition Partition)
	ConsumePartitionCreateOutOfRangeErrorInc(topic Topic, partition Partition)
	ConsumePartitionCreateFailureInc(topic Topic, partition Partition)
	ConsumePartitionCreateSuccessInc(topic Topic, partition Partition)
	ConsumePartitionCreateTotalInc(topic Topic, partition Partition)
}

MetricsPartitionConsumer provides metrics collection methods for partition consumer creation operations. It tracks the success, failure, and out-of-range error rates when creating partition consumers.

type MetricsSyncProducer

type MetricsSyncProducer interface {
	SyncProducerTotalCounterInc(topic Topic)
	SyncProducerFailureCounterInc(topic Topic)
	SyncProducerSuccessCounterInc(topic Topic)
	SyncProducerDurationMeasure(topic Topic, duration time.Duration)
}

MetricsSyncProducer provides metrics collection methods for synchronous Kafka producer operations. It tracks total messages sent, success/failure rates, and send durations.

type Offset

type Offset int64

Offset in the Kafka topic.

func HighWaterMark

func HighWaterMark(
	ctx context.Context,
	saramaClient sarama.Client,
	topic Topic,
	partition Partition,
) (*Offset, error)

HighWaterMark returns the high water mark offset for the specified topic and partition.

func OffsetFromBytes

func OffsetFromBytes(content []byte) Offset

OffsetFromBytes returns the offset for the given bytes.

func ParseOffset

func ParseOffset(ctx context.Context, value string) (*Offset, error)

ParseOffset parses an Offset from a string.

func (Offset) Bytes

func (o Offset) Bytes() []byte

Bytes representation for the offset.

func (Offset) Int64

func (o Offset) Int64() int64

Int64 value for the offset.

func (Offset) Ptr

func (o Offset) Ptr() *Offset

Ptr returns a pointer to the offset value.

func (Offset) String

func (o Offset) String() string

type OffsetManager

type OffsetManager interface {
	// InitialOffset returns the offset to use when no previous offset exists for a partition.
	InitialOffset() Offset
	// FallbackOffset returns the offset to use when the stored offset is invalid or unavailable.
	FallbackOffset() Offset
	// NextOffset retrieves the next offset to consume for the given topic and partition.
	NextOffset(ctx context.Context, topic Topic, partition Partition) (Offset, error)
	// MarkOffset marks the provided offset as processed. This only allows forward movement (incrementing).
	// To follow upstream conventions, you should mark the offset of the next message to read, not the last message read.
	MarkOffset(ctx context.Context, topic Topic, partition Partition, nextOffset Offset) error
	// ResetOffset resets to the provided offset, allowing backward movement to earlier or smaller values.
	// This acts as a counterpart to MarkOffset and should be called before MarkOffset when setting offsets backwards.
	ResetOffset(ctx context.Context, topic Topic, partition Partition, nextOffset Offset) error
	io.Closer
}

OffsetManager manages Kafka consumer offsets for topics and partitions. It provides methods to retrieve initial and fallback offsets, track the next offset to consume, and mark offsets as processed.
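
A sketch wiring an offset manager into an offset consumer (saramaClient, handler, and logSamplerFactory as in the Quick Start):

// In-memory offsets: start at the newest offset, fall back to the
// oldest if the stored offset becomes invalid.
offsetManager := kafka.NewSimpleOffsetManager(kafka.OffsetNewest, kafka.OffsetOldest)
defer offsetManager.Close()

consumer := kafka.NewOffsetConsumer(
	saramaClient,
	kafka.Topic("my-topic"),
	offsetManager,
	handler,
	logSamplerFactory,
)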

func NewSaramaOffsetManager

func NewSaramaOffsetManager(
	saramaClient SaramaClient,
	group Group,
	initalOffset Offset,
	fallbackOffset Offset,
) OffsetManager

NewSaramaOffsetManager creates a new offset manager using Sarama's built-in offset management.

func NewSimpleOffsetManager

func NewSimpleOffsetManager(
	initalOffset Offset,
	fallbackOffset Offset,
) OffsetManager

NewSimpleOffsetManager creates a new simple in-memory offset manager.

func NewStoreOffsetManager

func NewStoreOffsetManager(
	offsetStore OffsetStore,
	initalOffset Offset,
	fallbackOffset Offset,
) OffsetManager

NewStoreOffsetManager creates a new offset manager that persists offsets using a store.

type OffsetStore

type OffsetStore interface {
	Get(ctx context.Context, topic Topic, partition Partition) (Offset, error)
	Set(ctx context.Context, topic Topic, partition Partition, offset Offset) error
}

OffsetStore provides persistent storage for Kafka topic partition offsets.

func NewOffsetStore

func NewOffsetStore(
	db libkv.DB,
) OffsetStore

NewOffsetStore creates a new OffsetStore using the provided database.

func NewOffsetStoreGroup

func NewOffsetStoreGroup(
	db libkv.DB,
	group Group,
) OffsetStore

NewOffsetStoreGroup creates a new OffsetStore for a specific consumer group.

type Partition

type Partition int32

Partition in Kafka.

func ParsePartition

func ParsePartition(ctx context.Context, value string) (*Partition, error)

ParsePartition parses a Partition from a string.

func PartitionFromBytes

func PartitionFromBytes(content []byte) Partition

PartitionFromBytes returns the partition for the given bytes.

func (Partition) Bytes

func (p Partition) Bytes() []byte

Bytes representation for the partition.

func (Partition) Int32

func (p Partition) Int32() int32

Int32 value of the partition.

func (Partition) String

func (p Partition) String() string

String returns the string representation of the partition.

type PartitionOffsetItem

type PartitionOffsetItem struct {
	Offset    Offset
	Partition Partition
}

PartitionOffsetItem represents a single partition and its offset.

type PartitionOffsetItems

type PartitionOffsetItems []PartitionOffsetItem

PartitionOffsetItems represents a slice of partition offset items.

func (PartitionOffsetItems) Offsets

func (i PartitionOffsetItems) Offsets() PartitionOffsets

Offsets converts the slice to a PartitionOffsets map.

type PartitionOffsets

type PartitionOffsets map[Partition]Offset

PartitionOffsets represents a mapping of partitions to their corresponding offsets.

func HighWaterMarks

func HighWaterMarks(
	ctx context.Context,
	saramaClient sarama.Client,
	topic Topic,
) (PartitionOffsets, error)

HighWaterMarks returns the high water marks for all partitions of the specified topic.
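
For instance, high water marks can be compared against consumer group offsets to estimate lag per partition (a sketch; assumes a stored offset exists for every partition):

marks, err := kafka.HighWaterMarks(ctx, saramaClient, kafka.Topic("my-topic"))
if err != nil {
	return err
}
offsets, err := kafka.ConsumerGroupOffsets(ctx, saramaClient, offsetManager, kafka.Topic("my-topic"))
if err != nil {
	return err
}
for partition, mark := range marks {
	lag := mark.Int64() - offsets[partition].Int64()
	fmt.Printf("partition %s: lag %d\n", partition, lag)
}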

func ParsePartitionOffsetFromBytes

func ParsePartitionOffsetFromBytes(
	ctx context.Context,
	offsetBytes []byte,
) (PartitionOffsets, error)

ParsePartitionOffsetFromBytes parses partition offsets from JSON bytes.

func (PartitionOffsets) Bytes

func (o PartitionOffsets) Bytes() ([]byte, error)

Bytes serializes the partition offsets to JSON bytes.

func (PartitionOffsets) Clone

func (o PartitionOffsets) Clone() PartitionOffsets

Clone creates a deep copy of the partition offsets.

func (PartitionOffsets) OffsetPartitions

func (o PartitionOffsets) OffsetPartitions() PartitionOffsetItems

OffsetPartitions converts the map to a slice of PartitionOffsetItem.

type Partitions

type Partitions []Partition

Partitions represents a collection of Kafka partitions.

func PartitionsFromInt32

func PartitionsFromInt32(partitions []int32) Partitions

PartitionsFromInt32 converts a slice of int32 values to Partitions.

type SaramaClient

type SaramaClient interface {
	sarama.Client
}

SaramaClient defines the interface for Sarama Kafka client operations.

func CreateSaramaClient

func CreateSaramaClient(
	ctx context.Context,
	brokers Brokers,
	opts ...SaramaConfigOptions,
) (SaramaClient, error)

CreateSaramaClient creates a new Sarama Kafka client with the specified configuration.

type SaramaClientPool added in v1.22.0

type SaramaClientPool interface {
	// Acquire returns a healthy client from pool or creates new one.
	// The client is health-checked before being returned.
	Acquire(ctx context.Context) (SaramaClient, error)

	// Release returns client to pool (discards if unhealthy).
	// The healthy parameter indicates whether the client is still healthy.
	Release(client SaramaClient, healthy bool)

	// Close closes all pooled connections.
	Close() error
}

SaramaClientPool manages a pool of Sarama clients with health checks. It reuses healthy clients and discards unhealthy ones automatically.

func NewSaramaClientPool added in v1.22.0

func NewSaramaClientPool(
	factory func(context.Context) (SaramaClient, error),
	opts SaramaClientPoolOptions,
) SaramaClientPool

NewSaramaClientPool creates a new client pool with the specified factory function. Use DefaultSaramaClientPoolOptions for default configuration.
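
A minimal sketch of pool usage; the factory here simply delegates to CreateSaramaClient:

pool := kafka.NewSaramaClientPool(
	func(ctx context.Context) (kafka.SaramaClient, error) {
		return kafka.CreateSaramaClient(ctx, brokers)
	},
	kafka.DefaultSaramaClientPoolOptions,
)
defer pool.Close()

client, err := pool.Acquire(ctx)
if err != nil {
	return err
}
// ... use client ...
pool.Release(client, true) // true: client is still healthy, keep it pooled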

type SaramaClientPoolOptions added in v1.22.0

type SaramaClientPoolOptions struct {
	// MaxPoolSize is the maximum number of clients to keep in the pool.
	// Default: 10
	MaxPoolSize int

	// HealthCheckTimeout is the timeout for health checks.
	// Default: 5s
	HealthCheckTimeout time.Duration
}

SaramaClientPoolOptions configures the client pool behavior.

type SaramaClientProvider added in v1.18.0

type SaramaClientProvider interface {
	// Client creates and returns a Sarama client.
	// The behavior depends on the implementation - it may return a new client or reuse an existing one.
	Client(ctx context.Context) (SaramaClient, error)
	// Close closes all Sarama clients that were created by this provider.
	// This method is safe to call multiple times and safe to defer immediately after creation.
	Close() error
}

SaramaClientProvider provides Sarama Kafka clients and manages their lifecycle. It creates clients on demand and tracks them for proper cleanup via Close().

func NewSaramaClientProviderByType added in v1.18.0

func NewSaramaClientProviderByType(
	ctx context.Context,
	providerType SaramaClientProviderType,
	brokers Brokers,
	opts ...SaramaConfigOptions,
) (SaramaClientProvider, error)

NewSaramaClientProviderByType creates a SaramaClientProvider based on the specified type.

func NewSaramaClientProviderExisting added in v1.19.0

func NewSaramaClientProviderExisting(
	saramaClient SaramaClient,
) SaramaClientProvider

NewSaramaClientProviderExisting creates a SaramaClientProvider that wraps an existing Sarama client. This adapter allows existing Sarama clients to be used with the provider pattern without breaking backward compatibility. The provider returns the same client instance on every call to Client() and delegates Close() to the wrapped client.

func NewSaramaClientProviderNew added in v1.18.0

func NewSaramaClientProviderNew(
	brokers Brokers,
	opts ...SaramaConfigOptions,
) SaramaClientProvider

NewSaramaClientProviderNew creates a SaramaClientProvider that creates a new client for each call. All created clients are tracked and closed when Close() is called. The provided options will be applied to all created clients.

func NewSaramaClientProviderPool added in v1.22.0

func NewSaramaClientProviderPool(
	brokers Brokers,
	poolOpts SaramaClientPoolOptions,
	opts ...SaramaConfigOptions,
) SaramaClientProvider

NewSaramaClientProviderPool creates a SaramaClientProvider that uses a connection pool. The pool manages client lifecycle with health checks and automatic reconnection. Pool configuration can be customized via poolOpts.

func NewSaramaClientProviderReused added in v1.18.0

func NewSaramaClientProviderReused(
	brokers Brokers,
	opts ...SaramaConfigOptions,
) SaramaClientProvider

NewSaramaClientProviderReused creates a SaramaClientProvider that reuses a single client for all calls. The client is created lazily on the first call to Client(). The provided options will be applied when creating the client.

type SaramaClientProviderType added in v1.18.0

type SaramaClientProviderType string

SaramaClientProviderType defines the type of Sarama client provider to use.

const (
	// SaramaClientProviderTypeReused creates a provider that reuses a single client for all calls.
	SaramaClientProviderTypeReused SaramaClientProviderType = "reused"
	// SaramaClientProviderTypeNew creates a provider that creates a new client for each call.
	SaramaClientProviderTypeNew SaramaClientProviderType = "new"
	// SaramaClientProviderTypePool creates a provider that uses a connection pool with health checks.
	SaramaClientProviderTypePool SaramaClientProviderType = "pool"
)

func ParseSaramaClientProviderType added in v1.18.0

func ParseSaramaClientProviderType(
	ctx context.Context,
	value interface{},
) (*SaramaClientProviderType, error)

ParseSaramaClientProviderType parses a value into a SaramaClientProviderType.

func (SaramaClientProviderType) Ptr added in v1.18.0

func (s SaramaClientProviderType) Ptr() *SaramaClientProviderType

Ptr returns a pointer to the SaramaClientProviderType.

func (SaramaClientProviderType) String added in v1.18.0

func (s SaramaClientProviderType) String() string

String returns the string representation of the SaramaClientProviderType.

func (SaramaClientProviderType) Validate added in v1.18.0

func (s SaramaClientProviderType) Validate(ctx context.Context) error

Validate checks if the SaramaClientProviderType is valid.

type SaramaClientProviderTypes added in v1.18.0

type SaramaClientProviderTypes []SaramaClientProviderType

SaramaClientProviderTypes is a slice of SaramaClientProviderType values.

func (SaramaClientProviderTypes) Contains added in v1.18.0

func (s SaramaClientProviderTypes) Contains(providerType SaramaClientProviderType) bool

Contains checks if the given SaramaClientProviderType is in the slice.

type SaramaConfigOptions

type SaramaConfigOptions func(config *sarama.Config)

SaramaConfigOptions defines a function type for modifying Sarama configuration.

type SaramaConsumer

type SaramaConsumer interface {
	sarama.Consumer
}

SaramaConsumer provides an interface wrapper around sarama.Consumer for testing and abstraction purposes.

type SaramaPartitionConsumer

type SaramaPartitionConsumer interface {
	sarama.PartitionConsumer
}

SaramaPartitionConsumer represents a wrapper interface for sarama.PartitionConsumer.

type SaramaSyncProducer

type SaramaSyncProducer interface {
	sarama.SyncProducer
}

SaramaSyncProducer wraps the Sarama SyncProducer interface to enable dependency injection and testing.

type SyncProducer

type SyncProducer interface {
	// SendMessage sends a single message to Kafka and returns the partition and offset where it was stored.
	SendMessage(
		ctx context.Context,
		msg *sarama.ProducerMessage,
	) (partition int32, offset int64, err error)
	// SendMessages sends multiple messages to Kafka in a single batch operation.
	SendMessages(ctx context.Context, msgs []*sarama.ProducerMessage) error
	// Close closes the producer and releases its resources.
	Close() error
}

SyncProducer defines the interface for synchronously sending messages to Kafka.

func NewSyncProducer

func NewSyncProducer(
	ctx context.Context,
	brokers Brokers,
	opts ...SaramaConfigOptions,
) (SyncProducer, error)

NewSyncProducer creates a new synchronous Kafka producer with the given brokers and configuration options.

func NewSyncProducerFromSaramaClient added in v1.19.0

func NewSyncProducerFromSaramaClient(
	ctx context.Context,
	saramaClient SaramaClient,
) (SyncProducer, error)

NewSyncProducerFromSaramaClient creates a new SyncProducer from an existing Sarama client. This is useful when you already have a configured Sarama client and want to create a producer from it.

func NewSyncProducerFromSaramaClientProvider added in v1.19.0

func NewSyncProducerFromSaramaClientProvider(
	ctx context.Context,
	saramaClientProvider SaramaClientProvider,
) (SyncProducer, error)

NewSyncProducerFromSaramaClientProvider creates a new SyncProducer using a SaramaClientProvider. It obtains a client from the provider and creates a sync producer from it. This enables flexible client lifecycle management strategies (reused, new per call, etc.).

func NewSyncProducerFromSaramaSyncProducer

func NewSyncProducerFromSaramaSyncProducer(saramaSyncProducer sarama.SyncProducer) SyncProducer

NewSyncProducerFromSaramaSyncProducer creates a new SyncProducer wrapper around an existing Sarama SyncProducer.

func NewSyncProducerMetrics

func NewSyncProducerMetrics(
	syncProducer SyncProducer,
) SyncProducer

NewSyncProducerMetrics creates a sync producer decorator that records metrics for all operations.

func NewSyncProducerModify

func NewSyncProducerModify(
	syncProducer SyncProducer,
	fn func(ctx context.Context, message *sarama.ProducerMessage) error,
) SyncProducer

NewSyncProducerModify creates a sync producer that applies a modification function to messages before sending.
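
For example, a modifier can stamp every outgoing message with a header (a sketch; the header name is illustrative):

producer = kafka.NewSyncProducerModify(
	producer,
	func(ctx context.Context, message *sarama.ProducerMessage) error {
		message.Headers = append(message.Headers, sarama.RecordHeader{
			Key:   []byte("produced-by"),
			Value: []byte("my-service"),
		})
		return nil
	},
)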

func NewSyncProducerNop

func NewSyncProducerNop() SyncProducer

NewSyncProducerNop creates a no-operation sync producer that logs but doesn't actually send messages.

func NewSyncProducerWithHeader

func NewSyncProducerWithHeader(
	ctx context.Context,
	brokers Brokers,
	headers Header,
	opts ...SaramaConfigOptions,
) (SyncProducer, error)

NewSyncProducerWithHeader creates a sync producer that adds specified headers to all messages.

func NewSyncProducerWithName

func NewSyncProducerWithName(
	ctx context.Context,
	brokers Brokers,
	name string,
	opts ...SaramaConfigOptions,
) (SyncProducer, error)

NewSyncProducerWithName creates a sync producer that adds a 'name' header to all messages.

type Topic

type Topic string

Topic represents a Kafka topic name.

func TopicFromStrings

func TopicFromStrings(values ...string) Topic

TopicFromStrings creates a valid Kafka topic name from multiple string values by joining and sanitizing them.

func (Topic) String

func (t Topic) String() string

func (Topic) Validate

func (t Topic) Validate(ctx context.Context) error

Validate checks if the topic name is valid according to Kafka naming conventions.

type TopicPartition

type TopicPartition struct {
	Topic     Topic
	Partition Partition
}

TopicPartition represents a specific partition within a Kafka topic.

func (TopicPartition) Bytes

func (p TopicPartition) Bytes() []byte

Bytes returns a byte representation of the TopicPartition in the format "topic-partition".

type Topics

type Topics []Topic

Topics represents a collection of Kafka topics.

func ParseTopics

func ParseTopics(values []string) Topics

ParseTopics converts a slice of strings into a Topics slice.

func ParseTopicsFromString

func ParseTopicsFromString(value string) Topics

ParseTopicsFromString parses a comma-separated string into a Topics slice.

func (Topics) Contains

func (t Topics) Contains(topic Topic) bool

Contains returns true if the Topics collection contains the specified topic.

func (Topics) Interfaces

func (t Topics) Interfaces() []interface{}

Interfaces converts the Topics slice to a slice of interface{} values.

func (Topics) Len

func (t Topics) Len() int

func (Topics) Less

func (t Topics) Less(i, j int) bool

func (Topics) Strings

func (t Topics) Strings() []string

Strings converts the Topics slice to a slice of string values.

func (Topics) Swap

func (t Topics) Swap(i, j int)

func (Topics) Unique

func (t Topics) Unique() Topics

Unique returns a new Topics slice containing only unique topics.

type UpdaterHandler

type UpdaterHandler[KEY ~[]byte | ~string, OBJECT any] interface {
	Update(ctx context.Context, key KEY, object OBJECT) error
	Delete(ctx context.Context, key KEY) error
}

UpdaterHandler defines a generic interface for handling update and delete operations on objects identified by keys of type KEY.

func NewUpdaterHandlerFilter

func NewUpdaterHandlerFilter[KEY ~[]byte | ~string, OBJECT any](
	filter Filter[KEY, OBJECT],
	updateHandler UpdaterHandler[KEY, OBJECT],
) UpdaterHandler[KEY, OBJECT]

NewUpdaterHandlerFilter creates an updater handler that filters objects before updating.

func NewUpdaterHandlerSkipErrors

func NewUpdaterHandlerSkipErrors[KEY ~[]byte | ~string, OBJECT any](
	handler UpdaterHandler[KEY, OBJECT],
	logSamplerFactory log.SamplerFactory,
) UpdaterHandler[KEY, OBJECT]

NewUpdaterHandlerSkipErrors creates an updater handler that logs and skips errors.

func NewUpdaterHandlerTxUpdate

func NewUpdaterHandlerTxUpdate[KEY ~[]byte | ~string, OBJECT any](
	db libkv.DB,
	updaterHandlerTx UpdaterHandlerTx[KEY, OBJECT],
) UpdaterHandler[KEY, OBJECT]

NewUpdaterHandlerTxUpdate creates an updater handler that executes within a database update transaction.

func NewUpdaterHandlerTxView

func NewUpdaterHandlerTxView[KEY ~[]byte | ~string, OBJECT any](
	db libkv.DB,
	updaterHandlerTx UpdaterHandlerTx[KEY, OBJECT],
) UpdaterHandler[KEY, OBJECT]

NewUpdaterHandlerTxView creates a read-only updater handler that executes within a database view transaction.

func UpdaterHandlerFunc

func UpdaterHandlerFunc[KEY ~[]byte | ~string, OBJECT any](
	updateFn func(ctx context.Context, key KEY, object OBJECT) error,
	deleteFn func(ctx context.Context, key KEY) error,
) UpdaterHandler[KEY, OBJECT]

UpdaterHandlerFunc creates an UpdaterHandler from separate update and delete functions.

type UpdaterHandlerList

type UpdaterHandlerList[KEY ~[]byte | ~string, OBJECT any] []UpdaterHandler[KEY, OBJECT]

UpdaterHandlerList is a list of UpdaterHandler that executes handlers sequentially.

func (UpdaterHandlerList[KEY, OBJECT]) Delete

func (e UpdaterHandlerList[KEY, OBJECT]) Delete(ctx context.Context, key KEY) error

Delete executes all handlers in the list sequentially for delete operations.

func (UpdaterHandlerList[KEY, OBJECT]) Update

func (e UpdaterHandlerList[KEY, OBJECT]) Update(ctx context.Context, key KEY, object OBJECT) error

Update executes all handlers in the list sequentially for update operations.

type UpdaterHandlerTx

type UpdaterHandlerTx[KEY ~[]byte | ~string, OBJECT any] interface {
	Update(ctx context.Context, tx libkv.Tx, key KEY, object OBJECT) error
	Delete(ctx context.Context, tx libkv.Tx, key KEY) error
}

UpdaterHandlerTx defines the interface for handling CRUD operations within a transaction context.

func NewUpdaterHandlerTxFilter

func NewUpdaterHandlerTxFilter[KEY ~[]byte | ~string, OBJECT any](
	filterTx Filter[KEY, OBJECT],
	updateHandlerTx UpdaterHandlerTx[KEY, OBJECT],
) UpdaterHandlerTx[KEY, OBJECT]

NewUpdaterHandlerTxFilter creates a transaction updater handler that filters objects before updating.

func NewUpdaterHandlerTxSkipErrors

func NewUpdaterHandlerTxSkipErrors[KEY ~[]byte | ~string, OBJECT any](
	handler UpdaterHandlerTx[KEY, OBJECT],
	logSamplerFactory log.SamplerFactory,
) UpdaterHandlerTx[KEY, OBJECT]

NewUpdaterHandlerTxSkipErrors creates a transaction updater handler that logs and skips errors.

func UpdaterHandlerTxFunc

func UpdaterHandlerTxFunc[KEY ~[]byte | ~string, OBJECT any](
	updateFn func(ctx context.Context, tx libkv.Tx, key KEY, object OBJECT) error,
	deleteFn func(ctx context.Context, tx libkv.Tx, key KEY) error,
) UpdaterHandlerTx[KEY, OBJECT]

UpdaterHandlerTxFunc creates an UpdaterHandlerTx from separate update and delete functions.

type UpdaterHandlerTxList

type UpdaterHandlerTxList[KEY ~[]byte | ~string, OBJECT any] []UpdaterHandlerTx[KEY, OBJECT]

UpdaterHandlerTxList is a list of UpdaterHandlerTx that executes handlers sequentially within a transaction.

func (UpdaterHandlerTxList[KEY, OBJECT]) Delete

func (e UpdaterHandlerTxList[KEY, OBJECT]) Delete(ctx context.Context, tx libkv.Tx, key KEY) error

Delete executes all transaction handlers in the list sequentially for delete operations.

func (UpdaterHandlerTxList[KEY, OBJECT]) Update

func (e UpdaterHandlerTxList[KEY, OBJECT]) Update(
	ctx context.Context,
	tx libkv.Tx,
	key KEY,
	object OBJECT,
) error

Update executes all transaction handlers in the list sequentially for update operations.

type Value

type Value interface {
	validation.HasValidation
}

Value represents a Kafka message value that supports validation.
