pgkit

package module
Version: v2.6.0
Published: Apr 20, 2026 License: MIT Imports: 20 Imported by: 0

README

Lightweight PostgreSQL sugar that combines:

  • github.com/jackc/pgx
  • github.com/Masterminds/squirrel
  • github.com/georgysavva/scany/v2
  • plus, code & ideas from github.com/upper/db

Usage

go get github.com/goware/pgkit/v2

For usage examples, see ./tests/pgkit_test.go

LICENSE

MIT

Documentation

Constants

const (
	// DefaultPageSize is the default number of rows per page.
	DefaultPageSize = 10
	// MaxPageSize is the maximum number of rows per page.
	MaxPageSize = 50
)

Variables

var (
	ErrExpectingPointerToEitherMapOrStruct = fmt.Errorf(`expecting a pointer to either a map or a struct`)
)
var ErrNoRows = pgx.ErrNoRows
var Mapper = reflectx.NewMapper(dbTagName)

Functions

func ConnectWithStdlib

func ConnectWithStdlib(appName string, cfg Config) (*sql.DB, error)

func Map

func Map(record interface{}) ([]string, []interface{}, error)

Map converts a struct (a record) into a list of column names and a matching list of values that can be passed directly to a query executor. This lets you build insert and update queries from structs without listing the column names by hand. The mapper reads each column name from the field's `db:""` struct tag. If a tag includes the `,omitempty` option, the column is omitted from the list when the field is empty, which lets the database apply its default value.
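
To make the tag-driven mapping concrete, here is a minimal stdlib-only sketch of the idea. It is not pgkit's implementation: `Account`, `mapRecord`, and the handling of `omitempty` as "skip when the field is the zero value" are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// Account is a hypothetical record used only for this illustration.
type Account struct {
	ID    int64  `db:"id,omitempty"`
	Name  string `db:"name"`
	Email string `db:"email,omitempty"`
}

// mapRecord is a simplified stand-in for the behaviour described above.
// It returns column names and values read from `db` struct tags.
func mapRecord(record any) ([]string, []any, error) {
	v := reflect.ValueOf(record)
	if v.Kind() == reflect.Ptr {
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return nil, nil, fmt.Errorf("expecting a struct or a pointer to a struct, got %T", record)
	}
	t := v.Type()
	var cols []string
	var vals []any
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		tag, ok := f.Tag.Lookup("db")
		if !ok || f.PkgPath != "" { // untagged or unexported field
			continue
		}
		name, opts, _ := strings.Cut(tag, ",")
		if name == "" || name == "-" {
			continue
		}
		// Assumption: omitempty skips fields holding their zero value.
		if opts == "omitempty" && v.Field(i).IsZero() {
			continue
		}
		cols = append(cols, name)
		vals = append(vals, v.Field(i).Interface())
	}
	return cols, vals, nil
}

func main() {
	cols, vals, err := mapRecord(&Account{Name: "alice"})
	fmt.Println(cols, vals, err) // [name] [alice] <nil>
}
```

Because `ID` and `Email` are left at their zero values, only `name` reaches the column list, so an INSERT built from it would leave `id` and `email` to the database defaults.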

func MapWithOptions

func MapWithOptions(record interface{}, options *MapOptions) ([]string, []interface{}, error)

func NoTx

func NoTx(ctx context.Context) context.Context

func TxFromContext

func TxFromContext(ctx context.Context) pgx.Tx

func WithTx

func WithTx(ctx context.Context, tx pgx.Tx) context.Context

Types

type Config

type Config struct {
	Database        string `toml:"database"`
	Host            string `toml:"host"`
	Username        string `toml:"username"`
	Password        string `toml:"password"`
	MaxConns        int32  `toml:"max_conns"`
	MinConns        int32  `toml:"min_conns"`
	ConnMaxLifetime string `toml:"conn_max_lifetime"`  // e.g. "3600s" or "1h"
	ConnMaxIdleTime string `toml:"conn_max_idle_time"` // e.g. "1800s" or "30m"

	Override func(cfg *pgx.ConnConfig) `toml:"-"`
	Tracer   pgx.QueryTracer
}

type DB

type DB struct {
	Conn  *pgxpool.Pool
	SQL   *StatementBuilder
	Query *Querier
}

func Connect

func Connect(appName string, cfg Config) (*DB, error)

func ConnectWithPGX

func ConnectWithPGX(appName string, pgxConfig *pgxpool.Config) (*DB, error)

func (*DB) TxQuery

func (d *DB) TxQuery(tx pgx.Tx) *Querier

TxQuery returns a new Querier that uses the given pgx.Tx.

func (*DB) TxQueryFromContext

func (d *DB) TxQueryFromContext(ctx context.Context) *Querier

TxQueryFromContext returns a new Querier that uses the pgx.Tx stored in the given context.
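
Carrying a transaction in a context usually follows the standard `context.WithValue` idiom. The sketch below shows that idiom with the standard library only. How pgkit stores the transaction internally is not stated in this reference, so `fakeTx`, `txKey`, `withTx`, and `txFromContext` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// fakeTx stands in for pgx.Tx so the sketch needs no database.
type fakeTx struct{ name string }

// txKey is an unexported key type, which avoids collisions with other packages.
type txKey struct{}

// withTx returns a child context that carries tx.
func withTx(ctx context.Context, tx *fakeTx) context.Context {
	return context.WithValue(ctx, txKey{}, tx)
}

// txFromContext returns the stored transaction, or nil when none is present.
func txFromContext(ctx context.Context) *fakeTx {
	tx, _ := ctx.Value(txKey{}).(*fakeTx)
	return tx
}

// demo stores a transaction, reads it back, and checks an empty context.
func demo() (string, bool) {
	ctx := withTx(context.Background(), &fakeTx{name: "tx1"})
	return txFromContext(ctx).name, txFromContext(context.Background()) == nil
}

func main() {
	fmt.Println(demo()) // tx1 true
}
```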

type HasSetCreatedAt

type HasSetCreatedAt interface {
	SetCreatedAt(time.Time)
}

HasSetCreatedAt is implemented by records that track creation time. Insert will automatically call SetCreatedAt with the current UTC time.

type HasSetDeletedAt

type HasSetDeletedAt interface {
	SetDeletedAt(time.Time)
}

HasSetDeletedAt is implemented by records that support soft delete. DeleteByID will call SetDeletedAt with the current UTC time to soft-delete, and RestoreByID will call SetDeletedAt with a zero time.Time{} to restore.

Implementations should treat a zero time as a restore (clear the timestamp):

func (r *MyRecord) SetDeletedAt(t time.Time) {
	if t.IsZero() {
		r.DeletedAt = nil // restore: clear the timestamp
		return
	}
	r.DeletedAt = &t // soft delete: set the timestamp
}

type HasSetUpdatedAt

type HasSetUpdatedAt interface {
	SetUpdatedAt(time.Time)
}

HasSetUpdatedAt is implemented by records that track update time. Insert, Update, and Save will automatically call SetUpdatedAt with the current UTC time.
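
As an illustration of the three timestamp interfaces working together, the sketch below defines a record that implements all of them. `Article`, the lowercase interface copies, and `touchForInsert` are hypothetical; they mirror the documented behaviour so the example compiles without pgkit or a database.

```go
package main

import (
	"fmt"
	"time"
)

// Article is a hypothetical record; the field names are illustrative.
type Article struct {
	ID        int64
	CreatedAt time.Time
	UpdatedAt time.Time
	DeletedAt *time.Time
}

func (a *Article) SetCreatedAt(t time.Time) { a.CreatedAt = t }
func (a *Article) SetUpdatedAt(t time.Time) { a.UpdatedAt = t }
func (a *Article) SetDeletedAt(t time.Time) {
	if t.IsZero() {
		a.DeletedAt = nil // restore: clear the timestamp
		return
	}
	a.DeletedAt = &t // soft delete: set the timestamp
}

// Local copies of the interfaces, so the sketch is self-contained.
type hasSetCreatedAt interface{ SetCreatedAt(time.Time) }
type hasSetUpdatedAt interface{ SetUpdatedAt(time.Time) }

// touchForInsert mimics what the docs say Insert does before writing a row.
func touchForInsert(record any, now time.Time) {
	if r, ok := record.(hasSetCreatedAt); ok {
		r.SetCreatedAt(now)
	}
	if r, ok := record.(hasSetUpdatedAt); ok {
		r.SetUpdatedAt(now)
	}
}

// lifecycle runs insert, soft delete, and restore on one record.
func lifecycle() *Article {
	a := &Article{}
	now := time.Now().UTC()
	touchForInsert(a, now)
	a.SetDeletedAt(now)         // soft delete
	a.SetDeletedAt(time.Time{}) // restore
	return a
}

func main() {
	a := lifecycle()
	fmt.Println(a.CreatedAt.Equal(a.UpdatedAt), a.DeletedAt == nil) // true true
}
```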

type ID

type ID comparable

ID is a comparable type used for record IDs.

type InsertBuilder

type InsertBuilder struct {
	sq.InsertBuilder
	// contains filtered or unexported fields
}

func (InsertBuilder) Err

func (b InsertBuilder) Err() error

type MapOptions

type MapOptions struct {
	IncludeZeroed bool
	IncludeNil    bool
}

type Order

type Order string

const (
	Desc Order = "DESC"
	Asc  Order = "ASC"
)

type Page

type Page struct {
	Size   uint32
	Page   uint32
	More   bool
	Column string
	Sort   []Sort
}

func NewPage

func NewPage(size, page uint32, sort ...Sort) *Page

func (*Page) GetOrder

func (p *Page) GetOrder(columnFunc func(string) string, defaultSort ...string) []Sort

func (*Page) Limit

func (p *Page) Limit() uint64

func (*Page) Offset

func (p *Page) Offset() uint64

func (*Page) SetDefaults

func (p *Page) SetDefaults(o *PaginatorSettings)

type Paginator

type Paginator[T any] struct {
	// contains filtered or unexported fields
}

Paginator is a helper to paginate results.

func NewPaginator

func NewPaginator[T any](options ...PaginatorOption) Paginator[T]

NewPaginator creates a new paginator with the given options. If MaxSize is less than DefaultSize, MaxSize is set to DefaultSize.

func (Paginator[T]) PrepareQuery

func (p Paginator[T]) PrepareQuery(q sq.SelectBuilder, page *Page) ([]T, sq.SelectBuilder)

PrepareQuery adds pagination to the query. It sets the number of max rows to limit+1.

func (Paginator[T]) PrepareRaw

func (p Paginator[T]) PrepareRaw(q string, args []any, page *Page) ([]T, string, []any)

func (Paginator[T]) PrepareResult

func (p Paginator[T]) PrepareResult(result []T, page *Page) []T

PrepareResult prepares the paginated result. If the number of rows is n+1:

  • it removes the last element, returning n elements
  • it sets more to true in the page object
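
The "fetch limit+1 rows, then trim" technique can be sketched in a few lines. This is a stand-alone illustration, not pgkit's code; `trimPage` is a hypothetical helper.

```go
package main

import "fmt"

// trimPage returns at most limit rows and reports whether more rows exist.
// It assumes the query was executed with LIMIT limit+1.
func trimPage[T any](rows []T, limit int) ([]T, bool) {
	if len(rows) > limit {
		return rows[:limit], true
	}
	return rows, false
}

func main() {
	limit := 3
	fetched := []string{"a", "b", "c", "d"} // pretend the database returned limit+1 rows
	rows, more := trimPage(fetched, limit)
	fmt.Println(rows, more) // [a b c] true
}
```

Asking for one extra row answers "is there a next page?" without a separate COUNT query.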

type PaginatorOption

type PaginatorOption func(*PaginatorSettings)

func WithColumnFunc

func WithColumnFunc(f func(string) string) PaginatorOption

WithColumnFunc sets a function to transform column names.

func WithDefaultSize

func WithDefaultSize(size uint32) PaginatorOption

WithDefaultSize sets the default page size.

func WithMaxSize

func WithMaxSize(size uint32) PaginatorOption

WithMaxSize sets the maximum page size.

func WithSort

func WithSort(sort ...string) PaginatorOption

WithSort sets the default sort order.

type PaginatorSettings

type PaginatorSettings struct {
	// DefaultSize is the default number of rows per page.
	// If zero, DefaultPageSize is used.
	DefaultSize uint32

	// MaxSize is the maximum number of rows per page.
	// If zero, MaxPageSize is used. If less than DefaultSize, it is set to DefaultSize.
	MaxSize uint32

	// Sort is the default sort order.
	Sort []string

	// ColumnFunc is a transformation applied to column names.
	ColumnFunc func(string) string
}

PaginatorSettings are the settings for the paginator.
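
The defaulting rules in the field comments above can be written out as a small normalization step. The sketch below is an assumption about how those rules compose, not pgkit's actual code; `settings` and `normalize` are hypothetical names, and the constants repeat DefaultPageSize and MaxPageSize.

```go
package main

import "fmt"

const (
	defaultPageSize = 10 // mirrors DefaultPageSize
	maxPageSize     = 50 // mirrors MaxPageSize
)

// settings holds only the two size fields relevant to this sketch.
type settings struct {
	DefaultSize uint32
	MaxSize     uint32
}

// normalize applies the documented rules in order:
// zero values fall back to the package constants, then MaxSize is
// raised to DefaultSize if it would otherwise be smaller.
func normalize(s settings) settings {
	if s.DefaultSize == 0 {
		s.DefaultSize = defaultPageSize
	}
	if s.MaxSize == 0 {
		s.MaxSize = maxPageSize
	}
	if s.MaxSize < s.DefaultSize {
		s.MaxSize = s.DefaultSize
	}
	return s
}

func main() {
	fmt.Println(normalize(settings{}))                 // {10 50}
	fmt.Println(normalize(settings{DefaultSize: 100})) // {100 100}
}
```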

type Querier

type Querier struct {
	Tx   pgx.Tx
	Scan *pgxscan.API
	SQL  *StatementBuilder
	// contains filtered or unexported fields
}

func (*Querier) BatchExec

func (q *Querier) BatchExec(ctx context.Context, queries Queries) ([]pgconn.CommandTag, error)

func (*Querier) BatchQuery

func (q *Querier) BatchQuery(ctx context.Context, queries Queries) (pgx.BatchResults, int, error)

func (*Querier) Exec

func (q *Querier) Exec(ctx context.Context, query Sqlizer) (pgconn.CommandTag, error)

func (*Querier) GetAll

func (q *Querier) GetAll(ctx context.Context, query Sqlizer, dest interface{}) error

func (*Querier) GetOne

func (q *Querier) GetOne(ctx context.Context, query Sqlizer, dest interface{}) error

func (*Querier) QueryRow

func (q *Querier) QueryRow(ctx context.Context, query Sqlizer) pgx.Row

func (*Querier) QueryRows

func (q *Querier) QueryRows(ctx context.Context, query Sqlizer) (pgx.Rows, error)

type Queries

type Queries []Query

func (*Queries) Add

func (q *Queries) Add(query Sqlizer)

func (Queries) Len

func (q Queries) Len() int

type Query

type Query Sqlizer

type RawSQL

type RawSQL struct {
	Query string
	Args  []interface{}
	// contains filtered or unexported fields
}

RawSQL lets you build queries by hand. Note that it automatically rewrites `?` placeholders to the PostgreSQL `$N` format. If you run the same query repeatedly, consider using `RawQuery(..)` instead, which is a cached version of RawSQL.
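
The placeholder rewrite can be pictured with the simplified sketch below. `rebind` is a hypothetical function, not pgkit's Prepare; unlike a production implementation it makes no attempt to skip `?` characters inside quoted strings or comments.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// rebind rewrites each `?` to `$1`, `$2`, ... and returns the placeholder count.
// Simplification: every `?` byte is treated as a placeholder.
func rebind(query string) (string, int) {
	var b strings.Builder
	n := 0
	for i := 0; i < len(query); i++ {
		if query[i] == '?' {
			n++
			b.WriteByte('$')
			b.WriteString(strconv.Itoa(n))
			continue
		}
		b.WriteByte(query[i])
	}
	return b.String(), n
}

func main() {
	q, n := rebind("SELECT * FROM accounts WHERE id = ? AND name = ?")
	fmt.Println(q) // SELECT * FROM accounts WHERE id = $1 AND name = $2
	fmt.Println(n) // 2
}
```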

func (RawSQL) Err

func (r RawSQL) Err() error

func (RawSQL) Prepare

func (r RawSQL) Prepare(query string) (string, int, error)

func (RawSQL) ToSql

func (r RawSQL) ToSql() (string, []interface{}, error)

type RawStatement

type RawStatement struct {
	// contains filtered or unexported fields
}

RawStatement allows you to build query statements by hand, where the query will remain the same but the arguments can change. The number of arguments must always be the same.
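
The "fixed query, varying arguments" contract can be sketched as follows. This is an illustration under assumptions: `statement`, `newStatement`, and `bind` are hypothetical, and how pgkit reports an argument-count mismatch is not shown in this reference.

```go
package main

import (
	"fmt"
	"strings"
)

// statement keeps the query text and the number of arguments it expects,
// both computed once when the statement is created.
type statement struct {
	query   string
	numArgs int
}

// newStatement counts `?` placeholders up front so later calls can reuse the result.
func newStatement(query string) statement {
	return statement{query: query, numArgs: strings.Count(query, "?")}
}

// bind pairs the fixed query with a fresh set of arguments,
// rejecting calls that pass the wrong number of them.
func (s statement) bind(args ...any) (string, []any, error) {
	if len(args) != s.numArgs {
		return "", nil, fmt.Errorf("expected %d arguments, got %d", s.numArgs, len(args))
	}
	return s.query, args, nil
}

func main() {
	stmt := newStatement("UPDATE accounts SET name = ? WHERE id = ?")
	_, _, err := stmt.bind("alice")
	fmt.Println(err) // expected 2 arguments, got 1
	_, args, err := stmt.bind("alice", 7)
	fmt.Println(args, err) // [alice 7] <nil>
}
```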

func RawQuery

func RawQuery(query string) RawStatement

func RawQueryf

func RawQueryf(queryFormat string, a ...interface{}) RawStatement

func (RawStatement) Build

func (r RawStatement) Build(args ...interface{}) Sqlizer

func (RawStatement) Err

func (r RawStatement) Err() error

func (RawStatement) GetQuery

func (r RawStatement) GetQuery() string

func (RawStatement) NumArgs

func (r RawStatement) NumArgs() int

type Record

type Record[T any, I ID] interface {
	*T // Enforce T is a pointer.
	GetID() I
	Validate() error
}

A Record must be a pointer type, with its methods defined on the pointer receiver.
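
The `*T` element in the constraint is a general Go generics pattern: it ties the pointer type P to the value type T, so generic code can hold values of T while calling pointer-receiver methods. The sketch below reproduces the pattern with local, hypothetical names (`record`, `User`, `validateAll`) and no pgkit dependency.

```go
package main

import (
	"errors"
	"fmt"
)

// record mirrors the shape of the Record constraint shown above.
type record[T any, I comparable] interface {
	*T // P must be exactly *T
	GetID() I
	Validate() error
}

// User is a hypothetical record with pointer-receiver methods.
type User struct {
	ID   int64
	Name string
}

func (u *User) GetID() int64 { return u.ID }

func (u *User) Validate() error {
	if u.Name == "" {
		return errors.New("name is required")
	}
	return nil
}

// validateAll works on a slice of values and converts each element's
// address to P in order to call the pointer-receiver methods.
func validateAll[T any, P record[T, I], I comparable](items []T) error {
	for i := range items {
		if err := P(&items[i]).Validate(); err != nil {
			return fmt.Errorf("record %d: %w", i, err)
		}
	}
	return nil
}

func main() {
	users := []User{{ID: 1, Name: "ann"}, {ID: 2}}
	err := validateAll[User, *User, int64](users)
	fmt.Println(err) // record 1: name is required
}
```

The type arguments are spelled out explicitly here to avoid relying on inference of `I` from the method set.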

type Sort

type Sort struct {
	Column string
	Order  Order
}

func NewSort

func NewSort(s string) (Sort, bool)

func (Sort) String

func (s Sort) String() string

type Sqlizer

type Sqlizer interface {
	// ToSql converts a runtime builder structure to an executable SQL query, returns:
	// query string, query values, and optional error
	ToSql() (string, []interface{}, error)
}

type StatementBuilder

type StatementBuilder struct {
	sq.StatementBuilderType
}

func (*StatementBuilder) InsertRecord

func (s *StatementBuilder) InsertRecord(record interface{}, optTableName ...string) InsertBuilder

func (StatementBuilder) InsertRecords

func (s StatementBuilder) InsertRecords(recordsSlice interface{}, optTableName ...string) InsertBuilder

func (StatementBuilder) UpdateRecord

func (s StatementBuilder) UpdateRecord(record interface{}, whereExpr sq.Eq, optTableName ...string) UpdateBuilder

func (StatementBuilder) UpdateRecordColumns

func (s StatementBuilder) UpdateRecordColumns(record interface{}, whereExpr sq.Eq, filterCols []string, optTableName ...string) UpdateBuilder

type Table

type Table[T any, P Record[T, I], I ID] struct {
	*DB
	Name      string
	IDColumn  string
	Paginator Paginator[P]
}

Table provides basic CRUD operations for database records. NOTICE: Experimental. Table and its methods are subject to change.

func (*Table[T, P, I]) Count

func (t *Table[T, P, I]) Count(ctx context.Context, where sq.Sqlizer) (uint64, error)

Count returns the number of matching records.

func (*Table[T, P, I]) DeleteByID

func (t *Table[T, P, I]) DeleteByID(ctx context.Context, id I) (bool, error)

DeleteByID deletes a record by ID. Uses soft delete if .SetDeletedAt() method exists. Returns (true, nil) if a row was deleted, (false, nil) if no row matched.

func (*Table[T, P, I]) Get

func (t *Table[T, P, I]) Get(ctx context.Context, where sq.Sqlizer, orderBy []string) (P, error)

Get returns the first record matching the condition.

func (*Table[T, P, I]) GetByID

func (t *Table[T, P, I]) GetByID(ctx context.Context, id I) (P, error)

GetByID returns a record by its ID.

func (*Table[T, P, I]) HardDeleteByID

func (t *Table[T, P, I]) HardDeleteByID(ctx context.Context, id I) (bool, error)

HardDeleteByID permanently deletes a record by ID. Returns (true, nil) if a row was deleted, (false, nil) if no row matched.

func (*Table[T, P, I]) Insert

func (t *Table[T, P, I]) Insert(ctx context.Context, records ...P) error

Insert inserts one or more records. Sets CreatedAt and UpdatedAt timestamps if available. Records are returned with their generated fields populated via RETURNING *.

func (*Table[T, P, I]) Iter

func (t *Table[T, P, I]) Iter(ctx context.Context, where sq.Sqlizer, orderBy []string) (iter.Seq2[P, error], error)

Iter returns an iterator for records matching the condition.

func (*Table[T, P, I]) List

func (t *Table[T, P, I]) List(ctx context.Context, where sq.Sqlizer, orderBy []string) ([]P, error)

List returns all records matching the condition.

func (*Table[T, P, I]) ListByIDs

func (t *Table[T, P, I]) ListByIDs(ctx context.Context, ids []I) ([]P, error)

ListByIDs returns records by their IDs.

func (*Table[T, P, I]) ListPaged

func (t *Table[T, P, I]) ListPaged(ctx context.Context, where sq.Sqlizer, page *Page) ([]P, *Page, error)

ListPaged returns paginated records matching the condition.

func (*Table[T, P, I]) LockForUpdate

func (t *Table[T, P, I]) LockForUpdate(ctx context.Context, where sq.Sqlizer, orderBy []string, updateFn func(record P)) error

LockForUpdate locks and updates one record using PostgreSQL's FOR UPDATE SKIP LOCKED pattern within a database transaction for safe concurrent processing. The record is processed exactly once across multiple workers. The record is automatically updated after updateFn() completes.

Keep updateFn() fast to avoid holding the transaction. For long-running work, update status to "processing" and return early, then process asynchronously. Use defer LockForUpdate() to update status to "completed" or "failed".

Returns ErrNoRows if no matching records are available for locking.
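
For readers unfamiliar with the locking pattern, the sketch below prints the general shape of a `FOR UPDATE SKIP LOCKED` query. With this clause, rows already locked by another transaction are skipped instead of waited on, which is what lets several workers poll the same table. The exact SQL pgkit generates is not shown in this reference; `lockQuery` and the `jobs` table are hypothetical.

```go
package main

import "fmt"

// lockQuery assembles a SELECT ... FOR UPDATE SKIP LOCKED statement.
// table, where, and orderBy are interpolated verbatim, so they must come
// from trusted code, never from user input.
func lockQuery(table, where, orderBy string, limit uint64) string {
	return fmt.Sprintf(
		"SELECT * FROM %s WHERE %s ORDER BY %s LIMIT %d FOR UPDATE SKIP LOCKED",
		table, where, orderBy, limit,
	)
}

func main() {
	// Must run inside a transaction; the row locks are released on commit or rollback.
	fmt.Println(lockQuery("jobs", "status = 'pending'", "created_at ASC", 1))
}
```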

func (*Table[T, P, I]) LockForUpdates

func (t *Table[T, P, I]) LockForUpdates(ctx context.Context, where sq.Sqlizer, orderBy []string, limit uint64, updateFn func(records []P)) error

LockForUpdates locks and updates records using PostgreSQL's FOR UPDATE SKIP LOCKED pattern within a database transaction for safe concurrent processing. Each record is processed exactly once across multiple workers. Records are automatically updated after updateFn() completes.

Keep updateFn() fast to avoid holding the transaction. For long-running work, update status to "processing" and return early, then process asynchronously. Use defer LockForUpdate() to update status to "completed" or "failed".

func (*Table[T, P, I]) RestoreByID

func (t *Table[T, P, I]) RestoreByID(ctx context.Context, id I) error

RestoreByID restores a soft-deleted record by ID by clearing its DeletedAt timestamp. Returns an error if the record does not implement .SetDeletedAt().

func (*Table[T, P, I]) Save

func (t *Table[T, P, I]) Save(ctx context.Context, records ...P) error

Save inserts or updates the given records. For each record, it chooses between insert and update by checking whether the ID returned by the record's GetID() method is the zero value.
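
Zero-value detection on a generic ID can be sketched as below. The mapping "zero ID means insert, anything else means update" is an assumption drawn from the description above, and `isZeroID` and `saveAction` are hypothetical helpers.

```go
package main

import "fmt"

// isZeroID reports whether id equals the zero value of its type.
// The comparable constraint is what makes == available here.
func isZeroID[I comparable](id I) bool {
	var zero I
	return id == zero
}

// saveAction names the operation a Save-like method would pick.
// Assumption: a zero ID means the record has not been stored yet.
func saveAction[I comparable](id I) string {
	if isZeroID(id) {
		return "insert"
	}
	return "update"
}

func main() {
	fmt.Println(saveAction(int64(0)), saveAction(int64(42))) // insert update
	fmt.Println(saveAction(""), saveAction("abc"))           // insert update
}
```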

func (*Table[T, P, I]) Update

func (t *Table[T, P, I]) Update(ctx context.Context, records ...P) (bool, error)

Update updates one or more records by their ID. Sets UpdatedAt timestamp if available. Returns (true, nil) if at least one row was updated, (false, nil) if no rows matched.

func (*Table[T, P, I]) WithPaginator

func (t *Table[T, P, I]) WithPaginator(opts ...PaginatorOption) *Table[T, P, I]

WithPaginator returns a table instance with the given paginator.

func (*Table[T, P, I]) WithTx

func (t *Table[T, P, I]) WithTx(tx pgx.Tx) *Table[T, P, I]

WithTx returns a table instance bound to the given transaction.

type UpdateBuilder

type UpdateBuilder struct {
	sq.UpdateBuilder
	// contains filtered or unexported fields
}

func (UpdateBuilder) Err

func (b UpdateBuilder) Err() error

Directories

Path               Synopsis
internal/reflectx  Package reflectx implements extensions to the standard reflect lib suitable for implementing marshalling and unmarshalling packages.
