Documentation
¶
Overview ¶
Package kafka provides the transport-layer building blocks for Alertmanager components that talk to Apache Kafka via franz-go.
It deliberately stays narrow: configuration shared by producers and consumers, franz-go option construction, an error classifier, and a broker-list naming helper. Higher-level concerns (bounded producer buffers, consumer group orchestration, message serialisation) live in the calling package.
At the moment the only consumer of this package is the event recorder's Kafka output (eventrecorder/kafka.go). A future Kafka receiver (see github.com/prometheus/alertmanager/issues/1996) is the other intended user.
Index ¶
- Constants
- func BrokerList(brokers []string) string
- func BrokerListsEqual(a, b []string) bool
- func BuildOpts(opts ClientOptions, logger *slog.Logger) ([]kgo.Opt, error)
- func PingInBackground(client *kgo.Client, logger *slog.Logger)
- func ValidateFormat(format Format) error
- type Acks
- type ClientOptions
- type Compression
- type ErrorCategory
- type Format
Constants ¶
const ( DefaultClientID = "alertmanager" DefaultPingTimeout = 5 * time.Second DefaultLinger = 5 * time.Millisecond DefaultFlushBudget = 30 * time.Second )
Default values applied by BuildOpts when ClientOptions leaves a field zero.
Variables ¶
This section is empty.
Functions ¶
func BrokerList ¶
BrokerList returns a deterministic representation of the broker list suitable for use in metric label values: brokers sorted alphabetically and joined with commas. Callers that want a fuller identifier (e.g. "kafka:<brokers>/<topic>") compose around this helper.
func BrokerListsEqual ¶
BrokerListsEqual reports whether two broker lists are content-equal, ignoring order. Useful for hot-reload diffing where reordering a YAML broker list is semantically a no-op.
func BuildOpts ¶
BuildOpts converts the high-level ClientOptions into a slice of franz-go options. Callers append role-specific options (e.g. kgo.ConsumeTopics for consumers, or extra producer tuning) before passing the slice to kgo.NewClient.
The returned option set:
- Seeds the supplied brokers and sets ClientID (defaulting to DefaultClientID).
- Sets DefaultProduceTopic when Topic is non-empty.
- Applies ProducerLinger(DefaultLinger) — harmless for consumers.
- Applies RequiredAcks based on opts.Acks (defaults to LeaderAck).
- Disables idempotent writes unless the caller explicitly opted into AcksAll; franz-go's idempotent producer mandates acks=all and our default is acks=leader for low latency.
- Applies ProducerBatchCompression when opts.Compression is set.
- Applies DialTLSConfig when opts.TLSConfig is non-nil.
- Wires a kslog adapter so franz-go logs through the caller's slog.Logger. A nil logger is replaced with a discard logger.
Validate is called internally; callers do not need to call it separately.
func PingInBackground ¶
PingInBackground performs a best-effort connectivity check against the supplied client without blocking the caller. Failure is logged at warn level; franz-go retries connections internally so subsequent produce/fetch calls will succeed once a broker becomes reachable.
The goroutine exits after DefaultPingTimeout or when the client is closed (which cancels any in-flight broker dial).
func ValidateFormat ¶
ValidateFormat checks that a wire-format value is recognised. It is provided as a free helper because some callers (e.g. the event recorder) carry the format on a per-output struct rather than on the shared ClientOptions.
Types ¶
type ClientOptions ¶
type ClientOptions struct {
// Brokers is the list of Kafka seed brokers in host:port form.
// At least one entry is required.
Brokers []string
// Topic is the default produce topic. Optional for consumers.
Topic string
// ClientID is reported to the brokers. Defaults to "alertmanager".
ClientID string
// Acks is the producer acknowledgement level: "", AcksNone,
// AcksLeader (default), or AcksAll. Producer-only.
Acks Acks
// Compression is the producer compression codec: "" (default,
// no compression), CompressionNone, CompressionGzip,
// CompressionSnappy, CompressionLZ4, or CompressionZstd.
// Producer-only.
Compression Compression
// TLSConfig configures TLS for the broker connection. If nil,
// PLAINTEXT is used.
TLSConfig *commoncfg.TLSConfig
}
ClientOptions is the configuration shared between Kafka producers and consumers used in Alertmanager. It is the input to BuildOpts.
All fields are optional except Brokers; consumers ignore producer-only knobs (Acks, Compression) and vice versa. Topic doubles as the franz-go DefaultProduceTopic for producers and may be ignored or repurposed (e.g. as a subscription target) by consumers.
func (ClientOptions) Validate ¶
func (o ClientOptions) Validate() error
Validate checks ClientOptions for obvious problems. It does not contact the brokers and does not mutate the receiver.
type Compression ¶
type Compression string
Compression is the producer batch compression codec.
const ( CompressionNone Compression = "none" CompressionGzip Compression = "gzip" CompressionSnappy Compression = "snappy" CompressionLZ4 Compression = "lz4" CompressionZstd Compression = "zstd" )
Compression codecs supported on the producer wire.
type ErrorCategory ¶
type ErrorCategory string
ErrorCategory is a coarse bucket for a Kafka error, suitable for use as a bounded-cardinality Prometheus label value.
const ( ErrorCategoryNone ErrorCategory = "none" ErrorCategoryTimeout ErrorCategory = "timeout" ErrorCategoryNetwork ErrorCategory = "network" ErrorCategoryBroker ErrorCategory = "broker" ErrorCategoryUnknown ErrorCategory = "unknown" )
Error categories returned by ClassifyError.
func ClassifyError ¶
func ClassifyError(err error) ErrorCategory
ClassifyError buckets a franz-go error into one of the ErrorCategory* constants. It keeps Prometheus metric label cardinality bounded regardless of the specific error string franz-go surfaces.