outbox

package
v0.20.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2025 | License: MIT | Imports: 11 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the outbox package.
var (
	// ErrNoTransaction signals that a database transaction has not been set.
	// NOTE(review): presumably returned by Persist / NotifyNewEventMessage when
	// the tx argument is missing and Config.IgnoreNoTx is false — confirm
	// against the implementation.
	ErrNoTransaction = fmt.Errorf("database transaction is not set")
)

Functions

This section is empty.

Types

type Config

// Config holds the settings for the outbox: which table stores pending
// events, which PostgreSQL LISTEN/NOTIFY channel is used, whether an explicit
// NOTIFY is sent, and how inserts outside a transaction are treated.
// The same Config type is accepted by both NewOutbox and NewReader.
// NOTE(review): ApplyDefaults presumably fills unset fields with the defaults
// listed on each field below — confirm against the implementation.
type Config struct {
	// TableName is the name of the database table used by the outbox
	// pattern to store pending events before they are dispatched.
	// Default: "event_outbox"
	TableName string

	// NotifyChannel is the PostgreSQL LISTEN/NOTIFY channel that is
	// used to signal the dispatcher when new events are inserted into
	// the outbox table.
	// Default: "outbox_events"
	NotifyChannel string

	// SendNotify determines whether an explicit NOTIFY should be sent
	// after inserting a new outbox record. This can be disabled if
	// triggers or other mechanisms already handle notifications.
	// Default: false
	SendNotify bool

	// IgnoreNoTx controls behavior when attempting to insert an outbox
	// event outside of an active transaction. If set to true, the insert
	// will be skipped instead of returning an error. Useful in cases
	// where events are only conditionally created.
	// Default: false
	IgnoreNoTx bool
}

func (*Config) ApplyDefaults

func (c *Config) ApplyDefaults()

type Event

// Event is the input accepted by Outbox.NewEventMessage.
type Event interface {
	// Name returns the event's name as a string.
	Name() string
	// Payload returns the event's payload as raw, already-encoded JSON.
	Payload() json.RawMessage
}

type Outbox

// Outbox is created with NewOutbox from a pgx connection pool and a Config.
// Its exported methods are NewEventMessage, NotifyNewEventMessage and Persist.
type Outbox struct {
	// contains filtered or unexported fields
}

func NewOutbox

func NewOutbox(dbpool *pgxpool.Pool, cfg Config) *Outbox

func (*Outbox) NewEventMessage

func (o *Outbox) NewEventMessage(e Event) *eventMessage

func (*Outbox) NotifyNewEventMessage

func (o *Outbox) NotifyNewEventMessage(ctx context.Context, eventID uuid.UUID, tx postgresdb.Tx) error

func (*Outbox) Persist

func (o *Outbox) Persist(ctx context.Context, msg *event.Message, tx postgresdb.Tx) error

type Reader

// Reader is created with NewReader from a zerolog logger, a pgx connection
// pool and a Config. Its exported methods are GetEventByID, GetEvents and
// UpdateEventMessage.
type Reader struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader(log *zerolog.Logger, dbpool *pgxpool.Pool, cfg Config) *Reader

func (*Reader) GetEventByID

func (r *Reader) GetEventByID(
	ctx context.Context,
	eventID string,
	processingTimeout time.Duration,
) (*event.Message, error)

func (*Reader) GetEvents

func (r *Reader) GetEvents(
	ctx context.Context,
	maxTries int,
	limit int,
	processingTimeout time.Duration,
) ([]event.Message, error)

func (*Reader) UpdateEventMessage

func (r *Reader) UpdateEventMessage(ctx context.Context, event event.Message) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL