Documentation
ΒΆ
Index ΒΆ
- Constants
- type BackendConfig
- type BackendsConfig
- type BehaviorConfig
- type CacheConfig
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerState
- type ClientConnection
- type Config
- type Configurer
- type DatabaseConfig
- type DatabasePool
- func (p *DatabasePool) Acquire(ctx context.Context) (*pgxpool.Conn, error)
- func (p *DatabasePool) Begin(ctx context.Context) (pgx.Tx, error)
- func (p *DatabasePool) Close(ctx context.Context) error
- func (p *DatabasePool) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
- func (p *DatabasePool) HealthCheck(ctx context.Context) error
- func (p *DatabasePool) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
- func (p *DatabasePool) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
- func (p *DatabasePool) Stats() *PoolStats
- type DatabaseStats
- type ExecuteRequest
- type ExecuteResponse
- type FailoverConfig
- type FeaturesConfig
- type HealthCheckRequest
- type HealthCheckResponse
- type Logger
- type LoggingConfig
- type MetricsCollector
- func (mc *MetricsCollector) RecordConnectionError(database string)
- func (mc *MetricsCollector) RecordHealthCheck(database string, passed bool)
- func (mc *MetricsCollector) RecordPoolStats(database string, stats *PoolStats)
- func (mc *MetricsCollector) RecordQuery(database string, duration time.Duration, err error)
- func (mc *MetricsCollector) Register() error
- type MonitoringConfig
- type Plugin
- type PoolConfig
- type PoolStats
- type PoolStatsRequest
- type PoolStatsResponse
- type PrometheusConfig
- type ProxyConfig
- type ProxyServer
- type RateLimitConfig
- type ReplicaConfig
- type RoutingConfig
- type SlowQueryConfig
- type TransactionBeginRequest
- type TransactionBeginResponse
- type TransactionCommitRequest
- type TransactionCommitResponse
- type TransactionRollbackRequest
- type TransactionRollbackResponse
Constants ΒΆ
const (
// PluginName is the name used for plugin registration
PluginName = "pgpool"
)
Variables ΒΆ
This section is empty.
Functions ΒΆ
This section is empty.
Types ΒΆ
type BackendConfig ΒΆ
type BackendConfig struct {
// DSN is the PostgreSQL connection string
DSN string `mapstructure:"dsn"`
// Alternative structured configuration
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
Database string `mapstructure:"database"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
SSLMode string `mapstructure:"sslmode"`
// Pool configuration
Pool PoolConfig `mapstructure:"pool"`
// Behavior configuration
Behavior BehaviorConfig `mapstructure:"behavior"`
}
BackendConfig defines a single database backend configuration
func (*BackendConfig) BuildDSN ΒΆ
func (b *BackendConfig) BuildDSN() string
BuildDSN constructs a PostgreSQL DSN from structured configuration
func (*BackendConfig) Validate ΒΆ
func (b *BackendConfig) Validate() error
Validate checks backend configuration for errors
type BackendsConfig ΒΆ
type BackendsConfig struct {
// Primary database configuration
Primary BackendConfig `mapstructure:"primary"`
// Replicas for read scaling (optional)
Replicas []ReplicaConfig `mapstructure:"replicas"`
}
BackendsConfig defines primary and replica database connections
type BehaviorConfig ΒΆ
type BehaviorConfig struct {
// Mode specifies pooling mode: "session", "transaction", "statement", "auto"
Mode string `mapstructure:"mode"`
// ResetOnReturn resets connection state when returned to pool
ResetOnReturn bool `mapstructure:"reset_on_return"`
// LazyConnect delays connection establishment until first use
LazyConnect bool `mapstructure:"lazy_connect"`
// StatementCacheSize for prepared statements per connection
StatementCacheSize int `mapstructure:"statement_cache_size"`
// QueryTimeout for individual queries (0 = no timeout)
QueryTimeout time.Duration `mapstructure:"query_timeout"`
}
BehaviorConfig defines connection behavior settings
type CacheConfig ΒΆ
type CacheConfig struct {
// Enabled activates query caching
Enabled bool `mapstructure:"enabled"`
// TTL for cached results
TTL time.Duration `mapstructure:"ttl"`
// MaxSize limits cache memory usage
MaxSize string `mapstructure:"max_size"`
}
CacheConfig defines query caching layer
type CircuitBreaker ΒΆ
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern for backend health
func NewCircuitBreaker ΒΆ
NewCircuitBreaker creates a new circuit breaker
func (*CircuitBreaker) Allow ΒΆ
func (cb *CircuitBreaker) Allow() bool
Allow checks if operation should be allowed
func (*CircuitBreaker) GetState ΒΆ
func (cb *CircuitBreaker) GetState() CircuitBreakerState
GetState returns current circuit breaker state
func (*CircuitBreaker) RecordFailure ΒΆ
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failed operation
func (*CircuitBreaker) RecordSuccess ΒΆ
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess records a successful operation
func (*CircuitBreaker) Reset ΒΆ
func (cb *CircuitBreaker) Reset()
Reset manually resets the circuit breaker to closed state
type CircuitBreakerConfig ΒΆ
type CircuitBreakerConfig struct {
// Enabled activates circuit breaker
Enabled bool `mapstructure:"enabled"`
// Threshold of failures to open circuit
Threshold int `mapstructure:"threshold"`
// Timeout before attempting reset
Timeout time.Duration `mapstructure:"timeout"`
}
CircuitBreakerConfig defines circuit breaker behavior
type CircuitBreakerState ΒΆ
type CircuitBreakerState int32
CircuitBreakerState represents the state of the circuit breaker
const ( // StateClosed means circuit is operating normally StateClosed CircuitBreakerState = iota // StateOpen means circuit is open due to failures StateOpen // StateHalfOpen means circuit is testing if backend recovered StateHalfOpen )
type ClientConnection ΒΆ
type ClientConnection struct {
// contains filtered or unexported fields
}
ClientConnection represents a single PHP client connection
func (*ClientConnection) Close ΒΆ
func (c *ClientConnection) Close() error
Close closes a client connection
type Config ΒΆ
type Config struct {
// Enabled determines if the plugin is active
Enabled bool `mapstructure:"enabled"`
// Proxy configuration for PostgreSQL wire protocol server
Proxy ProxyConfig `mapstructure:"proxy"`
// Backends defines database connections
Backends BackendsConfig `mapstructure:"backends"`
// Routing configuration for read/write splitting
Routing RoutingConfig `mapstructure:"routing"`
// Monitoring and metrics configuration
Monitoring MonitoringConfig `mapstructure:"monitoring"`
// Logging configuration
Logging LoggingConfig `mapstructure:"logging"`
// Advanced features configuration
Features FeaturesConfig `mapstructure:"features"`
// Additional database configurations
Databases map[string]DatabaseConfig `mapstructure:"databases"`
}
Config represents the complete plugin configuration structure
type Configurer ΒΆ
type Configurer interface {
// UnmarshalKey reads configuration section into provided structure
UnmarshalKey(name string, out interface{}) error
// Has checks if configuration key exists
Has(name string) bool
}
Configurer provides access to plugin configuration
type DatabaseConfig ΒΆ
type DatabaseConfig struct {
// Primary backend for this database
Primary BackendConfig `mapstructure:"primary"`
// Routing configuration (optional, inherits from global if not set)
Routing *RoutingConfig `mapstructure:"routing"`
}
DatabaseConfig defines configuration for additional databases
type DatabasePool ΒΆ
type DatabasePool struct {
// contains filtered or unexported fields
}
DatabasePool manages a pool of PostgreSQL connections for a single database
func NewDatabasePool ΒΆ
func NewDatabasePool( name string, config *BackendConfig, log *zap.Logger, ) (*DatabasePool, error)
NewDatabasePool creates a new connection pool for the specified database
func (*DatabasePool) Acquire ΒΆ
Acquire obtains a connection from the pool The connection must be released back to the pool when done
func (*DatabasePool) Close ΒΆ
func (p *DatabasePool) Close(ctx context.Context) error
Close gracefully shuts down the pool
func (*DatabasePool) Exec ΒΆ
func (p *DatabasePool) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
Exec executes a command (INSERT, UPDATE, DELETE, etc.)
func (*DatabasePool) HealthCheck ΒΆ
func (p *DatabasePool) HealthCheck(ctx context.Context) error
HealthCheck verifies pool connectivity and health
func (*DatabasePool) Query ΒΆ
func (p *DatabasePool) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
Query executes a query and returns the result rows
func (*DatabasePool) Stats ΒΆ
func (p *DatabasePool) Stats() *PoolStats
Stats returns current pool statistics
type DatabaseStats ΒΆ
type DatabaseStats struct {
// Connection counts
TotalConnections int32 `json:"total_connections"`
IdleConnections int32 `json:"idle_connections"`
ActiveConnections int32 `json:"active_connections"`
MaxConnections int32 `json:"max_connections"`
// Acquisition statistics
TotalAcquired int64 `json:"total_acquired"`
TotalReleased int64 `json:"total_released"`
AverageAcquireTimeMs int64 `json:"average_acquire_time_ms"`
// Query statistics
QueryCount int64 `json:"query_count"`
AverageQueryTimeMs int64 `json:"average_query_time_ms"`
ErrorCount int64 `json:"error_count"`
SlowQueryCount int64 `json:"slow_query_count"`
// Health status
HealthChecksPassed int64 `json:"health_checks_passed"`
HealthChecksFailed int64 `json:"health_checks_failed"`
LastHealthCheck string `json:"last_health_check"`
LastError string `json:"last_error,omitempty"`
}
DatabaseStats represents statistics for a single database pool
type ExecuteRequest ΒΆ
type ExecuteRequest struct {
// Database name to execute query against (default: "primary")
Database string `json:"database"`
// SQL query to execute
Query string `json:"query"`
// Query parameters (positional)
Args []interface{} `json:"args,omitempty"`
// Timeout in milliseconds (0 = use default)
Timeout int `json:"timeout,omitempty"`
}
ExecuteRequest represents a query execution request from PHP
type ExecuteResponse ΒΆ
type ExecuteResponse struct {
// Rows returned by query (for SELECT queries)
Rows []map[string]interface{} `json:"rows,omitempty"`
// RowsAffected for INSERT/UPDATE/DELETE
RowsAffected int64 `json:"rows_affected,omitempty"`
// Error message if query failed
Error string `json:"error,omitempty"`
// Query execution time in milliseconds
ExecutionTime int64 `json:"execution_time"`
}
ExecuteResponse contains query execution results
type FailoverConfig ΒΆ
type FailoverConfig struct {
// Enabled allows automatic failover to replicas
Enabled bool `mapstructure:"enabled"`
// RetryInterval before retrying failed backend
RetryInterval time.Duration `mapstructure:"retry_interval"`
// MaxFailures before marking backend as down
MaxFailures int `mapstructure:"max_failures"`
}
FailoverConfig defines failover behavior
type FeaturesConfig ΒΆ
type FeaturesConfig struct {
// Cache configuration for query results
Cache CacheConfig `mapstructure:"cache"`
// CircuitBreaker for backend failure handling
CircuitBreaker CircuitBreakerConfig `mapstructure:"circuit_breaker"`
// RateLimit per client configuration
RateLimit RateLimitConfig `mapstructure:"rate_limit"`
}
FeaturesConfig defines optional advanced features
type HealthCheckRequest ΒΆ
type HealthCheckRequest struct {
// Database name (default: "primary", "*" for all)
Database string `json:"database"`
}
HealthCheckRequest checks pool health
type HealthCheckResponse ΒΆ
type HealthCheckResponse struct {
// Health status per database
Health map[string]bool `json:"health"`
// Error messages per database
Errors map[string]string `json:"errors,omitempty"`
}
HealthCheckResponse returns health status
type LoggingConfig ΒΆ
type LoggingConfig struct {
// Level specifies log level: "debug", "info", "warn", "error"
Level string `mapstructure:"level"`
// SlowQuery logging configuration
SlowQuery SlowQueryConfig `mapstructure:"slow_query"`
// LogQueries enables query logging (debug mode)
LogQueries bool `mapstructure:"log_queries"`
// LogParams includes query parameters in logs
LogParams bool `mapstructure:"log_params"`
}
LoggingConfig defines logging behavior
type MetricsCollector ΒΆ
type MetricsCollector struct {
// contains filtered or unexported fields
}
MetricsCollector collects and exposes pool metrics
func NewMetricsCollector ΒΆ
func NewMetricsCollector(config MonitoringConfig) *MetricsCollector
NewMetricsCollector creates a new metrics collector
func (*MetricsCollector) RecordConnectionError ΒΆ
func (mc *MetricsCollector) RecordConnectionError(database string)
RecordConnectionError records a connection error
func (*MetricsCollector) RecordHealthCheck ΒΆ
func (mc *MetricsCollector) RecordHealthCheck(database string, passed bool)
RecordHealthCheck records a health check result
func (*MetricsCollector) RecordPoolStats ΒΆ
func (mc *MetricsCollector) RecordPoolStats(database string, stats *PoolStats)
RecordPoolStats records pool statistics
func (*MetricsCollector) RecordQuery ΒΆ
func (mc *MetricsCollector) RecordQuery(database string, duration time.Duration, err error)
RecordQuery records a query execution
func (*MetricsCollector) Register ΒΆ
func (mc *MetricsCollector) Register() error
Register registers metrics with Prometheus
type MonitoringConfig ΒΆ
type MonitoringConfig struct {
// Enabled activates metrics collection
Enabled bool `mapstructure:"enabled"`
// Interval between metrics updates
Interval time.Duration `mapstructure:"interval"`
// ExposeRPC makes metrics available via RPC
ExposeRPC bool `mapstructure:"expose_rpc"`
// Prometheus configuration
Prometheus PrometheusConfig `mapstructure:"prometheus"`
// Collect specifies which metrics to gather
Collect []string `mapstructure:"collect"`
}
MonitoringConfig defines metrics and monitoring settings
type Plugin ΒΆ
type Plugin struct {
// contains filtered or unexported fields
}
Plugin represents the PostgreSQL connection pool plugin for RoadRunner
func (*Plugin) GetPool ΒΆ
func (p *Plugin) GetPool(name string) (*DatabasePool, error)
GetPool retrieves a database pool by name (thread-safe)
func (*Plugin) Init ΒΆ
func (p *Plugin) Init(cfg Configurer, log Logger) error
Init initializes the plugin with dependencies from container This is called during RoadRunner startup before Serve()
func (*Plugin) RPC ΒΆ
func (p *Plugin) RPC() interface{}
RPC returns the RPC interface exposed to PHP
func (*Plugin) Serve ΒΆ
Serve starts the plugin services (proxy server, health checks, metrics) Returns a channel that receives errors from long-running services
type PoolConfig ΒΆ
type PoolConfig struct {
// MaxConnections is the maximum number of connections in the pool
MaxConnections int `mapstructure:"max_connections"`
// MinConnections is the minimum idle connections to maintain
MinConnections int `mapstructure:"min_connections"`
// MaxLifetime is the maximum connection lifetime
MaxLifetime time.Duration `mapstructure:"max_lifetime"`
// MaxIdleTime is the maximum idle time before closing
MaxIdleTime time.Duration `mapstructure:"max_idle_time"`
// HealthCheckPeriod for periodic connection validation
HealthCheckPeriod time.Duration `mapstructure:"health_check_period"`
// AcquireTimeout when waiting for available connection
AcquireTimeout time.Duration `mapstructure:"acquire_timeout"`
// WaitQueueSize limits pending connection requests
WaitQueueSize int `mapstructure:"wait_queue_size"`
}
PoolConfig defines connection pool parameters
type PoolStats ΒΆ
type PoolStats struct {
// Connection statistics
TotalConnections int32
IdleConnections int32
ActiveConnections int32
WaitingClients int32
TotalAcquired int64
TotalReleased int64
AcquireCount int64
AcquireDuration time.Duration
CanceledAcquireCount int64
ConstructingConns int32
MaxConnections int32
// Query statistics
QueryCount int64
QueryDuration time.Duration
ErrorCount int64
SlowQueryCount int64
// Pool health
HealthChecksPassed int64
HealthChecksFailed int64
LastHealthCheck time.Time
LastError error
// contains filtered or unexported fields
}
PoolStats tracks pool usage metrics
type PoolStatsRequest ΒΆ
type PoolStatsRequest struct {
// Database name (default: "primary", "*" for all)
Database string `json:"database"`
}
PoolStatsRequest retrieves pool statistics
type PoolStatsResponse ΒΆ
type PoolStatsResponse struct {
// Statistics per database
Stats map[string]DatabaseStats `json:"stats"`
// Error message if stats retrieval failed
Error string `json:"error,omitempty"`
}
PoolStatsResponse contains pool statistics
type PrometheusConfig ΒΆ
type PrometheusConfig struct {
// Enabled exposes Prometheus metrics
Enabled bool `mapstructure:"enabled"`
// Namespace for metrics
Namespace string `mapstructure:"namespace"`
// Subsystem for metrics
Subsystem string `mapstructure:"subsystem"`
}
PrometheusConfig defines Prometheus metrics settings
type ProxyConfig ΒΆ
type ProxyConfig struct {
// Address where the proxy listens (e.g., "tcp://127.0.0.1:5433")
Address string `mapstructure:"address"`
// MaxClients limits concurrent client connections
MaxClients int `mapstructure:"max_clients"`
// ClientTimeout for idle client connections
ClientTimeout time.Duration `mapstructure:"client_timeout"`
// AuthMode specifies authentication method: "none", "password", "trust"
AuthMode string `mapstructure:"auth_mode"`
}
ProxyConfig defines the local proxy server settings
type ProxyServer ΒΆ
type ProxyServer struct {
// contains filtered or unexported fields
}
ProxyServer implements PostgreSQL wire protocol server This allows PHP to connect using standard PostgreSQL drivers
func NewProxyServer ΒΆ
func NewProxyServer( address string, plugin *Plugin, log *zap.Logger, config *ProxyConfig, ) (*ProxyServer, error)
NewProxyServer creates a new PostgreSQL protocol proxy server
type RateLimitConfig ΒΆ
type RateLimitConfig struct {
// Enabled activates rate limiting
Enabled bool `mapstructure:"enabled"`
// RequestsPerSecond allowed per client
RequestsPerSecond int `mapstructure:"requests_per_second"`
// Burst allows temporary exceeding of rate limit
Burst int `mapstructure:"burst"`
}
RateLimitConfig defines rate limiting per client
type ReplicaConfig ΒΆ
type ReplicaConfig struct {
BackendConfig `mapstructure:",squash"`
// Weight for load balancing (higher = more traffic)
Weight int `mapstructure:"weight"`
}
ReplicaConfig extends BackendConfig with replica-specific settings
type RoutingConfig ΒΆ
type RoutingConfig struct {
// ReadWriteSplit enables automatic read/write query routing
ReadWriteSplit bool `mapstructure:"read_write_split"`
// ReadPatterns are regex patterns identifying read queries
ReadPatterns []string `mapstructure:"read_patterns"`
// WritePatterns are regex patterns identifying write queries
WritePatterns []string `mapstructure:"write_patterns"`
// LoadBalance strategy: "round_robin", "least_connections", "random", "weighted"
LoadBalance string `mapstructure:"load_balance"`
// Failover configuration
Failover FailoverConfig `mapstructure:"failover"`
}
RoutingConfig defines query routing behavior
type SlowQueryConfig ΒΆ
type SlowQueryConfig struct {
// Enabled activates slow query logging
Enabled bool `mapstructure:"enabled"`
// Threshold for slow query detection
Threshold time.Duration `mapstructure:"threshold"`
}
SlowQueryConfig defines slow query logging
type TransactionBeginRequest ΒΆ
type TransactionBeginRequest struct {
// Database name (default: "primary")
Database string `json:"database"`
// Transaction ID for tracking (generated if not provided)
TxID string `json:"tx_id,omitempty"`
// Isolation level: "read_committed", "repeatable_read", "serializable"
IsolationLevel string `json:"isolation_level,omitempty"`
// Read only transaction
ReadOnly bool `json:"read_only,omitempty"`
}
TransactionBeginRequest starts a new transaction
type TransactionBeginResponse ΒΆ
type TransactionBeginResponse struct {
// Transaction ID for subsequent operations
TxID string `json:"tx_id"`
// Error message if transaction start failed
Error string `json:"error,omitempty"`
}
TransactionBeginResponse returns transaction identifier
type TransactionCommitRequest ΒΆ
type TransactionCommitRequest struct {
// Transaction ID to commit
TxID string `json:"tx_id"`
}
TransactionCommitRequest commits a transaction
type TransactionCommitResponse ΒΆ
type TransactionCommitResponse struct {
// Success indicator
Success bool `json:"success"`
// Error message if commit failed
Error string `json:"error,omitempty"`
}
TransactionCommitResponse returns commit result
type TransactionRollbackRequest ΒΆ
type TransactionRollbackRequest struct {
// Transaction ID to rollback
TxID string `json:"tx_id"`
}
TransactionRollbackRequest rolls back a transaction
type TransactionRollbackResponse ΒΆ
type TransactionRollbackResponse struct {
// Success indicator
Success bool `json:"success"`
// Error message if rollback failed
Error string `json:"error,omitempty"`
}
TransactionRollbackResponse returns rollback result