Documentation
¶
Index ¶
- Constants
- func ConsumerLog(logger log.Logger) consumer.Middleware
- func ConsumerRequestId() consumer.Middleware
- func GetHeaderValue(headers []kafka.Header, key string) string
- func NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync
- func PlainAuth(auth *Auth) sasl.Mechanism
- func PublisherLog(logger log.Logger) publisher.Middleware
- func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware
- func PublisherRequestId() publisher.Middleware
- type Auth
- type Client
- type Config
- type ConfigOption
- type ConsumerConfig
- type KafkaClient
- type LogObserver
- type NoopObserver
- type Observer
- type PublisherConfig
- func (p PublisherConfig) DefaultPublisher(logger log.Logger, restMiddlewares ...publisher.Middleware) *publisher.Publisher
- func (p PublisherConfig) GetBatchSize() int
- func (p PublisherConfig) GetBatchTimeout() time.Duration
- func (p PublisherConfig) GetMaxMessageSize() int64
- func (p PublisherConfig) GetRequiredAckLevel() kafka.RequiredAcks
- func (p PublisherConfig) GetWriteTimeout() time.Duration
- type PublisherMetricStorage
Constants ¶
View Source
const RequestIdHeader = "x-request-id"
Variables ¶
This section is empty.
Functions ¶
func ConsumerLog ¶
func ConsumerLog(logger log.Logger) consumer.Middleware
func ConsumerRequestId ¶
func ConsumerRequestId() consumer.Middleware
func NewResultHandler ¶
func PublisherLog ¶
func PublisherLog(logger log.Logger) publisher.Middleware
func PublisherMetrics ¶
func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware
func PublisherRequestId ¶
func PublisherRequestId() publisher.Middleware
Types ¶
type ConfigOption ¶
type ConfigOption func(c *Config)
func WithConsumers ¶
func WithConsumers(consumers ...consumer.Consumer) ConfigOption
func WithPublishers ¶
func WithPublishers(publishers ...*publisher.Publisher) ConfigOption
type ConsumerConfig ¶
type ConsumerConfig struct {
Addresses []string `validate:"required" schema:"Список адресов брокеров для чтения сообщений"`
Topic string `validate:"required" schema:"Топик"`
GroupId string `validate:"required" schema:"Идентификатор консьюмера"`
Concurrency int `schema:"Кол-во обработчиков, по умолчанию 1"`
MaxBatchSizeMb int `` /* 133-byte string literal not displayed */
CommitIntervalSec *int `` /* 141-byte string literal not displayed */
Auth *Auth `schema:"Параметры аутентификации"`
}
func (ConsumerConfig) DefaultConsumer ¶
func (c ConsumerConfig) DefaultConsumer(logger log.Logger, handler consumer.Handler, restMiddlewares ...consumer.Middleware) consumer.Consumer
func (ConsumerConfig) GetCommitInterval ¶ added in v1.37.1
func (c ConsumerConfig) GetCommitInterval() time.Duration
func (ConsumerConfig) GetMaxBatchSizeMb ¶ added in v1.37.1
func (c ConsumerConfig) GetMaxBatchSizeMb() int
type KafkaClient ¶
type KafkaClient struct {
// contains filtered or unexported fields
}
type LogObserver ¶
type LogObserver struct {
NoopObserver
// contains filtered or unexported fields
}
func NewLogObserver ¶
func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver
func (LogObserver) ClientError ¶
func (l LogObserver) ClientError(err error)
func (LogObserver) ClientReady ¶
func (l LogObserver) ClientReady()
func (LogObserver) ShutdownDone ¶
func (l LogObserver) ShutdownDone()
func (LogObserver) ShutdownStarted ¶
func (l LogObserver) ShutdownStarted()
type NoopObserver ¶
type NoopObserver struct{}
func (NoopObserver) ClientError ¶
func (n NoopObserver) ClientError(err error)
func (NoopObserver) ClientReady ¶
func (n NoopObserver) ClientReady()
func (NoopObserver) ShutdownDone ¶
func (n NoopObserver) ShutdownDone()
func (NoopObserver) ShutdownStarted ¶
func (n NoopObserver) ShutdownStarted()
type Observer ¶
type Observer interface {
ClientReady()
ClientError(err error)
ShutdownStarted()
ShutdownDone()
}
type PublisherConfig ¶
type PublisherConfig struct {
Addresses []string `validate:"required" schema:"Список адресов брокеров для отправки сообщений"`
Topic string `` /* 160-byte string literal not displayed */
MaxMsgSizeMb int64 `schema:"Максимальный размер сообщений в Мб, по умолчанию 64 Мб"`
BatchSize int `` /* 142-byte string literal not displayed */
BatchTimeoutMs *int `schema:"Периодичность записи батчей в кафку в мс, по умолчанию 500 мс"`
WriteTimeoutSec *int `schema:"Таймаут отправки сообщений, по умолчанию 10 секунд"`
RequiredAckLevel int `` /* 179-byte string literal not displayed */
Auth *Auth `schema:"Параметры аутентификации"`
}
func (PublisherConfig) DefaultPublisher ¶
func (p PublisherConfig) DefaultPublisher(logger log.Logger, restMiddlewares ...publisher.Middleware) *publisher.Publisher
func (PublisherConfig) GetBatchSize ¶
func (p PublisherConfig) GetBatchSize() int
func (PublisherConfig) GetBatchTimeout ¶ added in v1.37.1
func (p PublisherConfig) GetBatchTimeout() time.Duration
func (PublisherConfig) GetMaxMessageSize ¶
func (p PublisherConfig) GetMaxMessageSize() int64
func (PublisherConfig) GetRequiredAckLevel ¶
func (p PublisherConfig) GetRequiredAckLevel() kafka.RequiredAcks
func (PublisherConfig) GetWriteTimeout ¶ added in v1.37.1
func (p PublisherConfig) GetWriteTimeout() time.Duration
Source Files
¶
Click to show internal directories.
Click to hide internal directories.