pg

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 22, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

README

go-pg

Postgresql Support for Go. This module provides:

  • Binding SQL statements to named arguments;
  • Support for mapping go structures to SQL tables, and vice versa;
  • Easy semantics for Insert, Delete, Update, Get and List operations;
  • Bulk insert operations and transactions;
  • Support for tracing and observability.

Documentation: https://pkg.go.dev/github.com/djthorpe/go-pg

Motivation

The package provides a simple way to interact with a Postgresql database from Go, to reduce the amount of boilerplate code required to interact with the database. The supported operations align with API calls POST, PUT, GET, DELETE and PATCH.

  • Insert - Insert a row into a table, and return the inserted row (POST or PUT);
  • Delete - Delete one or more rows from a table, and optionally return the deleted rows (DELETE);
  • Update - Update one or more rows in a table, and optionally return the updated rows (PATCH);
  • Get - Get a single row from a table (GET);
  • List - Get a list of rows from a table (GET).

In order to support database operations on go types, those types need to implement one or more of the following interfaces:

Selector
type Selector interface {
  // Bind row selection variables, returning the SQL statement required for the operation
  // The operation can be Get, Update, Delete or List
  Select(*Bind, Op) (string, error)
}

A type which implements a Selector interface can be used to select rows from a table, for get, list, update and deleting operations.

Reader
type Reader interface {
  // Scan a row into the receiver
  Scan(Row) error
}

A type which implements a Reader interface can be used to translate SQL types to the type instance. If multiple rows are returned, then the Scan method is called repeatly until no more rows are returned.

type ListReader interface {
  // Scan a count of returned rows into the receiver
  ScanCount(Row) error
}

A type which implements a ListReader interface can be used to scan the count of rows returned.

Writer
type Writer interface {
  // Bind insert parameters, returning the SQL statement required for the insert
  Insert(*Bind) (string, error)

  // Bind update parameters
  Update(*Bind) error
}

A type which implements a Writer interface can be used to bind object instance variables to SQL parameters. An example of how to implement an API gateway using this package is shown below.

Database Server Connection Pool

You can create a connection pool to a database server using the pg.NewPool function:

import (
  pg "github.com/djthorpe/go-pg"
)

func main() {
  pool, err := pg.NewPool(ctx,
    pg.WithHostPort(host, port),
    pg.WithCredentials("postgres", "password"),
    pg.WithDatabase(name),
  )
  if err != nil {
      panic(err)
  }
  defer pool.Close()

  // ...
}

The options that can be passed to pg.NewPool are:

  • WithCredentials(string,string) - Set connection pool username and password. If the database name is not set, then the username will be used as the default database name.
  • WithDatabase(string) - Set the database name for the connection. If the user name is not set, then the database name will be used as the user name.
  • WithAddr(string) - Set the address (host) or (host:port) for the connection
  • WithHostPort(string, string) - Set the hostname and port for the connection. If the port is not set, then the default port 5432 will be used.
  • WithSSLMode( string) - Set the SSL connection mode. Valid values are "disable", "allow", "prefer", "require", "verify-ca", "verify-full". See https://www.postgresql.org/docs/current/libpq-ssl.html for more information.
  • pg.WithTrace(pg.TraceFn) - Set the trace function for the connection pool. The signature of the trace unction is func(ctx context.Context, sql string, args any, err error) and is called for every query executed by the connection pool.
  • pg.WithBind(string,any) - Set the bind variable to a value the the lifetime of the connection.

Executing Statements

To simply execute a statement, use the Exec call:

  if err := pool.Exec(ctx, `CREATE TABLE test (id SERIAL PRIMARY KEY, name TEXT)`); err != nil {
    panic(err)
  }

You can use bind variables to bind named arguments to a statement using the With function. Within the statement, the following formats are replaced with bound values:

  • ${"name"} - Replace with the value of the named argument "name", double-quoted string
  • ${'name'} - Replace with the value of the named argument "name", single-quoted string
  • ${name} - Replace with the value of the named argument "name", unquoted string
  • $$ - Pass a literal dollar sign
  • @name - Pass by bound variable parameter

For example,

  var name string
  // ...
  if err := pool.With("table", "test", "name", name).Exec(ctx, `INSERT INTO ${"table"} (name) VALUES (@name)`); err != nil {
    panic(err)
  }

This will re-use or create a new database connection from the connection, pool, bind the named arguments, replace the named arguments in the statement, and execute the statement.

Implementing Get

If you have a http handler which needs to get a row from a table, you can implement a Selector interface. For example,

type MyObject struct {
  Id int
  Name string
}

// Reader - bind to object
func (obj *MyObject) Scan(row pg.Row) error {
  return row.Scan(&obj.Id, &obj.Name)
}

// Selector - select rows from database
func (obj MyObject) Select(bind *pg.Bind, op pg.Op) (string, error) {
  switch op {
  case pg.Get:
    bind.Set("id", obj.Id)
    return `SELECT id, name FROM mytable WHERE id=@id`, nil
  }
}

