Documentation
¶
Overview ¶
Package kafka provides a production-ready Kafka abstraction library built on top of IBM's Sarama client.
This package offers a clean interface for Kafka operations while adding essential features like metrics integration, batch processing, transaction support, and comprehensive message handling patterns.
The library follows interface-driven architecture principles, making it composition-friendly with extensive interface support for testing and modularity.
Key Features:
- Interface-driven design with Consumer, SyncProducer, MessageHandler interfaces
- Built-in Prometheus metrics integration
- Batch processing with configurable parameters and delays
- Transaction support with atomic message processing
- TLS support for secure connections
- Multiple offset management strategies
- Decorator patterns for extending functionality
- Function types for functional programming support
Basic usage example:
ctx := context.Background()
brokers := kafka.ParseBrokers("localhost:9092")
// Create producer
producer, err := kafka.NewSyncProducer(ctx, brokers)
if err != nil {
log.Fatal(err)
}
defer producer.Close()
// Send message
msg := &sarama.ProducerMessage{
Topic: "my-topic",
Value: sarama.StringEncoder("Hello Kafka!"),
}
partition, offset, err := producer.SendMessage(ctx, msg)
if err != nil {
log.Fatal(err)
}
Index ¶
- Constants
- Variables
- func ConsumerGroupOffsets(ctx context.Context, saramaClient sarama.Client, offsetManager OffsetManager, ...) (map[Partition]Offset, error)
- func CreatePartitionConsumer(ctx context.Context, consumerFromClient sarama.Consumer, ...) (sarama.PartitionConsumer, error)
- func CreateSaramaConfig(ctx context.Context, brokers Brokers, opts ...SaramaConfigOptions) (*sarama.Config, error)
- func GetRealOffset(ctx context.Context, saramaClient sarama.Client, topic Topic, ...) (map[Partition]Offset, error)
- func GzipDecoder(ctx context.Context, compressedData []byte) ([]byte, error)
- func IsBrokenPipeError(err error) bool
- func NewGzipEncoder(ctx context.Context, value []byte) (sarama.ByteEncoder, error)
- func NewGzipEncoderWithLevel(ctx context.Context, value []byte, level int) (sarama.ByteEncoder, error)
- func NewJSONEncoder(ctx context.Context, value interface{}) (sarama.Encoder, error)
- func NewJsonEncoder(ctx context.Context, value interface{}) (sarama.Encoder, error) (deprecated)
- func NewOffsetManagerHandler(offsetManager OffsetManager, cancel context.CancelFunc) libhttp.WithError
- func NewTLSConfig(ctx context.Context, clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error)
- type BatchSize
- type Broker
- type BrokerSchema
- type BrokerSchemas
- type Brokers
- type Consumer
- func NewOffsetConsumer(saramaClient sarama.Client, topic Topic, offsetManager OffsetManager, ...) Consumer
- func NewOffsetConsumerBatch(saramaClient sarama.Client, topic Topic, offsetManager OffsetManager, ...) Consumer
- func NewOffsetConsumerBatchWithProvider(saramaClientProvider SaramaClientProvider, topic Topic, ...) Consumer
- func NewOffsetConsumerHighwaterMarks(saramaClient sarama.Client, topic Topic, offsetManager OffsetManager, ...) Consumer
- func NewOffsetConsumerHighwaterMarksBatch(saramaClient sarama.Client, topic Topic, offsetManager OffsetManager, ...) Consumer
- func NewOffsetConsumerHighwaterMarksBatchWithProvider(saramaClientProvider SaramaClientProvider, topic Topic, ...) Consumer
- func NewOffsetConsumerHighwaterMarksWithProvider(saramaClientProvider SaramaClientProvider, topic Topic, ...) Consumer
- func NewOffsetConsumerWithProvider(saramaClientProvider SaramaClientProvider, topic Topic, ...) Consumer
- func NewSimpleConsumer(saramaClient SaramaClient, topic Topic, initalOffset Offset, ...) Consumer
- func NewSimpleConsumerBatch(saramaClient SaramaClient, topic Topic, initalOffset Offset, ...) Consumer
- type ConsumerErrorHandler
- type ConsumerErrorHandlerFunc
- type ConsumerFunc
- type ConsumerOptions
- type Entries
- type Entry
- type Filter
- type FilterFunc
- type FilterTx
- type FilterTxFunc
- type Group
- type Header
- type HighwaterMarkProvider
- type JSONSender
- type JSONSenderOptions
- type JsonSender (deprecated)
- type JsonSenderOptions (deprecated)
- type Key
- type Keys
- type MessageHanderList
- type MessageHanderTxList
- type MessageHandler
- func NewMessageHandlerMetrics(messageHandler MessageHandler, metrics MetricsMessageHandler) MessageHandler
- func NewMessageHandlerSkipErrors(handler MessageHandler, logSamplerFactory log.SamplerFactory) MessageHandler
- func NewMessageHandlerUpdate[KEY ~[]byte | ~string, OBJECT any](updateHandler UpdaterHandler[KEY, OBJECT]) MessageHandler
- func NewMessageTxUpdate(db libkv.DB, messageHandlerTx MessageHandlerTx) MessageHandler
- func NewMessageTxView(db libkv.DB, messageHandlerTx MessageHandlerTx) MessageHandler
- func NewOffsetTriggerMessageHandler(triggerOffsets PartitionOffsets, topic Topic, trigger run.Fire) MessageHandler
- type MessageHandlerBatch
- func NewMessageHandlerBatch(messageHandler MessageHandler) MessageHandlerBatch
- func NewMessageHandlerBatchDelay(messageHandlerBatch MessageHandlerBatch, waiterDuration libtime.WaiterDuration, ...) MessageHandlerBatch
- func NewMessageHandlerBatchMetrics(messageHandler MessageHandlerBatch, metrics MetricsMessageHandler) MessageHandlerBatch
- func NewMessageHandlerBatchSkipErrors(handler MessageHandlerBatch, logSamplerFactory log.SamplerFactory) MessageHandlerBatch
- func NewMessageHandlerBatchTxUpdate(db libkv.DB, messageHandler MessageHandlerBatchTx) MessageHandlerBatch
- func NewMessageHandlerBatchTxView(db libkv.DB, messageHandler MessageHandlerBatchTx) MessageHandlerBatch
- type MessageHandlerBatchFunc
- type MessageHandlerBatchList
- type MessageHandlerBatchTx
- func NewMessageHandlerBatchTx(messageHandler MessageHandlerTx) MessageHandlerBatchTx
- func NewMessageHandlerBatchTxMetrics(messageHandler MessageHandlerBatchTx, metrics MetricsMessageHandler) MessageHandlerBatchTx
- func NewMessageHandlerBatchTxSkipErrors(handler MessageHandlerBatchTx, logSamplerFactory log.SamplerFactory) MessageHandlerBatchTx
- type MessageHandlerBatchTxFunc
- type MessageHandlerBatchTxList
- type MessageHandlerFunc
- type MessageHandlerTx
- func NewMessageHandlerTxMetrics(messageHandler MessageHandlerTx, metrics MetricsMessageHandler) MessageHandlerTx
- func NewMessageHandlerTxSkipErrors(handler MessageHandlerTx, logSamplerFactory log.SamplerFactory) MessageHandlerTx
- func NewMessageHandlerTxUpdate[KEY ~[]byte | ~string, OBJECT any](updateHandlerTx UpdaterHandlerTx[KEY, OBJECT]) MessageHandlerTx
- type MessageHandlerTxFunc
- type Metadata
- type Metrics
- type MetricsConsumer
- type MetricsMessageHandler
- type MetricsPartitionConsumer
- type MetricsSyncProducer
- type Offset
- type OffsetManager
- func NewSaramaOffsetManager(saramaClient SaramaClient, group Group, initalOffset Offset, ...) OffsetManager
- func NewSimpleOffsetManager(initalOffset Offset, fallbackOffset Offset) OffsetManager
- func NewStoreOffsetManager(offsetStore OffsetStore, initalOffset Offset, fallbackOffset Offset) OffsetManager
- type OffsetStore
- type Partition
- type PartitionOffsetItem
- type PartitionOffsetItems
- type PartitionOffsets
- type Partitions
- type SaramaClient
- type SaramaClientPool
- type SaramaClientPoolOptions
- type SaramaClientProvider
- func NewSaramaClientProviderByType(ctx context.Context, providerType SaramaClientProviderType, brokers Brokers, ...) (SaramaClientProvider, error)
- func NewSaramaClientProviderExisting(saramaClient SaramaClient) SaramaClientProvider
- func NewSaramaClientProviderNew(brokers Brokers, opts ...SaramaConfigOptions) SaramaClientProvider
- func NewSaramaClientProviderPool(brokers Brokers, poolOpts SaramaClientPoolOptions, opts ...SaramaConfigOptions) SaramaClientProvider
- func NewSaramaClientProviderReused(brokers Brokers, opts ...SaramaConfigOptions) SaramaClientProvider
- type SaramaClientProviderType
- type SaramaClientProviderTypes
- type SaramaConfigOptions
- type SaramaConsumer
- type SaramaPartitionConsumer
- type SaramaSyncProducer
- type SyncProducer
- func NewSyncProducer(ctx context.Context, brokers Brokers, opts ...SaramaConfigOptions) (SyncProducer, error)
- func NewSyncProducerFromSaramaClient(ctx context.Context, saramaClient SaramaClient) (SyncProducer, error)
- func NewSyncProducerFromSaramaClientProvider(ctx context.Context, saramaClientProvider SaramaClientProvider) (SyncProducer, error)
- func NewSyncProducerFromSaramaSyncProducer(saramaSyncProducer sarama.SyncProducer) SyncProducer
- func NewSyncProducerMetrics(syncProducer SyncProducer) SyncProducer
- func NewSyncProducerModify(syncProducer SyncProducer, ...) SyncProducer
- func NewSyncProducerNop() SyncProducer
- func NewSyncProducerWithHeader(ctx context.Context, brokers Brokers, headers Header, ...) (SyncProducer, error)
- func NewSyncProducerWithName(ctx context.Context, brokers Brokers, name string, opts ...SaramaConfigOptions) (SyncProducer, error)
- type Topic
- type TopicPartition
- type Topics
- type UpdaterHandler
- func NewUpdaterHandlerFilter[KEY ~[]byte | ~string, OBJECT any](filter Filter[KEY, OBJECT], updateHandler UpdaterHandler[KEY, OBJECT]) UpdaterHandler[KEY, OBJECT]
- func NewUpdaterHandlerSkipErrors[KEY ~[]byte | ~string, OBJECT any](handler UpdaterHandler[KEY, OBJECT], logSamplerFactory log.SamplerFactory) UpdaterHandler[KEY, OBJECT]
- func NewUpdaterHandlerTxUpdate[KEY ~[]byte | ~string, OBJECT any](db libkv.DB, updaterHandlerTx UpdaterHandlerTx[KEY, OBJECT]) UpdaterHandler[KEY, OBJECT]
- func NewUpdaterHandlerTxView[KEY ~[]byte | ~string, OBJECT any](db libkv.DB, updaterHandlerTx UpdaterHandlerTx[KEY, OBJECT]) UpdaterHandler[KEY, OBJECT]
- func UpdaterHandlerFunc[KEY ~[]byte | ~string, OBJECT any](updateFn func(ctx context.Context, key KEY, object OBJECT) error, ...) UpdaterHandler[KEY, OBJECT]
- type UpdaterHandlerList
- type UpdaterHandlerTx
- func NewUpdaterHandlerTxFilter[KEY ~[]byte | ~string, OBJECT any](filterTx Filter[KEY, OBJECT], updateHandlerTx UpdaterHandlerTx[KEY, OBJECT]) UpdaterHandlerTx[KEY, OBJECT]
- func NewUpdaterHandlerTxSkipErrors[KEY ~[]byte | ~string, OBJECT any](handler UpdaterHandlerTx[KEY, OBJECT], logSamplerFactory log.SamplerFactory) UpdaterHandlerTx[KEY, OBJECT]
- func UpdaterHandlerTxFunc[KEY ~[]byte | ~string, OBJECT any](updateFn func(ctx context.Context, tx libkv.Tx, key KEY, object OBJECT) error, ...) UpdaterHandlerTx[KEY, OBJECT]
- type UpdaterHandlerTxList
- type Value
Constants ¶
const DefaultOffsetStoreBucket = "offset-store"
const OffsetNewest = Offset(sarama.OffsetNewest)
OffsetNewest represents the most recent offset available for a partition.
const OffsetOldest = Offset(sarama.OffsetOldest)
OffsetOldest represents the oldest offset available for a partition.
const OutOfRangeErrorMessage = "The requested offset is outside the range of offsets maintained by the server for the given topic/partition"
OutOfRangeErrorMessage defines the error message returned when the requested offset is outside the range of offsets maintained by the server.
Variables ¶
var AvailableSaramaClientProviderTypes = SaramaClientProviderTypes{ SaramaClientProviderTypeReused, SaramaClientProviderTypeNew, SaramaClientProviderTypePool, }
AvailableSaramaClientProviderTypes contains all valid SaramaClientProviderType values.
var ClosedError = ErrClosed
ClosedError is an alias for ErrClosed. Deprecated: Use ErrClosed instead.
var DefaultSaramaClientPoolOptions = SaramaClientPoolOptions{ MaxPoolSize: 10, HealthCheckTimeout: 5 * time.Second, }
DefaultSaramaClientPoolOptions holds the default pool configuration.
var ErrClosed = stderrors.New("closed")
ErrClosed is returned when operations are attempted on a closed offset manager.
Functions ¶
func ConsumerGroupOffsets ¶
func ConsumerGroupOffsets( ctx context.Context, saramaClient sarama.Client, offsetManager OffsetManager, topic Topic, ) (map[Partition]Offset, error)
ConsumerGroupOffsets retrieves the next offset for each partition of the specified topic using the provided offset manager. It returns a map of partitions to their corresponding next offsets.
func CreatePartitionConsumer ¶
func CreatePartitionConsumer( ctx context.Context, consumerFromClient sarama.Consumer, metricsConsumer MetricsPartitionConsumer, topic Topic, partition Partition, fallbackOffset Offset, nextOffset Offset, ) (sarama.PartitionConsumer, error)
CreatePartitionConsumer creates a partition consumer and uses the initial offset if an out-of-range error occurs.
func CreateSaramaConfig ¶
func CreateSaramaConfig( ctx context.Context, brokers Brokers, opts ...SaramaConfigOptions, ) (*sarama.Config, error)
CreateSaramaConfig creates a new Sarama configuration with default settings and applies optional modifications. It configures producers for high durability, consumers for oldest offset consumption, and enables TLS if brokers use TLS schema.
func GetRealOffset ¶
func GetRealOffset( ctx context.Context, saramaClient sarama.Client, topic Topic, offsets map[Partition]Offset, ) (map[Partition]Offset, error)
GetRealOffset gets the real offset for newest or oldest.
func GzipDecoder ¶ added in v1.16.0
GzipDecoder decompresses gzip-compressed data. It takes compressed data as a byte slice, decompresses it using gzip, and returns the decompressed data as a byte slice. Returns an error if the data is nil, not valid gzip format, or decompression fails.
func IsBrokenPipeError ¶
IsBrokenPipeError checks if the given error represents a broken pipe error. It returns true if the error message ends with "broken pipe".
func NewGzipEncoder ¶ added in v1.16.0
NewGzipEncoder compresses a byte slice using gzip compression with default compression level. It returns the compressed data as a sarama.ByteEncoder. Returns an error if compression fails.
func NewGzipEncoderWithLevel ¶ added in v1.16.0
func NewGzipEncoderWithLevel( ctx context.Context, value []byte, level int, ) (sarama.ByteEncoder, error)
NewGzipEncoderWithLevel compresses a byte slice using gzip compression with a specified compression level. It returns the compressed data as a sarama.ByteEncoder. The level parameter should be one of: gzip.NoCompression, gzip.BestSpeed, gzip.BestCompression, or gzip.DefaultCompression. Returns an error if the compression level is invalid or compression fails.
func NewJSONEncoder ¶ added in v1.18.0
NewJSONEncoder creates a new JSON encoder that marshals the given value into a Sarama encoder.
func NewOffsetManagerHandler ¶
func NewOffsetManagerHandler( offsetManager OffsetManager, cancel context.CancelFunc, ) libhttp.WithError
NewOffsetManagerHandler creates an HTTP handler for managing Kafka consumer offsets.
Types ¶
type BatchSize ¶
type BatchSize int
BatchSize represents the number of messages to process in a single batch operation.
type Broker ¶
type Broker string
Broker represents a Kafka broker address with schema and host information.
func ParseBroker ¶
ParseBroker parses a string value into a Broker, adding a default plain schema if none is specified.
func (Broker) MarshalText ¶ added in v1.17.2
MarshalText implements encoding.TextMarshaler for Broker.
func (Broker) Schema ¶
func (b Broker) Schema() BrokerSchema
Schema extracts and returns the schema portion of the broker address.
func (*Broker) UnmarshalText ¶ added in v1.17.0
UnmarshalText implements encoding.TextUnmarshaler for Broker.
type BrokerSchema ¶
type BrokerSchema string
BrokerSchema represents the connection schema type for a Kafka broker (plain or tls).
const ( // PlainSchema represents a plain text connection to Kafka broker. PlainSchema BrokerSchema = "plain" // TLSSchema represents a TLS-encrypted connection to Kafka broker. TLSSchema BrokerSchema = "tls" )
func (BrokerSchema) String ¶
func (s BrokerSchema) String() string
String returns the string representation of the BrokerSchema.
type BrokerSchemas ¶
type BrokerSchemas []BrokerSchema
BrokerSchemas represents a slice of BrokerSchema values.
func (BrokerSchemas) Contains ¶
func (s BrokerSchemas) Contains(schema BrokerSchema) bool
Contains checks if the given schema exists in the BrokerSchemas slice.
type Brokers ¶
type Brokers []Broker
Brokers represents a collection of Kafka broker addresses.
func ParseBrokers ¶
ParseBrokers converts a slice of string broker addresses into a Brokers slice.
func ParseBrokersFromString ¶
ParseBrokersFromString parses a comma-separated string of broker addresses into a Brokers slice.
func (Brokers) Hosts ¶
Hosts returns a slice of all broker host addresses from the Brokers collection.
func (Brokers) MarshalText ¶ added in v1.17.2
MarshalText implements encoding.TextMarshaler for Brokers.
func (Brokers) Schemas ¶
func (b Brokers) Schemas() BrokerSchemas
Schemas returns a slice of all broker schemas from the Brokers collection.
func (*Brokers) UnmarshalText ¶ added in v1.17.0
UnmarshalText implements encoding.TextUnmarshaler for Brokers.
type Consumer ¶
type Consumer interface {
// Consume starts consuming messages and blocks until the context is cancelled or an error occurs.
Consume(ctx context.Context) error
}
Consumer defines the interface for consuming messages from Kafka.
func NewOffsetConsumer ¶
func NewOffsetConsumer( saramaClient sarama.Client, topic Topic, offsetManager OffsetManager, messageHandler MessageHandler, logSamplerFactory log.SamplerFactory, options ...func(*ConsumerOptions), ) Consumer
NewOffsetConsumer creates a new offset-based consumer that processes messages one at a time.
func NewOffsetConsumerBatch ¶
func NewOffsetConsumerBatch( saramaClient sarama.Client, topic Topic, offsetManager OffsetManager, messageHandlerBatch MessageHandlerBatch, batchSize BatchSize, logSamplerFactory log.SamplerFactory, options ...func(*ConsumerOptions), ) Consumer
NewOffsetConsumerBatch creates a new offset-based consumer that processes messages in batches.
func NewOffsetConsumerBatchWithProvider ¶ added in v1.19.0
func NewOffsetConsumerBatchWithProvider( saramaClientProvider SaramaClientProvider, topic Topic, offsetManager OffsetManager, messageHandlerBatch MessageHandlerBatch, batchSize BatchSize, logSamplerFactory log.SamplerFactory, options ...func(*ConsumerOptions), ) Consumer
NewOffsetConsumerBatchWithProvider creates a new offset-based consumer that processes messages in batches.
func NewOffsetConsumerHighwaterMarks ¶
func NewOffsetConsumerHighwaterMarks( saramaClient sarama.Client, topic Topic, offsetManager OffsetManager, messageHandler MessageHandler, trigger run.Fire, logSamplerFactory log.SamplerFactory, options ...func(*ConsumerOptions), ) Consumer
NewOffsetConsumerHighwaterMarks creates a consumer that processes messages up to high watermarks.
func NewOffsetConsumerHighwaterMarksBatch ¶
func NewOffsetConsumerHighwaterMarksBatch( saramaClient sarama.Client, topic Topic, offsetManager OffsetManager, messageHandlerBatch MessageHandlerBatch, batchSize BatchSize, trigger run.Fire, logSamplerFactory log.SamplerFactory, options ...func(*ConsumerOptions), ) Consumer
NewOffsetConsumerHighwaterMarksBatch creates a batch consumer that processes messages up to high watermarks.
func NewOffsetConsumerHighwaterMarksBatchWithProvider ¶ added in v1.19.0
func NewOffsetConsumerHighwaterMarksBatchWithProvider( saramaClientProvider SaramaClientProvider, topic Topic, offsetManager OffsetManager, messageHandlerBatch MessageHandlerBatch, batchSize BatchSize, trigger run.Fire, logSamplerFactory log.SamplerFactory, options ...func(*ConsumerOptions), ) Consumer
NewOffsetConsumerHighwaterMarksBatchWithProvider creates a batch consumer that processes messages up to high watermarks.
func NewOffsetConsumerHighwaterMarksWithProvider ¶ added in v1.19.0
func NewOffsetConsumerHighwaterMarksWithProvider( saramaClientProvider SaramaClientProvider, topic Topic, offsetManager OffsetManager, messageHandler MessageHandler, trigger run.Fire, logSamplerFactory log.SamplerFactory, options ...func(*ConsumerOptions), ) Consumer
NewOffsetConsumerHighwaterMarksWithProvider creates a consumer that processes messages up to high watermarks.
func NewOffsetConsumerWithProvider ¶ added in v1.19.0
func NewOffsetConsumerWithProvider( saramaClientProvider SaramaClientProvider, topic Topic, offsetManager OffsetManager, messageHandler MessageHandler, logSamplerFactory log.SamplerFactory, options ...func(*ConsumerOptions), ) Consumer
NewOffsetConsumerWithProvider creates a new offset-based consumer that processes messages one at a time.
func NewSimpleConsumer ¶
func NewSimpleConsumer( saramaClient SaramaClient, topic Topic, initalOffset Offset, messageHandler MessageHandler, logSamplerFactory log.SamplerFactory, options ...func(*ConsumerOptions), ) Consumer
NewSimpleConsumer creates a new simple consumer that processes messages individually. It wraps the provided MessageHandler in a batch handler with batch size 1.
func NewSimpleConsumerBatch ¶
func NewSimpleConsumerBatch( saramaClient SaramaClient, topic Topic, initalOffset Offset, messageHandler MessageHandlerBatch, batchSize BatchSize, logSamplerFactory log.SamplerFactory, options ...func(*ConsumerOptions), ) Consumer
NewSimpleConsumerBatch creates a new simple consumer that processes messages in batches. It uses a simple offset manager with the provided initial offset for both initial and fallback scenarios.
type ConsumerErrorHandler ¶
type ConsumerErrorHandler interface {
HandleError(err *sarama.ConsumerError) error
}
ConsumerErrorHandler defines an interface for handling Kafka consumer errors.
func NewConsumerErrorHandler ¶
func NewConsumerErrorHandler( metricsConsumer MetricsConsumer, ) ConsumerErrorHandler
NewConsumerErrorHandler creates a new ConsumerErrorHandler that logs errors and updates metrics.
type ConsumerErrorHandlerFunc ¶
type ConsumerErrorHandlerFunc func(err *sarama.ConsumerError) error
ConsumerErrorHandlerFunc is a function type that implements ConsumerErrorHandler.
func (ConsumerErrorHandlerFunc) HandleError ¶
func (c ConsumerErrorHandlerFunc) HandleError(err *sarama.ConsumerError) error
HandleError implements the ConsumerErrorHandler interface for ConsumerErrorHandlerFunc.
type ConsumerFunc ¶
ConsumerFunc is a function type that implements the Consumer interface.
type ConsumerOptions ¶
ConsumerOptions configures optional parameters for offset consumers.
type Entries ¶
type Entries []Entry
Entries represents a collection of Entry items for batch operations.
type Entry ¶
type Entry struct {
Topic Topic `json:"topic"`
Headers []sarama.RecordHeader `json:"headers"`
Key Key `json:"key"`
Value Value `json:"value"`
}
Entry represents a single Kafka message entry with topic, headers, key and value.
type Filter ¶
type Filter[KEY ~[]byte | ~string, OBJECT any] interface { // Filtered returns true if the object should be filtered out. Filtered(ctx context.Context, key KEY, object OBJECT) (bool, error) }
Filter defines an interface for filtering objects based on key and object content.
type FilterFunc ¶
type FilterFunc[KEY ~[]byte | ~string, OBJECT any] func(ctx context.Context, key KEY, object OBJECT) (bool, error)
FilterFunc is a function type that implements the Filter interface.
type FilterTx ¶
type FilterTx[KEY ~[]byte | ~string, OBJECT any] interface { // Filtered returns true if the object should be filtered out. Filtered(ctx context.Context, tx libkv.Tx, key KEY, object OBJECT) (bool, error) }
FilterTx defines an interface for filtering objects within a transaction context.
type FilterTxFunc ¶
type FilterTxFunc[KEY ~[]byte | ~string, OBJECT any] func(ctx context.Context, tx libkv.Tx, key KEY, object OBJECT) (bool, error)
FilterTxFunc is a function type that implements the FilterTx interface.
type Header ¶
Header represents HTTP-style headers for Kafka messages with support for multiple values per key.
func ParseHeader ¶
func ParseHeader(saramaHeaders []*sarama.RecordHeader) Header
ParseHeader converts Sarama record headers into a Header map.
func (Header) AsSaramaHeaders ¶
func (h Header) AsSaramaHeaders() []sarama.RecordHeader
AsSaramaHeaders converts the Header into Sarama record headers format.
func (Header) Get ¶
Get returns the first value for the given header key, or empty string if not found.
type HighwaterMarkProvider ¶
type HighwaterMarkProvider interface {
HighWaterMark(ctx context.Context, topic Topic, partition Partition) (*Offset, error)
}
HighwaterMarkProvider provides methods to retrieve high water mark offsets from Kafka topics.
func NewHighwaterMarkProvider ¶
func NewHighwaterMarkProvider( saramaClient sarama.Client, ) HighwaterMarkProvider
NewHighwaterMarkProvider creates a new HighwaterMarkProvider using the provided Sarama client.
type JSONSender ¶
type JSONSender interface {
SendUpdate(
ctx context.Context,
topic Topic,
key Key,
value Value,
headers ...sarama.RecordHeader,
) error
SendUpdates(ctx context.Context, entries Entries) error
SendDelete(ctx context.Context, topic Topic, key Key, headers ...sarama.RecordHeader) error
SendDeletes(ctx context.Context, entries Entries) error
}
JSONSender provides methods for sending JSON-encoded messages to Kafka topics.
func NewJSONSender ¶
func NewJSONSender( producer SyncProducer, logSamplerFactory log.SamplerFactory, optionsFns ...func(options *JSONSenderOptions), ) JSONSender
NewJSONSender creates a new JSONSender with the provided producer and options.
func NewJsonSender ¶ deprecated, added in v1.14.1
func NewJsonSender( producer SyncProducer, logSamplerFactory log.SamplerFactory, optionsFns ...func(options *JSONSenderOptions), ) JSONSender
NewJsonSender creates a new JSONSender with the provided producer and options.
Deprecated: Use NewJSONSender instead.
type JSONSenderOptions ¶
type JSONSenderOptions struct {
ValidationDisabled bool
}
JSONSenderOptions defines configuration options for JSONSender behavior.
type JsonSender ¶ deprecated, added in v1.14.1
type JsonSender = JSONSender
JsonSender provides methods for sending JSON-encoded messages to Kafka topics.
Deprecated: Use JSONSender instead.
type JsonSenderOptions ¶ deprecated, added in v1.14.1
type JsonSenderOptions = JSONSenderOptions
JsonSenderOptions defines configuration options for JsonSender behavior.
Deprecated: Use JSONSenderOptions instead.
type Key ¶
Key represents a Kafka message key that can be converted to bytes or string.
type MessageHanderList ¶
type MessageHanderList []MessageHandler
MessageHanderList is a list of MessageHandler that executes handlers sequentially.
func (MessageHanderList) ConsumeMessage ¶
func (m MessageHanderList) ConsumeMessage(ctx context.Context, msg *sarama.ConsumerMessage) error
ConsumeMessage executes all handlers in the list sequentially for a single message.
type MessageHanderTxList ¶
type MessageHanderTxList []MessageHandlerTx
MessageHanderTxList is a list of MessageHandlerTx that executes handlers sequentially within a transaction.
func (MessageHanderTxList) ConsumeMessage ¶
func (m MessageHanderTxList) ConsumeMessage( ctx context.Context, tx libkv.Tx, msg *sarama.ConsumerMessage, ) error
ConsumeMessage executes all transaction handlers in the list sequentially for a single message.
type MessageHandler ¶
type MessageHandler interface {
// ConsumeMessage processes a single Kafka message and returns an error if processing fails.
ConsumeMessage(ctx context.Context, msg *sarama.ConsumerMessage) error
}
MessageHandler defines the interface for processing individual Kafka messages.
func NewMessageHandlerMetrics ¶
func NewMessageHandlerMetrics( messageHandler MessageHandler, metrics MetricsMessageHandler, ) MessageHandler
NewMessageHandlerMetrics is a MessageHandler adapter that creates Prometheus metrics for started, completed and failed messages.
func NewMessageHandlerSkipErrors ¶
func NewMessageHandlerSkipErrors( handler MessageHandler, logSamplerFactory log.SamplerFactory, ) MessageHandler
NewMessageHandlerSkipErrors creates a message handler that logs and skips errors.
func NewMessageHandlerUpdate ¶
func NewMessageHandlerUpdate[KEY ~[]byte | ~string, OBJECT any]( updateHandler UpdaterHandler[KEY, OBJECT], ) MessageHandler
NewMessageHandlerUpdate creates a generic message handler that processes JSON messages for CRUD operations.
func NewMessageTxUpdate ¶
func NewMessageTxUpdate(db libkv.DB, messageHandlerTx MessageHandlerTx) MessageHandler
NewMessageTxUpdate creates a message handler that executes within a database update transaction.
func NewMessageTxView ¶
func NewMessageTxView(db libkv.DB, messageHandlerTx MessageHandlerTx) MessageHandler
NewMessageTxView creates a read-only message handler that executes within a database view transaction.
func NewOffsetTriggerMessageHandler ¶
func NewOffsetTriggerMessageHandler( triggerOffsets PartitionOffsets, topic Topic, trigger run.Fire, ) MessageHandler
NewOffsetTriggerMessageHandler returns a message handler that calls the given trigger once all offsets are reached.
type MessageHandlerBatch ¶
type MessageHandlerBatch interface {
ConsumeMessages(ctx context.Context, messages []*sarama.ConsumerMessage) error
}
MessageHandlerBatch defines an interface for processing batches of Kafka messages.
func NewMessageHandlerBatch ¶
func NewMessageHandlerBatch(messageHandler MessageHandler) MessageHandlerBatch
NewMessageHandlerBatch creates a new batch message handler that processes messages individually using the provided MessageHandler.
func NewMessageHandlerBatchDelay ¶
func NewMessageHandlerBatchDelay( messageHandlerBatch MessageHandlerBatch, waiterDuration libtime.WaiterDuration, delay libtime.Duration, ) MessageHandlerBatch
NewMessageHandlerBatchDelay returns a MessageHandlerBatch that sleeps for the given duration after each consume. This enables the consumer to get more messages per consume.
func NewMessageHandlerBatchMetrics ¶
func NewMessageHandlerBatchMetrics( messageHandler MessageHandlerBatch, metrics MetricsMessageHandler, ) MessageHandlerBatch
NewMessageHandlerBatchMetrics is a MessageHandlerBatch adapter that creates Prometheus metrics for started, completed and failed batches.
func NewMessageHandlerBatchSkipErrors ¶
func NewMessageHandlerBatchSkipErrors( handler MessageHandlerBatch, logSamplerFactory log.SamplerFactory, ) MessageHandlerBatch
NewMessageHandlerBatchSkipErrors creates a message handler that logs and skips errors.
func NewMessageHandlerBatchTxUpdate ¶
func NewMessageHandlerBatchTxUpdate( db libkv.DB, messageHandler MessageHandlerBatchTx, ) MessageHandlerBatch
NewMessageHandlerBatchTxUpdate creates a batch handler that executes within a database update transaction.
func NewMessageHandlerBatchTxView ¶
func NewMessageHandlerBatchTxView( db libkv.DB, messageHandler MessageHandlerBatchTx, ) MessageHandlerBatch
NewMessageHandlerBatchTxView creates a read-only batch handler that executes within a database view transaction.
type MessageHandlerBatchFunc ¶
type MessageHandlerBatchFunc func(ctx context.Context, messages []*sarama.ConsumerMessage) error
MessageHandlerBatchFunc is a function type that implements MessageHandlerBatch interface.
func (MessageHandlerBatchFunc) ConsumeMessages ¶
func (b MessageHandlerBatchFunc) ConsumeMessages( ctx context.Context, messages []*sarama.ConsumerMessage, ) error
ConsumeMessages implements the MessageHandlerBatch interface.
type MessageHandlerBatchList ¶
type MessageHandlerBatchList []MessageHandlerBatch
MessageHandlerBatchList is a list of MessageHandlerBatch that executes handlers sequentially.
func (MessageHandlerBatchList) ConsumeMessages ¶
func (m MessageHandlerBatchList) ConsumeMessages( ctx context.Context, messages []*sarama.ConsumerMessage, ) error
ConsumeMessages executes all handlers in the list sequentially.
type MessageHandlerBatchTx ¶
type MessageHandlerBatchTx interface {
ConsumeMessages(ctx context.Context, tx libkv.Tx, messages []*sarama.ConsumerMessage) error
}
MessageHandlerBatchTx defines the interface for handling batch messages within a transaction context.
func NewMessageHandlerBatchTx ¶
func NewMessageHandlerBatchTx(messageHandler MessageHandlerTx) MessageHandlerBatchTx
NewMessageHandlerBatchTx creates a batch transaction handler from a single message transaction handler.
func NewMessageHandlerBatchTxMetrics ¶
func NewMessageHandlerBatchTxMetrics( messageHandler MessageHandlerBatchTx, metrics MetricsMessageHandler, ) MessageHandlerBatchTx
NewMessageHandlerBatchTxMetrics is a MessageHandlerBatchTx adapter that creates Prometheus metrics for started, completed and failed.
func NewMessageHandlerBatchTxSkipErrors ¶
func NewMessageHandlerBatchTxSkipErrors( handler MessageHandlerBatchTx, logSamplerFactory log.SamplerFactory, ) MessageHandlerBatchTx
NewMessageHandlerBatchTxSkipErrors creates a transaction batch message handler that logs and skips errors.
type MessageHandlerBatchTxFunc ¶
type MessageHandlerBatchTxFunc func(ctx context.Context, tx libkv.Tx, messages []*sarama.ConsumerMessage) error
MessageHandlerBatchTxFunc is a function type that implements MessageHandlerBatchTx interface.
func (MessageHandlerBatchTxFunc) ConsumeMessages ¶
func (b MessageHandlerBatchTxFunc) ConsumeMessages( ctx context.Context, tx libkv.Tx, messages []*sarama.ConsumerMessage, ) error
ConsumeMessages implements the MessageHandlerBatchTx interface.
type MessageHandlerBatchTxList ¶
type MessageHandlerBatchTxList []MessageHandlerBatchTx
MessageHandlerBatchTxList is a list of MessageHandlerBatchTx that executes handlers sequentially within a transaction.
func (MessageHandlerBatchTxList) ConsumeMessages ¶
func (m MessageHandlerBatchTxList) ConsumeMessages( ctx context.Context, tx libkv.Tx, messages []*sarama.ConsumerMessage, ) error
ConsumeMessages executes all transaction handlers in the list sequentially.
type MessageHandlerFunc ¶
type MessageHandlerFunc func(ctx context.Context, msg *sarama.ConsumerMessage) error
MessageHandlerFunc allows using a function as a MessageHandler.
func (MessageHandlerFunc) ConsumeMessage ¶
func (m MessageHandlerFunc) ConsumeMessage(ctx context.Context, msg *sarama.ConsumerMessage) error
ConsumeMessage forwards to the function.
type MessageHandlerTx ¶
type MessageHandlerTx interface {
ConsumeMessage(ctx context.Context, tx libkv.Tx, msg *sarama.ConsumerMessage) error
}
MessageHandlerTx defines the interface for handling messages within a transaction context.
func NewMessageHandlerTxMetrics ¶
func NewMessageHandlerTxMetrics( messageHandler MessageHandlerTx, metrics MetricsMessageHandler, ) MessageHandlerTx
NewMessageHandlerTxMetrics is a MessageHandlerTx adapter that creates Prometheus metrics for started, completed and failed.
func NewMessageHandlerTxSkipErrors ¶
func NewMessageHandlerTxSkipErrors( handler MessageHandlerTx, logSamplerFactory log.SamplerFactory, ) MessageHandlerTx
NewMessageHandlerTxSkipErrors creates a transaction message handler that logs and skips errors.
func NewMessageHandlerTxUpdate ¶
func NewMessageHandlerTxUpdate[KEY ~[]byte | ~string, OBJECT any]( updateHandlerTx UpdaterHandlerTx[KEY, OBJECT], ) MessageHandlerTx
NewMessageHandlerTxUpdate creates a generic transaction message handler that processes JSON messages for CRUD operations.
type MessageHandlerTxFunc ¶
MessageHandlerTxFunc allows using a function as a MessageHandlerTx.
func (MessageHandlerTxFunc) ConsumeMessage ¶
func (m MessageHandlerTxFunc) ConsumeMessage( ctx context.Context, tx libkv.Tx, msg *sarama.ConsumerMessage, ) error
ConsumeMessage forwards to the function.
type Metadata ¶
type Metadata string
Metadata represents a string-based metadata identifier used in Kafka operations.
const DefaultMetadata Metadata = "offsetmanager"
DefaultMetadata defines the default metadata value used for offset manager operations.
type Metrics ¶
type Metrics interface {
MetricsMessageHandler
MetricsConsumer
MetricsPartitionConsumer
MetricsSyncProducer
}
Metrics provides a comprehensive interface for collecting Kafka-related metrics. It combines all metric collection interfaces for consumers, producers, and message handlers.
func NewMetrics ¶
func NewMetrics() Metrics
NewMetrics creates a new Metrics implementation that collects Prometheus metrics for Kafka operations including consumer, producer, and message handler activities.
type MetricsConsumer ¶
type MetricsConsumer interface {
CurrentOffset(topic Topic, partition Partition, offset Offset)
HighWaterMarkOffset(topic Topic, partition Partition, offset Offset)
ErrorCounterInc(topic Topic, partition Partition)
}
MetricsConsumer provides metrics collection methods for Kafka consumer operations. It tracks offset positions, high watermarks, and consumer errors.
type MetricsMessageHandler ¶
type MetricsMessageHandler interface {
MessageHandlerTotalCounterInc(topic Topic, partition Partition)
MessageHandlerSuccessCounterInc(topic Topic, partition Partition)
MessageHandlerFailureCounterInc(topic Topic, partition Partition)
MessageHandlerDurationMeasure(topic Topic, partition Partition, duration time.Duration)
}
MetricsMessageHandler provides metrics collection methods for message handler operations. It tracks total processed messages, success/failure rates, and processing durations.
type MetricsPartitionConsumer ¶
type MetricsPartitionConsumer interface {
ConsumePartitionCreateOutOfRangeErrorInitialize(topic Topic, partition Partition)
ConsumePartitionCreateOutOfRangeErrorInc(topic Topic, partition Partition)
ConsumePartitionCreateFailureInc(topic Topic, partition Partition)
ConsumePartitionCreateSuccessInc(topic Topic, partition Partition)
ConsumePartitionCreateTotalInc(topic Topic, partition Partition)
}
MetricsPartitionConsumer provides metrics collection methods for partition consumer creation operations. It tracks the success, failure, and out-of-range error rates when creating partition consumers.
type MetricsSyncProducer ¶
type MetricsSyncProducer interface {
SyncProducerTotalCounterInc(topic Topic)
SyncProducerFailureCounterInc(topic Topic)
SyncProducerSuccessCounterInc(topic Topic)
SyncProducerDurationMeasure(topic Topic, duration time.Duration)
}
MetricsSyncProducer provides metrics collection methods for synchronous Kafka producer operations. It tracks total messages sent, success/failure rates, and send durations.
type Offset ¶
type Offset int64
Offset in the Kafka topic.
func HighWaterMark ¶
func HighWaterMark( ctx context.Context, saramaClient sarama.Client, topic Topic, partition Partition, ) (*Offset, error)
HighWaterMark returns the high water mark offset for the specified topic and partition.
func OffsetFromBytes ¶
OffsetFromBytes returns the offset for the given bytes.
func ParseOffset ¶
ParseOffset parses an Offset from a string.
type OffsetManager ¶
type OffsetManager interface {
// InitialOffset returns the offset to use when no previous offset exists for a partition.
InitialOffset() Offset
// FallbackOffset returns the offset to use when the stored offset is invalid or unavailable.
FallbackOffset() Offset
// NextOffset retrieves the next offset to consume for the given topic and partition.
NextOffset(ctx context.Context, topic Topic, partition Partition) (Offset, error)
// MarkOffset marks the provided offset as processed. This only allows forward movement (incrementing).
// To follow upstream conventions, you should mark the offset of the next message to read, not the last message read.
MarkOffset(ctx context.Context, topic Topic, partition Partition, nextOffset Offset) error
// ResetOffset resets to the provided offset, allowing backward movement to earlier or smaller values.
// This acts as a counterpart to MarkOffset and should be called before MarkOffset when setting offsets backwards.
ResetOffset(ctx context.Context, topic Topic, partition Partition, nextOffset Offset) error
io.Closer
}
OffsetManager manages Kafka consumer offsets for topics and partitions. It provides methods to retrieve initial and fallback offsets, track the next offset to consume, and mark offsets as processed.
func NewSaramaOffsetManager ¶
func NewSaramaOffsetManager( saramaClient SaramaClient, group Group, initalOffset Offset, fallbackOffset Offset, ) OffsetManager
NewSaramaOffsetManager creates a new offset manager using Sarama's built-in offset management.
func NewSimpleOffsetManager ¶
func NewSimpleOffsetManager( initalOffset Offset, fallbackOffset Offset, ) OffsetManager
NewSimpleOffsetManager creates a new simple in-memory offset manager.
func NewStoreOffsetManager ¶
func NewStoreOffsetManager( offsetStore OffsetStore, initalOffset Offset, fallbackOffset Offset, ) OffsetManager
NewStoreOffsetManager creates a new offset manager that persists offsets using a store.
type OffsetStore ¶
type OffsetStore interface {
Get(ctx context.Context, topic Topic, partition Partition) (Offset, error)
Set(ctx context.Context, topic Topic, partition Partition, offset Offset) error
}
OffsetStore provides persistent storage for Kafka topic partition offsets.
func NewOffsetStore ¶
func NewOffsetStore( db libkv.DB, ) OffsetStore
NewOffsetStore creates a new OffsetStore using the provided database.
func NewOffsetStoreGroup ¶
func NewOffsetStoreGroup( db libkv.DB, group Group, ) OffsetStore
NewOffsetStoreGroup creates a new OffsetStore for a specific consumer group.
type Partition ¶
type Partition int32
Partition in Kafka.
func ParsePartition ¶
ParsePartition parses a Partition from a string.
func PartitionFromBytes ¶
PartitionFromBytes returns the partition for the given bytes.
type PartitionOffsetItem ¶
PartitionOffsetItem represents a single partition and its offset.
type PartitionOffsetItems ¶
type PartitionOffsetItems []PartitionOffsetItem
PartitionOffsetItems represents a slice of partition offset items.
func (PartitionOffsetItems) Offsets ¶
func (o PartitionOffsetItems) Offsets() PartitionOffsets
Offsets converts the slice to a PartitionOffsets map.
type PartitionOffsets ¶
PartitionOffsets represents a mapping of partitions to their corresponding offsets.
func HighWaterMarks ¶
func HighWaterMarks( ctx context.Context, saramaClient sarama.Client, topic Topic, ) (PartitionOffsets, error)
HighWaterMarks returns the high water marks for all partitions of the specified topic.
func ParsePartitionOffsetFromBytes ¶
func ParsePartitionOffsetFromBytes( ctx context.Context, offsetBytes []byte, ) (PartitionOffsets, error)
ParsePartitionOffsetFromBytes parses partition offsets from JSON bytes.
func (PartitionOffsets) Bytes ¶
func (o PartitionOffsets) Bytes() ([]byte, error)
Bytes serializes the partition offsets to JSON bytes.
func (PartitionOffsets) Clone ¶
func (o PartitionOffsets) Clone() PartitionOffsets
Clone creates a deep copy of the partition offsets.
func (PartitionOffsets) OffsetPartitions ¶
func (o PartitionOffsets) OffsetPartitions() PartitionOffsetItems
OffsetPartitions converts the map to a slice of PartitionOffsetItem.
type Partitions ¶
type Partitions []Partition
Partitions represents a collection of Kafka partitions.
func PartitionsFromInt32 ¶
func PartitionsFromInt32(partitions []int32) Partitions
PartitionsFromInt32 converts a slice of int32 values to Partitions.
type SaramaClient ¶
SaramaClient defines the interface for Sarama Kafka client operations.
func CreateSaramaClient ¶
func CreateSaramaClient( ctx context.Context, brokers Brokers, opts ...SaramaConfigOptions, ) (SaramaClient, error)
CreateSaramaClient creates a new Sarama Kafka client with the specified configuration.
type SaramaClientPool ¶ added in v1.22.0
type SaramaClientPool interface {
// Acquire returns a healthy client from pool or creates new one.
// The client is health-checked before being returned.
Acquire(ctx context.Context) (SaramaClient, error)
// Release returns client to pool (discards if unhealthy).
// The healthy parameter indicates whether the client is still healthy.
Release(client SaramaClient, healthy bool)
// Close closes all pooled connections.
Close() error
}
SaramaClientPool manages a pool of Sarama clients with health checks. It reuses healthy clients and discards unhealthy ones automatically.
func NewSaramaClientPool ¶ added in v1.22.0
func NewSaramaClientPool( factory func(context.Context) (SaramaClient, error), opts SaramaClientPoolOptions, ) SaramaClientPool
NewSaramaClientPool creates a new client pool with the specified factory function. Use DefaultSaramaClientPoolOptions for default configuration.
type SaramaClientPoolOptions ¶ added in v1.22.0
type SaramaClientPoolOptions struct {
// MaxPoolSize is the maximum number of clients to keep in the pool.
// Default: 10
MaxPoolSize int
// HealthCheckTimeout is the timeout for health checks.
// Default: 5s
HealthCheckTimeout time.Duration
}
SaramaClientPoolOptions configures the client pool behavior.
type SaramaClientProvider ¶ added in v1.18.0
type SaramaClientProvider interface {
// Client creates and returns a Sarama client.
// The behavior depends on the implementation - it may return a new client or reuse an existing one.
Client(ctx context.Context) (SaramaClient, error)
// Close closes all Sarama clients that were created by this provider.
// This method is safe to call multiple times and safe to defer immediately after creation.
Close() error
}
SaramaClientProvider provides Sarama Kafka clients and manages their lifecycle. It creates clients on demand and tracks them for proper cleanup via Close().
func NewSaramaClientProviderByType ¶ added in v1.18.0
func NewSaramaClientProviderByType( ctx context.Context, providerType SaramaClientProviderType, brokers Brokers, opts ...SaramaConfigOptions, ) (SaramaClientProvider, error)
NewSaramaClientProviderByType creates a SaramaClientProvider based on the specified type.
func NewSaramaClientProviderExisting ¶ added in v1.19.0
func NewSaramaClientProviderExisting( saramaClient SaramaClient, ) SaramaClientProvider
NewSaramaClientProviderExisting creates a SaramaClientProvider that wraps an existing Sarama client. This adapter allows existing Sarama clients to be used with the provider pattern without breaking backward compatibility. The provider returns the same client instance on every call to Client() and delegates Close() to the wrapped client.
func NewSaramaClientProviderNew ¶ added in v1.18.0
func NewSaramaClientProviderNew( brokers Brokers, opts ...SaramaConfigOptions, ) SaramaClientProvider
NewSaramaClientProviderNew creates a SaramaClientProvider that creates a new client for each call. All created clients are tracked and closed when Close() is called. The provided options will be applied to all created clients.
func NewSaramaClientProviderPool ¶ added in v1.22.0
func NewSaramaClientProviderPool( brokers Brokers, poolOpts SaramaClientPoolOptions, opts ...SaramaConfigOptions, ) SaramaClientProvider
NewSaramaClientProviderPool creates a SaramaClientProvider that uses a connection pool. The pool manages client lifecycle with health checks and automatic reconnection. Pool configuration can be customized via poolOpts.
func NewSaramaClientProviderReused ¶ added in v1.18.0
func NewSaramaClientProviderReused( brokers Brokers, opts ...SaramaConfigOptions, ) SaramaClientProvider
NewSaramaClientProviderReused creates a SaramaClientProvider that reuses a single client for all calls. The client is created lazily on the first call to Client(). The provided options will be applied when creating the client.
type SaramaClientProviderType ¶ added in v1.18.0
type SaramaClientProviderType string
SaramaClientProviderType defines the type of Sarama client provider to use.
const ( // SaramaClientProviderTypeReused creates a provider that reuses a single client for all calls. SaramaClientProviderTypeReused SaramaClientProviderType = "reused" // SaramaClientProviderTypeNew creates a provider that creates a new client for each call. SaramaClientProviderTypeNew SaramaClientProviderType = "new" // SaramaClientProviderTypePool creates a provider that uses a connection pool with health checks. SaramaClientProviderTypePool SaramaClientProviderType = "pool" )
func ParseSaramaClientProviderType ¶ added in v1.18.0
func ParseSaramaClientProviderType( ctx context.Context, value interface{}, ) (*SaramaClientProviderType, error)
ParseSaramaClientProviderType parses a value into a SaramaClientProviderType.
func (SaramaClientProviderType) Ptr ¶ added in v1.18.0
func (s SaramaClientProviderType) Ptr() *SaramaClientProviderType
Ptr returns a pointer to the SaramaClientProviderType.
func (SaramaClientProviderType) String ¶ added in v1.18.0
func (s SaramaClientProviderType) String() string
String returns the string representation of the SaramaClientProviderType.
type SaramaClientProviderTypes ¶ added in v1.18.0
type SaramaClientProviderTypes []SaramaClientProviderType
SaramaClientProviderTypes is a slice of SaramaClientProviderType values.
func (SaramaClientProviderTypes) Contains ¶ added in v1.18.0
func (s SaramaClientProviderTypes) Contains(providerType SaramaClientProviderType) bool
Contains checks if the given SaramaClientProviderType is in the slice.
type SaramaConfigOptions ¶
SaramaConfigOptions defines a function type for modifying Sarama configuration.
type SaramaConsumer ¶
SaramaConsumer provides an interface wrapper around sarama.Consumer for testing and abstraction purposes.
type SaramaPartitionConsumer ¶
type SaramaPartitionConsumer interface {
sarama.PartitionConsumer
}
SaramaPartitionConsumer represents a wrapper interface for sarama.PartitionConsumer.
type SaramaSyncProducer ¶
type SaramaSyncProducer interface {
sarama.SyncProducer
}
SaramaSyncProducer wraps the Sarama SyncProducer interface to enable dependency injection and testing.
type SyncProducer ¶
type SyncProducer interface {
// SendMessage sends a single message to Kafka and returns the partition and offset where it was stored.
SendMessage(
ctx context.Context,
msg *sarama.ProducerMessage,
) (partition int32, offset int64, err error)
// SendMessages sends multiple messages to Kafka in a single batch operation.
SendMessages(ctx context.Context, msgs []*sarama.ProducerMessage) error
// Close closes the producer and releases its resources.
Close() error
}
SyncProducer defines the interface for synchronously sending messages to Kafka.
func NewSyncProducer ¶
func NewSyncProducer( ctx context.Context, brokers Brokers, opts ...SaramaConfigOptions, ) (SyncProducer, error)
NewSyncProducer creates a new synchronous Kafka producer with the given brokers and configuration options.
func NewSyncProducerFromSaramaClient ¶ added in v1.19.0
func NewSyncProducerFromSaramaClient( ctx context.Context, saramaClient SaramaClient, ) (SyncProducer, error)
NewSyncProducerFromSaramaClient creates a new SyncProducer from an existing Sarama client. This is useful when you already have a configured Sarama client and want to create a producer from it.
func NewSyncProducerFromSaramaClientProvider ¶ added in v1.19.0
func NewSyncProducerFromSaramaClientProvider( ctx context.Context, saramaClientProvider SaramaClientProvider, ) (SyncProducer, error)
NewSyncProducerFromSaramaClientProvider creates a new SyncProducer using a SaramaClientProvider. It obtains a client from the provider and creates a sync producer from it. This enables flexible client lifecycle management strategies (reused, new per call, etc.).
func NewSyncProducerFromSaramaSyncProducer ¶
func NewSyncProducerFromSaramaSyncProducer(saramaSyncProducer sarama.SyncProducer) SyncProducer
NewSyncProducerFromSaramaSyncProducer creates a new SyncProducer wrapper around an existing Sarama SyncProducer.
func NewSyncProducerMetrics ¶
func NewSyncProducerMetrics( syncProducer SyncProducer, ) SyncProducer
NewSyncProducerMetrics creates a sync producer decorator that records metrics for all operations.
func NewSyncProducerModify ¶
func NewSyncProducerModify( syncProducer SyncProducer, fn func(ctx context.Context, message *sarama.ProducerMessage) error, ) SyncProducer
NewSyncProducerModify creates a sync producer that applies a modification function to messages before sending.
func NewSyncProducerNop ¶
func NewSyncProducerNop() SyncProducer
NewSyncProducerNop creates a no-operation sync producer that logs but doesn't actually send messages.
func NewSyncProducerWithHeader ¶
func NewSyncProducerWithHeader( ctx context.Context, brokers Brokers, headers Header, opts ...SaramaConfigOptions, ) (SyncProducer, error)
NewSyncProducerWithHeader creates a sync producer that adds specified headers to all messages.
func NewSyncProducerWithName ¶
func NewSyncProducerWithName( ctx context.Context, brokers Brokers, name string, opts ...SaramaConfigOptions, ) (SyncProducer, error)
NewSyncProducerWithName creates a sync producer that adds a 'name' header to all messages.
type Topic ¶
type Topic string
Topic represents a Kafka topic name.
func TopicFromStrings ¶
TopicFromStrings creates a valid Kafka topic name from multiple string values by joining and sanitizing them.
type TopicPartition ¶
TopicPartition represents a specific partition within a Kafka topic.
func (TopicPartition) Bytes ¶
func (p TopicPartition) Bytes() []byte
Bytes returns a byte representation of the TopicPartition in the format "topic-partition".
type Topics ¶
type Topics []Topic
Topics represents a collection of Kafka topics.
func ParseTopics ¶
ParseTopics converts a slice of strings into a Topics slice.
func ParseTopicsFromString ¶
ParseTopicsFromString parses a comma-separated string into a Topics slice.
func (Topics) Contains ¶
Contains returns true if the Topics collection contains the specified topic.
func (Topics) Interfaces ¶
func (t Topics) Interfaces() []interface{}
Interfaces converts the Topics slice to a slice of interface{} values.
type UpdaterHandler ¶
type UpdaterHandler[KEY ~[]byte | ~string, OBJECT any] interface { Update(ctx context.Context, key KEY, object OBJECT) error Delete(ctx context.Context, key KEY) error }
UpdaterHandler defines a generic interface for handling update and delete operations on objects identified by keys of type KEY.
func NewUpdaterHandlerFilter ¶
func NewUpdaterHandlerFilter[KEY ~[]byte | ~string, OBJECT any]( filter Filter[KEY, OBJECT], updateHandler UpdaterHandler[KEY, OBJECT], ) UpdaterHandler[KEY, OBJECT]
NewUpdaterHandlerFilter creates an updater handler that filters objects before updating.
func NewUpdaterHandlerSkipErrors ¶
func NewUpdaterHandlerSkipErrors[KEY ~[]byte | ~string, OBJECT any]( handler UpdaterHandler[KEY, OBJECT], logSamplerFactory log.SamplerFactory, ) UpdaterHandler[KEY, OBJECT]
NewUpdaterHandlerSkipErrors creates an updater handler that logs and skips errors.
func NewUpdaterHandlerTxUpdate ¶
func NewUpdaterHandlerTxUpdate[KEY ~[]byte | ~string, OBJECT any]( db libkv.DB, updaterHandlerTx UpdaterHandlerTx[KEY, OBJECT], ) UpdaterHandler[KEY, OBJECT]
NewUpdaterHandlerTxUpdate creates an updater handler that executes within a database update transaction.
func NewUpdaterHandlerTxView ¶
func NewUpdaterHandlerTxView[KEY ~[]byte | ~string, OBJECT any]( db libkv.DB, updaterHandlerTx UpdaterHandlerTx[KEY, OBJECT], ) UpdaterHandler[KEY, OBJECT]
NewUpdaterHandlerTxView creates a read-only updater handler that executes within a database view transaction.
func UpdaterHandlerFunc ¶
func UpdaterHandlerFunc[KEY ~[]byte | ~string, OBJECT any]( updateFn func(ctx context.Context, key KEY, object OBJECT) error, deleteFn func(ctx context.Context, key KEY) error, ) UpdaterHandler[KEY, OBJECT]
UpdaterHandlerFunc creates an UpdaterHandler from separate update and delete functions.
type UpdaterHandlerList ¶
type UpdaterHandlerList[KEY ~[]byte | ~string, OBJECT any] []UpdaterHandler[KEY, OBJECT]
UpdaterHandlerList is a list of UpdaterHandler that executes handlers sequentially.
type UpdaterHandlerTx ¶
type UpdaterHandlerTx[KEY ~[]byte | ~string, OBJECT any] interface { Update(ctx context.Context, tx libkv.Tx, key KEY, object OBJECT) error Delete(ctx context.Context, tx libkv.Tx, key KEY) error }
UpdaterHandlerTx defines the interface for handling CRUD operations within a transaction context.
func NewUpdaterHandlerTxFilter ¶
func NewUpdaterHandlerTxFilter[KEY ~[]byte | ~string, OBJECT any]( filterTx Filter[KEY, OBJECT], updateHandlerTx UpdaterHandlerTx[KEY, OBJECT], ) UpdaterHandlerTx[KEY, OBJECT]
NewUpdaterHandlerTxFilter creates a transaction updater handler that filters objects before updating.
func NewUpdaterHandlerTxSkipErrors ¶
func NewUpdaterHandlerTxSkipErrors[KEY ~[]byte | ~string, OBJECT any]( handler UpdaterHandlerTx[KEY, OBJECT], logSamplerFactory log.SamplerFactory, ) UpdaterHandlerTx[KEY, OBJECT]
NewUpdaterHandlerTxSkipErrors creates a transaction updater handler that logs and skips errors.
func UpdaterHandlerTxFunc ¶
func UpdaterHandlerTxFunc[KEY ~[]byte | ~string, OBJECT any]( updateFn func(ctx context.Context, tx libkv.Tx, key KEY, object OBJECT) error, deleteFn func(ctx context.Context, tx libkv.Tx, key KEY) error, ) UpdaterHandlerTx[KEY, OBJECT]
UpdaterHandlerTxFunc creates an UpdaterHandlerTx from separate update and delete functions.
type UpdaterHandlerTxList ¶
type UpdaterHandlerTxList[KEY ~[]byte | ~string, OBJECT any] []UpdaterHandlerTx[KEY, OBJECT]
UpdaterHandlerTxList is a list of UpdaterHandlerTx that executes handlers sequentially within a transaction.
type Value ¶
type Value interface {
validation.HasValidation
}
Value represents a Kafka message value that supports validation.
Source Files
¶
- doc.go
- kafka_batch-size.go
- kafka_broken-pipe-error.go
- kafka_broker-schema.go
- kafka_broker.go
- kafka_brokers.go
- kafka_consumer-error-handler.go
- kafka_consumer-group-offset.go
- kafka_consumer-offset-highwater-marks.go
- kafka_consumer-offset.go
- kafka_consumer-partition.go
- kafka_consumer-simple.go
- kafka_consumer.go
- kafka_group.go
- kafka_gzip-decoder.go
- kafka_gzip-encoder.go
- kafka_header.go
- kafka_highwatermarks.go
- kafka_json-encoder.go
- kafka_json-sender.go
- kafka_message-handler-batch-delay.go
- kafka_message-handler-batch-func.go
- kafka_message-handler-batch-list.go
- kafka_message-handler-batch-metrics.go
- kafka_message-handler-batch-skip.go
- kafka_message-handler-batch-tx-func.go
- kafka_message-handler-batch-tx-list.go
- kafka_message-handler-batch-tx-metrics.go
- kafka_message-handler-batch-tx-skip.go
- kafka_message-handler-batch-tx.go
- kafka_message-handler-batch.go
- kafka_message-handler-func.go
- kafka_message-handler-list.go
- kafka_message-handler-metrics.go
- kafka_message-handler-offset-trigger.go
- kafka_message-handler-skip.go
- kafka_message-handler-tx-func.go
- kafka_message-handler-tx-list.go
- kafka_message-handler-tx-metrics.go
- kafka_message-handler-tx-skip.go
- kafka_message-handler-tx-update.go
- kafka_message-handler-tx.go
- kafka_message-handler-update.go
- kafka_message-handler.go
- kafka_metadata.go
- kafka_metrics.go
- kafka_offset-manager-handler.go
- kafka_offset-manager-sarama.go
- kafka_offset-manager-simple.go
- kafka_offset-manager-store.go
- kafka_offset-manager.go
- kafka_offset-store.go
- kafka_offset.go
- kafka_partition-offsets.go
- kafka_partition.go
- kafka_sarama-client-creator.go
- kafka_sarama-client-pool.go
- kafka_sarama-client-provider-existing.go
- kafka_sarama-client-provider-new.go
- kafka_sarama-client-provider-pool.go
- kafka_sarama-client-provider-reused.go
- kafka_sarama-client-provider-type.go
- kafka_sarama-client-provider.go
- kafka_sarama-config.go
- kafka_sarama-consumer.go
- kafka_sarama-partition-consumer.go
- kafka_sarama-sync-producer.go
- kafka_sync-producer-header.go
- kafka_sync-producer-metrics.go
- kafka_sync-producer-modify.go
- kafka_sync-producer-nop.go
- kafka_sync-producer.go
- kafka_tls-config.go
- kafka_topic.go
- kafka_topics.go
- kafka_update-handler-filter.go
- kafka_update-handler-func.go
- kafka_update-handler-list.go
- kafka_update-handler-skip.go
- kafka_update-handler-tx-filter.go
- kafka_update-handler-tx-func.go
- kafka_update-handler-tx-list.go
- kafka_update-handler-tx-skip.go
- kafka_update-handler-tx.go
- kafka_update-handler.go