Documentation ¶
Overview ¶
Package inbox provides consumer-side event deduplication for paper-board async event consumers. Consumers track processed event_ids in a processed_events table to avoid re-applying the same envelope (D9.2).
Usage ¶
pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
if err != nil { return err }

// Run the migration once at startup:
helper := inbox.NewMigrationHelper("onboarding")
if err := helper(ctx, pool); err != nil { return err }

ib := inbox.New(pool, "onboarding")

// In the event handler, claim and apply atomically:
tx, err := pool.Begin(ctx)
if err != nil { return err }
defer tx.Rollback(ctx) // safe to call after Commit; the tx is already closed by then
claimed, err := ib.TryMark(ctx, tx, event.EventID, event.EventType, event.OrgID)
if err != nil { return err }
if !claimed { return nil } // duplicate: skip, the deferred Rollback releases the tx
// ... apply business state inside the same tx ...
return tx.Commit(ctx)
Retention ¶
processed_events rows are retained for 180 days (D9.2). Cleanup is handled by a per-service Kubernetes CronJob declared in each consumer's Helm chart; the SDK does not manage cleanup.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Inbox ¶
type Inbox interface {
	// TryMark atomically records that eventID was processed inside the caller's
	// transaction. Returns claimed=true when this call inserted the row (i.e. the
	// event had not been processed before). Returns claimed=false when another
	// handler already processed the same event (ON CONFLICT DO NOTHING path).
	//
	// Callers MUST skip business-state mutations when claimed=false to prevent
	// duplicate side effects under concurrent delivery.
	TryMark(ctx context.Context, tx pgx.Tx, eventID uuid.UUID, eventType, subject string) (claimed bool, err error)

	// Exists returns true if eventID has already been processed.
	// Non-transactional read for observability/monitoring; use TryMark for
	// deduplication inside a handler transaction.
	Exists(ctx context.Context, eventID uuid.UUID) (bool, error)
}
Inbox tracks processed event IDs to prevent duplicate processing.