Documentation ¶
Overview ¶
Package inbox provides durable consumer-side idempotency: a ledger that records processed event ids so redeliveries are skipped. It is the consumer-side complement to the transactional outbox.
Consumers extract the event id from the delivery (e.g. via outbox.EventIDFromHeaders) and wrap their handler in deps.Inbox.ProcessOnce, which records the id and runs the handler atomically, exactly once per id.
Constants ¶
const DefaultRetentionPeriod = 7 * 24 * time.Hour
DefaultRetentionPeriod is the default processed-event retention (7 days). It must exceed the broker's maximum redelivery window. Written as a duration (168h) because Go's time.ParseDuration does not accept a "7d" unit.
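The ParseDuration limitation mentioned above can be checked with a short program. This is a minimal sketch: parseRetention is a hypothetical helper, not part of the package, and the constant is redeclared locally so the example stands alone.

```go
package main

import (
	"fmt"
	"time"
)

// DefaultRetentionPeriod mirrors the documented constant (7 days).
const DefaultRetentionPeriod = 7 * 24 * time.Hour

// parseRetention is a hypothetical helper that parses a configured
// retention string using the standard library only.
func parseRetention(s string) (time.Duration, error) {
	d, err := time.ParseDuration(s)
	if err != nil {
		return 0, fmt.Errorf("invalid retention %q (use hours, e.g. 168h): %w", s, err)
	}
	return d, nil
}

func main() {
	// Hours are a valid unit, so 168h parses to exactly seven days.
	d, err := parseRetention("168h")
	fmt.Println(d == DefaultRetentionPeriod, err)

	// "d" is not a unit ParseDuration understands, so this fails.
	_, err = parseRetention("7d")
	fmt.Println(err != nil)
}
```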
const DefaultTableName = "gobricks_inbox"
DefaultTableName is the default inbox ledger table name.
Types ¶
type Cleanup ¶
type Cleanup struct {
	// contains filtered or unexported fields
}
Cleanup is a scheduler.Executor that removes processed-event records older than the configured retention period. By default it runs daily at 04:00.
type Inbox ¶
type Inbox struct {
	// contains filtered or unexported fields
}
Inbox implements app.InboxProcessor, backed by the module's lazily-initialized vendor store.
func (*Inbox) ProcessOnce ¶
func (i *Inbox) ProcessOnce(ctx context.Context, eventID string, fn func(ctx context.Context, tx dbtypes.Tx) error) error
ProcessOnce records eventID in the ledger and runs fn exactly once per id, atomically within a single transaction. A redelivery of an already-processed id short-circuits (fn is not run) and returns nil. The tenant is resolved from ctx; in single-tenant mode the tenant id is empty.
type Module ¶
type Module struct {
	// contains filtered or unexported fields
}
Module implements the GoBricks Module interface for the consumer-side inbox. It provides durable, exactly-once event processing via deps.Inbox.ProcessOnce and a daily cleanup job that prunes old processed-event records.
Register it like any other module. The scheduler module is optional, but the retention cleanup job runs only when it is registered:
	fw.RegisterModules(
		scheduler.NewModule(), // optional: enables inbox-cleanup
		inbox.NewModule(),
		&myapp.ConsumerModule{},
	)
func (*Module) InboxProcessor ¶
func (m *Module) InboxProcessor() app.InboxProcessor
InboxProcessor implements app.InboxProvider and returns the processor used for ModuleDeps wiring. It returns nil when the inbox is disabled.
func (*Module) RegisterJobs ¶
func (m *Module) RegisterJobs(registrar app.JobRegistrar) error
RegisterJobs implements app.JobProvider. The inbox has no relay; it registers only the retention cleanup job, and only when retention is positive.
type Record ¶
Record is a single row in the inbox ledger: a processed event id scoped to a tenant, with the time it was processed.
type Store ¶
type Store interface {
	// MarkProcessed records (tenant_id, event_id) within the given transaction.
	// It returns inserted=true the first time an id is seen and inserted=false on
	// a duplicate (the id was already processed).
	MarkProcessed(ctx context.Context, tx dbtypes.Tx, rec Record) (inserted bool, err error)
	// DeleteProcessed removes ledger rows processed before the given time.
	// Returns the number of rows deleted.
	DeleteProcessed(ctx context.Context, db dbtypes.Interface, before time.Time) (int64, error)
	// CreateTable creates the inbox table and its index if they do not exist.
	// Used for auto-migration when inbox.auto_create_table is true.
	CreateTable(ctx context.Context, db dbtypes.Interface) error
}
Store abstracts inbox ledger operations so that callers stay vendor-agnostic. Implementations exist for PostgreSQL and Oracle, each with its own placeholder style, DDL, and duplicate detection (PostgreSQL uses ON CONFLICT; Oracle catches the unique-constraint violation).
func NewOracleStore ¶
NewOracleStore creates a new Oracle inbox store. Returns an error if the table name is not a safe, unqualified identifier.
func NewPostgresStore ¶
NewPostgresStore creates a new PostgreSQL inbox store. Returns an error if the table name is not a safe, unqualified identifier.
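Both constructors reject table names that are not safe, unqualified identifiers. The exact rule they apply is not shown here; the sketch below is one plausible check, with validateTableName and its pattern being assumptions rather than the package's code. It accepts only letters, digits, and underscores, which rules out schema-qualified names and anything that could smuggle SQL into the generated DDL.

```go
package main

import (
	"fmt"
	"regexp"
)

// identRe is an assumed pattern for a "safe, unqualified identifier":
// it starts with a letter or underscore and contains no dots, quotes,
// spaces, or other punctuation.
var identRe = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// validateTableName is a hypothetical validator illustrating the idea.
func validateTableName(name string) error {
	if !identRe.MatchString(name) {
		return fmt.Errorf("inbox: table name %q is not a safe, unqualified identifier", name)
	}
	return nil
}

func main() {
	for _, name := range []string{
		"gobricks_inbox",       // the documented default
		"public.inbox",         // schema-qualified
		"inbox; DROP TABLE x",  // injection attempt
	} {
		fmt.Printf("%q -> %v\n", name, validateTableName(name))
	}
}
```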