Documentation ¶
Overview ¶
Package kafkax provides a high-level abstraction over Apache Kafka for publishing and consuming messages. It builds on top of the franz-go client library and includes built-in support for metrics, logging, middlewares, and graceful shutdown.
The package supports both synchronous publishing and concurrent consuming with configurable concurrency levels, automatic offset committing, and middleware chains for cross-cutting concerns like logging, metrics, and request IDs.
Index ¶
- Constants
- func ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware
- func ConsumerRequestId() consumer.Middleware
- func GetHeaderValue(headers []kgo.RecordHeader, key string) string
- func NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync
- func PlainAuth(auth *Auth) sasl.Mechanism
- func PublisherLog(logger log.Logger, logBody bool) publisher.Middleware
- func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware
- func PublisherRequestId() publisher.Middleware
- func PublisherRetry(retrier Retrier) publisher.Middleware
- func ScramAuth(auth *Auth) (sasl.Mechanism, error)
- type Auth
- type Client
- type Config
- type ConfigOption
- type ConsumerConfig
- type LogObserver
- type Logger
- type NoopObserver
- type Observer
- type PublisherConfig
- func (p PublisherConfig) DefaultPublisher(logCtx context.Context, logger log.Logger, ...) *publisher.Publisher
- func (p PublisherConfig) GetBatchSizePerPartition() int
- func (p PublisherConfig) GetBatchTimeoutPerPartition() time.Duration
- func (p PublisherConfig) GetDialTimeout() time.Duration
- func (p PublisherConfig) GetMaxMessageSizePerPartition() int32
- func (p PublisherConfig) GetRequiredAckLevel() kgo.Acks
- func (p PublisherConfig) GetWriteTimeout() time.Duration
- type PublisherMetricStorage
- type Retrier
- type TLS
Constants ¶
const (
	AuthTypePlain   = "SASL/PLAIN"
	AuthTypeSCRAM   = "SASL/SCRAM"
	ScramTypeSHA256 = "SHA256"
	ScramTypeSHA512 = "SHA512"
)
Variables ¶
This section is empty.
Functions ¶
func ConsumerLog ¶
func ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware
ConsumerLog creates a middleware that logs consume operations. When logBody is true, the message body is included in the log output.
func ConsumerRequestId ¶
func ConsumerRequestId() consumer.Middleware
ConsumerRequestId creates a middleware that extracts request IDs from Kafka message headers and adds them to the context. If no request ID is found, a new one is generated.
func GetHeaderValue ¶
func GetHeaderValue(headers []kgo.RecordHeader, key string) string
GetHeaderValue retrieves the value of a header with the specified key from the provided headers slice. Returns an empty string if the key is not found.
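The documented behavior can be sketched with a local stand-in for `kgo.RecordHeader` (which carries a `Key string` and `Value []byte`); this is an illustration of the lookup, not the package's source:

```go
package main

import "fmt"

// RecordHeader mirrors the shape of franz-go's kgo.RecordHeader.
type RecordHeader struct {
	Key   string
	Value []byte
}

// GetHeaderValue returns the value of the first header matching key,
// or an empty string when the key is absent, as documented above.
func GetHeaderValue(headers []RecordHeader, key string) string {
	for _, h := range headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

func main() {
	headers := []RecordHeader{{Key: "x-request-id", Value: []byte("abc-123")}}
	fmt.Println(GetHeaderValue(headers, "x-request-id"))
	fmt.Println(GetHeaderValue(headers, "missing") == "")
}
```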
func NewResultHandler ¶
func NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync
NewResultHandler creates a new synchronous message handler with default middlewares including logging, metrics, and panic recovery. The provided adapter implements the business logic for handling messages.
func PlainAuth ¶
func PlainAuth(auth *Auth) sasl.Mechanism
PlainAuth creates a SASL/PLAIN authentication mechanism from the provided credentials. Returns nil if auth is nil.
func PublisherLog ¶
func PublisherLog(logger log.Logger, logBody bool) publisher.Middleware
PublisherLog creates a middleware that logs publish operations. When logBody is true, the message body is included in the log output.
func PublisherMetrics ¶
func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware
PublisherMetrics creates a middleware that records metrics for message publishing operations, including duration, message size, and error counts.
func PublisherRequestId ¶
func PublisherRequestId() publisher.Middleware
PublisherRequestId creates a middleware that propagates request IDs to Kafka message headers. If no request ID is present in the context, a new one is generated.
func PublisherRetry ¶ added in v1.50.0
func PublisherRetry(retrier Retrier) publisher.Middleware
PublisherRetry creates a middleware for retrying message publications. Place this middleware after the logging middleware so that individual retry attempts are not logged as separate publications.
Types ¶
type Auth ¶
type Auth struct {
Mechanism *string `` /* 173-byte string literal not displayed */
ScramType *string `` /* 187-byte string literal not displayed */
Username string `validate:"required" schema:"Логин"`
Password string `validate:"required" schema:"Пароль"`
}
Auth holds authentication configuration for connecting to Kafka.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client manages the lifecycle of Kafka publishers and consumers. It supports dynamic reconfiguration through UpgradeAndServe and provides health checking and graceful shutdown capabilities.
Client is safe for concurrent use.
func (*Client) Close ¶
func (c *Client) Close()
Close gracefully shuts down the client and releases all resources. It is safe to call Close multiple times.
func (*Client) Healthcheck ¶
Healthcheck verifies the health of all consumers and publishers. Returns an error if any component is unhealthy.
func (*Client) Shutdown ¶ added in v1.39.3
func (c *Client) Shutdown()
Shutdown gracefully stops all consumers and publishers, waiting for pending operations to complete.
func (*Client) UpgradeAndServe ¶
UpgradeAndServe initializes or reinitializes Kafka publishers and consumers based on the provided configuration. If the new configuration is identical to the previous one, initialization is skipped. Existing consumers and publishers are gracefully shut down before new ones are started.
type Config ¶
Config holds the configuration for Kafka publishers and consumers.
func NewConfig ¶
func NewConfig(opts ...ConfigOption) Config
NewConfig creates a new Config instance using the provided options.
type ConfigOption ¶
type ConfigOption func(c *Config)
ConfigOption is a function that configures a Config instance.
func WithConsumers ¶
func WithConsumers(consumers ...consumer.Consumer) ConfigOption
WithConsumers sets the consumers for the Config.
func WithPublishers ¶
func WithPublishers(publishers ...*publisher.Publisher) ConfigOption
WithPublishers sets the publishers for the Config.
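`NewConfig`, `ConfigOption`, `WithConsumers`, and `WithPublishers` together form the functional-options pattern. A self-contained sketch of that pattern (using plain strings instead of the package's consumer and publisher types):

```go
package main

import "fmt"

// Config and ConfigOption are local stand-ins mirroring the
// NewConfig / WithConsumers / WithPublishers pattern above.
type Config struct {
	Consumers  []string
	Publishers []string
}

type ConfigOption func(c *Config)

func WithConsumers(consumers ...string) ConfigOption {
	return func(c *Config) { c.Consumers = append(c.Consumers, consumers...) }
}

func WithPublishers(publishers ...string) ConfigOption {
	return func(c *Config) { c.Publishers = append(c.Publishers, publishers...) }
}

// NewConfig applies each option in order to a zero-value Config.
func NewConfig(opts ...ConfigOption) Config {
	var c Config
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	cfg := NewConfig(WithConsumers("orders"), WithPublishers("events"))
	fmt.Println(cfg.Consumers, cfg.Publishers) // [orders] [events]
}
```

The pattern keeps `NewConfig` stable as new options are added, which is why the package exposes options rather than a struct literal.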
type ConsumerConfig ¶
type ConsumerConfig struct {
Addresses []string `validate:"required" schema:"Список адресов брокеров для чтения сообщений"`
Topic string `validate:"required" schema:"Топик"`
GroupId string `validate:"required" schema:"Идентификатор консьюмера"`
Concurrency int `schema:"Кол-во обработчиков, по умолчанию 1"`
MaxBatchSizeMb int32 `` /* 133-byte string literal not displayed */
CommitIntervalSec *int `` /* 141-byte string literal not displayed */
Auth *Auth `schema:"Параметры аутентификации"`
TLS *TLS `schema:"Данные для установки TLS-соединения"`
DialTimeoutMs *int `schema:"Таймаут установки соединения, по умолчанию 5 секунд"`
MetricConsumerId *string `` /* 150-byte string literal not displayed */
}
ConsumerConfig holds configuration for a Kafka consumer.
func (ConsumerConfig) DefaultConsumer ¶
func (c ConsumerConfig) DefaultConsumer(
	logCtx context.Context,
	logger log.Logger,
	handler consumer.Handler,
	restMiddlewares ...consumer.Middleware,
) consumer.Consumer
DefaultConsumer creates a new consumer with default configuration, including built-in request ID middleware and log observer. Additional middlewares can be provided via restMiddlewares.
func (ConsumerConfig) GetCommitInterval ¶ added in v1.37.1
func (c ConsumerConfig) GetCommitInterval() time.Duration
GetCommitInterval returns the auto-commit interval duration. Returns 1 second by default if not configured.
func (ConsumerConfig) GetDialTimeout ¶ added in v1.41.0
func (c ConsumerConfig) GetDialTimeout() time.Duration
GetDialTimeout returns the dial timeout duration. Returns 5 seconds by default if not configured.
func (ConsumerConfig) GetMaxBatchSizeMb ¶ added in v1.37.1
func (c ConsumerConfig) GetMaxBatchSizeMb() int32
GetMaxBatchSizeMb returns the maximum batch size in MB. Returns 64MB by default if not configured or set to a non-positive value.
type LogObserver ¶
type LogObserver struct {
NoopObserver
// contains filtered or unexported fields
}
LogObserver is an Observer implementation that logs lifecycle events to the provided logger.
func NewLogObserver ¶
func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver
NewLogObserver creates a new LogObserver with the provided context and logger.
func (LogObserver) ClientError ¶
func (l LogObserver) ClientError(err error)
ClientError logs an unexpected client error.
func (LogObserver) ClientReady ¶
func (l LogObserver) ClientReady()
ClientReady logs that the Kafka client has successfully connected.
func (LogObserver) ShutdownDone ¶
func (l LogObserver) ShutdownDone()
ShutdownDone logs that the shutdown process has completed.
func (LogObserver) ShutdownStarted ¶
func (l LogObserver) ShutdownStarted()
ShutdownStarted logs that the shutdown process has begun.
type Logger ¶ added in v1.64.1
type Logger struct {
// contains filtered or unexported fields
}
Logger is an adapter that bridges franz-go's logging interface to the application's logger. It prefixes log messages with a configurable name and maps franz-go log levels to the application's log levels.
func NewLogger ¶ added in v1.64.1
NewLogger creates a new Logger instance with the provided configuration.
type NoopObserver ¶
type NoopObserver struct{}
NoopObserver is a no-op implementation of the Observer interface that ignores all events.
func (NoopObserver) ClientError ¶
func (n NoopObserver) ClientError(err error)
ClientError does nothing.
func (NoopObserver) ClientReady ¶
func (n NoopObserver) ClientReady()
ClientReady does nothing.
func (NoopObserver) ShutdownDone ¶
func (n NoopObserver) ShutdownDone()
ShutdownDone does nothing.
func (NoopObserver) ShutdownStarted ¶
func (n NoopObserver) ShutdownStarted()
ShutdownStarted does nothing.
type Observer ¶
type Observer interface {
ClientReady()
ClientError(err error)
ShutdownStarted()
ShutdownDone()
}
Observer defines an interface for receiving lifecycle events from the Kafka client, such as connection status and shutdown notifications.
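`LogObserver` embeds `NoopObserver` so it only has to implement the events it cares about. That embedding trick can be shown self-contained; `readyOnlyObserver` below is a hypothetical custom observer, not part of the package:

```go
package main

import "fmt"

// Observer matches the interface documented above.
type Observer interface {
	ClientReady()
	ClientError(err error)
	ShutdownStarted()
	ShutdownDone()
}

// NoopObserver satisfies Observer with empty methods, so custom observers
// can embed it and override only the events they care about.
type NoopObserver struct{}

func (NoopObserver) ClientReady()          {}
func (NoopObserver) ClientError(err error) {}
func (NoopObserver) ShutdownStarted()      {}
func (NoopObserver) ShutdownDone()         {}

// readyOnlyObserver is a hypothetical observer reacting only to ClientReady.
type readyOnlyObserver struct {
	NoopObserver
	ready bool
}

func (r *readyOnlyObserver) ClientReady() { r.ready = true }

func main() {
	var o Observer = &readyOnlyObserver{}
	o.ClientReady()
	o.ShutdownDone() // inherited no-op from NoopObserver
	fmt.Println(o.(*readyOnlyObserver).ready) // true
}
```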
type PublisherConfig ¶
type PublisherConfig struct {
Addresses []string `validate:"required" schema:"Список адресов брокеров для отправки сообщений"`
Topic string `` /* 160-byte string literal not displayed */
MaxMsgSizeMbPerPartition int32 `schema:"Максимальный размер сообщений в Мб, по умолчанию 64 Мб"`
BatchSizePerPartition int `` /* 142-byte string literal not displayed */
BatchTimeoutPerPartitionMs *int `schema:"Периодичность записи батчей в кафку в мс, по умолчанию 500 мс"`
WriteTimeoutSec *int `schema:"Таймаут отправки сообщений, по умолчанию 10 секунд"`
RequiredAckLevel int `` /* 179-byte string literal not displayed */
Auth *Auth `schema:"Параметры аутентификации"`
TLS *TLS `schema:"Данные для установки TLS-соединения"`
DialTimeoutMs *int `schema:"Таймаут установки соединения, по умолчанию 5 секунд"`
MetricPublisherId *string `` /* 148-byte string literal not displayed */
}
PublisherConfig holds configuration for a Kafka publisher.
func (PublisherConfig) DefaultPublisher ¶
func (p PublisherConfig) DefaultPublisher(
	logCtx context.Context,
	logger log.Logger,
	restMiddlewares ...publisher.Middleware,
) *publisher.Publisher
DefaultPublisher creates a new publisher with default configuration, including built-in metrics and request ID middlewares. Additional middlewares can be provided via restMiddlewares.
func (PublisherConfig) GetBatchSizePerPartition ¶ added in v1.38.1
func (p PublisherConfig) GetBatchSizePerPartition() int
GetBatchSizePerPartition returns the batch size per partition. Returns 10 by default if not configured or set to a non-positive value.
func (PublisherConfig) GetBatchTimeoutPerPartition ¶ added in v1.38.1
func (p PublisherConfig) GetBatchTimeoutPerPartition() time.Duration
GetBatchTimeoutPerPartition returns the batch timeout duration per partition. Returns 500ms by default if not configured.
func (PublisherConfig) GetDialTimeout ¶ added in v1.41.0
func (p PublisherConfig) GetDialTimeout() time.Duration
GetDialTimeout returns the dial timeout duration. Returns 5 seconds by default if not configured.
func (PublisherConfig) GetMaxMessageSizePerPartition ¶ added in v1.38.1
func (p PublisherConfig) GetMaxMessageSizePerPartition() int32
GetMaxMessageSizePerPartition returns the maximum message size per partition in bytes. Returns 64MB by default if not configured or set to a non-positive value.
func (PublisherConfig) GetRequiredAckLevel ¶
func (p PublisherConfig) GetRequiredAckLevel() kgo.Acks
GetRequiredAckLevel returns the required acknowledgment level for message production. Returns LeaderAck() for level 1, AllISRAcks() for level -1, and NoAck() for all other values.
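The documented mapping (1 → leader ack, -1 → all in-sync replicas, anything else → no ack) can be sketched without the kgo dependency; `AckLevel` and its string results are illustrative only:

```go
package main

import "fmt"

// AckLevel names the acknowledgment choice for a configured level,
// a local sketch of the mapping documented above
// (1 -> LeaderAck, -1 -> AllISRAcks, otherwise NoAck).
func AckLevel(level int) string {
	switch level {
	case 1:
		return "leader"
	case -1:
		return "all-isr"
	default:
		return "none"
	}
}

func main() {
	for _, lvl := range []int{1, -1, 0, 7} {
		fmt.Println(lvl, AckLevel(lvl))
	}
}
```

Note that the fallback is "no ack", so an unset (zero) `RequiredAckLevel` means fire-and-forget publishing.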
func (PublisherConfig) GetWriteTimeout ¶ added in v1.37.1
func (p PublisherConfig) GetWriteTimeout() time.Duration
GetWriteTimeout returns the write timeout duration. Returns 10 seconds by default if not configured.
type PublisherMetricStorage ¶
type PublisherMetricStorage interface {
ObservePublishDuration(topic string, t time.Duration)
ObservePublishMsgSize(topic string, size int)
IncPublishError(topic string)
}
PublisherMetricStorage defines the interface for publisher metrics storage.
type TLS ¶ added in v1.47.0
type TLS struct {
RootCA *string `schema:"Корневой сертификат"`
PrivateKey *string `schema:"Закрытый ключ"`
Certificate *string `schema:"Сертификат клиента"`
InsecureSkipVerify bool `schema:"Пропуск проверки сертификатов"`
}
TLS holds TLS configuration for secure connections to Kafka.
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| consumer | Package consumer provides a high-level abstraction for consuming messages from Apache Kafka topics. |
| handler | Package handler provides synchronous message processing with support for retry logic, commit handling, and middleware chains for cross-cutting concerns like logging, metrics, and panic recovery. |
| publisher | Package publisher provides a high-level abstraction for publishing messages to Apache Kafka topics. |