postgres

package
v0.0.0-...-c92fa92 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 20, 2026 | License: MIT | Imports: 12 | Imported by: 0

Documentation

Overview

Package postgres implements storage.Store on top of PostgreSQL via the pgx stdlib driver. It mirrors the SQLite backend but uses $N placeholders and a SELECT ... FOR UPDATE SKIP LOCKED claim so the queue scales to many workers and instances.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is the Postgres-backed storage.Store implementation.

func Open

func Open(ctx context.Context, dsn string) (*Store, error)

Open connects to the Postgres database at dsn, applies migrations, and returns a ready Store.

func (*Store) ClaimDue

func (s *Store) ClaimDue(ctx context.Context, now time.Time, lease time.Duration, limit int) ([]core.PendingDelivery, error)

func (*Store) Close

func (s *Store) Close() error

Close closes the underlying database.

func (*Store) CreateDestination

func (s *Store) CreateDestination(ctx context.Context, d core.Destination) error

func (*Store) CreateSource

func (s *Store) CreateSource(ctx context.Context, src core.Source, secret []byte) error

func (*Store) DeliveriesForEvent

func (s *Store) DeliveriesForEvent(ctx context.Context, eventID string) ([]core.PendingDelivery, error)

func (*Store) Enqueue

func (s *Store) Enqueue(ctx context.Context, p core.PendingDelivery) error

func (*Store) GetDestination

func (s *Store) GetDestination(ctx context.Context, id string) (core.Destination, error)

func (*Store) GetEvent

func (s *Store) GetEvent(ctx context.Context, id string) (core.Event, error)

func (*Store) GetSource

func (s *Store) GetSource(ctx context.Context, name string) (core.Source, error)

func (*Store) ListAttempts

func (s *Store) ListAttempts(ctx context.Context, eventID string) ([]core.DeliveryAttempt, error)

func (*Store) ListDeadLetters

func (s *Store) ListDeadLetters(ctx context.Context, limit, offset int) ([]core.PendingDelivery, error)

func (*Store) ListDestinations

func (s *Store) ListDestinations(ctx context.Context, source string) ([]core.Destination, error)

func (*Store) ListEvents

func (s *Store) ListEvents(ctx context.Context, f storage.EventFilter) ([]core.Event, error)

func (*Store) ListSources

func (s *Store) ListSources(ctx context.Context) ([]core.Source, error)

func (*Store) MarkDeadLetter

func (s *Store) MarkDeadLetter(ctx context.Context, id string) error

func (*Store) MarkDelivered

func (s *Store) MarkDelivered(ctx context.Context, id string) error

func (*Store) Prune

func (s *Store) Prune(ctx context.Context, normalCutoff, failedCutoff time.Time) (int, error)

func (*Store) RecordAttempt

func (s *Store) RecordAttempt(ctx context.Context, a core.DeliveryAttempt) error

func (*Store) Reschedule

func (s *Store) Reschedule(ctx context.Context, id string, attempt int, next time.Time) error

func (*Store) SaveEvent

func (s *Store) SaveEvent(ctx context.Context, e core.Event) (string, bool, error)

func (*Store) SourceSecret

func (s *Store) SourceSecret(ctx context.Context, name string) ([]byte, error)

func (*Store) UpdateDestination

func (s *Store) UpdateDestination(ctx context.Context, d core.Destination) error

func (*Store) UpdateSource

func (s *Store) UpdateSource(ctx context.Context, src core.Source, secret []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL