stats

package
v1.52.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2025 License: MIT Imports: 3 Imported by: 0

README

Package stats

Пакет stats предоставляет инструменты для сбора и экспорта метрик Kafka-паблишеров и консумеров в Prometheus. Интегрируется с пакетами consumer и publisher для мониторинга производительности и диагностики проблем.

Основные возможности

  • Сбор детальных метрик работы паблишеров и консумеров
  • Поддержка гистограмм и суммарных показателей
  • Автоматическая регистрация метрик в Prometheus Registry

Types

ConsumerStorage

Реализует интерфейс MetricStorage для консумеров. Собирает:

  • Количественные показатели (сообщения, ошибки, ребалансировки)
  • Временные характеристики (задержки подключения, чтения, ожидания)
  • Размеры данных (батчи, сообщения)

Methods:

NewConsumerStorage(reg *metrics.Registry, consumerId string) *ConsumerStorage

Конструктор хралища метрик для консумера.

Metrics:

# Основные счетчики
kafka_reader_dial_count
kafka_reader_fetch_count
kafka_reader_message_count
kafka_reader_error_count
kafka_reader_rebalance_count
kafka_reader_timeout_count

# Задержки (миллисекунды)
kafka_reader_avg_dial_time_duration_ms
kafka_reader_min_dial_time_duration_ms
kafka_reader_max_dial_time_duration_ms
kafka_reader_avg_read_time_duration_ms
kafka_reader_min_read_time_duration_ms
kafka_reader_max_read_time_duration_ms
kafka_reader_avg_wait_time_duration_ms
kafka_reader_min_wait_time_duration_ms
kafka_reader_max_wait_time_duration_ms


# Размеры данных
kafka_reader_avg_fetch_size_count
kafka_reader_min_fetch_size_count
kafka_reader_max_fetch_size_count
kafka_reader_avg_fetch_bytes_count
kafka_reader_min_fetch_bytes_count
kafka_reader_max_fetch_bytes_count
kafka_reader_message_bytes_count
kafka_reader_max_fetch_size_count

# Состояние
kafka_reader_lag_count
kafka_reader_offset_count
kafka_reader_queue_capacity_count
kafka_reader_queue_length_count
PublisherStorage

Реализует интерфейс MetricStorage для продюсеров. Собирает:

  • Статистику отправки сообщений
  • Характеристики батчей
  • Ошибки и повторы

Methods:

NewPublisherStorage(reg *metrics.Registry, publisherId string) *PublisherStorage

Конструктор хралища метрик для паблишера.

Metrics:

# Основные счетчики
kafka_writer_write_count
kafka_writer_message_count
kafka_writer_message_bytes_count
kafka_writer_retries_count
kafka_writer_error_count

# Временные показатели
kafka_writer_avg_batch_time_duration_ms
kafka_writer_min_batch_time_duration_ms
kafka_writer_max_batch_time_duration_ms
kafka_writer_avg_batch_queue_time_duration_ms
kafka_writer_min_batch_queue_time_duration_ms
kafka_writer_max_batch_queue_time_duration_ms
kafka_writer_avg_write_time_duration_ms
kafka_writer_min_write_time_duration_ms
kafka_writer_max_write_time_duration_ms
kafka_writer_avg_wait_time_duration_ms
kafka_writer_min_wait_time_duration_ms
kafka_writer_max_wait_time_duration_ms

# Размеры батчей
kafka_writer_avg_batch_size_count
kafka_writer_min_batch_size_count
kafka_writer_max_batch_size_count
kafka_writer_avg_batch_bytes_count
kafka_writer_min_batch_bytes_count
kafka_writer_max_batch_bytes_count

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerStorage

type ConsumerStorage struct {
	// contains filtered or unexported fields
}

func NewConsumerStorage

func NewConsumerStorage(reg *metrics.Registry, consumerId string) *ConsumerStorage

nolint:funlen,promlinter

func (*ConsumerStorage) ObserveConsumerDialTime

func (c *ConsumerStorage) ObserveConsumerDialTime(dialTime kafka.DurationStats)

func (*ConsumerStorage) ObserveConsumerDials

