Documentation ¶
Overview ¶
No package description is provided; the package comment contains only the linter directive `nolint:ireturn,lll,unparam,gosec`.
Index ¶
- Constants
- func ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware
- func ConsumerRequestId() consumer.Middleware
- func GetHeaderValue(headers []kgo.RecordHeader, key string) string
- func NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync
- func PlainAuth(auth *Auth) sasl.Mechanism
- func PublisherLog(logger log.Logger, logBody bool) publisher.Middleware
- func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware
- func PublisherRequestId() publisher.Middleware
- func PublisherRetry(retrier Retrier) publisher.Middleware
- func ScramAuth(auth *Auth) (sasl.Mechanism, error)
- type Auth
- type Client
- type Config
- type ConfigOption
- type ConsumerConfig
- type LogObserver
- type Logger
- type NoopObserver
- type Observer
- type PublisherConfig
- func (p PublisherConfig) DefaultPublisher(logCtx context.Context, logger log.Logger, ...) *publisher.Publisher
- func (p PublisherConfig) GetBatchSizePerPartition() int
- func (p PublisherConfig) GetBatchTimeoutPerPartition() time.Duration
- func (p PublisherConfig) GetDialTimeout() time.Duration
- func (p PublisherConfig) GetMaxMessageSizePerPartition() int32
- func (p PublisherConfig) GetRequiredAckLevel() kgo.Acks
- func (p PublisherConfig) GetWriteTimeout() time.Duration
- type PublisherMetricStorage
- type Retrier
- type TLS
Constants ¶
View Source
const ( AuthTypePlain = "SASL/PLAIN" AuthTypeSCRAM = "SASL/SCRAM" ScramTypeSHA256 = "SHA256" ScramTypeSHA512 = "SHA512" )
Variables ¶
This section is empty.
Functions ¶
func ConsumerLog ¶
func ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware
func ConsumerRequestId ¶
func ConsumerRequestId() consumer.Middleware
func GetHeaderValue ¶
func GetHeaderValue(headers []kgo.RecordHeader, key string) string
func NewResultHandler ¶
func NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync
func PublisherLog ¶
func PublisherLog(logger log.Logger, logBody bool) publisher.Middleware
func PublisherMetrics ¶
func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware
func PublisherRequestId ¶
func PublisherRequestId() publisher.Middleware
func PublisherRetry ¶ added in v1.50.0
func PublisherRetry(retrier Retrier) publisher.Middleware
PublisherRetry creates a middleware for retrying message publications. It is recommended to use this middleware after logging, to avoid duplicate logging of publication attempts.
Types ¶
type ConfigOption ¶
type ConfigOption func(c *Config)
func WithConsumers ¶
func WithConsumers(consumers ...consumer.Consumer) ConfigOption
func WithPublishers ¶
func WithPublishers(publishers ...*publisher.Publisher) ConfigOption
type ConsumerConfig ¶
type ConsumerConfig struct {
Addresses []string `validate:"required" schema:"Список адресов брокеров для чтения сообщений"`
Topic string `validate:"required" schema:"Топик"`
GroupId string `validate:"required" schema:"Идентификатор консьюмера"`
Concurrency int `schema:"Кол-во обработчиков, по умолчанию 1"`
MaxBatchSizeMb int32 `` /* 133-byte string literal not displayed */
CommitIntervalSec *int `` /* 141-byte string literal not displayed */
Auth *Auth `schema:"Параметры аутентификации"`
TLS *TLS `schema:"Данные для установки TLS-соединения"`
DialTimeoutMs *int `schema:"Таймаут установки соединения, по умолчанию 5 секунд"`
MetricConsumerId *string `` /* 150-byte string literal not displayed */
}
func (ConsumerConfig) DefaultConsumer ¶
func (ConsumerConfig) GetCommitInterval ¶ added in v1.37.1
func (c ConsumerConfig) GetCommitInterval() time.Duration
func (ConsumerConfig) GetDialTimeout ¶ added in v1.41.0
func (c ConsumerConfig) GetDialTimeout() time.Duration
func (ConsumerConfig) GetMaxBatchSizeMb ¶ added in v1.37.1
func (c ConsumerConfig) GetMaxBatchSizeMb() int32
type LogObserver ¶
type LogObserver struct {
NoopObserver
// contains filtered or unexported fields
}
LogObserver has no description; its doc comment contains only the linter directive `nolint:containedctx`.
func NewLogObserver ¶
func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver
func (LogObserver) ClientError ¶
func (l LogObserver) ClientError(err error)
func (LogObserver) ClientReady ¶
func (l LogObserver) ClientReady()
func (LogObserver) ShutdownDone ¶
func (l LogObserver) ShutdownDone()
func (LogObserver) ShutdownStarted ¶
func (l LogObserver) ShutdownStarted()
type Logger ¶ added in v1.64.1
type Logger struct {
// contains filtered or unexported fields
}
type NoopObserver ¶
type NoopObserver struct{}
func (NoopObserver) ClientError ¶
func (n NoopObserver) ClientError(err error)
func (NoopObserver) ClientReady ¶
func (n NoopObserver) ClientReady()
func (NoopObserver) ShutdownDone ¶
func (n NoopObserver) ShutdownDone()
func (NoopObserver) ShutdownStarted ¶
func (n NoopObserver) ShutdownStarted()
type Observer ¶
type Observer interface {
ClientReady()
ClientError(err error)
ShutdownStarted()
ShutdownDone()
}
type PublisherConfig ¶
type PublisherConfig struct {
Addresses []string `validate:"required" schema:"Список адресов брокеров для отправки сообщений"`
Topic string `` /* 160-byte string literal not displayed */
MaxMsgSizeMbPerPartition int32 `schema:"Максимальный размер сообщений в Мб, по умолчанию 64 Мб"`
BatchSizePerPartition int `` /* 142-byte string literal not displayed */
BatchTimeoutPerPartitionMs *int `schema:"Периодичность записи батчей в кафку в мс, по умолчанию 500 мс"`
WriteTimeoutSec *int `schema:"Таймаут отправки сообщений, по умолчанию 10 секунд"`
RequiredAckLevel int `` /* 179-byte string literal not displayed */
Auth *Auth `schema:"Параметры аутентификации"`
TLS *TLS `schema:"Данные для установки TLS-соединения"`
DialTimeoutMs *int `schema:"Таймаут установки соединения, по умолчанию 5 секунд"`
MetricPublisherId *string `` /* 148-byte string literal not displayed */
}
func (PublisherConfig) DefaultPublisher ¶
func (p PublisherConfig) DefaultPublisher( logCtx context.Context, logger log.Logger, restMiddlewares ...publisher.Middleware, ) *publisher.Publisher
func (PublisherConfig) GetBatchSizePerPartition ¶ added in v1.38.1
func (p PublisherConfig) GetBatchSizePerPartition() int
func (PublisherConfig) GetBatchTimeoutPerPartition ¶ added in v1.38.1
func (p PublisherConfig) GetBatchTimeoutPerPartition() time.Duration
func (PublisherConfig) GetDialTimeout ¶ added in v1.41.0
func (p PublisherConfig) GetDialTimeout() time.Duration
func (PublisherConfig) GetMaxMessageSizePerPartition ¶ added in v1.38.1
func (p PublisherConfig) GetMaxMessageSizePerPartition() int32
func (PublisherConfig) GetRequiredAckLevel ¶
func (p PublisherConfig) GetRequiredAckLevel() kgo.Acks
func (PublisherConfig) GetWriteTimeout ¶ added in v1.37.1
func (p PublisherConfig) GetWriteTimeout() time.Duration
type PublisherMetricStorage ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.