Documentation
¶
Overview ¶
Package sinks provides functionality to store monitored data in different ways.
At the moment we provide sink connectors for
- PostgreSQL and flavours,
- Prometheus,
- plain JSON files,
- and RPC servers.
To ensure the simultaneous storage of data in several storages, the `MultiWriter` class is implemented.
Index ¶
- Constants
- Variables
- func LoadTLSCredentials(CAFile string) (credentials.TransportCredentials, error)
- func NewPostgresSinkMigrator(ctx context.Context, connStr string) (db.Migrator, error)
- type CmdOpts
- type DbStorageSchemaType
- type ExistingPartitionInfo
- type JSONWriter
- type MeasurementMessagePostgres
- type MetricsDefiner
- type MultiWriter
- func (mw *MultiWriter) AddWriter(w Writer)
- func (mw *MultiWriter) Count() int
- func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
- func (mw *MultiWriter) Migrate() (err error)
- func (mw *MultiWriter) NeedsMigration() (bool, error)
- func (mw *MultiWriter) SyncMetric(sourceName, metricName string, op SyncOp) (err error)
- func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error)
- type PostgresWriter
- func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error
- func (pgw *PostgresWriter) DeleteOldPartitions()
- func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)
- func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo) (err error)
- func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)
- func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error)
- func (pgw *PostgresWriter) MaintainUniqueSources()
- func (pgw *PostgresWriter) Migrate() error
- func (pgw *PostgresWriter) NeedsMigration() (bool, error)
- func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)
- func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
- func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error
- type PromMetricCache
- type PrometheusWriter
- func (promw *PrometheusWriter) AddCacheEntry(dbUnique, metric string, msgArr metrics.MeasurementEnvelope)
- func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)
- func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
- func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)
- func (promw *PrometheusWriter) InitCacheEntry(dbUnique string)
- func (promw *PrometheusWriter) Println(v ...any)
- func (promw *PrometheusWriter) PurgeCacheEntry(dbUnique, metric string)
- func (promw *PrometheusWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
- func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error
- func (promw *PrometheusWriter) WritePromMetrics(msg metrics.MeasurementEnvelope, ch chan<- prometheus.Metric) (written int, errorCount int)
- type RPCWriter
- type SyncOp
- type Writer
Constants ¶
const MigrationsCount = 1
MigrationsCount is the total number of migrations in admin.migration table
Variables ¶
var ErrNeedsMigration = errors.New("sink database schema is outdated, please run migrations using `pgwatch config upgrade` command")
Functions ¶
func LoadTLSCredentials ¶
func LoadTLSCredentials(CAFile string) (credentials.TransportCredentials, error)
Types ¶
type CmdOpts ¶
type CmdOpts struct {
Sinks []string `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
BatchingDelay time.Duration `` /* 170-byte string literal not displayed */
PartitionInterval string `` /* 203-byte string literal not displayed */
RetentionInterval string `` /* 161-byte string literal not displayed */
MaintenanceInterval string `` /* 273-byte string literal not displayed */
RealDbnameField string `` /* 151-byte string literal not displayed */
SystemIdentifierField string `` /* 169-byte string literal not displayed */
}
CmdOpts specifies the storage configuration to store metrics measurements
type DbStorageSchemaType ¶
type DbStorageSchemaType int
const ( DbStorageSchemaPostgres DbStorageSchemaType = iota DbStorageSchemaTimescale )
type ExistingPartitionInfo ¶
type JSONWriter ¶
type JSONWriter struct {
// contains filtered or unexported fields
}
JSONWriter is a sink that writes metric measurements to a file in JSON format. It supports compression and rotation of output files. The default rotation is based on the file size (100Mb). JSONWriter is useful for debugging and testing purposes, as well as for integration with other systems, such as log aggregators, analytics systems, and data processing pipelines, ML models, etc.
func NewJSONWriter ¶
func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)
func (*JSONWriter) SyncMetric ¶
func (jw *JSONWriter) SyncMetric(_, _ string, _ SyncOp) error
func (*JSONWriter) Write ¶
func (jw *JSONWriter) Write(msg metrics.MeasurementEnvelope) error
type MetricsDefiner ¶
MetricDefiner is an interface for passing metric definitions to a sink.
type MultiWriter ¶
MultiWriter ensures the simultaneous storage of data in several storages.
func (*MultiWriter) AddWriter ¶
func (mw *MultiWriter) AddWriter(w Writer)
func (*MultiWriter) Count ¶
func (mw *MultiWriter) Count() int
func (*MultiWriter) DefineMetrics ¶
func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
func (*MultiWriter) Migrate ¶
func (mw *MultiWriter) Migrate() (err error)
Migrate runs migrations on all writers that support it
func (*MultiWriter) NeedsMigration ¶
func (mw *MultiWriter) NeedsMigration() (bool, error)
NeedsMigration checks if any writer needs migration
func (*MultiWriter) SyncMetric ¶
func (mw *MultiWriter) SyncMetric(sourceName, metricName string, op SyncOp) (err error)
func (*MultiWriter) Write ¶
func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error)
type PostgresWriter ¶
type PostgresWriter struct {
// contains filtered or unexported fields
}
PostgresWriter is a sink that writes metric measurements to a Postgres database. At the moment, it supports both Postgres and TimescaleDB as a storage backend. However, one is able to use any Postgres-compatible database as a storage backend, e.g. PGEE, Citus, Greenplum, CockroachDB, etc.
func NewPostgresWriter ¶
func NewWriterFromPostgresConn ¶
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error)
func (*PostgresWriter) AddDBUniqueMetricToListingTable ¶
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error
func (*PostgresWriter) DeleteOldPartitions ¶
func (pgw *PostgresWriter) DeleteOldPartitions()
DeleteOldPartitions is a background task that deletes old partitions from the measurements DB
func (*PostgresWriter) EnsureBuiltinMetricDummies ¶
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)
EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist
func (*PostgresWriter) EnsureMetricDbnameTime ¶
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo) (err error)
func (*PostgresWriter) EnsureMetricDummy ¶
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)
EnsureMetricDummy creates an empty table for a metric measurements if it doesn't exist
func (*PostgresWriter) EnsureMetricTimescale ¶
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error)
func (*PostgresWriter) MaintainUniqueSources ¶
func (pgw *PostgresWriter) MaintainUniqueSources()
MaintainUniqueSources is a background task that maintains a mapping of unique sources in each metric table in admin.all_distinct_dbname_metrics. This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (*PostgresWriter) Migrate ¶
func (pgw *PostgresWriter) Migrate() error
Migrate upgrades database with all migrations
func (*PostgresWriter) NeedsMigration ¶
func (pgw *PostgresWriter) NeedsMigration() (bool, error)
NeedsMigration checks if database needs migration
func (*PostgresWriter) ReadMetricSchemaType ¶
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)
func (*PostgresWriter) SyncMetric ¶
func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
SyncMetric ensures that tables exist for newly added metrics and/or sources
func (*PostgresWriter) Write ¶
func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error
Write sends the measurements to the cache channel
type PromMetricCache ¶
type PromMetricCache = map[string]map[string]metrics.MeasurementEnvelope // [dbUnique][metric]lastly_fetched_data
type PrometheusWriter ¶
type PrometheusWriter struct {
sync.RWMutex
Namespace string
Cache PromMetricCache // [dbUnique][metric]lastly_fetched_data
// contains filtered or unexported fields
}
PrometheusWriter is a Prometheus exporter that implements the prometheus.Collector interface using the "unchecked collector" pattern (empty Describe method).
Design decisions based on Prometheus exporter guidelines (https://prometheus.io/docs/instrumenting/writing_exporters/#collectors):
Metrics are collected periodically by reaper and cached in-memory. On scrape, the collector reads a snapshot of the cache and emits fresh NewConstMetric values. The cache is NOT consumed on scrape, so parallel or back-to-back scrapes see the same data until the next Write() updates arrive.
This is an "unchecked collector": Describe() sends no descriptors, which tells the Prometheus registry to skip consistency checks. This is necessary because the set of metrics is dynamic (driven by monitored databases and their query results). Safety is ensured by deduplicating metric identities within each Collect() call.
Label keys are always sorted lexicographically before building descriptors and label value slices. This guarantees deterministic descriptor identity regardless of Go map iteration order.
func NewPrometheusWriter ¶
func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)
func (*PrometheusWriter) AddCacheEntry ¶ added in v5.1.0
func (promw *PrometheusWriter) AddCacheEntry(dbUnique, metric string, msgArr metrics.MeasurementEnvelope)
func (*PrometheusWriter) Collect ¶
func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)
Collect implements prometheus.Collector. It reads a snapshot of the metric cache and emits const metrics. Parallel scrapes see the same data until background Write() calls update it
func (*PrometheusWriter) DefineMetrics ¶
func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
DefineMetrics is called by reaper on startup and whenever metric definitions change
func (*PrometheusWriter) Describe ¶
func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)
Describe is intentionally empty to makes PrometheusWriter an "unchecked collector" per the prometheus.Collector contract
func (*PrometheusWriter) InitCacheEntry ¶ added in v5.1.0
func (promw *PrometheusWriter) InitCacheEntry(dbUnique string)
func (*PrometheusWriter) Println ¶
func (promw *PrometheusWriter) Println(v ...any)
Println implements promhttp.Logger
func (*PrometheusWriter) PurgeCacheEntry ¶ added in v5.1.0
func (promw *PrometheusWriter) PurgeCacheEntry(dbUnique, metric string)
func (*PrometheusWriter) SyncMetric ¶
func (promw *PrometheusWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
SyncMetric is called by reaper when a metric or monitored source is removed or added, allowing the writer to purge or initialize cache entries as needed
func (*PrometheusWriter) Write ¶
func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error
Write is called by reaper whenever new measurement data arrives
func (*PrometheusWriter) WritePromMetrics ¶ added in v5.1.0
func (promw *PrometheusWriter) WritePromMetrics(msg metrics.MeasurementEnvelope, ch chan<- prometheus.Metric) (written int, errorCount int)
WritePromMetrics converts a MeasurementEnvelope into Prometheus const metrics and sends them directly to ch. Returns the count of metrics written and errors encountered.
type RPCWriter ¶
type RPCWriter struct {
// contains filtered or unexported fields
}
RPCWriter sends metric measurements to a remote server using gRPC. Remote servers should make use the .proto file under api/pb/ to integrate with it. It's up to the implementer to define the behavior of the server. It can be a simple logger, external storage, alerting system, or an analytics system.
func (*RPCWriter) DefineMetrics ¶
DefineMetrics sends metric definitions to the remote server
func (*RPCWriter) SyncMetric ¶
SyncMetric synchronizes a metric and monitored source with the remote server
type SyncOp ¶
type SyncOp int32
SyncOp represents synchronization operations for metrics. These constants are used both in Go code and protobuf definitions.