func (c *ConsumerStorage) ObserveConsumerDials(dials int64)

func (*ConsumerStorage) ObserveConsumerError

func (c *ConsumerStorage) ObserveConsumerError(errors int64)

func (*ConsumerStorage) ObserveConsumerFetchBytes

func (c *ConsumerStorage) ObserveConsumerFetchBytes(fetchBytes kafka.SummaryStats)

func (*ConsumerStorage) ObserveConsumerFetchSize

func (c *ConsumerStorage) ObserveConsumerFetchSize(fetchSize kafka.SummaryStats)

func (*ConsumerStorage) ObserveConsumerFetches

func (c *ConsumerStorage) ObserveConsumerFetches(fetches int64)

func (*ConsumerStorage) ObserveConsumerLag

func (c *ConsumerStorage) ObserveConsumerLag(lag int64)

func (*ConsumerStorage) ObserveConsumerMessageBytes

func (c *ConsumerStorage) ObserveConsumerMessageBytes(messageBytes int64)

func (*ConsumerStorage) ObserveConsumerMessages

func (c *ConsumerStorage) ObserveConsumerMessages(messages int64)

func (*ConsumerStorage) ObserveConsumerOffset

func (c *ConsumerStorage) ObserveConsumerOffset(offset int64)

func (*ConsumerStorage) ObserveConsumerQueueCapacity

func (c *ConsumerStorage) ObserveConsumerQueueCapacity(queueCapacity int64)

func (*ConsumerStorage) ObserveConsumerQueueLength

func (c *ConsumerStorage) ObserveConsumerQueueLength(queueLength int64)

func (*ConsumerStorage) ObserveConsumerReadTime

func (c *ConsumerStorage) ObserveConsumerReadTime(readTime kafka.DurationStats)

func (*ConsumerStorage) ObserveConsumerRebalances

func (c *ConsumerStorage) ObserveConsumerRebalances(rebalances int64)

func (*ConsumerStorage) ObserveConsumerTimeouts

func (c *ConsumerStorage) ObserveConsumerTimeouts(timeouts int64)

func (*ConsumerStorage) ObserveConsumerWaitTime

func (c *ConsumerStorage) ObserveConsumerWaitTime(waitTime kafka.DurationStats)

type PublisherStorage

type PublisherStorage struct {
	// contains filtered or unexported fields
}

func NewPublisherStorage

func NewPublisherStorage(reg *metrics.Registry, publisherId string) *PublisherStorage

nolint:funlen,promlinter

func (*PublisherStorage) ObserveConsumerBatchBytes

func (p *PublisherStorage) ObserveConsumerBatchBytes(batchBytes kafka.SummaryStats)

func (*PublisherStorage) ObserveConsumerBatchQueueTime

func (p *PublisherStorage) ObserveConsumerBatchQueueTime(batchQueueTime kafka.DurationStats)

func (*PublisherStorage) ObserveConsumerBatchSize

func (p *PublisherStorage) ObserveConsumerBatchSize(batchSize kafka.SummaryStats)

func (*PublisherStorage) ObserveConsumerBatchTime

func (p *PublisherStorage) ObserveConsumerBatchTime(batchTime kafka.DurationStats)

func (*PublisherStorage) ObserveConsumerWaitTime

func (p *PublisherStorage) ObserveConsumerWaitTime(waitTime kafka.DurationStats)

func (*PublisherStorage) ObserveConsumerWriteTime

func (p *PublisherStorage) ObserveConsumerWriteTime(writeTime kafka.DurationStats)

func (*PublisherStorage) ObservePublisherErrors

func (p *PublisherStorage) ObservePublisherErrors(errors int64)

func (*PublisherStorage) ObservePublisherMessageBytes

func (p *PublisherStorage) ObservePublisherMessageBytes(messageBytes int64)

func (*PublisherStorage) ObservePublisherMessages

func (p *PublisherStorage) ObservePublisherMessages(messages int64)

func (*PublisherStorage) ObservePublisherRetries

func (p *PublisherStorage) ObservePublisherRetries(retries int64)

func (*PublisherStorage) ObservePublisherWrites

func (p *PublisherStorage) ObservePublisherWrites(writes int64)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL