pg

package
v0.25.0 Latest
Published: Mar 2, 2026 | License: MIT | Imports: 18 | Imported by: 5

Documentation

Constants

const (
	JobLockStateNone     = ""
	JobLockStateHeld     = "held"
	JobLockStateLost     = "lost"
	JobLockStateReleased = "released"
)
const PingChannel = "listener_ping"

PingChannel is the PostgreSQL NOTIFY channel used for listener health checking. The Subscriber sends periodic pings on this channel and uses the received notifications to verify that the LISTEN connection is still alive.

Variables

var ErrListenerOverflow = errors.New("listener channel full")

ErrListenerOverflow is returned by Notify when a listener's channel is full and the overflow policy is OverflowError.

Functions

func BigintOrNull

func BigintOrNull(n int64) pgtype.Int8

BigintOrNull returns a pgtype.Int8 for the given value, but returns an Int8 that represents null in the database if the value is zero.
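
The zero-to-null mapping can be sketched with the standard library alone. The example below uses sql.NullInt64 from database/sql as a stand-in for pgtype.Int8 (an assumption, made so the sketch runs without pgx), and bigintOrNull is a hypothetical helper rather than the package's own code.

```go
package main

import (
	"database/sql"
	"fmt"
)

// bigintOrNull is a hypothetical stand-in for BigintOrNull. It uses
// sql.NullInt64 in place of pgtype.Int8 so that the sketch needs only the
// standard library.
func bigintOrNull(n int64) sql.NullInt64 {
	if n == 0 {
		// The zero value has Valid == false, which is written as NULL.
		return sql.NullInt64{}
	}
	return sql.NullInt64{Int64: n, Valid: true}
}

func main() {
	fmt.Println(bigintOrNull(0).Valid)  // false
	fmt.Println(bigintOrNull(42).Valid) // true
}
```

A consequence of this convention is that a literal zero cannot be written through the helper; when zero is a meaningful value, the plain Int64 conversion is presumably the better fit.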

func Date added in v0.7.2

func Date(t time.Time) pgtype.Date

Date converts a stdlib time.Time to a pgtype.Date.

func Int32 added in v0.23.4

func Int32(n int32) pgtype.Int4

func Int64 added in v0.19.0

func Int64(n int64) pgtype.Int8

Int64 converts an int64 to a pgtype.Int8.

func IsConstraintError

func IsConstraintError(err error, constraint string) bool

IsConstraintError checks if an error was caused by a specific constraint violation.

func PBool added in v0.11.2

func PBool(b *bool) pgtype.Bool

PBool converts a *bool to a pgtype.Bool.

func PInt2 added in v0.7.2

func PInt2(n *int16) pgtype.Int2

PInt2 returns a pgtype.Int2 for the given value, but returns an Int2 that represents null in the database if the value is nil.
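
The pointer-based helpers (PBool, PInt2, PInt32, PInt64, PText, PTime, PUUID) follow the same shape. As a minimal sketch, assuming sql.NullInt16 as a standard-library stand-in for pgtype.Int2 and a hypothetical pInt2 helper:

```go
package main

import (
	"database/sql"
	"fmt"
)

// pInt2 is a hypothetical stand-in for PInt2, with sql.NullInt16 used in
// place of pgtype.Int2.
func pInt2(n *int16) sql.NullInt16 {
	if n == nil {
		// No value at all: represent it as NULL.
		return sql.NullInt16{}
	}
	return sql.NullInt16{Int16: *n, Valid: true}
}

func main() {
	var missing *int16
	zero := int16(0)

	fmt.Println(pInt2(missing).Valid) // false
	fmt.Println(pInt2(&zero).Valid)   // true
}
```

Unlike the OrNull helpers, a pointer distinguishes "no value" from a real zero, so a zero passed by pointer stays a valid zero.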

func PInt32 added in v0.11.6

func PInt32(n *int32) pgtype.Int4

PInt32 converts a *int32 to a pgtype.Int4.

func PInt64 added in v0.17.3

func PInt64(n *int64) pgtype.Int8

PInt64 converts a *int64 to a pgtype.Int8.

func PText added in v0.8.1

func PText(s *string) pgtype.Text

PText converts a *string to a pgtype.Text.

func PTime added in v0.7.2

func PTime(t *time.Time) pgtype.Timestamptz

PTime converts a stdlib *time.Time to a pgtype.Timestamptz.

func PUUID added in v0.11.5

func PUUID(u *uuid.UUID) pgtype.UUID

PUUID converts a *uuid.UUID to a pgtype.UUID.

func Publish added in v0.17.4

func Publish(
	ctx context.Context, db DBExec,
	channel string, message any,
) error

Publish sends a JSON message on a pubsub channel.

func Rollback added in v0.15.1

func Rollback(tx pgx.Tx, outErr *error)

Rollback rolls back a transaction and, if the rollback fails, joins the rollback error to outErr. If the transaction has already been committed or closed, that is not treated as an error.

Defer a call to Rollback directly after a transaction has been created. That will give you the guarantee that everything you've done will be rolled back if you return early before committing.
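
The deferred-rollback pattern can be sketched as follows. fakeTx and errTxClosed are local stand-ins for pgx.Tx and pgx.ErrTxClosed, and rollback is a hypothetical reimplementation of the behaviour described above, so treat this as an illustration rather than the package's code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errTxClosed stands in for pgx.ErrTxClosed.
var errTxClosed = errors.New("tx is closed")

// fakeTx is a minimal stand-in for pgx.Tx.
type fakeTx struct{ closed, rolledBack bool }

func (t *fakeTx) Commit(context.Context) error {
	if t.closed {
		return errTxClosed
	}
	t.closed = true
	return nil
}

func (t *fakeTx) Rollback(context.Context) error {
	if t.closed {
		return errTxClosed
	}
	t.closed, t.rolledBack = true, true
	return nil
}

// rollback joins a failed rollback into *outErr, but ignores the error
// reported when the transaction was already committed or closed.
func rollback(tx *fakeTx, outErr *error) {
	err := tx.Rollback(context.Background())
	if err != nil && !errors.Is(err, errTxClosed) {
		*outErr = errors.Join(*outErr, err)
	}
}

// transfer shows the calling convention: a named error result plus a defer
// placed right after the transaction has been obtained.
func transfer(tx *fakeTx, fail bool) (outErr error) {
	defer rollback(tx, &outErr)

	if fail {
		return errors.New("insufficient funds") // early return: rolled back
	}
	return tx.Commit(context.Background())
}

func main() {
	ok, bad := &fakeTx{}, &fakeTx{}
	fmt.Println(transfer(ok, false), ok.rolledBack) // <nil> false
	fmt.Println(transfer(bad, true), bad.rolledBack) // insufficient funds true
}
```

The named result matters: the deferred call can only attach a rollback failure to the returned error because it holds a pointer to outErr.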

func RunInJobLock added in v0.17.8

func RunInJobLock(
	ctx context.Context,
	db *pgxpool.Pool,
	logger *slog.Logger,
	serviceName string,
	lockName string,
	options JobLockOptions,
	fn func(ctx context.Context) error,
) error

RunInJobLock will attempt to acquire a job lock and run the provided function until the context is cancelled.

func SafeRollback deprecated

func SafeRollback(
	ctx context.Context, logger *slog.Logger, tx pgx.Tx, txName string,
)

SafeRollback rolls back a transaction and logs if the rollback fails. If the transaction has already been closed, that is not treated as an error.

Deprecated: use Rollback() instead.

func SendListenerPing added in v0.24.0

func SendListenerPing(ctx context.Context, db DBExec) error

SendListenerPing sends a ping notification on the PingChannel. This can be used by external processes to keep the listener alive when the built-in ping sender is disabled.

func SetConnStringVariables

func SetConnStringVariables(conn string, vars url.Values) (string, error)

SetConnStringVariables parses a connection string URI and adds the given query string variables to it.

func Subscribe deprecated added in v0.17.2

func Subscribe(
	ctx context.Context,
	logger *slog.Logger,
	pool *pgxpool.Pool,
	channels ...ChannelSubscription,
)

Subscribe opens a connection to the database and subscribes to the provided channels. Blocks until the context is cancelled.

Deprecated: use NewSubscriber and Subscriber.Run instead, which add ping-based health checking to detect dead connections.

func Text added in v0.11.5

func Text(s string) pgtype.Text

Text converts a string to a pgtype.Text.

func TextOrNull

func TextOrNull(s string) pgtype.Text

TextOrNull returns a pgtype.Text for the given string, but will return a Text value that represents null in the database if the string is empty.

func Time

func Time(t time.Time) pgtype.Timestamptz

Time converts a stdlib time.Time to a pgtype.Timestamptz.

func TimeOrNull

func TimeOrNull(t time.Time) pgtype.Timestamptz

TimeOrNull converts a stdlib time.Time to a pgtype.Timestamptz, but returns a Timestamptz that represents a null value in the database if t is zero.

func ToUUIDPointer added in v0.11.5

func ToUUIDPointer(v pgtype.UUID) *uuid.UUID

ToUUIDPointer converts a pgtype.UUID to a *uuid.UUID.

func UUID added in v0.11.5

func UUID(u uuid.UUID) pgtype.UUID

UUID converts a uuid.UUID to a pgtype.UUID.

func WithTX

func WithTX(
	ctx context.Context, pool TransactionBeginner,
	fn func(tx pgx.Tx) error,
) (outErr error)

WithTX starts a transaction and calls the given function with it. If the function returns an error or panics the transaction will be rolled back.

Types

type ChannelSubscription added in v0.17.2

type ChannelSubscription interface {
	// ChannelName to listen to.
	ChannelName() string
	// NotifyWithPayload notifies local consumers of the message.
	NotifyWithPayload(data []byte) error
}

type DBExec added in v0.17.4

type DBExec interface {
	Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
}

type FanOut added in v0.17.2

type FanOut[T any] struct {
	// contains filtered or unexported fields
}

func NewFanOut added in v0.17.2

func NewFanOut[T any](channel string, opts ...FanOutOption) *FanOut[T]

func (*FanOut[T]) ChannelName added in v0.17.2

func (f *FanOut[T]) ChannelName() string

Implements ChannelSubscription.

func (*FanOut[T]) Listen added in v0.17.2

func (f *FanOut[T]) Listen(ctx context.Context, l chan T, test func(v T) bool)

Listen for notifications until the context is cancelled. The test function is used to filter out events before they are posted to the channel.

func (*FanOut[T]) ListenAll added in v0.17.2

func (f *FanOut[T]) ListenAll(ctx context.Context, l chan T)

ListenAll listens for notifications until the context is cancelled.

func (*FanOut[T]) Notify added in v0.17.2

func (f *FanOut[T]) Notify(msg T) error

Notify local consumers of a message.

func (*FanOut[T]) NotifyWithPayload added in v0.17.2

func (f *FanOut[T]) NotifyWithPayload(data []byte) error

Implements ChannelSubscription.

func (*FanOut[T]) Publish added in v0.17.4

func (f *FanOut[T]) Publish(ctx context.Context, db DBExec, msg T) error

Publish a message to the channel.

type FanOutOption added in v0.24.0

type FanOutOption func(*fanOutConfig)

FanOutOption configures a FanOut.

func WithOverflowPolicy added in v0.24.0

func WithOverflowPolicy(p OverflowPolicy) FanOutOption

WithOverflowPolicy sets the overflow policy for the FanOut.

type JobLock

type JobLock struct {
	// contains filtered or unexported fields
}

JobLock helps separate processes coordinate, through PostgreSQL, which of them should be performing a (background) task.

func NewJobLock

func NewJobLock(
	db *pgxpool.Pool, logger *slog.Logger, name string,
	opts JobLockOptions,
) (*JobLock, error)

NewJobLock creates a new job lock.

func (*JobLock) Identity added in v0.9.3

func (jl *JobLock) Identity() string

func (*JobLock) RunWithContext

func (jl *JobLock) RunWithContext(
	ctx context.Context,
	fn func(ctx context.Context) error,
) error

RunWithContext runs the provided function once the job lock has been acquired. The context provided to the function will be cancelled if the job lock is lost.

func (*JobLock) Stop

func (jl *JobLock) Stop()

Stop releases the job lock if held and stops all polling.

type JobLockOptions added in v0.7.0

type JobLockOptions struct {
	// PingInterval controls how often the job lock should be
	// pinged/renewed. Defaults to 10s.
	PingInterval time.Duration
	// StaleAfter controls how long it takes before a held lock is
	// considered stale and other clients start attempting to steal
	// it. Must be longer than the ping interval. Defaults to four times the
	// ping interval.
	StaleAfter time.Duration
	// CheckInterval controls how often clients should check if a held lock
	// has become stale. Defaults to twice the ping interval.
	CheckInterval time.Duration
	// Timeout is the timeout that should be used for all lock
	// operations. Must be shorter than the ping interval. Defaults to half
	// the ping interval.
	Timeout time.Duration
}

JobLockOptions controls how a job lock should behave.

type JobLockState

type JobLockState string

type OverflowPolicy added in v0.24.0

type OverflowPolicy int

OverflowPolicy controls what happens when a FanOut listener's channel is full.

const (
	// OverflowDrop silently drops the message. This is the default and
	// preserves the existing behavior.
	OverflowDrop OverflowPolicy = iota
	// OverflowError causes Notify to return an error when a listener's
	// channel is full.
	OverflowError
)

type Subscriber added in v0.24.0

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber manages a PostgreSQL LISTEN connection with ping-based health checking. It detects silently dead connections (TCP drops, PgBouncer timeouts, network partitions) by sending periodic pings and using deadlines on the notification wait.

func NewSubscriber added in v0.24.0

func NewSubscriber(
	logger *slog.Logger,
	pool *pgxpool.Pool,
	channels []ChannelSubscription,
	opts ...SubscriberOption,
) *Subscriber

NewSubscriber creates a new Subscriber that listens on the given channels. By default it uses the provided pool for both listening and sending pings.

func (*Subscriber) Run added in v0.24.0

func (s *Subscriber) Run(ctx context.Context) error

Run starts the subscriber and blocks until the context is cancelled or a fatal error occurs. It automatically reconnects on ping timeouts.

type SubscriberOption added in v0.24.0

type SubscriberOption func(*Subscriber)

SubscriberOption configures a Subscriber.

func WithOnReconnect added in v0.24.0

func WithOnReconnect(fn func(ctx context.Context) error) SubscriberOption

WithOnReconnect sets a callback that is called before the initial listen and after each reconnect. This is useful for reloading state that may have changed while disconnected.

func WithPingDB added in v0.24.0

func WithPingDB(db DBExec) SubscriberOption

WithPingDB sets a separate database connection for sending pings. This is useful when the listen pool goes through PgBouncer in transaction mode, where LISTEN is not supported, but the ping sender needs a regular connection. Set to nil to disable the built-in ping sender entirely (useful when an external process sends pings).

func WithPingGrace added in v0.24.0

func WithPingGrace(d time.Duration) SubscriberOption

WithPingGrace sets how long the subscriber waits for a ping before declaring the connection dead. Must be longer than the ping interval. Defaults to 7 minutes.

func WithPingInterval added in v0.24.0

func WithPingInterval(d time.Duration) SubscriberOption

WithPingInterval sets how often the subscriber sends a ping notification. Defaults to 5 minutes.

type TransactionBeginner

type TransactionBeginner interface {
	Begin(context.Context) (pgx.Tx, error)
}

TransactionBeginner is the interface for something that can start a pgx transaction for use with WithTX().
