eventrecorder

package
v0.33.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 11, 2026 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Package eventrecorder provides a structured event recorder for significant Alertmanager events. Each event is fanned out to one or more configured destinations (JSONL file, webhook, Kafka). Events are serialized as JSON by default; Kafka outputs can instead be configured to emit binary protobuf.

RecordEvent never blocks the caller: each event is wrapped and placed on a bounded in-memory queue. A background goroutine drains the queue and hands each event to the destinations, which serialize and deliver it. If the queue is full, the event is dropped and a metric is incremented.
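The non-blocking enqueue described above can be sketched with a buffered channel and a select with a default case. This is a minimal illustration, not the package's implementation: the queue type, its string payload, and the atomic counter standing in for the drop metric are all hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// queue is a hypothetical stand-in for the recorder's bounded queue.
type queue struct {
	ch      chan string
	dropped atomic.Int64 // stands in for the drop metric
}

func newQueue(capacity int) *queue {
	return &queue{ch: make(chan string, capacity)}
}

// enqueue never blocks: when the buffer is full the event is dropped
// and the drop counter is incremented.
func (q *queue) enqueue(ev string) bool {
	select {
	case q.ch <- ev:
		return true
	default:
		q.dropped.Add(1)
		return false
	}
}

func main() {
	q := newQueue(2)
	// No consumer is running, so only the first two events fit.
	for i := 0; i < 5; i++ {
		q.enqueue(fmt.Sprintf("event-%d", i))
	}
	fmt.Println("queued:", len(q.ch), "dropped:", q.dropped.Load())
}
```

The trade-off is deliberate: the caller's latency stays bounded at the cost of losing events under sustained overload, which is why a drop counter is worth watching.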

Package layout:

  • recorder.go Recorder core: types, write loop, fan-out.
  • metrics.go Prometheus metric definitions.
  • events.go Pure proto-conversion helpers and event constructors.
  • config.go Top-level Config: per-type output lists + equality.
  • file.go File output and its config.
  • webhook.go Webhook output and its config.
  • kafka.go Kafka output and its config.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AlertAsProto

func AlertAsProto(alert *types.Alert) *eventrecorderpb.Alert

AlertAsProto converts a types.Alert to an eventrecorderpb.Alert.

func EventRecordingEnabled

func EventRecordingEnabled(ctx context.Context) bool

EventRecordingEnabled reports whether event recording has been enabled in the given context via WithEventRecording.

func InhibitRuleAsProto

func InhibitRuleAsProto(sourceMatchers, targetMatchers labels.Matchers, equal map[model.LabelName]struct{}) *eventrecorderpb.InhibitRule

InhibitRuleAsProto converts inhibit rule fields to an eventrecorderpb.InhibitRule. It accepts the individual fields rather than the InhibitRule struct to avoid an import cycle.

func LabelSetAsProto

func LabelSetAsProto(ls model.LabelSet) *eventrecorderpb.LabelSet

LabelSetAsProto converts a model.LabelSet to an eventrecorderpb.LabelSet. Labels are sorted by name for deterministic output.
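Sorting by name matters because Go map iteration order is unspecified, so an unsorted conversion could emit the same label set in different orders. A minimal sketch of the idea, using a plain map and a hypothetical labelPair type rather than model.LabelSet and the generated protobuf types:

```go
package main

import (
	"fmt"
	"sort"
)

// labelPair is a hypothetical stand-in for a protobuf label entry.
type labelPair struct {
	Name  string
	Value string
}

// sortedPairs flattens a label map into a slice ordered by label name,
// so repeated conversions of the same set produce identical output.
func sortedPairs(ls map[string]string) []labelPair {
	pairs := make([]labelPair, 0, len(ls))
	for name, value := range ls {
		pairs = append(pairs, labelPair{Name: name, Value: value})
	}
	sort.Slice(pairs, func(i, j int) bool { return pairs[i].Name < pairs[j].Name })
	return pairs
}

func main() {
	ls := map[string]string{"severity": "page", "alertname": "HighLatency", "job": "api"}
	for _, p := range sortedPairs(ls) {
		fmt.Printf("%s=%s\n", p.Name, p.Value)
	}
}
```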

func MatcherAsProto

func MatcherAsProto(m *labels.Matcher) *eventrecorderpb.Matcher

MatcherAsProto converts a single *labels.Matcher to its protobuf representation.

func MatchersAsProto

func MatchersAsProto(matchers labels.Matchers) []*eventrecorderpb.Matcher

MatchersAsProto converts a slice of matchers to their protobuf representations.

func NewAlertCreatedEvent

func NewAlertCreatedEvent(alert *types.Alert) *eventrecorderpb.EventData

NewAlertCreatedEvent constructs an AlertCreated event.

func NewInhibitionMutedAlertEvent

func NewInhibitionMutedAlertEvent(rules []*eventrecorderpb.InhibitRule, fp model.Fingerprint, lset model.LabelSet, inhibitingFPs []model.Fingerprint) *eventrecorderpb.EventData

NewInhibitionMutedAlertEvent constructs an InhibitionMutedAlert event.

func NewSilenceCreatedEvent

func NewSilenceCreatedEvent(silence *eventrecorderpb.Silence) *eventrecorderpb.EventData

NewSilenceCreatedEvent constructs a SilenceCreated event.

func NewSilenceMutedAlertEvent

func NewSilenceMutedAlertEvent(silence *eventrecorderpb.Silence, fp model.Fingerprint, lset model.LabelSet) *eventrecorderpb.EventData

NewSilenceMutedAlertEvent constructs a SilenceMutedAlert event.

func NewSilenceUpdatedEvent

func NewSilenceUpdatedEvent(silence *eventrecorderpb.Silence) *eventrecorderpb.EventData

NewSilenceUpdatedEvent constructs a SilenceUpdated event.

func SilenceAsProto

func SilenceAsProto(sil *silencepb.Silence) *eventrecorderpb.Silence

SilenceAsProto converts a silencepb.Silence to an eventrecorderpb.Silence.

func SilenceMatcherAsProto

func SilenceMatcherAsProto(m *silencepb.Matcher) *eventrecorderpb.Matcher

SilenceMatcherAsProto converts a silencepb.Matcher to an eventrecorderpb.Matcher.

func WithEventRecording

func WithEventRecording(ctx context.Context) context.Context

WithEventRecording returns a context that enables event recording. By default, event recording is disabled; callers must opt in by decorating their context with this function.

Types

type Config

type Config struct {
	FileOutputs    []FileOutputConfig    `yaml:"file_outputs,omitempty" json:"file_outputs,omitempty"`
	WebhookOutputs []WebhookOutputConfig `yaml:"webhook_outputs,omitempty" json:"webhook_outputs,omitempty"`
	KafkaOutputs   []KafkaOutputConfig   `yaml:"kafka_outputs,omitempty" json:"kafka_outputs,omitempty"`
}

Config configures the event recorder feature.

Outputs are grouped by type, one list per destination kind, mirroring the way receivers group their integrations (e.g. webhook_configs, email_configs). Every recorded event is fanned out to every output across all lists.
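To make the grouping concrete, the sketch below decodes a small configuration using the JSON tag names shown on Config and lists every destination an event would reach. The mirror types are simplified local copies (the URL is a plain string rather than a SecretURL), the sample values are invented, and the "kafka:" name format is an assumption; only the "file:" and "webhook:" prefixes appear in the Destination documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified, hypothetical mirrors of the documented config structs.
type fileOutputConfig struct {
	Path string `json:"path"`
}

type webhookOutputConfig struct {
	URL string `json:"url"`
}

type kafkaOutputConfig struct {
	Brokers []string `json:"brokers"`
	Topic   string   `json:"topic"`
}

type config struct {
	FileOutputs    []fileOutputConfig    `json:"file_outputs,omitempty"`
	WebhookOutputs []webhookOutputConfig `json:"webhook_outputs,omitempty"`
	KafkaOutputs   []kafkaOutputConfig   `json:"kafka_outputs,omitempty"`
}

const sample = `{
  "file_outputs":    [{"path": "/var/log/events.jsonl"}],
  "webhook_outputs": [{"url": "https://example.com/hook"}],
  "kafka_outputs":   [{"brokers": ["kafka-1:9092"], "topic": "alertmanager-events"}]
}`

func parseConfig(raw string) (config, error) {
	var c config
	err := json.Unmarshal([]byte(raw), &c)
	return c, err
}

// destinationNames flattens all per-type lists: every event goes to
// every entry returned here.
func destinationNames(c config) []string {
	var names []string
	for _, f := range c.FileOutputs {
		names = append(names, "file:"+f.Path)
	}
	for _, w := range c.WebhookOutputs {
		names = append(names, "webhook:"+w.URL)
	}
	for _, k := range c.KafkaOutputs {
		names = append(names, "kafka:"+k.Topic) // assumed name format
	}
	return names
}

func main() {
	c, err := parseConfig(sample)
	if err != nil {
		panic(err)
	}
	for _, name := range destinationNames(c) {
		fmt.Println(name)
	}
}
```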

type Destination

type Destination interface {
	// Name returns a stable identifier for this destination, suitable
	// for use as a Prometheus label value (e.g. "file:/var/log/events.jsonl"
	// or "webhook:https://example.com/hook").
	Name() string
	// SendEvent encodes and delivers the event.  It returns the number
	// of payload bytes written (for the bytes-written metric) and any
	// delivery error.  A serialization failure should be returned
	// wrapped in *serializeError so the recorder can attribute it to
	// the serialize-errors metric.
	SendEvent(event *eventrecorderpb.Event) (size int, err error)
	io.Closer
}

Destination is a single event destination. Each implementation owns its own serialization: it receives the structured event and is responsible for encoding it (e.g. JSON or protobuf) and delivering it.

Owning serialization per destination — rather than handing every destination a pre-encoded JSON blob — avoids the footgun of, say, a protobuf-configured Kafka output silently shipping a JSON payload.
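As an illustration of the contract, here is a sketch of a custom destination that encodes each event itself and reports the number of bytes written. It uses a local event struct and a local copy of the interface so that it compiles on its own; a real implementation would accept *eventrecorderpb.Event, and the serializeError wrapper mentioned in the interface comment is unexported, so plain error wrapping is used as a stand-in.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
)

// event is a hypothetical stand-in for *eventrecorderpb.Event.
type event struct {
	Type     string `json:"type"`
	Instance string `json:"instance"`
}

// destination mirrors the shape of the documented Destination interface.
type destination interface {
	Name() string
	SendEvent(ev *event) (size int, err error)
	io.Closer
}

// memoryDestination keeps JSON lines in memory; useful in tests.
type memoryDestination struct {
	lines [][]byte
}

func (m *memoryDestination) Name() string { return "memory:example" }

// SendEvent owns its encoding: it marshals the event, appends a
// newline, stores the line, and returns the payload size.
func (m *memoryDestination) SendEvent(ev *event) (int, error) {
	b, err := json.Marshal(ev)
	if err != nil {
		return 0, fmt.Errorf("serialize event: %w", err)
	}
	b = append(b, '\n')
	m.lines = append(m.lines, b)
	return len(b), nil
}

func (m *memoryDestination) Close() error { return nil }

// fanOut sends one event to every destination and sums the bytes written.
func fanOut(dests []destination, ev *event) (total int) {
	for _, d := range dests {
		n, err := d.SendEvent(ev)
		if err != nil {
			fmt.Println("send failed:", d.Name(), err)
			continue
		}
		total += n
	}
	return total
}

func main() {
	a, b := &memoryDestination{}, &memoryDestination{}
	n := fanOut([]destination{a, b}, &event{Type: "alert_created", Instance: "am-0"})
	fmt.Println("bytes written:", n, "lines per destination:", len(a.lines))
}
```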

type FileOutput

type FileOutput struct {
	// contains filtered or unexported fields
}

FileOutput serializes each event as JSON and appends it to a JSONL file. The file is reopened when fsnotify detects a rename or remove (e.g. logrotate).

func NewFileOutput

func NewFileOutput(path string, logger *slog.Logger) (*FileOutput, error)

NewFileOutput creates a new file-based event recorder output at the given path. The file is watched with fsnotify so that external log rotation tools (e.g., logrotate) trigger an immediate reopen.

func (*FileOutput) Close

func (fo *FileOutput) Close() error

Close stops the watcher goroutine, waits for it to exit, and closes the file.

func (*FileOutput) Name

func (fo *FileOutput) Name() string

Name returns a stable identifier for this output.

func (*FileOutput) SendEvent

func (fo *FileOutput) SendEvent(event *eventrecorderpb.Event) (int, error)

SendEvent serializes the event as a JSON line and appends it to the file. It returns the number of bytes written (including the trailing newline) for the bytes-written metric.

type FileOutputConfig

type FileOutputConfig struct {
	// Path is the JSONL file to append events to.  Created if absent.
	Path string `yaml:"path" json:"path"`
}

FileOutputConfig configures a JSONL file event recorder output.

func (*FileOutputConfig) UnmarshalYAML

func (c *FileOutputConfig) UnmarshalYAML(unmarshal func(any) error) error

UnmarshalYAML implements the yaml.Unmarshaler interface, validating the file output configuration.

type KafkaOutput

type KafkaOutput struct {
	// contains filtered or unexported fields
}

KafkaOutput delivers serialized events to a Kafka topic via franz-go. Events are buffered in a bounded local channel and produced by a single dispatcher goroutine; franz-go handles batching, compression, and retries internally.

When the local buffer is full, events are dropped (with a log message and a metric increment) so that a slow or unreachable broker cannot block the upstream event recorder pipeline.

func NewKafkaOutput

func NewKafkaOutput(
	cfg KafkaOutputConfig,
	instance string,
	dropsCounter *prometheus.CounterVec,
	produceErrors *prometheus.CounterVec,
	logger *slog.Logger,
) (*KafkaOutput, error)

NewKafkaOutput constructs a KafkaOutput from the supplied configuration. A failure to reach the brokers at startup is logged at warn level but does not fail construction; franz-go retries connections in the background and records are buffered until delivery becomes possible.

func (*KafkaOutput) Close

func (ko *KafkaOutput) Close() error

Close stops the dispatcher, drains any remaining buffered records into franz-go's producer, flushes the producer (up to flushBudget), and then closes the underlying client. Pending records that do not flush before the budget expires are dropped on client close.

Close is safe to call multiple times; subsequent calls are no-ops.

Note: franz-go's Client.Close has documented blocking behaviour around leaving consumer groups, but this client is configured as a producer only (no ConsumeTopics / ConsumePartitions / InstanceID), so the leave-group path is a no-op and Close will not block on it. The only bounded wait here is the explicit Flush above.
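The documentation states that repeated Close calls are no-ops but not how that is achieved. One common way to get that behaviour is sync.Once, sketched below with a hypothetical output type; the flush and client shutdown are represented by a counter only.

```go
package main

import (
	"fmt"
	"sync"
)

// output is a hypothetical resource with an idempotent Close.
type output struct {
	once      sync.Once
	closeErr  error
	shutdowns int // counts how often the real teardown ran
}

// Close runs the teardown exactly once. Later calls skip the teardown
// and return the result of the first call.
func (o *output) Close() error {
	o.once.Do(func() {
		// A real output would stop its dispatcher, flush, and close
		// the client here.
		o.shutdowns++
		o.closeErr = nil
	})
	return o.closeErr
}

func main() {
	o := &output{}
	fmt.Println(o.Close(), o.Close(), o.Close())
	fmt.Println("teardown ran", o.shutdowns, "time(s)")
}
```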

func (*KafkaOutput) Name

func (ko *KafkaOutput) Name() string

Name returns the stable identifier for this output.

func (*KafkaOutput) SendEvent

func (ko *KafkaOutput) SendEvent(event *eventrecorderpb.Event) (int, error)

SendEvent serializes the event in the configured format (JSON or protobuf) and queues it for asynchronous delivery. It returns the serialized size (for the bytes-written metric).

type KafkaOutputConfig

type KafkaOutputConfig struct {
	// Brokers is the list of Kafka seed brokers in host:port form.
	Brokers []string `yaml:"brokers" json:"brokers"`
	// Topic is the Kafka topic to produce events to.
	Topic string `yaml:"topic" json:"topic"`
	// ClientID is reported to the Kafka brokers (default "alertmanager").
	ClientID string `yaml:"client_id,omitempty" json:"client_id,omitempty"`
	// Format selects the on-the-wire encoding of each event value:
	// "json" (default, JSON via protojson) or "protobuf" (binary proto).
	Format kafka.Format `yaml:"format,omitempty" json:"format,omitempty"`
	// Acks controls the producer acknowledgement level:
	// "none", "leader" (default), or "all".
	Acks kafka.Acks `yaml:"acks,omitempty" json:"acks,omitempty"`
	// Compression selects the producer compression codec:
	// "" (default, no compression), "none", "gzip", "snappy", "lz4", or "zstd".
	Compression kafka.Compression `yaml:"compression,omitempty" json:"compression,omitempty"`
	// BufferSize is the capacity of the local channel between the event
	// recorder dispatcher and the franz-go producer (default 1024).
	BufferSize int `yaml:"buffer_size,omitempty" json:"buffer_size,omitempty"`
	// TLSConfig configures TLS for the Kafka broker connection.  If unset,
	// PLAINTEXT is used.
	TLSConfig *commoncfg.TLSConfig `yaml:"tls_config,omitempty" json:"tls_config,omitempty"`
}

KafkaOutputConfig configures a Kafka event recorder output.

func (*KafkaOutputConfig) UnmarshalYAML

func (c *KafkaOutputConfig) UnmarshalYAML(unmarshal func(any) error) error

UnmarshalYAML implements the yaml.Unmarshaler interface, validating and normalising the Kafka output configuration. Transport-layer validation (brokers, acks, compression, TLS) is delegated to the shared kafka package so this code and a future Kafka receiver share a single source of truth.

type Recorder

type Recorder struct {
	// contains filtered or unexported fields
}

Recorder is a concrete, non-nil-able handle to an event recorder. Because it is a struct (not an interface), passing nil where a Recorder is expected is a compile-time error.

The zero value (Recorder{}) is safe to use and silently discards all events, but prefer NopRecorder() for clarity.
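A value-type handle whose zero value is inert can be built by wrapping a pointer to the real state and checking it in every method. The sketch below shows that pattern with hypothetical names; the actual fields of Recorder are unexported and may be organised differently.

```go
package main

import "fmt"

// state is the hypothetical shared state behind a live handle.
type state struct {
	queue chan string
}

// handle is copied by value; the zero value has a nil state pointer.
type handle struct {
	s *state
}

// nopHandle makes the intent explicit at call sites.
func nopHandle() handle { return handle{} }

func newHandle(capacity int) handle {
	return handle{s: &state{queue: make(chan string, capacity)}}
}

// record discards the event on a zero-value handle and otherwise
// enqueues it without blocking. It reports whether the event was queued.
func (h handle) record(ev string) bool {
	if h.s == nil {
		return false
	}
	select {
	case h.s.queue <- ev:
		return true
	default:
		return false
	}
}

func main() {
	var zero handle
	fmt.Println(zero.record("a"))          // false: silently discarded
	fmt.Println(nopHandle().record("b"))   // false: same behaviour
	fmt.Println(newHandle(1).record("c")) // true: queued
}
```

Since callers hold a struct rather than an interface or pointer, there is no nil to pass by accident, and components that do not care about events can be handed the no-op value.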

func NewRecorderFromConfig

func NewRecorderFromConfig(cfg Config, instance string, logger *slog.Logger, r prometheus.Registerer) Recorder

NewRecorderFromConfig builds a Recorder from the given configuration. A background goroutine is started to drain the event queue; call Close to stop it.

func NopRecorder

func NopRecorder() Recorder

NopRecorder returns a Recorder that silently discards all events. Use this in tests or when the event recorder is not configured.

func (Recorder) ApplyConfig

func (r Recorder) ApplyConfig(cfg Config)

ApplyConfig hot-reloads the event recorder configuration. The update is sent to the writeLoop goroutine, which owns the outputs; this method blocks until the writeLoop has acknowledged the update.
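The "send to the owning goroutine and wait for an acknowledgement" handshake can be sketched with a request struct that carries its own done channel. The types below are hypothetical and the configuration is reduced to a string; the point is that only the loop goroutine touches the current configuration, while the caller blocks until the swap has happened.

```go
package main

import "fmt"

// configUpdate carries a new config plus a channel the loop closes
// once the update has been applied.
type configUpdate struct {
	cfg  string
	done chan struct{}
}

type loop struct {
	updates chan configUpdate
	stop    chan struct{}
	stopped chan struct{}
	current string // owned by the run goroutine
}

func newLoop() *loop {
	l := &loop{
		updates: make(chan configUpdate),
		stop:    make(chan struct{}),
		stopped: make(chan struct{}),
		current: "initial",
	}
	go l.run()
	return l
}

func (l *loop) run() {
	defer close(l.stopped)
	for {
		select {
		case u := <-l.updates:
			l.current = u.cfg // a real loop would rebuild outputs here
			close(u.done)     // acknowledge
		case <-l.stop:
			return
		}
	}
}

// applyConfig blocks until the loop has acknowledged the update.
func (l *loop) applyConfig(cfg string) {
	u := configUpdate{cfg: cfg, done: make(chan struct{})}
	l.updates <- u
	<-u.done
}

// shutdown stops the loop and returns the last applied config.
func (l *loop) shutdown() string {
	close(l.stop)
	<-l.stopped
	return l.current
}

func main() {
	l := newLoop()
	l.applyConfig("reloaded")
	fmt.Println(l.shutdown())
}
```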

func (Recorder) Close

func (r Recorder) Close() error

Close signals the background goroutine to drain remaining events and stop. The writeLoop closes all outputs before returning.

func (Recorder) RecordEvent

func (r Recorder) RecordEvent(ctx context.Context, event *eventrecorderpb.EventData)

RecordEvent wraps the event and places it on a bounded queue for background serialization and delivery. If the queue is full the event is dropped (never blocks the caller). Recording only occurs when the context has been decorated with WithEventRecording.

The expensive protojson.Marshal call is deferred to the write-loop goroutine so that the caller's hot path only pays for the proto wrapping and a channel send.

func (Recorder) SetClusterPeer

func (r Recorder) SetClusterPeer(peer *cluster.Peer)

SetClusterPeer sets the cluster peer for HA position tracking.

type WebhookOutput

type WebhookOutput struct {
	// contains filtered or unexported fields
}

WebhookOutput POSTs each event as a JSON body to a configured URL. Events are processed by a bounded worker pool so that a slow or temporarily unavailable webhook does not block the event recorder queue. Events are dropped (with a log message and a metric increment) when the internal queue is full.

func NewWebhookOutput

func NewWebhookOutput(cfg WebhookOutputConfig, dropsCounter *prometheus.CounterVec, logger *slog.Logger) (*WebhookOutput, error)

NewWebhookOutput creates a new webhook-based event recorder output.

func (*WebhookOutput) Close

func (wo *WebhookOutput) Close() error

Close signals all workers to stop, drains remaining events, and waits. If the drain takes longer than 30 seconds, remaining retries are canceled.

func (*WebhookOutput) Name

func (wo *WebhookOutput) Name() string

Name returns a stable identifier for this output. The URL is sanitized to avoid leaking credentials.
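The exact sanitization rules are not documented here. As a rough sketch of what credential-safe naming could look like, the helper below strips user info, query string, and fragment with net/url before building a "webhook:" identifier; the function name and the choice of what to strip are assumptions.

```go
package main

import (
	"fmt"
	"net/url"
)

// sanitizedName builds a label-safe identifier from a webhook URL by
// removing the parts most likely to carry secrets.
func sanitizedName(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	u.User = nil     // drop user:password@
	u.RawQuery = ""  // drop ?token=...
	u.Fragment = ""  // drop #fragment
	return "webhook:" + u.String(), nil
}

func main() {
	name, err := sanitizedName("https://user:secret@example.com/hook?token=abc")
	if err != nil {
		panic(err)
	}
	fmt.Println(name) // webhook:https://example.com/hook
}
```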

func (*WebhookOutput) SendEvent

func (wo *WebhookOutput) SendEvent(event *eventrecorderpb.Event) (int, error)

SendEvent serializes the event as JSON and queues it for delivery by a worker. It returns the serialized size (for the bytes-written metric). If the internal queue is full the event is dropped and counted via the output-drops metric.

type WebhookOutputConfig

type WebhookOutputConfig struct {
	// URL is the endpoint to POST each event to.
	URL *amcommoncfg.SecretURL `yaml:"url" json:"url"`
	// HTTPConfig configures the HTTP client used for webhook delivery.
	HTTPConfig *commoncfg.HTTPClientConfig `yaml:"http_config,omitempty" json:"http_config,omitempty"`
	// Timeout for webhook HTTP requests (default 10s).
	Timeout model.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"`
	// Workers is the number of concurrent delivery goroutines (default 4).
	Workers int `yaml:"workers,omitempty" json:"workers,omitempty"`
	// MaxRetries is the maximum number of delivery attempts per event
	// (default 3).
	MaxRetries int `yaml:"max_retries,omitempty" json:"max_retries,omitempty"`
	// RetryBackoff is the base backoff between retry attempts (default
	// 500ms).  Successive attempts use exponential backoff (base *
	// 2^attempt).
	RetryBackoff model.Duration `yaml:"retry_backoff,omitempty" json:"retry_backoff,omitempty"`
}

WebhookOutputConfig configures an HTTP webhook event recorder output.

func (*WebhookOutputConfig) UnmarshalYAML

func (c *WebhookOutputConfig) UnmarshalYAML(unmarshal func(any) error) error

UnmarshalYAML implements the yaml.Unmarshaler interface, validating the webhook output configuration.

Note: SecretURL.UnmarshalYAML delegates to ParseURL, which already enforces a non-empty host and an http(s) scheme. The only way an otherwise-valid config reaches this function with a degenerate URL is via the "<secret>" placeholder shortcut in SecretURL.UnmarshalYAML, which sets URL to an empty url.URL{}. We catch that case here.
