Documentation ¶
Overview ¶
Package async is internal infrastructure shared by the celeris PostgreSQL and Redis drivers. It is not a supported public API: the types exported here exist to let the two drivers share plumbing without forcing the user-facing packages to leak implementation details. No backwards-compatibility guarantees are made for imports from outside the celeris module.
What it provides ¶
Bridge: FIFO queue of in-flight [PendingRequest]s per connection. Drivers Enqueue before writing bytes to the wire and Pop as responses arrive. Responses-in-order is a property of the underlying protocols (Postgres v3, RESP).
Pool[C]: a generic worker-affinity connection pool. C is the driver's connection type (must satisfy Conn). Each pool keeps a per-worker idle list so handlers on worker N preferentially reuse conns whose event-loop callbacks run on worker N. Lifetime / idle / max-open are enforced by the pool.
Backoff: exponential-with-jitter delay computation for retry loops. Not safe for concurrent use.
Internal [health] checker scheduled by the pool.
Drivers build their user-facing API (postgres.Pool, redis.Client) by composing these primitives with a protocol-specific dispatch loop.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrPoolClosed = errors.New("celeris/async: pool is closed")
ErrPoolClosed is returned from Acquire once the pool has been closed.
var ErrPoolExhausted = errors.New("celeris/async: pool exhausted")
ErrPoolExhausted is returned when MaxOpen is reached and no slot becomes available before the context deadline.
Deprecated: Since the introduction of the wait-queue, Acquire blocks until a slot is available or the context expires (returning ctx.Err()). This sentinel is retained for backward compatibility but is no longer returned by Pool.Acquire.
Functions ¶
This section is empty.
Types ¶
type Backoff ¶
type Backoff struct {
// Base is the initial delay. Defaults to 50ms when <= 0.
Base time.Duration
// Cap is the maximum delay. Defaults to 5s when <= 0.
Cap time.Duration
// Jitter is the jitter factor in [0, 1]. A value of 0 disables jitter.
// Values above 1 are clamped to 1; negative values to 0.
Jitter float64
// contains filtered or unexported fields
}
Backoff computes exponentially-increasing delays for retry loops with optional multiplicative jitter.
The zero value is usable: Base and Cap fall back to the documented defaults (50ms and 5s) when left at zero, and a Jitter of zero disables jitter. Use NewBackoff to also get the default jitter factor.
Backoff is not safe for concurrent use; it is intended to be owned by one retry loop at a time.
func NewBackoff ¶
NewBackoff returns a Backoff with a 0.2 jitter factor and the provided base/max (or the package defaults if either is <= 0).
type Bridge ¶
type Bridge struct {
// contains filtered or unexported fields
}
Bridge manages an ordered queue of pending requests for a single connection. The wire protocol must guarantee responses are delivered in request order (true for both Postgres and Redis on a single connection).
Bridge is safe for concurrent use: a writer goroutine enqueues requests while the reader goroutine pops them as responses arrive.
The backing store is a power-of-two ring buffer so steady-state enqueue/pop cycles at a bounded depth (the common case for pipelining) do not allocate — a plain append+reslice queue would re-grow its backing array every time Pop walked it forward past the capacity boundary.
func (*Bridge) DrainWithError ¶
func (b *Bridge) DrainWithError(err error, notify func(PendingRequest, error))
DrainWithError pops every pending request and invokes notify with the given error for each one. The queue is left empty on return. notify must not call back into Bridge on the same instance — do that from a separate goroutine if needed.
func (*Bridge) Enqueue ¶
func (b *Bridge) Enqueue(req PendingRequest)
Enqueue appends req to the back of the queue.
func (*Bridge) Head ¶
func (b *Bridge) Head() PendingRequest
Head returns the front-of-queue request without removing it, or nil if the queue is empty.
func (*Bridge) Pop ¶
func (b *Bridge) Pop() PendingRequest
Pop removes and returns the front-of-queue request, or nil if empty.
func (*Bridge) PopTail ¶
func (b *Bridge) PopTail() PendingRequest
PopTail removes and returns the back-of-queue request, or nil if empty. Drivers use this to unwind a request that was enqueued but whose wire bytes never reached the server (for example, when Write returned an error). Do NOT use it to pop arbitrary entries from the middle — the underlying protocols (Postgres, Redis) are FIFO, so out-of-order removal desyncs subsequent responses.
type Conn ¶
type Conn interface {
// Ping verifies the connection is usable. It is invoked by the
// periodic health check and on acquire when a lifetime is configured.
Ping(ctx context.Context) error
// Close releases any resources. Called at most once per connection.
Close() error
// Worker returns the engine worker index this connection is pinned to.
// Pool routes idle connections back to this worker's idle list.
Worker() int
// IsExpired reports whether this connection has exceeded MaxLifetime.
IsExpired(now time.Time) bool
// IsIdleTooLong reports whether this connection has been idle for
// longer than MaxIdleTime.
IsIdleTooLong(now time.Time) bool
}
Conn is the minimal interface a pooled connection must satisfy.
Implementations are expected to be goroutine-safe for Close; other methods are called serially by the pool while a single goroutine owns the connection.
type PendingRequest ¶
type PendingRequest interface {
// Ctx returns the request context. If the context is done before the
// response arrives, the bridge still leaves the request on the queue —
// cancellation is the driver's responsibility (e.g. Postgres
// CancelRequest on a side connection).
Ctx() context.Context
}
PendingRequest is any driver-specific request that expects a response on the wire. Implementations supply their own result plumbing (typically a doneCh channel owned by the driver); the bridge only manages the FIFO queue and completion notification on drain.
type Pool ¶
type Pool[C Conn] struct {
// contains filtered or unexported fields
}
Pool is a generic worker-affinity pool. Drivers supply a dial function and a conn type implementing Conn.
Each worker owns an idle list guarded by its own lock. When MaxOpen > 0 a buffered channel (sem) acts as a counting semaphore that limits concurrent in-use connections. Acquire blocks (respecting the context deadline) until a slot is available, matching database/sql.DB semantics.
func NewPool ¶
func NewPool[C Conn](cfg PoolConfig, dial func(ctx context.Context, workerID int) (C, error)) *Pool[C]
NewPool constructs a pool with the given configuration. It panics if NumWorkers is less than 1 or dial is nil.
func (*Pool[C]) Acquire ¶
Acquire returns a connection pinned to a worker. If workerHint is negative or >= NumWorkers, a round-robin worker is chosen.
When MaxOpen > 0 and all slots are in use, Acquire blocks until a slot is released or the context expires (returning ctx.Err()). The semaphore tracks in-use connections; idle connections do not hold slots.
func (*Pool[C]) Close ¶
Close closes every idle connection, stops the health checker, and marks the pool closed so further Acquire calls fail with ErrPoolClosed. In-flight connections — those currently held by callers outside the pool's idle lists — are NOT force-closed; when the caller returns one via Release, the pool sees its closed state and Discards the connection (TCP close). Callers that want a strict drain should stop issuing new queries and wait for in-flight work to complete before calling Close.
Any active database transactions on in-flight connections are aborted at the server side when the TCP connection is eventually closed (no ROLLBACK is sent). Callers that must guarantee transactional cleanup should issue Commit or Rollback before releasing the conn to the pool.
func (*Pool[C]) Discard ¶
func (p *Pool[C]) Discard(c C)
Discard permanently removes c from the pool: it is closed and openCount decremented. The semaphore slot is freed so another Acquire can proceed. Drivers call this on connection errors they cannot recover from.
func (*Pool[C]) IdleConnWorkers ¶
IdleConnWorkers returns the Worker() IDs of every currently-idle connection across all worker slots. The same worker ID may appear multiple times if the slot holds more than one idle conn. In-use conns do not appear — they are held by active callers and their Worker() is observable on the caller side. Useful for integration tests asserting that per-CPU affinity is actually being honored by the dial path.
type PoolConfig ¶
type PoolConfig struct {
// MaxOpen is the total connection cap across all workers.
// Zero means unlimited.
MaxOpen int
// MaxIdlePerWorker bounds the idle pool held on each worker. Connections
// beyond this limit on Release are closed.
MaxIdlePerWorker int
// MaxLifetime is the max age of a connection before it is evicted on
// release or health check. Zero disables lifetime expiry.
MaxLifetime time.Duration
// MaxIdleTime is the max time a connection can sit idle before
// eviction. Zero disables idle expiry.
MaxIdleTime time.Duration
// HealthCheck is the interval at which the background checker runs.
// Zero disables the checker.
HealthCheck time.Duration
// NumWorkers is the number of worker-affinity slots. Must be >= 1.
NumWorkers int
}
PoolConfig controls pool sizing and lifetime.
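An illustrative configuration, assuming the package is imported as `async`; the specific numbers are examples, not recommendations:

```go
// 4 workers, at most 64 connections total, at most 4 idle per worker,
// recycled after 30 minutes, idle-trimmed after 5, health-checked every 15s.
cfg := async.PoolConfig{
	MaxOpen:          64,
	MaxIdlePerWorker: 4,
	MaxLifetime:      30 * time.Minute,
	MaxIdleTime:      5 * time.Minute,
	HealthCheck:      15 * time.Second,
	NumWorkers:       4,
}
```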
type PoolStats ¶
type PoolStats struct {
// Open is the total number of connections (idle + in-use).
Open int
// Idle is the count across all worker idle lists.
Idle int
// InUse is Open - Idle.
InUse int
// PerWorker holds per-worker breakdowns; len == NumWorkers.
PerWorker []PoolWorkerStats
}
PoolStats reports current pool occupancy.
type PoolWorkerStats ¶
type PoolWorkerStats struct {
Idle int
}
PoolWorkerStats reports per-worker occupancy.