Documentation ¶
Index ¶
- Constants
- Variables
- func BigintOrNull(n int64) pgtype.Int8
- func Date(t time.Time) pgtype.Date
- func Int32(n int32) pgtype.Int4
- func Int64(n int64) pgtype.Int8
- func IsConstraintError(err error, constraint string) bool
- func PBool(b *bool) pgtype.Bool
- func PInt2(n *int16) pgtype.Int2
- func PInt32(n *int32) pgtype.Int4
- func PInt64(n *int64) pgtype.Int8
- func PText(s *string) pgtype.Text
- func PTime(t *time.Time) pgtype.Timestamptz
- func PUUID(u *uuid.UUID) pgtype.UUID
- func Publish(ctx context.Context, db DBExec, channel string, message any) error
- func Rollback(tx pgx.Tx, outErr *error)
- func RunInJobLock(ctx context.Context, db *pgxpool.Pool, logger *slog.Logger, serviceName string, ...) error
- func SafeRollback(ctx context.Context, logger *slog.Logger, tx pgx.Tx, txName string) deprecated
- func SendListenerPing(ctx context.Context, db DBExec) error
- func SetConnStringVariables(conn string, vars url.Values) (string, error)
- func Subscribe(ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool, ...) deprecated
- func Text(s string) pgtype.Text
- func TextOrNull(s string) pgtype.Text
- func Time(t time.Time) pgtype.Timestamptz
- func TimeOrNull(t time.Time) pgtype.Timestamptz
- func ToUUIDPointer(v pgtype.UUID) *uuid.UUID
- func UUID(u uuid.UUID) pgtype.UUID
- func WithTX(ctx context.Context, pool TransactionBeginner, fn func(tx pgx.Tx) error) (outErr error)
- type ChannelSubscription
- type DBExec
- type FanOut
- func (f *FanOut[T]) ChannelName() string
- func (f *FanOut[T]) EnableRecovery(metrics *elephantine.MetricsHelper, bounce func(), ...) error
- func (f *FanOut[T]) Listen(ctx context.Context, l chan T, test func(v T) bool)
- func (f *FanOut[T]) ListenAll(ctx context.Context, l chan T)
- func (f *FanOut[T]) Notify(msg T) error
- func (f *FanOut[T]) NotifyWithPayload(data []byte) error
- func (f *FanOut[T]) Polled(found int)
- func (f *FanOut[T]) Publish(ctx context.Context, db DBExec, msg T) error
- type FanOutOption
- type FanOutRecoveryOption
- type JobLock
- type JobLockOptions
- type JobLockState
- type OverflowPolicy
- type Subscriber
- type SubscriberOption
- type TransactionBeginner
Constants ¶
const (
	JobLockStateNone     = ""
	JobLockStateHeld     = "held"
	JobLockStateLost     = "lost"
	JobLockStateReleased = "released"
)
const PingChannel = "listener_ping"
PingChannel is the PostgreSQL NOTIFY channel used for listener health checking. The Subscriber sends periodic pings on this channel and uses the received notifications to verify that the LISTEN connection is still alive.
Variables ¶
var ErrListenerOverflow = errors.New("listener channel full")
ErrListenerOverflow is returned by Notify when a listener's channel is full and the overflow policy is OverflowError.
Functions ¶
func BigintOrNull ¶
BigintOrNull returns a pgtype.Int8 for the given value, but will return an Int8 value that represents null in the database if the value is zero.
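The zero-to-null mapping can be sketched without a database. This is a minimal illustration, not the package's implementation: `nullableInt8` is a hypothetical stand-in that copies the value-plus-`Valid` shape of pgtype.Int8, and `bigintOrNull` is a local helper written for this example.

```go
package main

import "fmt"

// nullableInt8 is a hypothetical stand-in for pgtype.Int8: a value plus a
// flag that says whether the value is non-null.
type nullableInt8 struct {
	Int64 int64
	Valid bool
}

// bigintOrNull follows the documented rule: zero is stored as NULL, any
// other value is stored as-is.
func bigintOrNull(n int64) nullableInt8 {
	if n == 0 {
		return nullableInt8{}
	}

	return nullableInt8{Int64: n, Valid: true}
}

func main() {
	fmt.Printf("%+v\n", bigintOrNull(42)) // {Int64:42 Valid:true}
	fmt.Printf("%+v\n", bigintOrNull(0))  // {Int64:0 Valid:false}
}
```

A consequence worth keeping in mind is that a genuine zero cannot be stored through this kind of helper; it always becomes null.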
func IsConstraintError ¶
IsConstraintError checks if an error was caused by a specific constraint violation.
func PInt2 ¶ added in v0.7.2
PInt2 returns a pgtype.Int2 for the given value, but will return an Int2 value that represents null in the database if the value is nil.
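The pointer-based helpers use nil, rather than the zero value, as the null marker. The sketch below illustrates that rule under stated assumptions: `nullableInt2` is a hypothetical stand-in for pgtype.Int2 and `pInt2` is a local function, not the package's code.

```go
package main

import "fmt"

// nullableInt2 is a hypothetical stand-in for pgtype.Int2.
type nullableInt2 struct {
	Int16 int16
	Valid bool
}

// pInt2 maps a nil pointer to NULL and any non-nil pointer to its value.
func pInt2(n *int16) nullableInt2 {
	if n == nil {
		return nullableInt2{}
	}

	return nullableInt2{Int16: *n, Valid: true}
}

func main() {
	zero := int16(0)

	fmt.Printf("%+v\n", pInt2(&zero)) // {Int16:0 Valid:true}
	fmt.Printf("%+v\n", pInt2(nil))   // {Int16:0 Valid:false}
}
```

Because only nil means null, a pointer to zero is stored as a real zero.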
func PTime ¶ added in v0.7.2
func PTime(t *time.Time) pgtype.Timestamptz
PTime converts a stdlib *time.Time to a pgtype.Timestamptz.
func Rollback ¶ added in v0.15.1
Rollback rolls back a transaction and joins the rollback error to outErr if the rollback fails. If the transaction has already been committed or closed, that is not treated as an error.
Defer a call to Rollback directly after a transaction has been created. That will give you the guarantee that everything you've done will be rolled back if you return early before committing.
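The deferred pattern can be modelled with the standard library alone. Everything in this sketch is hypothetical: `fakeTx` is an in-memory stand-in for a transaction, `errTxClosed` stands in for the driver's "already closed" error, and `rollback` only imitates the documented behaviour of joining a rollback failure onto the caller's named return value.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinels used only in this sketch.
var (
	errTxClosed = errors.New("tx is closed")
	errConnLost = errors.New("conn lost")
)

// fakeTx is a hypothetical in-memory transaction.
type fakeTx struct {
	closed      bool
	rollbackErr error // returned when an open transaction is rolled back
}

func (t *fakeTx) Commit() error {
	if t.closed {
		return errTxClosed
	}
	t.closed = true

	return nil
}

func (t *fakeTx) Rollback() error {
	if t.closed {
		return errTxClosed
	}
	t.closed = true

	return t.rollbackErr
}

// rollback joins a rollback failure onto *outErr and ignores the case where
// the transaction has already been committed or closed.
func rollback(tx *fakeTx, outErr *error) {
	err := tx.Rollback()
	if err != nil && !errors.Is(err, errTxClosed) {
		*outErr = errors.Join(*outErr, fmt.Errorf("roll back: %w", err))
	}
}

// doWork defers rollback right after "creating" the transaction. The named
// return value lets the deferred call report its own failure.
func doWork(tx *fakeTx, fail bool) (outErr error) {
	defer rollback(tx, &outErr)

	if fail {
		return errors.New("step failed") // early return: tx is rolled back
	}

	return tx.Commit()
}

func main() {
	fmt.Println(doWork(&fakeTx{}, false))
	fmt.Println(doWork(&fakeTx{rollbackErr: errConnLost}, true))
}
```

The named return value matters here: with an unnamed return, the deferred call would have nowhere to attach a rollback failure.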
func RunInJobLock ¶ added in v0.17.8
func RunInJobLock( ctx context.Context, db *pgxpool.Pool, logger *slog.Logger, serviceName string, lockName string, options JobLockOptions, fn func(ctx context.Context) error, ) error
RunInJobLock will attempt to acquire a job lock and run the provided function until the context is cancelled.
func SendListenerPing ¶ added in v0.24.0
SendListenerPing sends a ping notification on the PingChannel. This can be used by external processes to keep the listener alive when the built-in ping sender is disabled.
func SetConnStringVariables ¶
SetConnStringVariables parses a connection string URI and adds the given query string variables to it.
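A rough equivalent can be written with net/url. This sketch is an assumption about the behaviour, not the package's source: in particular, it replaces any existing value for a key, which the documentation does not specify.

```go
package main

import (
	"fmt"
	"net/url"
)

// setConnStringVariables is a local sketch: parse the URI, merge vars into
// its query string, and re-encode. Replacing existing keys is an assumption.
func setConnStringVariables(conn string, vars url.Values) (string, error) {
	u, err := url.Parse(conn)
	if err != nil {
		return "", fmt.Errorf("parse connection string: %w", err)
	}

	q := u.Query()
	for key, values := range vars {
		q[key] = values
	}

	// Encode sorts the keys, so the output is deterministic.
	u.RawQuery = q.Encode()

	return u.String(), nil
}

func main() {
	out, err := setConnStringVariables(
		"postgres://user:secret@localhost:5432/app?sslmode=disable",
		url.Values{"application_name": {"worker"}},
	)
	if err != nil {
		panic(err)
	}

	// postgres://user:secret@localhost:5432/app?application_name=worker&sslmode=disable
	fmt.Println(out)
}
```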
func Subscribe ¶ deprecated, added in v0.17.2
func Subscribe( ctx context.Context, logger *slog.Logger, pool *pgxpool.Pool, channels ...ChannelSubscription, )
Subscribe opens a connection to the database and subscribes to the provided channels. Blocks until the context is cancelled.
Deprecated: use NewSubscriber and Subscriber.Run instead, which adds ping-based health checking to detect dead connections.
func TextOrNull ¶
TextOrNull returns a pgtype.Text for the given string, but will return a Text value that represents null in the database if the string is empty.
func Time ¶
func Time(t time.Time) pgtype.Timestamptz
Time converts a stdlib time.Time to a pgtype.Timestamptz.
func TimeOrNull ¶
func TimeOrNull(t time.Time) pgtype.Timestamptz
TimeOrNull converts a stdlib time.Time to a pgtype.Timestamptz, but will return a Timestamptz that represents a null value in the database if t is zero.
func ToUUIDPointer ¶ added in v0.11.5
ToUUIDPointer converts a pgtype.UUID to a *uuid.UUID.
Types ¶
type ChannelSubscription ¶ added in v0.17.2
type FanOut ¶ added in v0.17.2
type FanOut[T any] struct {
// contains filtered or unexported fields
}
func NewFanOut ¶ added in v0.17.2
func NewFanOut[T any](channel string, opts ...FanOutOption) *FanOut[T]
func (*FanOut[T]) ChannelName ¶ added in v0.17.2
Implements ChannelSubscription.
func (*FanOut[T]) EnableRecovery ¶ added in v0.27.1
func (f *FanOut[T]) EnableRecovery( metrics *elephantine.MetricsHelper, bounce func(), opts ...FanOutRecoveryOption, ) error
EnableRecovery wires this FanOut into the recovery pattern documented in elephantine's docs/fanout-recovery.md: callers report fallback-poll findings via Polled, the FanOut resets the streak whenever it dispatches a wire-side notification (NotifyWithPayload), and bounce is invoked once the poll-saved streak crosses the configured threshold (default 5). Pass Subscriber.Bounce for bounce so the recovery loop closes back onto the listener.
EnableRecovery registers a per-channel poll-saved counter and streak gauge through metrics, using the channel name (sanitized) as the metric name suffix. Registration errors are returned and also recorded on the metrics helper.
Calling EnableRecovery more than once replaces the previous wiring; that is rarely useful but legal.
func (*FanOut[T]) Listen ¶ added in v0.17.2
Listen for notifications until the context is cancelled. The test function is used to filter out events before they are posted to the channel.
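The filtering idea can be illustrated with a small in-memory fan-out. This is a hypothetical model, not the FanOut implementation: `fanOut`, `add`, `listen` and `notify` are names made up for the sketch, and dropping messages for full listeners is assumed here for simplicity.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fanOut is a hypothetical model: each listener channel is registered
// together with an optional filter function.
type fanOut[T any] struct {
	mu        sync.Mutex
	listeners map[chan T]func(T) bool
}

func newFanOut[T any]() *fanOut[T] {
	return &fanOut[T]{listeners: make(map[chan T]func(T) bool)}
}

// add registers a listener and returns a function that removes it again.
func (f *fanOut[T]) add(l chan T, test func(T) bool) (remove func()) {
	f.mu.Lock()
	f.listeners[l] = test
	f.mu.Unlock()

	return func() {
		f.mu.Lock()
		delete(f.listeners, l)
		f.mu.Unlock()
	}
}

// listen keeps the listener registered until the context is cancelled.
func (f *fanOut[T]) listen(ctx context.Context, l chan T, test func(T) bool) {
	remove := f.add(l, test)
	defer remove()

	<-ctx.Done()
}

// notify posts msg to every listener whose filter accepts it.
func (f *fanOut[T]) notify(msg T) {
	f.mu.Lock()
	defer f.mu.Unlock()

	for l, test := range f.listeners {
		if test != nil && !test(msg) {
			continue // filtered out before it reaches the channel
		}

		select {
		case l <- msg:
		default: // listener is full: dropped in this sketch
		}
	}
}

func main() {
	f := newFanOut[int]()
	evens := make(chan int, 4)

	remove := f.add(evens, func(v int) bool { return v%2 == 0 })
	for i := 1; i <= 4; i++ {
		f.notify(i)
	}
	remove()
	f.notify(6) // no longer registered, so not delivered

	fmt.Println(len(evens))       // 2
	fmt.Println(<-evens, <-evens) // 2 4
}
```

In this model `listen` is meant to run in its own goroutine, since it blocks until the context is cancelled.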
func (*FanOut[T]) ListenAll ¶ added in v0.17.2
ListenAll listens for notifications until the context is cancelled.
func (*FanOut[T]) NotifyWithPayload ¶ added in v0.17.2
Implements ChannelSubscription. The wire-side delivery path: invoked by the Subscriber when a Postgres NOTIFY arrives on this channel. Successful dispatch resets the recovery streak (if enabled), signalling that the LISTEN connection is healthy.
func (*FanOut[T]) Polled ¶ added in v0.27.1
Polled reports that a fallback-poll iteration discovered `found` items of work. With recovery enabled, a call with found > 0 advances the consecutive non-empty-poll streak by one (regardless of how many items were drained, so a backlog-clearing poll does not trip the threshold by itself) and adds `found` to the per-item counter. Once the streak crosses the configured threshold the bounce callback fires and the streak resets. With recovery not enabled, Polled is a no-op; consumers may call it unconditionally. A call with found <= 0 is always a no-op.
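The streak bookkeeping described above can be sketched as a small state machine. The type and method names are hypothetical, metrics are left out, and firing the bounce when the streak reaches the threshold (rather than exceeds it) is an assumption.

```go
package main

import "fmt"

// recoveryStreak is a hypothetical model of the poll-saved streak.
type recoveryStreak struct {
	threshold int    // zero or negative disables the bounce
	bounce    func() // called when the streak reaches the threshold
	streak    int    // consecutive non-empty polls without a notification
	items     int    // total items found by fallback polls
}

// polled records one fallback-poll iteration.
func (r *recoveryStreak) polled(found int) {
	if found <= 0 {
		return // empty polls are ignored
	}

	r.items += found
	r.streak++ // one step per call, regardless of how many items were found

	if r.threshold > 0 && r.streak >= r.threshold {
		r.bounce()
		r.streak = 0
	}
}

// notified records a wire-side notification, which resets the streak.
func (r *recoveryStreak) notified() {
	r.streak = 0
}

func main() {
	bounces := 0
	r := &recoveryStreak{threshold: 3, bounce: func() { bounces++ }}

	r.polled(10) // streak 1: a large backlog still counts once
	r.polled(0)  // no-op
	r.polled(1)  // streak 2
	r.notified() // streak back to 0
	r.polled(1)
	r.polled(1)
	r.polled(1) // streak reaches 3: bounce fires, streak resets

	fmt.Println(bounces, r.streak, r.items) // 1 0 14
}
```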
type FanOutOption ¶ added in v0.24.0
type FanOutOption func(*fanOutConfig)
FanOutOption configures a FanOut.
func WithOverflowPolicy ¶ added in v0.24.0
func WithOverflowPolicy(p OverflowPolicy) FanOutOption
WithOverflowPolicy sets the overflow policy for the FanOut.
type FanOutRecoveryOption ¶ added in v0.27.1
type FanOutRecoveryOption func(*fanOutRecoveryConfig)
FanOutRecoveryOption configures the recovery behaviour wired up by FanOut.EnableRecovery.
func WithBounceThreshold ¶ added in v0.27.1
func WithBounceThreshold(n int) FanOutRecoveryOption
WithBounceThreshold sets how many consecutive non-empty fallback-poll drains (calls to FanOut.Polled with found > 0) without an intervening notification trigger a bounce. The count is of *calls*, not items: a single chunky drain that catches up a backlog counts as one, so a busy publisher does not trip the threshold with one mistimed poll. Zero or negative disables the bounce; the metric is still maintained. Defaults to 5.
type JobLock ¶
type JobLock struct {
// contains filtered or unexported fields
}
JobLock lets separate processes coordinate, through PostgreSQL, which of them should be performing a (background) task.
func NewJobLock ¶
func NewJobLock( db *pgxpool.Pool, logger *slog.Logger, name string, opts JobLockOptions, ) (*JobLock, error)
NewJobLock creates a new job lock.
func (*JobLock) RunWithContext ¶
RunWithContext runs the provided function once the job lock has been acquired. The context provided to the function will be cancelled if the job lock is lost.
type JobLockOptions ¶ added in v0.7.0
type JobLockOptions struct {
// PingInterval controls how often the job lock should be
// pinged/renewed. Defaults to 10s.
PingInterval time.Duration
// StaleAfter controls how long it takes for a held lock to be
// considered stale, at which point other clients will start attempting
// to steal it. Must be longer than the ping interval. Defaults to four
// times the ping interval.
StaleAfter time.Duration
// CheckInterval controls how often clients should check if a held lock
// has become stale. Defaults to twice the ping interval.
CheckInterval time.Duration
// Timeout is the timeout that should be used for all lock
// operations. Must be shorter than the ping interval. Defaults to half
// the ping interval.
Timeout time.Duration
}
JobLockOptions controls how a job lock should behave.
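The documented defaults all derive from the ping interval. The sketch below works through that arithmetic with a local copy of the struct; `withDefaults` and `validate` are hypothetical helpers, and treating a zero field as "unset" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// jobLockOptions is a local copy of the documented fields.
type jobLockOptions struct {
	PingInterval  time.Duration
	StaleAfter    time.Duration
	CheckInterval time.Duration
	Timeout       time.Duration
}

// withDefaults fills unset (zero) fields using the documented defaults.
func withDefaults(o jobLockOptions) jobLockOptions {
	if o.PingInterval == 0 {
		o.PingInterval = 10 * time.Second
	}
	if o.StaleAfter == 0 {
		o.StaleAfter = 4 * o.PingInterval
	}
	if o.CheckInterval == 0 {
		o.CheckInterval = 2 * o.PingInterval
	}
	if o.Timeout == 0 {
		o.Timeout = o.PingInterval / 2
	}

	return o
}

// validate checks the two documented constraints.
func validate(o jobLockOptions) error {
	if o.StaleAfter <= o.PingInterval {
		return errors.New("stale after must be longer than the ping interval")
	}
	if o.Timeout >= o.PingInterval {
		return errors.New("timeout must be shorter than the ping interval")
	}

	return nil
}

func main() {
	o := withDefaults(jobLockOptions{})

	// 10s 40s 20s 5s
	fmt.Println(o.PingInterval, o.StaleAfter, o.CheckInterval, o.Timeout)
	fmt.Println(validate(o)) // <nil>
}
```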
type JobLockState ¶
type JobLockState string
type OverflowPolicy ¶ added in v0.24.0
type OverflowPolicy int
OverflowPolicy controls what happens when a FanOut listener's channel is full.
const (
	// OverflowDrop silently drops the message. This is the default and
	// preserves the existing behavior.
	OverflowDrop OverflowPolicy = iota
	// OverflowError causes Notify to return an error when a listener's
	// channel is full.
	OverflowError
)
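The difference between the two policies comes down to what happens when a non-blocking send fails. The following is an illustrative sketch using local lower-case copies of the names; `deliver` is a hypothetical helper, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

type overflowPolicy int

const (
	overflowDrop overflowPolicy = iota
	overflowError
)

var errListenerOverflow = errors.New("listener channel full")

// deliver attempts a non-blocking send to a single listener channel and
// applies the policy if the channel is full.
func deliver[T any](l chan T, msg T, policy overflowPolicy) error {
	select {
	case l <- msg:
		return nil
	default:
		if policy == overflowError {
			return errListenerOverflow
		}

		return nil // overflowDrop: the message is silently discarded
	}
}

func main() {
	l := make(chan string, 1)

	fmt.Println(deliver(l, "a", overflowError)) // <nil>: there was room
	fmt.Println(deliver(l, "b", overflowDrop))  // <nil>: dropped silently
	fmt.Println(deliver(l, "c", overflowError)) // listener channel full
}
```

With the drop policy a slow listener loses messages without the sender noticing; the error policy trades that for a sender that has to handle the failure.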
type Subscriber ¶ added in v0.24.0
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber manages a PostgreSQL LISTEN connection with ping-based health checking. It detects silently dead connections (TCP drops, PgBouncer timeouts, network partitions) by sending periodic pings and using deadlines on the notification wait.
func NewSubscriber ¶ added in v0.24.0
func NewSubscriber( logger *slog.Logger, pool *pgxpool.Pool, channels []ChannelSubscription, opts ...SubscriberOption, ) *Subscriber
NewSubscriber creates a new Subscriber that listens on the given channels. By default it uses the provided pool for both listening and sending pings.
func (*Subscriber) Bounce ¶ added in v0.27.1
func (s *Subscriber) Bounce()
Bounce signals the listen loop to tear down the current PostgreSQL LISTEN connection and reconnect on the next iteration. The call is non-blocking; concurrent Bounce invocations coalesce into a single pending signal, and the listen loop drains any pending signal at the start of each connection so a signal queued during a previous outage does not immediately tear down a fresh connection.
Bounce is intended for forcing a reconnect when an external signal (e.g. a RecoveryTracker noticing that polling is keeping a consumer afloat without notifications) suggests the LISTEN connection is silently broken even though the ping path appears healthy.
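Coalescing, non-blocking signals of this kind are commonly built on a channel with capacity one. The sketch below shows that general technique; it is an assumption about how such a signal could work, and `bouncer`, `bounce` and `drain` are hypothetical names.

```go
package main

import "fmt"

// bouncer holds at most one pending bounce signal.
type bouncer struct {
	signal chan struct{}
}

func newBouncer() *bouncer {
	return &bouncer{signal: make(chan struct{}, 1)}
}

// bounce never blocks. If a signal is already pending, the call is
// coalesced into it.
func (b *bouncer) bounce() {
	select {
	case b.signal <- struct{}{}:
	default:
	}
}

// drain discards a pending signal, if any, and reports whether one was
// found. A listen loop would call this at the start of each connection so a
// stale signal does not tear down the fresh connection.
func (b *bouncer) drain() bool {
	select {
	case <-b.signal:
		return true
	default:
		return false
	}
}

func main() {
	b := newBouncer()

	b.bounce()
	b.bounce()
	b.bounce()

	fmt.Println(len(b.signal)) // 1: three calls, one pending signal
	fmt.Println(b.drain())     // true
	fmt.Println(b.drain())     // false
}
```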
type SubscriberOption ¶ added in v0.24.0
type SubscriberOption func(*Subscriber)
SubscriberOption configures a Subscriber.
func WithOnReconnect ¶ added in v0.24.0
func WithOnReconnect(fn func(ctx context.Context) error) SubscriberOption
WithOnReconnect sets a callback that is called before the initial listen and after each reconnect. This is useful for reloading state that may have changed while disconnected.
func WithPingDB ¶ added in v0.24.0
func WithPingDB(db DBExec) SubscriberOption
WithPingDB sets a separate database connection for sending pings. This is useful when the listen pool goes through PgBouncer in transaction mode, where LISTEN is not supported, but the ping sender needs a regular connection. Set to nil to disable the built-in ping sender entirely (useful when an external process sends pings).
func WithPingGrace ¶ added in v0.24.0
func WithPingGrace(d time.Duration) SubscriberOption
WithPingGrace sets how long the subscriber waits for a ping before declaring the connection dead. Must be longer than the ping interval. Defaults to 7 minutes.
func WithPingInterval ¶ added in v0.24.0
func WithPingInterval(d time.Duration) SubscriberOption
WithPingInterval sets how often the subscriber sends a ping notification. Defaults to 5 minutes.