Documentation
¶
Overview ¶
Package pool provides database connection pooling for DuckDB.
Index ¶
- type CacheStats
- type CircuitBreaker
- type CircuitBreakerState
- type Config
- type ConnectionPool
- type ConnectionValidator
- type ConnectionWrapper
- type EnterpriseConnection
- type EnterprisePoolStats
- type EnterpriseStats
- type FastRecordPool
- type MetricsCollector
- type PoolStats
- type QueryLogger
- type SchemaCache
- type Stats
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CacheStats ¶
CacheStats contains live statistics for a SchemaCache, as returned by (*SchemaCache).Stats.
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern for connection failures.
func NewCircuitBreaker ¶
func NewCircuitBreaker(threshold int, timeout time.Duration) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker.
func (*CircuitBreaker) CanExecute ¶
func (cb *CircuitBreaker) CanExecute() bool
CanExecute checks if the circuit breaker allows execution.
func (*CircuitBreaker) GetFailures ¶
func (cb *CircuitBreaker) GetFailures() int64
GetFailures returns the current failure count.
func (*CircuitBreaker) GetState ¶
func (cb *CircuitBreaker) GetState() CircuitBreakerState
GetState returns the current circuit breaker state.
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failed operation.
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess records a successful operation.
type CircuitBreakerState ¶
type CircuitBreakerState int
CircuitBreakerState represents the state of the circuit breaker.
const (
CircuitBreakerClosed CircuitBreakerState = iota
CircuitBreakerOpen
CircuitBreakerHalfOpen
)
func (CircuitBreakerState) String ¶
func (s CircuitBreakerState) String() string
type Config ¶
type Config struct {
DSN string `json:"dsn"`
MaxOpenConnections int `json:"max_open_connections"`
MaxIdleConnections int `json:"max_idle_connections"`
ConnMaxLifetime time.Duration `json:"conn_max_lifetime"`
ConnMaxIdleTime time.Duration `json:"conn_max_idle_time"`
HealthCheckPeriod time.Duration `json:"health_check_period"`
ConnectionTimeout time.Duration `json:"connection_timeout"`
// Enterprise features
EnableCircuitBreaker bool `json:"enable_circuit_breaker"`
CircuitBreakerThreshold int `json:"circuit_breaker_threshold"`
CircuitBreakerTimeout time.Duration `json:"circuit_breaker_timeout"`
EnableConnectionRetry bool `json:"enable_connection_retry"`
MaxRetryAttempts int `json:"max_retry_attempts"`
RetryBackoffBase time.Duration `json:"retry_backoff_base"`
EnableSlowQueryLogging bool `json:"enable_slow_query_logging"`
SlowQueryThreshold time.Duration `json:"slow_query_threshold"`
EnableMetrics bool `json:"enable_metrics"`
MetricsNamespace string `json:"metrics_namespace"`
}
Config represents pool configuration.
type ConnectionPool ¶
type ConnectionPool interface {
// Get returns a database connection.
Get(ctx context.Context) (*sql.DB, error)
// GetWithValidation returns a validated database connection.
GetWithValidation(ctx context.Context) (*EnterpriseConnection, error)
// Stats returns pool statistics.
Stats() PoolStats
// EnterpriseStats returns comprehensive enterprise statistics.
EnterpriseStats() EnterprisePoolStats
// HealthCheck performs a health check on the pool.
HealthCheck(ctx context.Context) error
// Close closes the connection pool.
Close() error
// SetMetricsCollector sets the metrics collector.
SetMetricsCollector(collector MetricsCollector)
}
ConnectionPool manages database connections.
type ConnectionValidator ¶
type ConnectionValidator struct {
// contains filtered or unexported fields
}
ConnectionValidator validates database connections.
func NewConnectionValidator ¶
func NewConnectionValidator(logger zerolog.Logger, config Config) *ConnectionValidator
NewConnectionValidator creates a new connection validator.
func (*ConnectionValidator) ValidateConnection ¶
ValidateConnection performs comprehensive connection validation.
type ConnectionWrapper ¶
type ConnectionWrapper struct {
// contains filtered or unexported fields
}
ConnectionWrapper wraps a database connection with additional functionality.
func NewConnectionWrapper ¶
func NewConnectionWrapper(db *sql.DB, pool *connectionPool, logger zerolog.Logger) *ConnectionWrapper
NewConnectionWrapper creates a new connection wrapper.
type EnterpriseConnection ¶
type EnterpriseConnection struct {
// contains filtered or unexported fields
}
EnterpriseConnection wraps a database connection with enterprise features.
func NewEnterpriseConnection ¶
func NewEnterpriseConnection(db *sql.DB, pool *connectionPool) *EnterpriseConnection
NewEnterpriseConnection creates a new enterprise connection.
type EnterprisePoolStats ¶
type EnterprisePoolStats struct {
PoolStats
CircuitBreakerState string `json:"circuit_breaker_state"`
CircuitBreakerFailures int64 `json:"circuit_breaker_failures"`
TotalRetryAttempts int64 `json:"total_retry_attempts"`
SlowQueries int64 `json:"slow_queries"`
ValidationFailures int64 `json:"validation_failures"`
AverageAcquisitionTime time.Duration `json:"average_acquisition_time"`
PeakConnections int `json:"peak_connections"`
ConnectionErrors int64 `json:"connection_errors"`
}
EnterprisePoolStats provides comprehensive pool statistics.
type EnterpriseStats ¶
type EnterpriseStats struct {
// contains filtered or unexported fields
}
EnterpriseStats tracks comprehensive statistics.
func NewEnterpriseStats ¶
func NewEnterpriseStats() *EnterpriseStats
NewEnterpriseStats creates a new enterprise statistics tracker.
func (*EnterpriseStats) GetAverageAcquisitionTime ¶
func (es *EnterpriseStats) GetAverageAcquisitionTime() time.Duration
GetAverageAcquisitionTime returns the average acquisition time.
func (*EnterpriseStats) RecordAcquisitionTime ¶
func (es *EnterpriseStats) RecordAcquisitionTime(duration time.Duration)
RecordAcquisitionTime records connection acquisition time.
type FastRecordPool ¶
type FastRecordPool struct {
// contains filtered or unexported fields
}
FastRecordPool is a concurrent, multi‑schema pool optimised for throughput.
func NewFastRecordPool ¶
func NewFastRecordPool(alloc memory.Allocator) *FastRecordPool
NewFastRecordPool constructs a pool; pass nil for the default Go allocator.
func (*FastRecordPool) Get ¶
func (p *FastRecordPool) Get(schema *arrow.Schema) arrow.Record
Get returns an empty record for the given schema.
func (*FastRecordPool) Put ¶
func (p *FastRecordPool) Put(rec arrow.Record)
Put resets the record to zero‑length and returns it to the pool.
func (*FastRecordPool) Stats ¶
func (p *FastRecordPool) Stats() Stats
Stats returns current statistics for the pool.
type MetricsCollector ¶
type MetricsCollector interface {
RecordConnectionAcquisition(duration time.Duration)
RecordConnectionValidation(success bool, duration time.Duration)
RecordQueryExecution(query string, duration time.Duration, success bool)
UpdateActiveConnections(count int)
IncrementCircuitBreakerTrip()
IncrementRetryAttempt()
}
MetricsCollector is the interface for collecting pool metrics.
type PoolStats ¶
type PoolStats struct {
OpenConnections int `json:"open_connections"`
InUse int `json:"in_use"`
Idle int `json:"idle"`
WaitCount int64 `json:"wait_count"`
WaitDuration time.Duration `json:"wait_duration"`
MaxIdleClosed int64 `json:"max_idle_closed"`
MaxLifetimeClosed int64 `json:"max_lifetime_closed"`
LastHealthCheck time.Time `json:"last_health_check"`
HealthCheckStatus string `json:"health_check_status"`
}
PoolStats represents connection pool statistics.
type QueryLogger ¶
type QueryLogger struct {
// contains filtered or unexported fields
}
QueryLogger logs slow queries and query statistics.
func NewQueryLogger ¶
NewQueryLogger creates a new query logger.
type SchemaCache ¶
type SchemaCache struct {
// contains filtered or unexported fields
}
SchemaCache is an O(1) LRU cache (pointer‑keyed, thread‑safe).
func NewSchemaCache ¶
func NewSchemaCache(max int) *SchemaCache
NewSchemaCache returns a cache with the given maximum size (max must be > 0).
func (*SchemaCache) Put ¶
func (c *SchemaCache) Put(s *arrow.Schema)
Put inserts the schema or refreshes its position if already cached.
func (*SchemaCache) Size ¶
func (c *SchemaCache) Size() int
Size returns the current number of cached schemas.
func (*SchemaCache) Stats ¶
func (c *SchemaCache) Stats() CacheStats
Stats gathers cache statistics. It runs in O(n) time and is intended to be called rarely.