kafkax

package
v1.41.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2025 | License: MIT | Imports: 18 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConsumerLog

func ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware

func ConsumerRequestId

func ConsumerRequestId() consumer.Middleware

func GetHeaderValue

func GetHeaderValue(headers []kafka.Header, key string) string

func NewResultHandler

func NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync

func PlainAuth

func PlainAuth(auth *Auth) sasl.Mechanism

func PublisherLog

func PublisherLog(logger log.Logger, logBody bool) publisher.Middleware

func PublisherMetrics

func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware

func PublisherRequestId

func PublisherRequestId() publisher.Middleware

Types

type Auth

// Auth holds the username/password pair that PlainAuth takes to build a
// sasl.Mechanism. Both fields carry a validate:"required" tag; the validator
// that enforces it is not part of this listing.
type Auth struct {
	// Username is the login (per its schema tag).
	Username string `validate:"required" schema:"Логин"`
	// Password is the password (per its schema tag).
	Password string `validate:"required" schema:"Пароль"`
}

type Client

// Client is the package's client type. It is created with New(logger) and
// exposes Close, Healthcheck, Shutdown and UpgradeAndServe. All of its state
// is unexported, so it can only be used through those methods.
type Client struct {
	// contains filtered or unexported fields
}

func New

func New(logger log.Logger) *Client

func (*Client) Close

func (c *Client) Close()

func (*Client) Healthcheck

func (c *Client) Healthcheck(ctx context.Context) error

func (*Client) Shutdown added in v1.39.3

func (c *Client) Shutdown()

func (*Client) UpgradeAndServe

func (c *Client) UpgradeAndServe(ctx context.Context, config Config)

type Config

// Config is the value passed to Client.UpgradeAndServe. NewConfig returns a
// Config and accepts ConfigOption values such as WithPublishers and
// WithConsumers.
type Config struct {
	// Publishers is a slice of publisher pointers.
	// NOTE(review): presumably populated by WithPublishers — confirm.
	Publishers []*publisher.Publisher
	// Consumers is a slice of consumer values.
	// NOTE(review): presumably populated by WithConsumers — confirm.
	Consumers  []consumer.Consumer
}

func NewConfig

func NewConfig(opts ...ConfigOption) Config

type ConfigOption

// ConfigOption is a function that receives a *Config. NewConfig takes a
// variadic list of them; WithConsumers and WithPublishers each return one.
type ConfigOption func(c *Config)

func WithConsumers

func WithConsumers(consumers ...consumer.Consumer) ConfigOption

func WithPublishers

func WithPublishers(publishers ...*publisher.Publisher) ConfigOption

type ConsumerConfig

// ConsumerConfig describes the settings of a single consumer. The field
// descriptions below are English renderings of the schema tags. Where the
// documentation renderer elided a tag, the description and any default are
// not visible in this listing. DefaultConsumer builds a consumer.Consumer
// from this configuration.
type ConsumerConfig struct {
	// Addresses is the list of broker addresses to read messages from;
	// tagged validate:"required".
	Addresses         []string `validate:"required" schema:"Список адресов брокеров для чтения сообщений"`
	// Topic is the topic; tagged validate:"required".
	Topic             string   `validate:"required" schema:"Топик"`
	// GroupId is the consumer identifier; tagged validate:"required".
	GroupId           string   `validate:"required" schema:"Идентификатор консьюмера"`
	// Concurrency is the number of handlers, 1 by default (per the schema tag).
	Concurrency       int      `schema:"Кол-во обработчиков, по умолчанию 1"`
	// MaxBatchSizeMb: tag text elided by the renderer. The type also declares
	// GetMaxBatchSizeMb; NOTE(review): presumably a size in megabytes with a
	// default applied by that getter — confirm against the implementation.
	MaxBatchSizeMb    int      `` /* 133-byte string literal not displayed */
	// CommitIntervalSec: tag text elided by the renderer. Optional (pointer).
	// The type also declares GetCommitInterval returning a time.Duration;
	// NOTE(review): presumably seconds with a default for nil — confirm.
	CommitIntervalSec *int     `` /* 141-byte string literal not displayed */
	// Auth holds the authentication parameters; optional (pointer).
	Auth              *Auth    `schema:"Параметры аутентификации"`
	// DialTimeoutMs is the connection establishment timeout; the schema tag
	// states a default of 5 seconds. Optional (pointer). The type also
	// declares GetDialTimeout returning a time.Duration.
	DialTimeoutMs     *int     `schema:"Таймаут установки соединения, по умолчанию 5 секунд"`
}

