Versions in this module Expand all Collapse all v1 v1.2.0 Feb 20, 2026 v1.1.0 Feb 18, 2026 Changes in this version + const DefaultBatchSize + const DefaultBatchTimeout + const DefaultCommitInterval + const DefaultMaxAttempts + const DefaultMaxBytes + const DefaultMaxWait + const DefaultMinBytes + const DefaultPartition + const DefaultRebalanceTimeout + const DefaultRequiredAcks + const DefaultStartOffset + const DefaultWriteTimeout + const FirstOffset + const LastOffset + const RequireAll + const RequireNone + const RequireOne + var ErrAuthenticationFailed = errors.New("authentication failed") + var ErrAuthorizationFailed = errors.New("authorization failed") + var ErrBrokerNotAvailable = errors.New("broker not available") + var ErrConnectionFailed = errors.New("connection failed") + var ErrConnectionLost = errors.New("connection lost") + var ErrContextCanceled = errors.New("context canceled") + var ErrContextDeadlineExceeded = errors.New("context deadline exceeded") + var ErrDuplicateSequence = errors.New("duplicate sequence") + var ErrGroupCoordinatorNotAvailable = errors.New("group coordinator not available") + var ErrGroupNotFound = errors.New("consumer group not found") + var ErrInvalidCommitOffset = errors.New("invalid commit offset") + var ErrInvalidConfig = errors.New("invalid config") + var ErrInvalidCredentials = errors.New("invalid credentials") + var ErrInvalidFetchSize = errors.New("invalid fetch size") + var ErrInvalidGroupID = errors.New("invalid group id") + var ErrInvalidMessage = errors.New("invalid message") + var ErrInvalidMessageSize = errors.New("invalid message size") + var ErrInvalidPartition = errors.New("invalid partition") + var ErrInvalidProducerEpoch = errors.New("invalid producer epoch") + var ErrInvalidProducerIDMapping = errors.New("invalid producer id mapping") + var ErrInvalidReplica = errors.New("invalid replica") + var ErrInvalidReplicationFactor = errors.New("invalid replication factor") + var ErrInvalidRequest = errors.New("invalid request") + var ErrInvalidSessionTimeout = errors.New("invalid session timeout") + var ErrInvalidTransaction = errors.New("invalid transaction") + var ErrInvalidTransactionState = errors.New("invalid transaction state") + var ErrInvalidTxnState = errors.New("invalid transaction state") + var ErrLeaderNotAvailable = errors.New("leader not available") + var ErrMessageTooLarge = errors.New("message too large") + var ErrNetworkError = errors.New("network error") + var ErrNotController = errors.New("not controller") + var ErrNotGroupCoordinator = errors.New("not group coordinator") + var ErrNotLeaderForPartition = errors.New("not leader for partition") + var ErrOffsetOutOfRange = errors.New("offset out of range") + var ErrOutOfOrderSequence = errors.New("out of order sequence") + var ErrPartitionNotFound = errors.New("partition not found") + var ErrPolicyViolation = errors.New("policy violation") + var ErrProducerFenced = errors.New("producer fenced") + var ErrReaderNotInitialized = errors.New("reader not initialized") + var ErrRebalanceInProgress = errors.New("rebalance in progress") + var ErrReplicaNotAvailable = errors.New("replica not available") + var ErrReplicaNotAvailableError = errors.New("replica not available") + var ErrRequestTimedOut = errors.New("request timed out") + var ErrTooManyInFlightRequests = errors.New("too many in-flight requests") + var ErrTopicAlreadyExists = errors.New("topic already exists") + var ErrTopicNotFound = errors.New("topic not found") + var ErrTransactionCoordinatorFenced = errors.New("transaction coordinator fenced") + var ErrUnknownError = errors.New("unknown error") + var ErrUnknownMemberID = errors.New("unknown member id") + var ErrUnsupportedForMessageFormat = errors.New("unsupported for message format") + var ErrUnsupportedVersion = errors.New("unsupported version") + var ErrWriterNotInitialized = errors.New("writer not initialized") + var FXModule = fx.Module("kafka", fx.Provide(NewClientWithDI, ...), fx.Invoke(RegisterKafkaLifecycle)) + func RegisterKafkaLifecycle(params KafkaLifecycleParams) + func ValidateDataType(dataType string, hasSerializer, hasDeserializer bool) error + type AvroDeserializer struct + UnmarshalFunc func([]byte, interface{}) error + func (a *AvroDeserializer) Deserialize(data []byte, target interface{}) error + type AvroSerializer struct + MarshalFunc func(interface{}) ([]byte, error) + func (a *AvroSerializer) Serialize(data interface{}) ([]byte, error) + type Client interface + Consume func(ctx context.Context, wg *sync.WaitGroup) <-chan Message + ConsumeParallel func(ctx context.Context, wg *sync.WaitGroup, numWorkers int) <-chan Message + Deserialize func(msg Message, target interface{}) error + GracefulShutdown func() + IsAuthenticationError func(err error) bool + IsPermanentError func(err error) bool + IsRetryableError func(err error) bool + IsTemporaryError func(err error) bool + Publish func(ctx context.Context, key string, data interface{}, ...) error + SetDefaultSerializers func() + SetDeserializer func(d Deserializer) + SetSerializer func(s Serializer) + TranslateError func(err error) error + type Config struct + AllowAutoTopicCreation bool + Async bool + BatchSize int + BatchTimeout time.Duration + Brokers []string + CommitInterval time.Duration + CompressionCodec string + DataType string + EnableAutoCommit bool + EnableAutoOffsetStore bool + GroupID string + IsConsumer bool + MaxAttempts int + MaxBytes int + MaxWait time.Duration + MinBytes int + Partition int + RequiredAcks int + SASL SASLConfig + StartOffset int64 + TLS TLSConfig + Topic string + WriteTimeout time.Duration + type ConsumerMessage struct + func (cm *ConsumerMessage) Body() []byte + func (cm *ConsumerMessage) BodyAs(target interface{}) error + func (cm *ConsumerMessage) CommitMsg() error + func (cm *ConsumerMessage) Header() map[string]interface{} + func (cm *ConsumerMessage) Key() string + func (cm *ConsumerMessage) Offset() int64 + func (cm *ConsumerMessage) Partition() int + type Deserializer interface + Deserialize func(data []byte, target interface{}) error + type GobDeserializer struct + func (g *GobDeserializer) Deserialize(data []byte, target interface{}) error + type GobSerializer struct + func (g *GobSerializer) Serialize(data interface{}) ([]byte, error) + type JSONDeserializer struct + func (j *JSONDeserializer) Deserialize(data []byte, target interface{}) error + type JSONSerializer struct + func (j *JSONSerializer) Serialize(data interface{}) ([]byte, error) + type KafkaClient struct + func NewClient(cfg Config) (*KafkaClient, error) + func NewClientWithDI(params KafkaParams) (*KafkaClient, error) + func (k *KafkaClient) Consume(ctx context.Context, wg *sync.WaitGroup) <-chan Message + func (k *KafkaClient) ConsumeParallel(ctx context.Context, wg *sync.WaitGroup, numWorkers int) <-chan Message + func (k *KafkaClient) Deserialize(msg Message, target interface{}) error + func (k *KafkaClient) GracefulShutdown() + func (k *KafkaClient) IsAuthenticationError(err error) bool + func (k *KafkaClient) IsPermanentError(err error) bool + func (k *KafkaClient) IsRetryableError(err error) bool + func (k *KafkaClient) IsTemporaryError(err error) bool + func (k *KafkaClient) Publish(ctx context.Context, key string, data interface{}, ...) error + func (k *KafkaClient) SetDefaultSerializers() + func (k *KafkaClient) SetDeserializer(d Deserializer) + func (k *KafkaClient) SetSerializer(s Serializer) + func (k *KafkaClient) TranslateError(err error) error + func (k *KafkaClient) WithDeserializer(deserializer Deserializer) *KafkaClient + func (k *KafkaClient) WithLogger(logger Logger) *KafkaClient + func (k *KafkaClient) WithObserver(observer observability.Observer) *KafkaClient + func (k *KafkaClient) WithSerializer(serializer Serializer) *KafkaClient + type KafkaLifecycleParams struct + Client *KafkaClient + Lifecycle fx.Lifecycle + type KafkaParams struct + Config Config + Deserializer Deserializer + Logger Logger + Observer observability.Observer + Serializer Serializer + type Logger interface + ErrorWithContext func(ctx context.Context, msg string, err error, fields ...map[string]interface{}) + InfoWithContext func(ctx context.Context, msg string, err error, fields ...map[string]interface{}) + WarnWithContext func(ctx context.Context, msg string, err error, fields ...map[string]interface{}) + type Message interface + Body func() []byte + BodyAs func(target interface{}) error + CommitMsg func() error + Header func() map[string]interface{} + Key func() string + Offset func() int64 + Partition func() int + type MultiFormatDeserializer struct + DefaultFormat string + Deserializers map[string]Deserializer + func NewMultiFormatDeserializer() *MultiFormatDeserializer + func (m *MultiFormatDeserializer) Deserialize(data []byte, target interface{}) error + func (m *MultiFormatDeserializer) DeserializeWithFormat(format string, data []byte, target interface{}) error + func (m *MultiFormatDeserializer) RegisterDeserializer(format string, deserializer Deserializer) + type MultiFormatSerializer struct + DefaultFormat string + Serializers map[string]Serializer + func NewMultiFormatSerializer() *MultiFormatSerializer + func (m *MultiFormatSerializer) RegisterSerializer(format string, serializer Serializer) + func (m *MultiFormatSerializer) Serialize(data interface{}) ([]byte, error) + func (m *MultiFormatSerializer) SerializeWithFormat(format string, data interface{}) ([]byte, error) + type NoOpDeserializer struct + func (n *NoOpDeserializer) Deserialize(data []byte, target interface{}) error + type NoOpSerializer struct + func (n *NoOpSerializer) Serialize(data interface{}) ([]byte, error) + type ProtoMarshaler interface + Marshal func() ([]byte, error) + type ProtoMessage interface + ProtoReflect func() interface{} + Reset func() + String func() string + type ProtoUnmarshaler interface + Unmarshal func([]byte) error + type ProtobufDeserializer struct + UnmarshalFunc func([]byte, interface{}) error + func (p *ProtobufDeserializer) Deserialize(data []byte, target interface{}) error + type ProtobufSerializer struct + MarshalFunc func(interface{}) ([]byte, error) + func (p *ProtobufSerializer) Serialize(data interface{}) ([]byte, error) + type SASLConfig struct + Enabled bool + Mechanism string + Password string + Username string + type Serializer interface + Serialize func(data interface{}) ([]byte, error) + type StringDeserializer struct + func (s *StringDeserializer) Deserialize(data []byte, target interface{}) error + type StringSerializer struct + Encoding string + func (s *StringSerializer) Serialize(data interface{}) ([]byte, error) + type TLSConfig struct + CACertPath string + ClientCertPath string + ClientKeyPath string + Enabled bool + InsecureSkipVerify bool