Versions in this module Expand all Collapse all v1 v1.1.0 Mar 15, 2026 v1.0.1 Nov 22, 2025 Changes in this version + var ErrAllBrokersFailed = errors.New("all brokers failed") + var ErrAuthenticationFailed = errors.New("authentication failed") + var ErrBatchFull = errors.New("batch is full") + var ErrClientClosed = errors.New("client is closed") + var ErrConnectionClosed = errors.New("connection closed") + var ErrConnectionPoolClosed = errors.New("connection pool closed") + var ErrConsumerClosed = errors.New("consumer is closed") + var ErrInvalidMaxConnections = errors.New("invalid max connections value") + var ErrInvalidOffset = errors.New("invalid offset") + var ErrInvalidPartition = errors.New("invalid partition") + var ErrInvalidResponse = errors.New("invalid response") + var ErrInvalidRetries = errors.New("invalid retry count") + var ErrInvalidSASLConfig = errors.New("invalid SASL configuration") + var ErrInvalidTLSConfig = errors.New("invalid TLS configuration") + var ErrInvalidTimeout = errors.New("invalid timeout value") + var ErrInvalidTopic = errors.New("invalid topic name") + var ErrMessageTooLarge = errors.New("message too large") + var ErrNoBrokers = errors.New("no brokers specified") + var ErrNoConnection = errors.New("no connection available") + var ErrNoMessages = errors.New("no messages available") + var ErrProducerClosed = errors.New("producer is closed") + var ErrRequestFailed = errors.New("request failed") + var ErrRequestTimeout = errors.New("request timeout") + var ErrTLSHandshakeFailed = errors.New("TLS handshake failed") + type BrokerStats struct + Active int + Idle int + Total int + type Client struct + func New(config *Config) (*Client, error) + func (c *Client) Close() error + func (c *Client) CreateTopic(topic string, numPartitions uint32, replicationFactor uint16) error + func (c *Client) DeleteTopic(topic string) error + func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error) + func (c *Client) HealthCheck(broker string) error + func (c *Client) ListTopics() ([]string, error) + func (c *Client) Stats() ClientStats + type ClientStats struct + BytesRead int64 + BytesWritten int64 + PoolStats PoolStats + RequestsFailed int64 + RequestsSent int64 + Uptime time.Duration + type Config struct + Brokers []string + ConnectTimeout time.Duration + ConsumerConfig ConsumerConfig + KeepAlive bool + KeepAlivePeriod time.Duration + MaxConnectionsPerBroker int + MaxRetries int + ProducerConfig ProducerConfig + ReadTimeout time.Duration + RequestTimeout time.Duration + RetryBackoff time.Duration + RetryMaxDelay time.Duration + Security *SecurityConfig + WriteTimeout time.Duration + func DefaultConfig() *Config + func (c *Config) Validate() error + type ConnectionPool struct + func NewConnectionPool(config *Config) *ConnectionPool + func (p *ConnectionPool) Close() error + func (p *ConnectionPool) Get(broker string) (*connection, error) + func (p *ConnectionPool) Put(conn *connection) + func (p *ConnectionPool) Remove(conn *connection) error + func (p *ConnectionPool) Stats() PoolStats + type Consumer struct + func NewConsumer(client *Client, topic string, partitionID uint32) *Consumer + func NewConsumerWithConfig(client *Client, topic string, partitionID uint32, config ConsumerConfig) *Consumer + func (c *Consumer) Close() error + func (c *Consumer) CurrentOffset() int64 + func (c *Consumer) Fetch() ([]protocol.Message, error) + func (c *Consumer) FetchN(maxMessages int) ([]protocol.Message, error) + func (c *Consumer) FetchOne() (*protocol.Message, error) + func (c *Consumer) GetEndOffset() (int64, error) + func (c *Consumer) Partition() uint32 + func (c *Consumer) Poll(interval time.Duration, handler func([]protocol.Message) error) error + func (c *Consumer) Seek(offset int64) error + func (c *Consumer) SeekToBeginning() error + func (c *Consumer) SeekToEnd() error + func (c *Consumer) Stats() ConsumerStats + func (c *Consumer) Topic() string + type ConsumerConfig struct + AutoCommitInterval time.Duration + GroupID string + MaxFetchBytes uint32 + MaxWaitTime time.Duration + MinFetchBytes uint32 + StartOffset int64 + type ConsumerMetrics struct + BytesRead int64 + FetchCount int64 + MessagesRead int64 + type ConsumerRecord struct + Message protocol.Message + Offset int64 + Partition int32 + Topic string + type ConsumerState int + const StateJoining + const StateRebalancing + const StateStable + const StateUnjoined + type ConsumerStats struct + BytesRead int64 + CurrentOffset int64 + FetchCount int64 + MessagesRead int64 + PartitionID uint32 + Topic string + type DefaultRebalanceListener struct + func (l *DefaultRebalanceListener) OnPartitionsAssigned(partitions map[string][]int32) + func (l *DefaultRebalanceListener) OnPartitionsRevoked(partitions map[string][]int32) + type FetchRequest struct + MaxBytes int32 + Offset int64 + Partition int32 + Topic string + type FetchResponse struct + HighWaterMark int64 + Messages []protocol.Message + Partition int32 + Topic string + type GroupConsumer struct + func NewGroupConsumer(client *Client, config GroupConsumerConfig) (*GroupConsumer, error) + func (gc *GroupConsumer) Assignment() map[string][]int32 + func (gc *GroupConsumer) Close() error + func (gc *GroupConsumer) CommitSync(ctx context.Context, offsets map[string]map[int32]int64) error + func (gc *GroupConsumer) Poll(ctx context.Context) (map[string]map[int32][]protocol.Message, error) + func (gc *GroupConsumer) SetRebalanceListener(listener RebalanceListener) + func (gc *GroupConsumer) Stats() GroupConsumerStats + func (gc *GroupConsumer) Subscribe(ctx context.Context) error + type GroupConsumerConfig struct + AssignmentStrategy string + AutoCommit bool + AutoCommitIntervalMs int32 + ClientID string + GroupID string + HeartbeatIntervalMs int32 + RebalanceTimeoutMs int32 + SessionTimeoutMs int32 + Topics []string + func DefaultGroupConsumerConfig() GroupConsumerConfig + type GroupConsumerStats struct + GroupID string + MemberID string + MessagesRead int64 + OffsetsCommitted int64 + RebalanceCount int64 + State ConsumerState + type IsolationLevel int + const ReadCommitted + const ReadUncommitted + type PartitionConsumer struct + func NewPartitionConsumer(client *Client, topic string, partitions []uint32) *PartitionConsumer + func NewPartitionConsumerWithConfig(client *Client, topic string, partitions []uint32, config ConsumerConfig) *PartitionConsumer + func (pc *PartitionConsumer) Close() error + func (pc *PartitionConsumer) FetchAll() (map[uint32][]protocol.Message, error) + func (pc *PartitionConsumer) FetchFromPartition(partitionID uint32) ([]protocol.Message, error) + func (pc *PartitionConsumer) FetchRoundRobin() ([]protocol.Message, error) + func (pc *PartitionConsumer) GetOffsets() map[uint32]int64 + func (pc *PartitionConsumer) GetPartitionInfo(partitionID uint32) (PartitionInfo, error) + func (pc *PartitionConsumer) Metrics() ConsumerMetrics + func (pc *PartitionConsumer) PollPartitions(ctx context.Context, interval time.Duration, ...) error + func (pc *PartitionConsumer) SeekAll(offset int64) error + func (pc *PartitionConsumer) SeekPartition(partitionID uint32, offset int64) error + type PartitionInfo struct + HighWater int64 + Lag int64 + LastFetch time.Time + Offset int64 + PartitionID uint32 + type PendingMessage struct + Message protocol.Message + Partition int32 + Topic string + type PoolStats struct + ActiveConnections int + BrokerStats map[string]BrokerStats + IdleConnections int + TotalConnections int + type Producer struct + func NewProducer(client *Client) *Producer + func NewProducerWithConfig(client *Client, config ProducerConfig) *Producer + func (p *Producer) Close() error + func (p *Producer) Flush(topic string) error + func (p *Producer) FlushAll() error + func (p *Producer) Send(topic string, key, value []byte) error + func (p *Producer) SendMessages(topic string, messages []protocol.Message) error + func (p *Producer) SendMessagesToPartition(topic string, partitionID uint32, messages []protocol.Message) error + func (p *Producer) SendToPartition(topic string, partitionID uint32, key, value []byte) error + func (p *Producer) Stats() ProducerStats + type ProducerConfig struct + BatchSize int + BatchTimeout time.Duration + Compression string + MaxInFlightRequests int + RequireAck bool + type ProducerState int + const ProducerStateAborting + const ProducerStateClosed + const ProducerStateCommitting + const ProducerStateFenced + const ProducerStateInTransaction + const ProducerStateReady + const ProducerStateUninitialized + type ProducerStats struct + BatchesSent int64 + BytesSent int64 + MessagesFailed int64 + MessagesSent int64 + type RebalanceListener interface + OnPartitionsAssigned func(partitions map[string][]int32) + OnPartitionsRevoked func(partitions map[string][]int32) + type SASLConfig struct + Enabled bool + Mechanism string + Password string + Username string + type SecurityConfig struct + APIKey string + SASL *SASLConfig + TLS *TLSConfig + func (s *SecurityConfig) Validate() error + type TLSConfig struct + CAFile string + CertFile string + Enabled bool + InsecureSkipVerify bool + KeyFile string + ServerName string + type Transaction struct + ID transaction.TransactionID + Messages []PendingMessage + Partitions map[string][]int32 + StartTime time.Time + type TransactionalConsumer struct + func NewTransactionalConsumer(config *TransactionalConsumerConfig) (*TransactionalConsumer, error) + func (tc *TransactionalConsumer) Close() error + func (tc *TransactionalConsumer) Commit(ctx context.Context) error + func (tc *TransactionalConsumer) CommitSync(ctx context.Context, offsets map[string]map[int32]int64) error + func (tc *TransactionalConsumer) Committed(topic string, partition int32) (int64, error) + func (tc *TransactionalConsumer) MarkTransactionAborted(txnID transaction.TransactionID) + func (tc *TransactionalConsumer) Poll(ctx context.Context) ([]ConsumerRecord, error) + func (tc *TransactionalConsumer) Position(topic string, partition int32) (int64, error) + func (tc *TransactionalConsumer) Seek(topic string, partition int32, offset int64) error + func (tc *TransactionalConsumer) Stats() TransactionalConsumerStats + func (tc *TransactionalConsumer) UpdateLastStableOffset(topic string, partition int32, lso int64) + type TransactionalConsumerConfig struct + AutoCommitEnabled bool + Client *Client + GroupID string + IsolationLevel IsolationLevel + MaxPollRecords int + Topics []string + func DefaultTransactionalConsumerConfig() *TransactionalConsumerConfig + type TransactionalConsumerStats struct + AbortedTransactions int + IsolationLevel IsolationLevel + MessagesConsumed int64 + MessagesFiltered int64 + type TransactionalProducer struct + func NewTransactionalProducer(client *Client, config TransactionalProducerConfig) (*TransactionalProducer, error) + func (tp *TransactionalProducer) AbortTransaction(ctx context.Context) error + func (tp *TransactionalProducer) BeginTransaction(ctx context.Context) error + func (tp *TransactionalProducer) Close() error + func (tp *TransactionalProducer) CommitTransaction(ctx context.Context) error + func (tp *TransactionalProducer) Send(ctx context.Context, topic string, partition int32, message protocol.Message) error + func (tp *TransactionalProducer) SendOffsetsToTransaction(ctx context.Context, groupID string, offsets map[string]map[int32]int64) error + func (tp *TransactionalProducer) Stats() TransactionalProducerStats + type TransactionalProducerConfig struct + RequestTimeout time.Duration + TransactionID string + TransactionTimeout time.Duration + func DefaultTransactionalProducerConfig() TransactionalProducerConfig + type TransactionalProducerStats struct + MessagesProduced int64 + ProducerEpoch transaction.ProducerEpoch + ProducerID transaction.ProducerID + State ProducerState + TransactionsAborted int64 + TransactionsCommitted int64