// Select the row from the database
func main() {
  // ...
  var obj MyObject
  if err := conn.Get(ctx, &obj, MyObject{ Id: 1 }); err != nil {
    panic(err)
  }
  // ...
}

Implementing List

You may wish to use paging to list rows from a table. The List operation is used to list rows from a table, with offset and limit parameters. The http handler may look like this:

func ListHandler(w http.ResponseWriter, r *http.Request) {
  var conn pg.Conn

  // ....Set pool....

  // Get up to 10 rows
  var response MyList
  if err := conn.List(ctx, &response, MyListRequest{Offset: 0, Limit: 10}); err != nil {
    http.Error(w, err.Error(), http.StatusInternalServerError)
    return
  }

  // Write the row to the response - TODO: Add Content-Type header
  json.NewEncoder(w).Encode(response)
}

The implementation of MyList and MyListRequest may look like this:

type MyListRequest struct {
  Offset uint64
  Limit uint64
}

type MyList struct {
  Count uint64
  Names []string
}

// Reader - note this needs to be a pointer receiver
func (obj *MyList) Scan(row pg.Row) error {
  var name string
  if err := row.Scan(&name); err != nil {
    return err
  }
  obj = append(obj, row.String())
  return nil
}

// ListReader - optional interface to scan count of all rows
func (obj MyList) Scan(row pg.Row) error {
 return row.Scan(&obj.Count)
}

// Selector - select rows from database. Use bind variables
// offsetlimit, groupby and orderby to filter the selected rows.
func (obj MyListRequest) Select(bind *pg.Bind, op pg.Op) (string, error) {
  bind.Set("offsetlimit", fmt.Sprintf("OFFSET %v LIMIT %v",obj.Offset,obj.Limit))
  switch op {
  case pg.List:
    return `SELECT name FROM mytable`, nil
  default:
    return "", fmt.Errorf("Unsupported operation: ",op)
  }
}

You can of course use a WHERE clause in your query to filter the rows returned from the table. Always implement the offsetlimit as a bind variable.

Implementing Insert

TODO

Implementing Patch

TODO

Implementing Delete

TODO

Transactions

Transactions are executed within a function called Tx. For example,

  if err := pool.Tx(ctx, func(tx pg.Tx) error {
    if err := tx.Exec(ctx, `CREATE TABLE test (id SERIAL PRIMARY KEY, name TEXT)`); err != nil {
      return err
    }
    if err := tx.Exec(ctx, `INSERT INTO test (name) VALUES ('hello')`); err != nil {
      return err
    }
    return nil
  }); err != nil {
    panic(err)
  }

Any error returned from the function will cause the transaction to be rolled back. If the function returns nil, then the transaction will be committed. Transactions can be nested.

Notify and Listen

TODO

Schema Support

  • Checking if a schema exists
  • Creating a schema
  • Dropping a schema

Error Handing and Tracing

TODO

Testing Support

TODO

Documentation

Index

Constants

View Source
const (
	DefaultPort = "5432"
)

Variables

This section is empty.

Functions

func NewListener

func NewListener(conn PoolConn) *listener

NewListener return a Listener for the given pool. If pool is nil then return nil

func NewTracer

func NewTracer(fn TraceFn) *tracer

Create a new query tracer

func SchemaCreate

func SchemaCreate(ctx context.Context, conn Conn, name string) error

func SchemaDrop

func SchemaDrop(ctx context.Context, conn Conn, name string) error

func SchemaExists

func SchemaExists(ctx context.Context, conn Conn, name string) (bool, error)

Types

type Bind

type Bind struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Bind represents a set of variables and arguments to be used in a query. The vars are substituted in the query string itself, while the args are passed as arguments to the query.

func NewBind

func NewBind(pairs ...any) *Bind

Create a new Bind object with the given name/value pairs Returns nil if the number of arguments is not even

func (*Bind) Append

func (bind *Bind) Append(key string, value any) bool

Append a bind var to a list. Returns false if the key is not a list, or the value is not a list.

func (*Bind) Copy

func (bind *Bind) Copy(pairs ...any) *Bind

Make a copy of the bind object

func (*Bind) Del

func (bind *Bind) Del(key string)

Delete a bind var

func (*Bind) Exec

func (bind *Bind) Exec(ctx context.Context, conn pgx.Tx, query string) error

Execute a query

func (*Bind) Get

func (bind *Bind) Get(key string) any

Get a bind var

func (*Bind) Has

func (bind *Bind) Has(key string) bool

Return true if there is a bind var with the given key

func (*Bind) Join

func (bind *Bind) Join(key, sep string) string

Join a bind var with a separator, when it is a []any and return the result as a string. Returns an empty string if the key does not exist.

func (*Bind) MarshalJSON

func (bind *Bind) MarshalJSON() ([]byte, error)

func (*Bind) Query

func (bind *Bind) Query(ctx context.Context, conn pgx.Tx, query string) (pgx.Rows, error)

Query a set of rows and return the result

func (*Bind) QueryRow

func (bind *Bind) QueryRow(ctx context.Context, conn pgx.Tx, query string) pgx.Row

