webhookpub

package
v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 4, 2026 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package webhookpub publishes events from the e2a core (relay, outbound sender, HITL flow) to subscribers registered via the new /api/v1/webhooks resource. It runs in-process and post-commit async: trigger code commits its primary DB write, then calls Publisher.Publish in a goroutine. The publisher matches the event against enabled subscribers (event type + filters), inserts one webhook_subscriber_deliveries row per match, and returns; actual HTTP delivery is the retry worker's job.
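
The post-commit async pattern described above can be sketched roughly as follows. This is a minimal illustration, not the package's trigger code: Event and Publisher are trimmed-down stand-ins for the real types, and recordingPublisher, handleInbound, and publishedAfter are hypothetical names introduced only for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Event and Publisher are reduced stand-ins for the package's types.
type Event struct {
	Type   string
	UserID string
}

type Publisher interface {
	Publish(ctx context.Context, e Event)
}

// recordingPublisher is a hypothetical fake that records what it receives.
type recordingPublisher struct {
	mu   sync.Mutex
	seen []Event
}

func (p *recordingPublisher) Publish(_ context.Context, e Event) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.seen = append(p.seen, e)
}

// handleInbound is a hypothetical trigger site. commit stands in for the
// primary DB write; publishing starts only after it succeeds.
func handleInbound(pub Publisher, commit func() error, wg *sync.WaitGroup) error {
	if err := commit(); err != nil {
		return err // nothing was committed, so nothing is published
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		// A background context keeps the publish alive after the request ends.
		pub.Publish(context.Background(), Event{Type: "email.received", UserID: "user_1"})
	}()
	return nil
}

// publishedAfter runs one trigger and reports how many events were published.
func publishedAfter(commitFails bool) int {
	pub := &recordingPublisher{}
	var wg sync.WaitGroup
	commit := func() error {
		if commitFails {
			return errors.New("commit failed")
		}
		return nil
	}
	_ = handleInbound(pub, commit, &wg)
	wg.Wait() // the example waits; a real trigger site would not block here
	return len(pub.seen)
}

func main() {
	fmt.Println(publishedAfter(false), publishedAfter(true)) // prints "1 0"
}
```

The WaitGroup exists only so the example can observe the result; in the pattern as documented, the trigger returns as soon as the goroutine is started.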

Slice 1 fires only email.received, from the relay. Slice 3 extends this to email.sent, email.pending_approval, email.approved, and email.rejected.

The legacy agent_identities.webhook_url path is unaffected by this package — it continues to be served by internal/webhook's existing PersistentDeliverer + retry worker. Both pathways fire side-by-side for back-compat. See the final design at tmp/e2a_webhooks_design.md.

Constants

const (
	EventEmailReceived        = "email.received"
	EventEmailSent            = "email.sent"
	EventEmailPendingApproval = "email.pending_approval"
	EventEmailApproved        = "email.approved"
	EventEmailRejected        = "email.rejected"
)

Event types. Keeping these as named constants (not arbitrary strings) means typos at trigger sites fail at compile time. The handler-layer allowlist of accepted event names mirrors this list.

Variables

AllEventTypes is the canonical allowlist of event names. It is used by the slice-2 handler validation. Adding a new event type means adding a constant above and extending this slice.

Functions

func DeterministicEventID

func DeterministicEventID(parts ...string) string

DeterministicEventID derives a stable event id from the trigger context. Per design §5.1, the input formula per event type is:

email.received: sha256(message_id || "|" || event_type)
email.sent:     sha256(message_id || "|" || event_type)
pending_approval/approved/rejected: sha256(pending_msg_id || "|" || event_type)
future bounced/complained/delivered: sha256(message_id || "|" || event_type || "|" || ses_event_id)

The "|" delimiter prevents accidental collisions where concatenated fields could be ambiguous (e.g. ("abc","def") vs ("abcdef","")).

Returns "evt_" + first 32 hex chars of the sha256 digest (128 bits of entropy). Birthday collision probability at 1M events/day × 30 days × 5 event types is ~3e-23 — negligible.
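
The collision estimate can be reproduced with the standard birthday approximation p ≈ n(n-1)/2 / 2^128. The snippet below is only a check of the arithmetic quoted above; birthdayBound is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"math"
)

// birthdayBound approximates the probability of at least one collision among
// n uniformly distributed values in a space of 2^bits: n(n-1)/2 / 2^bits.
func birthdayBound(n float64, bits int) float64 {
	return n * (n - 1) / 2 / math.Pow(2, float64(bits))
}

func main() {
	n := 1e6 * 30 * 5 // events/day × days × event types = 1.5e8 ids
	fmt.Printf("%.1e\n", birthdayBound(n, 128)) // prints 3.3e-23
}
```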

Determinism is what makes the outbox write idempotent across MTA SMTP retries: the retried trigger produces the same id, and the outbox INSERT no-ops via ON CONFLICT (id) DO NOTHING.
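
A minimal sketch of the derivation and of why it makes retries idempotent. It assumes the parts are joined with "|" before hashing, as the formulas above suggest; the actual implementation may differ in detail. memOutbox is a hypothetical in-memory stand-in for the outbox table whose insert method mimics ON CONFLICT (id) DO NOTHING.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// deterministicEventID hashes the "|"-joined parts and keeps the first
// 32 hex characters (128 bits) of the SHA-256 digest.
func deterministicEventID(parts ...string) string {
	sum := sha256.Sum256([]byte(strings.Join(parts, "|")))
	return "evt_" + hex.EncodeToString(sum[:])[:32]
}

// memOutbox is a hypothetical stand-in for the outbox table, keyed by id.
type memOutbox map[string]string

// insert mimics INSERT ... ON CONFLICT (id) DO NOTHING and reports whether
// a row was actually written.
func (o memOutbox) insert(id, payload string) bool {
	if _, exists := o[id]; exists {
		return false
	}
	o[id] = payload
	return true
}

func main() {
	out := memOutbox{}
	first := deterministicEventID("msg_123", "email.received")
	retry := deterministicEventID("msg_123", "email.received") // same trigger, retried

	fmt.Println(first == retry, len(first))                         // true 36
	fmt.Println(out.insert(first, "{}"), out.insert(retry, "{}")) // true false

	// The delimiter keeps ("abc","def") and ("abcdef","") apart.
	fmt.Println(deterministicEventID("abc", "def") != deterministicEventID("abcdef", "")) // true
}
```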

func IsValidEventType

func IsValidEventType(name string) bool

IsValidEventType reports whether name is one of the catalog event types. Convenience wrapper for the handler-layer validator.
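
One plausible shape for the allowlist and its validator, shown as a sketch. The constant values come from the Constants section; the bodies of AllEventTypes and IsValidEventType below are assumptions about how such a check is typically written, not the package's source.

```go
package main

import "fmt"

const (
	EventEmailReceived        = "email.received"
	EventEmailSent            = "email.sent"
	EventEmailPendingApproval = "email.pending_approval"
	EventEmailApproved        = "email.approved"
	EventEmailRejected        = "email.rejected"
)

// AllEventTypes lists every constant above (assumed contents).
var AllEventTypes = []string{
	EventEmailReceived,
	EventEmailSent,
	EventEmailPendingApproval,
	EventEmailApproved,
	EventEmailRejected,
}

// IsValidEventType reports whether name appears in the allowlist.
func IsValidEventType(name string) bool {
	for _, t := range AllEventTypes {
		if t == name {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(IsValidEventType("email.sent")) // true
	fmt.Println(IsValidEventType("email.snet")) // false: typo in a user-supplied string
}
```

The constants catch typos in Go code at compile time; the runtime check covers names that arrive as strings, such as those in an API request body.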

func NewDBInserter

func NewDBInserter(pool *pgxpool.Pool) deliveryInserter

NewDBInserter wires a production-grade inserter that writes to webhook_subscriber_deliveries.

Types

type Envelope

type Envelope struct {
	Event     string    `json:"event"`
	ID        string    `json:"id"`
	CreatedAt time.Time `json:"created_at"`
	Data      any       `json:"data"`
}

Envelope is the wire shape sent in the HTTP body to webhook subscribers. Stable across retries — the event_payload JSON column stores the envelope verbatim and the delivery worker POSTs it unchanged.
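
To make the wire shape concrete, the following sketch marshals an Envelope with encoding/json. The struct is copied from the declaration above; the id, timestamp, and the "subject" field inside Data are made-up sample values, and encode is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type Envelope struct {
	Event     string    `json:"event"`
	ID        string    `json:"id"`
	CreatedAt time.Time `json:"created_at"`
	Data      any       `json:"data"`
}

// encode returns the JSON body for a fixed sample envelope.
func encode() string {
	env := Envelope{
		Event:     "email.received",
		ID:        "evt_0123456789abcdef0123456789abcdef",
		CreatedAt: time.Date(2026, 6, 4, 12, 0, 0, 0, time.UTC),
		Data:      map[string]any{"subject": "hello"}, // sample payload only
	}
	b, err := json.Marshal(env)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(encode())
}
```

This prints `{"event":"email.received","id":"evt_0123456789abcdef0123456789abcdef","created_at":"2026-06-04T12:00:00Z","data":{"subject":"hello"}}`.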

type Event

type Event struct {
	// ID is a unique identifier for this event firing. Stable across
	// retries — receivers dedup on it.
	ID string

	// Type is one of the EventEmail* constants.
	Type string

	// CreatedAt is the time the event was published. Embedded in
	// the wire envelope so receivers can reason about staleness.
	CreatedAt time.Time

	// UserID is the owner — used to find matching webhooks. Routing
	// is strictly bounded to the owning user's subscribers; cross-
	// user delivery is impossible by construction.
	UserID string

	// AgentID, ConversationID, Labels are filter-matching keys. Each
	// is matched against the corresponding key in
	// WebhookFilters. Empty / nil here means "the event has no value
	// for this attribute" — see Publisher's null/empty semantics
	// (filters that REQUIRE a value while the event has none → skip).
	AgentID        string
	ConversationID string
	Labels         []string

	// MessageID is the originating message row, if any. May be empty
	// for events without a direct message backing (e.g.
	// email.pending_approval before the held message gets promoted).
	MessageID string

	// Data is the event-specific payload. Wrapped in the envelope
	// {event, id, created_at, data} and serialized into the delivery
	// row's event_payload column.
	Data any
}

Event is the input to Publisher.Publish. Carries the routing keys (UserID, AgentID, ConversationID, Labels) needed to apply filters plus the Data payload that's serialized into the delivery row's event_payload JSONB.

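MessageID is optional; set it when the event has an originating message row. It is persisted on the delivery row as a reference with ON DELETE SET NULL, so when the messages janitor (10-day TTL) deletes the message, the delivery row is kept with a NULL message reference rather than pointing at a row that no longer exists.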

func NewEvent

func NewEvent(eventType, userID string, data any) Event

NewEvent constructs an Event with a fresh ID and now() timestamp. Trigger sites use this rather than building struct literals so the ID format stays consistent (evt_<32-hex>).

func (Event) AsEnvelope

func (e Event) AsEnvelope() Envelope

AsEnvelope returns the wire shape for serialization.
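
A rough sketch of how NewEvent and AsEnvelope could fit together. The documentation only says the id is "fresh" and shaped like evt_<32-hex>; generating it from 16 random bytes via crypto/rand is an assumption, as is the reduced Event struct used here.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"time"
)

type Envelope struct {
	Event     string    `json:"event"`
	ID        string    `json:"id"`
	CreatedAt time.Time `json:"created_at"`
	Data      any       `json:"data"`
}

// Event is a reduced stand-in carrying only the fields this sketch needs.
type Event struct {
	ID        string
	Type      string
	CreatedAt time.Time
	UserID    string
	Data      any
}

// newEventID returns "evt_" plus 32 hex characters (assumed: 16 random bytes).
func newEventID() string {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		panic(err)
	}
	return "evt_" + hex.EncodeToString(b[:])
}

func NewEvent(eventType, userID string, data any) Event {
	return Event{
		ID:        newEventID(),
		Type:      eventType,
		CreatedAt: time.Now().UTC(),
		UserID:    userID,
		Data:      data,
	}
}

// AsEnvelope copies the wire-visible fields; routing keys such as UserID
// stay out of the envelope.
func (e Event) AsEnvelope() Envelope {
	return Envelope{Event: e.Type, ID: e.ID, CreatedAt: e.CreatedAt, Data: e.Data}
}

func main() {
	e := NewEvent("email.received", "user_1", map[string]any{"subject": "hello"})
	env := e.AsEnvelope()
	fmt.Println(len(env.ID), env.Event)
}
```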

type FeatureFlag

type FeatureFlag interface {
	Enabled() bool
}

FeatureFlag is the kill switch for the new path. When Enabled returns false, Publish becomes a no-op (no DB writes, no fan-out). The retry worker is unaffected: it keeps draining any pending rows that were inserted before the flag was flipped. See decision #3 in the design.

type Outbox

type Outbox interface {
	// PublishTx writes the event to webhook_events inside the
	// caller's transaction. Returns error so the caller can roll back
	// its business state if the outbox write fails.
	//
	// Used for PRE-side-effect triggers (email.received, future
	// email.bounced from SNS, email.pending_approval, email.rejected).
	// If the outbox write fails the caller's tx rolls back; on
	// retry, the deterministic event id makes the second outbox
	// INSERT a no-op via ON CONFLICT (id) DO NOTHING.
	PublishTx(ctx context.Context, tx pgx.Tx, e Event) error

	// PublishBestEffortTx attempts the outbox write inside the
	// caller's transaction but never returns an error. On failure,
	// logs to webhook_publish_failures (slice 4 will add the table)
	// and lets the caller's tx commit anyway.
	//
	// Used for POST-side-effect triggers (email.sent, email.approved)
	// where the irreversible action (SES.Send) has already happened
	// and rolling back the business state would orphan an SES
	// delivery. v1 panics: no caller in this slice. Slice 4 wires the
	// outbound + HITL-approve trigger sites to it.
	PublishBestEffortTx(ctx context.Context, tx pgx.Tx, e Event)

	// DeleteExpiredWebhookEvents drops rows past their 30-day retention.
	// Called from the hourly cleanup loop in cmd/e2a/main.go.
	DeleteExpiredWebhookEvents(ctx context.Context) (int, error)
}

Outbox is the Stripe-tier publisher entry point. Triggers write the webhook_events row inside the same transaction as their business state (e.g. the messages row for email.received). A separate publisher worker (slice 2) consumes pending rows and fans out into webhook_subscriber_deliveries.

Why this lives alongside Publisher: the legacy Publisher.Publish does in-process fan-out (it reads enabled webhooks and inserts delivery rows directly). The new Outbox.PublishTx writes ONE row to webhook_events and arranges NOTIFY; fan-out is deferred. During the rollout window (controlled by the WEBHOOKS_OUTBOX_ENABLED env var, plumbed into the trigger sites), each trigger picks one path. After slice 11 the legacy Publisher.Publish branch is deleted.

See docs/design/2026-06-01-stripe-tier-webhooks.md §4.2 and Appendix A.

func NewOutbox

func NewOutbox(pool *pgxpool.Pool, flag FeatureFlag) Outbox

NewOutbox constructs the Stripe-tier outbox writer. The FeatureFlag gates writes: when disabled, PublishTx is a no-op (returns nil with no DB write). Slice 4's trigger sites branch on the same flag so the legacy go publisher.Publish(...) path runs instead.

Pass StaticFlag(false) in v1 production until slice 11 flips it.

type OutboxWorker

type OutboxWorker struct {
	// contains filtered or unexported fields
}

OutboxWorker drains webhook_events. For each pending row it reads enabled webhooks for the user, applies filter matching in Go, and inserts one webhook_subscriber_deliveries row per match. The existing SubscriberRetryWorker then drains those rows and POSTs them to customer endpoints. See design §4.4 for the full architecture.

Wakeup paths:

  1. LISTEN webhook_events_new — dedicated connection, sub-50ms latency from trigger COMMIT to fan-out.
  2. 1s fallback poll — catches notifications missed during deploy or LISTEN reconnect.

Multi-replica safety:

  • FOR UPDATE SKIP LOCKED + next_poll_at bump in GetPending leases rows so concurrent replicas don't fan out the same event.
  • The per-row partial unique index on (event_id, webhook_id) in webhook_subscriber_deliveries is the backstop for lease-expiry races: a slow worker B can finish after a fast worker A's lease has expired and a new worker C has taken over; per-row ON CONFLICT DO NOTHING swallows the duplicates rather than aborting B's transaction.
  • The final UPDATE outbox row uses WHERE status='pending' so a stale-and-late worker can't overwrite a fast-finisher's matched_webhook_ids snapshot.
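
The three safeguards above map onto three SQL statements, sketched here as Go constants. Everything beyond the table names and the columns mentioned in this documentation (status, next_poll_at, event_id, webhook_id, matched_webhook_ids) is an assumption: the 30-second lease, the 'done' status value, and the exact column lists are illustrative, not the package's real queries.

```go
package main

import "fmt"

// leasePendingSQL claims a batch of pending events. SKIP LOCKED makes
// concurrent replicas pass over rows another replica is claiming, and the
// next_poll_at bump acts as the lease (30 seconds is an assumed value).
const leasePendingSQL = `
UPDATE webhook_events
SET next_poll_at = now() + interval '30 seconds'
WHERE id IN (
    SELECT id FROM webhook_events
    WHERE status = 'pending' AND next_poll_at <= now()
    ORDER BY next_poll_at
    LIMIT $1
    FOR UPDATE SKIP LOCKED
)
RETURNING id`

// insertDeliverySQL is run once per matched webhook. If another worker has
// already inserted the (event_id, webhook_id) pair, the unique index turns
// this statement into a no-op instead of an error.
const insertDeliverySQL = `
INSERT INTO webhook_subscriber_deliveries (id, event_id, webhook_id, status)
VALUES ($1, $2, $3, 'pending')
ON CONFLICT DO NOTHING`

// finishEventSQL only updates rows that are still pending, so a late worker
// cannot overwrite the snapshot written by the worker that finished first.
const finishEventSQL = `
UPDATE webhook_events
SET status = 'done', matched_webhook_ids = $2
WHERE id = $1 AND status = 'pending'`

func main() {
	fmt.Println(leasePendingSQL)
	fmt.Println(insertDeliverySQL)
	fmt.Println(finishEventSQL)
}
```

Issuing one INSERT per match, rather than one multi-row INSERT, is what lets a single duplicate be skipped without affecting the other rows.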

func NewOutboxWorker

func NewOutboxWorker(pool *pgxpool.Pool, store identityReader) *OutboxWorker

NewOutboxWorker constructs the slice-2 worker. Production wiring passes the real pool and identity.Store; tests can pass fakes.

func (*OutboxWorker) Start

func (w *OutboxWorker) Start(ctx context.Context)

Start blocks on ctx — call in its own goroutine. Spawns the LISTEN reader in a sibling goroutine and runs the tick loop on the calling goroutine. Both stop when ctx is cancelled.
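
The combination of a notification wakeup and a fallback poll can be sketched with a ticker and a channel. This is a simplified model, not the OutboxWorker source: worker, notify, and demo are hypothetical, and the LISTEN reader is replaced by a direct call to notify.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// worker is a hypothetical, reduced model of a notify-or-poll loop.
type worker struct {
	interval time.Duration         // fallback poll period
	wake     chan struct{}         // signalled when a notification arrives
	tick     func(context.Context) // processes one batch
}

// Start blocks until ctx is cancelled, running tick on every wakeup.
func (w *worker) Start(ctx context.Context) {
	t := time.NewTicker(w.interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C: // fallback poll
		case <-w.wake: // notification received
		}
		w.tick(ctx)
	}
}

// notify wakes the loop without blocking; redundant signals are coalesced.
func (w *worker) notify() {
	select {
	case w.wake <- struct{}{}:
	default:
	}
}

// demo sends one notification and reports how many batches were processed.
func demo() int {
	ticks := 0
	processed := make(chan struct{}, 1)
	w := &worker{
		interval: time.Hour, // long enough that only the notification fires
		wake:     make(chan struct{}, 1),
		tick: func(context.Context) {
			ticks++
			processed <- struct{}{}
		},
	}
	ctx, cancel := context.WithCancel(context.Background())
	stopped := make(chan struct{})
	go func() { w.Start(ctx); close(stopped) }()

	w.notify()
	<-processed
	cancel()
	<-stopped // Start has returned, so reading ticks is safe
	return ticks
}

func main() {
	fmt.Println(demo()) // prints 1
}
```

Coalescing signals in notify is safe in this model because each tick processes a whole batch, so a burst of notifications needs only one wakeup.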

func (*OutboxWorker) Tick

func (w *OutboxWorker) Tick(ctx context.Context)

Tick processes one batch of pending events. Exposed (not just the private processBatch) so integration tests can drive the worker synchronously instead of waiting on the timer.

func (*OutboxWorker) WithMetrics

func (w *OutboxWorker) WithMetrics(m telemetry.Metrics) *OutboxWorker

WithMetrics swaps in a real metrics backend. Default is NoOp so tests don't have to wire anything.

type Publisher

type Publisher interface {
	Publish(ctx context.Context, e Event)
}

Publisher fans an Event out to matching webhook subscribers. The in-process implementation:

  1. Looks up enabled webhooks for Event.UserID subscribed to Event.Type via identity.Store.ListEnabledWebhooksForRouting.
  2. Applies filter matching in Go (see matches for the rule).
  3. Inserts one webhook_subscriber_deliveries row per match (status=pending). Each row gets a unique whd_<random> id.
  4. Returns. The retry worker picks up the pending rows on its next tick and POSTs them.
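
Step 2 can be illustrated with a small matching function. Only one rule is taken from this documentation: a filter that requires a value skips events that have none. The filters struct, its field names, and the "any listed label matches" behavior are assumptions made for the sake of a runnable example; the real WebhookFilters type may be shaped differently.

```go
package main

import "fmt"

// filters is a hypothetical stand-in for WebhookFilters. An empty list
// means "no constraint on this attribute".
type filters struct {
	AgentIDs        []string
	ConversationIDs []string
	Labels          []string
}

// event holds only the filter-matching keys of Event.
type event struct {
	AgentID        string
	ConversationID string
	Labels         []string
}

func contains(list []string, v string) bool {
	for _, x := range list {
		if x == v {
			return true
		}
	}
	return false
}

// matchValue passes when the filter is unset, or when the event has a
// value and that value is listed. A set filter with an empty value fails.
func matchValue(allowed []string, value string) bool {
	if len(allowed) == 0 {
		return true
	}
	return value != "" && contains(allowed, value)
}

// matchLabels passes when the filter is unset, or when the event carries
// at least one of the listed labels (assumed any-of semantics).
func matchLabels(wanted, have []string) bool {
	if len(wanted) == 0 {
		return true
	}
	for _, l := range have {
		if contains(wanted, l) {
			return true
		}
	}
	return false
}

func matches(f filters, e event) bool {
	return matchValue(f.AgentIDs, e.AgentID) &&
		matchValue(f.ConversationIDs, e.ConversationID) &&
		matchLabels(f.Labels, e.Labels)
}

func main() {
	f := filters{AgentIDs: []string{"agent_a"}}
	fmt.Println(matches(f, event{AgentID: "agent_a"})) // true
	fmt.Println(matches(f, event{}))                   // false: filter requires a value
	fmt.Println(matches(filters{}, event{}))           // true: no constraints
}
```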

If enumerating subscribers or inserting a delivery row fails, the publisher logs the error and drops the event for the affected subscribers without reporting it to the caller: partial failure is preferable to losing the trigger's primary state change. A crash between commit and publish loses the event entirely; the deferred-items list documents this as the "post-commit async" trade-off.

func New

func New(s store, ins deliveryInserter, flag FeatureFlag) Publisher

New constructs a Publisher.

type StaticFlag

type StaticFlag bool

StaticFlag is a trivially-implemented FeatureFlag for tests and for production wiring that reads an env var at startup.

func (StaticFlag) Enabled

func (f StaticFlag) Enabled() bool
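
A sketch of the "read an env var at startup" wiring mentioned above. The FeatureFlag and StaticFlag declarations follow this documentation; the Enabled body is the obvious one but is not shown in the docs, and parseFlag and flagFromEnv are hypothetical helpers. WEBHOOKS_OUTBOX_ENABLED is the variable named in the Outbox section; treating unset or unparsable values as disabled is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

type FeatureFlag interface {
	Enabled() bool
}

type StaticFlag bool

func (f StaticFlag) Enabled() bool { return bool(f) }

// Compile-time check that StaticFlag satisfies FeatureFlag.
var _ FeatureFlag = StaticFlag(false)

// parseFlag treats anything strconv.ParseBool rejects (including the empty
// string) as disabled, so the kill switch fails closed.
func parseFlag(raw string) StaticFlag {
	v, err := strconv.ParseBool(raw)
	return StaticFlag(err == nil && v)
}

// flagFromEnv reads the variable once; later changes to the environment
// have no effect on the returned flag.
func flagFromEnv(name string) StaticFlag {
	return parseFlag(os.Getenv(name))
}

func main() {
	flag := flagFromEnv("WEBHOOKS_OUTBOX_ENABLED")
	fmt.Println("outbox enabled:", flag.Enabled())
}
```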
