webhook

package
v0.4.0 Latest
This package is not in the latest version of its module.

Published: Jun 4, 2026 | License: Apache-2.0 | Imports: 15 | Imported by: 0

Documentation

Constants

const DeliveryTTL = 48 * time.Hour
const LeaseDuration = 5 * time.Minute

LeaseDuration is how long a leased delivery is hidden from other workers. On a clean delivery the row's next_retry_at is overwritten by the success path (MarkDelivered) or by a real backoff on the failure path (MarkAttemptFailed for the legacy store, RecordAttemptFailure for the subscriber store). The lease matters only as a recovery mechanism when a worker dies mid-delivery: after LeaseDuration the row becomes eligible again and another worker picks it up. The value is long enough that a legitimate slow webhook won't be double-fired, and short enough that a crashed worker doesn't strand its rows for hours.

Variables

This section is empty.

Functions

This section is empty.

Types

type AutoDisableWorker added in v0.3.0

type AutoDisableWorker struct {
	// contains filtered or unexported fields
}

AutoDisableWorker scans for chronically failing webhooks and disables them, and clears expired signing_secret_prev values once their 24h grace window has passed (decision #12 in the design).

The two passes share a worker because they're both cheap, idempotent, and run on the same low cadence.

func NewAutoDisableWorker added in v0.3.0

func NewAutoDisableWorker(store *identity.Store) *AutoDisableWorker

NewAutoDisableWorker constructs the worker with the design's default 5-min cadence. Tests can use Tick directly.

func (*AutoDisableWorker) Start added in v0.3.0

func (w *AutoDisableWorker) Start(ctx context.Context)

Start blocks until ctx is cancelled; call it in its own goroutine.

func (*AutoDisableWorker) Tick added in v0.3.0

func (w *AutoDisableWorker) Tick(ctx context.Context)

Tick runs both maintenance passes once. Exposed so tests can drive the worker synchronously without waiting on the ticker.

type Deliverer

type Deliverer struct {
	// contains filtered or unexported fields
}

func NewDeliverer

func NewDeliverer(requireHTTPS bool) *Deliverer

func (*Deliverer) Deliver

func (d *Deliverer) Deliver(ctx context.Context, agent *identity.AgentIdentity, p Payload) error

Deliver is a convenience alias for DeliverHTTP.

func (*Deliverer) DeliverHTTP

func (d *Deliverer) DeliverHTTP(ctx context.Context, agent *identity.AgentIdentity, p Payload) error

DeliverHTTP performs the actual HTTP POST to the agent's webhook URL.

type Delivery

type Delivery struct {
	AgentID       string     `json:"agent_id"`
	MessageID     string     `json:"message_id"`
	Status        string     `json:"status"`
	Attempts      int        `json:"attempts"`
	MaxAttempts   int        `json:"max_attempts"`
	LastError     string     `json:"last_error"`
	LastAttemptAt *time.Time `json:"last_attempt_at,omitempty"`
	NextRetryAt   time.Time  `json:"next_retry_at"`
	CreatedAt     time.Time  `json:"created_at"`
	ExpiresAt     time.Time  `json:"expires_at"`
}

type DeliveryOutcome added in v0.3.0

type DeliveryOutcome struct {
	Success    bool
	StatusCode int
	Error      string
}

DeliveryOutcome is what the deliverer returns to the caller for status accounting. StatusCode is 0 when there was no HTTP response (connection error, timeout, DNS failure).
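A caller might branch on the three cases like this. The struct is copied from the declaration above; Describe is a hypothetical helper that does not exist in the package.

```go
package main

import "fmt"

// DeliveryOutcome is copied from the declaration above.
type DeliveryOutcome struct {
	Success    bool
	StatusCode int
	Error      string
}

// Describe is a hypothetical helper that separates the three cases a caller
// usually cares about: success, HTTP-level failure, and no response at all.
func Describe(o DeliveryOutcome) string {
	switch {
	case o.Success:
		return fmt.Sprintf("delivered (HTTP %d)", o.StatusCode)
	case o.StatusCode == 0:
		return "no HTTP response: " + o.Error
	default:
		return fmt.Sprintf("rejected (HTTP %d): %s", o.StatusCode, o.Error)
	}
}

func main() {
	fmt.Println(Describe(DeliveryOutcome{Success: true, StatusCode: 204}))
	fmt.Println(Describe(DeliveryOutcome{StatusCode: 503, Error: "service unavailable"}))
	fmt.Println(Describe(DeliveryOutcome{Error: "dial tcp: i/o timeout"}))
}
```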

type DeliveryStore

type DeliveryStore struct {
	// contains filtered or unexported fields
}

func NewDeliveryStore

func NewDeliveryStore(pool *pgxpool.Pool) *DeliveryStore

func (*DeliveryStore) CreateDelivery

func (s *DeliveryStore) CreateDelivery(ctx context.Context, messageID string, lastError string) (*Delivery, error)

func (*DeliveryStore) DeleteExpiredDeliveries

func (s *DeliveryStore) DeleteExpiredDeliveries(ctx context.Context) (int64, error)

func (*DeliveryStore) GetPendingDeliveries

func (s *DeliveryStore) GetPendingDeliveries(ctx context.Context, limit int) ([]Delivery, error)

GetPendingDeliveries atomically claims up to `limit` due deliveries. Each returned row's next_retry_at is pushed by LeaseDuration so other workers (in this process or a different replica) won't grab the same row. The standard `WHERE status='pending' AND next_retry_at <= now()` filter then naturally excludes leased rows.

This must run inside a transaction: `FOR UPDATE SKIP LOCKED` only holds the row lock for the lifetime of the surrounding transaction. pool.Query (autocommit) would release the lock as soon as the SELECT completed, leaving a window where two callers could each return the same row.

func (*DeliveryStore) MarkAttemptFailed

func (s *DeliveryStore) MarkAttemptFailed(ctx context.Context, messageID, errMsg string, nextRetry time.Time) error

func (*DeliveryStore) MarkDelivered

func (s *DeliveryStore) MarkDelivered(ctx context.Context, messageID string) error

func (*DeliveryStore) MarkFailed

func (s *DeliveryStore) MarkFailed(ctx context.Context, messageID, errMsg string) error

type Payload

type Payload struct {
	MessageID      string `json:"message_id,omitempty"`
	ConversationID string `json:"conversation_id,omitempty"`
	From           string `json:"from"`
	// To is the parsed To: header from the inbound message — every fan-out
	// delivery for one inbound message carries the same list. Recipient is
	// this delivery's per-agent target (always one of the addressed agents,
	// not necessarily in To: when the agent was Bcc'd).
	To []string `json:"to"`
	CC []string `json:"cc,omitempty"`
	// ReplyTo is the parsed Reply-To: header (RFC 5322 § 3.6.2 — list, single
	// value is typical but multi is legal). Empty list when the header is
	// absent; the relay never silently falls back to From: so consumers can
	// distinguish "sender didn't request a different reply mailbox" from
	// "sender explicitly named these mailboxes".
	ReplyTo     []string          `json:"reply_to,omitempty"`
	Recipient   string            `json:"recipient"`
	RawMessage  []byte            `json:"raw_message"`
	AuthHeaders map[string]string `json:"auth_headers"`
	ReceivedAt  time.Time         `json:"received_at"`
}

type PersistentDeliverer

type PersistentDeliverer struct {
	// contains filtered or unexported fields
}

PersistentDeliverer wraps Deliverer with DB persistence for retry support.

func NewPersistentDeliverer

func NewPersistentDeliverer(deliverer *Deliverer, store *DeliveryStore) *PersistentDeliverer

func (*PersistentDeliverer) Deliver

Deliver attempts delivery and persists the result. On failure, the delivery is queued for retry. Always returns nil — the message is safely persisted. Use this for SMTP inbound where the message is already accepted.

func (*PersistentDeliverer) DeliverSync

func (pd *PersistentDeliverer) DeliverSync(ctx context.Context, agent *identity.AgentIdentity, p Payload) error

DeliverSync attempts delivery, persists the result, and returns the error. Use this for API-initiated sends where the caller needs immediate feedback.

type RetryWorker

type RetryWorker struct {
	// contains filtered or unexported fields
}

func NewRetryWorker

func NewRetryWorker(deliveryStore *DeliveryStore, deliverer *Deliverer, identityStore *identity.Store) *RetryWorker

func (*RetryWorker) Start

func (w *RetryWorker) Start(ctx context.Context)

type SubscriberDeliverer added in v0.3.0

type SubscriberDeliverer struct {
	// contains filtered or unexported fields
}

SubscriberDeliverer performs the HTTP POST for a webhook_subscriber_deliveries row, signs the request with the per-webhook HMAC secret, and reports success / failure to the caller. Distinct from the legacy Deliverer (which signs nothing at the request level — it forwards X-E2A-Auth-* headers from the payload instead).

Slice 1 carries only the current secret. Slice 4 will extend this to dual-sign during the 24h rotation grace window.

func NewSubscriberDeliverer added in v0.3.0

func NewSubscriberDeliverer(requireHTTPS bool) *SubscriberDeliverer

NewSubscriberDeliverer constructs the deliverer with the 15s per-attempt timeout chosen in design decision #6.

requireHTTPS gates against plaintext URLs in production. The existing ValidateWebhookURL helper at registration time also enforces this; the deliverer's check is the second line of defense in case validation drifts or DNS resolves a registered hostname to an internal IP after the fact.
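A minimal sketch of the scheme gate, assuming it amounts to a check on the parsed URL. checkScheme is a hypothetical name, and the sketch covers only the plaintext check; it does not attempt the internal-IP protection mentioned above.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// checkScheme is a hypothetical helper: it rejects plaintext URLs when
// requireHTTPS is set and rejects non-HTTP schemes unconditionally.
func checkScheme(raw string, requireHTTPS bool) error {
	u, err := url.Parse(raw)
	if err != nil {
		return err
	}
	switch u.Scheme {
	case "https":
		return nil
	case "http":
		if requireHTTPS {
			return errors.New("plaintext webhook URL rejected")
		}
		return nil
	default:
		return fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
}

func main() {
	fmt.Println(checkScheme("https://example.com/hook", true))
	fmt.Println(checkScheme("http://example.com/hook", true))
	fmt.Println(checkScheme("http://localhost:8080/hook", false))
}
```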

func (*SubscriberDeliverer) Deliver added in v0.3.0

func (d *SubscriberDeliverer) Deliver(ctx context.Context, url string, body []byte, secret, secretPrev string) DeliveryOutcome

Deliver performs one POST attempt. It signs the request body with the supplied HMAC secret in Stripe-style header format:

    X-E2A-Signature: t=<unix>,v1=<hex(hmac-sha256(secret, "<t>.<body>"))>

secretPrev (if non-empty) adds a second v1=... signature for the receiver to verify against during the 24h rotation grace window. Slice 1 always passes secretPrev="" (no grace logic yet); slice 4 wires this up.

2xx responses are success. Anything else (including 3xx, since redirects are blocked) is a failure with the HTTP status code reported back. Connection errors return Success=false and StatusCode=0.

type SubscriberDelivery added in v0.3.0

type SubscriberDelivery struct {
	ID             string
	WebhookID      string
	EventType      string
	EventPayload   []byte // pre-marshalled envelope bytes; POSTed verbatim
	MessageID      *string
	Status         string // pending | delivered | failed
	Attempts       int
	MaxAttempts    int
	LastError      string
	LastStatusCode *int
	LastAttemptAt  *time.Time
	NextRetryAt    time.Time
	CreatedAt      time.Time
	ExpiresAt      time.Time
}

SubscriberDelivery is one row in webhook_subscriber_deliveries. Distinct from the legacy Delivery struct (which is keyed by message_id and tracks legacy single-URL delivery state).

type SubscriberRetryWorker added in v0.3.0

type SubscriberRetryWorker struct {
	// contains filtered or unexported fields
}

SubscriberRetryWorker drains webhook_subscriber_deliveries on a tick. Distinct from the legacy RetryWorker (which drains webhook_deliveries). The two workers can run side-by-side without stepping on each other because they read from disjoint tables.

Decisions reflected in the design:

  • 8 global concurrent worker goroutines (design #6). One bad subscriber cannot pin all 8 slots because of the per-webhook inflight cap below.
  • Per-webhook inflight cap of 1 (H2 from the design review). A sync.Map[webhookID]→sync.Mutex serializes attempts to the same subscriber; a slow customer's webhook backlogs onto its own queue without starving fast subscribers.
  • 15s per-attempt HTTP timeout (carried by the SubscriberDeliverer).

Slice 1 doesn't add the auto-disable worker or the signing_secret_prev null-out pass; those land in slice 4.

func NewSubscriberRetryWorker added in v0.3.0

func NewSubscriberRetryWorker(store *SubscriberStore, deliverer *SubscriberDeliverer, identityStore *identity.Store) *SubscriberRetryWorker

func (*SubscriberRetryWorker) Start added in v0.3.0

func (w *SubscriberRetryWorker) Start(ctx context.Context)

Start blocks on ctx — call in its own goroutine. Same shape as the legacy RetryWorker.Start. Production wiring uses Start; tests call Tick directly so they don't have to wait on the 30s interval.

func (*SubscriberRetryWorker) Tick added in v0.3.0

func (w *SubscriberRetryWorker) Tick(ctx context.Context)

Tick processes one batch of pending deliveries. Exposed (not just the private processBatch) so tests can drive the worker synchronously without waiting on the ticker.

type SubscriberStore added in v0.3.0

type SubscriberStore struct {
	// contains filtered or unexported fields
}

SubscriberStore manages webhook_subscriber_deliveries. Parallel to the legacy DeliveryStore (which manages webhook_deliveries).

func NewSubscriberStore added in v0.3.0

func NewSubscriberStore(pool *pgxpool.Pool) *SubscriberStore

func (*SubscriberStore) BumpNextRetry added in v0.3.0

func (s *SubscriberStore) BumpNextRetry(ctx context.Context, deliveryID string, after time.Duration) error

BumpNextRetry pushes the row's next_retry_at out by `after` so the row doesn't reappear in GetPending on every tick. Used by the worker when it skips a row without attempting delivery (e.g. webhook is disabled, waiting for re-enable). Status stays 'pending' so the row resumes processing once the deferred time arrives.

func (*SubscriberStore) DeleteExpiredSubscriberDeliveries added in v0.3.0

func (s *SubscriberStore) DeleteExpiredSubscriberDeliveries(ctx context.Context) (int, error)

DeleteExpiredSubscriberDeliveries removes rows whose expires_at has passed. Migration 025 sets a 30-day TTL on every row; without this janitor the table grows monotonically and query plans degrade. Mirrors DeliveryStore.DeleteExpiredDeliveries for the legacy table.

func (*SubscriberStore) GetPending added in v0.3.0

func (s *SubscriberStore) GetPending(ctx context.Context, limit int) ([]SubscriberDelivery, error)

GetPending leases up to `limit` rows whose next_retry_at is in the past. The lease is implemented as a single CTE that selects-for-update the candidate rows with SKIP LOCKED and pushes their next_retry_at forward by LeaseDuration, so concurrent workers (in-process OR across replicas) never return the same row. Mirrors the legacy DeliveryStore.GetPendingDeliveries pattern.

The lease window is what prevents double-POST in multi-replica deployments. On the normal path, the caller's MarkDelivered or RecordAttemptFailure call lands inside the lease window and overwrites next_retry_at with the real schedule (or moves the row to 'failed' status) before the lease expires. If a worker crashes mid-attempt, the row simply reappears once the lease expires.
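A lease query of the shape described above might look like the following. This is a guess at the statement, not the package's SQL: the table name and the status and next_retry_at columns come from this page, while the id and webhook_id column names, the ORDER BY, and the use of make_interval are assumptions. The Go wrapper only holds the text and the arguments so that the sketch runs without a database.

```go
package main

import (
	"fmt"
	"time"
)

// LeaseDuration mirrors the package constant.
const LeaseDuration = 5 * time.Minute

// leaseQuery is a hypothetical single-statement lease: the CTE locks due
// rows with SKIP LOCKED and the UPDATE pushes next_retry_at forward.
const leaseQuery = `
WITH due AS (
    SELECT id
    FROM webhook_subscriber_deliveries
    WHERE status = 'pending' AND next_retry_at <= now()
    ORDER BY next_retry_at
    LIMIT $1
    FOR UPDATE SKIP LOCKED
)
UPDATE webhook_subscriber_deliveries d
SET next_retry_at = now() + make_interval(secs => $2)
FROM due
WHERE d.id = due.id
RETURNING d.id, d.webhook_id, d.next_retry_at`

// leaseArgs returns the positional arguments for leaseQuery.
func leaseArgs(limit int) []any {
	return []any{limit, LeaseDuration.Seconds()}
}

func main() {
	fmt.Println(leaseQuery)
	fmt.Println(leaseArgs(10)...)
}
```

Because the select and the update are one statement, the row locks are held until next_retry_at has been moved, so a second worker running the same query skips those rows while they are locked and no longer sees them as due afterwards.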

func (*SubscriberStore) InsertPendingForTest added in v0.3.0

func (s *SubscriberStore) InsertPendingForTest(ctx context.Context, webhookID, eventType string, envelope []byte) (string, error)

InsertPendingForTest creates a single delivery row tied to the given webhook + event type with the supplied envelope bytes. The retry worker picks it up on the next tick. Used by the POST /api/v1/webhooks/{id}/test endpoint to schedule a one-off delivery without going through the publisher's filter-matching path.

func (*SubscriberStore) ListDeliveriesByWebhook added in v0.3.0

func (s *SubscriberStore) ListDeliveriesByWebhook(ctx context.Context, webhookID, status string, limit int) ([]SubscriberDelivery, error)

ListDeliveriesByWebhook returns up to `limit` delivery rows for the webhook, most-recent first. When status is non-empty, restricts to that status (pending|delivered|failed). Limit is bounded by the caller; this method does not enforce a cap.
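Since the method does not cap limit itself, a handler calling it would typically clamp the value first. The helper name and both bounds below are hypothetical; the package does not define them.

```go
package main

import "fmt"

const (
	defaultLimit = 50  // assumed default when the client sends no limit
	maxLimit     = 200 // assumed upper bound for one page
)

// ClampLimit is a hypothetical caller-side guard applied before the value
// is passed to ListDeliveriesByWebhook.
func ClampLimit(requested int) int {
	switch {
	case requested <= 0:
		return defaultLimit
	case requested > maxLimit:
		return maxLimit
	default:
		return requested
	}
}

func main() {
	fmt.Println(ClampLimit(0), ClampLimit(25), ClampLimit(10000))
}
```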

func (*SubscriberStore) MarkDelivered added in v0.3.0

func (s *SubscriberStore) MarkDelivered(ctx context.Context, deliveryID string, statusCode int) error

MarkDelivered transitions a row to status='delivered' and stamps last_attempt_at + last_status_code. Also bumps webhooks.last_delivered_at in the same transaction so list views show the freshest activity.

func (*SubscriberStore) RecordAttemptFailure added in v0.3.0

func (s *SubscriberStore) RecordAttemptFailure(ctx context.Context, deliveryID, errMsg string, statusCode int) error

RecordAttemptFailure records a failed attempt, sets the next retry time, and decides whether to keep the row pending (more attempts remain) or transition to 'failed' (exhausted).

statusCode is 0 when the failure was a connection error / timeout (no HTTP response received).

SELECT and UPDATE run inside a single transaction with FOR UPDATE so two workers can't race on the same row (which could happen if the GetPending lease expires while an attempt is in flight). Without the row lock, the read-then-write pattern under-counts attempts on concurrent failure recording.
