Documentation
¶
Index ¶
- Constants
- Variables
- func BigintOrNull(n int64) pgtype.Int8
- func Date(t time.Time) pgtype.Date
- func Int32(n int32) pgtype.Int4
- func Int64(n int64) pgtype.Int8
- func IsConstraintError(err error, constraint string) bool
- func PBool(b *bool) pgtype.Bool
- func PInt2(n *int16) pgtype.Int2
- func PInt32(n *int32) pgtype.Int4
- func PInt64(n *int64) pgtype.Int8
- func PText(s *string) pgtype.Text
- func PTime(t *time.Time) pgtype.Timestamptz
- func PUUID(u *uuid.UUID) pgtype.UUID
- func Publish(ctx context.Context, db DBExec, channel string, message any) error
- func Rollback(tx pgx.Tx, outErr *error)
- func RunInJobLock(ctx context.Context, db *pgxpool.Pool, logger *slog.Logger, serviceName string, ...) error
- func SafeRollback(ctx context.Context, logger *slog.Logger, tx pgx.Tx, txName string)deprecated
- func SendListenerPing(ctx context.Context, db DBExec) error
- func SetConnStringVariables(conn string, vars url.Values) (string, error)
- func Subscribe(ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool, ...)deprecated
- func Text(s string) pgtype.Text
- func TextOrNull(s string) pgtype.Text
- func Time(t time.Time) pgtype.Timestamptz
- func TimeOrNull(t time.Time) pgtype.Timestamptz
- func ToUUIDPointer(v pgtype.UUID) *uuid.UUID
- func UUID(u uuid.UUID) pgtype.UUID
- func WithTX(ctx context.Context, pool TransactionBeginner, fn func(tx pgx.Tx) error) (outErr error)
- type ChannelSubscription
- type DBExec
- type FanOut
- func (f *FanOut[T]) ChannelName() string
- func (f *FanOut[T]) Listen(ctx context.Context, l chan T, test func(v T) bool)
- func (f *FanOut[T]) ListenAll(ctx context.Context, l chan T)
- func (f *FanOut[T]) Notify(msg T) error
- func (f *FanOut[T]) NotifyWithPayload(data []byte) error
- func (f *FanOut[T]) Publish(ctx context.Context, db DBExec, msg T) error
- type FanOutOption
- type JobLock
- type JobLockOptions
- type JobLockState
- type OverflowPolicy
- type Subscriber
- type SubscriberOption
- type TransactionBeginner
Constants ¶
const ( JobLockStateNone = "" JobLockStateHeld = "held" JobLockStateLost = "lost" JobLockStateReleased = "released" )
const PingChannel = "listener_ping"
PingChannel is the PostgreSQL NOTIFY channel used for listener health checking. The Subscriber sends periodic pings on this channel and uses the received notifications to verify that the LISTEN connection is still alive.
Variables ¶
var ErrListenerOverflow = errors.New("listener channel full")
ErrListenerOverflow is returned by Notify when a listener's channel is full and the overflow policy is OverflowError.
Functions ¶
func BigintOrNull ¶
BigintOrNull returns a pgtype.Int8 for the given value, but will return a Int8 value that represents null in the database if the value is zero.
func IsConstraintError ¶
IsConstraintError checks if an error was caused by a specific constraint violation.
func PInt2 ¶ added in v0.7.2
PInt2 returns a pgtype.Int2 for the given value, but will return a Int2 value that represents null in the database if the value is nil.
func PTime ¶ added in v0.7.2
func PTime(t *time.Time) pgtype.Timestamptz
PTime converts a stdlib *time.Time to a pgtype.Timestamptz.
func Rollback ¶ added in v0.15.1
Rollback rolls back a transaction and joins the rollback error to the outError if the rollback fails. If the transaction already has been committed/closed it's not treated as an error.
Defer a call to Rollback directly after a transaction has been created. That will give you the guarantee that everything you've done will be rolled back if you return early before committing.
func RunInJobLock ¶ added in v0.17.8
func RunInJobLock( ctx context.Context, db *pgxpool.Pool, logger *slog.Logger, serviceName string, lockName string, options JobLockOptions, fn func(ctx context.Context) error, ) error
RunInJobLock will attempt to acquire a job lock and run the provided function until the context is cancelled.
func SendListenerPing ¶ added in v0.24.0
SendListenerPing sends a ping notification on the PingChannel. This can be used by external processes to keep the listener alive when the built-in ping sender is disabled.
func SetConnStringVariables ¶
SetConnStringVariables parses a connection string URI and adds the given query string variables to it.
func Subscribe
deprecated
added in
v0.17.2
func Subscribe( ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool, channels ...ChannelSubscription, )
Subscribe opens a connection to the database and subscribes to the provided channels. Blocks until the context is cancelled.
Deprecated: use NewSubscriber and Subscriber.Run instead, which adds ping-based health checking to detect dead connections.
func TextOrNull ¶
TextOrNull returns a pgtype.Text for the given string, but will return a Text value that represents null in the database if the string is empty.
func Time ¶
func Time(t time.Time) pgtype.Timestamptz
Time converts a stdlib time.Time to a pgtype.Timestamptz.
func TimeOrNull ¶
func TimeOrNull(t time.Time) pgtype.Timestamptz
Time converts a stdlib time.Time to a pgtype.Timestamptz, but will return a Timestamptz that represents a null value in the database if t is zero.
func ToUUIDPointer ¶ added in v0.11.5
ToUUIDPointer converts a pgtype.UUID to a *uuid.UUID.
Types ¶
type ChannelSubscription ¶ added in v0.17.2
type FanOut ¶ added in v0.17.2
type FanOut[T any] struct { // contains filtered or unexported fields }
func NewFanOut ¶ added in v0.17.2
func NewFanOut[T any](channel string, opts ...FanOutOption) *FanOut[T]
func (*FanOut[T]) ChannelName ¶ added in v0.17.2
Implements ChannelSubscription.
func (*FanOut[T]) Listen ¶ added in v0.17.2
Listen for notifications until the context is cancelled. The test function is used to filter out events before they are posted to the channel.
func (*FanOut[T]) ListenAll ¶ added in v0.17.2
ListenAll listens for notifications until the context is cancelled.
func (*FanOut[T]) NotifyWithPayload ¶ added in v0.17.2
Implements ChannelSubscription.
type FanOutOption ¶ added in v0.24.0
type FanOutOption func(*fanOutConfig)
FanOutOption configures a FanOut.
func WithOverflowPolicy ¶ added in v0.24.0
func WithOverflowPolicy(p OverflowPolicy) FanOutOption
WithOverflowPolicy sets the overflow policy for the FanOut.
type JobLock ¶
type JobLock struct {
// contains filtered or unexported fields
}
JobLock helps separate processes coordinate who should be performing a (background) task through postgres.
func NewJobLock ¶
func NewJobLock( db *pgxpool.Pool, logger *slog.Logger, name string, opts JobLockOptions, ) (*JobLock, error)
NewJobLock creates a new job lock.
func (*JobLock) RunWithContext ¶
RunWithContext runs the provided function once the job lock has been acquired. The context provided to the function will be cancelled if the job lock is lost.
type JobLockOptions ¶ added in v0.7.0
type JobLockOptions struct {
// PingInterval controls how often the job locked should be
// pinged/renewed. Defaults to 10s.
PingInterval time.Duration
// StaleAfter controls after how long a time a held lock should be
// considered stale and other clients will start attempting to steal
// it. Must be longer than the ping interval. Defaults to four times the
// ping interval.
StaleAfter time.Duration
// CheckInterval controls how often clients should check if a held lock
// has become stale. Defaults to twice the ping interval.
CheckInterval time.Duration
// Timeout is the timeout that should be used for all lock
// operations. Must be shorter than the ping interval. Defaults to half
// the ping interval.
Timeout time.Duration
}
JobLockOptions controls how a job lock should behave.
type JobLockState ¶
type JobLockState string
type OverflowPolicy ¶ added in v0.24.0
type OverflowPolicy int
OverflowPolicy controls what happens when a FanOut listener's channel is full.
const ( // OverflowDrop silently drops the message. This is the default and // preserves the existing behavior. OverflowDrop OverflowPolicy = iota // OverflowError causes Notify to return an error when a listener's // channel is full. OverflowError )
type Subscriber ¶ added in v0.24.0
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber manages a PostgreSQL LISTEN connection with ping-based health checking. It detects silently dead connections (TCP drops, PgBouncer timeouts, network partitions) by sending periodic pings and using deadlines on the notification wait.
func NewSubscriber ¶ added in v0.24.0
func NewSubscriber( logger *slog.Logger, pool *pgxpool.Pool, channels []ChannelSubscription, opts ...SubscriberOption, ) *Subscriber
NewSubscriber creates a new Subscriber that listens on the given channels. By default it uses the provided pool for both listening and sending pings.
type SubscriberOption ¶ added in v0.24.0
type SubscriberOption func(*Subscriber)
SubscriberOption configures a Subscriber.
func WithOnReconnect ¶ added in v0.24.0
func WithOnReconnect(fn func(ctx context.Context) error) SubscriberOption
WithOnReconnect sets a callback that is called before the initial listen and after each reconnect. This is useful for reloading state that may have changed while disconnected.
func WithPingDB ¶ added in v0.24.0
func WithPingDB(db DBExec) SubscriberOption
WithPingDB sets a separate database connection for sending pings. This is useful when the listen pool goes through PgBouncer in transaction mode, where LISTEN is not supported, but the ping sender needs a regular connection. Set to nil to disable the built-in ping sender entirely (useful when an external process sends pings).
func WithPingGrace ¶ added in v0.24.0
func WithPingGrace(d time.Duration) SubscriberOption
WithPingGrace sets how long the subscriber waits for a ping before declaring the connection dead. Must be longer than the ping interval. Defaults to 7 minutes.
func WithPingInterval ¶ added in v0.24.0
func WithPingInterval(d time.Duration) SubscriberOption
WithPingInterval sets how often the subscriber sends a ping notification. Defaults to 5 minutes.