pgpool

package module
v0.0.0-...-df16262 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2025 License: MIT Imports: 12 Imported by: 0

README ¶

PostgreSQL Connection Pool Plugin for RoadRunner

Go Report Card GoDoc License

A high-performance PostgreSQL connection pooling plugin for RoadRunner that dramatically reduces database connection overhead by allowing PHP workers to share a common pool of database connections.

🚀 Features

  • 80% Connection Reduction: Share connections across all PHP workers
  • Sub-millisecond Overhead: < 1ms latency for connection acquisition
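  • Zero PHP Code Changes (Phase 2 goal): existing PDO/Doctrine/Eloquent code will connect through the PostgreSQL wire-protocol proxy; the current Phase 1 release is used via explicit RPC calls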
  • Transaction-Aware: Proper isolation and transaction boundary management
  • Health Monitoring: Automatic connection health checks and recovery
  • Prometheus Metrics: Complete observability of pool performance
  • Circuit Breaker: Automatic failover on backend failures
  • Multiple Databases: Support for multiple independent connection pools

📦 Installation

Add to Your RoadRunner Build

Add the plugin to your velox.toml:

[roadrunner]
ref = "v2024.x.x"

[github]
[github.token]
token = "YOUR_GITHUB_TOKEN"

[plugins]
pg_pool = { ref = "v1.0.0", owner = "roadrunner-server", repository = "pg-pool" }

Build RoadRunner with the plugin:

vx build -c velox.toml -o rr

βš™οΈ Configuration

Basic Configuration

Create or update .rr.yaml:

pgpool:
  enabled: true

  # Local proxy endpoint for PostgreSQL protocol (Phase 2)
  proxy:
    address: "tcp://127.0.0.1:5433"
    max_clients: 100
    client_timeout: 30s
    auth_mode: "trust"

  # Database backends
  backends:
    primary:
      dsn: "postgres://user:password@localhost:5432/mydb?sslmode=prefer"

      pool:
        max_connections: 25
        min_connections: 5
        max_lifetime: 1h
        max_idle_time: 30m
        health_check_period: 30s
        acquire_timeout: 5s

      behavior:
        mode: "transaction"
        reset_on_return: true
        statement_cache_size: 100
        query_timeout: 30s

  # Monitoring
  monitoring:
    enabled: true
    interval: 10s
    prometheus:
      enabled: true
      namespace: "roadrunner"
      subsystem: "pgpool"

  # Logging
  logging:
    level: "info"
    slow_query:
      enabled: true
      threshold: 100ms
Multi-Database Configuration
pgpool:
  enabled: true

  backends:
    primary:
      dsn: "postgres://user:pass@db1:5432/main"
      pool:
        max_connections: 25

  # Additional databases
  databases:
    analytics:
      primary:
        dsn: "postgres://user:pass@db2:5432/analytics"
        pool:
          max_connections: 10

    cache:
      primary:
        dsn: "postgres://user:pass@db3:5432/cache"
        pool:
          max_connections: 5
        behavior:
          mode: "statement"

🔧 PHP Usage

Using RPC (Phase 1 - Current Implementation)

Install RoadRunner PHP client:

composer require spiral/roadrunner-http spiral/goridge
Execute Queries
<?php

use Spiral\Goridge\RPC\RPC;
use Spiral\Goridge\RPC\RPCInterface;

$rpc = RPC::create('tcp://127.0.0.1:6001');

// Execute SELECT query. Values travel separately in 'args' as positional
// parameters for the $1, $2, ... placeholders in the SQL text.
$result = $rpc->call('pgpool.Execute', [
    'database' => 'primary',
    'query' => 'SELECT * FROM users WHERE active = $1 AND created_at > $2',
    'args' => [true, '2024-01-01'],
    'timeout' => 5000, // milliseconds
]);

// A failed query is reported through the optional 'error' key of the response.
if (!empty($result['error'])) {
    throw new \RuntimeException($result['error']);
}

// 'rows' is an optional key: it is left out of the response when the query
// produced no rows, so fall back to an empty list instead of iterating null.
foreach ($result['rows'] ?? [] as $row) {
    echo "User: {$row['name']}\n";
}

// Execute INSERT/UPDATE/DELETE
$result = $rpc->call('pgpool.Execute', [
    'database' => 'primary',
    'query' => 'INSERT INTO users (name, email) VALUES ($1, $2)',
    'args' => ['John Doe', 'john@example.com'],
]);

if (!empty($result['error'])) {
    throw new \RuntimeException($result['error']);
}

// 'rows_affected' is also optional: it is omitted when the count is zero.
$rowsAffected = $result['rows_affected'] ?? 0;
echo "Inserted {$rowsAffected} row(s)\n";
Get Pool Statistics
<?php

// Fetch statistics for one pool; pass '*' to get every configured database.
$stats = $rpc->call('pgpool.GetPoolStats', [
    'database' => 'primary', // or '*' for all databases
]);

// The response carries an optional 'error' key when stats retrieval failed.
if (!empty($stats['error'])) {
    throw new \RuntimeException($stats['error']);
}

// Guard against a missing/null 'stats' map so the loop never iterates null.
foreach ($stats['stats'] ?? [] as $db => $stat) {
    echo "Database: {$db}\n";
    echo "  Total Connections: {$stat['total_connections']}\n";
    echo "  Active: {$stat['active_connections']}\n";
    echo "  Idle: {$stat['idle_connections']}\n";
    echo "  Query Count: {$stat['query_count']}\n";
    echo "  Avg Query Time: {$stat['average_query_time_ms']}ms\n";
}
Health Checks
<?php

// Ask for the health of every configured pool ('*' selects all databases).
$health = $rpc->call('pgpool.HealthCheck', ['database' => '*']);

foreach ($health['health'] as $db => $ok) {
    $label = $ok ? 'OK' : 'FAILED';
    echo "{$db}: " . $label . "\n";

    // Healthy pools have nothing more to report.
    if ($ok) {
        continue;
    }

    // The 'errors' map is optional, so only print a message when one exists.
    if (isset($health['errors'][$db])) {
        echo "  Error: {$health['errors'][$db]}\n";
    }
}
Helper Class for PHP
<?php

namespace App\Database;

use Spiral\Goridge\RPC\RPCInterface;

/**
 * Convenience wrapper around the pgpool RPC methods for a single database pool.
 *
 * Every method targets the pool named in the constructor and converts an
 * 'error' entry in the RPC response into a \RuntimeException.
 */
class PgPoolClient
{
    /**
     * @param RPCInterface $rpc      RPC connection used for all pgpool calls
     * @param string       $database Name of the pool each call is sent to
     */
    public function __construct(
        private RPCInterface $rpc,
        private string $database = 'primary'
    ) {}

    /**
     * Runs a query and returns its result rows.
     *
     * @param string $sql  SQL text with positional placeholders ($1, $2, ...)
     * @param array  $args Positional values for the placeholders
     *
     * @return array Result rows; empty when the response has no 'rows' key
     *
     * @throws \RuntimeException When the response reports an error
     */
    public function query(string $sql, array $args = []): array
    {
        $result = $this->call('pgpool.Execute', ['query' => $sql, 'args' => $args]);

        return $result['rows'] ?? [];
    }

    /**
     * Runs a statement and returns the number of affected rows.
     *
     * @param string $sql  SQL text with positional placeholders ($1, $2, ...)
     * @param array  $args Positional values for the placeholders
     *
     * @return int Affected row count; 0 when the response has no 'rows_affected' key
     *
     * @throws \RuntimeException When the response reports an error
     */
    public function execute(string $sql, array $args = []): int
    {
        $result = $this->call('pgpool.Execute', ['query' => $sql, 'args' => $args]);

        return $result['rows_affected'] ?? 0;
    }

    /**
     * Returns the statistics entry for this client's database.
     *
     * @return array Statistics for the configured database, or an empty array
     *               when the response holds no entry under that name
     *
     * @throws \RuntimeException When the response reports an error
     */
    public function stats(): array
    {
        $result = $this->call('pgpool.GetPoolStats', []);

        return $result['stats'][$this->database] ?? [];
    }

    /**
     * Sends one RPC request for the configured database and surfaces response errors.
     *
     * @param string $method  Fully qualified RPC method name, e.g. 'pgpool.Execute'
     * @param array  $payload Extra request fields; 'database' is always sent first
     *
     * @return mixed The decoded RPC response
     *
     * @throws \RuntimeException When the response contains a non-empty 'error'
     */
    private function call(string $method, array $payload): mixed
    {
        // Array union keeps 'database' as the first key and never lets the
        // payload override it.
        $result = $this->rpc->call($method, ['database' => $this->database] + $payload);

        if (!empty($result['error'])) {
            throw new \RuntimeException($result['error']);
        }

        return $result;
    }
}

// Usage
$pool = new PgPoolClient($rpc, 'primary');
$users = $pool->query('SELECT * FROM users WHERE id = $1', [123]);

📊 Monitoring

Prometheus Metrics

The plugin exposes the following Prometheus metrics:

  • roadrunner_pgpool_connections{database,state} - Connection counts (total/idle/active)
  • roadrunner_pgpool_acquire_duration_seconds{database} - Connection acquisition time
  • roadrunner_pgpool_query_duration_seconds{database} - Query execution time
  • roadrunner_pgpool_queries_total{database} - Total number of queries
  • roadrunner_pgpool_errors_total{database,type} - Error counts
  • roadrunner_pgpool_health_checks_total{database,status} - Health check results
Grafana Dashboard

Import the included Grafana dashboard from grafana-dashboard.json for comprehensive pool monitoring.

πŸ” Troubleshooting

High Connection Usage
# Check pool statistics
./rr rpc pgpool.GetPoolStats '{"database":"*"}'

# Monitor metrics
curl http://localhost:2112/metrics | grep pgpool
Slow Queries

Enable slow query logging in .rr.yaml:

pgpool:
  logging:
    slow_query:
      enabled: true
      threshold: 100ms
Connection Errors

Check health status:

./rr rpc pgpool.HealthCheck '{"database":"*"}'

πŸ› οΈ Development Roadmap

Phase 1: RPC Interface (Current)
  • ✅ Core connection pooling
  • ✅ Basic RPC methods (Execute, GetPoolStats, HealthCheck)
  • ✅ Prometheus metrics
  • ✅ Circuit breaker
  • ⏳ Transaction support via RPC
Phase 2: PostgreSQL Wire Protocol
  • ⏳ Full protocol implementation
  • ⏳ Direct PDO/pg_* function support
  • ⏳ Prepared statement support
  • ⏳ COPY protocol
Phase 3: Advanced Features
  • ⏳ Read/write splitting
  • ⏳ Multi-replica support
  • ⏳ Query result caching
  • ⏳ Advanced load balancing

📄 License

MIT License - see LICENSE file for details.

🤝 Contributing

Contributions are welcome! Please read our Contributing Guide for details.

📞 Support

πŸ™ Acknowledgments

Built with:

  • pgx - PostgreSQL driver and toolkit
  • RoadRunner - High-performance PHP application server
  • Endure - Dependency injection container

Documentation ¶

Index ¶

Constants ¶

View Source
const (
	// PluginName is the name used for plugin registration
	PluginName = "pgpool"
)

Variables ΒΆ

This section is empty.

Functions ΒΆ

This section is empty.

Types ΒΆ

type BackendConfig ΒΆ

type BackendConfig struct {
	// DSN is the PostgreSQL connection string
	DSN string `mapstructure:"dsn"`

	// Alternative structured configuration
	Host     string `mapstructure:"host"`
	Port     int    `mapstructure:"port"`
	Database string `mapstructure:"database"`
	User     string `mapstructure:"user"`
	Password string `mapstructure:"password"`
	SSLMode  string `mapstructure:"sslmode"`

	// Pool configuration
	Pool PoolConfig `mapstructure:"pool"`

	// Behavior configuration
	Behavior BehaviorConfig `mapstructure:"behavior"`
}

BackendConfig defines a single database backend configuration

func (*BackendConfig) BuildDSN ΒΆ

func (b *BackendConfig) BuildDSN() string

BuildDSN constructs a PostgreSQL DSN from structured configuration

func (*BackendConfig) Validate ΒΆ

func (b *BackendConfig) Validate() error

Validate checks backend configuration for errors

type BackendsConfig ΒΆ

type BackendsConfig struct {
	// Primary database configuration
	Primary BackendConfig `mapstructure:"primary"`

	// Replicas for read scaling (optional)
	Replicas []ReplicaConfig `mapstructure:"replicas"`
}

BackendsConfig defines primary and replica database connections

type BehaviorConfig ΒΆ

type BehaviorConfig struct {
	// Mode specifies pooling mode: "session", "transaction", "statement", "auto"
	Mode string `mapstructure:"mode"`

	// ResetOnReturn resets connection state when returned to pool
	ResetOnReturn bool `mapstructure:"reset_on_return"`

	// LazyConnect delays connection establishment until first use
	LazyConnect bool `mapstructure:"lazy_connect"`

	// StatementCacheSize for prepared statements per connection
	StatementCacheSize int `mapstructure:"statement_cache_size"`

	// QueryTimeout for individual queries (0 = no timeout)
	QueryTimeout time.Duration `mapstructure:"query_timeout"`
}

BehaviorConfig defines connection behavior settings

type CacheConfig ΒΆ

type CacheConfig struct {
	// Enabled activates query caching
	Enabled bool `mapstructure:"enabled"`

	// TTL for cached results
	TTL time.Duration `mapstructure:"ttl"`

	// MaxSize limits cache memory usage
	MaxSize string `mapstructure:"max_size"`
}

CacheConfig defines query caching layer

type CircuitBreaker ΒΆ

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern for backend health

func NewCircuitBreaker ΒΆ

func NewCircuitBreaker(maxFailures int, timeout time.Duration, log *zap.Logger) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) Allow ΒΆ

func (cb *CircuitBreaker) Allow() bool

Allow checks if operation should be allowed

func (*CircuitBreaker) GetState ΒΆ

func (cb *CircuitBreaker) GetState() CircuitBreakerState

GetState returns current circuit breaker state

func (*CircuitBreaker) RecordFailure ΒΆ

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed operation

func (*CircuitBreaker) RecordSuccess ΒΆ

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful operation

func (*CircuitBreaker) Reset ΒΆ

func (cb *CircuitBreaker) Reset()

Reset manually resets the circuit breaker to closed state

type CircuitBreakerConfig ΒΆ

type CircuitBreakerConfig struct {
	// Enabled activates circuit breaker
	Enabled bool `mapstructure:"enabled"`

	// Threshold of failures to open circuit
	Threshold int `mapstructure:"threshold"`

	// Timeout before attempting reset
	Timeout time.Duration `mapstructure:"timeout"`
}

CircuitBreakerConfig defines circuit breaker behavior

type CircuitBreakerState ΒΆ

type CircuitBreakerState int32

CircuitBreakerState represents the state of the circuit breaker

const (
	// StateClosed means circuit is operating normally
	StateClosed CircuitBreakerState = iota
	// StateOpen means circuit is open due to failures
	StateOpen
	// StateHalfOpen means circuit is testing if backend recovered
	StateHalfOpen
)

type ClientConnection ΒΆ

type ClientConnection struct {
	// contains filtered or unexported fields
}

ClientConnection represents a single PHP client connection

func (*ClientConnection) Close ΒΆ

func (c *ClientConnection) Close() error

Close closes a client connection

type Config ΒΆ

type Config struct {
	// Enabled determines if the plugin is active
	Enabled bool `mapstructure:"enabled"`

	// Proxy configuration for PostgreSQL wire protocol server
	Proxy ProxyConfig `mapstructure:"proxy"`

	// Backends defines database connections
	Backends BackendsConfig `mapstructure:"backends"`

	// Routing configuration for read/write splitting
	Routing RoutingConfig `mapstructure:"routing"`

	// Monitoring and metrics configuration
	Monitoring MonitoringConfig `mapstructure:"monitoring"`

	// Logging configuration
	Logging LoggingConfig `mapstructure:"logging"`

	// Advanced features configuration
	Features FeaturesConfig `mapstructure:"features"`

	// Additional database configurations
	Databases map[string]DatabaseConfig `mapstructure:"databases"`
}

Config represents the complete plugin configuration structure

func (*Config) Validate ΒΆ

func (c *Config) Validate() error

Validate checks configuration for errors and applies defaults

type Configurer ΒΆ

type Configurer interface {
	// UnmarshalKey reads configuration section into provided structure
	UnmarshalKey(name string, out interface{}) error
	// Has checks if configuration key exists
	Has(name string) bool
}

Configurer provides access to plugin configuration

type DatabaseConfig ΒΆ

type DatabaseConfig struct {
	// Primary backend for this database
	Primary BackendConfig `mapstructure:"primary"`

	// Routing configuration (optional, inherits from global if not set)
	Routing *RoutingConfig `mapstructure:"routing"`
}

DatabaseConfig defines configuration for additional databases

type DatabasePool ΒΆ

type DatabasePool struct {
	// contains filtered or unexported fields
}

DatabasePool manages a pool of PostgreSQL connections for a single database

func NewDatabasePool ΒΆ

func NewDatabasePool(
	name string,
	config *BackendConfig,
	log *zap.Logger,
) (*DatabasePool, error)

NewDatabasePool creates a new connection pool for the specified database

func (*DatabasePool) Acquire ΒΆ

func (p *DatabasePool) Acquire(ctx context.Context) (*pgxpool.Conn, error)

Acquire obtains a connection from the pool. The connection must be released back to the pool when done.

func (*DatabasePool) Begin ΒΆ

func (p *DatabasePool) Begin(ctx context.Context) (pgx.Tx, error)

Begin starts a new transaction

func (*DatabasePool) Close ΒΆ

func (p *DatabasePool) Close(ctx context.Context) error

Close gracefully shuts down the pool

func (*DatabasePool) Exec ΒΆ

func (p *DatabasePool) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)

Exec executes a command (INSERT, UPDATE, DELETE, etc.)

func (*DatabasePool) HealthCheck ΒΆ

func (p *DatabasePool) HealthCheck(ctx context.Context) error

HealthCheck verifies pool connectivity and health

func (*DatabasePool) Query ΒΆ

func (p *DatabasePool) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

Query executes a query and returns the result rows

func (*DatabasePool) QueryRow ΒΆ

func (p *DatabasePool) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

QueryRow executes a query that returns at most one row

func (*DatabasePool) Stats ΒΆ

func (p *DatabasePool) Stats() *PoolStats

Stats returns current pool statistics

type DatabaseStats ΒΆ

type DatabaseStats struct {
	// Connection counts
	TotalConnections  int32 `json:"total_connections"`
	IdleConnections   int32 `json:"idle_connections"`
	ActiveConnections int32 `json:"active_connections"`
	MaxConnections    int32 `json:"max_connections"`

	// Acquisition statistics
	TotalAcquired        int64 `json:"total_acquired"`
	TotalReleased        int64 `json:"total_released"`
	AverageAcquireTimeMs int64 `json:"average_acquire_time_ms"`

	// Query statistics
	QueryCount         int64 `json:"query_count"`
	AverageQueryTimeMs int64 `json:"average_query_time_ms"`
	ErrorCount         int64 `json:"error_count"`
	SlowQueryCount     int64 `json:"slow_query_count"`

	// Health status
	HealthChecksPassed int64  `json:"health_checks_passed"`
	HealthChecksFailed int64  `json:"health_checks_failed"`
	LastHealthCheck    string `json:"last_health_check"`
	LastError          string `json:"last_error,omitempty"`
}

DatabaseStats represents statistics for a single database pool

type ExecuteRequest ΒΆ

type ExecuteRequest struct {
	// Database name to execute query against (default: "primary")
	Database string `json:"database"`

	// SQL query to execute
	Query string `json:"query"`

	// Query parameters (positional)
	Args []interface{} `json:"args,omitempty"`

	// Timeout in milliseconds (0 = use default)
	Timeout int `json:"timeout,omitempty"`
}

ExecuteRequest represents a query execution request from PHP

type ExecuteResponse ΒΆ

// ExecuteResponse contains query execution results.
//
// Rows, RowsAffected and Error carry the omitempty JSON option, so each of
// them is left out of the encoded payload when it holds its zero value.
// Clients must therefore treat "rows", "rows_affected" and "error" as
// optional keys; "execution_time" is always present.
type ExecuteResponse struct {
	// Rows returned by query (for SELECT queries)
	Rows []map[string]interface{} `json:"rows,omitempty"`

	// RowsAffected for INSERT/UPDATE/DELETE
	RowsAffected int64 `json:"rows_affected,omitempty"`

	// Error message if query failed
	Error string `json:"error,omitempty"`

	// Query execution time in milliseconds
	ExecutionTime int64 `json:"execution_time"`
}

ExecuteResponse contains query execution results

type FailoverConfig ΒΆ

type FailoverConfig struct {
	// Enabled allows automatic failover to replicas
	Enabled bool `mapstructure:"enabled"`

	// RetryInterval before retrying failed backend
	RetryInterval time.Duration `mapstructure:"retry_interval"`

	// MaxFailures before marking backend as down
	MaxFailures int `mapstructure:"max_failures"`
}

FailoverConfig defines failover behavior

type FeaturesConfig ΒΆ

type FeaturesConfig struct {
	// Cache configuration for query results
	Cache CacheConfig `mapstructure:"cache"`

	// CircuitBreaker for backend failure handling
	CircuitBreaker CircuitBreakerConfig `mapstructure:"circuit_breaker"`

	// RateLimit per client configuration
	RateLimit RateLimitConfig `mapstructure:"rate_limit"`
}

FeaturesConfig defines optional advanced features

type HealthCheckRequest ΒΆ

type HealthCheckRequest struct {
	// Database name (default: "primary", "*" for all)
	Database string `json:"database"`
}

HealthCheckRequest checks pool health

type HealthCheckResponse ΒΆ

// HealthCheckResponse returns health status.
//
// Both maps are keyed by database name. Errors carries the omitempty JSON
// option, so the "errors" key is absent from the encoded payload when the
// map is empty; clients should check for its presence before reading it.
type HealthCheckResponse struct {
	// Health status per database
	Health map[string]bool `json:"health"`

	// Error messages per database
	Errors map[string]string `json:"errors,omitempty"`
}

HealthCheckResponse returns health status

type Logger ΒΆ

type Logger interface {
	NamedLogger(name string) *zap.Logger
}

Logger provides logging capabilities

type LoggingConfig ΒΆ

type LoggingConfig struct {
	// Level specifies log level: "debug", "info", "warn", "error"
	Level string `mapstructure:"level"`

	// SlowQuery logging configuration
	SlowQuery SlowQueryConfig `mapstructure:"slow_query"`

	// LogQueries enables query logging (debug mode)
	LogQueries bool `mapstructure:"log_queries"`

	// LogParams includes query parameters in logs
	LogParams bool `mapstructure:"log_params"`
}

LoggingConfig defines logging behavior

type MetricsCollector ΒΆ

type MetricsCollector struct {
	// contains filtered or unexported fields
}

MetricsCollector collects and exposes pool metrics

func NewMetricsCollector ΒΆ

func NewMetricsCollector(config MonitoringConfig) *MetricsCollector

NewMetricsCollector creates a new metrics collector

func (*MetricsCollector) RecordConnectionError ΒΆ

func (mc *MetricsCollector) RecordConnectionError(database string)

RecordConnectionError records a connection error

func (*MetricsCollector) RecordHealthCheck ΒΆ

func (mc *MetricsCollector) RecordHealthCheck(database string, passed bool)

RecordHealthCheck records a health check result

func (*MetricsCollector) RecordPoolStats ΒΆ

func (mc *MetricsCollector) RecordPoolStats(database string, stats *PoolStats)

RecordPoolStats records pool statistics

func (*MetricsCollector) RecordQuery ΒΆ

func (mc *MetricsCollector) RecordQuery(database string, duration time.Duration, err error)

RecordQuery records a query execution

func (*MetricsCollector) Register ΒΆ

func (mc *MetricsCollector) Register() error

Register registers metrics with Prometheus

type MonitoringConfig ΒΆ

type MonitoringConfig struct {
	// Enabled activates metrics collection
	Enabled bool `mapstructure:"enabled"`

	// Interval between metrics updates
	Interval time.Duration `mapstructure:"interval"`

	// ExposeRPC makes metrics available via RPC
	ExposeRPC bool `mapstructure:"expose_rpc"`

	// Prometheus configuration
	Prometheus PrometheusConfig `mapstructure:"prometheus"`

	// Collect specifies which metrics to gather
	Collect []string `mapstructure:"collect"`
}

MonitoringConfig defines metrics and monitoring settings

type Plugin ΒΆ

type Plugin struct {
	// contains filtered or unexported fields
}

Plugin represents the PostgreSQL connection pool plugin for RoadRunner

func (*Plugin) GetPool ΒΆ

func (p *Plugin) GetPool(name string) (*DatabasePool, error)

GetPool retrieves a database pool by name (thread-safe)

func (*Plugin) Init ΒΆ

func (p *Plugin) Init(cfg Configurer, log Logger) error

Init initializes the plugin with dependencies from the container. This is called during RoadRunner startup before Serve().

func (*Plugin) Name ΒΆ

func (p *Plugin) Name() string

Name returns the plugin name for registration

func (*Plugin) RPC ΒΆ

func (p *Plugin) RPC() interface{}

RPC returns the RPC interface exposed to PHP

func (*Plugin) Serve ΒΆ

func (p *Plugin) Serve() chan error

Serve starts the plugin services (proxy server, health checks, metrics). Returns a channel that receives errors from long-running services.

func (*Plugin) Stop ΒΆ

func (p *Plugin) Stop(ctx context.Context) error

Stop gracefully shuts down the plugin. Waits for active connections to complete and closes all pools.

func (*Plugin) Weight ΒΆ

func (p *Plugin) Weight() uint

Weight returns plugin initialization priority (higher = later). PgPool should initialize after config but before workers.

type PoolConfig ΒΆ

type PoolConfig struct {
	// MaxConnections is the maximum number of connections in the pool
	MaxConnections int `mapstructure:"max_connections"`

	// MinConnections is the minimum idle connections to maintain
	MinConnections int `mapstructure:"min_connections"`

	// MaxLifetime is the maximum connection lifetime
	MaxLifetime time.Duration `mapstructure:"max_lifetime"`

	// MaxIdleTime is the maximum idle time before closing
	MaxIdleTime time.Duration `mapstructure:"max_idle_time"`

	// HealthCheckPeriod for periodic connection validation
	HealthCheckPeriod time.Duration `mapstructure:"health_check_period"`

	// AcquireTimeout when waiting for available connection
	AcquireTimeout time.Duration `mapstructure:"acquire_timeout"`

	// WaitQueueSize limits pending connection requests
	WaitQueueSize int `mapstructure:"wait_queue_size"`
}

PoolConfig defines connection pool parameters

type PoolStats ΒΆ

type PoolStats struct {
	// Connection statistics
	TotalConnections     int32
	IdleConnections      int32
	ActiveConnections    int32
	WaitingClients       int32
	TotalAcquired        int64
	TotalReleased        int64
	AcquireCount         int64
	AcquireDuration      time.Duration
	CanceledAcquireCount int64
	ConstructingConns    int32
	MaxConnections       int32

	// Query statistics
	QueryCount     int64
	QueryDuration  time.Duration
	ErrorCount     int64
	SlowQueryCount int64

	// Pool health
	HealthChecksPassed int64
	HealthChecksFailed int64
	LastHealthCheck    time.Time
	LastError          error
	// contains filtered or unexported fields
}

PoolStats tracks pool usage metrics

type PoolStatsRequest ΒΆ

type PoolStatsRequest struct {
	// Database name (default: "primary", "*" for all)
	Database string `json:"database"`
}

PoolStatsRequest retrieves pool statistics

type PoolStatsResponse ΒΆ

type PoolStatsResponse struct {
	// Statistics per database
	Stats map[string]DatabaseStats `json:"stats"`

	// Error message if stats retrieval failed
	Error string `json:"error,omitempty"`
}

PoolStatsResponse contains pool statistics

type PrometheusConfig ΒΆ

type PrometheusConfig struct {
	// Enabled exposes Prometheus metrics
	Enabled bool `mapstructure:"enabled"`

	// Namespace for metrics
	Namespace string `mapstructure:"namespace"`

	// Subsystem for metrics
	Subsystem string `mapstructure:"subsystem"`
}

PrometheusConfig defines Prometheus metrics settings

type ProxyConfig ΒΆ

type ProxyConfig struct {
	// Address where the proxy listens (e.g., "tcp://127.0.0.1:5433")
	Address string `mapstructure:"address"`

	// MaxClients limits concurrent client connections
	MaxClients int `mapstructure:"max_clients"`

	// ClientTimeout for idle client connections
	ClientTimeout time.Duration `mapstructure:"client_timeout"`

	// AuthMode specifies authentication method: "none", "password", "trust"
	AuthMode string `mapstructure:"auth_mode"`
}

ProxyConfig defines the local proxy server settings

type ProxyServer ΒΆ

type ProxyServer struct {
	// contains filtered or unexported fields
}

ProxyServer implements a PostgreSQL wire protocol server. This allows PHP to connect using standard PostgreSQL drivers.

func NewProxyServer ΒΆ

func NewProxyServer(
	address string,
	plugin *Plugin,
	log *zap.Logger,
	config *ProxyConfig,
) (*ProxyServer, error)

NewProxyServer creates a new PostgreSQL protocol proxy server

func (*ProxyServer) Start ΒΆ

func (p *ProxyServer) Start(ctx context.Context) error

Start begins accepting client connections

func (*ProxyServer) Stop ΒΆ

func (p *ProxyServer) Stop(ctx context.Context) error

Stop gracefully shuts down the proxy server

type RateLimitConfig ΒΆ

type RateLimitConfig struct {
	// Enabled activates rate limiting
	Enabled bool `mapstructure:"enabled"`

	// RequestsPerSecond allowed per client
	RequestsPerSecond int `mapstructure:"requests_per_second"`

	// Burst allows temporary exceeding of rate limit
	Burst int `mapstructure:"burst"`
}

RateLimitConfig defines rate limiting per client

type ReplicaConfig ΒΆ

type ReplicaConfig struct {
	BackendConfig `mapstructure:",squash"`

	// Weight for load balancing (higher = more traffic)
	Weight int `mapstructure:"weight"`
}

ReplicaConfig extends BackendConfig with replica-specific settings

type RoutingConfig ΒΆ

type RoutingConfig struct {
	// ReadWriteSplit enables automatic read/write query routing
	ReadWriteSplit bool `mapstructure:"read_write_split"`

	// ReadPatterns are regex patterns identifying read queries
	ReadPatterns []string `mapstructure:"read_patterns"`

	// WritePatterns are regex patterns identifying write queries
	WritePatterns []string `mapstructure:"write_patterns"`

	// LoadBalance strategy: "round_robin", "least_connections", "random", "weighted"
	LoadBalance string `mapstructure:"load_balance"`

	// Failover configuration
	Failover FailoverConfig `mapstructure:"failover"`
}

RoutingConfig defines query routing behavior

type SlowQueryConfig ΒΆ

type SlowQueryConfig struct {
	// Enabled activates slow query logging
	Enabled bool `mapstructure:"enabled"`

	// Threshold for slow query detection
	Threshold time.Duration `mapstructure:"threshold"`
}

SlowQueryConfig defines slow query logging

type TransactionBeginRequest ΒΆ

type TransactionBeginRequest struct {
	// Database name (default: "primary")
	Database string `json:"database"`

	// Transaction ID for tracking (generated if not provided)
	TxID string `json:"tx_id,omitempty"`

	// Isolation level: "read_committed", "repeatable_read", "serializable"
	IsolationLevel string `json:"isolation_level,omitempty"`

	// Read only transaction
	ReadOnly bool `json:"read_only,omitempty"`
}

TransactionBeginRequest starts a new transaction

type TransactionBeginResponse ΒΆ

type TransactionBeginResponse struct {
	// Transaction ID for subsequent operations
	TxID string `json:"tx_id"`

	// Error message if transaction start failed
	Error string `json:"error,omitempty"`
}

TransactionBeginResponse returns transaction identifier

type TransactionCommitRequest ΒΆ

type TransactionCommitRequest struct {
	// Transaction ID to commit
	TxID string `json:"tx_id"`
}

TransactionCommitRequest commits a transaction

type TransactionCommitResponse ΒΆ

type TransactionCommitResponse struct {
	// Success indicator
	Success bool `json:"success"`

	// Error message if commit failed
	Error string `json:"error,omitempty"`
}

TransactionCommitResponse returns commit result

type TransactionRollbackRequest ΒΆ

type TransactionRollbackRequest struct {
	// Transaction ID to rollback
	TxID string `json:"tx_id"`
}

TransactionRollbackRequest rolls back a transaction

type TransactionRollbackResponse ΒΆ

type TransactionRollbackResponse struct {
	// Success indicator
	Success bool `json:"success"`

	// Error message if rollback failed
	Error string `json:"error,omitempty"`
}

TransactionRollbackResponse returns rollback result

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL