inbox

package
v0.40.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 5, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package inbox provides durable consumer-side idempotency: a ledger that records processed event ids so redeliveries are skipped. It is the consumer-side complement to the transactional outbox.

Consumers extract the event id from the delivery (e.g. via outbox.EventIDFromHeaders) and wrap their handler in deps.Inbox.ProcessOnce, which records the id and runs the handler atomically, exactly once per id.

Constants

const DefaultRetentionPeriod = 7 * 24 * time.Hour

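DefaultRetentionPeriod is the default retention for processed-event records (7 days). It must exceed the broker's maximum redelivery window; otherwise a redelivery that arrives after its id has been pruned would be processed a second time. The value is expressed in hours (168h) because Go's time.ParseDuration does not accept a "7d" unit.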
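The unit restriction can be checked with the standard library alone. The sketch below is illustrative: parseRetention and defaultRetention are hypothetical names, not part of this package, and the fallback-on-empty behavior is an assumption about how a configured value might be handled.

```go
package main

import (
	"fmt"
	"time"
)

// defaultRetention mirrors the documented DefaultRetentionPeriod value.
const defaultRetention = 7 * 24 * time.Hour

// parseRetention is a hypothetical helper (not part of the inbox package).
// It parses a configured retention string and falls back to the default
// when the string is empty.
func parseRetention(s string) (time.Duration, error) {
	if s == "" {
		return defaultRetention, nil
	}
	return time.ParseDuration(s)
}

func main() {
	d, err := parseRetention("168h")
	fmt.Println(d == defaultRetention, err) // true <nil>

	// "d" is not a unit time.ParseDuration understands, so this fails.
	_, err = parseRetention("7d")
	fmt.Println(err != nil) // true
}
```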

const DefaultTableName = "gobricks_inbox"

DefaultTableName is the default inbox ledger table name.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cleanup

type Cleanup struct {
	// contains filtered or unexported fields
}

Cleanup is a scheduler.Executor that removes processed-event records older than the configured retention period. It runs daily at 04:00 by default.
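The pruning rule can be pictured as a cutoff computed from the current time minus the retention period. The following is a minimal in-memory sketch, not the package's implementation: record and prune are hypothetical stand-ins, and the real job deletes rows through the Store rather than filtering a slice.

```go
package main

import (
	"fmt"
	"time"
)

// record is a stand-in for the documented Record type.
type record struct {
	TenantID    string
	EventID     string
	ProcessedAt time.Time
}

// prune drops records processed strictly before now-retention and reports
// how many were dropped. Records at or after the cutoff are kept.
func prune(recs []record, now time.Time, retention time.Duration) (kept []record, deleted int64) {
	cutoff := now.Add(-retention)
	for _, r := range recs {
		if r.ProcessedAt.Before(cutoff) {
			deleted++
			continue
		}
		kept = append(kept, r)
	}
	return kept, deleted
}

// runPruneExample applies a 7-day retention to one stale and one fresh record.
func runPruneExample() (keptCount int, deleted int64) {
	now := time.Date(2026, 6, 5, 4, 0, 0, 0, time.UTC)
	recs := []record{
		{EventID: "old", ProcessedAt: now.Add(-8 * 24 * time.Hour)},
		{EventID: "recent", ProcessedAt: now.Add(-1 * time.Hour)},
	}
	kept, deleted := prune(recs, now, 7*24*time.Hour)
	return len(kept), deleted
}

func main() {
	kept, deleted := runPruneExample()
	fmt.Println(kept, deleted) // 1 1
}
```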

func (*Cleanup) Execute

func (c *Cleanup) Execute(ctx scheduler.JobContext) error

type Inbox

type Inbox struct {
	// contains filtered or unexported fields
}

Inbox implements app.InboxProcessor, backed by the module's lazily-initialized vendor store.

func (*Inbox) ProcessOnce

func (i *Inbox) ProcessOnce(ctx context.Context, eventID string, fn func(ctx context.Context, tx dbtypes.Tx) error) error

ProcessOnce records eventID in the ledger and runs fn exactly once per id, atomically within a single transaction. A redelivery of an already-processed id short-circuits (fn is not run) and returns nil. The tenant is resolved from ctx; in single-tenant mode the tenant id is empty.
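The skip-on-redelivery behavior can be illustrated with an in-memory model. This is a sketch under stated assumptions, not the real Inbox: memInbox is hypothetical, a mutex and a map stand in for the database transaction, the dbtypes.Tx argument is omitted from the callback, and the rollback on handler failure is inferred from the "atomically within a single transaction" wording.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// memInbox is a hypothetical in-memory stand-in for the inbox ledger.
type memInbox struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newMemInbox() *memInbox {
	return &memInbox{seen: make(map[string]struct{})}
}

// ProcessOnce runs fn only for ids that are not yet in the ledger.
// If fn fails, the id is not recorded (mimicking a rollback), so a later
// redelivery of the same id runs fn again.
func (m *memInbox) ProcessOnce(ctx context.Context, eventID string, fn func(ctx context.Context) error) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, dup := m.seen[eventID]; dup {
		return nil // redelivery: short-circuit without running fn
	}
	if err := fn(ctx); err != nil {
		return err
	}
	m.seen[eventID] = struct{}{}
	return nil
}

// runDemo delivers evt-1 twice, then evt-2 with a failing handler and a retry.
func runDemo() (calls int, errs []error) {
	inbox := newMemInbox()
	ctx := context.Background()
	handler := func(context.Context) error { calls++; return nil }
	failing := func(context.Context) error { return errors.New("boom") }

	errs = append(errs, inbox.ProcessOnce(ctx, "evt-1", handler)) // handler runs
	errs = append(errs, inbox.ProcessOnce(ctx, "evt-1", handler)) // skipped
	errs = append(errs, inbox.ProcessOnce(ctx, "evt-2", failing)) // fails, id not recorded
	errs = append(errs, inbox.ProcessOnce(ctx, "evt-2", handler)) // retry runs handler
	return calls, errs
}

func main() {
	calls, errs := runDemo()
	fmt.Println(calls)   // 2
	fmt.Println(errs[2]) // boom
}
```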

type Module

type Module struct {
	// contains filtered or unexported fields
}

Module implements the GoBricks Module interface for the consumer-side inbox. It provides durable, exactly-once event processing via deps.Inbox.ProcessOnce and a daily cleanup job that prunes old processed-event records.

Register it like any other module. The scheduler module is optional, but the retention cleanup job only runs when it is registered:

fw.RegisterModules(
    scheduler.NewModule(), // optional: enables inbox-cleanup
    inbox.NewModule(),
    &myapp.ConsumerModule{},
)

func NewModule

func NewModule() *Module

NewModule creates a new inbox Module instance.

func (*Module) InboxProcessor

func (m *Module) InboxProcessor() app.InboxProcessor

InboxProcessor implements app.InboxProvider and returns the processor for ModuleDeps wiring. It returns nil when the inbox is disabled.

func (*Module) Init

func (m *Module) Init(deps *app.ModuleDeps) error

Init implements app.Module.

func (*Module) Name

func (m *Module) Name() string

Name implements app.Module.

func (*Module) RegisterJobs

func (m *Module) RegisterJobs(registrar app.JobRegistrar) error

RegisterJobs implements app.JobProvider. The inbox has no relay; it registers only the retention cleanup job, and only when retention is positive.

func (*Module) Shutdown

func (m *Module) Shutdown() error

Shutdown implements app.Module.

type Record

type Record struct {
	TenantID    string
	EventID     string
	ProcessedAt time.Time
}

Record is a single row in the inbox ledger: a processed event id scoped to a tenant, with the time it was processed.

type Store

type Store interface {
	// MarkProcessed records (tenant_id, event_id) within the given transaction.
	// It returns inserted=true the first time an id is seen and inserted=false on
	// a duplicate (the id was already processed).
	MarkProcessed(ctx context.Context, tx dbtypes.Tx, rec Record) (inserted bool, err error)

	// DeleteProcessed removes ledger rows processed before the given time.
	// Returns the number of rows deleted.
	DeleteProcessed(ctx context.Context, db dbtypes.Interface, before time.Time) (int64, error)

	// CreateTable creates the inbox table and its index if they do not exist.
	// Used for auto-migration when inbox.auto_create_table is true.
	CreateTable(ctx context.Context, db dbtypes.Interface) error
}

Store abstracts inbox ledger operations for vendor-agnostic SQL. Implementations exist for PostgreSQL and Oracle with vendor-specific placeholder styles, DDL, and duplicate-detection (PostgreSQL ON CONFLICT vs Oracle unique-violation catch).
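The two duplicate-detection strategies might look roughly like the statements below. Everything here is an assumption made for illustration: the function names, the processed_at column, and the exact SQL are not taken from the package source. Only the tenant_id and event_id column names come from the MarkProcessed comment, and ORA-00001 is Oracle's unique-constraint violation code.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// postgresInsertSQL sketches an insert that silently skips duplicates.
// With ON CONFLICT DO NOTHING, zero affected rows means the id already existed.
func postgresInsertSQL(table string) string {
	return fmt.Sprintf(
		"INSERT INTO %s (tenant_id, event_id, processed_at) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
		table)
}

// oracleInsertSQL sketches a plain insert with Oracle-style positional binds.
// A duplicate surfaces as a unique-constraint error that the caller inspects.
func oracleInsertSQL(table string) string {
	return fmt.Sprintf(
		"INSERT INTO %s (tenant_id, event_id, processed_at) VALUES (:1, :2, :3)",
		table)
}

// isOracleDuplicate reports whether err looks like ORA-00001 (unique
// constraint violated). Matching on the message text is a simplification.
func isOracleDuplicate(err error) bool {
	return err != nil && strings.Contains(err.Error(), "ORA-00001")
}

// errDup is a sample error used only by this sketch.
var errDup = errors.New("ORA-00001: unique constraint violated")

func main() {
	fmt.Println(postgresInsertSQL("gobricks_inbox"))
	fmt.Println(oracleInsertSQL("gobricks_inbox"))
	fmt.Println(isOracleDuplicate(errDup)) // true
}
```

Table names are interpolated into the statement text in this sketch, which is only safe when the name has been validated first.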

func NewOracleStore

func NewOracleStore(tableName string) (Store, error)

NewOracleStore creates a new Oracle inbox store. Returns an error if the table name is not a safe, unqualified identifier.

func NewPostgresStore

func NewPostgresStore(tableName string) (Store, error)

NewPostgresStore creates a new PostgreSQL inbox store. Returns an error if the table name is not a safe, unqualified identifier.
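Both constructors reject unsafe table names. The exact rule is not documented here, so the check below is only a plausible sketch: validateTableName and the regular expression are hypothetical, assuming that "safe, unqualified" means a single identifier made of letters, digits, and underscores with no schema prefix.

```go
package main

import (
	"fmt"
	"regexp"
)

// safeIdent is an assumed rule, not the package's own: a letter or underscore
// followed by letters, digits, or underscores. Dots are excluded, so
// schema-qualified names such as "public.inbox" are rejected.
var safeIdent = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// validateTableName returns an error when name does not match safeIdent.
func validateTableName(name string) error {
	if !safeIdent.MatchString(name) {
		return fmt.Errorf("inbox: unsafe table name %q", name)
	}
	return nil
}

func main() {
	for _, name := range []string{"gobricks_inbox", "public.inbox", "inbox; DROP TABLE x"} {
		fmt.Println(name, validateTableName(name) == nil)
	}
}
```

Validating the name up front matters because table names cannot be bound as query parameters and end up in the statement text.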

Directories

Path Synopsis
Package testing provides test utilities for the consumer-side inbox.