func (ConsumerConfig) DefaultConsumer

func (c ConsumerConfig) DefaultConsumer(
	logCtx context.Context,
	logger log.Logger,
	handler consumer.Handler,
	restMiddlewares ...consumer.Middleware,
) consumer.Consumer

func (ConsumerConfig) GetCommitInterval added in v1.37.1

func (c ConsumerConfig) GetCommitInterval() time.Duration

func (ConsumerConfig) GetDialTimeout added in v1.41.0

func (c ConsumerConfig) GetDialTimeout() time.Duration

func (ConsumerConfig) GetMaxBatchSizeMb added in v1.37.1

func (c ConsumerConfig) GetMaxBatchSizeMb() int

type LogObserver

// LogObserver embeds NoopObserver and additionally declares its own
// ClientError, ClientReady, ShutdownDone and ShutdownStarted methods. It is
// constructed by NewLogObserver from a context.Context and a log.Logger.
// NOTE(review): presumably each method writes a log entry — the method bodies
// are not shown here, confirm against the implementation.
type LogObserver struct {
	NoopObserver
	// contains filtered or unexported fields
}

func NewLogObserver

func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver

func (LogObserver) ClientError

func (l LogObserver) ClientError(err error)

func (LogObserver) ClientReady

func (l LogObserver) ClientReady()

func (LogObserver) ShutdownDone

func (l LogObserver) ShutdownDone()

func (LogObserver) ShutdownStarted

func (l LogObserver) ShutdownStarted()

type NoopObserver

// NoopObserver is an empty struct that declares ClientError, ClientReady,
// ShutdownDone and ShutdownStarted, i.e. the full Observer method set.
// NOTE(review): presumably every method is a no-op, intended for embedding
// (LogObserver embeds it) — bodies are not shown, confirm.
type NoopObserver struct{}

func (NoopObserver) ClientError

func (n NoopObserver) ClientError(err error)

func (NoopObserver) ClientReady

func (n NoopObserver) ClientReady()

func (NoopObserver) ShutdownDone

func (n NoopObserver) ShutdownDone()

func (NoopObserver) ShutdownStarted

func (n NoopObserver) ShutdownStarted()

type Observer

// Observer is a four-method callback interface. NoopObserver and LogObserver
// both declare all four methods. Where and when the callbacks are invoked is
// not visible in this listing; the per-method notes below are assumptions
// drawn from the method names.
type Observer interface {
	// ClientReady takes no arguments.
	// NOTE(review): presumably called once the client is ready — confirm.
	ClientReady()
	// ClientError receives an error value.
	// NOTE(review): presumably called when the client reports a failure — confirm.
	ClientError(err error)
	// ShutdownStarted takes no arguments.
	// NOTE(review): presumably called when shutdown begins — confirm.
	ShutdownStarted()
	// ShutdownDone takes no arguments.
	// NOTE(review): presumably called when shutdown has finished — confirm.
	ShutdownDone()
}

type PublisherConfig

// PublisherConfig describes the settings of a single publisher. The field
// descriptions below are English renderings of the schema tags. Where the
// documentation renderer elided a tag, the description and any default are
// not visible in this listing. DefaultPublisher builds a *publisher.Publisher
// from this configuration.
type PublisherConfig struct {
	// Addresses is the list of broker addresses to send messages to;
	// tagged validate:"required".
	Addresses                  []string `validate:"required" schema:"Список адресов брокеров для отправки сообщений"`
	// Topic: tag text elided by the renderer, so its validation rules and
	// description are not visible here.
	Topic                      string   `` /* 160-byte string literal not displayed */
	// MaxMsgSizeMbPerPartition is the maximum message size in MB; the schema
	// tag states a default of 64 MB. The type also declares
	// GetMaxMessageSizePerPartition returning an int64.
	MaxMsgSizeMbPerPartition   int64    `schema:"Максимальный размер сообщений в Мб, по умолчанию 64 Мб"`
	// BatchSizePerPartition: tag text elided by the renderer. The type also
	// declares GetBatchSizePerPartition; the default is not visible here.
	BatchSizePerPartition      int      `` /* 142-byte string literal not displayed */
	// BatchTimeoutPerPartitionMs is how often batches are written to Kafka,
	// in milliseconds; the schema tag states a default of 500 ms. Optional
	// (pointer). The type also declares GetBatchTimeoutPerPartition.
	BatchTimeoutPerPartitionMs *int     `schema:"Периодичность записи батчей в кафку в мс, по умолчанию 500 мс"`
	// WriteTimeoutSec is the message send timeout; the schema tag states a
	// default of 10 seconds. Optional (pointer). The type also declares
	// GetWriteTimeout.
	WriteTimeoutSec            *int     `schema:"Таймаут отправки сообщений, по умолчанию 10 секунд"`
	// RequiredAckLevel: tag text elided by the renderer. The type also
	// declares GetRequiredAckLevel returning a kafka.RequiredAcks; the
	// accepted values and their mapping are not visible here.
	RequiredAckLevel           int      `` /* 179-byte string literal not displayed */
	// Auth holds the authentication parameters; optional (pointer).
	Auth                       *Auth    `schema:"Параметры аутентификации"`
	// DialTimeoutMs is the connection establishment timeout; the schema tag
	// states a default of 5 seconds. Optional (pointer). The type also
	// declares GetDialTimeout returning a time.Duration.
	DialTimeoutMs              *int     `schema:"Таймаут установки соединения, по умолчанию 5 секунд"`
}

func (PublisherConfig) DefaultPublisher

func (p PublisherConfig) DefaultPublisher(
	logCtx context.Context,
	logger log.Logger,
	restMiddlewares ...publisher.Middleware,
) *publisher.Publisher

func (PublisherConfig) GetBatchSizePerPartition added in v1.38.1

func (p PublisherConfig) GetBatchSizePerPartition() int

func (PublisherConfig) GetBatchTimeoutPerPartition added in v1.38.1

func (p PublisherConfig) GetBatchTimeoutPerPartition() time.Duration

func (PublisherConfig) GetDialTimeout added in v1.41.0

func (p PublisherConfig) GetDialTimeout() time.Duration

func (PublisherConfig) GetMaxMessageSizePerPartition added in v1.38.1

func (p PublisherConfig) GetMaxMessageSizePerPartition() int64

func (PublisherConfig) GetRequiredAckLevel

func (p PublisherConfig) GetRequiredAckLevel() kafka.RequiredAcks

func (PublisherConfig) GetWriteTimeout added in v1.37.1

func (p PublisherConfig) GetWriteTimeout() time.Duration

type PublisherMetricStorage

// PublisherMetricStorage is the metrics sink accepted by PublisherMetrics.
// Every method takes the topic name as its first argument.
type PublisherMetricStorage interface {
	// ObservePublishDuration receives a topic and a time.Duration.
	// NOTE(review): presumably the time taken by one publish call — confirm.
	ObservePublishDuration(topic string, t time.Duration)
	// ObservePublishMsgSize receives a topic and an integer size.
	// NOTE(review): presumably the message size in bytes — confirm the unit.
	ObservePublishMsgSize(topic string, size int)
	// IncPublishError receives a topic.
	// NOTE(review): presumably increments a per-topic error counter — confirm.
	IncPublishError(topic string)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL