sinks

package
v5.1.0 Latest
Warning

This package is not in the latest version of its module.
Published: Mar 16, 2026 License: BSD-3-Clause Imports: 35 Imported by: 0

Documentation

Overview

Package sinks provides functionality to store monitored data in different ways.

At the moment we provide sink connectors for

  • PostgreSQL and flavours,
  • Prometheus,
  • plain JSON files,
  • and RPC servers.

To store data in several sinks at once, the package provides the `MultiWriter` type, which forwards each call to every configured writer.
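The fan-out idea can be illustrated with a minimal sketch. It is not the package's implementation: `envelope`, `writer`, `fanOut`, and `memWriter` are hypothetical stand-ins defined here, and collecting failures with `errors.Join` is an assumption about one reasonable way to report errors from several sinks.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// envelope is a hypothetical stand-in for metrics.MeasurementEnvelope.
type envelope struct {
	DBName string
	Metric string
}

// writer mirrors the shape of a sink's Write method for this sketch only.
type writer interface {
	Write(msg envelope) error
}

// fanOut forwards every message to all registered writers.
type fanOut struct {
	mu      sync.Mutex
	writers []writer
}

// AddWriter registers one more destination.
func (f *fanOut) AddWriter(w writer) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.writers = append(f.writers, w)
}

// Count returns the number of registered writers.
func (f *fanOut) Count() int {
	f.mu.Lock()
	defer f.mu.Unlock()
	return len(f.writers)
}

// Write calls every writer and joins the errors, so one failing
// sink does not prevent the others from receiving the message.
func (f *fanOut) Write(msg envelope) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	var errs []error
	for _, w := range f.writers {
		if err := w.Write(msg); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

// memWriter records messages in memory; fail makes it return an error.
type memWriter struct {
	got  []envelope
	fail bool
}

func (m *memWriter) Write(msg envelope) error {
	if m.fail {
		return errors.New("sink unavailable")
	}
	m.got = append(m.got, msg)
	return nil
}

func main() {
	ok, bad := &memWriter{}, &memWriter{fail: true}
	f := &fanOut{}
	f.AddWriter(ok)
	f.AddWriter(bad)
	err := f.Write(envelope{DBName: "db1", Metric: "db_stats"})
	fmt.Println(f.Count(), len(ok.got), err)
}
```

In this sketch the healthy writer still receives the message even though the other one fails, and the caller sees the failure in the joined error.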

Constants

const MigrationsCount = 1

MigrationsCount is the total number of migrations in the admin.migration table.

Variables

var ErrNeedsMigration = errors.New("sink database schema is outdated, please run migrations using `pgwatch config upgrade` command")

Functions

func LoadTLSCredentials

func LoadTLSCredentials(CAFile string) (credentials.TransportCredentials, error)

func NewPostgresSinkMigrator

func NewPostgresSinkMigrator(ctx context.Context, connStr string) (db.Migrator, error)

Types

type CmdOpts

type CmdOpts struct {
	Sinks                 []string      `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
	BatchingDelay         time.Duration `` /* 170-byte string literal not displayed */
	PartitionInterval     string        `` /* 203-byte string literal not displayed */
	RetentionInterval     string        `` /* 161-byte string literal not displayed */
	MaintenanceInterval   string        `` /* 273-byte string literal not displayed */
	RealDbnameField       string        `` /* 151-byte string literal not displayed */
	SystemIdentifierField string        `` /* 169-byte string literal not displayed */
}

CmdOpts specifies the storage configuration for storing metric measurements.

type DbStorageSchemaType

type DbStorageSchemaType int
const (
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	DbStorageSchemaTimescale
)

type ExistingPartitionInfo

type ExistingPartitionInfo struct {
	StartTime time.Time
	EndTime   time.Time
}

type JSONWriter

type JSONWriter struct {
	// contains filtered or unexported fields
}

JSONWriter is a sink that writes metric measurements to a file in JSON format. It supports compression and rotation of output files. The default rotation is based on the file size (100 MB). JSONWriter is useful for debugging and testing, as well as for integration with other systems, such as log aggregators, analytics systems, data processing pipelines, and ML models.
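A minimal sketch of a JSON file-style sink follows. It assumes one JSON object per line, which this page does not state, and it leaves out compression and rotation entirely. The `measurement` type, its field tags, `jsonLineSink`, and `encodeAll` are hypothetical names used only for illustration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// measurement is a hypothetical record shape, not the package's envelope.
type measurement struct {
	DBName string         `json:"dbname"`
	Metric string         `json:"metric"`
	Data   map[string]any `json:"data,omitempty"`
}

// jsonLineSink encodes each measurement as one JSON object per line.
type jsonLineSink struct {
	enc *json.Encoder
}

func newJSONLineSink(w io.Writer) *jsonLineSink {
	return &jsonLineSink{enc: json.NewEncoder(w)}
}

// Write appends one record; json.Encoder adds the trailing newline.
func (s *jsonLineSink) Write(m measurement) error {
	return s.enc.Encode(m)
}

// encodeAll writes all measurements to an in-memory buffer and
// returns the resulting text, which stands in for the output file.
func encodeAll(ms []measurement) (string, error) {
	var buf bytes.Buffer
	sink := newJSONLineSink(&buf)
	for _, m := range ms {
		if err := sink.Write(m); err != nil {
			return "", err
		}
	}
	return buf.String(), nil
}

func main() {
	out, err := encodeAll([]measurement{
		{DBName: "db1", Metric: "db_stats", Data: map[string]any{"numbackends": 3}},
		{DBName: "db2", Metric: "wal"},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Print(out)
}
```

Writing to an `io.Writer` keeps the sketch independent of the destination, so a rotating or compressing file writer could be substituted without changing the encoding logic.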

func NewJSONWriter

func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)

func (*JSONWriter) SyncMetric

func (jw *JSONWriter) SyncMetric(_, _ string, _ SyncOp) error

func (*JSONWriter) Write

type MeasurementMessagePostgres

type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]string
}

type MetricsDefiner

type MetricsDefiner interface {
	DefineMetrics(metrics *metrics.Metrics) error
}

MetricsDefiner is an interface for passing metric definitions to a sink.

type MultiWriter

type MultiWriter struct {
	sync.Mutex
	// contains filtered or unexported fields
}

MultiWriter ensures the simultaneous storage of data in several storages.

func (*MultiWriter) AddWriter

func (mw *MultiWriter) AddWriter(w Writer)

func (*MultiWriter) Count

func (mw *MultiWriter) Count() int

func (*MultiWriter) DefineMetrics

func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error)

func (*MultiWriter) Migrate

func (mw *MultiWriter) Migrate() (err error)

Migrate runs migrations on all writers that support it

func (*MultiWriter) NeedsMigration

func (mw *MultiWriter) NeedsMigration() (bool, error)

NeedsMigration checks if any writer needs migration
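Running migrations only on writers "that support it" suggests an optional-interface check. The following sketch shows that pattern with a type assertion. The `migrator` interface, `plainSink`, `dbSink`, and both helper functions are hypothetical and do not reproduce how MultiWriter is implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// migrator is a hypothetical optional interface; sinks without a
// schema simply do not implement it.
type migrator interface {
	NeedsMigration() (bool, error)
	Migrate() error
}

// plainSink has no schema and therefore no migration methods.
type plainSink struct{}

// dbSink tracks an applied schema version against a target version.
type dbSink struct {
	version, target int
}

func (d *dbSink) NeedsMigration() (bool, error) { return d.version < d.target, nil }

func (d *dbSink) Migrate() error {
	d.version = d.target
	return nil
}

// anyNeedsMigration reports whether at least one migratable sink is outdated.
func anyNeedsMigration(sinks []any) (bool, error) {
	for _, s := range sinks {
		m, ok := s.(migrator)
		if !ok {
			continue // this sink does not support migrations
		}
		need, err := m.NeedsMigration()
		if err != nil {
			return false, err
		}
		if need {
			return true, nil
		}
	}
	return false, nil
}

// migrateAll migrates every sink that supports it and joins any errors.
func migrateAll(sinks []any) error {
	var errs []error
	for _, s := range sinks {
		if m, ok := s.(migrator); ok {
			if err := m.Migrate(); err != nil {
				errs = append(errs, err)
			}
		}
	}
	return errors.Join(errs...)
}

func main() {
	sinks := []any{plainSink{}, &dbSink{version: 0, target: 1}}
	need, _ := anyNeedsMigration(sinks)
	fmt.Println("needs migration before:", need)
	if err := migrateAll(sinks); err != nil {
		fmt.Println("migration failed:", err)
	}
	need, _ = anyNeedsMigration(sinks)
	fmt.Println("needs migration after:", need)
}
```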

func (*MultiWriter) SyncMetric

func (mw *MultiWriter) SyncMetric(sourceName, metricName string, op SyncOp) (err error)

func (*MultiWriter) Write

func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error)

type PostgresWriter

type PostgresWriter struct {
	// contains filtered or unexported fields
}

PostgresWriter is a sink that writes metric measurements to a Postgres database. At the moment, it supports both Postgres and TimescaleDB as storage backends. However, any Postgres-compatible database can be used as a storage backend, e.g. PGEE, Citus, Greenplum, CockroachDB, etc.

func NewPostgresWriter

func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error)

func NewWriterFromPostgresConn

func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error)

func (*PostgresWriter) AddDBUniqueMetricToListingTable

func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error

func (*PostgresWriter) DeleteOldPartitions

func (pgw *PostgresWriter) DeleteOldPartitions()

DeleteOldPartitions is a background task that deletes old partitions from the measurements DB

func (*PostgresWriter) EnsureBuiltinMetricDummies

func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)

EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist

func (*PostgresWriter) EnsureMetricDbnameTime

func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo) (err error)

func (*PostgresWriter) EnsureMetricDummy

func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)

EnsureMetricDummy creates an empty table for a metric's measurements if it doesn't exist

func (*PostgresWriter) EnsureMetricTimescale

func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error)

func (*PostgresWriter) MaintainUniqueSources

func (pgw *PostgresWriter) MaintainUniqueSources()

MaintainUniqueSources is a background task that maintains a mapping of unique sources in each metric table in admin.all_distinct_dbname_metrics. This is used to avoid listing the same source multiple times in Grafana dropdowns.

func (*PostgresWriter) Migrate

func (pgw *PostgresWriter) Migrate() error

Migrate upgrades the database with all migrations

func (*PostgresWriter) NeedsMigration

func (pgw *PostgresWriter) NeedsMigration() (bool, error)

NeedsMigration checks if the database needs migration

func (*PostgresWriter) ReadMetricSchemaType

func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)

func (*PostgresWriter) SyncMetric

func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error

SyncMetric ensures that tables exist for newly added metrics and/or sources

func (*PostgresWriter) Write

Write sends the measurements to the cache channel

type PromMetricCache

type PromMetricCache = map[string]map[string]metrics.MeasurementEnvelope // [dbUnique][metric]lastly_fetched_data

type PrometheusWriter

type PrometheusWriter struct {
	sync.RWMutex

	Namespace string
	Cache     PromMetricCache // [dbUnique][metric]lastly_fetched_data
	// contains filtered or unexported fields
}

PrometheusWriter is a Prometheus exporter that implements the prometheus.Collector interface using the "unchecked collector" pattern (empty Describe method).

Design decisions based on Prometheus exporter guidelines (https://prometheus.io/docs/instrumenting/writing_exporters/#collectors):

  • Metrics are collected periodically by reaper and cached in-memory. On scrape, the collector reads a snapshot of the cache and emits fresh NewConstMetric values. The cache is NOT consumed on scrape, so parallel or back-to-back scrapes see the same data until the next Write() updates arrive.

  • This is an "unchecked collector": Describe() sends no descriptors, which tells the Prometheus registry to skip consistency checks. This is necessary because the set of metrics is dynamic (driven by monitored databases and their query results). Safety is ensured by deduplicating metric identities within each Collect() call.

  • Label keys are always sorted lexicographically before building descriptors and label value slices. This guarantees deterministic descriptor identity regardless of Go map iteration order.
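The last two points can be sketched with the standard library alone: sort label keys so that map iteration order does not matter, and derive a per-scrape identity to skip duplicates. The helper names, the `|` and `=` separators, and the metric name below are hypothetical; the actual writer builds Prometheus descriptors rather than strings.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// sortedLabels returns the label keys in lexicographic order and
// the values in the matching order.
func sortedLabels(labels map[string]string) (keys, values []string) {
	keys = make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	values = make([]string, len(keys))
	for i, k := range keys {
		values[i] = labels[k]
	}
	return keys, values
}

// identity builds a hypothetical deduplication key from the metric
// name and its sorted label pairs.
func identity(name string, labels map[string]string) string {
	keys, values := sortedLabels(labels)
	var b strings.Builder
	b.WriteString(name)
	for i := range keys {
		b.WriteString("|" + keys[i] + "=" + values[i])
	}
	return b.String()
}

func main() {
	// Two maps with the same content; Go does not guarantee iteration order.
	a := map[string]string{"dbname": "db1", "app": "web"}
	b := map[string]string{"app": "web", "dbname": "db1"}

	seen := map[string]struct{}{} // identities emitted during this "scrape"
	for _, labels := range []map[string]string{a, b} {
		id := identity("pgwatch_db_size", labels)
		if _, dup := seen[id]; dup {
			fmt.Println("skip duplicate:", id)
			continue
		}
		seen[id] = struct{}{}
		fmt.Println("emit:", id)
	}
}
```

A string key like this is only safe if label names and values cannot contain the separator characters; a production collector would need a more robust identity.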

func NewPrometheusWriter

func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)

func (*PrometheusWriter) AddCacheEntry added in v5.1.0

func (promw *PrometheusWriter) AddCacheEntry(dbUnique, metric string, msgArr metrics.MeasurementEnvelope)

func (*PrometheusWriter) Collect

func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)

Collect implements prometheus.Collector. It reads a snapshot of the metric cache and emits const metrics. Parallel scrapes see the same data until background Write() calls update it.

func (*PrometheusWriter) DefineMetrics

func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error)

DefineMetrics is called by reaper on startup and whenever metric definitions change

func (*PrometheusWriter) Describe

func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)

Describe is intentionally empty to make PrometheusWriter an "unchecked collector" per the prometheus.Collector contract

func (*PrometheusWriter) InitCacheEntry added in v5.1.0

func (promw *PrometheusWriter) InitCacheEntry(dbUnique string)

func (*PrometheusWriter) Println

func (promw *PrometheusWriter) Println(v ...any)

Println implements promhttp.Logger

func (*PrometheusWriter) PurgeCacheEntry added in v5.1.0

func (promw *PrometheusWriter) PurgeCacheEntry(dbUnique, metric string)

func (*PrometheusWriter) SyncMetric

func (promw *PrometheusWriter) SyncMetric(sourceName, metricName string, op SyncOp) error

SyncMetric is called by reaper when a metric or monitored source is removed or added, allowing the writer to purge or initialize cache entries as needed

func (*PrometheusWriter) Write

Write is called by reaper whenever new measurement data arrives

func (*PrometheusWriter) WritePromMetrics added in v5.1.0

func (promw *PrometheusWriter) WritePromMetrics(msg metrics.MeasurementEnvelope, ch chan<- prometheus.Metric) (written int, errorCount int)

WritePromMetrics converts a MeasurementEnvelope into Prometheus const metrics and sends them directly to ch. Returns the count of metrics written and errors encountered.

type RPCWriter

type RPCWriter struct {
	// contains filtered or unexported fields
}

RPCWriter sends metric measurements to a remote server using gRPC. Remote servers should make use of the .proto file under api/pb/ to integrate with it. It's up to the implementer to define the behavior of the server. It can be a simple logger, external storage, an alerting system, or an analytics system.

func NewRPCWriter

func NewRPCWriter(ctx context.Context, connStr string) (*RPCWriter, error)

func (*RPCWriter) DefineMetrics

func (rw *RPCWriter) DefineMetrics(metrics *metrics.Metrics) error

DefineMetrics sends metric definitions to the remote server

func (*RPCWriter) Ping

func (rw *RPCWriter) Ping() error

func (*RPCWriter) SyncMetric

func (rw *RPCWriter) SyncMetric(sourceName, metricName string, op SyncOp) error

SyncMetric synchronizes a metric and monitored source with the remote server

func (*RPCWriter) Write

func (rw *RPCWriter) Write(msg metrics.MeasurementEnvelope) error

Write sends a measurement message to the RPC sink

type SyncOp

type SyncOp int32

SyncOp represents synchronization operations for metrics. These constants are used both in Go code and protobuf definitions.

const (
	// InvalidOp represents an invalid or unrecognized operation
	InvalidOp SyncOp = iota // 0
	// AddOp represents adding a new metric
	AddOp // 1
	// DeleteOp represents deleting an existing metric or entire source
	DeleteOp // 2
	// DefineOp represents defining metric definitions
	DefineOp // 3
)

func (SyncOp) String

func (s SyncOp) String() string

String returns the string representation of the SyncOp
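An enum with a String method in the style of SyncOp can be sketched as follows. The type and constants are local lowercase copies, and the returned names ("invalid", "add", and so on) are assumptions, since this page does not show what the real String method returns.

```go
package main

import "fmt"

// syncOp is a local copy of the enum shape for this sketch.
type syncOp int32

const (
	invalidOp syncOp = iota // 0
	addOp                   // 1
	deleteOp                // 2
	defineOp                // 3
)

// syncOpNames holds illustrative names; the real strings may differ.
var syncOpNames = [...]string{"invalid", "add", "delete", "define"}

// String implements fmt.Stringer and falls back to a numeric form
// for values outside the known range.
func (s syncOp) String() string {
	if s < 0 || int(s) >= len(syncOpNames) {
		return fmt.Sprintf("syncOp(%d)", int32(s))
	}
	return syncOpNames[s]
}

func main() {
	for _, op := range []syncOp{invalidOp, addOp, deleteOp, defineOp, syncOp(9)} {
		// %v uses the String method; %d prints the numeric wire value.
		fmt.Printf("%d -> %v\n", op, op)
	}
}
```

Keeping the numeric values fixed matters when, as the description says, the same constants are shared with protobuf definitions.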

type Writer

type Writer interface {
	SyncMetric(sourceName, metricName string, op SyncOp) error
	Write(msgs metrics.MeasurementEnvelope) error
}

Writer is an interface that writes metric values
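To show what satisfying a two-method interface of this shape might look like, here is a sketch of an in-memory sink. All types are local stand-ins (`syncOp`, `envelope`, `sinkWriter`, `memorySink`), and treating an empty metric name in a delete as "remove the entire source" is an assumption made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

type syncOp int32

const (
	invalidOp syncOp = iota
	addOp
	deleteOp
	defineOp
)

// envelope is a hypothetical stand-in for metrics.MeasurementEnvelope.
type envelope struct {
	DBName, MetricName string
}

// sinkWriter copies the shape of the Writer interface with local types.
type sinkWriter interface {
	SyncMetric(sourceName, metricName string, op syncOp) error
	Write(msg envelope) error
}

// memorySink tracks known source/metric pairs and counts writes.
type memorySink struct {
	known   map[string]map[string]bool
	written int
}

func newMemorySink() *memorySink {
	return &memorySink{known: map[string]map[string]bool{}}
}

func (m *memorySink) SyncMetric(sourceName, metricName string, op syncOp) error {
	switch op {
	case addOp:
		if m.known[sourceName] == nil {
			m.known[sourceName] = map[string]bool{}
		}
		m.known[sourceName][metricName] = true
	case deleteOp:
		if metricName == "" { // assumption: empty metric means the whole source
			delete(m.known, sourceName)
		} else {
			delete(m.known[sourceName], metricName)
		}
	case defineOp:
		// This sketch keeps no metric definitions.
	default:
		return errors.New("invalid sync operation")
	}
	return nil
}

func (m *memorySink) Write(msg envelope) error {
	m.written++
	return nil
}

// Compile-time check that memorySink satisfies the interface.
var _ sinkWriter = (*memorySink)(nil)

func main() {
	s := newMemorySink()
	_ = s.SyncMetric("db1", "db_stats", addOp)
	_ = s.Write(envelope{DBName: "db1", MetricName: "db_stats"})
	_ = s.SyncMetric("db1", "", deleteOp)
	fmt.Println(len(s.known), s.written, s.SyncMetric("db1", "x", invalidOp))
}
```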

func NewSinkWriter

func NewSinkWriter(ctx context.Context, opts *CmdOpts) (w Writer, err error)

NewSinkWriter creates and returns a new instance of the MultiWriter struct.
