Documentation
¶
Overview ¶
Package postgres is celeris's native PostgreSQL driver. It speaks the PostgreSQL v3 wire protocol directly on top of the celeris event loop, exposes a database/sql-compatible driver, and also provides a lower-level worker-affinity Pool for callers that want to skip database/sql overhead.
There are two entry points. Use database/sql for portability (works with any ORM); use the direct Pool for peak throughput.
// database/sql — registers itself under DriverName in init. import _ "github.com/goceleris/celeris/driver/postgres" db, err := sql.Open(postgres.DriverName, "postgres://app:pass@localhost/mydb?sslmode=disable") // Direct Pool — optionally bound to a celeris.Server event loop. pool, err := postgres.Open(dsn, postgres.WithEngine(srv))
Open accepts both the libpq URL form and the key=value DSN form. Pool behavior is tuned with functional Option values: WithEngine, WithMaxOpen, WithMaxIdlePerWorker, WithMaxLifetime, WithMaxIdleTime, WithHealthCheck, WithStatementCacheSize, and WithApplication. WithEngine is optional and affects performance only: when set, the pool shares the HTTP server's event loop instead of resolving a standalone one. WithWorker adds a per-call worker-affinity hint to a context.
Query with Pool.QueryContext (returns *Rows), Pool.QueryRow (returns *Row), or Pool.ExecContext (returns Result). Transactions come from Pool.BeginTx, which returns a *Tx with Tx.Commit, Tx.Rollback, and savepoint helpers (Tx.Savepoint, Tx.ReleaseSavepoint, Tx.RollbackToSavepoint). Bulk loads use Pool.CopyFrom (feed rows via CopyFromSource or CopyFromSlice) and Pool.CopyTo.
Server errors surface as *PGError (alias of protocol.PGError) carrying the SQLSTATE code; match with errors.As. Package sentinels include ErrPoolClosed, ErrClosed, ErrBadConn, ErrSSLNotSupported, ErrUnsupportedAuth, and ErrResultTooBig. Custom type codecs can be registered with protocol.RegisterType.
Current limitations: TLS is not yet supported (use sslmode=disable behind a VPC/loopback/sidecar); authentication is limited to trust, cleartext, MD5, and SCRAM-SHA-256 without channel binding; there is no LISTEN/NOTIFY, pipeline mode, or replica routing.
Documentation ¶
Full guides and examples: https://goceleris.dev/docs/data-stores
Index ¶
- Constants
- Variables
- func WithWorker(ctx context.Context, workerID int) context.Context
- type Conn
- type Connector
- type CopyFromSource
- type DSN
- type Driver
- type Option
- func WithApplication(name string) Option
- func WithEngine(sp eventloop.ServerProvider) Option
- func WithHealthCheck(d time.Duration) Option
- func WithMaxIdlePerWorker(n int) Option
- func WithMaxIdleTime(d time.Duration) Option
- func WithMaxLifetime(d time.Duration) Option
- func WithMaxOpen(n int) Option
- func WithStatementCacheSize(n int) Option
- type Options
- type PGError
- type Pool
- func (p *Pool) BeginTx(ctx context.Context, opts *sql.TxOptions) (*Tx, error)
- func (p *Pool) Close() error
- func (p *Pool) CopyFrom(ctx context.Context, tableName string, columns []string, src CopyFromSource) (int64, error)
- func (p *Pool) CopyTo(ctx context.Context, query string, dest func(row []byte) error) error
- func (p *Pool) ExecContext(ctx context.Context, query string, args ...any) (Result, error)
- func (p *Pool) IdleConnWorkers() []int
- func (p *Pool) Ping(ctx context.Context) error
- func (p *Pool) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)
- func (p *Pool) QueryRow(ctx context.Context, query string, args ...any) *Row
- func (p *Pool) Stats() async.PoolStats
- type PoolConfig
- type Result
- type Row
- type Rows
- type Tx
- func (t *Tx) Commit() error
- func (t *Tx) ExecContext(ctx context.Context, query string, args ...any) (Result, error)
- func (t *Tx) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)
- func (t *Tx) QueryRow(ctx context.Context, query string, args ...any) *Row
- func (t *Tx) ReleaseSavepoint(ctx context.Context, name string) error
- func (t *Tx) Rollback() error
- func (t *Tx) RollbackToSavepoint(ctx context.Context, name string) error
- func (t *Tx) Savepoint(ctx context.Context, name string) error
Examples ¶
Constants ¶
const DriverName = "celeris-postgres"
DriverName is the name under which the driver is registered with database/sql. Use it in sql.Open: sql.Open(postgres.DriverName, dsn).
Variables ¶
var ErrBadConn = errors.New("celeris-postgres: bad connection")
ErrBadConn is returned when a connection is in an unusable state; returning it causes database/sql to evict the conn and open a fresh one.
var ErrClosed = errors.New("celeris-postgres: connection closed")
ErrClosed is returned from operations on a closed pgConn.
var ErrDirectModeUnsupported = errors.New("celeris-postgres: LISTEN/UNLISTEN/NOTIFY are not supported in direct mode; use a non-async engine pool or a dedicated listener conn")
ErrDirectModeUnsupported is returned by operations that rely on unsolicited server messages between queries — LISTEN/UNLISTEN/NOTIFY (NotificationResponse) most notably. Direct-mode conns dial a plain net.TCPConn and drive reads from the caller goroutine only during an active query; between queries no reader is active, so any async-delivered message is silently dropped. COPY FROM/TO is supported via a short-lived per-call reader goroutine; only the persistent-listener class of operations returns this error.
Workarounds: open the pool against an engine with AsyncHandlers=false (drivers will pick the mini-loop path which has an always-on reader), or use a separate non-pooled listener conn dedicated to LISTEN/NOTIFY in a non-async configuration.
var ErrNoLastInsertID = errors.New("celeris-postgres: LastInsertId is not supported; use RETURNING")
ErrNoLastInsertID is returned from pgResult.LastInsertId. PG exposes sequence values via RETURNING rather than a generic last-insert-id.
var ErrNoLastInsertId = ErrNoLastInsertID //nolint:revive // backward-compat alias
ErrNoLastInsertId is a deprecated alias retained for API compatibility.
Deprecated: use ErrNoLastInsertID instead.
var ErrPoolClosed = errors.New("celeris-postgres: pool is closed")
ErrPoolClosed is returned from Pool methods after Close has been called.
var ErrResultTooBig = errors.New("celeris-postgres: query result exceeds direct-mode buffer cap (64 MiB); paginate or use streaming mode")
ErrResultTooBig is returned when a query's cumulative DataRow payload exceeds the direct-mode buffering cap (maxDirectResultBytes, 64 MiB). Direct mode pins syncMode=true and cannot lazily promote to streaming, so huge SELECTs would otherwise balloon per-conn memory. To iterate large result sets, use a non-direct pool (set up with a non-async engine via WithEngine) which supports lazy streaming, or paginate the query with LIMIT/OFFSET or cursor-based paging.
var ErrSSLNotSupported = errors.New("celeris-postgres: TLS/SSL is not yet supported; use sslmode=disable for VPC/loopback deployments, or terminate TLS at a sidecar / VPC boundary")
ErrSSLNotSupported is returned from Connect / Open when the DSN requests sslmode=require, verify-ca, or verify-full. Plaintext (sslmode=disable or prefer) is the only mode the driver currently supports.
var ErrUnsupportedAuth = errors.New("celeris-postgres: unsupported authentication method")
ErrUnsupportedAuth is returned when the server demands an authentication method this driver cannot fulfill (for example GSS, SSPI, or a SASL mechanism other than SCRAM-SHA-256).
Functions ¶
Types ¶
type Conn ¶
type Conn = pgConn
Conn is the public alias of the driver's underlying connection type. It is exposed so that callers using database/sql can reach celeris-specific connection features (Savepoint, ReleaseSavepoint, RollbackToSavepoint, ServerParam) via sql.Conn.Raw:
db.Conn(ctx, func(c *sql.Conn) error {
return c.Raw(func(dc any) error {
pc := dc.(*postgres.Conn)
return pc.Savepoint(ctx, "sp1")
})
})
The type is otherwise opaque; methods intended for external use are defined on *pgConn in conn.go.
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
Connector is a database/sql/driver.Connector that opens celeris-backed PostgreSQL connections. It implements driver.Connector and additionally offers a WithEngine method that rebinds the connector to a running celeris.Server's event loop.
func NewConnector ¶
NewConnector parses dsn and returns a Connector. Callers typically reach this via sql.OpenDB(postgres.NewConnector(dsn)) or via sql.Open, which calls Driver.OpenConnector.
func (*Connector) Close ¶
Close releases the standalone event loop if this connector owns one. database/sql calls Close via DB.Close.
func (*Connector) Connect ¶
Connect returns a new driver.Conn. database/sql's pool reuses this connector across all pooled conns, so we resolve the provider once and share it for the connector's lifetime.
func (*Connector) WithEngine ¶
func (c *Connector) WithEngine(sp eventloop.ServerProvider) *Connector
WithEngine rebinds this Connector to sp's event loop. The returned Connector shares the DSN but replaces the provider; it does not own the loop (so Close does not tear it down).
type CopyFromSource ¶
CopyFromSource is the interface consumed by Pool.CopyFrom. It is modeled after pgx.CopyFromSource: Next advances one row at a time, Values returns the current row's column values, and Err surfaces any iteration error.
Implementations are driven synchronously on the caller's goroutine; no concurrent access is expected. Returned slices may be reused between iterations — the driver consumes them before calling Next again.
func CopyFromSlice ¶
func CopyFromSlice(rows [][]any) CopyFromSource
CopyFromSlice wraps a [][]any fixture as a CopyFromSource. Useful for tests and for callers that already have their rows materialised in memory.
type DSN ¶
type DSN struct {
Host string
Port string
User string
Password string
Database string
// Params is the set of server-side startup parameters (e.g. "search_path",
// "timezone") that will be sent in the StartupMessage.
Params map[string]string
Options Options
}
DSN is the parsed form of a PostgreSQL connection string.
func ParseDSN ¶
ParseDSN parses either the URL form ("postgres://user:pass@host:port/db?...") or the key=value form ("host=... port=... user=...") and returns a populated DSN. Unknown keys are carried as server-side startup parameters rather than rejected; the PG server validates them during the StartupMessage exchange.
func (*DSN) CheckSSL ¶
CheckSSL returns ErrSSLNotSupported if the DSN requests a TLS mode this driver version cannot satisfy.
In v1.4.0 the driver has no TLS stack. sslmode semantics:
- "" / "disable" : plaintext, always allowed.
- "prefer" / "allow" : the libpq semantics are "try TLS, fall back to plaintext" (prefer) or "try plaintext, upgrade if the server demands TLS" (allow). Since we can only ever do plaintext, these are treated as equivalent to "disable" but WITH a warning — the user expected opportunistic encryption and got none.
- "require" / "verify-ca" / "verify-full" : TLS is mandatory; rejected with ErrSSLNotSupported. Users with managed cloud DBs (RDS, CloudSQL, etc.) need to terminate TLS at a sidecar / VPC boundary until first-class TLS support lands.
type Driver ¶
type Driver struct{}
Driver implements database/sql/driver.Driver for PostgreSQL on top of the celeris event loop.
type Option ¶
type Option func(*options)
Option configures Open.
func WithApplication ¶
WithApplication sets the "application_name" startup parameter.
func WithEngine ¶
func WithEngine(sp eventloop.ServerProvider) Option
WithEngine routes pool connections through the event loop of a running celeris.Server. When unset, Open resolves a standalone loop.
func WithHealthCheck ¶
WithHealthCheck sets the background sweep interval. Zero disables it.
func WithMaxIdlePerWorker ¶
WithMaxIdlePerWorker bounds each worker's idle list. Default: 2.
func WithMaxIdleTime ¶
WithMaxIdleTime sets the max idle duration. Default: 5m.
func WithMaxLifetime ¶
WithMaxLifetime sets the max age of a pooled conn. Default: 30m.
func WithMaxOpen ¶
WithMaxOpen sets the total connection cap. Default: NumWorkers * 4.
func WithStatementCacheSize ¶
WithStatementCacheSize sets the per-conn prepared-statement LRU capacity.
type Options ¶
type Options struct {
// ConnectTimeout bounds dial + startup. Zero means no client-side timeout.
ConnectTimeout time.Duration
// StatementCacheSize is the per-conn LRU capacity for named prepared
// statements. Zero disables the cache.
StatementCacheSize int
// AutoCacheStatements, when true AND StatementCacheSize > 0, causes
// cacheable SELECT-style QueryContext calls to transparently auto-
// prepare on first use and reuse via the extended protocol on
// subsequent calls. Equivalent to pgx's QueryExecModeCacheStatement
// at steady state. Default true; opt out per DSN with
// auto_cache_statements=false (preserves the simple-query behaviour
// for callers that do not want per-conn statement caching).
AutoCacheStatements bool
// Application is copied into the "application_name" startup parameter.
Application string
// SSLMode is the parsed sslmode value. "disable" and "prefer" are
// accepted; "require" / "verify-ca" / "verify-full" are rejected at Open
// time with ErrSSLNotSupported.
SSLMode string
// contains filtered or unexported fields
}
Options holds client-side driver knobs parsed out of the DSN before they are passed to the server as startup parameters.
type PGError ¶
PGError re-exports the server-side ErrorResponse type so callers can type-assert without importing the protocol package.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is an alternative to database/sql.DB that keeps connections pinned to worker-affinity slots. It is usable from any goroutine but routes idle connections back to the worker that created them, so a handler running on worker N preferentially acquires conns whose event-loop callbacks land on worker N.
func Open ¶
Open opens a worker-affinity pool. The DSN is parsed once here; connect errors surface on the first Acquire.
Example ¶
ExampleOpen shows the database/sql entry point: import the driver for its side-effect (the init registers "celeris-postgres") and sql.Open a DSN. database/sql owns the pool and the driver runs against a package-level standalone event loop.
package main
import (
"context"
"database/sql"
"fmt"
"log"
)
func main() {
// The blank import at package scope is what registers the driver.
db, err := sql.Open("celeris-postgres",
"postgres://app:secret@localhost:5432/mydb?sslmode=disable&application_name=svc")
if err != nil {
log.Fatal(err)
}
defer func() { _ = db.Close() }()
ctx := context.Background()
var greeting string
if err := db.QueryRowContext(ctx, "SELECT 'hello, world'").Scan(&greeting); err != nil {
log.Fatal(err)
}
fmt.Println(greeting)
}
Output:
Example (WithEngine) ¶
ExampleOpen_withEngine shows the direct Pool path. Opening with postgres.WithEngine binds the pool to a running celeris.Server's event loop so driver FDs are serviced by the same epoll/io_uring instance as the HTTP workers, and per-worker idle conns match the HTTP handler's CPU affinity.
package main
import (
"context"
"fmt"
"log"
"github.com/goceleris/celeris/driver/postgres"
)
func main() {
// In real code, pass postgres.WithEngine(srv) where srv is a
// *celeris.Server that has already been Start()-ed. Doing so binds
// the pool to the HTTP event loop so driver FDs and HTTP workers
// share the same epoll/io_uring instance.
pool, err := postgres.Open(
"postgres://app:secret@localhost:5432/mydb?sslmode=disable",
postgres.WithMaxOpen(32),
postgres.WithStatementCacheSize(512),
postgres.WithApplication("api"),
)
if err != nil {
log.Fatal(err)
}
defer func() { _ = pool.Close() }()
ctx := context.Background()
rows, err := pool.QueryContext(ctx,
"SELECT id, name FROM users WHERE tenant = $1", 42)
if err != nil {
log.Fatal(err)
}
defer func() { _ = rows.Close() }()
for rows.Next() {
var id int
var name string
if err := rows.Scan(&id, &name); err != nil {
log.Fatal(err)
}
fmt.Printf("id=%d name=%s\n", id, name)
}
if err := rows.Err(); err != nil {
log.Fatal(err)
}
}
Output:
func (*Pool) BeginTx ¶
BeginTx acquires a conn, issues BEGIN, and returns a Tx that pins the conn until Commit/Rollback.
Example ¶
ExamplePool_BeginTx wraps a pair of updates in a serializable transaction and rolls back on any error.
package main
import (
"context"
"database/sql"
"log"
"time"
"github.com/goceleris/celeris/driver/postgres"
)
func main() {
pool, err := postgres.Open("postgres://app:secret@localhost/mydb?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer func() { _ = pool.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
tx, err := pool.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
log.Fatal(err)
}
if _, err := tx.ExecContext(ctx,
"UPDATE accounts SET balance = balance - $1 WHERE id = $2", 100, "a"); err != nil {
_ = tx.Rollback()
log.Fatal(err)
}
if _, err := tx.ExecContext(ctx,
"UPDATE accounts SET balance = balance + $1 WHERE id = $2", 100, "b"); err != nil {
_ = tx.Rollback()
log.Fatal(err)
}
if err := tx.Commit(); err != nil {
log.Fatal(err)
}
}
Output:
func (*Pool) Close ¶
Close closes the pool and releases the event loop if it was owned. Safe to call concurrently with Acquire — Acquire returns ErrPoolClosed once Close has set the closed flag.
func (*Pool) CopyFrom ¶
func (p *Pool) CopyFrom(ctx context.Context, tableName string, columns []string, src CopyFromSource) (int64, error)
CopyFrom streams rows from src to the server via COPY FROM STDIN. Returns the number of rows imported (parsed from the server's CommandComplete tag) or any transport / iteration error. Column names may be empty to default to the table's natural column order.
Example ¶
ExamplePool_CopyFrom bulk-loads a []row fixture into a table via PG's COPY FROM STDIN protocol. For large, live-generated streams, implement postgres.CopyFromSource directly instead of materialising to a slice.
package main
import (
"context"
"fmt"
"log"
"github.com/goceleris/celeris/driver/postgres"
)
func main() {
pool, err := postgres.Open("postgres://app:secret@localhost/mydb?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer func() { _ = pool.Close() }()
ctx := context.Background()
rows := [][]any{
{1, "alice", true},
{2, "bob", false},
{3, "carol", true},
}
n, err := pool.CopyFrom(ctx, "users", []string{"id", "name", "active"},
postgres.CopyFromSlice(rows))
if err != nil {
log.Fatal(err)
}
fmt.Printf("inserted %d rows\n", n)
}
Output:
func (*Pool) CopyTo ¶
CopyTo executes query (typically "COPY <table> TO STDOUT ...") and invokes dest for each row. The dest slice is freshly allocated per call, so callers are free to retain it without copying. Aborting mid-stream by returning a non-nil error from dest terminates the copy — the server continues to stream bytes until ReadyForQuery, so the conn remains usable on return.
func (*Pool) ExecContext ¶
ExecContext runs a statement and returns a Result.
func (*Pool) IdleConnWorkers ¶
IdleConnWorkers returns the Worker() IDs of every currently-idle connection across all worker slots. Same worker ID may appear multiple times when the slot holds more than one idle conn. In-use conns do not appear.
Intended for tests and introspection asserting that per-CPU affinity is actually honored by the dial path — callers should not use it for load balancing decisions.
func (*Pool) QueryContext ¶
QueryContext runs query on an acquired conn, returns a Rows wrapper that returns the conn to the pool when closed.
type PoolConfig ¶
type PoolConfig struct {
DSN string
MaxOpen int
MaxIdlePerWorker int
MaxLifetime time.Duration
MaxIdleTime time.Duration
HealthCheck time.Duration
StatementCacheSize int
Application string
}
PoolConfig controls the worker-affinity pool returned by Open.
type Result ¶
type Result struct {
// contains filtered or unexported fields
}
Result is a pool-level Result that implements database/sql/driver.Result.
func (Result) LastInsertId ¶
LastInsertId is not supported by PostgreSQL. Use RETURNING instead.
func (Result) RowsAffected ¶
RowsAffected returns the number of affected rows parsed from the CommandComplete tag.
type Row ¶
type Row struct {
// contains filtered or unexported fields
}
Row is the result of calling Pool.QueryRow. It wraps a single-row query with automatic Close semantics, matching database/sql.Row.
type Rows ¶
type Rows struct {
// contains filtered or unexported fields
}
Rows is the pool-level rows wrapper. It returns the underlying conn to the pool when Close is called. The iteration API matches database/sql.Rows:
for rows.Next() {
if err := rows.Scan(&id, &name); err != nil { ... }
}
if err := rows.Err(); err != nil { ... }
func (*Rows) Next ¶
Next advances the cursor to the next row. It returns false when no more rows are available or an error occurred. After Next returns false, call Rows.Err to distinguish normal end-of-data from an error.
Fast path: when inner is *pgRows (the production path), Next captures the raw wire bytes + per-column codecs without boxing into driver.Value. Scan() then decodes directly into user pointers, saving one heap alloc per non-zero-size cell (int64 / time.Time etc. would otherwise escape via interface conversion).
type Tx ¶
type Tx struct {
// contains filtered or unexported fields
}
Tx is the pool-level transaction wrapper.
func (*Tx) Commit ¶
Commit commits the transaction and returns the conn to the pool. On failure, done is left false so a deferred Rollback can still fire.
func (*Tx) ExecContext ¶
ExecContext runs a statement on the pinned conn.
func (*Tx) QueryContext ¶
QueryContext runs a query on the pinned conn. The returned Rows must be closed before Commit/Rollback.
func (*Tx) QueryRow ¶
QueryRow executes a query expected to return at most one row on the pinned conn. The returned Row is always non-nil; errors are deferred until Row.Scan.
func (*Tx) ReleaseSavepoint ¶
ReleaseSavepoint issues RELEASE SAVEPOINT <name>. Same name rules as Savepoint.
func (*Tx) RollbackToSavepoint ¶
RollbackToSavepoint issues ROLLBACK TO SAVEPOINT <name>. Same name rules as Savepoint.
func (*Tx) Savepoint ¶
Savepoint issues SAVEPOINT <name> inside this transaction. name must match [A-Za-z0-9_]+; anything else is rejected before the wire write to avoid SQL-injection.
Example ¶
ExampleTx_Savepoint demonstrates partial rollback via savepoints inside a larger transaction.
package main
import (
"context"
"log"
"github.com/goceleris/celeris/driver/postgres"
)
func main() {
pool, err := postgres.Open("postgres://app:secret@localhost/mydb?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer func() { _ = pool.Close() }()
ctx := context.Background()
tx, err := pool.BeginTx(ctx, nil)
if err != nil {
log.Fatal(err)
}
defer func() { _ = tx.Rollback() }()
if _, err := tx.ExecContext(ctx, "INSERT INTO jobs(name) VALUES ('always')"); err != nil {
log.Fatal(err)
}
if err := tx.Savepoint(ctx, "maybe"); err != nil {
log.Fatal(err)
}
_, err = tx.ExecContext(ctx, "INSERT INTO jobs(name) VALUES ('might-fail')")
if err != nil {
// Undo just the 'might-fail' insert; the 'always' row survives.
if rbErr := tx.RollbackToSavepoint(ctx, "maybe"); rbErr != nil {
log.Fatal(rbErr)
}
} else {
if err := tx.ReleaseSavepoint(ctx, "maybe"); err != nil {
log.Fatal(err)
}
}
if err := tx.Commit(); err != nil {
log.Fatal(err)
}
}
Output:
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package protocol implements the PostgreSQL v3 wire protocol framing and per-type encode/decode tables used by the celeris native PostgreSQL driver.
|
Package protocol implements the PostgreSQL v3 wire protocol framing and per-type encode/decode tables used by the celeris native PostgreSQL driver. |