postgres

package
v1.5.5
Published: Jun 24, 2026 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Overview

Package postgres is celeris's native PostgreSQL driver. It speaks the PostgreSQL v3 wire protocol directly on top of the celeris event loop, exposes a database/sql-compatible driver, and also provides a lower-level worker-affinity Pool for callers that want to skip database/sql overhead.

There are two entry points. Use database/sql for portability (works with any ORM); use the direct Pool for peak throughput.

// database/sql — registers itself under DriverName in init.
import _ "github.com/goceleris/celeris/driver/postgres"
db, err := sql.Open(postgres.DriverName, "postgres://app:pass@localhost/mydb?sslmode=disable")

// Direct Pool — optionally bound to a celeris.Server event loop.
pool, err := postgres.Open(dsn, postgres.WithEngine(srv))

Open accepts both the libpq URL form and the key=value DSN form. Pool behavior is tuned with functional Option values: WithEngine, WithMaxOpen, WithMaxIdlePerWorker, WithMaxLifetime, WithMaxIdleTime, WithHealthCheck, WithStatementCacheSize, and WithApplication. WithEngine is optional and affects performance only: when set, the pool shares the HTTP server's event loop instead of resolving a standalone one. WithWorker adds a per-call worker-affinity hint to a context.

Query with Pool.QueryContext (returns *Rows), Pool.QueryRow (returns *Row), or Pool.ExecContext (returns Result). Transactions come from Pool.BeginTx, which returns a *Tx with Tx.Commit, Tx.Rollback, and savepoint helpers (Tx.Savepoint, Tx.ReleaseSavepoint, Tx.RollbackToSavepoint). Bulk loads use Pool.CopyFrom (feed rows via CopyFromSource or CopyFromSlice) and Pool.CopyTo.

Server errors surface as *PGError (alias of protocol.PGError) carrying the SQLSTATE code; match with errors.As. Package sentinels include ErrPoolClosed, ErrClosed, ErrBadConn, ErrSSLNotSupported, ErrUnsupportedAuth, and ErrResultTooBig. Custom type codecs can be registered with protocol.RegisterType.

Current limitations: TLS is not yet supported (use sslmode=disable behind a VPC/loopback/sidecar); authentication is limited to trust, cleartext, MD5, and SCRAM-SHA-256 without channel binding; there is no LISTEN/NOTIFY, pipeline mode, or replica routing.

Documentation

Full guides and examples: https://goceleris.dev/docs/data-stores

Constants

const DriverName = "celeris-postgres"

DriverName is the name under which the driver is registered with database/sql. Use it in sql.Open: sql.Open(postgres.DriverName, dsn).

Variables

var ErrBadConn = errors.New("celeris-postgres: bad connection")

ErrBadConn is returned when a connection is in an unusable state; returning it causes database/sql to evict the conn and open a fresh one.

var ErrClosed = errors.New("celeris-postgres: connection closed")

ErrClosed is returned from operations on a closed pgConn.

var ErrDirectModeUnsupported = errors.New("celeris-postgres: LISTEN/UNLISTEN/NOTIFY are not supported in direct mode; use a non-async engine pool or a dedicated listener conn")

ErrDirectModeUnsupported is returned by operations that rely on unsolicited server messages between queries — LISTEN/UNLISTEN/NOTIFY (NotificationResponse) most notably. Direct-mode conns dial a plain net.TCPConn and drive reads from the caller goroutine only during an active query; between queries no reader is active, so any async-delivered message is silently dropped. COPY FROM/TO is supported via a short-lived per-call reader goroutine; only the persistent-listener class of operations returns this error.

Workarounds: open the pool against an engine with AsyncHandlers=false (drivers will pick the mini-loop path which has an always-on reader), or use a separate non-pooled listener conn dedicated to LISTEN/NOTIFY in a non-async configuration.

var ErrNoLastInsertID = errors.New("celeris-postgres: LastInsertId is not supported; use RETURNING")

ErrNoLastInsertID is returned from pgResult.LastInsertId. PG exposes sequence values via RETURNING rather than a generic last-insert-id.

var ErrNoLastInsertId = ErrNoLastInsertID //nolint:revive // backward-compat alias

ErrNoLastInsertId is a deprecated alias retained for API compatibility.

Deprecated: use ErrNoLastInsertID instead.

var ErrPoolClosed = errors.New("celeris-postgres: pool is closed")

ErrPoolClosed is returned from Pool methods after Close has been called.

var ErrResultTooBig = errors.New("celeris-postgres: query result exceeds direct-mode buffer cap (64 MiB); paginate or use streaming mode")

ErrResultTooBig is returned when a query's cumulative DataRow payload exceeds the direct-mode buffering cap (maxDirectResultBytes, 64 MiB). Direct mode pins syncMode=true and cannot lazily promote to streaming, so huge SELECTs would otherwise balloon per-conn memory. To iterate large result sets, use a non-direct pool (set up with a non-async engine via WithEngine) which supports lazy streaming, or paginate the query with LIMIT/OFFSET or cursor-based paging.
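The cursor-based paging mentioned above can be sketched as a keyset loop. This is an illustration under stated assumptions, not driver code: fetchPage is a hypothetical callback standing in for a query such as "SELECT id FROM t WHERE id > $1 ORDER BY id LIMIT $2" issued through Pool.QueryContext, and fakeTable simulates the table so the sketch runs without a server.

```go
package main

import (
	"errors"
	"fmt"
)

// fetchPage is a hypothetical callback: it returns up to limit ids that
// are greater than afterID, in ascending order.
type fetchPage func(afterID int64, limit int) ([]int64, error)

// forEachID visits every id one page at a time, so no single result
// has to hold the whole table. It assumes ids are positive.
func forEachID(fetch fetchPage, limit int, visit func(id int64)) error {
	if limit <= 0 {
		return errors.New("limit must be positive")
	}
	var after int64
	for {
		ids, err := fetch(after, limit)
		if err != nil {
			return err
		}
		for _, id := range ids {
			visit(id)
		}
		if len(ids) < limit {
			return nil // short page: nothing left to read
		}
		after = ids[len(ids)-1] // resume after the last id seen
	}
}

// fakeTable simulates a table holding ids 1..n.
func fakeTable(n int64) fetchPage {
	return func(afterID int64, limit int) ([]int64, error) {
		var out []int64
		for id := afterID + 1; id <= n && len(out) < limit; id++ {
			out = append(out, id)
		}
		return out, nil
	}
}

func main() {
	var sum int64
	if err := forEachID(fakeTable(10), 4, func(id int64) { sum += id }); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(sum)
}
```

Keyset paging is usually preferable to LIMIT/OFFSET for large tables because each page is an index range scan rather than a skip over all earlier rows.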

var ErrSSLNotSupported = errors.New("celeris-postgres: TLS/SSL is not yet supported; use sslmode=disable for VPC/loopback deployments, or terminate TLS at a sidecar / VPC boundary")

ErrSSLNotSupported is returned from Connect / Open when the DSN requests sslmode=require, verify-ca, or verify-full. The driver currently supports plaintext only: sslmode=disable, or prefer / allow, which are treated as disable.

var ErrUnsupportedAuth = errors.New("celeris-postgres: unsupported authentication method")

ErrUnsupportedAuth is returned when the server demands an authentication method this driver cannot fulfill (for example GSS, SSPI, or a SASL mechanism other than SCRAM-SHA-256).

Functions

func WithWorker

func WithWorker(ctx context.Context, workerID int) context.Context

WithWorker returns ctx with a worker-affinity hint. Pool.Acquire uses the hint as the preferred worker index.

Types

type Conn

type Conn = pgConn

Conn is the public alias of the driver's underlying connection type. It is exposed so that callers using database/sql can reach celeris-specific connection features (Savepoint, ReleaseSavepoint, RollbackToSavepoint, ServerParam) via sql.Conn.Raw:

// db.Conn returns a dedicated *sql.Conn; Raw exposes the driver conn.
c, err := db.Conn(ctx)
if err != nil {
    return err
}
defer c.Close()
return c.Raw(func(dc any) error {
    pc := dc.(*postgres.Conn)
    return pc.Savepoint(ctx, "sp1")
})

The type is otherwise opaque; methods intended for external use are defined on *pgConn in conn.go.

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

Connector is a database/sql/driver.Connector that opens celeris-backed PostgreSQL connections. It implements driver.Connector and additionally offers a WithEngine method that rebinds the connector to a running celeris.Server's event loop.

func NewConnector

func NewConnector(dsn string) (*Connector, error)

NewConnector parses dsn and returns a Connector. Callers typically pass the returned Connector to sql.OpenDB after checking the error, or reach it indirectly via sql.Open, which calls Driver.OpenConnector.

func (*Connector) Close

func (c *Connector) Close() error

Close releases the standalone event loop if this connector owns one. database/sql calls Close via DB.Close.

func (*Connector) Connect

func (c *Connector) Connect(ctx context.Context) (driver.Conn, error)

Connect returns a new driver.Conn. database/sql's pool reuses this connector across all pooled conns, so we resolve the provider once and share it for the connector's lifetime.

func (*Connector) Driver

func (c *Connector) Driver() driver.Driver

Driver returns the singleton Driver.

func (*Connector) WithEngine

func (c *Connector) WithEngine(sp eventloop.ServerProvider) *Connector

WithEngine rebinds this Connector to sp's event loop. The returned Connector shares the DSN but replaces the provider; it does not own the loop (so Close does not tear it down).

type CopyFromSource

type CopyFromSource interface {
	Next() bool
	Values() ([]any, error)
	Err() error
}

CopyFromSource is the interface consumed by Pool.CopyFrom. It is modeled after pgx.CopyFromSource: Next advances one row at a time, Values returns the current row's column values, and Err surfaces any iteration error.

Implementations are driven synchronously on the caller's goroutine; no concurrent access is expected. Returned slices may be reused between iterations — the driver consumes them before calling Next again.
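A streaming source that generates rows on demand, rather than materialising them first, might look like the sketch below. The local copyFromSource interface mirrors the documented method set so the example compiles without the driver; rangeSource, newRangeSource, and drain are hypothetical names, and drain only imitates how a COPY loop would consume the source.

```go
package main

import "fmt"

// copyFromSource mirrors the documented postgres.CopyFromSource method set.
type copyFromSource interface {
	Next() bool
	Values() ([]any, error)
	Err() error
}

// rangeSource yields (id, label) rows for ids in [next, end) on demand.
type rangeSource struct {
	next, end int
	row       []any // reused between iterations, which the contract permits
}

func newRangeSource(start, end int) *rangeSource {
	return &rangeSource{next: start, end: end, row: make([]any, 2)}
}

// Next prepares the following row and reports whether one exists.
func (s *rangeSource) Next() bool {
	if s.next >= s.end {
		return false
	}
	s.row[0] = s.next
	s.row[1] = fmt.Sprintf("user-%d", s.next)
	s.next++
	return true
}

func (s *rangeSource) Values() ([]any, error) { return s.row, nil }
func (s *rangeSource) Err() error             { return nil }

// drain consumes src in the documented order: Next, then Values, and
// finally Err once Next has returned false.
func drain(src copyFromSource) (int, error) {
	n := 0
	for src.Next() {
		if _, err := src.Values(); err != nil {
			return n, err
		}
		n++
	}
	return n, src.Err()
}

func main() {
	n, err := drain(newRangeSource(1, 4))
	fmt.Println(n, err)
}
```

A type with these three methods should satisfy postgres.CopyFromSource as documented and could be passed to Pool.CopyFrom in place of CopyFromSlice.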

func CopyFromSlice

func CopyFromSlice(rows [][]any) CopyFromSource

CopyFromSlice wraps a [][]any fixture as a CopyFromSource. Useful for tests and for callers that already have their rows materialised in memory.

type DSN

type DSN struct {
	Host     string
	Port     string
	User     string
	Password string
	Database string
	// Params is the set of server-side startup parameters (e.g. "search_path",
	// "timezone") that will be sent in the StartupMessage.
	Params  map[string]string
	Options Options
}

DSN is the parsed form of a PostgreSQL connection string.

func ParseDSN

func ParseDSN(raw string) (DSN, error)

ParseDSN parses either the URL form ("postgres://user:pass@host:port/db?...") or the key=value form ("host=... port=... user=...") and returns a populated DSN. Unknown keys are carried as server-side startup parameters rather than rejected; the PG server validates them during the StartupMessage exchange.
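The "unknown keys become startup parameters" rule can be illustrated with a deliberately simplified key=value splitter. This is a sketch under assumptions, not ParseDSN itself: it ignores quoting and escaping, the set of recognised keys (including "dbname") is assumed from libpq conventions, and splitKeyValueDSN is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// splitKeyValueDSN splits a whitespace-separated key=value string.
// Recognised connection keys go to conn; every other key is carried in
// params, to be validated by the server rather than rejected locally.
func splitKeyValueDSN(raw string) (conn, params map[string]string, err error) {
	// Assumed key set, following libpq naming.
	known := map[string]bool{
		"host": true, "port": true, "user": true, "password": true, "dbname": true,
	}
	conn, params = map[string]string{}, map[string]string{}
	for _, field := range strings.Fields(raw) {
		k, v, ok := strings.Cut(field, "=")
		if !ok || k == "" {
			return nil, nil, fmt.Errorf("malformed DSN field %q", field)
		}
		if known[k] {
			conn[k] = v
		} else {
			params[k] = v
		}
	}
	return conn, params, nil
}

func main() {
	conn, params, err := splitKeyValueDSN("host=db port=5432 user=app search_path=public")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(conn["host"], conn["port"], params["search_path"])
}
```

The real parser additionally extracts client-side Options such as sslmode before forwarding the remainder to the server.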

func (*DSN) CheckSSL

func (d *DSN) CheckSSL() error

CheckSSL returns ErrSSLNotSupported if the DSN requests a TLS mode this driver version cannot satisfy.

As of v1.4.0 the driver has no TLS stack, and the package overview still lists TLS as unsupported. sslmode semantics:

  • "" / "disable" : plaintext, always allowed.
  • "prefer" / "allow" : the libpq semantics are "try TLS, fall back to plaintext" (prefer) or "try plaintext, upgrade if the server demands TLS" (allow). Since we can only ever do plaintext, these are treated as equivalent to "disable" but WITH a warning — the user expected opportunistic encryption and got none.
  • "require" / "verify-ca" / "verify-full" : TLS is mandatory; rejected with ErrSSLNotSupported. Users with managed cloud DBs (RDS, CloudSQL, etc.) need to terminate TLS at a sidecar / VPC boundary until first-class TLS support lands.
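The three cases above reduce to a small switch. The sketch below is an illustration rather than the body of CheckSSL: checkSSLMode and errSSLNotSupported are hypothetical local names, and the handling of unrecognised values is an assumption, since the document does not say how they are treated.

```go
package main

import (
	"errors"
	"fmt"
)

// errSSLNotSupported is a local stand-in for postgres.ErrSSLNotSupported.
var errSSLNotSupported = errors.New("TLS/SSL is not yet supported")

// checkSSLMode classifies an sslmode value. warn is true when the user
// asked for opportunistic TLS but will get plaintext.
func checkSSLMode(mode string) (warn bool, err error) {
	switch mode {
	case "", "disable":
		return false, nil // plaintext, always allowed
	case "prefer", "allow":
		return true, nil // treated as disable, with a warning
	case "require", "verify-ca", "verify-full":
		return false, errSSLNotSupported // TLS is mandatory
	default:
		// Assumption: unrecognised values are reported as errors.
		return false, fmt.Errorf("unknown sslmode %q", mode)
	}
}

func main() {
	for _, m := range []string{"disable", "prefer", "verify-full"} {
		warn, err := checkSSLMode(m)
		fmt.Println(m, warn, err)
	}
}
```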

type Driver

type Driver struct{}

Driver implements database/sql/driver.Driver for PostgreSQL on top of the celeris event loop.

func (*Driver) Open

func (d *Driver) Open(name string) (driver.Conn, error)

Open parses name as a DSN and returns a single driver.Conn. Callers generally prefer sql.Open + database/sql's built-in pooling over calling this directly.

func (*Driver) OpenConnector

func (d *Driver) OpenConnector(name string) (driver.Connector, error)

OpenConnector parses name and returns a Connector. database/sql calls this on sql.Open; the returned Connector is used for every Conn the pool dials.

type Option

type Option func(*options)

Option configures Open.

func WithApplication

func WithApplication(name string) Option

WithApplication sets the "application_name" startup parameter.

func WithEngine

func WithEngine(sp eventloop.ServerProvider) Option

WithEngine routes pool connections through the event loop of a running celeris.Server. When unset, Open resolves a standalone loop.

func WithHealthCheck

func WithHealthCheck(d time.Duration) Option

WithHealthCheck sets the background sweep interval. Zero disables it.

func WithMaxIdlePerWorker

func WithMaxIdlePerWorker(n int) Option

WithMaxIdlePerWorker bounds each worker's idle list. Default: 2.

func WithMaxIdleTime

func WithMaxIdleTime(d time.Duration) Option

WithMaxIdleTime sets the max idle duration. Default: 5m.

func WithMaxLifetime

func WithMaxLifetime(d time.Duration) Option

WithMaxLifetime sets the max age of a pooled conn. Default: 30m.

func WithMaxOpen

func WithMaxOpen(n int) Option

WithMaxOpen sets the total connection cap. Default: NumWorkers * 4.

func WithStatementCacheSize

func WithStatementCacheSize(n int) Option

WithStatementCacheSize sets the per-conn prepared-statement LRU capacity.
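The Option values above follow the functional-options pattern: defaults are filled in first, then each option mutates the settings in order. A minimal sketch of the pattern, using the defaults documented for the individual options, follows. The lower-case names (options, option, withMaxOpen, withMaxLifetime, resolve) are hypothetical stand-ins, and passing numWorkers explicitly is an assumption made to keep the sketch self-contained.

```go
package main

import (
	"fmt"
	"time"
)

// options holds the resolved pool settings.
type options struct {
	maxOpen          int
	maxIdlePerWorker int
	maxLifetime      time.Duration
	maxIdleTime      time.Duration
}

// option mutates options; each With* constructor returns one.
type option func(*options)

func withMaxOpen(n int) option { return func(o *options) { o.maxOpen = n } }

func withMaxLifetime(d time.Duration) option {
	return func(o *options) { o.maxLifetime = d }
}

// resolve applies the documented defaults, then the caller's options in
// order, so later options override earlier ones.
func resolve(numWorkers int, opts ...option) options {
	o := options{
		maxOpen:          numWorkers * 4,
		maxIdlePerWorker: 2,
		maxLifetime:      30 * time.Minute,
		maxIdleTime:      5 * time.Minute,
	}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", resolve(4))
	fmt.Printf("%+v\n", resolve(4, withMaxOpen(32), withMaxLifetime(time.Hour)))
}
```

The pattern lets new knobs be added later without changing the signature of Open.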

type Options

type Options struct {
	// ConnectTimeout bounds dial + startup. Zero means no client-side timeout.
	ConnectTimeout time.Duration
	// StatementCacheSize is the per-conn LRU capacity for named prepared
	// statements. Zero disables the cache.
	StatementCacheSize int
	// AutoCacheStatements, when true AND StatementCacheSize > 0, causes
	// cacheable SELECT-style QueryContext calls to transparently auto-
	// prepare on first use and reuse via the extended protocol on
	// subsequent calls. Equivalent to pgx's QueryExecModeCacheStatement
	// at steady state. Default true; opt out per DSN with
	// auto_cache_statements=false (preserves the simple-query behaviour
	// for callers that do not want per-conn statement caching).
	AutoCacheStatements bool

	// Application is copied into the "application_name" startup parameter.
	Application string
	// SSLMode is the parsed sslmode value. "disable" and "prefer" are
	// accepted; "require" / "verify-ca" / "verify-full" are rejected at Open
	// time with ErrSSLNotSupported.
	SSLMode string
	// contains filtered or unexported fields
}

Options holds client-side driver knobs parsed out of the DSN before they are passed to the server as startup parameters.

type PGError

type PGError = protocol.PGError

PGError re-exports the server-side ErrorResponse type so callers can type-assert without importing the protocol package.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is an alternative to database/sql.DB that keeps connections pinned to worker-affinity slots. It is usable from any goroutine but routes idle connections back to the worker that created them, so a handler running on worker N preferentially acquires conns whose event-loop callbacks land on worker N.

func Open

func Open(dsnStr string, opts ...Option) (*Pool, error)

Open opens a worker-affinity pool. The DSN is parsed once here; connect errors surface on the first Acquire.

Example

ExampleOpen shows the database/sql entry point: import the driver for its side-effect (the init registers "celeris-postgres") and sql.Open a DSN. database/sql owns the pool and the driver runs against a package-level standalone event loop.

package main

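import (
	"context"
	"database/sql"
	"fmt"
	"log"

	// Blank import: its init registers "celeris-postgres" with database/sql.
	_ "github.com/goceleris/celeris/driver/postgres"
)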

func main() {
	// The blank import at package scope is what registers the driver.
	db, err := sql.Open("celeris-postgres",
		"postgres://app:secret@localhost:5432/mydb?sslmode=disable&application_name=svc")
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = db.Close() }()

	ctx := context.Background()
	var greeting string
	if err := db.QueryRowContext(ctx, "SELECT 'hello, world'").Scan(&greeting); err != nil {
		log.Fatal(err)
	}
	fmt.Println(greeting)
}

Example (WithEngine)

ExampleOpen_withEngine shows the direct Pool path. Opening with postgres.WithEngine binds the pool to a running celeris.Server's event loop so driver FDs are serviced by the same epoll/io_uring instance as the HTTP workers, and per-worker idle conns match the HTTP handler's CPU affinity.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/goceleris/celeris/driver/postgres"
)

func main() {
	// In real code, pass postgres.WithEngine(srv) where srv is a
	// *celeris.Server that has already been Start()-ed. Doing so binds
	// the pool to the HTTP event loop so driver FDs and HTTP workers
	// share the same epoll/io_uring instance.
	pool, err := postgres.Open(
		"postgres://app:secret@localhost:5432/mydb?sslmode=disable",
		postgres.WithMaxOpen(32),
		postgres.WithStatementCacheSize(512),
		postgres.WithApplication("api"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = pool.Close() }()

	ctx := context.Background()
	rows, err := pool.QueryContext(ctx,
		"SELECT id, name FROM users WHERE tenant = $1", 42)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = rows.Close() }()

	for rows.Next() {
		var id int
		var name string
		if err := rows.Scan(&id, &name); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("id=%d name=%s\n", id, name)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}

func (*Pool) BeginTx

func (p *Pool) BeginTx(ctx context.Context, opts *sql.TxOptions) (*Tx, error)

BeginTx acquires a conn, issues BEGIN, and returns a Tx that pins the conn until Commit/Rollback.

Example

ExamplePool_BeginTx wraps a pair of updates in a serializable transaction and rolls back on any error.

package main

import (
	"context"
	"database/sql"
	"log"
	"time"

	"github.com/goceleris/celeris/driver/postgres"
)

func main() {
	pool, err := postgres.Open("postgres://app:secret@localhost/mydb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = pool.Close() }()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	tx, err := pool.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
	if err != nil {
		log.Fatal(err)
	}
	if _, err := tx.ExecContext(ctx,
		"UPDATE accounts SET balance = balance - $1 WHERE id = $2", 100, "a"); err != nil {
		_ = tx.Rollback()
		log.Fatal(err)
	}
	if _, err := tx.ExecContext(ctx,
		"UPDATE accounts SET balance = balance + $1 WHERE id = $2", 100, "b"); err != nil {
		_ = tx.Rollback()
		log.Fatal(err)
	}
	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}
}

func (*Pool) Close

func (p *Pool) Close() error

Close closes the pool and releases the event loop if it was owned. Safe to call concurrently with Acquire — Acquire returns ErrPoolClosed once Close has set the closed flag.

func (*Pool) CopyFrom

func (p *Pool) CopyFrom(ctx context.Context, tableName string, columns []string, src CopyFromSource) (int64, error)

CopyFrom streams rows from src to the server via COPY FROM STDIN. It returns the number of rows imported (parsed from the server's CommandComplete tag) or any transport / iteration error. The columns slice may be empty, in which case the table's natural column order is used.

Example

ExamplePool_CopyFrom bulk-loads a [][]any fixture into a table via PG's COPY FROM STDIN protocol. For large, live-generated streams, implement postgres.CopyFromSource directly instead of materialising to a slice.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/goceleris/celeris/driver/postgres"
)

func main() {
	pool, err := postgres.Open("postgres://app:secret@localhost/mydb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = pool.Close() }()

	ctx := context.Background()
	rows := [][]any{
		{1, "alice", true},
		{2, "bob", false},
		{3, "carol", true},
	}
	n, err := pool.CopyFrom(ctx, "users", []string{"id", "name", "active"},
		postgres.CopyFromSlice(rows))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("inserted %d rows\n", n)
}

func (*Pool) CopyTo

func (p *Pool) CopyTo(ctx context.Context, query string, dest func(row []byte) error) error

CopyTo executes query (typically "COPY <table> TO STDOUT ...") and invokes dest for each row. The row slice passed to dest is freshly allocated per call, so callers are free to retain it without copying. Returning a non-nil error from dest aborts the copy mid-stream; the server continues to stream bytes until ReadyForQuery, so the conn remains usable on return.

func (*Pool) ExecContext

func (p *Pool) ExecContext(ctx context.Context, query string, args ...any) (Result, error)

ExecContext runs a statement and returns a Result.

func (*Pool) IdleConnWorkers

func (p *Pool) IdleConnWorkers() []int

IdleConnWorkers returns the Worker() IDs of every currently-idle connection across all worker slots. The same worker ID may appear multiple times when a slot holds more than one idle conn. In-use conns do not appear.

Intended for tests and introspection asserting that per-CPU affinity is actually honored by the dial path — callers should not use it for load balancing decisions.

func (*Pool) Ping

func (p *Pool) Ping(ctx context.Context) error

Ping acquires a conn, pings, and returns it.

func (*Pool) QueryContext

func (p *Pool) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)

QueryContext runs query on an acquired conn and returns a Rows wrapper that returns the conn to the pool when closed.

func (*Pool) QueryRow

func (p *Pool) QueryRow(ctx context.Context, query string, args ...any) *Row

QueryRow executes a query expected to return at most one row. The returned Row is always non-nil; errors are deferred until Row.Scan.

func (*Pool) Stats

func (p *Pool) Stats() async.PoolStats

Stats returns a snapshot of pool occupancy.

type PoolConfig

type PoolConfig struct {
	DSN                string
	MaxOpen            int
	MaxIdlePerWorker   int
	MaxLifetime        time.Duration
	MaxIdleTime        time.Duration
	HealthCheck        time.Duration
	StatementCacheSize int
	Application        string
}

PoolConfig controls the worker-affinity pool returned by Open.

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result is a pool-level Result that implements database/sql/driver.Result.

func (Result) LastInsertId

func (r Result) LastInsertId() (int64, error)

LastInsertId is not supported by PostgreSQL. Use RETURNING instead.

func (Result) RowsAffected

func (r Result) RowsAffected() (int64, error)

RowsAffected returns the number of affected rows parsed from the CommandComplete tag.
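In the PostgreSQL wire protocol, CommandComplete tags such as "INSERT 0 3", "UPDATE 5", and "COPY 3" end with the row count, while tags such as "CREATE TABLE" carry none. A minimal sketch of extracting that count is shown below; rowsFromTag is a hypothetical helper, not the driver's parser, and returning 0 for count-less tags is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// rowsFromTag returns the trailing row count of a CommandComplete tag,
// or 0 when the tag does not end in a number.
func rowsFromTag(tag string) int64 {
	i := strings.LastIndexByte(tag, ' ')
	if i < 0 {
		return 0 // single-word tag such as "BEGIN"
	}
	n, err := strconv.ParseInt(tag[i+1:], 10, 64)
	if err != nil {
		return 0 // last word is not a count, e.g. "CREATE TABLE"
	}
	return n
}

func main() {
	for _, tag := range []string{"INSERT 0 3", "UPDATE 5", "COPY 3", "CREATE TABLE"} {
		fmt.Printf("%-12s -> %d\n", tag, rowsFromTag(tag))
	}
}
```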

type Row

type Row struct {
	// contains filtered or unexported fields
}

Row is the result of calling Pool.QueryRow. It wraps a single-row query with automatic Close semantics, matching database/sql.Row.

func (*Row) Err

func (r *Row) Err() error

Err returns the error, if any, encountered during the query. Unlike Row.Scan, it does not close the underlying Rows.

func (*Row) Scan

func (r *Row) Scan(dest ...any) error

Scan copies the columns from the single result row into dest. If the query returned no rows, Scan returns sql.ErrNoRows. The underlying Rows is closed automatically.

type Rows

type Rows struct {
	// contains filtered or unexported fields
}

Rows is the pool-level rows wrapper. It returns the underlying conn to the pool when Close is called. The iteration API matches database/sql.Rows:

for rows.Next() {
    if err := rows.Scan(&id, &name); err != nil { ... }
}
if err := rows.Err(); err != nil { ... }

func (*Rows) Close

func (r *Rows) Close() error

Close releases resources and returns the conn to the pool.

func (*Rows) Columns

func (r *Rows) Columns() []string

Columns returns column names.

func (*Rows) Err

func (r *Rows) Err() error

Err returns the first non-EOF error encountered during iteration.

func (*Rows) Next

func (r *Rows) Next() bool

Next advances the cursor to the next row. It returns false when no more rows are available or an error occurred. After Next returns false, call Rows.Err to distinguish normal end-of-data from an error.

Fast path: when inner is *pgRows (the production path), Next captures the raw wire bytes + per-column codecs without boxing into driver.Value. Scan() then decodes directly into user pointers, saving one heap alloc per non-zero-size cell (int64 / time.Time etc. would otherwise escape via interface conversion).

func (*Rows) Scan

func (r *Rows) Scan(dest ...any) error

Scan copies the columns from the current row into dest. Each dest must be a pointer. On the fast path (inner is *pgRows), Scan decodes directly from the wire bytes into dest without ever materialising a driver.Value — zero interface{} boxing per cell.

type Tx

type Tx struct {
	// contains filtered or unexported fields
}

Tx is the pool-level transaction wrapper.

func (*Tx) Commit

func (t *Tx) Commit() error

Commit commits the transaction and returns the conn to the pool. On failure, done is left false so a deferred Rollback can still fire.

func (*Tx) ExecContext

func (t *Tx) ExecContext(ctx context.Context, query string, args ...any) (Result, error)

ExecContext runs a statement on the pinned conn.

func (*Tx) QueryContext

func (t *Tx) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)

QueryContext runs a query on the pinned conn. The returned Rows must be closed before Commit/Rollback.

func (*Tx) QueryRow

func (t *Tx) QueryRow(ctx context.Context, query string, args ...any) *Row

QueryRow executes a query expected to return at most one row on the pinned conn. The returned Row is always non-nil; errors are deferred until Row.Scan.

func (*Tx) ReleaseSavepoint

func (t *Tx) ReleaseSavepoint(ctx context.Context, name string) error

ReleaseSavepoint issues RELEASE SAVEPOINT <name>. Same name rules as Savepoint.

func (*Tx) Rollback

func (t *Tx) Rollback() error

Rollback rolls back the transaction and returns the conn to the pool.

func (*Tx) RollbackToSavepoint

func (t *Tx) RollbackToSavepoint(ctx context.Context, name string) error

RollbackToSavepoint issues ROLLBACK TO SAVEPOINT <name>. Same name rules as Savepoint.

func (*Tx) Savepoint

func (t *Tx) Savepoint(ctx context.Context, name string) error

Savepoint issues SAVEPOINT <name> inside this transaction. name must match [A-Za-z0-9_]+; anything else is rejected before the wire write to avoid SQL-injection.
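The name rule can be sketched with an anchored regular expression. This illustrates the documented pattern only; validSavepointName is a hypothetical helper, and the driver may implement the check differently (for example with a hand-written byte loop).

```go
package main

import (
	"fmt"
	"regexp"
)

// savepointName accepts only ASCII letters, digits, and underscores.
// The ^ and $ anchors make the whole string match, not just a substring.
var savepointName = regexp.MustCompile(`^[A-Za-z0-9_]+$`)

// validSavepointName reports whether name is safe to splice into
// a SAVEPOINT statement under the documented rule.
func validSavepointName(name string) bool {
	return savepointName.MatchString(name)
}

func main() {
	for _, n := range []string{"sp1", "before_insert", "sp-1", "x; DROP TABLE jobs", ""} {
		fmt.Printf("%q valid=%v\n", n, validSavepointName(n))
	}
}
```

Validation is needed because savepoint names are identifiers and cannot be sent as bind parameters, so they end up in the statement text itself.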

Example

ExampleTx_Savepoint demonstrates partial rollback via savepoints inside a larger transaction.

package main

import (
	"context"
	"log"

	"github.com/goceleris/celeris/driver/postgres"
)

func main() {
	pool, err := postgres.Open("postgres://app:secret@localhost/mydb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = pool.Close() }()

	ctx := context.Background()
	tx, err := pool.BeginTx(ctx, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = tx.Rollback() }()

	if _, err := tx.ExecContext(ctx, "INSERT INTO jobs(name) VALUES ('always')"); err != nil {
		log.Fatal(err)
	}

	if err := tx.Savepoint(ctx, "maybe"); err != nil {
		log.Fatal(err)
	}
	_, err = tx.ExecContext(ctx, "INSERT INTO jobs(name) VALUES ('might-fail')")
	if err != nil {
		// Undo just the 'might-fail' insert; the 'always' row survives.
		if rbErr := tx.RollbackToSavepoint(ctx, "maybe"); rbErr != nil {
			log.Fatal(rbErr)
		}
	} else {
		if err := tx.ReleaseSavepoint(ctx, "maybe"); err != nil {
			log.Fatal(err)
		}
	}

	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}
}

Directories

Path Synopsis
Package protocol implements the PostgreSQL v3 wire protocol framing and per-type encode/decode tables used by the celeris native PostgreSQL driver.
