pgkit

v2.8.0 Latest
Published: Jun 11, 2026 | License: MIT | Imports: 24 | Imported by: 0

README

pgkit

Lightweight PostgreSQL sugar combining the use of:

  • github.com/jackc/pgx
  • github.com/Masterminds/squirrel
  • github.com/georgysavva/scany/v2
  • plus, code & ideas from github.com/upper/db

Usage

go get github.com/goware/pgkit/v2

For usage examples, see ./tests/pgkit_test.go.

LICENSE

MIT

Documentation

Constants

const (
	// DefaultPageSize is the default number of rows per page.
	DefaultPageSize = 10
	// MaxPageSize is the maximum number of rows per page.
	MaxPageSize = 50
)

Variables

var (
	// ErrInvalidCursor signals a client-supplied cursor that failed to decode - map to 400, not 500.
	ErrInvalidCursor = errors.New("invalid cursor")
	// ErrCursorQueryOrdered signals a cursor-paginated query that already had ORDER BY.
	ErrCursorQueryOrdered = errors.New("cursor query already has order by")
	// ErrCursorPageOrdered signals page-level ordering that does not match the cursor order.
	ErrCursorPageOrdered = errors.New("cursor page order does not match cursor order")
)
var (
	ErrExpectingPointerToEitherMapOrStruct = fmt.Errorf(`expecting a pointer to either a map or a struct`)
)
var ErrNoRows = pgx.ErrNoRows
var Mapper = reflectx.NewMapper(dbTagName)

Functions

func ConnectWithStdlib

func ConnectWithStdlib(appName string, cfg Config) (*sql.DB, error)

func DecodeCursor added in v2.8.0

func DecodeCursor[C any](value string) (*C, error)

DecodeCursor returns (nil, nil) for empty input so callers can compose with a nil-check.

func EncodeCursor added in v2.8.0

func EncodeCursor[C any](cursor C) (string, error)

EncodeCursor produces an opaque cursor: base64-encoded JSON that is not signed. Never use it for authorization.
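
To make the base64-JSON round trip concrete, here is a minimal standard-library sketch. It is not pgkit's implementation: the names encodeCursor, decodeCursor, errInvalidCursor and userCursor are hypothetical, and the URL-safe unpadded base64 alphabet is an assumption, since the documentation only says "base64-JSON".

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
)

// errInvalidCursor stands in for pgkit.ErrInvalidCursor in this sketch.
var errInvalidCursor = errors.New("invalid cursor")

// userCursor is a hypothetical keyset cursor payload.
type userCursor struct {
	CreatedAt int64 `json:"created_at"`
	ID        int64 `json:"id"`
}

// encodeCursor marshals the cursor to JSON and base64-encodes the bytes.
// The URL-safe, unpadded alphabet is an assumption made for this sketch.
func encodeCursor[C any](cursor C) (string, error) {
	raw, err := json.Marshal(cursor)
	if err != nil {
		return "", err
	}
	return base64.RawURLEncoding.EncodeToString(raw), nil
}

// decodeCursor reverses encodeCursor. Empty input yields (nil, nil) so the
// caller can treat "no cursor" as "first page" with a plain nil check.
func decodeCursor[C any](value string) (*C, error) {
	if value == "" {
		return nil, nil
	}
	raw, err := base64.RawURLEncoding.DecodeString(value)
	if err != nil {
		return nil, fmt.Errorf("%w: %v", errInvalidCursor, err)
	}
	var c C
	if err := json.Unmarshal(raw, &c); err != nil {
		return nil, fmt.Errorf("%w: %v", errInvalidCursor, err)
	}
	return &c, nil
}

func main() {
	token, err := encodeCursor(userCursor{CreatedAt: 1700000000, ID: 42})
	if err != nil {
		panic(err)
	}
	decoded, err := decodeCursor[userCursor](token)
	if err != nil {
		panic(err)
	}
	fmt.Println(token, decoded.CreatedAt, decoded.ID)
}
```

Because the token is only encoded, not signed, any client can decode and alter it. That is why a decode failure is a client error and why the cursor must not carry authorization decisions.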

func Map

func Map(record interface{}) ([]string, []interface{}, error)

Map converts a struct to (column, value) slices using `db:""` struct tags.

,omitempty and ,omitzero (mutually exclusive) both skip zero values, but ,omitzero keeps non-nil empty slices/maps so a clear-to-empty UPDATE actually clears the column. Matches encoding/json's omitzero (Go 1.24+). IncludeNil surfaces nil pointers as DEFAULT under ,omitempty and as NULL under ,omitzero.
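
The difference between "empty" and "zero" is easiest to see on a non-nil empty slice. The sketch below uses only the reflect package; isZero and isEmpty are hypothetical helpers that approximate the two notions and are not pgkit's mapping code.

```go
package main

import (
	"fmt"
	"reflect"
)

// isZero reports whether v is the zero value of its type.
// For slices and maps this means nil.
func isZero(v any) bool {
	rv := reflect.ValueOf(v)
	if !rv.IsValid() {
		return true // untyped nil
	}
	return rv.IsZero()
}

// isEmpty additionally treats zero-length slices, maps and strings as empty,
// which is the broader notion this sketch associates with omitempty.
func isEmpty(v any) bool {
	rv := reflect.ValueOf(v)
	if !rv.IsValid() {
		return true // untyped nil
	}
	switch rv.Kind() {
	case reflect.Slice, reflect.Map, reflect.String:
		return rv.Len() == 0
	}
	return rv.IsZero()
}

func main() {
	var nilTags []string    // never set
	emptyTags := []string{} // explicitly cleared

	fmt.Println(isEmpty(nilTags), isZero(nilTags))     // true true
	fmt.Println(isEmpty(emptyTags), isZero(emptyTags)) // true false
}
```

Under the "empty" rule both slices would be skipped, so an UPDATE could never write an empty array. Under the "zero" rule only the nil slice is skipped, and the explicitly cleared one is kept.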

func MapWithOptions

func MapWithOptions(record interface{}, options *MapOptions) ([]string, []interface{}, error)

func NoTx

func NoTx(ctx context.Context) context.Context

func TxFromContext

func TxFromContext(ctx context.Context) pgx.Tx

func WithTx

func WithTx(ctx context.Context, tx pgx.Tx) context.Context

Types

type Config

type Config struct {
	Database        string `toml:"database"`
	Host            string `toml:"host"`
	Username        string `toml:"username"`
	Password        string `toml:"password"`
	MaxConns        int32  `toml:"max_conns"`
	MinConns        int32  `toml:"min_conns"`
	ConnMaxLifetime string `toml:"conn_max_lifetime"`  // e.g. "3600s" or "1h"
	ConnMaxIdleTime string `toml:"conn_max_idle_time"` // e.g. "1800s" or "30m"

	Override func(cfg *pgx.ConnConfig) `toml:"-"`
	Tracer   pgx.QueryTracer
}

type Cursor added in v2.8.0

type Cursor[Self any, Row any] interface {
	PtrTo[Self]
	Apply(sq.SelectBuilder) sq.SelectBuilder
	From(Row) error
	// OrderBy must match Apply and should include a unique tiebreaker.
	OrderBy() []Sort
}

Cursor is the interface that a typed keyset cursor satisfies. It mirrors the self-pointer pattern of pgkit.Record[T, I].
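
The need for a unique tiebreaker in the cursor order can be illustrated in memory. This is a sketch under stated assumptions, not pgkit code: the row and cursor types, the ordering (created_at DESC, id DESC) and the helpers after and nextPage are all hypothetical. In SQL the same predicate would be a row-wise comparison on the two columns.

```go
package main

import "fmt"

// row is a hypothetical record with a sort key and a unique ID.
type row struct {
	CreatedAt int64
	ID        int64
}

// cursor is a hypothetical keyset position: the sort key plus the tiebreaker.
type cursor struct {
	CreatedAt int64
	ID        int64
}

// after reports whether r sorts strictly after c under
// ORDER BY created_at DESC, id DESC.
func after(r row, c cursor) bool {
	if r.CreatedAt != c.CreatedAt {
		return r.CreatedAt < c.CreatedAt
	}
	return r.ID < c.ID // tiebreaker for equal timestamps
}

// nextPage returns up to limit rows that follow c. A nil cursor means the
// first page. rows must already be sorted by created_at DESC, id DESC.
func nextPage(rows []row, c *cursor, limit int) []row {
	var out []row
	for _, r := range rows {
		if len(out) >= limit {
			break
		}
		if c != nil && !after(r, *c) {
			continue
		}
		out = append(out, r)
	}
	return out
}

func main() {
	// Two rows share CreatedAt 200; only the ID tells them apart.
	rows := []row{{300, 5}, {200, 4}, {200, 3}, {100, 2}}

	first := nextPage(rows, nil, 2)
	last := first[len(first)-1]
	second := nextPage(rows, &cursor{CreatedAt: last.CreatedAt, ID: last.ID}, 2)

	fmt.Println(first, second) // [{300 5} {200 4}] [{200 3} {100 2}]
}
```

Without the ID in the comparison, the second page would either repeat row {200 4} or skip row {200 3}, depending on whether the timestamp comparison was inclusive.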

type CursorPaginator added in v2.8.0

type CursorPaginator[T any, C any, PC Cursor[C, T]] struct {
	// contains filtered or unexported fields
}

CursorPaginator is the keyset sibling of Paginator[T] for ordering-stable pagination under concurrent writes.

func NewCursorPaginator added in v2.8.0

func NewCursorPaginator[T any, C any, PC Cursor[C, T]](options ...PaginatorOption) CursorPaginator[T, C, PC]

NewCursorPaginator honors only the size options; the cursor owns ORDER BY.

func (CursorPaginator[T, C, PC]) Paginate added in v2.8.0

func (p CursorPaginator[T, C, PC]) Paginate(ctx context.Context, query *Querier, q sq.SelectBuilder, page *Page) ([]T, *Page, error)

Paginate returns cursor-paginated rows and the page populated with More and NextCursor.

func (CursorPaginator[T, C, PC]) PrepareQuery added in v2.8.0

func (p CursorPaginator[T, C, PC]) PrepareQuery(q sq.SelectBuilder, page *Page) ([]T, sq.SelectBuilder, error)

PrepareQuery chains LIMIT n+1 so PrepareResult can detect a next page without a second round-trip.

func (CursorPaginator[T, C, PC]) PrepareResult added in v2.8.0

func (p CursorPaginator[T, C, PC]) PrepareResult(result []T, page *Page) ([]T, error)

PrepareResult must be called after GetAll to populate page.More and page.NextCursor.

type DB

type DB struct {
	Conn  *pgxpool.Pool
	SQL   *StatementBuilder
	Query *Querier
}

func Connect

func Connect(appName string, cfg Config) (*DB, error)

func ConnectWithPGX

func ConnectWithPGX(appName string, pgxConfig *pgxpool.Config) (*DB, error)

func (*DB) InTx added in v2.7.0

func (d *DB) InTx(tx pgx.Tx) *DB

InTx returns a new *DB that shares Conn and SQL with d but routes queries through tx. Use it when a function takes *DB and you want it to participate in a transaction the caller controls.

func (*DB) TxQuery

func (d *DB) TxQuery(tx pgx.Tx) *Querier

TxQuery returns a new Querier that uses the given pgx.Tx.

func (*DB) TxQueryFromContext

func (d *DB) TxQueryFromContext(ctx context.Context) *Querier

TxQueryFromContext returns a new Querier that uses the pgx.Tx in the given context.

type DefaultValuesBuilder added in v2.8.0

type DefaultValuesBuilder struct {
	// contains filtered or unexported fields
}

DefaultValuesBuilder is the sq.Sqlizer returned by InsertDefaults.

func (DefaultValuesBuilder) Err added in v2.8.0

func (b DefaultValuesBuilder) Err() error

func (DefaultValuesBuilder) Suffix added in v2.8.0

Suffix appends literal SQL with no placeholder rewriting; use sq.Expr for that.

func (DefaultValuesBuilder) ToSql added in v2.8.0

func (b DefaultValuesBuilder) ToSql() (string, []any, error)

type HasSetCreatedAt

type HasSetCreatedAt interface {
	SetCreatedAt(time.Time)
}

HasSetCreatedAt is implemented by records that track creation time. Insert will automatically call SetCreatedAt with the current UTC time.

type HasSetDeletedAt

type HasSetDeletedAt interface {
	SetDeletedAt(time.Time)
}

HasSetDeletedAt is implemented by records that support soft delete. DeleteByID will call SetDeletedAt with the current UTC time to soft-delete, and RestoreByID will call SetDeletedAt with a zero time.Time{} to restore.

Implementations should treat a zero time as a restore (clear the timestamp):

func (r *MyRecord) SetDeletedAt(t time.Time) {
	if t.IsZero() {
		r.DeletedAt = nil // restore: clear the timestamp
		return
	}
	r.DeletedAt = &t // soft delete: set the timestamp
}

type HasSetUpdatedAt

type HasSetUpdatedAt interface {
	SetUpdatedAt(time.Time)
}

HasSetUpdatedAt is implemented by records that track update time. Insert, Update, and Save will automatically call SetUpdatedAt with the current UTC time.

type ID

type ID comparable

ID is a comparable type used for record IDs.

type InsertBuilder

type InsertBuilder struct {
	sq.InsertBuilder
	// contains filtered or unexported fields
}

func (InsertBuilder) Err

func (b InsertBuilder) Err() error

type MapOptions

type MapOptions struct {
	IncludeZeroed bool
	IncludeNil    bool
}

type Order

type Order string
const (
	Desc Order = "DESC"
	Asc  Order = "ASC"
)

type Page

type Page struct {
	Size   uint32
	Page   uint32
	More   bool
	Column string
	Sort   []Sort

	// Unused by the offset Paginator — shared here so callers can swap paginators without changing the Page type.
	Cursor     string
	NextCursor string
}

func NewPage

func NewPage(size, page uint32, sort ...Sort) *Page

func (*Page) GetOrder

func (p *Page) GetOrder(columnFunc func(string) string, defaultSort ...string) []Sort

func (*Page) Limit

func (p *Page) Limit() uint64

func (*Page) Offset

func (p *Page) Offset() uint64

func (*Page) SetDefaults

func (p *Page) SetDefaults(o *PaginatorSettings)

type Paginator

type Paginator[T any] struct {
	// contains filtered or unexported fields
}

Paginator is a helper to paginate results.

func NewPaginator

func NewPaginator[T any](options ...PaginatorOption) Paginator[T]

NewPaginator creates a new paginator with the given options. If MaxSize is less than DefaultSize, MaxSize is set to DefaultSize.

func (Paginator[T]) Paginate added in v2.8.0

func (p Paginator[T]) Paginate(ctx context.Context, query *Querier, q sq.SelectBuilder, page *Page) ([]T, *Page, error)

Paginate returns offset-paginated rows and the page populated with More.

func (Paginator[T]) PrepareQuery

func (p Paginator[T]) PrepareQuery(q sq.SelectBuilder, page *Page) ([]T, sq.SelectBuilder)

PrepareQuery adds pagination to the query. It sets the maximum number of rows to limit+1.

func (Paginator[T]) PrepareRaw

func (p Paginator[T]) PrepareRaw(q string, args []any, page *Page) ([]T, string, []any)

func (Paginator[T]) PrepareResult

func (p Paginator[T]) PrepareResult(result []T, page *Page) []T

PrepareResult prepares the paginated result. If the number of rows is n+1:

  • it removes the last element, returning n elements
  • it sets more to true in the page object
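
The limit+1 technique described for PrepareQuery and PrepareResult can be sketched with a plain slice. The helper trimPage is hypothetical and only illustrates the idea; it is not the library's code.

```go
package main

import "fmt"

// trimPage keeps at most limit rows and reports whether an extra row was
// fetched, which signals that at least one more page exists.
func trimPage[T any](rows []T, limit int) ([]T, bool) {
	if len(rows) > limit {
		return rows[:limit], true
	}
	return rows, false
}

func main() {
	// Pretend the query ran with LIMIT 3+1 and four rows came back.
	fetched := []string{"a", "b", "c", "d"}

	rows, more := trimPage(fetched, 3)
	fmt.Println(rows, more) // [a b c] true
}
```

Fetching one extra row avoids a separate COUNT query just to decide whether to show a "next page" link.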

type PaginatorOption

type PaginatorOption func(*PaginatorSettings)

func WithColumnFunc

func WithColumnFunc(f func(string) string) PaginatorOption

WithColumnFunc sets a function to transform column names.

func WithDefaultSize

func WithDefaultSize(size uint32) PaginatorOption

WithDefaultSize sets the default page size.

func WithMaxSize

func WithMaxSize(size uint32) PaginatorOption

WithMaxSize sets the maximum page size.

func WithSort

func WithSort(sort ...string) PaginatorOption

WithSort sets the default sort order.

type PaginatorSettings

type PaginatorSettings struct {
	// DefaultSize is the default number of rows per page.
	// If zero, DefaultPageSize is used.
	DefaultSize uint32

	// MaxSize is the maximum number of rows per page.
	// If zero, MaxPageSize is used. If less than DefaultSize, it is set to DefaultSize.
	MaxSize uint32

	// Sort is the default sort order.
	Sort []string

	// ColumnFunc is a transformation applied to column names.
	ColumnFunc func(string) string
}

PaginatorSettings are the settings for the paginator.
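
The defaulting rules stated in the field comments can be read as a small normalization step. The following sketch restates them with a hypothetical settings type and normalize function; the order in which the rules are applied is an assumption, and the constants mirror DefaultPageSize and MaxPageSize.

```go
package main

import "fmt"

const (
	defaultPageSize = 10 // mirrors DefaultPageSize
	maxPageSize     = 50 // mirrors MaxPageSize
)

// settings is a hypothetical stand-in for the two size fields of
// PaginatorSettings.
type settings struct {
	DefaultSize uint32
	MaxSize     uint32
}

// normalize applies the documented rules: zero values fall back to the
// package constants, and MaxSize is never smaller than DefaultSize.
func normalize(s settings) settings {
	if s.DefaultSize == 0 {
		s.DefaultSize = defaultPageSize
	}
	if s.MaxSize == 0 {
		s.MaxSize = maxPageSize
	}
	if s.MaxSize < s.DefaultSize {
		s.MaxSize = s.DefaultSize
	}
	return s
}

func main() {
	fmt.Println(normalize(settings{}))                              // {10 50}
	fmt.Println(normalize(settings{DefaultSize: 100, MaxSize: 20})) // {100 100}
}
```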

type PtrTo added in v2.8.0

type PtrTo[T any] interface {
	*T
}

PtrTo constrains a type parameter to be a pointer to T.

type Querier

type Querier struct {
	Tx   pgx.Tx
	Scan *pgxscan.API
	SQL  *StatementBuilder
	// contains filtered or unexported fields
}

func (*Querier) BatchExec

func (q *Querier) BatchExec(ctx context.Context, queries Queries) ([]pgconn.CommandTag, error)

func (*Querier) BatchQuery

func (q *Querier) BatchQuery(ctx context.Context, queries Queries) (pgx.BatchResults, int, error)

func (*Querier) Exec

func (q *Querier) Exec(ctx context.Context, query Sqlizer) (pgconn.CommandTag, error)

func (*Querier) GetAll

func (q *Querier) GetAll(ctx context.Context, query Sqlizer, dest interface{}) error

func (*Querier) GetOne

func (q *Querier) GetOne(ctx context.Context, query Sqlizer, dest interface{}) error

func (*Querier) QueryRow

func (q *Querier) QueryRow(ctx context.Context, query Sqlizer) pgx.Row

func (*Querier) QueryRows

func (q *Querier) QueryRows(ctx context.Context, query Sqlizer) (pgx.Rows, error)

type Queries

type Queries []Query

func (*Queries) Add

func (q *Queries) Add(query Sqlizer)

func (Queries) Len

func (q Queries) Len() int

type Query

type Query Sqlizer

type RawSQL

type RawSQL struct {
	Query string
	Args  []interface{}
	// contains filtered or unexported fields
}

RawSQL allows you to build queries by hand easily. Note that it automatically replaces `?` placeholders with PostgreSQL's $X format. Also, if you run the same query repeatedly, consider using `RawQuery(..)` instead, as it is a cached version of RawSQL.
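
As an illustration of what rewriting `?` placeholders into PostgreSQL's numbered form involves, here is a deliberately naive sketch. The function rewritePlaceholders is hypothetical and is not how pgkit implements it: it does not skip question marks inside quoted strings or handle any escape syntax.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// rewritePlaceholders replaces each '?' with $1, $2, ... in order and
// returns the rewritten query together with the number of placeholders.
// Naive by design: quoted literals and escapes are not considered.
func rewritePlaceholders(query string) (string, int) {
	var b strings.Builder
	n := 0
	for _, r := range query {
		if r == '?' {
			n++
			b.WriteByte('$')
			b.WriteString(strconv.Itoa(n))
			continue
		}
		b.WriteRune(r)
	}
	return b.String(), n
}

func main() {
	q, n := rewritePlaceholders("SELECT * FROM users WHERE id = ? AND status = ?")
	fmt.Println(q) // SELECT * FROM users WHERE id = $1 AND status = $2
	fmt.Println(n) // 2
}
```

The placeholder count is useful for checking that the number of supplied arguments matches the query before it is sent to the server.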

func (RawSQL) Err

func (r RawSQL) Err() error

func (RawSQL) Prepare

func (r RawSQL) Prepare(query string) (string, int, error)

func (RawSQL) ToSql

func (r RawSQL) ToSql() (string, []interface{}, error)

type RawStatement

type RawStatement struct {
	// contains filtered or unexported fields
}

RawStatement allows you to build query statements by hand, where the query will remain the same but the arguments can change. The number of arguments must always be the same.

func RawQuery

func RawQuery(query string) RawStatement

func RawQueryf

func RawQueryf(queryFormat string, a ...interface{}) RawStatement

func (RawStatement) Build

func (r RawStatement) Build(args ...interface{}) Sqlizer

func (RawStatement) Err

func (r RawStatement) Err() error

func (RawStatement) GetQuery

func (r RawStatement) GetQuery() string

func (RawStatement) NumArgs

func (r RawStatement) NumArgs() int

type Record

type Record[T any, I ID] interface {
	PtrTo[T]
	GetID() I
	Validate() error
}

A Record must be a pointer type, with its methods defined on the pointer receiver.

type Sort

type Sort struct {
	Column string
	Order  Order
}

func NewSort

func NewSort(s string) (Sort, bool)

func (Sort) String

func (s Sort) String() string

type Sqlizer

type Sqlizer interface {
	// ToSql converts a runtime builder structure to an executable SQL query, returns:
	// query string, query values, and optional error
	ToSql() (string, []interface{}, error)
}

type StatementBuilder

type StatementBuilder struct {
	sq.StatementBuilderType
}

func (StatementBuilder) InsertDefaults added in v2.8.0

func (s StatementBuilder) InsertDefaults(table string) DefaultValuesBuilder

InsertDefaults builds INSERT INTO <table> DEFAULT VALUES; table must be non-empty.

func (*StatementBuilder) InsertRecord

func (s *StatementBuilder) InsertRecord(record interface{}, optTableName ...string) InsertBuilder

func (StatementBuilder) InsertRecords

func (s StatementBuilder) InsertRecords(recordsSlice interface{}, optTableName ...string) InsertBuilder

InsertRecords builds a multi-row INSERT from a slice of records, unioning columns across rows and emitting DEFAULT for any slot a given row skipped.
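
The column-union idea can be sketched without the library. In the example below, rows are plain maps, buildInsert is a hypothetical helper, and sorting the columns alphabetically is an assumption made only to keep the output deterministic. The table and column names are not quoted, so this is an illustration rather than something to run against untrusted input.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildInsert unions the columns of all rows and emits DEFAULT wherever a
// row has no value for a column. It returns the SQL text and its arguments.
func buildInsert(table string, rows []map[string]any) (string, []any) {
	// Collect the union of column names across all rows.
	seen := map[string]bool{}
	var cols []string
	for _, row := range rows {
		for col := range row {
			if !seen[col] {
				seen[col] = true
				cols = append(cols, col)
			}
		}
	}
	sort.Strings(cols)

	// Build one VALUES tuple per row, numbering only the real arguments.
	var args []any
	tuples := make([]string, 0, len(rows))
	for _, row := range rows {
		slots := make([]string, 0, len(cols))
		for _, col := range cols {
			val, ok := row[col]
			if !ok {
				slots = append(slots, "DEFAULT")
				continue
			}
			args = append(args, val)
			slots = append(slots, fmt.Sprintf("$%d", len(args)))
		}
		tuples = append(tuples, "("+strings.Join(slots, ", ")+")")
	}

	sql := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
		table, strings.Join(cols, ", "), strings.Join(tuples, ", "))
	return sql, args
}

func main() {
	sql, args := buildInsert("users", []map[string]any{
		{"name": "ada", "role": "admin"},
		{"name": "bob"}, // no role: the column default applies
	})
	fmt.Println(sql)  // INSERT INTO users (name, role) VALUES ($1, $2), ($3, DEFAULT)
	fmt.Println(args) // [ada admin bob]
}
```

Emitting DEFAULT instead of NULL lets the database apply the column's own default for rows that skipped a field, while every tuple still has the same number of slots.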

func (StatementBuilder) UpdateRecord

func (s StatementBuilder) UpdateRecord(record interface{}, whereExpr sq.Eq, optTableName ...string) UpdateBuilder

func (StatementBuilder) UpdateRecordColumns

func (s StatementBuilder) UpdateRecordColumns(record interface{}, whereExpr sq.Eq, filterCols []string, optTableName ...string) UpdateBuilder

type Table

type Table[T any, P Record[T, I], I ID] struct {
	*DB
	Name      string
	IDColumn  string
	Paginator Paginator[P]
}

Table provides basic CRUD operations for database records. NOTICE: Experimental. Table and its methods are subject to change.

func (*Table[T, P, I]) Count

func (t *Table[T, P, I]) Count(ctx context.Context, where sq.Sqlizer) (uint64, error)

Count returns the number of matching records.

func (*Table[T, P, I]) DeleteByID

func (t *Table[T, P, I]) DeleteByID(ctx context.Context, id I) (bool, error)

DeleteByID deletes a record by ID. Uses soft delete if the record implements SetDeletedAt(). Returns (true, nil) if a row was deleted, (false, nil) if no row matched.

func (*Table[T, P, I]) Get

func (t *Table[T, P, I]) Get(ctx context.Context, where sq.Sqlizer, orderBy []string) (P, error)

Get returns the first record matching the condition.

func (*Table[T, P, I]) GetByID

func (t *Table[T, P, I]) GetByID(ctx context.Context, id I) (P, error)

GetByID returns a record by its ID.

func (*Table[T, P, I]) HardDeleteByID

func (t *Table[T, P, I]) HardDeleteByID(ctx context.Context, id I) (bool, error)

HardDeleteByID permanently deletes a record by ID. Returns (true, nil) if a row was deleted, (false, nil) if no row matched.

func (*Table[T, P, I]) Insert

func (t *Table[T, P, I]) Insert(ctx context.Context, records ...P) error

Insert inserts one or more records. Sets CreatedAt and UpdatedAt timestamps if available. Records are returned with their generated fields populated via RETURNING *.

func (*Table[T, P, I]) Iter

func (t *Table[T, P, I]) Iter(ctx context.Context, where sq.Sqlizer, orderBy []string) (iter.Seq2[P, error], error)

Iter returns an iterator for records matching the condition.

func (*Table[T, P, I]) List

func (t *Table[T, P, I]) List(ctx context.Context, where sq.Sqlizer, orderBy []string) ([]P, error)

List returns all records matching the condition.

func (*Table[T, P, I]) ListByIDs

func (t *Table[T, P, I]) ListByIDs(ctx context.Context, ids []I) ([]P, error)

ListByIDs returns records by their IDs.

func (*Table[T, P, I]) ListPaged

func (t *Table[T, P, I]) ListPaged(ctx context.Context, where sq.Sqlizer, page *Page) ([]P, *Page, error)

ListPaged returns paginated records matching the condition.

func (*Table[T, P, I]) LockForUpdate

func (t *Table[T, P, I]) LockForUpdate(ctx context.Context, where sq.Sqlizer, orderBy []string, updateFn func(record P)) error

LockForUpdate locks and updates one record using PostgreSQL's FOR UPDATE SKIP LOCKED pattern within a database transaction for safe concurrent processing. The record is processed exactly once across multiple workers. The record is automatically updated after updateFn() completes.

Keep updateFn() fast to avoid holding the transaction. For long-running work, update status to "processing" and return early, then process asynchronously. Use defer LockForUpdate() to update status to "completed" or "failed".

Returns ErrNoRows if no matching records are available for locking.

func (*Table[T, P, I]) LockForUpdates

func (t *Table[T, P, I]) LockForUpdates(ctx context.Context, where sq.Sqlizer, orderBy []string, limit uint64, updateFn func(records []P)) error

LockForUpdates locks and updates records using PostgreSQL's FOR UPDATE SKIP LOCKED pattern within a database transaction for safe concurrent processing. Each record is processed exactly once across multiple workers. Records are automatically updated after updateFn() completes.

Keep updateFn() fast to avoid holding the transaction. For long-running work, update status to "processing" and return early, then process asynchronously. Use defer LockForUpdate() to update status to "completed" or "failed".

func (*Table[T, P, I]) RestoreByID

func (t *Table[T, P, I]) RestoreByID(ctx context.Context, id I) error

RestoreByID restores a soft-deleted record by ID by clearing its DeletedAt timestamp. Returns an error if the record does not implement .SetDeletedAt().

func (*Table[T, P, I]) Save

func (t *Table[T, P, I]) Save(ctx context.Context, records ...P) error

Save inserts or updates the given records. For each record it chooses between insert and update based on whether GetID() returns the zero value of the ID type.
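
Because ID is a comparable type, a zero-value check can be written once for any ID type. The sketch below shows that check in isolation; isNew is a hypothetical helper, and treating a zero ID as "insert" is an assumption about the direction of the detection.

```go
package main

import "fmt"

// isNew reports whether id equals the zero value of its type, which this
// sketch takes to mean "the record has not been inserted yet".
func isNew[I comparable](id I) bool {
	var zero I
	return id == zero
}

func main() {
	fmt.Println(isNew(int64(0)), isNew(int64(7))) // true false
	fmt.Println(isNew(""), isNew("abc"))          // true false
}
```

One consequence of this rule is that records whose IDs are assigned by the application before the first write would look like existing rows, so they would need an explicit Insert instead.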

func (*Table[T, P, I]) Update

func (t *Table[T, P, I]) Update(ctx context.Context, records ...P) (bool, error)

Update updates one or more records by their ID. Sets UpdatedAt timestamp if available. Returns (true, nil) if at least one row was updated, (false, nil) if no rows matched.

func (*Table[T, P, I]) WithPaginator

func (t *Table[T, P, I]) WithPaginator(opts ...PaginatorOption) *Table[T, P, I]

WithPaginator returns a table instance with the given paginator.

func (*Table[T, P, I]) WithTx

func (t *Table[T, P, I]) WithTx(tx pgx.Tx) *Table[T, P, I]

WithTx returns a table instance bound to the given transaction.

type UpdateBuilder

type UpdateBuilder struct {
	sq.UpdateBuilder
	// contains filtered or unexported fields
}

func (UpdateBuilder) Err

func (b UpdateBuilder) Err() error

Directories

Path Synopsis
internal
reflectx
Package reflectx implements extensions to the standard reflect lib suitable for implementing marshalling and unmarshalling packages.
Package pgerr provides PostgreSQL error inspection helpers built on pgconn.PgError.
