base

package
v1.6.3
Published: Jun 7, 2026 | License: Apache-2.0 | Imports: 11 | Imported by: 3

Documentation

Overview

Package base provides the production-grade SQL plugin implementation shared by all lynx database plugins (MySQL, PostgreSQL, etc.). It wraps database/sql with a connection pool, health checking, auto-reconnect, pool monitoring, slow-query detection, connection-leak detection, and a MetricsRecorder interface that individual plugins implement to expose Prometheus metrics.

Constants

This section is empty.

Variables

var (
	ErrNotConnected  = errors.New("database not connected")
	ErrAlreadyClosed = errors.New("database already closed")
)
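Callers usually see these sentinels wrapped in other errors, so matching with errors.Is is safer than comparing with ==. The sketch below redeclares the two variables locally so that it compiles on its own; loadUser and classify are hypothetical caller-side helpers, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors, redeclared here only so
// the sketch is self-contained.
var (
	ErrNotConnected  = errors.New("database not connected")
	ErrAlreadyClosed = errors.New("database already closed")
)

// loadUser is a hypothetical function that fails because the database is
// unavailable and wraps the sentinel with %w.
func loadUser(id int) error {
	return fmt.Errorf("load user %d: %w", id, ErrNotConnected)
}

// classify maps an error to a coarse category. errors.Is sees through any
// number of %w wrapping layers.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNotConnected):
		return "retry-later"
	case errors.Is(err, ErrAlreadyClosed):
		return "shutting-down"
	default:
		return "unknown"
	}
}
```

The category strings are placeholders; the point is only that the wrapped error still matches its sentinel.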

Functions

This section is empty.

Types

type AutoReconnector

type AutoReconnector struct {
	// contains filtered or unexported fields
}

AutoReconnector performs automatic reconnection when the connection is lost.

func NewAutoReconnector

func NewAutoReconnector(target Reconnectable, interval time.Duration, maxAttempts int) *AutoReconnector

NewAutoReconnector creates a new auto-reconnector

func (*AutoReconnector) GetAttempts

func (a *AutoReconnector) GetAttempts() int64

GetAttempts returns the current number of reconnection attempts

func (*AutoReconnector) Start

func (a *AutoReconnector) Start(ctx context.Context)

Start starts the auto-reconnect routine

func (*AutoReconnector) Stop

func (a *AutoReconnector) Stop()

Stop stops the auto-reconnector

type ConnectionPoolStats

type ConnectionPoolStats struct {
	MaxOpenConnections int64         // Maximum number of open connections
	OpenConnections    int64         // Number of established connections
	InUse              int64         // Number of connections currently in use
	Idle               int64         // Number of idle connections
	MaxIdleConnections int64         // Maximum number of idle connections
	WaitCount          int64         // Total number of connections waited for
	WaitDuration       time.Duration // Total time blocked waiting for a new connection
	MaxIdleClosed      int64         // Total number of connections closed due to SetMaxIdleConns
	MaxLifetimeClosed  int64         // Total number of connections closed due to SetConnMaxLifetime
}

ConnectionPoolStats represents database connection pool statistics
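Most of these fields have a direct counterpart in the standard library's sql.DBStats. The following is a minimal sketch of such a mapping, assuming the struct is filled from (*sql.DB).Stats(); the package's actual conversion is not shown in this documentation. fromDBStats and sampleStats are hypothetical helpers, and the struct is redeclared locally. sql.DBStats does not report the idle limit, so the sketch takes it as a separate argument.

```go
package main

import (
	"database/sql"
	"time"
)

// ConnectionPoolStats mirrors the struct documented above.
type ConnectionPoolStats struct {
	MaxOpenConnections int64
	OpenConnections    int64
	InUse              int64
	Idle               int64
	MaxIdleConnections int64
	WaitCount          int64
	WaitDuration       time.Duration
	MaxIdleClosed      int64
	MaxLifetimeClosed  int64
}

// fromDBStats copies a database/sql snapshot into ConnectionPoolStats.
// maxIdle is supplied by the caller because sql.DBStats has no such field.
func fromDBStats(s sql.DBStats, maxIdle int) *ConnectionPoolStats {
	return &ConnectionPoolStats{
		MaxOpenConnections: int64(s.MaxOpenConnections),
		OpenConnections:    int64(s.OpenConnections),
		InUse:              int64(s.InUse),
		Idle:               int64(s.Idle),
		MaxIdleConnections: int64(maxIdle),
		WaitCount:          s.WaitCount,
		WaitDuration:       s.WaitDuration,
		MaxIdleClosed:      s.MaxIdleClosed,
		MaxLifetimeClosed:  s.MaxLifetimeClosed,
	}
}

// sampleStats converts a hand-written snapshot; the numbers are illustrative.
func sampleStats() *ConnectionPoolStats {
	return fromDBStats(sql.DBStats{
		MaxOpenConnections: 20,
		OpenConnections:    8,
		InUse:              5,
		Idle:               3,
		WaitCount:          2,
		WaitDuration:       150 * time.Millisecond,
	}, 10)
}
```

With a live pool, the first argument would be db.Stats() rather than a literal.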

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

HealthChecker performs periodic health checks and updates isHealthy. It only reports health state — recovery (Reconnect) is handled exclusively by AutoReconnector to avoid redundant concurrent reconnect attempts.

func NewHealthChecker

func NewHealthChecker(target HealthReporter, interval time.Duration, customQuery string) *HealthChecker

NewHealthChecker creates a new health checker.

func (*HealthChecker) IsHealthy

func (h *HealthChecker) IsHealthy() bool

IsHealthy returns the current health status

func (*HealthChecker) Start

func (h *HealthChecker) Start(ctx context.Context)

Start starts the health check routine

func (*HealthChecker) Stop

func (h *HealthChecker) Stop()

Stop stops the health checker

type HealthReporter added in v1.6.3

type HealthReporter interface {
	ReportHealth() error
	Name() string
}

HealthReporter is implemented by database plugins to expose a no-reconnect health probe for the HealthChecker. Unlike CheckHealth, ReportHealth never triggers reconnection — it only marks the connection as unhealthy when a query fails.
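To illustrate the report-only contract, here is a small sketch with a hypothetical fake plugin. The interface is redeclared locally; fakeDB, probe, and runProbes are assumptions made for the example and say nothing about how HealthChecker is implemented internally.

```go
package main

import (
	"errors"
	"sync/atomic"
)

// HealthReporter mirrors the interface documented above.
type HealthReporter interface {
	ReportHealth() error
	Name() string
}

// fakeDB is a hypothetical plugin stand-in whose probe result is pingErr.
type fakeDB struct {
	pingErr error
}

// ReportHealth only reports; it makes no attempt to reconnect.
func (f *fakeDB) ReportHealth() error { return f.pingErr }

func (f *fakeDB) Name() string { return "fake-db" }

// probe runs one check and stores the outcome in the shared flag, roughly
// what a periodic checker might do on every tick.
func probe(r HealthReporter, healthy *atomic.Bool) error {
	err := r.ReportHealth()
	healthy.Store(err == nil)
	return err
}

// runProbes probes once while the fake is up and once after it goes down.
func runProbes() (before, after bool) {
	db := &fakeDB{}
	var healthy atomic.Bool
	_ = probe(db, &healthy)
	before = healthy.Load()

	db.pingErr = errors.New("connection refused")
	_ = probe(db, &healthy)
	after = healthy.Load()
	return before, after
}
```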

type LeakDetector

type LeakDetector struct {
	// contains filtered or unexported fields
}

LeakDetector detects connection leaks by monitoring connection usage.

func NewLeakDetector

func NewLeakDetector(target Monitorable, threshold time.Duration, interval time.Duration) *LeakDetector

NewLeakDetector creates a new connection leak detector. interval controls how often to check; 0 defaults to 30s.
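The zero-value default described above can be sketched as follows. effectiveInterval and the constant name are hypothetical; treating negative values like zero is an extra assumption of this sketch, not something the documentation states.

```go
package main

import "time"

// defaultLeakCheckInterval is the documented fallback of 30s.
const defaultLeakCheckInterval = 30 * time.Second

// effectiveInterval returns the interval a detector would use for its checks.
// Assumption: non-positive values fall back to the default.
func effectiveInterval(interval time.Duration) time.Duration {
	if interval <= 0 {
		return defaultLeakCheckInterval
	}
	return interval
}
```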

func (*LeakDetector) Start

func (l *LeakDetector) Start(ctx context.Context)

Start starts the leak detection routine

func (*LeakDetector) Stop

func (l *LeakDetector) Stop()

Stop stops the leak detector

type MetricsConfig

type MetricsConfig struct {
	// Enabled determines if metrics recording is enabled
	Enabled bool

	// Namespace for Prometheus metrics
	Namespace string

	// Subsystem for Prometheus metrics
	Subsystem string

	// Labels to add to all metrics
	Labels map[string]string

	// SlowQueryThreshold defines the threshold for slow query detection
	SlowQueryThreshold time.Duration
}

MetricsConfig defines configuration for metrics recording
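In Prometheus, a metric's fully qualified name is the namespace, subsystem, and metric name joined with underscores, with empty parts skipped. The sketch below reproduces that convention without importing the Prometheus client. fqName is a hypothetical helper, the struct is redeclared locally, and the namespace, subsystem, and metric names in the usage note are made up; the metric names this package actually registers are not listed in this documentation.

```go
package main

import (
	"strings"
	"time"
)

// MetricsConfig mirrors the struct documented above.
type MetricsConfig struct {
	Enabled            bool
	Namespace          string
	Subsystem          string
	Labels             map[string]string
	SlowQueryThreshold time.Duration
}

// fqName joins namespace, subsystem, and name with "_" and skips empty
// parts. An empty name yields an empty result.
func fqName(cfg MetricsConfig, name string) string {
	if name == "" {
		return ""
	}
	parts := make([]string, 0, 3)
	for _, p := range []string{cfg.Namespace, cfg.Subsystem, name} {
		if p != "" {
			parts = append(parts, p)
		}
	}
	return strings.Join(parts, "_")
}
```

For example, a config with Namespace "lynx" and Subsystem "mysql" turns "query_duration_seconds" into "lynx_mysql_query_duration_seconds".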

func DefaultMetricsConfig

func DefaultMetricsConfig() *MetricsConfig

DefaultMetricsConfig returns default metrics configuration

type MetricsRecorder

type MetricsRecorder interface {
	// RecordConnectionPoolStats records connection pool statistics
	RecordConnectionPoolStats(stats *ConnectionPoolStats)

	// RecordHealthCheck records health check results
	RecordHealthCheck(success bool)

	// RecordQuery records SQL query duration and errors
	RecordQuery(duration time.Duration, err error, threshold time.Duration)

	// RecordTx records transaction duration and status
	RecordTx(duration time.Duration, committed bool)

	// IncConnectAttempt increments connection attempt counter
	IncConnectAttempt()

	// IncConnectRetry increments connection retry counter
	IncConnectRetry()

	// IncConnectSuccess increments connection success counter
	IncConnectSuccess()

	// IncConnectFailure increments connection failure counter
	IncConnectFailure()
}

MetricsRecorder defines the interface for recording database metrics
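For tests, or for backends other than Prometheus, the interface can be satisfied by a small in-memory type. The following is a sketch only: memRecorder and exerciseRecorder are hypothetical, ConnectionPoolStats is trimmed to two fields, and counting a query as slow when its duration exceeds a positive threshold is an assumption about the intended semantics.

```go
package main

import (
	"errors"
	"sync"
	"time"
)

// ConnectionPoolStats is a trimmed local mirror, enough for this sketch.
type ConnectionPoolStats struct {
	InUse, Idle int64
}

// MetricsRecorder mirrors the interface documented above.
type MetricsRecorder interface {
	RecordConnectionPoolStats(stats *ConnectionPoolStats)
	RecordHealthCheck(success bool)
	RecordQuery(duration time.Duration, err error, threshold time.Duration)
	RecordTx(duration time.Duration, committed bool)
	IncConnectAttempt()
	IncConnectRetry()
	IncConnectSuccess()
	IncConnectFailure()
}

// memRecorder keeps plain counters behind a mutex.
type memRecorder struct {
	mu                     sync.Mutex
	queries, slow, failed  int
	healthOK, healthFailed int
	committed, rolledBack  int
	attempts, retries      int
	successes, failures    int
	pool                   ConnectionPoolStats
}

// Compile-time check that memRecorder satisfies the interface.
var _ MetricsRecorder = (*memRecorder)(nil)

// count runs fn while holding the mutex.
func (m *memRecorder) count(fn func()) { m.mu.Lock(); defer m.mu.Unlock(); fn() }

func (m *memRecorder) RecordConnectionPoolStats(s *ConnectionPoolStats) {
	if s != nil {
		m.count(func() { m.pool = *s })
	}
}

func (m *memRecorder) RecordHealthCheck(ok bool) {
	m.count(func() {
		if ok {
			m.healthOK++
		} else {
			m.healthFailed++
		}
	})
}

func (m *memRecorder) RecordQuery(d time.Duration, err error, threshold time.Duration) {
	m.count(func() {
		m.queries++
		if err != nil {
			m.failed++
		}
		// Assumption: a zero threshold disables slow-query counting.
		if threshold > 0 && d > threshold {
			m.slow++
		}
	})
}

func (m *memRecorder) RecordTx(_ time.Duration, committed bool) {
	m.count(func() {
		if committed {
			m.committed++
		} else {
			m.rolledBack++
		}
	})
}

func (m *memRecorder) IncConnectAttempt() { m.count(func() { m.attempts++ }) }
func (m *memRecorder) IncConnectRetry()   { m.count(func() { m.retries++ }) }
func (m *memRecorder) IncConnectSuccess() { m.count(func() { m.successes++ }) }
func (m *memRecorder) IncConnectFailure() { m.count(func() { m.failures++ }) }

// exerciseRecorder records one fast, one slow, and one failed query.
func exerciseRecorder() (queries, slow, failed int) {
	rec := &memRecorder{}
	var r MetricsRecorder = rec
	threshold := 100 * time.Millisecond
	r.RecordQuery(20*time.Millisecond, nil, threshold)
	r.RecordQuery(250*time.Millisecond, nil, threshold)
	r.RecordQuery(5*time.Millisecond, errors.New("syntax error"), threshold)
	return rec.queries, rec.slow, rec.failed
}
```

A recorder like this could be passed to SetMetricsRecorder, documented further down, since that method accepts any MetricsRecorder.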

type Monitorable

type Monitorable interface {
	GetStats() *ConnectionPoolStats
	Name() string
}

Monitorable is implemented by components whose connection pool statistics can be monitored.

type NoOpMetricsRecorder

type NoOpMetricsRecorder struct{}

NoOpMetricsRecorder provides a no-operation implementation of MetricsRecorder. This is useful when metrics recording is disabled or not implemented.

func (*NoOpMetricsRecorder) IncConnectAttempt

func (n *NoOpMetricsRecorder) IncConnectAttempt()

IncConnectAttempt implements MetricsRecorder

func (*NoOpMetricsRecorder) IncConnectFailure

func (n *NoOpMetricsRecorder) IncConnectFailure()

IncConnectFailure implements MetricsRecorder

func (*NoOpMetricsRecorder) IncConnectRetry

func (n *NoOpMetricsRecorder) IncConnectRetry()

IncConnectRetry implements MetricsRecorder

func (*NoOpMetricsRecorder) IncConnectSuccess

func (n *NoOpMetricsRecorder) IncConnectSuccess()

IncConnectSuccess implements MetricsRecorder

func (*NoOpMetricsRecorder) RecordConnectionPoolStats

func (n *NoOpMetricsRecorder) RecordConnectionPoolStats(stats *ConnectionPoolStats)

RecordConnectionPoolStats implements MetricsRecorder

func (*NoOpMetricsRecorder) RecordHealthCheck

func (n *NoOpMetricsRecorder) RecordHealthCheck(success bool)

RecordHealthCheck implements MetricsRecorder

func (*NoOpMetricsRecorder) RecordQuery

func (n *NoOpMetricsRecorder) RecordQuery(duration time.Duration, err error, threshold time.Duration)

RecordQuery implements MetricsRecorder

func (*NoOpMetricsRecorder) RecordTx

func (n *NoOpMetricsRecorder) RecordTx(duration time.Duration, committed bool)

RecordTx implements MetricsRecorder

type PoolMonitor

type PoolMonitor struct {
	// contains filtered or unexported fields
}

PoolMonitor monitors connection pool health and triggers alerts

func NewPoolMonitor

func NewPoolMonitor(target Monitorable, interval time.Duration, thresholds *PoolThresholds) *PoolMonitor

NewPoolMonitor creates a new connection pool monitor

func (*PoolMonitor) Start

func (m *PoolMonitor) Start(ctx context.Context)

Start starts the monitoring routine

func (*PoolMonitor) Stop

func (m *PoolMonitor) Stop()

Stop stops the monitor

type PoolThresholds

type PoolThresholds struct {
	UsagePercentage float64       // Alert when pool usage exceeds this (0.0-1.0)
	WaitDuration    time.Duration // Alert when wait duration exceeds this
	WaitCount       int64         // Alert when wait count exceeds this
	AlertCooldown   time.Duration // Minimum time between alerts; 0 = default (60s)
}

PoolThresholds defines alert thresholds for connection pool monitoring
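One plausible reading of these thresholds is sketched below: usage is InUse divided by MaxOpenConnections, each threshold is checked independently, and alerts are rate-limited by the cooldown. breaches, alerter, and the two demo helpers are hypothetical, and the structs are local mirrors (ConnectionPoolStats is trimmed). How PoolMonitor actually evaluates thresholds is not specified in this documentation.

```go
package main

import "time"

// ConnectionPoolStats is a trimmed local mirror of the documented struct.
type ConnectionPoolStats struct {
	MaxOpenConnections int64
	InUse              int64
	WaitCount          int64
	WaitDuration       time.Duration
}

// PoolThresholds mirrors the struct documented above.
type PoolThresholds struct {
	UsagePercentage float64
	WaitDuration    time.Duration
	WaitCount       int64
	AlertCooldown   time.Duration
}

// breaches lists which thresholds the snapshot exceeds. A nil snapshot or an
// unlimited pool (MaxOpenConnections == 0) never reports a usage breach.
func breaches(s *ConnectionPoolStats, t PoolThresholds) []string {
	if s == nil {
		return nil
	}
	var out []string
	if s.MaxOpenConnections > 0 {
		usage := float64(s.InUse) / float64(s.MaxOpenConnections)
		if usage > t.UsagePercentage {
			out = append(out, "usage")
		}
	}
	if s.WaitDuration > t.WaitDuration {
		out = append(out, "wait_duration")
	}
	if s.WaitCount > t.WaitCount {
		out = append(out, "wait_count")
	}
	return out
}

// alerter suppresses alerts that arrive within the cooldown window.
type alerter struct {
	cooldown time.Duration
	last     time.Time
}

// allow reports whether an alert may fire at now; 0 means the 60s default.
func (a *alerter) allow(now time.Time) bool {
	cd := a.cooldown
	if cd == 0 {
		cd = 60 * time.Second
	}
	if !a.last.IsZero() && now.Sub(a.last) < cd {
		return false
	}
	a.last = now
	return true
}

// breachDemo evaluates a pool at 90% usage against an 80% threshold.
func breachDemo() []string {
	s := &ConnectionPoolStats{MaxOpenConnections: 10, InUse: 9, WaitCount: 3, WaitDuration: 50 * time.Millisecond}
	return breaches(s, PoolThresholds{UsagePercentage: 0.8, WaitDuration: time.Second, WaitCount: 100})
}

// cooldownDemo fires alerts at t0, t0+10s, and t0+61s with the default cooldown.
func cooldownDemo() (first, second, third bool) {
	a := &alerter{}
	t0 := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	return a.allow(t0), a.allow(t0.Add(10 * time.Second)), a.allow(t0.Add(61 * time.Second))
}
```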

type PrometheusMetrics

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

PrometheusMetrics provides a unified Prometheus metrics implementation

func NewPrometheusMetrics

func NewPrometheusMetrics(config *MetricsConfig) *PrometheusMetrics

NewPrometheusMetrics creates a new Prometheus metrics instance

func (*PrometheusMetrics) GetGatherer

func (pm *PrometheusMetrics) GetGatherer() prometheus.Gatherer

GetGatherer returns the Prometheus gatherer

func (*PrometheusMetrics) IncConnectAttempt

func (pm *PrometheusMetrics) IncConnectAttempt()

IncConnectAttempt implements MetricsRecorder

func (*PrometheusMetrics) IncConnectFailure

func (pm *PrometheusMetrics) IncConnectFailure()

IncConnectFailure implements MetricsRecorder

func (*PrometheusMetrics) IncConnectRetry

func (pm *PrometheusMetrics) IncConnectRetry()

IncConnectRetry implements MetricsRecorder

func (*PrometheusMetrics) IncConnectSuccess

func (pm *PrometheusMetrics) IncConnectSuccess()

IncConnectSuccess implements MetricsRecorder

func (*PrometheusMetrics) RecordConnectionPoolStats

func (pm *PrometheusMetrics) RecordConnectionPoolStats(stats *ConnectionPoolStats)

RecordConnectionPoolStats implements MetricsRecorder

func (*PrometheusMetrics) RecordHealthCheck

func (pm *PrometheusMetrics) RecordHealthCheck(success bool)

RecordHealthCheck implements MetricsRecorder

func (*PrometheusMetrics) RecordQuery

func (pm *PrometheusMetrics) RecordQuery(duration time.Duration, err error, threshold time.Duration)

RecordQuery implements MetricsRecorder

func (*PrometheusMetrics) RecordTx

func (pm *PrometheusMetrics) RecordTx(duration time.Duration, committed bool)

RecordTx implements MetricsRecorder

type QueryMonitor

type QueryMonitor struct {
	// contains filtered or unexported fields
}

QueryMonitor provides slow query monitoring and logging

func NewQueryMonitor

func NewQueryMonitor(enabled bool, threshold time.Duration, recorder MetricsRecorder) *QueryMonitor

NewQueryMonitor creates a new query monitor

func (*QueryMonitor) MonitorExec

func (m *QueryMonitor) MonitorExec(ctx context.Context, db *sql.DB, query string, args []any) (sql.Result, error)

MonitorExec wraps a database exec with monitoring

func (*QueryMonitor) MonitorQuery

func (m *QueryMonitor) MonitorQuery(ctx context.Context, db *sql.DB, query string, args []any, fn func() error) error

MonitorQuery wraps a database query with monitoring

func (*QueryMonitor) MonitorQueryRow

func (m *QueryMonitor) MonitorQueryRow(ctx context.Context, db *sql.DB, query string, args []any, scan func(*sql.Row) error) error

MonitorQueryRow wraps a single-row database query with monitoring.
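The general idea behind such wrappers can be sketched as: time the call, hand the duration and error to a recorder, and flag the call as slow when it exceeds the threshold. The code below is an illustration under those assumptions, not the QueryMonitor implementation. queryRecorder, callLog, monitor, and monitorDemo are hypothetical names; only the RecordQuery signature is taken from MetricsRecorder above.

```go
package main

import (
	"errors"
	"time"
)

// queryRecorder is the one MetricsRecorder method this sketch needs.
type queryRecorder interface {
	RecordQuery(duration time.Duration, err error, threshold time.Duration)
}

// callLog is a hypothetical recorder that only counts calls.
type callLog struct {
	calls int
}

func (c *callLog) RecordQuery(time.Duration, error, time.Duration) { c.calls++ }

// monitor times fn, reports the result to rec, and tells the caller whether
// the call exceeded threshold. A zero threshold disables the slow check.
func monitor(rec queryRecorder, threshold time.Duration, fn func() error) (slow bool, err error) {
	start := time.Now()
	err = fn()
	elapsed := time.Since(start)
	if rec != nil {
		rec.RecordQuery(elapsed, err, threshold)
	}
	return threshold > 0 && elapsed > threshold, err
}

// monitorDemo runs one deliberately slow call and one failing fast call.
func monitorDemo() (sawSlow, sawError bool, calls int) {
	rec := &callLog{}

	// Sleeping 5ms against a 1ms threshold is reliably slow.
	sawSlow, _ = monitor(rec, time.Millisecond, func() error {
		time.Sleep(5 * time.Millisecond)
		return nil
	})

	// A failing call still reaches the recorder.
	_, err := monitor(rec, time.Hour, func() error {
		return errors.New("syntax error")
	})
	return sawSlow, err != nil, rec.calls
}
```

In real use, fn would run the query against the *sql.DB and scan its rows, so that the measured time covers the full round trip.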

type Reconnectable

type Reconnectable interface {
	Reconnect() error
	IsConnected() bool
	Name() string
}

Reconnectable is implemented by components that can be reconnected.
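A bounded retry loop over this interface might look like the sketch below, which takes the same interval and maxAttempts inputs as NewAutoReconnector. This is one plausible shape, not the AutoReconnector's actual algorithm. flakyDB, reconnectLoop, and reconnectDemo are hypothetical, and treating maxAttempts below 1 as a single attempt is an assumption of the sketch.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// Reconnectable mirrors the interface documented above.
type Reconnectable interface {
	Reconnect() error
	IsConnected() bool
	Name() string
}

// flakyDB is a hypothetical target that fails a fixed number of times.
type flakyDB struct {
	failuresLeft int
	connected    bool
}

func (f *flakyDB) Reconnect() error {
	if f.failuresLeft > 0 {
		f.failuresLeft--
		return errors.New("dial failed")
	}
	f.connected = true
	return nil
}

func (f *flakyDB) IsConnected() bool { return f.connected }
func (f *flakyDB) Name() string      { return "flaky-db" }

// reconnectLoop calls Reconnect until it succeeds, maxAttempts is used up,
// or ctx is cancelled. It returns the number of attempts made.
func reconnectLoop(ctx context.Context, target Reconnectable, interval time.Duration, maxAttempts int) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1 // assumption: always try at least once
	}
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if target.IsConnected() {
			return attempt - 1, nil
		}
		if lastErr = target.Reconnect(); lastErr == nil {
			return attempt, nil
		}
		if attempt == maxAttempts {
			break
		}
		// Wait for the next attempt, but stop early on cancellation.
		select {
		case <-ctx.Done():
			return attempt, ctx.Err()
		case <-time.After(interval):
		}
	}
	return maxAttempts, lastErr
}

// reconnectDemo recovers on the third attempt, then gives up after two.
func reconnectDemo() (attempts int, recovered, gaveUp bool) {
	ctx := context.Background()
	attempts, err := reconnectLoop(ctx, &flakyDB{failuresLeft: 2}, time.Millisecond, 5)
	recovered = err == nil

	_, err = reconnectLoop(ctx, &flakyDB{failuresLeft: 10}, time.Millisecond, 2)
	return attempts, recovered, err != nil
}
```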

type SQLPlugin

type SQLPlugin struct {
	*plugins.BasePlugin
	// contains filtered or unexported fields
}

SQLPlugin provides common functionality for all SQL plugins

func NewBaseSQLPlugin

func NewBaseSQLPlugin(
	id, name, desc, version, confPrefix string,
	weight int,
	config *interfaces.Config,
) *SQLPlugin

NewBaseSQLPlugin creates a new base SQL plugin

func (*SQLPlugin) CheckHealth

func (p *SQLPlugin) CheckHealth() error

CheckHealth performs a health check. When auto-reconnect is enabled it may rebuild the pool once on failure. When auto-reconnect is disabled, health checks only report the unhealthy state and never replace the current pool.

func (*SQLPlugin) CheckHealthContext added in v1.6.1

func (p *SQLPlugin) CheckHealthContext(ctx context.Context) error

func (*SQLPlugin) CleanupTasks

func (p *SQLPlugin) CleanupTasks() error

CleanupTasks performs cleanup on shutdown

func (*SQLPlugin) GetAutoReconnector

func (p *SQLPlugin) GetAutoReconnector() *AutoReconnector

GetAutoReconnector returns the auto-reconnector instance

func (*SQLPlugin) GetDB

func (p *SQLPlugin) GetDB() (*sql.DB, error)

GetDB returns the database connection. Important: when auto-reconnect is enabled, do not cache the returned *sql.DB in long-lived structs (e.g. a global Data or Repo). After Reconnect(), the previous *sql.DB is closed, so cached references fail with "sql: database is closed". Obtain the DB at the point of use (e.g. per request), or use a provider that calls GetDBWithContext when needed.
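The "obtain at the point of use" advice can be sketched as a repository that stores a source of handles rather than a handle. dbSource, UserRepo, downSource, repoDemo, and the users table are hypothetical; only the GetDBWithContext signature comes from this package. The fake source always fails, so the sketch runs without a database driver.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// errNotConnected stands in for the package's ErrNotConnected.
var errNotConnected = errors.New("database not connected")

// dbSource is the slice of the plugin this sketch depends on.
type dbSource interface {
	GetDBWithContext(ctx context.Context) (*sql.DB, error)
}

// UserRepo keeps the source, not a *sql.DB, so each call sees the pool that
// is current after any reconnect.
type UserRepo struct {
	src dbSource
}

// Count fetches the handle at the point of use and never stores it.
func (r *UserRepo) Count(ctx context.Context) (int, error) {
	db, err := r.src.GetDBWithContext(ctx)
	if err != nil {
		return 0, fmt.Errorf("count users: %w", err)
	}
	var n int
	err = db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users").Scan(&n)
	return n, err
}

// downSource is a hypothetical fake that counts lookups and always fails.
type downSource struct {
	calls int
}

func (d *downSource) GetDBWithContext(context.Context) (*sql.DB, error) {
	d.calls++
	return nil, errNotConnected
}

// repoDemo calls Count twice and reports how often the source was consulted.
func repoDemo() (calls int, notConnected bool) {
	src := &downSource{}
	repo := &UserRepo{src: src}
	_, _ = repo.Count(context.Background())
	_, err := repo.Count(context.Background())
	return src.calls, errors.Is(err, errNotConnected)
}
```

Since *SQLPlugin has a GetDBWithContext method with this signature, it would satisfy dbSource directly.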

func (*SQLPlugin) GetDBWithContext

func (p *SQLPlugin) GetDBWithContext(ctx context.Context) (*sql.DB, error)

GetDBWithContext returns the database connection with context support. When EnsureAliveBeforeHandout is true (the default), it pings the pool before returning; if the ping fails, it calls Reconnect() once so that a broken pool is not handed out. Do not cache the returned *sql.DB when auto-reconnect is enabled: after Reconnect() the previously returned handle is closed.

func (*SQLPlugin) GetDialect

func (p *SQLPlugin) GetDialect() string

GetDialect returns the database dialect string (e.g. "mysql", "postgres").

func (*SQLPlugin) GetMetricsRecorder

func (p *SQLPlugin) GetMetricsRecorder() MetricsRecorder

GetMetricsRecorder returns the current metrics recorder

func (*SQLPlugin) GetQueryMonitor

func (p *SQLPlugin) GetQueryMonitor() *QueryMonitor

GetQueryMonitor returns the query monitor for opt-in slow query detection. Callers wrap their queries with monitor.MonitorQuery() to record duration and detect slow queries. Returns nil when SlowQueryEnabled is false.

func (*SQLPlugin) GetStats

func (p *SQLPlugin) GetStats() *ConnectionPoolStats

GetStats returns a snapshot of connection pool statistics. Returns nil when the plugin is not connected or is shutting down.

func (*SQLPlugin) GetValidatedConn added in v1.6.1

func (p *SQLPlugin) GetValidatedConn(ctx context.Context) (*sql.Conn, error)

GetValidatedConn returns a single connection from the pool that has been verified alive (Ping). The returned connection is guaranteed to be usable at handoff time. Caller must call conn.Close() when done to return the connection to the pool.
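Because the caller owns the returned connection, a small helper that always closes it can prevent leaks. The sketch below is a hypothetical caller-side wrapper: connSource, withConn, brokenSource, and withConnDemo are not part of this package, and only the GetValidatedConn signature is taken from above. The fake source fails so that the example runs without a driver.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
)

// connSource is the slice of the plugin this sketch depends on.
type connSource interface {
	GetValidatedConn(ctx context.Context) (*sql.Conn, error)
}

// withConn borrows one validated connection, runs fn, and always returns the
// connection to the pool. A Close error is reported only if fn succeeded.
func withConn(ctx context.Context, src connSource, fn func(*sql.Conn) error) (err error) {
	conn, err := src.GetValidatedConn(ctx)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := conn.Close(); err == nil {
			err = cerr
		}
	}()
	return fn(conn)
}

// brokenSource is a hypothetical fake that never yields a connection.
type brokenSource struct{}

func (brokenSource) GetValidatedConn(context.Context) (*sql.Conn, error) {
	return nil, errors.New("database not connected")
}

// withConnDemo shows that fn is skipped when no connection is available.
func withConnDemo() (fnCalled, failed bool) {
	err := withConn(context.Background(), brokenSource{}, func(*sql.Conn) error {
		fnCalled = true
		return nil
	})
	return fnCalled, err != nil
}
```

Inside fn, the connection can be used with the standard methods such as ExecContext or QueryContext; session-scoped state stays on that one connection until it is closed.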

func (*SQLPlugin) InitializeFromConfig added in v1.6.3

func (p *SQLPlugin) InitializeFromConfig(rt plugins.Runtime) error

InitializeFromConfig binds the runtime and applies defaults/validation without re-scanning the config source. Use this when the config has already been fully populated by a plugin-specific loader (e.g. lynx-pgsql populates interfaces.Config from its own proto config before calling base).

func (*SQLPlugin) InitializeResources

func (p *SQLPlugin) InitializeResources(rt plugins.Runtime) error

InitializeResources loads config via Scan then applies defaults and validates.

func (*SQLPlugin) IsConnected

func (p *SQLPlugin) IsConnected() bool

IsConnected pings the database to verify the connection is live.

func (*SQLPlugin) IsConnectedContext added in v1.6.1

func (p *SQLPlugin) IsConnectedContext(ctx context.Context) bool

func (*SQLPlugin) Reconnect

func (p *SQLPlugin) Reconnect() error

Reconnect re-establishes the database connection; called by AutoReconnector on failure.

func (*SQLPlugin) ReconnectContext added in v1.6.1

func (p *SQLPlugin) ReconnectContext(ctx context.Context) error

func (*SQLPlugin) ReportHealth added in v1.6.3

func (p *SQLPlugin) ReportHealth() error

ReportHealth performs a health check without triggering reconnect. It is called by HealthChecker; recovery is handled exclusively by AutoReconnector.

func (*SQLPlugin) SetMetricsRecorder

func (p *SQLPlugin) SetMetricsRecorder(recorder MetricsRecorder)

SetMetricsRecorder sets the metrics recorder for this plugin

func (*SQLPlugin) SetProvider added in v1.6.1

func (p *SQLPlugin) SetProvider(provider interfaces.DBProvider)

SetProvider sets the stable DB provider published into runtime resources.

func (*SQLPlugin) SharedProviderResourceName added in v1.6.1

func (p *SQLPlugin) SharedProviderResourceName() string

SharedProviderResourceName returns the canonical shared resource name for a SQL provider.

func (*SQLPlugin) StartupTasks

func (p *SQLPlugin) StartupTasks() error

StartupTasks performs startup initialization with retry support
