postgres

package
v1.4.0
Published: Apr 19, 2026 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Overview

Package postgres is celeris's native PostgreSQL driver. It speaks the PostgreSQL v3 wire protocol directly on top of the celeris event loop, exposes a database/sql compatible driver, and also provides a lower-level worker-affinity Pool for callers that want to skip database/sql overhead.

Usage

Two entry points are supported. Which one to pick depends on whether the caller wants portability (database/sql + any ORM) or peak throughput (direct Pool bound to a celeris.Server).

(a) database/sql

import (
    "database/sql"
    _ "github.com/goceleris/celeris/driver/postgres"
)

db, err := sql.Open("celeris-postgres", "postgres://app:pass@localhost/mydb?sslmode=disable")

database/sql owns the pool in this mode. The driver registers itself under DriverName in its init. Every *sql.Conn handed out by db is a pgConn running on a standalone event loop (one is resolved and reference-counted inside driver/internal/eventloop).

(b) Direct Pool

pool, err := postgres.Open(dsn, postgres.WithEngine(srv))
defer pool.Close()

rows, err := pool.QueryContext(ctx, "SELECT id, name FROM users WHERE tenant = $1", tenantID)

The direct Pool pins each connection to a worker (see WithWorker for the context-based hint API), so a handler running on worker N preferentially re-uses conns whose event-loop callbacks also land on worker N. When WithEngine is passed, the Pool shares the same epoll/io_uring instance as the HTTP workers; otherwise a standalone driver loop is resolved.

DSNs

Both the libpq URL form and the key=value form are accepted.

URL form:

postgres://user:pass@host:5432/dbname?sslmode=disable&application_name=svc&connect_timeout=5

Key=value form:

host=localhost port=5432 user=app password=secret dbname=mydb sslmode=disable

Recognized DSN keys: host, port, user, password, dbname / database, sslmode, connect_timeout (seconds), statement_cache_size, application_name. Any other key is forwarded to the server as a StartupMessage parameter (so search_path, timezone, statement_timeout, and similar GUCs can be set at connect time).
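
The routing rule above (recognized keys stay client-side, everything else becomes a startup parameter) can be sketched as follows. This is an illustration only, not the driver's ParseDSN: the helper name splitDSN is hypothetical, and the tokenizer assumes unquoted, space-separated key=value pairs.

```go
package main

import (
	"fmt"
	"strings"
)

// recognized lists the DSN keys the documentation says the driver consumes
// itself; every other key is forwarded as a startup parameter.
var recognized = map[string]bool{
	"host": true, "port": true, "user": true, "password": true,
	"dbname": true, "database": true, "sslmode": true,
	"connect_timeout": true, "statement_cache_size": true,
	"application_name": true,
}

// splitDSN is a hypothetical helper, not part of the driver. It handles only
// unquoted, space-separated key=value pairs.
func splitDSN(dsn string) (client, startup map[string]string, err error) {
	client = map[string]string{}
	startup = map[string]string{}
	for _, field := range strings.Fields(dsn) {
		key, value, ok := strings.Cut(field, "=")
		if !ok || key == "" {
			return nil, nil, fmt.Errorf("malformed DSN token %q", field)
		}
		if recognized[key] {
			client[key] = value
		} else {
			startup[key] = value
		}
	}
	return client, startup, nil
}

func main() {
	client, startup, err := splitDSN(
		"host=localhost port=5432 dbname=mydb search_path=app statement_timeout=5000")
	if err != nil {
		panic(err)
	}
	fmt.Println("client-side keys:", len(client))
	fmt.Println("startup parameters:", startup)
}
```

Because unknown keys are forwarded rather than rejected, a misspelled key is only caught when the server validates the StartupMessage.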

Options

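Pool knobs are supplied as functional options to Open: WithEngine, WithMaxOpen, WithMaxIdlePerWorker, WithMaxIdleTime, WithMaxLifetime, WithHealthCheck, WithStatementCacheSize, and WithApplication. Each is documented under type Option.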

Transactions

Pool.BeginTx opens a Tx that pins its connection until Tx.Commit or Tx.Rollback is called. Passing *sql.TxOptions selects the isolation level (Read Uncommitted / Read Committed / Repeatable Read / Serializable) and the read-only flag, all folded into a single BEGIN round trip.
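
To make the "single BEGIN round trip" concrete, here is a minimal sketch of how a *sql.TxOptions value could be folded into one BEGIN statement. The helper beginSQL and the variable exampleOpts are hypothetical; the driver's actual statement text may differ.

```go
package main

import (
	"database/sql"
	"fmt"
	"strings"
)

// exampleOpts is the TxOptions value used by main below.
var exampleOpts = &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: true}

// beginSQL is a hypothetical helper that renders TxOptions as a single
// PostgreSQL BEGIN statement carrying all transaction modes.
func beginSQL(opts *sql.TxOptions) (string, error) {
	if opts == nil {
		return "BEGIN", nil
	}
	var modes []string
	switch opts.Isolation {
	case sql.LevelDefault:
		// Leave the server's default isolation level in effect.
	case sql.LevelReadUncommitted:
		modes = append(modes, "ISOLATION LEVEL READ UNCOMMITTED")
	case sql.LevelReadCommitted:
		modes = append(modes, "ISOLATION LEVEL READ COMMITTED")
	case sql.LevelRepeatableRead:
		modes = append(modes, "ISOLATION LEVEL REPEATABLE READ")
	case sql.LevelSerializable:
		modes = append(modes, "ISOLATION LEVEL SERIALIZABLE")
	default:
		return "", fmt.Errorf("unsupported isolation level %v", opts.Isolation)
	}
	if opts.ReadOnly {
		modes = append(modes, "READ ONLY")
	}
	if len(modes) == 0 {
		return "BEGIN", nil
	}
	return "BEGIN " + strings.Join(modes, ", "), nil
}

func main() {
	stmt, err := beginSQL(exampleOpts)
	if err != nil {
		panic(err)
	}
	fmt.Println(stmt)
}
```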

Savepoints are reachable three ways:

  1. (*postgres.Tx).Savepoint / ReleaseSavepoint / RollbackToSavepoint on the direct Pool transaction.

  2. database/sql via sql.Conn.Raw + the exported Conn alias:

    conn, _ := db.Conn(ctx)
    defer conn.Close()
    _ = conn.Raw(func(raw any) error {
        pc := raw.(*postgres.Conn)
        return pc.Savepoint(ctx, "sp1")
    })

  3. Raw simple queries ("SAVEPOINT sp1") issued through Exec on a transaction; the first two forms are preferred because they validate the name.

Savepoint names must match [A-Za-z0-9_]+; other identifiers are rejected before the wire write to avoid SQL-injection.
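
The name rule can be sketched with a regular expression check performed before any SQL text is built. The helper savepointSQL is hypothetical and only mirrors the documented pattern; it is not the driver's code.

```go
package main

import (
	"fmt"
	"regexp"
)

// savepointName encodes the documented rule: one or more ASCII letters,
// digits, or underscores, and nothing else.
var savepointName = regexp.MustCompile(`^[A-Za-z0-9_]+$`)

// savepointSQL is a hypothetical helper that refuses to build the statement
// when the name falls outside the allowed pattern.
func savepointSQL(name string) (string, error) {
	if !savepointName.MatchString(name) {
		return "", fmt.Errorf("invalid savepoint name %q", name)
	}
	return "SAVEPOINT " + name, nil
}

func main() {
	for _, name := range []string{"sp1", "sp1; DROP TABLE users"} {
		stmt, err := savepointSQL(name)
		fmt.Printf("%q -> %q, err=%v\n", name, stmt, err)
	}
}
```

Because the name is interpolated into the statement text rather than sent as a bind parameter, rejecting it up front is what prevents injection.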

Bulk COPY

Pool.CopyFrom streams rows into a table via COPY FROM STDIN. Callers implement CopyFromSource (or use CopyFromSlice for in-memory fixtures) to feed rows. Pool.CopyTo runs COPY ... TO STDOUT and invokes a callback for each raw row. Both use PG's text format with tab separators and backslash escaping.
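
For reference, the COPY text format mentioned above separates fields with tabs, terminates rows with a newline, writes NULL as \N, and backslash-escapes the characters that would otherwise be ambiguous. The sketch below encodes one row under those rules; encodeCopyRow and str are hypothetical helpers, and the driver's own encoder may cover more cases.

```go
package main

import (
	"fmt"
	"strings"
)

// copyEscaper applies the backslash escapes of PostgreSQL's COPY text format
// to a single field value.
var copyEscaper = strings.NewReplacer(
	`\`, `\\`,
	"\t", `\t`,
	"\n", `\n`,
	"\r", `\r`,
)

// str returns a pointer to s so that nil can represent SQL NULL.
func str(s string) *string { return &s }

// encodeCopyRow is a hypothetical helper that renders one row in COPY text
// format: tab-separated fields, \N for NULL, newline-terminated.
func encodeCopyRow(fields []*string) string {
	parts := make([]string, len(fields))
	for i, f := range fields {
		if f == nil {
			parts[i] = `\N`
			continue
		}
		parts[i] = copyEscaper.Replace(*f)
	}
	return strings.Join(parts, "\t") + "\n"
}

func main() {
	row := []*string{str("alice"), nil, str("line1\nline2")}
	fmt.Printf("%q\n", encodeCopyRow(row))
}
```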

Pool configurations

database/sql and postgres.Pool each maintain their own set of connections. Opening a sql.DB with sql.Open("celeris-postgres", dsn) and separately calling postgres.Open(dsn) does not share pool state — configure sql.DB.SetMaxOpenConns on the former and WithMaxOpen on the latter independently.

Type support

The encoder/decoder understands the following server OIDs out of the box (see postgres/protocol for the full codec table):

bool, int2, int4, int8, float4, float8, text, varchar, bytea, uuid,
date, timestamp, timestamptz, numeric, json, jsonb, and the one-
dimensional array variants of those types (_bool, _int4, _text, ...).

Go type mappings:

bool                   ↔ bool
int2/int4/int8         ↔ int64 (accepts int, int32 at encode)
float4/float8          ↔ float64
text/varchar           ↔ string
bytea                  ↔ []byte
uuid                   ↔ []byte (16 bytes)
date/timestamp/tstz    ↔ time.Time
numeric                ↔ string (use strconv or math/big to convert)
json/jsonb             ↔ []byte or string
arrays                 ↔ []T of the element's Go type

Any argument that implements database/sql/driver.Valuer is resolved to its underlying value before encoding. Any destination that implements database/sql.Scanner receives the raw bytes. Custom types can be plugged in via protocol.RegisterType.
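
Since numeric arrives as a string and uuid as 16 raw bytes, callers usually convert them after Scan. The following sketch shows one way to do that with the standard library; parseNumeric and formatUUID are hypothetical helper names, not part of this package.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"math/big"
)

// parseNumeric converts the string form of a numeric column into an exact
// rational, avoiding the rounding that float64 would introduce.
func parseNumeric(s string) (*big.Rat, error) {
	r, ok := new(big.Rat).SetString(s)
	if !ok {
		return nil, fmt.Errorf("invalid numeric %q", s)
	}
	return r, nil
}

// formatUUID renders the 16-byte form of a uuid column in the canonical
// 8-4-4-4-12 hexadecimal layout.
func formatUUID(b []byte) (string, error) {
	if len(b) != 16 {
		return "", fmt.Errorf("uuid must be 16 bytes, got %d", len(b))
	}
	h := hex.EncodeToString(b)
	return h[0:8] + "-" + h[8:12] + "-" + h[12:16] + "-" + h[16:20] + "-" + h[20:32], nil
}

func main() {
	price, err := parseNumeric("12345.6789")
	if err != nil {
		panic(err)
	}
	fmt.Println(price.FloatString(4))

	id, err := formatUUID([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```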

Errors

Server-side errors surface as *PGError (a re-export of protocol.PGError). The struct carries the five-character SQLSTATE code, the short message, and any optional fields the server attached (detail, hint, constraint name, etc.). Sentinels are wrapped so errors.Is and errors.As work as expected:

var pgErr *postgres.PGError
if errors.As(err, &pgErr) && pgErr.Code == "23505" { ... }

Other package-level sentinels: ErrPoolClosed, ErrClosed, ErrBadConn, ErrSSLNotSupported, ErrUnsupportedAuth.
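
Once the SQLSTATE code has been extracted from a *PGError, callers often branch on its class (the first two characters) rather than on individual codes. The sketch below shows that pattern on plain strings so it runs without a server; sqlstateClass and retryable are hypothetical helpers, and the class names follow the PostgreSQL error code appendix.

```go
package main

import "fmt"

// sqlstateClass maps a five-character SQLSTATE code to the name of its class,
// which is identified by the first two characters.
func sqlstateClass(code string) string {
	if len(code) != 5 {
		return "invalid"
	}
	switch code[:2] {
	case "08":
		return "connection exception"
	case "23":
		return "integrity constraint violation"
	case "40":
		return "transaction rollback"
	case "42":
		return "syntax error or access rule violation"
	case "57":
		return "operator intervention"
	default:
		return "other"
	}
}

// retryable reports whether the whole transaction is worth retrying:
// 40001 is serialization_failure and 40P01 is deadlock_detected.
func retryable(code string) bool {
	return code == "40001" || code == "40P01"
}

func main() {
	for _, code := range []string{"23505", "40001", "57014"} {
		fmt.Printf("%s: %s (retryable=%v)\n", code, sqlstateClass(code), retryable(code))
	}
}
```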

Query cancellation

Canceling a context aborts the in-flight query by dialing a short-lived side connection and sending the PostgreSQL v3 CancelRequest packet (protocol code 80877102). The client blocks until the server drains the rest of the original query's response — cancellation is cooperative, not instantaneous.
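
For orientation, the protocol 3.0 CancelRequest packet is 16 bytes: a length word, the request code, and the backend process ID and secret key that the server sent in BackendKeyData at startup. The sketch below builds those bytes; cancelRequest is a hypothetical helper, and the pid and secret values in main are made up.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// cancelRequestCode is the magic number from the protocol specification,
// equal to (1234 << 16) | 5678.
const cancelRequestCode = 80877102

// cancelRequest builds the 16-byte packet sent on the side connection.
// All four fields are big-endian 32-bit integers.
func cancelRequest(pid, secret uint32) []byte {
	buf := make([]byte, 16)
	binary.BigEndian.PutUint32(buf[0:4], 16) // total length, including itself
	binary.BigEndian.PutUint32(buf[4:8], cancelRequestCode)
	binary.BigEndian.PutUint32(buf[8:12], pid)
	binary.BigEndian.PutUint32(buf[12:16], secret)
	return buf
}

func main() {
	fmt.Printf("% x\n", cancelRequest(4242, 0xDEADBEEF))
}
```

The server sends no reply on the side connection; success or failure is only visible on the original connection, which is why the client still waits for the original query's response to finish.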

WithEngine and standalone operation

WithEngine is optional. When omitted, Open resolves a standalone event loop backed by the platform's best mechanism (epoll on Linux, goroutine-per-conn on Darwin). The standalone loop is reference-counted inside driver/internal/eventloop and shared across all drivers that omit WithEngine. Correctness is identical with or without WithEngine; the difference is performance: sharing the HTTP server's event loop improves data locality and saves one epoll/io_uring syscall per I/O. Expect roughly 5-20% lower latency for serial queries when WithEngine is used.

database/sql mode (sql.Open) always uses the standalone loop. For optimal performance, prefer the direct Pool with WithEngine(srv).

Rows iteration

Pool.QueryContext returns a *Rows value. Iterate with Next + Scan:

rows, err := pool.QueryContext(ctx, "SELECT id, name FROM users")
if err != nil { return err }
defer rows.Close()
for rows.Next() {
    var id int64
    var name string
    if err := rows.Scan(&id, &name); err != nil { return err }
    // process id, name
}
if err := rows.Err(); err != nil {
    return err // check Err() after the loop
}

Always check Rows.Err after the loop — it surfaces errors from the underlying query that were deferred until iteration completed (e.g. cancellation, network errors, or server-side errors on large result sets).

Known limitations (v1.4.0)

  • TLS is not supported in v1.4.0. sslmode=require, verify-ca, and verify-full are rejected at Open time with ErrSSLNotSupported. Deploy over VPC, loopback, or a sidecar TLS terminator. TLS support is planned for v1.4.x.
  • Result sets are fully buffered before Rows.Next returns — there is no true row-by-row streaming. Callers with large result sets should page via LIMIT/OFFSET or a server-side DECLARE CURSOR inside a transaction.
  • Authentication supports trust, cleartext, MD5, and SCRAM-SHA-256 (without channel binding). GSS, SSPI, Kerberos, and SCRAM-SHA-256-PLUS are not implemented.
  • No LISTEN / NOTIFY. Asynchronous NotificationResponse frames received from the server are silently dropped by the dispatcher.
  • COPY IN / COPY OUT is exposed via Pool.CopyFrom / Pool.CopyTo on the direct Pool API. database/sql has no analogous surface; callers who need COPY must use the direct Pool.
  • No PG 14+ pipeline mode (Execute chaining).
  • No cluster- or replica-routing logic; callers with a read replica should open a second Pool pointed at it.
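
The LIMIT/OFFSET paging suggested for large result sets can be sketched as a loop that stops at the first short page. To keep the example runnable without a database, the query is represented by a callback; fetchPage, forEachPage, and sliceFetcher are hypothetical names, and in real code the callback would run something like SELECT ... ORDER BY id LIMIT $1 OFFSET $2.

```go
package main

import (
	"errors"
	"fmt"
)

// fetchPage stands in for one paged query. It returns at most limit rows
// starting at offset.
type fetchPage func(limit, offset int) ([]string, error)

// forEachPage calls handle for every row, one page at a time, and stops when
// a page comes back shorter than pageSize.
func forEachPage(fetch fetchPage, pageSize int, handle func(string) error) error {
	if pageSize <= 0 {
		return errors.New("pageSize must be positive")
	}
	for offset := 0; ; offset += pageSize {
		rows, err := fetch(pageSize, offset)
		if err != nil {
			return err
		}
		for _, row := range rows {
			if err := handle(row); err != nil {
				return err
			}
		}
		if len(rows) < pageSize {
			return nil
		}
	}
}

// sliceFetcher fakes the database with an in-memory slice.
func sliceFetcher(data []string) fetchPage {
	return func(limit, offset int) ([]string, error) {
		if offset >= len(data) {
			return nil, nil
		}
		end := offset + limit
		if end > len(data) {
			end = len(data)
		}
		return data[offset:end], nil
	}
}

func main() {
	fetch := sliceFetcher([]string{"a", "b", "c", "d", "e"})
	err := forEachPage(fetch, 2, func(row string) error {
		fmt.Println(row)
		return nil
	})
	if err != nil {
		panic(err)
	}
}
```

OFFSET paging needs a stable ORDER BY to avoid skipped or repeated rows, and the server still scans the skipped rows on every page, so keyset paging or a cursor may suit very large tables better.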

Constants

const DriverName = "celeris-postgres"

DriverName is the name under which the driver is registered with database/sql. Use it in sql.Open: sql.Open(postgres.DriverName, dsn).

Variables

var ErrBadConn = errors.New("celeris-postgres: bad connection")

ErrBadConn is returned when a connection is in an unusable state; returning it causes database/sql to evict the conn and open a fresh one.

var ErrClosed = errors.New("celeris-postgres: connection closed")

ErrClosed is returned from operations on a closed pgConn.

var ErrDirectModeUnsupported = errors.New("celeris-postgres: LISTEN/UNLISTEN/NOTIFY are not supported in direct mode; use a non-async engine pool or a dedicated listener conn")

ErrDirectModeUnsupported is returned by operations that rely on unsolicited server messages between queries — LISTEN/UNLISTEN/NOTIFY (NotificationResponse) most notably. Direct-mode conns dial a plain net.TCPConn and drive reads from the caller goroutine only during an active query; between queries no reader is active, so any async-delivered message is silently dropped. COPY FROM/TO is supported via a short-lived per-call reader goroutine; only the persistent-listener class of operations returns this error.

Workarounds: open the pool against an engine with AsyncHandlers=false (drivers will pick the mini-loop path which has an always-on reader), or use a separate non-pooled listener conn dedicated to LISTEN/NOTIFY in a non-async configuration.

var ErrNoLastInsertID = errors.New("celeris-postgres: LastInsertId is not supported; use RETURNING")

ErrNoLastInsertID is returned from pgResult.LastInsertId. PG exposes sequence values via RETURNING rather than a generic last-insert-id.

var ErrNoLastInsertId = ErrNoLastInsertID //nolint:revive // backward-compat alias

ErrNoLastInsertId is a deprecated alias retained for API compatibility.

Deprecated: use ErrNoLastInsertID instead.

var ErrPoolClosed = errors.New("celeris-postgres: pool is closed")

ErrPoolClosed is returned from Pool methods after Close has been called.

var ErrResultTooBig = errors.New("celeris-postgres: query result exceeds direct-mode buffer cap (64 MiB); paginate or use streaming mode")

ErrResultTooBig is returned when a query's cumulative DataRow payload exceeds the direct-mode buffering cap (maxDirectResultBytes, 64 MiB). Direct mode pins syncMode=true and cannot lazily promote to streaming, so huge SELECTs would otherwise balloon per-conn memory. To iterate large result sets, use a non-direct pool (set up with a non-async engine via WithEngine) which supports lazy streaming, or paginate the query with LIMIT/OFFSET or cursor-based paging.

var ErrSSLNotSupported = errors.New("celeris-postgres: TLS/SSL not supported in v1.4.0; use sslmode=disable for VPC/loopback deployments or wait for v1.4.x TLS support")

ErrSSLNotSupported is returned from Connect / Open when the DSN requests sslmode=require, verify-ca, or verify-full. Plaintext (sslmode=disable or prefer) is the only supported mode in v1.4.0.

var ErrUnsupportedAuth = errors.New("celeris-postgres: unsupported authentication method")

ErrUnsupportedAuth is returned when the server demands an authentication method this driver cannot fulfill (for example GSS, SSPI, or a SASL mechanism other than SCRAM-SHA-256).

Functions

func WithWorker

func WithWorker(ctx context.Context, workerID int) context.Context

WithWorker returns ctx with a worker-affinity hint. Pool.Acquire uses the hint as the preferred worker index.
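
A context-carried hint of this kind is typically implemented with context.WithValue and an unexported key type. The sketch below shows the general pattern and a consumer that falls back when the hint is absent or out of range; withWorker, workerFrom, pickWorker, and baseCtx are hypothetical stand-ins, and the driver's internals may differ.

```go
package main

import (
	"context"
	"fmt"
)

// workerKey is an unexported key type, so no other package can collide with
// or overwrite the hint.
type workerKey struct{}

// baseCtx is the un-hinted context used by the examples in main.
var baseCtx = context.Background()

// withWorker returns a child context carrying the preferred worker index.
func withWorker(ctx context.Context, workerID int) context.Context {
	return context.WithValue(ctx, workerKey{}, workerID)
}

// workerFrom extracts the hint, reporting false when none was set.
func workerFrom(ctx context.Context) (int, bool) {
	id, ok := ctx.Value(workerKey{}).(int)
	return id, ok
}

// pickWorker honors the hint when it names a valid worker and otherwise
// returns fallback.
func pickWorker(ctx context.Context, numWorkers, fallback int) int {
	if id, ok := workerFrom(ctx); ok && id >= 0 && id < numWorkers {
		return id
	}
	return fallback
}

func main() {
	fmt.Println(pickWorker(withWorker(baseCtx, 3), 8, 0))
	fmt.Println(pickWorker(baseCtx, 8, 5))
}
```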

Types

type Conn

type Conn = pgConn

Conn is the public alias of the driver's underlying connection type. It is exposed so that callers using database/sql can reach celeris-specific connection features (Savepoint, ReleaseSavepoint, RollbackToSavepoint, ServerParam) via sql.Conn.Raw:

conn, err := db.Conn(ctx)
if err != nil {
    return err
}
defer conn.Close()
return conn.Raw(func(dc any) error {
    pc := dc.(*postgres.Conn)
    return pc.Savepoint(ctx, "sp1")
})

The type is otherwise opaque; methods intended for external use are defined on *pgConn in conn.go.

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

Connector is a database/sql/driver.Connector that opens celeris-backed PostgreSQL connections. It implements driver.Connector and additionally offers a WithEngine method that rebinds the connector to a running celeris.Server's event loop.

func NewConnector

func NewConnector(dsn string) (*Connector, error)

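NewConnector parses dsn and returns a Connector. Callers typically check the returned error and then pass the Connector to sql.OpenDB (NewConnector returns two values, so the call cannot be nested directly inside sql.OpenDB), or reach it indirectly via sql.Open, which calls Driver.OpenConnector.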

func (*Connector) Close

func (c *Connector) Close() error

Close releases the standalone event loop if this connector owns one. database/sql calls Close via DB.Close.

func (*Connector) Connect

func (c *Connector) Connect(ctx context.Context) (driver.Conn, error)

Connect returns a new driver.Conn. database/sql's pool reuses this connector across all pooled conns, so we resolve the provider once and share it for the connector's lifetime.

func (*Connector) Driver

func (c *Connector) Driver() driver.Driver

Driver returns the singleton Driver.

func (*Connector) WithEngine

func (c *Connector) WithEngine(sp eventloop.ServerProvider) *Connector

WithEngine rebinds this Connector to sp's event loop. The returned Connector shares the DSN but replaces the provider; it does not own the loop (so Close does not tear it down).

type CopyFromSource

type CopyFromSource interface {
	Next() bool
	Values() ([]any, error)
	Err() error
}

CopyFromSource is the interface consumed by Pool.CopyFrom. It is modeled after pgx.CopyFromSource: Next advances one row at a time, Values returns the current row's column values, and Err surfaces any iteration error.

Implementations are driven synchronously on the caller's goroutine; no concurrent access is expected. Returned slices may be reused between iterations — the driver consumes them before calling Next again.
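
As an illustration of a source that generates rows lazily and reuses its slice between iterations, consider the sketch below. The interface is redeclared locally (copied from the definition above) so the example compiles on its own; counterSource and drain are hypothetical, and drain only mimics the Next/Values/Err call order a consumer would follow.

```go
package main

import "fmt"

// CopyFromSource is redeclared here, copied from the package documentation,
// so that the sketch is self-contained.
type CopyFromSource interface {
	Next() bool
	Values() ([]any, error)
	Err() error
}

// counterSource yields the rows (i, "user-i") for i in [1, n] without ever
// holding more than one row in memory.
type counterSource struct {
	n, i int
	row  []any // reused between iterations
}

func (s *counterSource) Next() bool {
	if s.i >= s.n {
		return false
	}
	s.i++
	return true
}

func (s *counterSource) Values() ([]any, error) {
	s.row = append(s.row[:0], s.i, fmt.Sprintf("user-%d", s.i))
	return s.row, nil
}

func (s *counterSource) Err() error { return nil }

// drain drives a source the way a consumer would: Next, then Values, and
// finally Err once Next reports false.
func drain(src CopyFromSource) (int, error) {
	n := 0
	for src.Next() {
		if _, err := src.Values(); err != nil {
			return n, err
		}
		n++
	}
	return n, src.Err()
}

func main() {
	n, err := drain(&counterSource{n: 3})
	fmt.Println(n, err)
}
```

With the real package, a value such as &counterSource{n: 1000000} would be passed as the src argument of Pool.CopyFrom.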

func CopyFromSlice

func CopyFromSlice(rows [][]any) CopyFromSource

CopyFromSlice wraps a [][]any fixture as a CopyFromSource. Useful for tests and for callers that already have their rows materialised in memory.

type DSN

type DSN struct {
	Host     string
	Port     string
	User     string
	Password string
	Database string
	// Params is the set of server-side startup parameters (e.g. "search_path",
	// "timezone") that will be sent in the StartupMessage.
	Params  map[string]string
	Options Options
}

DSN is the parsed form of a PostgreSQL connection string.

func ParseDSN

func ParseDSN(raw string) (DSN, error)

ParseDSN parses either the URL form ("postgres://user:pass@host:port/db?...") or the key=value form ("host=... port=... user=...") and returns a populated DSN. Unknown keys are carried as server-side startup parameters rather than rejected; the PG server validates them during the StartupMessage exchange.

func (*DSN) CheckSSL

func (d *DSN) CheckSSL() error

CheckSSL returns ErrSSLNotSupported if the DSN requests a TLS mode this driver version cannot satisfy.

In v1.4.0 the driver has no TLS stack. sslmode semantics:

  • "" / "disable" : plaintext, always allowed.
  • "prefer" / "allow" : the libpq semantics are "try TLS, fall back to plaintext" (prefer) or "try plaintext, upgrade if the server demands TLS" (allow). Since we can only ever do plaintext, these are treated as equivalent to "disable" but WITH a warning — the user expected opportunistic encryption and got none.
  • "require" / "verify-ca" / "verify-full" : TLS is mandatory; rejected with ErrSSLNotSupported. Users with managed cloud DBs (RDS, CloudSQL, etc.) must wait for v1.4.x TLS support.
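
The three sslmode buckets above can be summarized as a small classifier. This is a sketch of the documented semantics, not the driver's CheckSSL: checkSSLMode and errSSLNotSupported are hypothetical stand-ins, and the handling of unknown values is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errSSLNotSupported stands in for postgres.ErrSSLNotSupported.
var errSSLNotSupported = errors.New("TLS/SSL not supported")

// checkSSLMode reports whether the mode is usable and whether the caller
// should be warned that the connection will be plaintext despite the request.
func checkSSLMode(mode string) (warn bool, err error) {
	switch mode {
	case "", "disable":
		return false, nil // plaintext was requested
	case "prefer", "allow":
		return true, nil // opportunistic TLS requested, plaintext delivered
	case "require", "verify-ca", "verify-full":
		return false, errSSLNotSupported // TLS is mandatory
	default:
		// Assumption: unknown values are rejected rather than ignored.
		return false, fmt.Errorf("unknown sslmode %q", mode)
	}
}

func main() {
	for _, mode := range []string{"disable", "prefer", "verify-full"} {
		warn, err := checkSSLMode(mode)
		fmt.Printf("%-12s warn=%v err=%v\n", mode, warn, err)
	}
}
```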

type Driver

type Driver struct{}

Driver implements database/sql/driver.Driver for PostgreSQL on top of the celeris event loop.

func (*Driver) Open

func (d *Driver) Open(name string) (driver.Conn, error)

Open parses name as a DSN and returns a single driver.Conn. Callers generally prefer sql.Open + database/sql's built-in pooling over calling this directly.

func (*Driver) OpenConnector

func (d *Driver) OpenConnector(name string) (driver.Connector, error)

OpenConnector parses name and returns a Connector. database/sql calls this on sql.Open; the returned Connector is used for every Conn the pool dials.

type Option

type Option func(*options)

Option configures Open.

func WithApplication

func WithApplication(name string) Option

WithApplication sets the "application_name" startup parameter.

func WithEngine

func WithEngine(sp eventloop.ServerProvider) Option

WithEngine routes pool connections through the event loop of a running celeris.Server. When unset, Open resolves a standalone loop.

func WithHealthCheck

func WithHealthCheck(d time.Duration) Option

WithHealthCheck sets the background sweep interval. Zero disables it.

func WithMaxIdlePerWorker

func WithMaxIdlePerWorker(n int) Option

WithMaxIdlePerWorker bounds each worker's idle list. Default: 2.

func WithMaxIdleTime

func WithMaxIdleTime(d time.Duration) Option

WithMaxIdleTime sets the max idle duration. Default: 5m.

func WithMaxLifetime

func WithMaxLifetime(d time.Duration) Option

WithMaxLifetime sets the max age of a pooled conn. Default: 30m.

func WithMaxOpen

func WithMaxOpen(n int) Option

WithMaxOpen sets the total connection cap. Default: NumWorkers * 4.

func WithStatementCacheSize

func WithStatementCacheSize(n int) Option

WithStatementCacheSize sets the per-conn prepared-statement LRU capacity.

type Options

type Options struct {
	// ConnectTimeout bounds dial + startup. Zero means no client-side timeout.
	ConnectTimeout time.Duration
	// StatementCacheSize is the per-conn LRU capacity for named prepared
	// statements. Zero disables the cache.
	StatementCacheSize int
	// AutoCacheStatements, when true AND StatementCacheSize > 0, causes
	// cacheable SELECT-style QueryContext calls to transparently auto-
	// prepare on first use and reuse via the extended protocol on
	// subsequent calls. Default false preserves the legacy simple-query
	// behaviour for queries without an explicit db.Prepare. Equivalent to
	// pgx's QueryExecModeCacheStatement at steady state.
	AutoCacheStatements bool
	// Application is copied into the "application_name" startup parameter.
	Application string
	// SSLMode is the parsed sslmode value. "disable" and "prefer" are
	// accepted; "require" / "verify-ca" / "verify-full" are rejected at Open
	// time with ErrSSLNotSupported.
	SSLMode string
}

Options holds client-side driver knobs parsed out of the DSN before they are passed to the server as startup parameters.

type PGError

type PGError = protocol.PGError

PGError re-exports the server-side ErrorResponse type so callers can type-assert without importing the protocol package.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is an alternative to database/sql.DB that keeps connections pinned to worker-affinity slots. It is usable from any goroutine but routes idle connections back to the worker that created them, so a handler running on worker N preferentially acquires conns whose event-loop callbacks land on worker N.

func Open

func Open(dsnStr string, opts ...Option) (*Pool, error)

Open opens a worker-affinity pool. The DSN is parsed once here; connect errors surface on the first Acquire.

Example

ExampleOpen shows the database/sql entry point: import the driver for its side-effect (the init registers "celeris-postgres") and sql.Open a DSN. database/sql owns the pool and the driver runs against a package-level standalone event loop.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/goceleris/celeris/driver/postgres" // registers "celeris-postgres"
)

func main() {
	// The blank import at package scope is what registers the driver.
	db, err := sql.Open("celeris-postgres",
		"postgres://app:secret@localhost:5432/mydb?sslmode=disable&application_name=svc")
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = db.Close() }()

	ctx := context.Background()
	var greeting string
	if err := db.QueryRowContext(ctx, "SELECT 'hello, world'").Scan(&greeting); err != nil {
		log.Fatal(err)
	}
	fmt.Println(greeting)
}
Example (WithEngine)

ExampleOpen_withEngine shows the direct Pool path. Opening with postgres.WithEngine binds the pool to a running celeris.Server's event loop so driver FDs are serviced by the same epoll/io_uring instance as the HTTP workers, and per-worker idle conns match the HTTP handler's CPU affinity.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/goceleris/celeris/driver/postgres"
)

func main() {
	// In real code, pass postgres.WithEngine(srv) where srv is a
	// *celeris.Server that has already been Start()-ed. Doing so binds
	// the pool to the HTTP event loop so driver FDs and HTTP workers
	// share the same epoll/io_uring instance.
	pool, err := postgres.Open(
		"postgres://app:secret@localhost:5432/mydb?sslmode=disable",
		postgres.WithMaxOpen(32),
		postgres.WithStatementCacheSize(512),
		postgres.WithApplication("api"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = pool.Close() }()

	ctx := context.Background()
	rows, err := pool.QueryContext(ctx,
		"SELECT id, name FROM users WHERE tenant = $1", 42)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = rows.Close() }()

	for rows.Next() {
		var id int64
		var name string
		if err := rows.Scan(&id, &name); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("id=%d name=%s\n", id, name)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}

func (*Pool) BeginTx

func (p *Pool) BeginTx(ctx context.Context, opts *sql.TxOptions) (*Tx, error)

BeginTx acquires a conn, issues BEGIN, and returns a Tx that pins the conn until Commit/Rollback.

Example

ExamplePool_BeginTx wraps a pair of updates in a serializable transaction and rolls back on any error.

package main

import (
	"context"
	"database/sql"
	"log"
	"time"

	"github.com/goceleris/celeris/driver/postgres"
)

func main() {
	pool, err := postgres.Open("postgres://app:secret@localhost/mydb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = pool.Close() }()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	tx, err := pool.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
	if err != nil {
		log.Fatal(err)
	}
	if _, err := tx.ExecContext(ctx,
		"UPDATE accounts SET balance = balance - $1 WHERE id = $2", 100, "a"); err != nil {
		_ = tx.Rollback()
		log.Fatal(err)
	}
	if _, err := tx.ExecContext(ctx,
		"UPDATE accounts SET balance = balance + $1 WHERE id = $2", 100, "b"); err != nil {
		_ = tx.Rollback()
		log.Fatal(err)
	}
	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}
}

func (*Pool) Close

func (p *Pool) Close() error

Close closes the pool and releases the event loop if it was owned. Safe to call concurrently with Acquire — Acquire returns ErrPoolClosed once Close has set the closed flag.

func (*Pool) CopyFrom

func (p *Pool) CopyFrom(ctx context.Context, tableName string, columns []string, src CopyFromSource) (int64, error)

CopyFrom streams rows from src to the server via COPY FROM STDIN. Returns the number of rows imported (parsed from the server's CommandComplete tag) or any transport / iteration error. Column names may be empty to default to the table's natural column order.

Example

ExamplePool_CopyFrom bulk-loads a [][]any fixture into a table via PG's COPY FROM STDIN protocol. For large, live-generated streams, implement postgres.CopyFromSource directly instead of materialising to a slice.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/goceleris/celeris/driver/postgres"
)

func main() {
	pool, err := postgres.Open("postgres://app:secret@localhost/mydb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = pool.Close() }()

	ctx := context.Background()
	rows := [][]any{
		{1, "alice", true},
		{2, "bob", false},
		{3, "carol", true},
	}
	n, err := pool.CopyFrom(ctx, "users", []string{"id", "name", "active"},
		postgres.CopyFromSlice(rows))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("inserted %d rows\n", n)
}

func (*Pool) CopyTo

func (p *Pool) CopyTo(ctx context.Context, query string, dest func(row []byte) error) error

CopyTo executes query (typically "COPY <table> TO STDOUT ...") and invokes dest for each row. The row slice passed to dest is freshly allocated per call, so callers are free to retain it without copying. Returning a non-nil error from dest terminates the copy early. The server continues to stream bytes until ReadyForQuery; the conn remains usable once CopyTo returns.

func (*Pool) ExecContext

func (p *Pool) ExecContext(ctx context.Context, query string, args ...any) (Result, error)

ExecContext runs a statement and returns a Result.

func (*Pool) IdleConnWorkers

func (p *Pool) IdleConnWorkers() []int

IdleConnWorkers returns the Worker() IDs of every currently-idle connection across all worker slots. The same worker ID may appear multiple times when a slot holds more than one idle conn. In-use conns do not appear.

Intended for tests and introspection asserting that per-CPU affinity is actually honored by the dial path — callers should not use it for load balancing decisions.

func (*Pool) Ping

func (p *Pool) Ping(ctx context.Context) error

Ping acquires a conn, pings, and returns it.

func (*Pool) QueryContext

func (p *Pool) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)

QueryContext runs query on an acquired conn and returns a Rows wrapper that returns the conn to the pool when closed.

func (*Pool) QueryRow

func (p *Pool) QueryRow(ctx context.Context, query string, args ...any) *Row

QueryRow executes a query expected to return at most one row. The returned Row is always non-nil; errors are deferred until Row.Scan.

func (*Pool) Stats

func (p *Pool) Stats() async.PoolStats

Stats returns a snapshot of pool occupancy.

type PoolConfig

type PoolConfig struct {
	DSN                string
	MaxOpen            int
	MaxIdlePerWorker   int
	MaxLifetime        time.Duration
	MaxIdleTime        time.Duration
	HealthCheck        time.Duration
	StatementCacheSize int
	Application        string
}

PoolConfig controls the worker-affinity pool returned by Open.

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result is a pool-level Result that implements database/sql/driver.Result.

func (Result) LastInsertId

func (r Result) LastInsertId() (int64, error)

LastInsertId is not supported by PostgreSQL. Use RETURNING instead.

func (Result) RowsAffected

func (r Result) RowsAffected() (int64, error)

RowsAffected returns the number of affected rows parsed from the CommandComplete tag.
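
The CommandComplete tag is a short string such as "INSERT 0 3", "UPDATE 5", or "COPY 3", with the row count as its last field. A minimal sketch of extracting that count follows; rowsAffected is a hypothetical helper, and the driver's own parser may treat edge cases differently.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// rowsAffected returns the trailing row count of a CommandComplete tag, or 0
// when the tag carries none (for example "BEGIN" or "CREATE TABLE").
func rowsAffected(tag string) int64 {
	fields := strings.Fields(tag)
	if len(fields) < 2 {
		return 0
	}
	n, err := strconv.ParseInt(fields[len(fields)-1], 10, 64)
	if err != nil {
		return 0 // the last word is not a number, so there is no count
	}
	return n
}

func main() {
	for _, tag := range []string{"INSERT 0 3", "UPDATE 5", "COPY 3", "CREATE TABLE"} {
		fmt.Printf("%-12s -> %d\n", tag, rowsAffected(tag))
	}
}
```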

type Row

type Row struct {
	// contains filtered or unexported fields
}

Row is the result of calling Pool.QueryRow. It wraps a single-row query with automatic Close semantics, matching database/sql.Row.

func (*Row) Err

func (r *Row) Err() error

Err returns the error, if any, encountered during the query. Unlike Row.Scan, it does not close the underlying Rows.

func (*Row) Scan

func (r *Row) Scan(dest ...any) error

Scan copies the columns from the single result row into dest. If the query returned no rows, Scan returns sql.ErrNoRows. The underlying Rows is closed automatically.

type Rows

type Rows struct {
	// contains filtered or unexported fields
}

Rows is the pool-level rows wrapper. It returns the underlying conn to the pool when Close is called. The iteration API matches database/sql.Rows:

for rows.Next() {
    if err := rows.Scan(&id, &name); err != nil { ... }
}
if err := rows.Err(); err != nil { ... }

func (*Rows) Close

func (r *Rows) Close() error

Close releases resources and returns the conn to the pool.

func (*Rows) Columns

func (r *Rows) Columns() []string

Columns returns column names.

func (*Rows) Err

func (r *Rows) Err() error

Err returns the first non-EOF error encountered during iteration.

func (*Rows) Next

func (r *Rows) Next() bool

Next advances the cursor to the next row. It returns false when no more rows are available or an error occurred. After Next returns false, call Rows.Err to distinguish normal end-of-data from an error.

Fast path: when inner is *pgRows (the production path), Next captures the raw wire bytes + per-column codecs without boxing into driver.Value. Scan() then decodes directly into user pointers, saving one heap alloc per non-zero-size cell (int64 / time.Time etc. would otherwise escape via interface conversion).

func (*Rows) Scan

func (r *Rows) Scan(dest ...any) error

Scan copies the columns from the current row into dest. Each dest must be a pointer. On the fast path (inner is *pgRows), Scan decodes directly from the wire bytes into dest without ever materialising a driver.Value — zero interface{} boxing per cell.

type Tx

type Tx struct {
	// contains filtered or unexported fields
}

Tx is the pool-level transaction wrapper.

func (*Tx) Commit

func (t *Tx) Commit() error

Commit commits the transaction and returns the conn to the pool. On failure, done is left false so a deferred Rollback can still fire.

func (*Tx) ExecContext

func (t *Tx) ExecContext(ctx context.Context, query string, args ...any) (Result, error)

ExecContext runs a statement on the pinned conn.

func (*Tx) QueryContext

func (t *Tx) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)

QueryContext runs a query on the pinned conn. The returned Rows must be closed before Commit/Rollback.

func (*Tx) QueryRow

func (t *Tx) QueryRow(ctx context.Context, query string, args ...any) *Row

QueryRow executes a query expected to return at most one row on the pinned conn. The returned Row is always non-nil; errors are deferred until Row.Scan.

func (*Tx) ReleaseSavepoint

func (t *Tx) ReleaseSavepoint(ctx context.Context, name string) error

ReleaseSavepoint issues RELEASE SAVEPOINT <name>. Same name rules as Savepoint.

func (*Tx) Rollback

func (t *Tx) Rollback() error

Rollback rolls back the transaction and returns the conn to the pool.

func (*Tx) RollbackToSavepoint

func (t *Tx) RollbackToSavepoint(ctx context.Context, name string) error

RollbackToSavepoint issues ROLLBACK TO SAVEPOINT <name>. Same name rules as Savepoint.

func (*Tx) Savepoint

func (t *Tx) Savepoint(ctx context.Context, name string) error

Savepoint issues SAVEPOINT <name> inside this transaction. name must match [A-Za-z0-9_]+; anything else is rejected before the wire write to avoid SQL-injection.

Example

ExampleTx_Savepoint demonstrates partial rollback via savepoints inside a larger transaction.

package main

import (
	"context"
	"log"

	"github.com/goceleris/celeris/driver/postgres"
)

func main() {
	pool, err := postgres.Open("postgres://app:secret@localhost/mydb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = pool.Close() }()

	ctx := context.Background()
	tx, err := pool.BeginTx(ctx, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = tx.Rollback() }()

	if _, err := tx.ExecContext(ctx, "INSERT INTO jobs(name) VALUES ('always')"); err != nil {
		log.Fatal(err)
	}

	if err := tx.Savepoint(ctx, "maybe"); err != nil {
		log.Fatal(err)
	}
	_, err = tx.ExecContext(ctx, "INSERT INTO jobs(name) VALUES ('might-fail')")
	if err != nil {
		// Undo just the 'might-fail' insert; the 'always' row survives.
		if rbErr := tx.RollbackToSavepoint(ctx, "maybe"); rbErr != nil {
			log.Fatal(rbErr)
		}
	} else {
		if err := tx.ReleaseSavepoint(ctx, "maybe"); err != nil {
			log.Fatal(err)
		}
	}

	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}
}

Directories

Path Synopsis
Package protocol implements the PostgreSQL v3 wire protocol framing and per-type encode/decode tables used by the celeris native PostgreSQL driver.
