sinks

package
v5.0.0-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2026 License: BSD-3-Clause Imports: 35 Imported by: 0

Documentation

Overview

Package sinks provides functionality to store monitored data in different ways.

At the moment we provide sink connectors for

  • PostgreSQL and flavours,
  • Prometheus,
  • plain JSON files,
  • and RPC servers.

To ensure the simultaneous storage of data in several storages, the `MultiWriter` class is implemented.

Index

Constants

View Source
const MigrationsCount = 1

MigrationsCount is the total number of migrations in admin.migration table

Variables

View Source
var ErrNeedsMigration = errors.New("sink database schema is outdated, please run migrations using `pgwatch config upgrade` command")

Functions

func LoadTLSCredentials

func LoadTLSCredentials(CAFile string) (credentials.TransportCredentials, error)

func NewPostgresSinkMigrator

func NewPostgresSinkMigrator(ctx context.Context, connStr string) (db.Migrator, error)

Types

type CmdOpts

type CmdOpts struct {
	Sinks                 []string      `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
	BatchingDelay         time.Duration `` /* 170-byte string literal not displayed */
	PartitionInterval     string        `` /* 203-byte string literal not displayed */
	RetentionInterval     string        `` /* 161-byte string literal not displayed */
	MaintenanceInterval   string        `` /* 273-byte string literal not displayed */
	RealDbnameField       string        `` /* 151-byte string literal not displayed */
	SystemIdentifierField string        `` /* 169-byte string literal not displayed */
}

CmdOpts specifies the storage configuration to store metrics measurements

type DbStorageSchemaType

type DbStorageSchemaType int
const (
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	DbStorageSchemaTimescale
)

type ExistingPartitionInfo

type ExistingPartitionInfo struct {
	StartTime time.Time
	EndTime   time.Time
}

type JSONWriter

type JSONWriter struct {
	// contains filtered or unexported fields
}

JSONWriter is a sink that writes metric measurements to a file in JSON format. It supports compression and rotation of output files. The default rotation is based on the file size (100Mb). JSONWriter is useful for debugging and testing purposes, as well as for integration with other systems, such as log aggregators, analytics systems, and data processing pipelines, ML models, etc.

func NewJSONWriter

func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error)

func (*JSONWriter) SyncMetric

func (jw *JSONWriter) SyncMetric(_, _ string, _ SyncOp) error

func (*JSONWriter) Write

type MeasurementMessagePostgres

type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]string
}

type MetricsDefiner

type MetricsDefiner interface {
	DefineMetrics(metrics *metrics.Metrics) error
}

MetricDefiner is an interface for passing metric definitions to a sink.

type MultiWriter

type MultiWriter struct {
	sync.Mutex
	// contains filtered or unexported fields
}

MultiWriter ensures the simultaneous storage of data in several storages.

func (*MultiWriter) AddWriter

func (mw *MultiWriter) AddWriter(w Writer)

func (*MultiWriter) Count

func (mw *MultiWriter) Count() int

func (*MultiWriter) DefineMetrics

func (mw *MultiWriter) DefineMetrics(metrics *metrics.Metrics) (err error)

func (*MultiWriter) Migrate

func (mw *MultiWriter) Migrate() (err error)

Migrate runs migrations on all writers that support it

func (*MultiWriter) NeedsMigration

func (mw *MultiWriter) NeedsMigration() (bool, error)

NeedsMigration checks if any writer needs migration

func (*MultiWriter) SyncMetric

func (mw *MultiWriter) SyncMetric(sourceName, metricName string, op SyncOp) (err error)

func (*MultiWriter) Write

func (mw *MultiWriter) Write(msg metrics.MeasurementEnvelope) (err error)

type PostgresWriter

type PostgresWriter struct {
	// contains filtered or unexported fields
}

PostgresWriter is a sink that writes metric measurements to a Postgres database. At the moment, it supports both Postgres and TimescaleDB as a storage backend. However, one is able to use any Postgres-compatible database as a storage backend, e.g. PGEE, Citus, Greenplum, CockroachDB, etc.

func NewPostgresWriter

func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error)

func NewWriterFromPostgresConn

func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *CmdOpts) (pgw *PostgresWriter, err error)

func (*PostgresWriter) AddDBUniqueMetricToListingTable

func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error

func (*PostgresWriter) DeleteOldPartitions

func (pgw *PostgresWriter) DeleteOldPartitions()

DeleteOldPartitions is a background task that deletes old partitions from the measurements DB

func (*PostgresWriter) EnsureBuiltinMetricDummies

func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error)

EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist

func (*PostgresWriter) EnsureMetricDbnameTime

func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo) (err error)

func (*PostgresWriter) EnsureMetricDummy

func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error)

EnsureMetricDummy creates an empty table for a metric measurements if it doesn't exist

func (*PostgresWriter) EnsureMetricTimescale

func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo) (err error)

func (*PostgresWriter) MaintainUniqueSources

func (pgw *PostgresWriter) MaintainUniqueSources()

MaintainUniqueSources is a background task that maintains a mapping of unique sources in each metric table in admin.all_distinct_dbname_metrics. This is used to avoid listing the same source multiple times in Grafana dropdowns.

func (*PostgresWriter) Migrate

func (pgw *PostgresWriter) Migrate() error

Migrate upgrades database with all migrations

func (*PostgresWriter) NeedsMigration

func (pgw *PostgresWriter) NeedsMigration() (bool, error)

NeedsMigration checks if database needs migration

func (*PostgresWriter) ReadMetricSchemaType

func (pgw *PostgresWriter) ReadMetricSchemaType() (err error)

func (*PostgresWriter) SyncMetric

func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error

SyncMetric ensures that tables exist for newly added metrics and/or sources

func (*PostgresWriter) Write

Write sends the measurements to the cache channel

type PromMetricCache

type PromMetricCache = map[string]map[string]metrics.MeasurementEnvelope // [dbUnique][metric]lastly_fetched_data

type PrometheusWriter

type PrometheusWriter struct {
	sync.RWMutex

	Namespace string
	Cache     PromMetricCache // [dbUnique][metric]lastly_fetched_data
	// contains filtered or unexported fields
}

PrometheusWriter is a sink that allows to expose metric measurements to Prometheus scrapper. Prometheus collects metrics data from pgwatch by scraping metrics HTTP endpoints.

func NewPrometheusWriter

func NewPrometheusWriter(ctx context.Context, connstr string) (promw *PrometheusWriter, err error)

func (*PrometheusWriter) Collect

func (promw *PrometheusWriter) Collect(ch chan<- prometheus.Metric)

func (*PrometheusWriter) DefineMetrics

func (promw *PrometheusWriter) DefineMetrics(metrics *metrics.Metrics) (err error)

func (*PrometheusWriter) Describe

func (promw *PrometheusWriter) Describe(_ chan<- *prometheus.Desc)

func (*PrometheusWriter) MetricStoreMessageToPromMetrics

func (promw *PrometheusWriter) MetricStoreMessageToPromMetrics(msg metrics.MeasurementEnvelope) []prometheus.Metric

func (*PrometheusWriter) Println

func (promw *PrometheusWriter) Println(v ...any)

func (*PrometheusWriter) PromAsyncCacheAddMetricData

func (promw *PrometheusWriter) PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr metrics.MeasurementEnvelope)

func (*PrometheusWriter) PromAsyncCacheInitIfRequired

func (promw *PrometheusWriter) PromAsyncCacheInitIfRequired(dbUnique, _ string)

func (*PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny

func (promw *PrometheusWriter) PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string)

func (*PrometheusWriter) SyncMetric

func (promw *PrometheusWriter) SyncMetric(sourceName, metricName string, op SyncOp) error

func (*PrometheusWriter) Write

type RPCWriter

type RPCWriter struct {
	// contains filtered or unexported fields
}

RPCWriter sends metric measurements to a remote server using gRPC. Remote servers should make use the .proto file under api/pb/ to integrate with it. It's up to the implementer to define the behavior of the server. It can be a simple logger, external storage, alerting system, or an analytics system.

func NewRPCWriter

func NewRPCWriter(ctx context.Context, connStr string) (*RPCWriter, error)

func (*RPCWriter) DefineMetrics

func (rw *RPCWriter) DefineMetrics(metrics *metrics.Metrics) error

DefineMetrics sends metric definitions to the remote server

func (*RPCWriter) Ping

func (rw *RPCWriter) Ping() error

func (*RPCWriter) SyncMetric

func (rw *RPCWriter) SyncMetric(sourceName, metricName string, op SyncOp) error

SyncMetric synchronizes a metric and monitored source with the remote server

func (*RPCWriter) Write

func (rw *RPCWriter) Write(msg metrics.MeasurementEnvelope) error

Sends Measurement Message to RPC Sink

type SyncOp

type SyncOp int32

SyncOp represents synchronization operations for metrics. These constants are used both in Go code and protobuf definitions.

const (
	// InvalidOp represents an invalid or unrecognized operation
	InvalidOp SyncOp = iota // 0
	// AddOp represents adding a new metric
	AddOp // 1
	// DeleteOp represents deleting an existing metric or entire source
	DeleteOp // 2
	// DefineOp represents defining metric definitions
	DefineOp // 3
)

func (SyncOp) String

func (s SyncOp) String() string

String returns the string representation of the SyncOp

type Writer

type Writer interface {
	SyncMetric(sourceName, metricName string, op SyncOp) error
	Write(msgs metrics.MeasurementEnvelope) error
}

Writer is an interface that writes metrics values

func NewSinkWriter

func NewSinkWriter(ctx context.Context, opts *CmdOpts) (w Writer, err error)

NewSinkWriter creates and returns new instance of MultiWriter struct.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL