kafka

package
v0.33.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 11, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package kafka provides the transport-layer building blocks for Alertmanager components that talk to Apache Kafka via franz-go.

It deliberately stays narrow: configuration shared by producers and consumers, franz-go option construction, an error classifier, and a broker-list naming helper. Higher-level concerns (bounded producer buffers, consumer group orchestration, message serialisation) live in the calling package.

At the moment the only user of this package is the event recorder's Kafka output (eventrecorder/kafka.go), which is a producer. A future Kafka receiver (see github.com/prometheus/alertmanager/issues/1996) is the other intended user.

Constants

const (
	DefaultClientID    = "alertmanager"
	DefaultPingTimeout = 5 * time.Second
	DefaultLinger      = 5 * time.Millisecond
	DefaultFlushBudget = 30 * time.Second
)

Default values applied by BuildOpts when ClientOptions leaves a field zero.

Variables

This section is empty.

Functions

func BrokerList

func BrokerList(brokers []string) string

BrokerList returns a deterministic representation of the broker list suitable for use in metric label values: brokers sorted alphabetically and joined with commas. Callers that want a fuller identifier (e.g. "kafka:<brokers>/<topic>") compose around this helper.
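The sort-then-join behaviour described above can be sketched as follows. This is a minimal illustration, not the package's source: brokerList is a hypothetical stand-in, and sorting a copy (so the caller's slice is left untouched) is an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// brokerList is a hypothetical stand-in for BrokerList. It sorts a copy of
// the input (an assumption: the caller's slice is not reordered) and joins
// the entries with commas.
func brokerList(brokers []string) string {
	sorted := append([]string(nil), brokers...)
	sort.Strings(sorted)
	return strings.Join(sorted, ",")
}

func main() {
	a := brokerList([]string{"kafka-2:9092", "kafka-1:9092"})
	b := brokerList([]string{"kafka-1:9092", "kafka-2:9092"})
	fmt.Println(a)      // kafka-1:9092,kafka-2:9092
	fmt.Println(a == b) // true
}
```

Because the result does not depend on input order, two configurations that list the same brokers differently map to the same label value.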

func BrokerListsEqual

func BrokerListsEqual(a, b []string) bool

BrokerListsEqual reports whether two broker lists are content-equal, ignoring order. Useful for hot-reload diffing where reordering a YAML broker list is semantically a no-op.
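One way to implement an order-insensitive comparison is to compare sorted copies. The sketch below uses a hypothetical brokerListsEqual helper; treating the lists as multisets (so duplicates must match in count) is an assumption, since the documentation only says "content-equal, ignoring order".

```go
package main

import (
	"fmt"
	"sort"
)

// brokerListsEqual is a hypothetical stand-in for BrokerListsEqual. It
// compares sorted copies, so neither argument is reordered.
func brokerListsEqual(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	as := append([]string(nil), a...)
	bs := append([]string(nil), b...)
	sort.Strings(as)
	sort.Strings(bs)
	for i := range as {
		if as[i] != bs[i] {
			return false
		}
	}
	return true
}

func main() {
	old := []string{"kafka-1:9092", "kafka-2:9092"}
	reordered := []string{"kafka-2:9092", "kafka-1:9092"}
	changed := []string{"kafka-1:9092", "kafka-3:9092"}

	fmt.Println(brokerListsEqual(old, reordered)) // true: reload is a no-op
	fmt.Println(brokerListsEqual(old, changed))   // false: client must be rebuilt
}
```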

func BuildOpts

func BuildOpts(opts ClientOptions, logger *slog.Logger) ([]kgo.Opt, error)

BuildOpts converts the high-level ClientOptions into a slice of franz-go options. Callers append role-specific options (e.g. kgo.ConsumeTopics for consumers, or extra producer tuning) before passing the slice to kgo.NewClient.

The returned option set:

  • Seeds the supplied brokers and sets ClientID (defaulting to DefaultClientID).
  • Sets DefaultProduceTopic when Topic is non-empty.
  • Applies ProducerLinger(DefaultLinger), which is harmless for consumers.
  • Applies RequiredAcks based on opts.Acks (an empty value defaults to AcksLeader, i.e. franz-go's LeaderAck).
  • Disables idempotent writes unless the caller explicitly opted into AcksAll; franz-go's idempotent producer mandates acks=all and our default is acks=leader for low latency.
  • Applies ProducerBatchCompression when opts.Compression is set.
  • Applies DialTLSConfig when opts.TLSConfig is non-nil.
  • Wires a kslog adapter so franz-go logs through the caller's slog.Logger. A nil logger is replaced with a discard logger.

Validate is called internally; callers do not need to call it separately.
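The interaction between the acks default and idempotent writes in the list above can be sketched in isolation. The producerPlan type and planProducer function below are hypothetical and exist only for illustration; in BuildOpts the corresponding decisions would be expressed as franz-go options (kgo.RequiredAcks and kgo.DisableIdempotentWrite), which this stdlib-only sketch does not import.

```go
package main

import "fmt"

// Acks mirrors the package's Acks type for the purpose of this sketch.
type Acks string

const (
	AcksNone   Acks = "none"
	AcksLeader Acks = "leader"
	AcksAll    Acks = "all"
)

// producerPlan is a hypothetical summary of the producer-related decisions.
type producerPlan struct {
	Acks                   Acks
	DisableIdempotentWrite bool
}

// planProducer applies the documented rule: an empty Acks falls back to
// AcksLeader, and idempotent writes stay enabled only for AcksAll.
func planProducer(acks Acks) producerPlan {
	if acks == "" {
		acks = AcksLeader
	}
	return producerPlan{
		Acks:                   acks,
		DisableIdempotentWrite: acks != AcksAll,
	}
}

func main() {
	fmt.Printf("%+v\n", planProducer(""))      // {Acks:leader DisableIdempotentWrite:true}
	fmt.Printf("%+v\n", planProducer(AcksAll)) // {Acks:all DisableIdempotentWrite:false}
}
```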

func PingInBackground

func PingInBackground(client *kgo.Client, logger *slog.Logger)

PingInBackground performs a best-effort connectivity check against the supplied client without blocking the caller. Failure is logged at warn level; franz-go retries connections internally so subsequent produce/fetch calls will succeed once a broker becomes reachable.

The goroutine exits after DefaultPingTimeout or when the client is closed (which cancels any in-flight broker dial).

func ValidateFormat

func ValidateFormat(format Format) error

ValidateFormat checks that a wire-format value is recognised. It is provided as a free helper because some callers (e.g. the event recorder) carry the format on a per-output struct rather than on the shared ClientOptions.
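A check of this kind is typically a switch over the known constants. The sketch below uses a hypothetical validateFormat helper; rejecting the empty string and the exact error text are assumptions, not documented behaviour.

```go
package main

import "fmt"

// Format mirrors the package's Format type for the purpose of this sketch.
type Format string

const (
	FormatJSON     Format = "json"
	FormatProtobuf Format = "protobuf"
)

// validateFormat is a hypothetical stand-in for ValidateFormat. It accepts
// only the two documented identifiers (assumption: "" is rejected).
func validateFormat(f Format) error {
	switch f {
	case FormatJSON, FormatProtobuf:
		return nil
	default:
		return fmt.Errorf("unknown kafka message format %q", f)
	}
}

func main() {
	fmt.Println(validateFormat(FormatJSON)) // <nil>
	fmt.Println(validateFormat("avro"))     // unknown kafka message format "avro"
}
```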

Types

type Acks

type Acks string

Acks is the producer acknowledgement level.

const (
	AcksNone   Acks = "none"
	AcksLeader Acks = "leader"
	AcksAll    Acks = "all"
)

Producer acknowledgement levels.

type ClientOptions

type ClientOptions struct {
	// Brokers is the list of Kafka seed brokers in host:port form.
	// At least one entry is required.
	Brokers []string

	// Topic is the default produce topic.  Optional for consumers.
	Topic string

	// ClientID is reported to the brokers.  Defaults to "alertmanager".
	ClientID string

	// Acks is the producer acknowledgement level: "", AcksNone,
	// AcksLeader (default), or AcksAll.  Producer-only.
	Acks Acks

	// Compression is the producer compression codec: "" (default,
	// no compression), CompressionNone, CompressionGzip,
	// CompressionSnappy, CompressionLZ4, or CompressionZstd.
	// Producer-only.
	Compression Compression

	// TLSConfig configures TLS for the broker connection.  If nil,
	// PLAINTEXT is used.
	TLSConfig *commoncfg.TLSConfig
}

ClientOptions is the configuration shared between Kafka producers and consumers used in Alertmanager. It is the input to BuildOpts.

All fields are optional except Brokers; consumers ignore producer-only knobs (Acks, Compression) and vice versa. Topic doubles as the franz-go DefaultProduceTopic for producers and may be ignored or repurposed (e.g. as a subscription target) by consumers.

func (ClientOptions) Validate

func (o ClientOptions) Validate() error

Validate checks ClientOptions for obvious problems. It does not contact the brokers and does not mutate the receiver.
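The documentation does not enumerate the checks, so the following is only a sketch of what "obvious problems" could cover: an empty broker list, entries that are not in host:port form, and an unrecognised Acks value. The options type and its validate method are hypothetical and use plain strings to stay self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// options is a hypothetical, trimmed-down stand-in for ClientOptions.
type options struct {
	Brokers []string
	Acks    string
}

// validate uses a value receiver, so it cannot mutate the caller's copy,
// and it only inspects strings, so it never contacts a broker.
func (o options) validate() error {
	if len(o.Brokers) == 0 {
		return errors.New("at least one broker is required")
	}
	for _, b := range o.Brokers {
		if _, _, err := net.SplitHostPort(b); err != nil {
			return fmt.Errorf("broker %q is not in host:port form: %w", b, err)
		}
	}
	switch o.Acks {
	case "", "none", "leader", "all":
	default:
		return fmt.Errorf("unknown acks value %q", o.Acks)
	}
	return nil
}

func main() {
	fmt.Println(options{}.validate())
	fmt.Println(options{Brokers: []string{"kafka-1"}}.validate() != nil)
	fmt.Println(options{Brokers: []string{"kafka-1:9092"}, Acks: "all"}.validate())
}
```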

type Compression

type Compression string

Compression is the producer batch compression codec.

const (
	CompressionNone   Compression = "none"
	CompressionGzip   Compression = "gzip"
	CompressionSnappy Compression = "snappy"
	CompressionLZ4    Compression = "lz4"
	CompressionZstd   Compression = "zstd"
)

Compression codecs supported on the producer wire.

type ErrorCategory

type ErrorCategory string

ErrorCategory is a coarse bucket for a Kafka error, suitable for use as a bounded-cardinality Prometheus label value.

const (
	ErrorCategoryNone    ErrorCategory = "none"
	ErrorCategoryTimeout ErrorCategory = "timeout"
	ErrorCategoryNetwork ErrorCategory = "network"
	ErrorCategoryBroker  ErrorCategory = "broker"
	ErrorCategoryUnknown ErrorCategory = "unknown"
)

Error categories returned by ClassifyError.

func ClassifyError

func ClassifyError(err error) ErrorCategory

ClassifyError buckets a franz-go error into one of the ErrorCategory* constants. It keeps Prometheus metric label cardinality bounded regardless of the specific error string franz-go surfaces.

type Format

type Format string

Format is the wire encoding of a Kafka message value.

const (
	FormatJSON     Format = "json"
	FormatProtobuf Format = "protobuf"
)

Wire format identifiers used by producers and consumers.