Query a row and return the result

func (*Bind) Replace

func (bind *Bind) Replace(query string) string

Return a query string with ${subtitution} replaced by the values:

  • ${key} => value
  • ${'key'} => 'value'
  • ${"key"} => "value"
  • $1 => $1
  • $$ => $$

func (*Bind) Set

func (bind *Bind) Set(key string, value any) string

Set a bind var, and return the parameter name

func (*Bind) String

func (bind *Bind) String() string

type Conn

type Conn interface {
	// Return a new connection with bound parameters
	With(...any) Conn

	// Perform a transaction within a function
	Tx(context.Context, func(Conn) error) error

	// Perform a bulk operation within a function (and indicate whether this
	// should be in a transaction)
	Bulk(context.Context, func(Conn) error) error

	// Execute a query
	Exec(context.Context, string) error

	// Perform an insert
	Insert(context.Context, Reader, Writer) error

	// Perform an update
	Update(context.Context, Reader, Selector, Writer) error

	// Perform a delete
	Delete(context.Context, Reader, Selector) error

	// Perform a get
	Get(context.Context, Reader, Selector) error

	// Perform a list. If the reader is a ListReader, then the
	// count of items is also calculated
	List(context.Context, Reader, Selector) error
}

type Err

type Err int
const (
	ErrSuccess Err = iota
	ErrNotFound
	ErrNotImplemented
	ErrBadParameter
)

func (Err) Error

func (e Err) Error() string

type ListReader

type ListReader interface {
	Reader

	// Scan count into the result
	ScanCount(Row) error
}

Bind a row to an object, and also count the number of rows

type Listener

type Listener interface {
	// Listen to a topic
	Listen(context.Context, string) error

	// Unlisten from a topic
	Unlisten(context.Context, string) error

	// Wait for a notification and return it
	WaitForNotification(context.Context) (*Notification, error)

	// Free resources
	Close(context.Context) error
}

Listener is an interface for listening to notifications

type Notification

type Notification struct {
	Channel string
	Payload []byte
}

type OffsetLimit

type OffsetLimit struct {
	Offset uint64  `json:"offset,omitempty"`
	Limit  *uint64 `json:"limit,omitempty"`
}

func (*OffsetLimit) Bind

func (r *OffsetLimit) Bind(bind *Bind, max uint64)

Bind the offset and limit SQL fragment to the bind object

func (*OffsetLimit) Clamp

func (r *OffsetLimit) Clamp(len uint64)

Clamp the limit to the maximum length

type Op

type Op uint

Operation type

const (
	None Op = iota
	Get
	Insert
	Update
	Delete
	List
)

Operations

func (Op) String

func (o Op) String() string

type Opt

type Opt func(*opt) error

Opt is a function which applies options for a connection pool

func WithAddr

func WithAddr(addr string) Opt

Set the address (host) or (host:port) for the connection

func WithBind

func WithBind(k string, v any) Opt

Set the bind variables for the connection pool

func WithCredentials

func WithCredentials(user, password string) Opt

Set connection pool username and password. If the database name is not set, then the username will be used as the default database name.

func WithDatabase

func WithDatabase(name string) Opt

Set the database name for the connection. If the user name is not set, then the database name will be used as the user name.

func WithHostPort

func WithHostPort(host, port string) Opt

Set the hostname and port for the connection. If the port is not set, then the default port 5432 will be used.

func WithSSLMode

func WithSSLMode(mode string) Opt

Set the postgresql SSL mode. Valid values are "disable", "allow", "prefer", "require", "verify-ca", "verify-full". See https://www.postgresql.org/docs/current/libpq-ssl.html for more information.

func WithTrace

func WithTrace(fn TraceFn) Opt

Set the trace function for the connection pool. The signature of the trace function is func(query string, args any, err error) and is called for every query executed by the connection pool.

type PoolConn

type PoolConn interface {
	Conn

	// Acquire a connection and ping it
	Ping(context.Context) error

	// Release resources
	Close()

	// Reset the connection pool
	Reset()
}

func NewPool

func NewPool(ctx context.Context, opts ...Opt) (PoolConn, error)

Create a new connection pool

type Reader

type Reader interface {
	// Scan row into a result
	Scan(Row) error
}

Bind a row to an object

type Row

type Row pgx.Row

Row scanner

type Selector

type Selector interface {
	// Set bind parameters for getting, updating or deleting
	Select(*Bind, Op) (string, error)
}

Bind selection parameters for getting, updating or deleting

type TraceFn

type TraceFn func(context.Context, string, any, error)

TraceFn is a function which is called when a query is executed, with the execution context, the SQL and arguments, and the error if any was generated

type Writer

type Writer interface {
	// Set bind parameters for an insert
	Insert(*Bind) (string, error)

	// Set bind parameters for an update
	Update(*Bind) error
}

Bind an object to bind parameters for inserting or updating

Directories

Path Synopsis
pkg
test
The `test` package supports running unit tests with containerized supporting services.
The `test` package supports running unit tests with containerized supporting services.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL