Documentation
¶
Overview ¶
Package sinks provides functionality to store monitored data in different ways.
At the moment we provide sink connectors for
- PostgreSQL and flavours,
- Prometheus,
- plain JSON files,
- and RPC servers.
To ensure the simultaneous storage of data in several storages, the `MultiWriter` class is implemented.
Index ¶
- Constants
- Variables
- func LoadTLSCredentials(CAFile string) (credentials.TransportCredentials, error)
- func NewPostgresSinkMigrator(ctx context.Context, connStr string) (db.Migrator, error)
- type CmdOpts
- type DbStorageSchemaType
- type ExistingPartitionInfo
- type JSONWriter
- type MeasurementMessagePostgres
- type MetricsDefiner
- type MultiWriter
- func (mw *MultiWriter) AddWriter(w Writer)
- func (mw *MultiWriter) Count() int
- func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
- func (mw *MultiWriter) Migrate() (err error)
- func (mw *MultiWriter) NeedsMigration() (bool, error)
- func (mw *MultiWriter) SyncMetric(sourceName, metricName string, op SyncOp) (err error)
- func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error)
- type PostgresWriter
- func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error
- func (pgw *PostgresWriter) DeleteOldPartitions()
- func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)
- func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo) (err error)
- func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)
- func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error)
- func (pgw *PostgresWriter) MaintainUniqueSources()
- func (pgw *PostgresWriter) Migrate() error
- func (pgw *PostgresWriter) NeedsMigration() (bool, error)
- func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)
- func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
- func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error
- type PromMetricCache
- type PrometheusWriter
- func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)
- func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
- func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)
- func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric
- func (promw *PrometheusWriter) Println(v ...any)
- func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr metrics.MeasurementEnvelope)
- func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)
- func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)
- func (promw *PrometheusWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
- func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error
- type RPCWriter
- type SyncOp
- type Writer
Constants ¶
const MigrationsCount = 1
MigrationsCount is the total number of migrations in admin.migration table
Variables ¶
var ErrNeedsMigration = errors.New("sink database schema is outdated, please run migrations using `pgwatch config upgrade` command")
Functions ¶
func LoadTLSCredentials ¶
func LoadTLSCredentials(CAFile string) (credentials.TransportCredentials, error)
Types ¶
type CmdOpts ¶
type CmdOpts struct {
Sinks []string `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
BatchingDelay time.Duration `` /* 170-byte string literal not displayed */
PartitionInterval string `` /* 203-byte string literal not displayed */
RetentionInterval string `` /* 161-byte string literal not displayed */
MaintenanceInterval string `` /* 273-byte string literal not displayed */
RealDbnameField string `` /* 151-byte string literal not displayed */
SystemIdentifierField string `` /* 169-byte string literal not displayed */
}
CmdOpts specifies the storage configuration to store metrics measurements
type DbStorageSchemaType ¶
type DbStorageSchemaType int
const ( DbStorageSchemaPostgres DbStorageSchemaType = iota DbStorageSchemaTimescale )
type ExistingPartitionInfo ¶
type JSONWriter ¶
type JSONWriter struct {
// contains filtered or unexported fields
}
JSONWriter is a sink that writes metric measurements to a file in JSON format. It supports compression and rotation of output files. The default rotation is based on the file size (100Mb). JSONWriter is useful for debugging and testing purposes, as well as for integration with other systems, such as log aggregators, analytics systems, and data processing pipelines, ML models, etc.
func NewJSONWriter ¶
func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)
func (*JSONWriter) SyncMetric ¶
func (jw *JSONWriter) SyncMetric(_, _ string, _ SyncOp) error
func (*JSONWriter) Write ¶
func (jw *JSONWriter) Write(msg metrics.MeasurementEnvelope) error
type MetricsDefiner ¶
MetricDefiner is an interface for passing metric definitions to a sink.
type MultiWriter ¶
MultiWriter ensures the simultaneous storage of data in several storages.
func (*MultiWriter) AddWriter ¶
func (mw *MultiWriter) AddWriter(w Writer)
func (*MultiWriter) Count ¶
func (mw *MultiWriter) Count() int
func (*MultiWriter) DefineMetrics ¶
func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
func (*MultiWriter) Migrate ¶
func (mw *MultiWriter) Migrate() (err error)
Migrate runs migrations on all writers that support it
func (*MultiWriter) NeedsMigration ¶
func (mw *MultiWriter) NeedsMigration() (bool, error)
NeedsMigration checks if any writer needs migration
func (*MultiWriter) SyncMetric ¶
func (mw *MultiWriter) SyncMetric(sourceName, metricName string, op SyncOp) (err error)
func (*MultiWriter) Write ¶
func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error)
type PostgresWriter ¶
type PostgresWriter struct {
// contains filtered or unexported fields
}
PostgresWriter is a sink that writes metric measurements to a Postgres database. At the moment, it supports both Postgres and TimescaleDB as a storage backend. However, one is able to use any Postgres-compatible database as a storage backend, e.g. PGEE, Citus, Greenplum, CockroachDB, etc.
func NewPostgresWriter ¶
func NewWriterFromPostgresConn ¶
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error)
func (*PostgresWriter) AddDBUniqueMetricToListingTable ¶
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error
func (*PostgresWriter) DeleteOldPartitions ¶
func (pgw *PostgresWriter) DeleteOldPartitions()
DeleteOldPartitions is a background task that deletes old partitions from the measurements DB
func (*PostgresWriter) EnsureBuiltinMetricDummies ¶
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)
EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist
func (*PostgresWriter) EnsureMetricDbnameTime ¶
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo) (err error)
func (*PostgresWriter) EnsureMetricDummy ¶
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)
EnsureMetricDummy creates an empty table for a metric measurements if it doesn't exist
func (*PostgresWriter) EnsureMetricTimescale ¶
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error)
func (*PostgresWriter) MaintainUniqueSources ¶
func (pgw *PostgresWriter) MaintainUniqueSources()
MaintainUniqueSources is a background task that maintains a mapping of unique sources in each metric table in admin.all_distinct_dbname_metrics. This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (*PostgresWriter) Migrate ¶
func (pgw *PostgresWriter) Migrate() error
Migrate upgrades database with all migrations
func (*PostgresWriter) NeedsMigration ¶
func (pgw *PostgresWriter) NeedsMigration() (bool, error)
NeedsMigration checks if database needs migration
func (*PostgresWriter) ReadMetricSchemaType ¶
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)
func (*PostgresWriter) SyncMetric ¶
func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
SyncMetric ensures that tables exist for newly added metrics and/or sources
func (*PostgresWriter) Write ¶
func (pgw *PostgresWriter) Write(msg metrics.MeasurementEnvelope) error
Write sends the measurements to the cache channel
type PromMetricCache ¶
type PromMetricCache = map[string]map[string]metrics.MeasurementEnvelope // [dbUnique][metric]lastly_fetched_data
type PrometheusWriter ¶
type PrometheusWriter struct {
sync.RWMutex
Namespace string
Cache PromMetricCache // [dbUnique][metric]lastly_fetched_data
// contains filtered or unexported fields
}
PrometheusWriter is a sink that allows to expose metric measurements to Prometheus scrapper. Prometheus collects metrics data from pgwatch by scraping metrics HTTP endpoints.
func NewPrometheusWriter ¶
func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)
func (*PrometheusWriter) Collect ¶
func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)
func (*PrometheusWriter) DefineMetrics ¶
func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error)
func (*PrometheusWriter) Describe ¶
func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)
func (*PrometheusWriter) MetricStoreMessageToPromMetrics ¶
func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric
func (*PrometheusWriter) Println ¶
func (promw *PrometheusWriter) Println(v ...any)
func (*PrometheusWriter) PromAsyncCacheAddMetricData ¶
func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr metrics.MeasurementEnvelope)
func (*PrometheusWriter) PromAsyncCacheInitIfRequired ¶
func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)
func (*PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny ¶
func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)
func (*PrometheusWriter) SyncMetric ¶
func (promw *PrometheusWriter) SyncMetric(sourceName, metricName string, op SyncOp) error
func (*PrometheusWriter) Write ¶
func (promw *PrometheusWriter) Write(msg metrics.MeasurementEnvelope) error
type RPCWriter ¶
type RPCWriter struct {
// contains filtered or unexported fields
}
RPCWriter sends metric measurements to a remote server using gRPC. Remote servers should make use the .proto file under api/pb/ to integrate with it. It's up to the implementer to define the behavior of the server. It can be a simple logger, external storage, alerting system, or an analytics system.
func (*RPCWriter) DefineMetrics ¶
DefineMetrics sends metric definitions to the remote server
func (*RPCWriter) SyncMetric ¶
SyncMetric synchronizes a metric and monitored source with the remote server
type SyncOp ¶
type SyncOp int32
SyncOp represents synchronization operations for metrics. These constants are used both in Go code and protobuf definitions.