Documentation
¶
Overview ¶
Package eventrecorder provides a structured event recorder for significant Alertmanager events. Events are serialized as JSON and fanned out to one or more configured destinations (JSONL file, webhook, kafka).
RecordEvent never blocks the caller: events are serialized and placed on a bounded in-memory queue. A background goroutine drains the queue and sends to destinations. If the queue is full, events are dropped and a metric is incremented.
Package layout:
- recorder.go Recorder core: types, write loop, fan-out.
- metrics.go Prometheus metric definitions.
- events.go Pure proto-conversion helpers and event constructors.
- config.go Top-level Config: per-type output lists + equality.
- file.go File output and its config.
- webhook.go Webhook output and its config.
- kafka.go Kafka output and its config.
Index ¶
- func AlertAsProto(alert *types.Alert) *eventrecorderpb.Alert
- func EventRecordingEnabled(ctx context.Context) bool
- func InhibitRuleAsProto(sourceMatchers, targetMatchers labels.Matchers, ...) *eventrecorderpb.InhibitRule
- func LabelSetAsProto(ls model.LabelSet) *eventrecorderpb.LabelSet
- func MatcherAsProto(m *labels.Matcher) *eventrecorderpb.Matcher
- func MatchersAsProto(matchers labels.Matchers) []*eventrecorderpb.Matcher
- func NewAlertCreatedEvent(alert *types.Alert) *eventrecorderpb.EventData
- func NewInhibitionMutedAlertEvent(rules []*eventrecorderpb.InhibitRule, fp model.Fingerprint, ...) *eventrecorderpb.EventData
- func NewSilenceCreatedEvent(silence *eventrecorderpb.Silence) *eventrecorderpb.EventData
- func NewSilenceMutedAlertEvent(silence *eventrecorderpb.Silence, fp model.Fingerprint, lset model.LabelSet) *eventrecorderpb.EventData
- func NewSilenceUpdatedEvent(silence *eventrecorderpb.Silence) *eventrecorderpb.EventData
- func SilenceAsProto(sil *silencepb.Silence) *eventrecorderpb.Silence
- func SilenceMatcherAsProto(m *silencepb.Matcher) *eventrecorderpb.Matcher
- func WithEventRecording(ctx context.Context) context.Context
- type Config
- type Destination
- type FileOutput
- type FileOutputConfig
- type KafkaOutput
- type KafkaOutputConfig
- type Recorder
- type WebhookOutput
- type WebhookOutputConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AlertAsProto ¶
func AlertAsProto(alert *types.Alert) *eventrecorderpb.Alert
AlertAsProto converts a types.Alert to an eventrecorderpb.Alert.
func EventRecordingEnabled ¶
EventRecordingEnabled reports whether event recording has been enabled in the given context via WithEventRecording.
func InhibitRuleAsProto ¶
func InhibitRuleAsProto(sourceMatchers, targetMatchers labels.Matchers, equal map[model.LabelName]struct{}) *eventrecorderpb.InhibitRule
InhibitRuleAsProto converts inhibit rule fields to an eventrecorderpb.InhibitRule. It accepts the individual fields rather than the InhibitRule struct to avoid an import cycle.
func LabelSetAsProto ¶
func LabelSetAsProto(ls model.LabelSet) *eventrecorderpb.LabelSet
LabelSetAsProto converts a model.LabelSet to an eventrecorderpb.LabelSet. Labels are sorted by name for deterministic output.
func MatcherAsProto ¶
func MatcherAsProto(m *labels.Matcher) *eventrecorderpb.Matcher
MatcherAsProto converts a single *labels.Matcher to its protobuf representation.
func MatchersAsProto ¶
func MatchersAsProto(matchers labels.Matchers) []*eventrecorderpb.Matcher
MatchersAsProto converts a slice of matchers to their protobuf representations.
func NewAlertCreatedEvent ¶
func NewAlertCreatedEvent(alert *types.Alert) *eventrecorderpb.EventData
NewAlertCreatedEvent constructs an AlertCreated event.
func NewInhibitionMutedAlertEvent ¶
func NewInhibitionMutedAlertEvent(rules []*eventrecorderpb.InhibitRule, fp model.Fingerprint, lset model.LabelSet, inhibitingFPs []model.Fingerprint) *eventrecorderpb.EventData
NewInhibitionMutedAlertEvent constructs an InhibitionMutedAlert event.
func NewSilenceCreatedEvent ¶
func NewSilenceCreatedEvent(silence *eventrecorderpb.Silence) *eventrecorderpb.EventData
NewSilenceCreatedEvent constructs a SilenceCreated event.
func NewSilenceMutedAlertEvent ¶
func NewSilenceMutedAlertEvent(silence *eventrecorderpb.Silence, fp model.Fingerprint, lset model.LabelSet) *eventrecorderpb.EventData
NewSilenceMutedAlertEvent constructs a SilenceMutedAlert event.
func NewSilenceUpdatedEvent ¶
func NewSilenceUpdatedEvent(silence *eventrecorderpb.Silence) *eventrecorderpb.EventData
NewSilenceUpdatedEvent constructs a SilenceUpdated event.
func SilenceAsProto ¶
func SilenceAsProto(sil *silencepb.Silence) *eventrecorderpb.Silence
SilenceAsProto converts a silencepb.Silence to an eventrecorderpb.Silence.
func SilenceMatcherAsProto ¶
func SilenceMatcherAsProto(m *silencepb.Matcher) *eventrecorderpb.Matcher
SilenceMatcherAsProto converts a silencepb.Matcher to an eventrecorderpb.Matcher.
Types ¶
type Config ¶
type Config struct {
FileOutputs []FileOutputConfig `yaml:"file_outputs,omitempty" json:"file_outputs,omitempty"`
WebhookOutputs []WebhookOutputConfig `yaml:"webhook_outputs,omitempty" json:"webhook_outputs,omitempty"`
KafkaOutputs []KafkaOutputConfig `yaml:"kafka_outputs,omitempty" json:"kafka_outputs,omitempty"`
}
Config configures the event recorder feature.
Outputs are grouped by type, one list per destination kind, mirroring the way receivers group their integrations (e.g. webhook_configs, email_configs). Every recorded event is fanned out to every output across all lists.
type Destination ¶
type Destination interface {
// Name returns a stable identifier for this destination, suitable
// for use as a Prometheus label value (e.g. "file:/var/log/events.jsonl"
// or "webhook:https://example.com/hook").
Name() string
// SendEvent encodes and delivers the event. It returns the number
// of payload bytes written (for the bytes-written metric) and any
// delivery error. A serialization failure should be returned
// wrapped in *serializeError so the recorder can attribute it to
// the serialize-errors metric.
SendEvent(event *eventrecorderpb.Event) (size int, err error)
io.Closer
}
Destination is a single event destination. Each implementation owns its own serialization: it receives the structured event and is responsible for encoding it (e.g. JSON or protobuf) and delivering it.
Owning serialization per destination — rather than handing every destination a pre-encoded JSON blob — avoids the footgun of, say, a protobuf-configured Kafka output silently shipping a JSON payload.
type FileOutput ¶
type FileOutput struct {
// contains filtered or unexported fields
}
FileOutput writes pre-serialized JSON event bytes to a JSONL file. The file is reopened when fsnotify detects a rename or remove (e.g. logrotate).
func NewFileOutput ¶
func NewFileOutput(path string, logger *slog.Logger) (*FileOutput, error)
NewFileOutput creates a new file-based event recorder output at the given path. The file is watched with fsnotify so that external log rotation tools (e.g., logrotate) trigger an immediate reopen.
func (*FileOutput) Close ¶
func (fo *FileOutput) Close() error
Close stops the watcher goroutine, waits for it to exit, and closes the file.
func (*FileOutput) Name ¶
func (fo *FileOutput) Name() string
Name returns a stable identifier for this output.
func (*FileOutput) SendEvent ¶
func (fo *FileOutput) SendEvent(event *eventrecorderpb.Event) (int, error)
SendEvent serializes the event as a JSON line and appends it to the file. It returns the number of bytes written (including the trailing newline) for the bytes-written metric.
type FileOutputConfig ¶
type FileOutputConfig struct {
// Path is the JSONL file to append events to. Created if absent.
Path string `yaml:"path" json:"path"`
}
FileOutputConfig configures a JSONL file event recorder output.
func (*FileOutputConfig) UnmarshalYAML ¶
func (c *FileOutputConfig) UnmarshalYAML(unmarshal func(any) error) error
UnmarshalYAML implements the yaml.Unmarshaler interface, validating the file output configuration.
type KafkaOutput ¶
type KafkaOutput struct {
// contains filtered or unexported fields
}
KafkaOutput delivers serialized events to a Kafka topic via franz-go. Events are buffered in a bounded local channel and produced by a single dispatcher goroutine; franz-go handles batching, compression, and retries internally.
When the local buffer is full, events are dropped (with a log message and a metric increment) so that a slow or unreachable broker cannot block the upstream event recorder pipeline.
func NewKafkaOutput ¶
func NewKafkaOutput( cfg KafkaOutputConfig, instance string, dropsCounter *prometheus.CounterVec, produceErrors *prometheus.CounterVec, logger *slog.Logger, ) (*KafkaOutput, error)
NewKafkaOutput constructs a KafkaOutput from the supplied configuration. A failure to reach the brokers at startup is logged at warn level but does not fail construction; franz-go retries connections in the background and records are buffered until delivery becomes possible.
func (*KafkaOutput) Close ¶
func (ko *KafkaOutput) Close() error
Close stops the dispatcher, drains any remaining buffered records into franz-go's producer, flushes the producer (up to flushBudget), and then closes the underlying client. Pending records that do not flush before the budget expires are dropped on client close.
Close is safe to call multiple times; subsequent calls are no-ops.
Note: franz-go's Client.Close has documented blocking behaviour around leaving consumer groups, but this client is configured as a producer only (no ConsumeTopics / ConsumePartitions / InstanceID), so the leave-group path is a no-op and Close will not block on it. The only bounded wait here is the explicit Flush above.
func (*KafkaOutput) Name ¶
func (ko *KafkaOutput) Name() string
Name returns the stable identifier for this output.
func (*KafkaOutput) SendEvent ¶
func (ko *KafkaOutput) SendEvent(event *eventrecorderpb.Event) (int, error)
SendEvent serializes the event in the configured format (JSON or protobuf) and queues it for asynchronous delivery. It returns the serialized size (for the bytes-written metric).
type KafkaOutputConfig ¶
type KafkaOutputConfig struct {
// Brokers is the list of Kafka seed brokers in host:port form.
Brokers []string `yaml:"brokers" json:"brokers"`
// Topic is the Kafka topic to produce events to.
Topic string `yaml:"topic" json:"topic"`
// ClientID is reported to the Kafka brokers (default "alertmanager").
ClientID string `yaml:"client_id,omitempty" json:"client_id,omitempty"`
// Format selects the on-the-wire encoding of each event value:
// "json" (default, JSON via protojson) or "protobuf" (binary proto).
Format kafka.Format `yaml:"format,omitempty" json:"format,omitempty"`
// Acks controls the producer acknowledgement level:
// "none", "leader" (default), or "all".
Acks kafka.Acks `yaml:"acks,omitempty" json:"acks,omitempty"`
// Compression selects the producer compression codec:
// "" (default, no compression), "none", "gzip", "snappy", "lz4", or "zstd".
Compression kafka.Compression `yaml:"compression,omitempty" json:"compression,omitempty"`
// BufferSize is the capacity of the local channel between the event
// recorder dispatcher and the franz-go producer (default 1024).
BufferSize int `yaml:"buffer_size,omitempty" json:"buffer_size,omitempty"`
// TLSConfig configures TLS for the Kafka broker connection. If unset,
// PLAINTEXT is used.
TLSConfig *commoncfg.TLSConfig `yaml:"tls_config,omitempty" json:"tls_config,omitempty"`
}
KafkaOutputConfig configures a Kafka event recorder output.
func (*KafkaOutputConfig) UnmarshalYAML ¶
func (c *KafkaOutputConfig) UnmarshalYAML(unmarshal func(any) error) error
UnmarshalYAML implements the yaml.Unmarshaler interface, validating and normalising the Kafka output configuration. Transport-layer validation (brokers, acks, compression, TLS) is delegated to the shared kafka package so this code and a future Kafka receiver share a single source of truth.
type Recorder ¶
type Recorder struct {
// contains filtered or unexported fields
}
Recorder is a concrete, non-nil-able handle to an event recorder. Because it is a struct (not an interface), passing nil where a Recorder is expected is a compile-time error.
The zero value (Recorder{}) is safe to use and silently discards all events, but prefer NopRecorder() for clarity.
func NewRecorderFromConfig ¶
func NewRecorderFromConfig(cfg Config, instance string, logger *slog.Logger, r prometheus.Registerer) Recorder
NewRecorderFromConfig builds a Recorder from the given configuration. A background goroutine is started to drain the event queue; call Close to stop it.
func NopRecorder ¶
func NopRecorder() Recorder
NopRecorder returns a Recorder that silently discards all events. Use this in tests or when the event recorder is not configured.
func (Recorder) ApplyConfig ¶
ApplyConfig hot-reloads the event recorder configuration. The update is sent to the writeLoop goroutine, which owns the outputs; this method blocks until the writeLoop has acknowledged the update.
func (Recorder) Close ¶
Close signals the background goroutine to drain remaining events and stop. The writeLoop closes all outputs before returning.
func (Recorder) RecordEvent ¶
func (r Recorder) RecordEvent(ctx context.Context, event *eventrecorderpb.EventData)
RecordEvent wraps the event and places it on a bounded queue for background serialization and delivery. If the queue is full the event is dropped (never blocks the caller). Recording only occurs when the context has been decorated with WithEventRecording.
The expensive protojson.Marshal call is deferred to the write-loop goroutine so that the caller's hot path only pays for the proto wrapping and a channel send.
func (Recorder) SetClusterPeer ¶
SetClusterPeer sets the cluster peer for HA position tracking.
type WebhookOutput ¶
type WebhookOutput struct {
// contains filtered or unexported fields
}
WebhookOutput POSTs each event as a JSON body to a configured URL. Events are processed by a bounded worker pool so that a slow or temporarily unavailable webhook does not block the event recorder queue. Events are dropped (with a log message) when the internal queue is full.
func NewWebhookOutput ¶
func NewWebhookOutput(cfg WebhookOutputConfig, dropsCounter *prometheus.CounterVec, logger *slog.Logger) (*WebhookOutput, error)
NewWebhookOutput creates a new webhook-based event recorder output.
func (*WebhookOutput) Close ¶
func (wo *WebhookOutput) Close() error
Close signals all workers to stop, drains remaining events, and waits. If the drain takes longer than 30 seconds, remaining retries are canceled.
func (*WebhookOutput) Name ¶
func (wo *WebhookOutput) Name() string
Name returns a stable identifier for this output. The URL is sanitized to avoid leaking credentials.
func (*WebhookOutput) SendEvent ¶
func (wo *WebhookOutput) SendEvent(event *eventrecorderpb.Event) (int, error)
SendEvent serializes the event as JSON and queues it for delivery by a worker. It returns the serialized size (for the bytes-written metric). If the internal queue is full the event is dropped and counted via the output-drops metric.
type WebhookOutputConfig ¶
type WebhookOutputConfig struct {
// URL is the endpoint to POST each event to.
URL *amcommoncfg.SecretURL `yaml:"url" json:"url"`
// HTTPConfig configures the HTTP client used for webhook delivery.
HTTPConfig *commoncfg.HTTPClientConfig `yaml:"http_config,omitempty" json:"http_config,omitempty"`
// Timeout for webhook HTTP requests (default 10s).
Timeout model.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"`
// Workers is the number of concurrent delivery goroutines (default 4).
Workers int `yaml:"workers,omitempty" json:"workers,omitempty"`
// MaxRetries is the maximum number of delivery attempts per event
// (default 3).
MaxRetries int `yaml:"max_retries,omitempty" json:"max_retries,omitempty"`
// RetryBackoff is the base backoff between retry attempts (default
// 500ms). Successive attempts use exponential backoff (base *
// 2^attempt).
RetryBackoff model.Duration `yaml:"retry_backoff,omitempty" json:"retry_backoff,omitempty"`
}
WebhookOutputConfig configures an HTTP webhook event recorder output.
func (*WebhookOutputConfig) UnmarshalYAML ¶
func (c *WebhookOutputConfig) UnmarshalYAML(unmarshal func(any) error) error
UnmarshalYAML implements the yaml.Unmarshaler interface, validating the webhook output configuration.
Note: SecretURL.UnmarshalYAML delegates to ParseURL, which already enforces a non-empty host and an http(s) scheme. The only way an otherwise-valid config reaches this function with a degenerate URL is via the "<secret>" placeholder shortcut in SecretURL.UnmarshalYAML, which sets URL to an empty url.URL{}. We catch that case here.