pgz

package
v0.1.0
Published: Apr 18, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var ErrBatchAborted = fmt.Errorf("pgz: batch aborted by earlier item")

ErrBatchAborted is returned from BatchResults.Exec / Query for items that follow one that errored.

var ErrResponseTooLarge = errors.New("pgz: response exceeded configured cap")

ErrResponseTooLarge is the sentinel returned (wrapped in *ResponseTooLargeError) when MaxResponseBytes or MaxResponseRows is hit. Use errors.Is to detect it.

Functions

func DriverValueDecoder

func DriverValueDecoder(oid uint32, format int16) func([]byte) any

DriverValueDecoder returns a decoder that converts the raw wire bytes of a cell to the canonical driver.Value that database/sql expects: int64 / float64 / bool / []byte / string / time.Time, or nil on NULL. Used by the pgz/stdlib adapter; callers of the native API typically do not need this.

func ScanStruct

func ScanStruct[T any](c *Client, ctx context.Context, sql string, args ...any) ([]T, error)

ScanStruct runs sql and materialises each row into a value of type T. Field-to-column mapping uses the `pgz:"col_name"` struct tag when present, otherwise the field name case-insensitively. Unmapped columns are silently skipped; unmapped fields stay zero-valued.

Spike scope — explicitly narrow:

  • Supported cell types: bool, int2/4/8, float4/8, text/varchar/bpchar, uuid, bytea, jsonb (into []byte / json.RawMessage), date, timestamp/timestamptz (into time.Time).
  • NULL: only meaningful when the target field is a pointer. Non-pointer fields stay zero on a NULL cell (lenient; not an error).
  • No arrays, no sql.NullXxx, no custom sql.Scanner, no embedded structs.
  • No INSERT/UPDATE/DELETE — this remains a SELECT-only driver.

Those limits are what keeps the hot path allocation-free and the code tractable. If a query column does not map cleanly to its target field, the call returns an error at the first row rather than silently coercing.
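The mapping rule above can be sketched in isolation. This is an illustration of the documented behaviour, not the driver's code; columnIndex is a hypothetical helper:

```go
import (
	"reflect"
	"strings"
)

// columnIndex resolves a result column to a struct field index using
// the documented rule: a `pgz:"col_name"` tag wins when present,
// otherwise the field name matches case-insensitively. -1 means the
// column is unmapped and would be silently skipped.
func columnIndex[T any](col string) int {
	t := reflect.TypeFor[T]()
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if tag, ok := f.Tag.Lookup("pgz"); ok {
			if tag == col {
				return i
			}
			continue // a tagged field never falls back to its name
		}
		if strings.EqualFold(f.Name, col) {
			return i
		}
	}
	return -1
}
```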

func ScanStructBatched

func ScanStructBatched[T any](
	c *Client, ctx context.Context,
	batchSize int,
	cb func(batch []T) error,
	sql string, args ...any,
) error

ScanStructBatched is the memory-bounded variant of ScanStruct.

Instead of materialising the full result set as []T, it calls cb with a batch slice of up to batchSize rows, waits for cb to return, then reuses the slice for the next batch. Memory use is O(batchSize * sizeof(T)) regardless of the total row count — use it for queries that would return millions of rows (audit logs, analytics exports, bulk processing).

The slice passed to cb aliases the internal buffer and is valid only until cb returns. If cb needs to retain rows beyond that, it must copy them. Returning a non-nil error from cb aborts the scan: pgz issues a server-side CancelRequest and drains to ReadyForQuery so the connection is reusable.
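The reuse contract can be illustrated without a database. processInBatches is a hypothetical stand-in, not part of pgz; the point is that cb sees the same backing array on every invocation:

```go
// processInBatches feeds rows to cb in chunks of batchSize, reusing one
// buffer across calls — the same aliasing contract ScanStructBatched
// documents: retain rows past cb's return only by copying them.
func processInBatches[T any](rows []T, batchSize int, cb func([]T) error) error {
	buf := make([]T, 0, batchSize)
	for start := 0; start < len(rows); start += batchSize {
		end := min(start+batchSize, len(rows))
		buf = append(buf[:0], rows[start:end]...) // overwrite, do not grow
		if err := cb(buf); err != nil {
			return err // cb aborted the scan
		}
	}
	return nil
}
```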

There is no special-cased stop value: returning io.EOF from cb is not treated as a clean early stop. Any non-nil error is surfaced to the caller, so define a distinguished sentinel (and check for it with errors.Is) if you want to stop without reporting a failure.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch accumulates queries to be flushed to the server in a single round-trip.

b := pgz.NewBatch()
b.Queue("UPDATE users SET last_seen = now() WHERE id = $1", 42)
b.Queue("INSERT INTO audit (user_id, action) VALUES ($1, $2)", 42, "login")
br := c.SendBatch(ctx, b)
for range b.Len() {
	if _, err := br.Exec(); err != nil {
		// handle err
	}
}
br.Close()

All queued items ship in one TCP write. The server executes them sequentially; an error in one item aborts the rest with ErrBatchAborted. SendBatch keeps the shared prepared-statement cache warm — the first occurrence of a given SQL triggers Parse + Describe + Bind + Execute, and subsequent items in the same batch (and future batches) only pay Bind + Execute.

func NewBatch

func NewBatch() *Batch

NewBatch allocates an empty batch. Reusing a batch across SendBatch calls is allowed — items are not cleared automatically.

func (*Batch) Len

func (b *Batch) Len() int

Len returns the number of queued items.

func (*Batch) Queue

func (b *Batch) Queue(sql string, args ...any)

Queue appends a statement + args to the batch.

func (*Batch) Reset

func (b *Batch) Reset()

Reset clears the batch so the underlying slice can be reused.

type BatchResults

type BatchResults struct {
	// contains filtered or unexported fields
}

BatchResults consumes the ordered response stream produced by SendBatch. Call Exec (or Query) once per queued item, in order; Close drains the stream and must be called before using the underlying Client for anything else.

func (*BatchResults) Close

func (br *BatchResults) Close() error

Close drains remaining items and the trailing RFQ, releases the ctx watcher, and reports the batch duration to the Observer.

func (*BatchResults) Exec

func (br *BatchResults) Exec() (ExecResult, error)

Exec consumes the next item as a DML statement and returns its row count. Returns ErrBatchAborted once a previous item has failed.

func (*BatchResults) Query

func (br *BatchResults) Query() (*Iterator, error)

Query consumes the next item as a row-producing statement and returns an Iterator. The caller must drive the iterator to completion before calling the next result method.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a single PostgreSQL connection specialised for SELECT-to-JSON. It is not safe for concurrent use; wrap it in a pool if you need that.

func Open

func Open(ctx context.Context, cfg Config) (*Client, error)

Open dials the server and completes the startup handshake. The full context (including any deadline) covers the dial, TLS negotiation, and the startup message exchange — not just the TCP connect.

func (*Client) Close

func (c *Client) Close() error

func (*Client) CopyFrom

func (c *Client) CopyFrom(ctx context.Context, sql string, r io.Reader) (int64, error)

CopyFrom streams bytes from r into the server using COPY ... FROM STDIN. The SQL statement must be a valid COPY statement, e.g.:

"COPY users (id, name, email) FROM STDIN (FORMAT csv)"
"COPY log FROM STDIN"                    // default: TEXT

r's bytes are chunked into CopyData messages. The format of those bytes (CSV, TEXT, binary header + tuples) must match whatever the COPY statement declared; this method is format-agnostic.

Returns the row count reported by the server on CommandComplete.

CopyFrom bypasses the prepared-statement cache entirely (COPY uses the simple-query protocol), so this call never affects SELECT / Exec hot paths. Observer.OnQueryStart/End fire exactly as for any other query.

func (*Client) CopyFromBinary

func (c *Client) CopyFromBinary(ctx context.Context, sql string,
	fieldCount int, emit func(w *CopyWriter) error) (int64, error)

CopyFromBinary runs COPY ... FROM STDIN (FORMAT binary) and drives the binary wire format directly. The caller emits rows through the CopyWriter handed to emit; return io.EOF from emit to signal clean end of stream. The statement must declare binary format, e.g.:

"COPY users (id, name, created_at) FROM STDIN (FORMAT binary)"

fieldCount is the number of columns each tuple carries; it is enforced on every row so a mismatched column count is caught client-side before the server sees a malformed tuple.
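For orientation, the stream emit ultimately produces follows the documented PostgreSQL binary COPY layout. This sketch frames one tuple by hand (the real CopyWriter does the equivalent into a reusable buffer):

```go
import "encoding/binary"

// copyBinaryRow appends the framing for one binary COPY tuple: an int16
// field count, then per field an int32 length followed by the value
// bytes (-1 length marks NULL). The stream as a whole starts with the
// signature "PGCOPY\n\377\r\n\000" plus an int32 flags word and an
// int32 extension length, and ends with an int16 -1 trailer.
func copyBinaryRow(dst []byte, fields ...[]byte) []byte {
	dst = binary.BigEndian.AppendUint16(dst, uint16(len(fields)))
	for _, f := range fields {
		if f == nil {
			dst = binary.BigEndian.AppendUint32(dst, 0xFFFFFFFF) // NULL
			continue
		}
		dst = binary.BigEndian.AppendUint32(dst, uint32(len(f)))
		dst = append(dst, f...)
	}
	return dst
}
```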

func (*Client) CopyTo

func (c *Client) CopyTo(ctx context.Context, sql string, w io.Writer) (int64, error)

CopyTo runs COPY ... TO STDOUT and pipes the server's output straight to w. The SQL must be a valid COPY statement, e.g.:

"COPY (SELECT * FROM users) TO STDOUT (FORMAT csv)"
"COPY logs TO STDOUT"                                 // default TEXT

CopyData bodies are written to w as they arrive — the bytes are aliased from the wire's internal buffer so the pump does not copy through an intermediate scratch. Returns the row count reported by the server on CommandComplete.

CopyTo uses the simple-query protocol and shares no state with the prepared-statement cache or the SELECT hot path.

func (*Client) CopyToBinary

func (c *Client) CopyToBinary(ctx context.Context, sql string, fieldCount int,
	handler func(r *CopyReader) error) (int64, error)

CopyToBinary runs COPY ... TO STDOUT (FORMAT binary) and delivers each tuple to handler through a CopyReader. The SQL must request binary format:

"COPY (SELECT id, name, created_at FROM users) TO STDOUT (FORMAT binary)"

fieldCount is the number of columns per tuple declared by the COPY statement. Enforced per row: a server tuple with a different width aborts the stream. handler may return io.EOF to stop early.

The CopyReader fields are read directly from the wire buffer with no per-field allocation; Text and Bytes return slices aliasing the message buffer so the handler must copy if it wants to keep them.

func (*Client) Exec

func (c *Client) Exec(ctx context.Context, sql string, args ...any) (ExecResult, error)

Exec executes a DML statement (INSERT, UPDATE, DELETE, CALL, etc.) that does not return rows. For statements with RETURNING clauses use ExecReturning instead.

The extended-query protocol is used so parameters are supported. The statement cache is shared with the SELECT path — PgBouncer-txn re-prepare works identically.

func (*Client) ExecReturning

func (c *Client) ExecReturning(ctx context.Context, w io.Writer, sql string, args ...any) error

ExecReturning executes a DML statement with a RETURNING clause and streams the returned rows as NDJSON to w. On the wire this is identical to a SELECT — the server sends RowDescription + DataRow* + CommandComplete. We reuse the full JSON streaming path, bypassing the SELECT-only guard.

func (*Client) ExecReturningJSON

func (c *Client) ExecReturningJSON(ctx context.Context, sql string, args ...any) ([]byte, error)

ExecReturningJSON is the buffered variant of ExecReturning. Returns the JSON output as []byte (NDJSON).

func (*Client) ParameterStatus

func (c *Client) ParameterStatus(name string) string

ParameterStatus returns the most recent value the server sent for the given runtime parameter (e.g. "server_version", "client_encoding").

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping issues an empty simple-query and waits for ReadyForQuery. It is a cheap liveness check used by the pool after long idle periods (helps catch connections silently killed by PgBouncer's server_idle_timeout, firewall NAT timeouts, etc.). The cost is one round-trip.

func (*Client) QueryJSON

func (c *Client) QueryJSON(ctx context.Context, sql string, args ...any) ([]byte, error)

QueryJSON runs sql and returns a JSON array of objects in a single allocated slice. For large result sets prefer StreamJSON / StreamNDJSON.

func (*Client) RawQuery

func (c *Client) RawQuery(ctx context.Context, sql string, args ...any) (*Iterator, error)

RawQuery runs sql and returns an Iterator for lazy row consumption. The Iterator pins the Client's connection until Close() is called; do not issue another query on the same Client in the interim.

The returned DataRow bodies alias the internal wire buffer and are valid only until the next NextRaw() call, matching the contract of internal/wire.Conn.ReadMessage.

RawQuery rejects non-SELECT statements. Use RawQueryAny when DML with RETURNING must be iterated — INSERT/UPDATE/DELETE … RETURNING ships DataRows exactly like SELECT. The database/sql adapter uses RawQueryAny so sql.DB.QueryRow("INSERT ... RETURNING id") works.

func (*Client) RawQueryAny

func (c *Client) RawQueryAny(ctx context.Context, sql string, args ...any) (*Iterator, error)

RawQueryAny is RawQuery without the SELECT-only guard. Accepts any statement shape that produces rows — typically SELECT or DML with RETURNING. Prefer RawQuery for the SELECT-only contract.

func (*Client) SendBatch

func (c *Client) SendBatch(ctx context.Context, b *Batch) *BatchResults

SendBatch pipelines every queued item in a single TCP write and returns the BatchResults iterator. The first occurrence of each unique SQL inside the batch is Parsed + Described and cached on the Client so the next batch (or plain Exec) skips the Parse.

func (*Client) SendCancel

func (c *Client) SendCancel(ctx context.Context) error

SendCancel opens a brand-new TCP connection to the same server and sends a CancelRequest carrying this connection's BackendKeyData. The server cancels any in-flight query on the original connection. The side connection is closed immediately. Cancellation is best-effort: if the CancelRequest fails, the caller's context-cancellation path still takes effect via the deadline mechanism.

func (*Client) SetObserver

func (c *Client) SetObserver(o Observer)

SetObserver installs the telemetry hook. Pass nil to revert to no-op.

func (*Client) Stats

func (c *Client) Stats() CounterStats

Stats returns a snapshot of the per-Client counters.

func (*Client) StreamColumnar

func (c *Client) StreamColumnar(ctx context.Context, w io.Writer, sql string, args ...any) error

StreamColumnar streams the columnar form to w.

func (*Client) StreamJSON

func (c *Client) StreamJSON(ctx context.Context, w io.Writer, sql string, args ...any) error

StreamJSON streams a JSON array of objects to w.

func (*Client) StreamNDJSON

func (c *Client) StreamNDJSON(ctx context.Context, w io.Writer, sql string, args ...any) error

StreamNDJSON streams newline-delimited JSON to w.

func (*Client) StreamTOON

func (c *Client) StreamTOON(ctx context.Context, w io.Writer, sql string, args ...any) error

StreamTOON streams rows in TOON form to w:

[?]{col1,col2,...}
val1,val2,...
val3,val4,...

Scalars are emitted bare, strings are JSON-escaped (`"..."`), and json/jsonb columns are emitted as raw JSON (the consumer must bracket-balance them). Compared to ModeNDJSON the keys are dropped, so every row pays only for values, commas, and a newline. For LLM / agent consumers the token savings are substantial; for browser consumers this is a format break from JSON — the caller must speak TOON.
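A toy encoder for this shape — an illustration of the documented format, not the driver's encoder; the `[?]` header token is reproduced exactly as the docs show it:

```go
import (
	"encoding/json"
	"fmt"
	"strings"
)

// toonEncode renders rows in the TOON shape described above: one header
// line with the column names, then one comma-separated line per row.
// Strings are JSON-escaped; json.RawMessage passes through as raw JSON;
// other scalars are emitted bare.
func toonEncode(cols []string, rows [][]any) string {
	var b strings.Builder
	fmt.Fprintf(&b, "[?]{%s}\n", strings.Join(cols, ","))
	for _, row := range rows {
		for i, v := range row {
			if i > 0 {
				b.WriteByte(',')
			}
			switch x := v.(type) {
			case string:
				esc, _ := json.Marshal(x) // JSON string escaping
				b.Write(esc)
			case json.RawMessage:
				b.Write(x) // raw JSON, consumer bracket-balances
			default:
				fmt.Fprint(&b, x) // bare scalar
			}
		}
		b.WriteByte('\n')
	}
	return b.String()
}
```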

type Config

type Config struct {
	Host            string
	Port            int
	Database        string
	User            string
	Password        string
	ApplicationName string

	DialTimeout time.Duration
	// FlushBytes is the streaming buffer threshold. Default 32 KiB.
	FlushBytes int
	// RuntimeParams is sent during startup (e.g. "search_path", "timezone").
	RuntimeParams map[string]string

	// SSLMode controls TLS negotiation. Default: SSLPrefer.
	SSLMode SSLMode
	// StmtCacheSize bounds the per-connection prepared-statement cache.
	// Default 64. Set to a negative value to disable the cache (fresh
	// Parse on every call).
	StmtCacheSize int
	// TLSConfig overrides the TLS config we would otherwise build from
	// SSLMode + Host. Set this if you need custom CA pools, client certs,
	// SNI, ALPN, etc. Mutually exclusive with SSLMode in practice — if
	// non-nil, SSLMode only decides whether TLS is required vs preferred.
	TLSConfig *tls.Config

	// MaxResponseBytes aborts a query whose JSON output crosses this many
	// bytes. The driver issues a CancelRequest and returns
	// *ResponseTooLargeError. 0 = unlimited (default). Counted across
	// already-flushed bytes plus the in-memory tail, so the bound is on
	// the total response, not the per-flush chunk.
	MaxResponseBytes int64
	// MaxResponseRows aborts a query whose row count crosses this value.
	// Same abort semantics as MaxResponseBytes. 0 = unlimited (default).
	MaxResponseRows int

	// SlowQueryThreshold triggers Observer.OnQuerySlow in addition to
	// OnQueryEnd when a query's total duration meets or exceeds it.
	// 0 = disabled (default).
	SlowQueryThreshold time.Duration

	// FlushInterval bounds the time the streaming buffer may sit without
	// being flushed downstream. When > 0, the driver flushes the pending
	// buffer if this much time elapsed since the last flush, even if the
	// byte threshold was not reached. Useful for Citus queries whose
	// first-byte latency is high and whose consumers (HTTP responses,
	// SSE, NDJSON clients) need steady byte delivery. 0 = disabled.
	FlushInterval time.Duration

	// Keepalive, if > 0, enables TCP keepalive on dialed connections with
	// this interval. Default 30s. Without it, idle connections behind
	// PgBouncer / NAT die silently on long-running sessions.
	Keepalive time.Duration

	// RetryOnSerialization, if true, transparently retries queries that
	// fail with SQLSTATE class 40 (serialization_failure, deadlock_detected,
	// transaction_rollback) when no bytes have been flushed downstream.
	// Max 3 attempts total, exponential backoff starting at 10ms. Default
	// false. Citus shard rebalance surfaces 40001 routinely; enable in
	// environments that run rebalance in production.
	RetryOnSerialization bool

	// DefaultQueryTimeout is a convenience default applied when a caller
	// passes a context with no deadline. The driver derives
	// context.WithTimeout(ctx, DefaultQueryTimeout) and the existing
	// cancel watcher turns an elapsed timeout into a real server-side
	// CancelRequest. 0 = no default (the caller's ctx is used as-is).
	//
	// This is a *sane default*, not a hard security boundary. The hard
	// ceiling belongs in postgresql.conf (statement_timeout). Layers:
	//
	//   1. postgresql.conf statement_timeout  — DBA-owned, final say.
	//   2. Config.DefaultQueryTimeout          — gateway default.
	//   3. ctx.WithTimeout in the HTTP handler — per-request override.
	DefaultQueryTimeout time.Duration

	// WireReadBufferSize is the size of the bufio.Reader fronting the
	// network connection. Messages whose total length (5-byte header +
	// body) fits inside this buffer take a zero-copy fast path via Peek;
	// larger messages fall back to a copy through an internal scratch
	// slice. Default 128 KiB, which covers essentially all DataRow /
	// RowDescription / auth messages. Raise only if you know your
	// workload returns cells larger than 128 KiB that would otherwise
	// trigger the copy fallback on every row. Minimum 4 KiB enforced.
	WireReadBufferSize int

	// TCPRecvBuffer, if > 0, sets SO_RCVBUF on the dialed TCP socket via
	// net.TCPConn.SetReadBuffer. On LAN with non-trivial RTT and wide
	// rows this is the single biggest wire-level knob: the default kernel
	// receive window saturates at a few hundred KB in practice, and the
	// server stalls on ACK. Typical useful values: 1–4 MiB. 0 leaves the
	// kernel default. Ignored for non-TCP transports.
	TCPRecvBuffer int
	// TCPSendBuffer is the matching SO_SNDBUF knob for outbound traffic.
	// Rarely useful for this driver (we send small Bind/Execute/Sync
	// bursts), but exposed for symmetry. 0 = kernel default.
	TCPSendBuffer int

	// BinaryOIDs lists PostgreSQL type OIDs for which the driver
	// should always request binary result format, beyond the
	// built-in set. Primary use is user-defined composite types:
	// their OID is assigned at CREATE TYPE time so pgz cannot
	// know about them statically. Look up once (SELECT oid FROM
	// pg_type WHERE typname = 'addr') and pass it here.
	//
	// When binary format is requested for an OID with no
	// specialised decoder, JSON output of that column falls back
	// to emitting the raw bytes as an opaque JSON string. The
	// struct-scan path recognises composite wire format
	// automatically when the target Go field is a struct.
	BinaryOIDs []uint32

	// RowsHint, if > 0, tells the driver roughly how many rows the query
	// will return. After the first DataRow is encoded the driver grows
	// the output buffer once to `firstRowBytes * RowsHint + slack`,
	// avoiding the repeated doubling-and-copy cost of append() on large
	// result sets. Buffered (QueryJSON) mode is where this pays off;
	// streaming mode caps growth at ~2× FlushBytes because anything
	// above the flush threshold just delays the flush. 0 = disabled.
	RowsHint int

	// BinaryNumeric asks the server for PostgreSQL NUMERIC columns in
	// binary wire format. Default false — text is faster on loopback
	// and common precisions because the server's C formatter beats the
	// Go binary decoder in the common case. Enable when one of:
	//
	//   - The link has non-trivial RTT (≥ 5 ms) and cells are wide.
	//   - Precision is high (numeric(30,10)+); binary halves the bytes.
	//   - A profile shows text-numeric parsing dominating CPU.
	//
	// Correctness is identical in both modes. NaN / ±Infinity are
	// rendered as JSON strings in either case.
	BinaryNumeric bool
}

Config is the connection configuration. Fields are intentionally minimal; add as needed.

func ParseDSN

func ParseDSN(dsn string) (Config, error)

ParseDSN parses a libpq-style URL:

postgres://user:pass@host:port/dbname?param=val

Only enough is supported to be useful for tests and the demo binary.
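What such a parse involves can be sketched with the standard library — an illustration, not ParseDSN itself:

```go
import (
	"net/url"
	"strings"
)

// parseDSNSketch pulls the components out of a libpq-style URL using
// net/url — roughly the work ParseDSN does before filling a Config.
func parseDSNSketch(dsn string) (user, pass, host, port, db string, params map[string]string, err error) {
	u, err := url.Parse(dsn)
	if err != nil {
		return
	}
	user = u.User.Username()
	pass, _ = u.User.Password()
	host, port = u.Hostname(), u.Port()
	db = strings.TrimPrefix(u.Path, "/")
	params = map[string]string{}
	for k, v := range u.Query() {
		params[k] = v[0] // first value wins (assumed policy)
	}
	return
}
```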

type CopyReader

type CopyReader struct {
	// contains filtered or unexported fields
}

CopyReader is the binary-tuple reader surface handed to CopyToBinary's handler callback. Field order must match the COPY column order. The slices returned by Text and Bytes alias the internal wire buffer and are valid only until the next handler invocation — the caller must copy to keep them.

func (*CopyReader) Bool

func (r *CopyReader) Bool() (bool, bool)

Bool reads a 1-byte PG bool.

func (*CopyReader) Bytes

func (r *CopyReader) Bytes() ([]byte, bool)

Bytes is an alias for Text for bytea-typed columns (identical wire format at this layer; the server already delivered raw bytes).

func (*CopyReader) Err

func (r *CopyReader) Err() error

Err returns any error that occurred during field reads. Handlers may check this after each row instead of every individual reader.

func (*CopyReader) Float4

func (r *CopyReader) Float4() (float32, bool)

Float4 reads a 4-byte IEEE-754 big-endian.

func (*CopyReader) Float8

func (r *CopyReader) Float8() (float64, bool)

Float8 reads an 8-byte IEEE-754 big-endian.

func (*CopyReader) Int2

func (r *CopyReader) Int2() (int16, bool)

Int2 reads a 2-byte int. On a NULL field the value is 0 and the bool reports true.

func (*CopyReader) Int4

func (r *CopyReader) Int4() (int32, bool)

Int4 reads a 4-byte int.

func (*CopyReader) Int8

func (r *CopyReader) Int8() (int64, bool)

Int8 reads an 8-byte int.

func (*CopyReader) IsNull

func (r *CopyReader) IsNull() bool

IsNull peeks at the next field's length header without consuming the value bytes. Handy when the caller wants to branch before picking a typed reader.

func (*CopyReader) SkipNull

func (r *CopyReader) SkipNull() bool

SkipNull consumes a NULL field (-1 length). Returns true if the next field was NULL and has been consumed; false otherwise, in which case the field is still pending and a typed reader should be used.

func (*CopyReader) Text

func (r *CopyReader) Text() ([]byte, bool)

Text returns a slice aliasing the wire buffer. Valid only until the next handler invocation — copy if you need to keep it.

func (*CopyReader) Timestamp

func (r *CopyReader) Timestamp() (int64, bool)

Timestamp returns the raw int64 microseconds since the PG epoch (2000-01-01 UTC). Use TimestampTime when a Go time.Time is needed at the call site.

func (*CopyReader) TimestampTime

func (r *CopyReader) TimestampTime() (time.Time, bool)

TimestampTime returns a time.Time built from the Timestamp int64.

func (*CopyReader) UUID

func (r *CopyReader) UUID() ([16]byte, bool)

UUID reads a 16-byte UUID.

type CopyWriter

type CopyWriter struct {
	// contains filtered or unexported fields
}

CopyWriter is the binary-row builder for CopyFromBinary. Fields must match the column order in the COPY statement. All methods append into a single reusable buffer that is framed and flushed in-place; a finished row is never copied a second time between construction and the TCP write.

func (*CopyWriter) Bool

func (w *CopyWriter) Bool(v bool)

Bool writes a PG bool (1 byte, 0 or 1).

func (*CopyWriter) Bytes

func (w *CopyWriter) Bytes(b []byte)

Bytes writes a bytea column.

func (*CopyWriter) Float4

func (w *CopyWriter) Float4(v float32)

Float4 writes a float32 in IEEE-754 big-endian.

func (*CopyWriter) Float8

func (w *CopyWriter) Float8(v float64)

Float8 writes a float64 in IEEE-754 big-endian.

func (*CopyWriter) Int2

func (w *CopyWriter) Int2(v int16)

Int2 writes an int16 column.

func (*CopyWriter) Int4

func (w *CopyWriter) Int4(v int32)

Int4 writes an int32 column.

func (*CopyWriter) Int8

func (w *CopyWriter) Int8(v int64)

Int8 writes an int64 column.

func (*CopyWriter) Null

func (w *CopyWriter) Null()

Null writes a SQL NULL placeholder (-1 length).

func (*CopyWriter) Text

func (w *CopyWriter) Text(s string)

Text writes a UTF-8 string (PG text, varchar, bpchar).

func (*CopyWriter) Timestamp

func (w *CopyWriter) Timestamp(t time.Time)

Timestamp writes a PG timestamp (microseconds since 2000-01-01 UTC).

func (*CopyWriter) TimestampTZ

func (w *CopyWriter) TimestampTZ(t time.Time)

TimestampTZ writes a PG timestamptz. Wire format is identical to timestamp; PG stores UTC microseconds since 2000-01-01.

func (*CopyWriter) UUID

func (w *CopyWriter) UUID(u [16]byte)

UUID writes a uuid as its 16 binary bytes.

type CounterStats

type CounterStats struct {
	Queries uint64
	Errors  uint64
	Rows    uint64
	Bytes   uint64
}

CounterStats is a tiny lock-free counter snapshot exposed by Stats(). Useful when you don't want to wire a full Observer just to plot QPS.

type ExecResult

type ExecResult struct {
	// RowsAffected is the number of rows inserted, updated, or deleted.
	// For CALL / DDL it is 0.
	RowsAffected int64
	// Tag is the raw CommandComplete tag, e.g. "INSERT 0 5", "DELETE 3".
	Tag string
}

ExecResult holds the outcome of a non-SELECT statement.
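How RowsAffected is typically recovered from the tag — a sketch of the standard CommandComplete convention, not necessarily pgz's code: the count is the tag's last space-separated token.

```go
import (
	"strconv"
	"strings"
)

// rowsAffectedFromTag extracts the trailing row count from a
// CommandComplete tag such as "INSERT 0 5" or "DELETE 3". Tags with no
// numeric tail ("CREATE TABLE", "CALL") yield 0.
func rowsAffectedFromTag(tag string) int64 {
	i := strings.LastIndexByte(tag, ' ')
	if i < 0 {
		return 0
	}
	n, err := strconv.ParseInt(tag[i+1:], 10, 64)
	if err != nil {
		return 0
	}
	return n
}
```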

type Iterator

type Iterator struct {
	// contains filtered or unexported fields
}

Iterator streams DataRows one at a time from an in-flight SELECT.

Lifecycle: obtain via (*Client).RawQuery, then call NextRaw() until it returns io.EOF or an error. Call Close() to drain any unread rows and return the underlying connection to a usable state — the Client cannot run another query until Close is called, so use defer.

This is the foundation for the database/sql adapter (driver.Rows expects row-at-a-time semantics) and for callers that want to weave pgz decoding into their own iteration loop without buffering the full result.

func (*Iterator) Close

func (it *Iterator) Close() error

Close drains any unread rows and returns the underlying connection to a usable state. Safe to call multiple times and after an error. Callers should always defer Close.

When Close is called mid-stream (the caller stopped reading before io.EOF) we issue a CancelRequest to stop the backend from producing more rows. The server then emits ErrorResponse 57014 (canceling_statement_due_to_user_request) followed by ReadyForQuery. That specific SQLSTATE is a direct consequence of our own cancel and is not surfaced to the caller — Close returns nil in that case.

func (*Iterator) NextRaw

func (it *Iterator) NextRaw() ([]byte, error)

NextRaw returns the body of the next DataRow, or io.EOF when the result set is exhausted. The returned slice aliases the wire buffer and is invalidated by the next NextRaw / Close call.
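The body follows the standard DataRow wire format, so splitting it into cells is mechanical. A sketch — the cell slices alias body, exactly as NextRaw's contract describes:

```go
import (
	"encoding/binary"
	"fmt"
)

// parseDataRow splits a DataRow message body (per the PostgreSQL wire
// protocol: int16 column count, then per column an int32 length, -1
// for NULL, followed by the cell bytes) into per-cell slices.
func parseDataRow(body []byte) ([][]byte, error) {
	if len(body) < 2 {
		return nil, fmt.Errorf("short DataRow")
	}
	n := int(binary.BigEndian.Uint16(body))
	body = body[2:]
	cells := make([][]byte, n)
	for i := range cells {
		if len(body) < 4 {
			return nil, fmt.Errorf("truncated cell header")
		}
		l := int32(binary.BigEndian.Uint32(body))
		body = body[4:]
		if l < 0 {
			cells[i] = nil // SQL NULL
			continue
		}
		if len(body) < int(l) {
			return nil, fmt.Errorf("truncated cell body")
		}
		cells[i] = body[:l]
		body = body[l:]
	}
	return cells, nil
}
```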

func (*Iterator) Plan

func (it *Iterator) Plan() *rows.Plan

Plan returns the compiled row plan for the active result set. Valid once at least one NextRaw call has returned (or for queries that ship RowDescription pre-Bind, immediately).

type Mode

type Mode int

Mode controls the JSON shape produced by the streaming engine.

const (
	ModeArray    Mode = iota // [{...},{...}]
	ModeNDJSON               // {...}\n{...}\n
	ModeColumnar             // {"columns":[...],"rows":[[...],[...]]}
	ModeTOON                 // [?]{col1,col2}\nval1,val2\nval3,val4\n
)

type Observer

type Observer interface {
	OnQueryStart(sql string)
	OnQueryEnd(ev QueryEvent)
	OnNotice(n *pgerr.Error)
	OnQuerySlow(ev QueryEvent)
}

Observer is a hook for telemetry. Implementations are called on the hot path; keep them allocation-light. Setting an Observer is optional — the default no-op observer adds zero overhead.

Implementations MUST be safe for concurrent use even if the Client itself is not (multiple Clients may share one Observer through a Pool).

OnNotice is invoked for every server-sent NoticeResponse. Citus surfaces important diagnostics from worker nodes this way (shard rebalance progress, plan warnings, deprecation notices). The *pgerr.Error argument aliases the wire buffer; implementations that keep it past return MUST clone the fields they need.

OnQuerySlow is invoked from OnQueryEnd when duration exceeds Config.SlowQueryThreshold (if > 0). It is additive to OnQueryEnd; both fire for the same query. Useful for a dedicated slow-path alerting channel without having to filter every OnQueryEnd event.

type PGError

type PGError = pgerr.Error

PGError is the structured error returned by the server as an ErrorResponse. Use errors.As to extract it from the error returned by any pgz call:

var pgErr *pgz.PGError
if errors.As(err, &pgErr) && pgErr.IsUniqueViolation() { ... }

The Fields map holds every raw field the server sent, keyed by the single-byte field code from the wire format ('C'=Code, 'M'=Message, 'D'=Detail, 'H'=Hint, 'P'=Position, 's'=Schema, 't'=Table, 'n'=Constraint, etc.). The convenience accessors (SQLState, helper predicates) cover the common cases without touching Fields.

type QueryEvent

type QueryEvent struct {
	SQL      string
	Rows     int
	Bytes    int
	Duration time.Duration
	Err      error
	SQLState string
	Retries  int
}

QueryEvent is the per-query summary delivered to OnQueryEnd. SQLState is "" on success, otherwise the SQLSTATE string returned by the server (e.g. "26000", "57014"). Retries counts transparent retries performed by the driver (SQLSTATE 26000 re-prepare, SQLSTATE 40xxx retryable class).

type RangeBytes

type RangeBytes struct {
	Flags          uint8
	Lower          []byte // nil if LowerInfinite or Empty
	Upper          []byte // nil if UpperInfinite or Empty
	Empty          bool
	LowerInclusive bool
	UpperInclusive bool
	LowerInfinite  bool
	UpperInfinite  bool
}

--- Range type support -----------------------------------------------

RangeBytes captures a PostgreSQL range value in its raw wire form: the two bound bytes plus the flags that say whether each bound is inclusive, exclusive, or infinite (open on that side). The inner bound bytes arrive in the same binary encoding as the element type — int4 for int4range, int8 for int8range, timestamp for tsrange, etc. Decoding them further is the caller's job because the shape varies per range kind; `pgz.DriverValueDecoder(oid, 1)(raw)` on each bound field is the usual path.

Wire layout, reference: src/backend/utils/adt/rangetypes.c (range_send / range_recv):

uint8 flags
if !empty && !lower_inf: int32 lower_len, bytes
if !empty && !upper_inf: int32 upper_len, bytes

Flag bits:

0x01 empty
0x02 lower-bound inclusive
0x04 upper-bound inclusive
0x08 lower bound is -infinity (no bytes follow for it)
0x10 upper bound is +infinity
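Decoding the flag byte is a direct bit test; a sketch using the bits listed above:

```go
// Range flag bits, as listed above (rangetypes.c).
const (
	rangeEmpty = 0x01
	rangeLBInc = 0x02
	rangeUBInc = 0x04
	rangeLBInf = 0x08
	rangeUBInf = 0x10
)

// decodeRangeFlags expands the flags byte into the booleans that
// RangeBytes carries.
func decodeRangeFlags(f uint8) (empty, lowerInc, upperInc, lowerInf, upperInf bool) {
	return f&rangeEmpty != 0,
		f&rangeLBInc != 0,
		f&rangeUBInc != 0,
		f&rangeLBInf != 0,
		f&rangeUBInf != 0
}
```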

For callers that want a typed bound (time.Time, int64, etc.) an idiomatic sql.Scanner on a user-defined range type also works: pgz's scanner dispatch passes the full raw wire body when the OID is not in the canonical set.

type ResponseTooLargeError

type ResponseTooLargeError struct {
	Limit     string // "bytes" or "rows"
	LimitVal  int64
	Observed  int64
	Committed bool
}

ResponseTooLargeError is the typed error returned when a query is aborted because Config.MaxResponseBytes or Config.MaxResponseRows was crossed.

Committed reports whether any bytes had already been flushed to the user's downstream io.Writer when the cap tripped. If true, the partial JSON downstream is malformed (truncated mid-array / mid-NDJSON line) and the caller is responsible for terminating its own output channel — the same contract that applies to mid-stream Citus worker errors.

func (*ResponseTooLargeError) Error

func (e *ResponseTooLargeError) Error() string

func (*ResponseTooLargeError) Unwrap

func (e *ResponseTooLargeError) Unwrap() error

type SSLMode

type SSLMode int

SSLMode controls how the client negotiates TLS during connection.

const (
	SSLDisable    SSLMode = iota // never use TLS
	SSLPrefer                    // try TLS, fall back to plaintext if server refuses
	SSLRequire                   // demand TLS, no fallback; skip cert verification
	SSLVerifyCA                  // require TLS + valid cert chain (no hostname check)
	SSLVerifyFull                // require TLS + cert chain + hostname match
)

Directories

Path Synopsis
Package otel provides an OpenTelemetry-backed pgz.Observer.
Package pool is a small, dependency-free connection pool for *pgz.Client.
Package stdlib registers pgz as a database/sql driver named "pgz".
