txoutbox

package
v0.25.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package txoutbox provides reusable transaction hook and outbox dispatch helpers.

Index

Constants

View Source
const (
	OutboxStatusPending    = "pending"
	OutboxStatusProcessing = "processing"
	OutboxStatusRetrying   = "retrying"
	OutboxStatusSucceeded  = "succeeded"
	OutboxStatusFailed     = "failed"
)

Variables

View Source
var (
	ErrStoreNotConfigured     = errors.New("txoutbox: store not configured")
	ErrPublisherNotConfigured = errors.New("txoutbox: publisher not configured")
)

Functions

func WithTxHooks

func WithTxHooks[Tx any](ctx context.Context, txManager TxManager[Tx], fn func(tx Tx, hooks *Hooks) error) error

WithTxHooks executes fn in a write transaction and runs queued post-commit hooks only when commit succeeds.

Types

type ClaimInput

type ClaimInput struct {
	Consumer  string
	Now       time.Time
	Limit     int
	Topic     string
	LockUntil *time.Time
}

ClaimInput controls batched outbox claiming.

type DispatchInput

type DispatchInput struct {
	Consumer    string
	Topic       string
	Limit       int
	Now         time.Time
	RetryDelay  time.Duration
	ClaimLockTo *time.Time
}

DispatchInput controls outbox dispatch behavior.

type DispatchResult

type DispatchResult struct {
	Claimed   int
	Published int
	Retrying  int
	Failed    int
}

DispatchResult captures batch dispatch outcomes.

func DispatchBatch

func DispatchBatch[Scope any](ctx context.Context, store Store[Scope], scope Scope, publisher Publisher, input DispatchInput) (DispatchResult, error)

DispatchBatch claims pending outbox messages and publishes them.

type Hooks

type Hooks struct {
	// contains filtered or unexported fields
}

Hooks tracks post-commit hooks collected during a transaction callback.

func (*Hooks) AfterCommit

func (h *Hooks) AfterCommit(hook PostCommitHook)

AfterCommit registers a hook to run after transaction commit.

type Message

type Message struct {
	ID            string
	TenantID      string
	OrgID         string
	Topic         string
	MessageKey    string
	PayloadJSON   string
	HeadersJSON   string
	CorrelationID string
	Status        string
	AttemptCount  int
	MaxAttempts   int
	LastError     string
	AvailableAt   time.Time
	LockedAt      *time.Time
	LockedBy      string
	PublishedAt   *time.Time
	CreatedAt     time.Time
	UpdatedAt     time.Time
}

Message stores durable side-effect events for post-commit dispatch.

type PostCommitHook

type PostCommitHook func() error

PostCommitHook defines work executed only after a successful transaction commit.

type Publisher

type Publisher interface {
	PublishOutboxMessage(ctx context.Context, message Message) error
}

Publisher publishes outbox messages to external systems.

type Query

type Query struct {
	Topic    string
	Status   string
	Limit    int
	Offset   int
	SortDesc bool
}

Query controls outbox listing filters.

type Store

type Store[Scope any] interface {
	EnqueueOutboxMessage(ctx context.Context, scope Scope, record Message) (Message, error)
	ClaimOutboxMessages(ctx context.Context, scope Scope, input ClaimInput) ([]Message, error)
	MarkOutboxMessageSucceeded(ctx context.Context, scope Scope, id string, publishedAt time.Time) (Message, error)
	MarkOutboxMessageFailed(ctx context.Context, scope Scope, id, failureReason string, nextAttemptAt *time.Time, failedAt time.Time) (Message, error)
	ListOutboxMessages(ctx context.Context, scope Scope, query Query) ([]Message, error)
}

Store defines durable post-commit side-effect message persistence.

type TxManager

type TxManager[Tx any] interface {
	WithTx(ctx context.Context, fn func(tx Tx) error) error
}

TxManager provides transaction lifecycle coordination for a typed tx surface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL