Documentation
¶
Overview ¶
Package base provides the production-grade SQL plugin implementation shared by all lynx database plugins (MySQL, PostgreSQL, etc.). It wraps database/sql with a connection pool, health checking, auto-reconnect, pool monitoring, slow-query detection, connection-leak detection, and a MetricsRecorder interface that individual plugins implement to expose Prometheus metrics.
Index ¶
- Variables
- type AutoReconnector
- type ConnectionPoolStats
- type HealthChecker
- type HealthReporter
- type LeakDetector
- type MetricsConfig
- type MetricsRecorder
- type Monitorable
- type NoOpMetricsRecorder
- func (n *NoOpMetricsRecorder) IncConnectAttempt()
- func (n *NoOpMetricsRecorder) IncConnectFailure()
- func (n *NoOpMetricsRecorder) IncConnectRetry()
- func (n *NoOpMetricsRecorder) IncConnectSuccess()
- func (n *NoOpMetricsRecorder) RecordConnectionPoolStats(stats *ConnectionPoolStats)
- func (n *NoOpMetricsRecorder) RecordHealthCheck(success bool)
- func (n *NoOpMetricsRecorder) RecordQuery(duration time.Duration, err error, threshold time.Duration)
- func (n *NoOpMetricsRecorder) RecordTx(duration time.Duration, committed bool)
- type PoolMonitor
- type PoolThresholds
- type PrometheusMetrics
- func (pm *PrometheusMetrics) GetGatherer() prometheus.Gatherer
- func (pm *PrometheusMetrics) IncConnectAttempt()
- func (pm *PrometheusMetrics) IncConnectFailure()
- func (pm *PrometheusMetrics) IncConnectRetry()
- func (pm *PrometheusMetrics) IncConnectSuccess()
- func (pm *PrometheusMetrics) RecordConnectionPoolStats(stats *ConnectionPoolStats)
- func (pm *PrometheusMetrics) RecordHealthCheck(success bool)
- func (pm *PrometheusMetrics) RecordQuery(duration time.Duration, err error, threshold time.Duration)
- func (pm *PrometheusMetrics) RecordTx(duration time.Duration, committed bool)
- type QueryMonitor
- func (m *QueryMonitor) MonitorExec(ctx context.Context, db *sql.DB, query string, args []any) (sql.Result, error)
- func (m *QueryMonitor) MonitorQuery(ctx context.Context, db *sql.DB, query string, args []any, fn func() error) error
- func (m *QueryMonitor) MonitorQueryRow(ctx context.Context, db *sql.DB, query string, args []any, ...) error
- type Reconnectable
- type SQLPlugin
- func (p *SQLPlugin) CheckHealth() error
- func (p *SQLPlugin) CheckHealthContext(ctx context.Context) error
- func (p *SQLPlugin) CleanupTasks() error
- func (p *SQLPlugin) GetAutoReconnector() *AutoReconnector
- func (p *SQLPlugin) GetDB() (*sql.DB, error)
- func (p *SQLPlugin) GetDBWithContext(ctx context.Context) (*sql.DB, error)
- func (p *SQLPlugin) GetDialect() string
- func (p *SQLPlugin) GetMetricsRecorder() MetricsRecorder
- func (p *SQLPlugin) GetQueryMonitor() *QueryMonitor
- func (p *SQLPlugin) GetStats() *ConnectionPoolStats
- func (p *SQLPlugin) GetValidatedConn(ctx context.Context) (*sql.Conn, error)
- func (p *SQLPlugin) InitializeFromConfig(rt plugins.Runtime) error
- func (p *SQLPlugin) InitializeResources(rt plugins.Runtime) error
- func (p *SQLPlugin) IsConnected() bool
- func (p *SQLPlugin) IsConnectedContext(ctx context.Context) bool
- func (p *SQLPlugin) Reconnect() error
- func (p *SQLPlugin) ReconnectContext(ctx context.Context) error
- func (p *SQLPlugin) ReportHealth() error
- func (p *SQLPlugin) SetMetricsRecorder(recorder MetricsRecorder)
- func (p *SQLPlugin) SetProvider(provider interfaces.DBProvider)
- func (p *SQLPlugin) SharedProviderResourceName() string
- func (p *SQLPlugin) StartupTasks() error
Constants ¶
This section is empty.
Variables ¶
var ( ErrNotConnected = errors.New("database not connected") ErrAlreadyClosed = errors.New("database already closed") )
Functions ¶
This section is empty.
Types ¶
type AutoReconnector ¶
type AutoReconnector struct {
// contains filtered or unexported fields
}
AutoReconnector performs automatic reconnection on connection loss
func NewAutoReconnector ¶
func NewAutoReconnector(target Reconnectable, interval time.Duration, maxAttempts int) *AutoReconnector
NewAutoReconnector creates a new auto-reconnector
func (*AutoReconnector) GetAttempts ¶
func (a *AutoReconnector) GetAttempts() int64
GetAttempts returns the current number of reconnection attempts
func (*AutoReconnector) Start ¶
func (a *AutoReconnector) Start(ctx context.Context)
Start starts the auto-reconnect routine
type ConnectionPoolStats ¶
type ConnectionPoolStats struct {
MaxOpenConnections int64 // Maximum number of open connections
OpenConnections int64 // Number of established connections
InUse int64 // Number of connections currently in use
Idle int64 // Number of idle connections
MaxIdleConnections int64 // Maximum number of idle connections
WaitCount int64 // Total number of connections waited for
WaitDuration time.Duration // Total time blocked waiting for a new connection
MaxIdleClosed int64 // Total number of connections closed due to SetMaxIdleConns
MaxLifetimeClosed int64 // Total number of connections closed due to SetConnMaxLifetime
}
ConnectionPoolStats represents database connection pool statistics
type HealthChecker ¶
type HealthChecker struct {
// contains filtered or unexported fields
}
HealthChecker performs periodic health checks and updates isHealthy. It only reports health state — recovery (Reconnect) is handled exclusively by AutoReconnector to avoid redundant concurrent reconnect attempts.
func NewHealthChecker ¶
func NewHealthChecker(target HealthReporter, interval time.Duration, customQuery string) *HealthChecker
NewHealthChecker creates a new health checker.
func (*HealthChecker) IsHealthy ¶
func (h *HealthChecker) IsHealthy() bool
IsHealthy returns the current health status
func (*HealthChecker) Start ¶
func (h *HealthChecker) Start(ctx context.Context)
Start starts the health check routine
type HealthReporter ¶ added in v1.6.3
HealthReporter is implemented by database plugins to expose a no-reconnect health probe for the HealthChecker. Unlike CheckHealth, ReportHealth never triggers reconnection — it only marks the connection as unhealthy when a query fails.
type LeakDetector ¶
type LeakDetector struct {
// contains filtered or unexported fields
}
LeakDetector detects connection leaks by monitoring connection usage
func NewLeakDetector ¶
func NewLeakDetector(target Monitorable, threshold time.Duration, interval time.Duration) *LeakDetector
NewLeakDetector creates a new connection leak detector. interval controls how often to check; 0 defaults to 30s.
func (*LeakDetector) Start ¶
func (l *LeakDetector) Start(ctx context.Context)
Start starts the leak detection routine
type MetricsConfig ¶
type MetricsConfig struct {
// Enabled determines if metrics recording is enabled
Enabled bool
// Namespace for Prometheus metrics
Namespace string
// Subsystem for Prometheus metrics
Subsystem string
// Labels to add to all metrics
Labels map[string]string
// SlowQueryThreshold defines the threshold for slow query detection
SlowQueryThreshold time.Duration
}
MetricsConfig defines configuration for metrics recording
func DefaultMetricsConfig ¶
func DefaultMetricsConfig() *MetricsConfig
DefaultMetricsConfig returns default metrics configuration
type MetricsRecorder ¶
type MetricsRecorder interface {
// RecordConnectionPoolStats records connection pool statistics
RecordConnectionPoolStats(stats *ConnectionPoolStats)
// RecordHealthCheck records health check results
RecordHealthCheck(success bool)
// RecordQuery records SQL query duration and errors
RecordQuery(duration time.Duration, err error, threshold time.Duration)
// RecordTx records transaction duration and status
RecordTx(duration time.Duration, committed bool)
// IncConnectAttempt increments connection attempt counter
IncConnectAttempt()
// IncConnectRetry increments connection retry counter
IncConnectRetry()
// IncConnectSuccess increments connection success counter
IncConnectSuccess()
// IncConnectFailure increments connection failure counter
IncConnectFailure()
}
MetricsRecorder defines the interface for recording database metrics
type Monitorable ¶
type Monitorable interface {
GetStats() *ConnectionPoolStats
Name() string
}
Monitorable is the interface for components that can be monitored.
type NoOpMetricsRecorder ¶
type NoOpMetricsRecorder struct{}
NoOpMetricsRecorder provides a no-operation implementation of MetricsRecorder. This is useful when metrics recording is disabled or not implemented.
func (*NoOpMetricsRecorder) IncConnectAttempt ¶
func (n *NoOpMetricsRecorder) IncConnectAttempt()
IncConnectAttempt implements MetricsRecorder
func (*NoOpMetricsRecorder) IncConnectFailure ¶
func (n *NoOpMetricsRecorder) IncConnectFailure()
IncConnectFailure implements MetricsRecorder
func (*NoOpMetricsRecorder) IncConnectRetry ¶
func (n *NoOpMetricsRecorder) IncConnectRetry()
IncConnectRetry implements MetricsRecorder
func (*NoOpMetricsRecorder) IncConnectSuccess ¶
func (n *NoOpMetricsRecorder) IncConnectSuccess()
IncConnectSuccess implements MetricsRecorder
func (*NoOpMetricsRecorder) RecordConnectionPoolStats ¶
func (n *NoOpMetricsRecorder) RecordConnectionPoolStats(stats *ConnectionPoolStats)
RecordConnectionPoolStats implements MetricsRecorder
func (*NoOpMetricsRecorder) RecordHealthCheck ¶
func (n *NoOpMetricsRecorder) RecordHealthCheck(success bool)
RecordHealthCheck implements MetricsRecorder
func (*NoOpMetricsRecorder) RecordQuery ¶
func (n *NoOpMetricsRecorder) RecordQuery(duration time.Duration, err error, threshold time.Duration)
RecordQuery implements MetricsRecorder
type PoolMonitor ¶
type PoolMonitor struct {
// contains filtered or unexported fields
}
PoolMonitor monitors connection pool health and triggers alerts
func NewPoolMonitor ¶
func NewPoolMonitor(target Monitorable, interval time.Duration, thresholds *PoolThresholds) *PoolMonitor
NewPoolMonitor creates a new connection pool monitor
func (*PoolMonitor) Start ¶
func (m *PoolMonitor) Start(ctx context.Context)
Start starts the monitoring routine
type PoolThresholds ¶
type PoolThresholds struct {
UsagePercentage float64 // Alert when pool usage exceeds this (0.0-1.0)
WaitDuration time.Duration // Alert when wait duration exceeds this
WaitCount int64 // Alert when wait count exceeds this
AlertCooldown time.Duration // Minimum time between alerts; 0 = default (60s)
}
PoolThresholds defines alert thresholds for connection pool monitoring
type PrometheusMetrics ¶
type PrometheusMetrics struct {
// contains filtered or unexported fields
}
PrometheusMetrics provides a unified Prometheus metrics implementation
func NewPrometheusMetrics ¶
func NewPrometheusMetrics(config *MetricsConfig) *PrometheusMetrics
NewPrometheusMetrics creates a new Prometheus metrics instance
func (*PrometheusMetrics) GetGatherer ¶
func (pm *PrometheusMetrics) GetGatherer() prometheus.Gatherer
GetGatherer returns the Prometheus gatherer
func (*PrometheusMetrics) IncConnectAttempt ¶
func (pm *PrometheusMetrics) IncConnectAttempt()
IncConnectAttempt implements MetricsRecorder
func (*PrometheusMetrics) IncConnectFailure ¶
func (pm *PrometheusMetrics) IncConnectFailure()
IncConnectFailure implements MetricsRecorder
func (*PrometheusMetrics) IncConnectRetry ¶
func (pm *PrometheusMetrics) IncConnectRetry()
IncConnectRetry implements MetricsRecorder
func (*PrometheusMetrics) IncConnectSuccess ¶
func (pm *PrometheusMetrics) IncConnectSuccess()
IncConnectSuccess implements MetricsRecorder
func (*PrometheusMetrics) RecordConnectionPoolStats ¶
func (pm *PrometheusMetrics) RecordConnectionPoolStats(stats *ConnectionPoolStats)
RecordConnectionPoolStats implements MetricsRecorder
func (*PrometheusMetrics) RecordHealthCheck ¶
func (pm *PrometheusMetrics) RecordHealthCheck(success bool)
RecordHealthCheck implements MetricsRecorder
func (*PrometheusMetrics) RecordQuery ¶
func (pm *PrometheusMetrics) RecordQuery(duration time.Duration, err error, threshold time.Duration)
RecordQuery implements MetricsRecorder
type QueryMonitor ¶
type QueryMonitor struct {
// contains filtered or unexported fields
}
QueryMonitor provides slow query monitoring and logging
func NewQueryMonitor ¶
func NewQueryMonitor(enabled bool, threshold time.Duration, recorder MetricsRecorder) *QueryMonitor
NewQueryMonitor creates a new query monitor
func (*QueryMonitor) MonitorExec ¶
func (m *QueryMonitor) MonitorExec(ctx context.Context, db *sql.DB, query string, args []any) (sql.Result, error)
MonitorExec wraps a database exec with monitoring
type Reconnectable ¶
Reconnectable is the interface for components that can be reconnected.
type SQLPlugin ¶
type SQLPlugin struct {
*plugins.BasePlugin
// contains filtered or unexported fields
}
SQLPlugin provides common functionality for all SQL plugins
func NewBaseSQLPlugin ¶
func NewBaseSQLPlugin( id, name, desc, version, confPrefix string, weight int, config *interfaces.Config, ) *SQLPlugin
NewBaseSQLPlugin creates a new base SQL plugin
func (*SQLPlugin) CheckHealth ¶
CheckHealth performs a health check. When auto-reconnect is enabled it may rebuild the pool once on failure. When auto-reconnect is disabled, health checks only report the unhealthy state and never replace the current pool.
func (*SQLPlugin) CheckHealthContext ¶ added in v1.6.1
func (*SQLPlugin) CleanupTasks ¶
CleanupTasks performs cleanup on shutdown
func (*SQLPlugin) GetAutoReconnector ¶
func (p *SQLPlugin) GetAutoReconnector() *AutoReconnector
GetAutoReconnector returns the auto-reconnector instance
func (*SQLPlugin) GetDB ¶
GetDB returns the database connection. Important: when auto-reconnect is enabled, do not cache the returned *sql.DB in long-lived structs (e.g. global Data/Repo). After Reconnect(), the previous *sql.DB is closed; cached references will get "sql: database is closed". Obtain the DB at the point of use (e.g. per request) or use a provider that calls GetDBWithContext when needed.
func (*SQLPlugin) GetDBWithContext ¶
GetDBWithContext returns the database connection with context support. When EnsureAliveBeforeHandout is true (the default), it pings the pool before returning; if the ping fails, it triggers Reconnect() once so that an already-broken pool is not handed out. Do not cache the returned *sql.DB when auto-reconnect is enabled; after Reconnect() the previously returned *sql.DB is closed.
func (*SQLPlugin) GetDialect ¶
GetDialect returns the database dialect string (e.g. "mysql", "postgres").
func (*SQLPlugin) GetMetricsRecorder ¶
func (p *SQLPlugin) GetMetricsRecorder() MetricsRecorder
GetMetricsRecorder returns the current metrics recorder
func (*SQLPlugin) GetQueryMonitor ¶
func (p *SQLPlugin) GetQueryMonitor() *QueryMonitor
GetQueryMonitor returns the query monitor for opt-in slow query detection. Callers wrap their queries with monitor.MonitorQuery() to record duration and detect slow queries. Returns nil when SlowQueryEnabled is false.
func (*SQLPlugin) GetStats ¶
func (p *SQLPlugin) GetStats() *ConnectionPoolStats
GetStats returns a snapshot of connection pool statistics. Returns nil when the plugin is not connected or is shutting down.
func (*SQLPlugin) GetValidatedConn ¶ added in v1.6.1
GetValidatedConn returns a single connection from the pool that has been verified alive (Ping). The returned connection is guaranteed to be usable at handoff time. Caller must call conn.Close() when done to return the connection to the pool.
func (*SQLPlugin) InitializeFromConfig ¶ added in v1.6.3
InitializeFromConfig binds the runtime and applies defaults/validation without re-scanning the config source. Use this when the config has already been fully populated by a plugin-specific loader (e.g. lynx-pgsql populates interfaces.Config from its own proto config before calling base).
func (*SQLPlugin) InitializeResources ¶
InitializeResources loads config via Scan then applies defaults and validates.
func (*SQLPlugin) IsConnected ¶
IsConnected pings the database to verify the connection is live.
func (*SQLPlugin) IsConnectedContext ¶ added in v1.6.1
func (*SQLPlugin) Reconnect ¶
Reconnect re-establishes the database connection; called by AutoReconnector on failure.
func (*SQLPlugin) ReconnectContext ¶ added in v1.6.1
func (*SQLPlugin) ReportHealth ¶ added in v1.6.3
ReportHealth performs a health check without triggering reconnect. It is called by HealthChecker; recovery is handled exclusively by AutoReconnector.
func (*SQLPlugin) SetMetricsRecorder ¶
func (p *SQLPlugin) SetMetricsRecorder(recorder MetricsRecorder)
SetMetricsRecorder sets the metrics recorder for this plugin
func (*SQLPlugin) SetProvider ¶ added in v1.6.1
func (p *SQLPlugin) SetProvider(provider interfaces.DBProvider)
SetProvider sets the stable DB provider published into runtime resources.
func (*SQLPlugin) SharedProviderResourceName ¶ added in v1.6.1
SharedProviderResourceName returns the canonical shared resource name for a SQL provider.
func (*SQLPlugin) StartupTasks ¶
StartupTasks performs startup initialization with retry support