pool

package
v0.8.0
Warning

This package is not in the latest version of its module.

Published: Jun 19, 2025 License: MIT Imports: 16 Imported by: 0

README

Porter Pool Infrastructure

The pool package provides resource pooling for the Flight SQL server: a database connection pool, plus object pools that reduce memory allocations.

Design Overview

Core Components
  1. Connection Pool

    type ConnectionPool interface {
        Get(ctx context.Context) (*sql.DB, error)
        Stats() PoolStats
        HealthCheck(ctx context.Context) error
        Close() error
    }
    
  2. Object Pools

    type ByteBufferPool struct {
        pools map[int]*sync.Pool
        mu    sync.RWMutex
    }
    
    type RecordBuilderPool struct {
        pool      sync.Pool
        allocator memory.Allocator
    }
    

Implementation Details

Connection Pool

The connection pool manages database connections with the following features:

  1. Configuration

    type Config struct {
        DSN                string
        MaxOpenConnections int
        MaxIdleConnections int
        ConnMaxLifetime    time.Duration
        ConnMaxIdleTime    time.Duration
        HealthCheckPeriod  time.Duration
        ConnectionTimeout  time.Duration
    }
    
  2. Statistics

    type PoolStats struct {
        OpenConnections   int
        InUse             int
        Idle              int
        WaitCount         int64
        WaitDuration      time.Duration
        MaxIdleClosed     int64
        MaxLifetimeClosed int64
        LastHealthCheck   time.Time
        HealthCheckStatus string
    }
    
  3. Health Monitoring

    • Periodic health checks
    • Connection validation
    • Query execution testing
    • Status tracking

Object Pools

  1. Byte Buffer Pool

    • Size-class based pooling
    • Power-of-two sizing
    • Thread-safe operations
    • Zero-allocation reuse
  2. Record Builder Pool

    • Schema-aware pooling
    • Memory allocator integration
    • Automatic schema validation
    • Resource cleanup
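To make "size-class based pooling" with "power-of-two sizing" concrete, here is a small sketch built on `sync.Pool`. It is an assumption about how such a pool could work, not the package's code: `bufferPool`, `sizeClass`, and the 64-byte minimum class are all hypothetical choices.

```go
package main

import (
	"fmt"
	"math/bits"
	"sync"
)

// sizeClass rounds n up to the next power of two (minimum 64 bytes).
func sizeClass(n int) int {
	if n <= 64 {
		return 64
	}
	return 1 << bits.Len(uint(n-1))
}

// bufferPool keeps one sync.Pool per size class (hypothetical type).
type bufferPool struct {
	mu    sync.RWMutex
	pools map[int]*sync.Pool
}

func newBufferPool() *bufferPool {
	return &bufferPool{pools: make(map[int]*sync.Pool)}
}

// poolFor returns the pool for a class, creating it on first use.
func (p *bufferPool) poolFor(class int) *sync.Pool {
	p.mu.RLock()
	sp, ok := p.pools[class]
	p.mu.RUnlock()
	if ok {
		return sp
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	if sp, ok = p.pools[class]; !ok {
		sp = &sync.Pool{New: func() any {
			b := make([]byte, class)
			return &b
		}}
		p.pools[class] = sp
	}
	return sp
}

// Get returns a buffer of length n whose capacity is n's size class.
func (p *bufferPool) Get(n int) []byte {
	b := p.poolFor(sizeClass(n)).Get().(*[]byte)
	return (*b)[:n]
}

// Put returns a buffer; buffers whose capacity is not a size class are dropped.
func (p *bufferPool) Put(b []byte) {
	c := cap(b)
	if c < 64 || c&(c-1) != 0 {
		return
	}
	b = b[:c]
	p.poolFor(c).Put(&b)
}

func main() {
	p := newBufferPool()
	buf := p.Get(1000)
	fmt.Println(len(buf), cap(buf)) // 1000 1024
	p.Put(buf)
}
```

Rounding every request up to a power of two means a returned buffer can serve any later request in the same class, at the cost of up to roughly half the capacity going unused.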

Usage

Connection Pool

// Create pool configuration
config := pool.Config{
    DSN:                "duckdb.db",
    MaxOpenConnections: 25,
    MaxIdleConnections: 5,
    ConnMaxLifetime:    30 * time.Minute,
    ConnMaxIdleTime:    10 * time.Minute,
    HealthCheckPeriod:  time.Minute,
    ConnectionTimeout:  30 * time.Second,
}

// Create the pool. The variable is named connPool so that it does not
// shadow the pool package.
connPool, err := pool.New(config, logger)
if err != nil {
    log.Fatal(err)
}
defer connPool.Close()

// Get connection
db, err := connPool.Get(ctx)
if err != nil {
    log.Fatal(err)
}

// Check pool stats
stats := connPool.Stats()
fmt.Printf("Active connections: %d\n", stats.InUse)

Object Pools

// Create byte buffer pool
bufferPool := pool.NewByteBufferPool()

// Get buffer
buf := bufferPool.Get(1024)
defer bufferPool.Put(buf)

// Create record builder pool
builderPool := pool.NewRecordBuilderPool(memory.DefaultAllocator)

// Get builder
builder := builderPool.Get(schema)
defer builderPool.Put(builder)

Best Practices

  1. Connection Management

    • Set appropriate pool sizes
    • Monitor connection usage
    • Implement proper error handling
    • Use context for timeouts
  2. Resource Cleanup

    • Always close connections
    • Return objects to pools
    • Handle cleanup in defer
    • Monitor resource leaks
  3. Performance

    • Size pools appropriately
    • Monitor pool statistics
    • Adjust timeouts as needed
    • Handle backpressure
  4. Monitoring

    • Track pool statistics
    • Monitor health checks
    • Log connection issues
    • Alert on pool exhaustion
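As one way to act on "Alert on pool exhaustion", the check below compares two successive statistics samples. It is only a sketch: `poolStats` mirrors two fields of `PoolStats`, while `exhaustionAlert` and the 90% fraction are hypothetical.

```go
package main

import "fmt"

// poolStats mirrors the PoolStats fields this sketch reads.
type poolStats struct {
	InUse     int
	WaitCount int64
}

// exhaustionAlert reports whether usage has reached the given fraction of
// maxOpen, or whether callers have started waiting since the previous sample.
func exhaustionAlert(prev, cur poolStats, maxOpen int, fraction float64) (bool, string) {
	if maxOpen > 0 && float64(cur.InUse) >= fraction*float64(maxOpen) {
		return true, fmt.Sprintf("%d of %d connections in use", cur.InUse, maxOpen)
	}
	if cur.WaitCount > prev.WaitCount {
		return true, fmt.Sprintf("%d new waits for a connection", cur.WaitCount-prev.WaitCount)
	}
	return false, ""
}

func main() {
	prev := poolStats{InUse: 10}
	cur := poolStats{InUse: 23, WaitCount: 4}
	if alert, reason := exhaustionAlert(prev, cur, 25, 0.9); alert {
		fmt.Println("ALERT:", reason)
	}
}
```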

Testing

The package includes comprehensive tests:

  1. Connection Pool Tests

    • Connection lifecycle
    • Health checks
    • Error handling
    • Resource cleanup
  2. Object Pool Tests

    • Buffer allocation
    • Record builder reuse
    • Thread safety
    • Memory management

Performance Considerations

  1. Memory Usage

    • Pool sizing
    • Buffer reuse
    • Memory fragmentation
    • Resource limits
  2. CPU Impact

    • Pool synchronization
    • Health checks
    • Connection validation
    • Object creation
  3. Concurrency

    • Thread safety
    • Lock contention
    • Connection limits
    • Resource sharing

Integration Examples

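Custom Connection Wrapper

type CustomConnection struct {
    *pool.ConnectionWrapper
    // Additional fields
}

// ExecuteWithRetry retries Execute up to 3 times (an illustrative limit).
func (c *CustomConnection) ExecuteWithRetry(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
    var err error
    for attempt := 0; attempt < 3; attempt++ {
        var res sql.Result
        if res, err = c.Execute(ctx, query, args...); err == nil {
            return res, nil
        }
        if ctx.Err() != nil {
            break // context cancelled or timed out; stop retrying
        }
    }
    return nil, err
}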

Pool Monitoring

type PoolMonitor struct {
    pool   pool.ConnectionPool
    logger zerolog.Logger
}

func (m *PoolMonitor) Monitor(ctx context.Context) {
    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            stats := m.pool.Stats()
            m.logger.Info().
                Int("open", stats.OpenConnections).
                Int("in_use", stats.InUse).
                Int("idle", stats.Idle).
                Msg("Pool statistics")
        }
    }
}

Custom Object Pool

type CustomObjectPool struct {
    pool sync.Pool
}

func (p *CustomObjectPool) Get() *CustomObject {
    v := p.pool.Get()
    if v == nil {
        return NewCustomObject()
    }
    return v.(*CustomObject)
}

func (p *CustomObjectPool) Put(obj *CustomObject) {
    obj.Reset()
    p.pool.Put(obj)
}

Documentation

Overview

Package pool provides database connection pooling for DuckDB.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CacheStats

type CacheStats struct {
	Size      int
	Cap       int
	TotalHits int64
	OldestAge time.Duration
}

CacheStats contains live statistics for a SchemaCache.

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern for connection failures.

func NewCircuitBreaker

func NewCircuitBreaker(threshold int, timeout time.Duration) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker.

func (*CircuitBreaker) CanExecute

func (cb *CircuitBreaker) CanExecute() bool

CanExecute checks if the circuit breaker allows execution.

func (*CircuitBreaker) GetFailures

func (cb *CircuitBreaker) GetFailures() int64

GetFailures returns the current failure count.

func (*CircuitBreaker) GetState

func (cb *CircuitBreaker) GetState() CircuitBreakerState

GetState returns the current circuit breaker state.

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed operation.

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful operation.

type CircuitBreakerState

type CircuitBreakerState int

CircuitBreakerState represents the state of the circuit breaker.

const (
	CircuitBreakerClosed CircuitBreakerState = iota
	CircuitBreakerOpen
	CircuitBreakerHalfOpen
)

func (CircuitBreakerState) String

func (s CircuitBreakerState) String() string

type Config

type Config struct {
	DSN                string        `json:"dsn"`
	MaxOpenConnections int           `json:"max_open_connections"`
	MaxIdleConnections int           `json:"max_idle_connections"`
	ConnMaxLifetime    time.Duration `json:"conn_max_lifetime"`
	ConnMaxIdleTime    time.Duration `json:"conn_max_idle_time"`
	HealthCheckPeriod  time.Duration `json:"health_check_period"`
	ConnectionTimeout  time.Duration `json:"connection_timeout"`

	// Enterprise features
	EnableCircuitBreaker    bool          `json:"enable_circuit_breaker"`
	CircuitBreakerThreshold int           `json:"circuit_breaker_threshold"`
	CircuitBreakerTimeout   time.Duration `json:"circuit_breaker_timeout"`
	EnableConnectionRetry   bool          `json:"enable_connection_retry"`
	MaxRetryAttempts        int           `json:"max_retry_attempts"`
	RetryBackoffBase        time.Duration `json:"retry_backoff_base"`
	EnableSlowQueryLogging  bool          `json:"enable_slow_query_logging"`
	SlowQueryThreshold      time.Duration `json:"slow_query_threshold"`
	EnableMetrics           bool          `json:"enable_metrics"`
	MetricsNamespace        string        `json:"metrics_namespace"`
}

Config represents pool configuration.

type ConnectionPool

type ConnectionPool interface {
	// Get returns a database connection.
	Get(ctx context.Context) (*sql.DB, error)
	// GetWithValidation returns a validated database connection.
	GetWithValidation(ctx context.Context) (*EnterpriseConnection, error)
	// Stats returns pool statistics.
	Stats() PoolStats
	// EnterpriseStats returns comprehensive enterprise statistics.
	EnterpriseStats() EnterprisePoolStats
	// HealthCheck performs a health check on the pool.
	HealthCheck(ctx context.Context) error
	// Close closes the connection pool.
	Close() error
	// SetMetricsCollector sets the metrics collector.
	SetMetricsCollector(collector MetricsCollector)
}

ConnectionPool manages database connections.

func New

func New(cfg Config, logger zerolog.Logger) (ConnectionPool, error)

New creates a new connection pool.

type ConnectionValidator

type ConnectionValidator struct {
	// contains filtered or unexported fields
}

ConnectionValidator validates database connections.

func NewConnectionValidator

func NewConnectionValidator(logger zerolog.Logger, config Config) *ConnectionValidator

NewConnectionValidator creates a new connection validator.

func (*ConnectionValidator) ValidateConnection

func (cv *ConnectionValidator) ValidateConnection(ctx context.Context, db *sql.DB) error

ValidateConnection performs comprehensive connection validation.

type ConnectionWrapper

type ConnectionWrapper struct {
	// contains filtered or unexported fields
}

ConnectionWrapper wraps a database connection with additional functionality.

func NewConnectionWrapper

func NewConnectionWrapper(db *sql.DB, pool *connectionPool, logger zerolog.Logger) *ConnectionWrapper

NewConnectionWrapper creates a new connection wrapper.

func (*ConnectionWrapper) Execute

func (w *ConnectionWrapper) Execute(ctx context.Context, query string, args ...interface{}) (sql.Result, error)

Execute executes a query with retry logic.

func (*ConnectionWrapper) Query

func (w *ConnectionWrapper) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)

Query executes a query and returns rows.

type EnterpriseConnection

type EnterpriseConnection struct {
	// contains filtered or unexported fields
}

EnterpriseConnection wraps a database connection with enterprise features.

func NewEnterpriseConnection

func NewEnterpriseConnection(db *sql.DB, pool *connectionPool) *EnterpriseConnection

NewEnterpriseConnection creates a new enterprise connection.

func (*EnterpriseConnection) Execute

func (ec *EnterpriseConnection) Execute(ctx context.Context, query string, args ...interface{}) (sql.Result, error)

Execute executes a query with enterprise features.

func (*EnterpriseConnection) Query

func (ec *EnterpriseConnection) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)

Query executes a query and returns rows with enterprise features.

type EnterprisePoolStats

type EnterprisePoolStats struct {
	PoolStats
	CircuitBreakerState    string        `json:"circuit_breaker_state"`
	CircuitBreakerFailures int64         `json:"circuit_breaker_failures"`
	TotalRetryAttempts     int64         `json:"total_retry_attempts"`
	SlowQueries            int64         `json:"slow_queries"`
	ValidationFailures     int64         `json:"validation_failures"`
	AverageAcquisitionTime time.Duration `json:"average_acquisition_time"`
	PeakConnections        int           `json:"peak_connections"`
	ConnectionErrors       int64         `json:"connection_errors"`
}

EnterprisePoolStats provides comprehensive pool statistics.

type EnterpriseStats

type EnterpriseStats struct {
	// contains filtered or unexported fields
}

EnterpriseStats tracks comprehensive statistics.

func NewEnterpriseStats

func NewEnterpriseStats() *EnterpriseStats

NewEnterpriseStats creates new enterprise statistics tracker.

func (*EnterpriseStats) GetAverageAcquisitionTime

func (es *EnterpriseStats) GetAverageAcquisitionTime() time.Duration

GetAverageAcquisitionTime returns the average acquisition time.

func (*EnterpriseStats) RecordAcquisitionTime

func (es *EnterpriseStats) RecordAcquisitionTime(duration time.Duration)

RecordAcquisitionTime records connection acquisition time.

type FastRecordPool

type FastRecordPool struct {
	// contains filtered or unexported fields
}

FastRecordPool is a concurrent, multi‑schema pool optimised for throughput.

func NewFastRecordPool

func NewFastRecordPool(alloc memory.Allocator) *FastRecordPool

NewFastRecordPool constructs a pool; pass nil for the default Go allocator.

func (*FastRecordPool) Get

func (p *FastRecordPool) Get(schema *arrow.Schema) arrow.Record

Get returns an empty record for the given schema.

func (*FastRecordPool) Put

func (p *FastRecordPool) Put(rec arrow.Record)

Put resets the record to zero‑length and puts it back.

func (*FastRecordPool) Stats

func (p *FastRecordPool) Stats() Stats

Stats returns current statistics for the pool.

type MetricsCollector

type MetricsCollector interface {
	RecordConnectionAcquisition(duration time.Duration)
	RecordConnectionValidation(success bool, duration time.Duration)
	RecordQueryExecution(query string, duration time.Duration, success bool)
	UpdateActiveConnections(count int)
	IncrementCircuitBreakerTrip()
	IncrementRetryAttempt()
}

MetricsCollector is the interface for collecting pool metrics.

type PoolStats

type PoolStats struct {
	OpenConnections   int           `json:"open_connections"`
	InUse             int           `json:"in_use"`
	Idle              int           `json:"idle"`
	WaitCount         int64         `json:"wait_count"`
	WaitDuration      time.Duration `json:"wait_duration"`
	MaxIdleClosed     int64         `json:"max_idle_closed"`
	MaxLifetimeClosed int64         `json:"max_lifetime_closed"`
	LastHealthCheck   time.Time     `json:"last_health_check"`
	HealthCheckStatus string        `json:"health_check_status"`
}

PoolStats represents connection pool statistics.

type QueryLogger

type QueryLogger struct {
	// contains filtered or unexported fields
}

QueryLogger logs slow queries and query statistics.

func NewQueryLogger

func NewQueryLogger(logger zerolog.Logger, threshold time.Duration, enabled bool) *QueryLogger

NewQueryLogger creates a new query logger.

func (*QueryLogger) LogQuery

func (ql *QueryLogger) LogQuery(query string, duration time.Duration, err error)

LogQuery logs query execution details.

type SchemaCache

type SchemaCache struct {
	// contains filtered or unexported fields
}

SchemaCache is an O(1) LRU cache (pointer‑keyed, thread‑safe).

func NewSchemaCache

func NewSchemaCache(max int) *SchemaCache

NewSchemaCache returns a cache with a given maximum size (>0).

func (*SchemaCache) Clear

func (c *SchemaCache) Clear()

Clear empties the cache.

func (*SchemaCache) Get

func (c *SchemaCache) Get(s *arrow.Schema) (*arrow.Schema, bool)

Get returns the cached entry (updates LRU & stats) or nil/false.

func (*SchemaCache) Put

func (c *SchemaCache) Put(s *arrow.Schema)

Put inserts the schema or refreshes its position if already cached.

func (*SchemaCache) Size

func (c *SchemaCache) Size() int

Size returns the current number of cached schemas.

func (*SchemaCache) Stats

func (c *SchemaCache) Stats() CacheStats

Stats gathers statistics (O(n), called rarely).
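An O(1), pointer-keyed LRU cache is typically built from a map plus a doubly linked list. The sketch below illustrates that structure with `container/list`. It is not the package's code: `schema` stands in for `arrow.Schema`, and `lruCache` and `entry` are hypothetical names.

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

// schema stands in for arrow.Schema; only its pointer identity matters here.
type schema struct{ name string }

// entry is the value stored in each list element.
type entry struct {
	key  *schema
	hits int64
}

// lruCache maps schema pointers to list elements (hypothetical type).
type lruCache struct {
	mu    sync.Mutex
	max   int
	order *list.List // front = most recently used
	items map[*schema]*list.Element
}

func newLRU(max int) *lruCache {
	return &lruCache{max: max, order: list.New(), items: make(map[*schema]*list.Element)}
}

// Get returns the cached pointer and marks it as recently used.
func (c *lruCache) Get(s *schema) (*schema, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	el, ok := c.items[s]
	if !ok {
		return nil, false
	}
	c.order.MoveToFront(el)
	el.Value.(*entry).hits++
	return s, true
}

// Put inserts s, or refreshes its position, and evicts the oldest entry
// when the cache grows past max.
func (c *lruCache) Put(s *schema) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.items[s]; ok {
		c.order.MoveToFront(el)
		return
	}
	c.items[s] = c.order.PushFront(&entry{key: s})
	if c.order.Len() > c.max {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
}

// Size returns the number of cached entries.
func (c *lruCache) Size() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.items)
}

func main() {
	a, b, d := &schema{"a"}, &schema{"b"}, &schema{"d"}
	c := newLRU(2)
	c.Put(a)
	c.Put(b)
	c.Get(a) // a becomes most recently used
	c.Put(d) // evicts b, the least recently used

	_, okA := c.Get(a)
	_, okB := c.Get(b)
	fmt.Println(okA, okB, c.Size()) // true false 2
}
```

Keying on the pointer makes lookups a single map access, but two structurally equal schemas at different addresses are treated as different entries.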

type Stats

type Stats struct {
	Hits   int64
	Misses int64
	Allocs int64
}

Stats holds the pool's hit, miss, and allocation counters.
