Documentation ¶
Index ¶
- Constants
- type AutoDisableWorker
- type Deliverer
- type Delivery
- type DeliveryOutcome
- type DeliveryStore
- func (s *DeliveryStore) CreateDelivery(ctx context.Context, messageID string, lastError string) (*Delivery, error)
- func (s *DeliveryStore) DeleteExpiredDeliveries(ctx context.Context) (int64, error)
- func (s *DeliveryStore) GetPendingDeliveries(ctx context.Context, limit int) ([]Delivery, error)
- func (s *DeliveryStore) MarkAttemptFailed(ctx context.Context, messageID, errMsg string, nextRetry time.Time) error
- func (s *DeliveryStore) MarkDelivered(ctx context.Context, messageID string) error
- func (s *DeliveryStore) MarkFailed(ctx context.Context, messageID, errMsg string) error
- type Payload
- type PersistentDeliverer
- type RetryWorker
- type SubscriberDeliverer
- type SubscriberDelivery
- type SubscriberRetryWorker
- type SubscriberStore
- func (s *SubscriberStore) BumpNextRetry(ctx context.Context, deliveryID string, after time.Duration) error
- func (s *SubscriberStore) DeleteExpiredSubscriberDeliveries(ctx context.Context) (int, error)
- func (s *SubscriberStore) GetPending(ctx context.Context, limit int) ([]SubscriberDelivery, error)
- func (s *SubscriberStore) InsertPendingForTest(ctx context.Context, webhookID, eventType string, envelope []byte) (string, error)
- func (s *SubscriberStore) ListDeliveriesByWebhook(ctx context.Context, webhookID, status string, limit int) ([]SubscriberDelivery, error)
- func (s *SubscriberStore) MarkDelivered(ctx context.Context, deliveryID string, statusCode int) error
- func (s *SubscriberStore) RecordAttemptFailure(ctx context.Context, deliveryID, errMsg string, statusCode int) error
Constants ¶
const DeliveryTTL = 48 * time.Hour
const LeaseDuration = 5 * time.Minute
LeaseDuration is how long a leased delivery is hidden from other workers. On a clean delivery the row's next_retry_at is overwritten by the success path (MarkDelivered) or by a real backoff on the failure path (MarkAttemptFailed or RecordAttemptFailure, depending on the store). The lease only matters as a recovery mechanism when a worker dies mid-delivery: after LeaseDuration the row becomes eligible again and another worker picks it up. The value is long enough that a legitimate slow webhook won't be double-fired, and short enough that a crashed worker doesn't strand its rows for hours.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AutoDisableWorker ¶ added in v0.3.0
type AutoDisableWorker struct {
// contains filtered or unexported fields
}
AutoDisableWorker scans for chronically-failing webhooks and disables them, and clears expired signing_secret_prev rows past their 24h grace window. Decision #12 in the design.
The two passes share a worker because they're both cheap, idempotent, and run on the same low cadence.
func NewAutoDisableWorker ¶ added in v0.3.0
func NewAutoDisableWorker(store *identity.Store) *AutoDisableWorker
NewAutoDisableWorker constructs the worker with the design's default 5-min cadence. Tests can use Tick directly.
func (*AutoDisableWorker) Start ¶ added in v0.3.0
func (w *AutoDisableWorker) Start(ctx context.Context)
Start blocks until ctx is cancelled; call it in its own goroutine.
func (*AutoDisableWorker) Tick ¶ added in v0.3.0
func (w *AutoDisableWorker) Tick(ctx context.Context)
Tick runs both maintenance passes once. Exposed so tests can drive the worker synchronously without waiting on the ticker.
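The Start/Tick split described above can be sketched as follows. This is a minimal illustration, not the package's implementation: maintenanceWorker, driveSynchronously, and startStopsOnCancel are hypothetical names, and the maintenance passes are replaced by a counter.

```go
package main

import (
	"context"
	"sync/atomic"
	"time"
)

// maintenanceWorker is a hypothetical stand-in for AutoDisableWorker.
type maintenanceWorker struct {
	interval time.Duration
	passes   atomic.Int64 // completed Tick calls, for observation only
}

// Tick runs one maintenance pass. The real passes (disabling failing
// webhooks, clearing expired previous secrets) are replaced by a counter.
func (w *maintenanceWorker) Tick(ctx context.Context) {
	w.passes.Add(1)
}

// Start calls Tick on every interval and returns when ctx is cancelled.
func (w *maintenanceWorker) Start(ctx context.Context) {
	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			w.Tick(ctx)
		}
	}
}

// driveSynchronously shows the test-style usage: call Tick n times directly
// instead of waiting on the ticker.
func driveSynchronously(n int) int64 {
	w := &maintenanceWorker{interval: 5 * time.Minute}
	for i := 0; i < n; i++ {
		w.Tick(context.Background())
	}
	return w.passes.Load()
}

// startStopsOnCancel shows the production-style usage: run Start in its own
// goroutine, then cancel the context to shut it down.
func startStopsOnCancel() bool {
	w := &maintenanceWorker{interval: 5 * time.Minute}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		w.Start(ctx)
		close(done)
	}()
	cancel()
	select {
	case <-done:
		return true
	case <-time.After(2 * time.Second):
		return false
	}
}
```

Keeping Tick exported means a test never has to shorten the interval or sleep; it simply calls Tick and inspects the resulting state.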
type Deliverer ¶
type Deliverer struct {
// contains filtered or unexported fields
}
func NewDeliverer ¶
func (*Deliverer) DeliverHTTP ¶
func (d *Deliverer) DeliverHTTP(ctx context.Context, agent *identity.AgentIdentity, p Payload) error
DeliverHTTP performs the actual HTTP POST to the agent's webhook URL.
type Delivery ¶
type Delivery struct {
AgentID string `json:"agent_id"`
MessageID string `json:"message_id"`
Status string `json:"status"`
Attempts int `json:"attempts"`
MaxAttempts int `json:"max_attempts"`
LastError string `json:"last_error"`
LastAttemptAt *time.Time `json:"last_attempt_at,omitempty"`
NextRetryAt time.Time `json:"next_retry_at"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at"`
}
type DeliveryOutcome ¶ added in v0.3.0
DeliveryOutcome is what the deliverer returns to the caller for status accounting. StatusCode is 0 when there was no HTTP response (connection error, timeout, DNS failure).
type DeliveryStore ¶
type DeliveryStore struct {
// contains filtered or unexported fields
}
func NewDeliveryStore ¶
func NewDeliveryStore(pool *pgxpool.Pool) *DeliveryStore
func (*DeliveryStore) CreateDelivery ¶
func (s *DeliveryStore) CreateDelivery(ctx context.Context, messageID string, lastError string) (*Delivery, error)
func (*DeliveryStore) DeleteExpiredDeliveries ¶
func (s *DeliveryStore) DeleteExpiredDeliveries(ctx context.Context) (int64, error)
func (*DeliveryStore) GetPendingDeliveries ¶
func (s *DeliveryStore) GetPendingDeliveries(ctx context.Context, limit int) ([]Delivery, error)
GetPendingDeliveries atomically claims up to `limit` due deliveries. Each returned row's next_retry_at is pushed forward by LeaseDuration so other workers (in this process or a different replica) won't grab the same row. The standard `WHERE status='pending' AND next_retry_at <= now()` filter then naturally excludes leased rows.
This must run inside a transaction: `FOR UPDATE SKIP LOCKED` only holds the row lock for the lifetime of the surrounding transaction. pool.Query (autocommit) would release the lock as soon as the SELECT completed, leaving a window where two callers could each return the same row.
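The lease semantics can be modelled in memory as below. This is a single-process sketch of the visibility rule only: it does not reproduce the transaction or `FOR UPDATE SKIP LOCKED`, which is what provides atomicity in the real store. The row type, claimDue, and leaseDemo are hypothetical names.

```go
package main

import "time"

// leaseDuration mirrors the documented LeaseDuration constant.
const leaseDuration = 5 * time.Minute

// row is a hypothetical, trimmed-down delivery row.
type row struct {
	id          string
	status      string
	nextRetryAt time.Time
}

// claimDue returns the IDs of up to limit rows that are pending and due at
// now, and pushes each claimed row's nextRetryAt forward by leaseDuration so
// the same filter excludes it until the lease expires.
func claimDue(rows []row, now time.Time, limit int) []string {
	var claimed []string
	for i := range rows {
		if len(claimed) >= limit {
			break
		}
		if rows[i].status == "pending" && !rows[i].nextRetryAt.After(now) {
			rows[i].nextRetryAt = now.Add(leaseDuration)
			claimed = append(claimed, rows[i].id)
		}
	}
	return claimed
}

// leaseDemo claims twice inside the lease window and once after it, without
// ever marking a row delivered (as if the first worker had crashed).
func leaseDemo() (first, duringLease, afterLease int) {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	rows := []row{
		{id: "a", status: "pending", nextRetryAt: t0.Add(-time.Minute)},
		{id: "b", status: "pending", nextRetryAt: t0},
		{id: "c", status: "delivered", nextRetryAt: t0},
	}
	first = len(claimDue(rows, t0, 10))
	duringLease = len(claimDue(rows, t0.Add(time.Minute), 10))
	afterLease = len(claimDue(rows, t0.Add(6*time.Minute), 10))
	return
}
```

In the demo, the second claim finds nothing because both pending rows are leased; the third claim, six minutes later, picks them up again, which is the crash-recovery path the lease exists for.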
func (*DeliveryStore) MarkAttemptFailed ¶
func (s *DeliveryStore) MarkAttemptFailed(ctx context.Context, messageID, errMsg string, nextRetry time.Time) error
func (*DeliveryStore) MarkDelivered ¶
func (s *DeliveryStore) MarkDelivered(ctx context.Context, messageID string) error
func (*DeliveryStore) MarkFailed ¶
func (s *DeliveryStore) MarkFailed(ctx context.Context, messageID, errMsg string) error
type Payload ¶
type Payload struct {
MessageID string `json:"message_id,omitempty"`
ConversationID string `json:"conversation_id,omitempty"`
From string `json:"from"`
// To is the parsed To: header from the inbound message — every fan-out
// delivery for one inbound message carries the same list. Recipient is
// this delivery's per-agent target (always one of the addressed agents,
// not necessarily in To: when the agent was Bcc'd).
To []string `json:"to"`
CC []string `json:"cc,omitempty"`
// ReplyTo is the parsed Reply-To: header (RFC 5322 § 3.6.2 — list, single
// value is typical but multi is legal). Empty list when the header is
// absent; the relay never silently falls back to From: so consumers can
// distinguish "sender didn't request a different reply mailbox" from
// "sender explicitly named these mailboxes".
ReplyTo []string `json:"reply_to,omitempty"`
Recipient string `json:"recipient"`
RawMessage []byte `json:"raw_message"`
AuthHeaders map[string]string `json:"auth_headers"`
ReceivedAt time.Time `json:"received_at"`
}
type PersistentDeliverer ¶
type PersistentDeliverer struct {
// contains filtered or unexported fields
}
PersistentDeliverer wraps Deliverer with DB persistence for retry support.
func NewPersistentDeliverer ¶
func NewPersistentDeliverer(deliverer *Deliverer, store *DeliveryStore) *PersistentDeliverer
func (*PersistentDeliverer) Deliver ¶
func (pd *PersistentDeliverer) Deliver(ctx context.Context, agent *identity.AgentIdentity, p Payload) error
Deliver attempts delivery and persists the result. On failure, the delivery is queued for retry. It always returns nil, because the message is already safely persisted. Use this for SMTP inbound, where the message has already been accepted.
func (*PersistentDeliverer) DeliverSync ¶
func (pd *PersistentDeliverer) DeliverSync(ctx context.Context, agent *identity.AgentIdentity, p Payload) error
DeliverSync attempts delivery, persists the result, and returns the error. Use this for API-initiated sends where the caller needs immediate feedback.
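The difference between the two entry points can be sketched as follows. The types here are hypothetical stand-ins (outcomeLog for DeliveryStore, the send field for Deliverer.DeliverHTTP). Two details are assumptions: that Deliver is layered on DeliverSync, and that DeliverSync also leaves the row pending on failure.

```go
package main

import "errors"

// outcomeLog is a hypothetical stand-in for DeliveryStore.
type outcomeLog struct {
	status map[string]string // message ID -> "delivered" or "pending"
}

// persistentSender is a hypothetical stand-in for PersistentDeliverer.
type persistentSender struct {
	send func(messageID string) error // stand-in for Deliverer.DeliverHTTP
	log  *outcomeLog
}

// deliverSync attempts once, records the outcome, and returns the error so
// an API caller gets immediate feedback.
func (p *persistentSender) deliverSync(messageID string) error {
	if err := p.send(messageID); err != nil {
		p.log.status[messageID] = "pending" // assumption: left for the retry worker
		return err
	}
	p.log.status[messageID] = "delivered"
	return nil
}

// deliver records the outcome but hides the error: the message is already
// persisted, so the SMTP session has nothing further to report.
func (p *persistentSender) deliver(messageID string) error {
	_ = p.deliverSync(messageID)
	return nil
}

// demoDeliver runs both entry points against a sender that always fails.
func demoDeliver() (asyncErr, syncErr error, queued int) {
	p := &persistentSender{
		send: func(string) error { return errors.New("connection refused") },
		log:  &outcomeLog{status: map[string]string{}},
	}
	asyncErr = p.deliver("msg-1")
	syncErr = p.deliverSync("msg-2")
	for _, s := range p.log.status {
		if s == "pending" {
			queued++
		}
	}
	return
}
```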
type RetryWorker ¶
type RetryWorker struct {
// contains filtered or unexported fields
}
func NewRetryWorker ¶
func NewRetryWorker(deliveryStore *DeliveryStore, deliverer *Deliverer, identityStore *identity.Store) *RetryWorker
func (*RetryWorker) Start ¶
func (w *RetryWorker) Start(ctx context.Context)
type SubscriberDeliverer ¶ added in v0.3.0
type SubscriberDeliverer struct {
// contains filtered or unexported fields
}
SubscriberDeliverer performs the HTTP POST for a webhook_subscriber_deliveries row, signs the request with the per-webhook HMAC secret, and reports success / failure to the caller. Distinct from the legacy Deliverer (which signs nothing at the request level — it forwards X-E2A-Auth-* headers from the payload instead).
Slice 1 carries only the current secret. Slice 4 will extend this to dual-sign during the 24h rotation grace window.
func NewSubscriberDeliverer ¶ added in v0.3.0
func NewSubscriberDeliverer(requireHTTPS bool) *SubscriberDeliverer
NewSubscriberDeliverer constructs the deliverer with the 15s per-attempt timeout chosen in design decision #6.
requireHTTPS gates against plaintext URLs in production. The existing ValidateWebhookURL helper at registration time also enforces this; the deliverer's check is the second line of defense in case validation drifts or DNS resolves a registered hostname to an internal IP after the fact.
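A minimal sketch of a deliverer configured this way, using only net/http, is shown below. The outcome and deliverer types are hypothetical. Only the scheme check is shown for requireHTTPS, so the internal-IP concern mentioned above is out of scope. The redirect policy is an assumption about how redirect blocking might be implemented.

```go
package main

import (
	"bytes"
	"context"
	"net/http"
	"net/url"
	"time"
)

// outcome is a hypothetical stand-in for DeliveryOutcome.
type outcome struct {
	Success    bool
	StatusCode int    // 0 when no HTTP response was received
	Err        string // populated when the attempt never got a response
}

type deliverer struct {
	client       *http.Client
	requireHTTPS bool
}

func newDeliverer(requireHTTPS bool) *deliverer {
	return &deliverer{
		requireHTTPS: requireHTTPS,
		client: &http.Client{
			Timeout: 15 * time.Second, // per-attempt timeout from the text above
			// Assumption: redirects are not followed; the 3xx response itself
			// is returned to the caller and treated as a failure.
			CheckRedirect: func(*http.Request, []*http.Request) error {
				return http.ErrUseLastResponse
			},
		},
	}
}

// isSuccess reports whether a status code counts as delivered (2xx only).
func isSuccess(statusCode int) bool {
	return statusCode >= 200 && statusCode < 300
}

// post performs one attempt and classifies the result.
func (d *deliverer) post(ctx context.Context, rawURL string, body []byte) outcome {
	u, err := url.Parse(rawURL)
	if err != nil {
		return outcome{Err: err.Error()}
	}
	if d.requireHTTPS && u.Scheme != "https" {
		return outcome{Err: "plaintext webhook URL rejected"}
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, rawURL, bytes.NewReader(body))
	if err != nil {
		return outcome{Err: err.Error()}
	}
	resp, err := d.client.Do(req)
	if err != nil {
		return outcome{Err: err.Error()} // connection error, timeout, DNS failure
	}
	defer resp.Body.Close()
	return outcome{Success: isSuccess(resp.StatusCode), StatusCode: resp.StatusCode}
}

// rejectPlaintextDemo shows the second-line-of-defense check: the request is
// refused before any network traffic happens.
func rejectPlaintextDemo() (bool, int) {
	o := newDeliverer(true).post(context.Background(), "http://example.invalid/hook", nil)
	return o.Success, o.StatusCode
}
```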
func (*SubscriberDeliverer) Deliver ¶ added in v0.3.0
func (d *SubscriberDeliverer) Deliver(ctx context.Context, url string, body []byte, secret, secretPrev string) DeliveryOutcome
Deliver performs one POST attempt. It signs the request body with the supplied HMAC secret in Stripe-style header format:
X-E2A-Signature: t=<unix>,v1=<hex(hmac-sha256(secret, "<t>.<body>"))>
secretPrev (if non-empty) adds a second v1=... signature for the receiver to verify against during the 24h rotation grace window. Slice 1 always passes secretPrev="" (no grace logic yet); slice 4 wires this up.
2xx responses are success. Anything else (including 3xx, since redirects are blocked) is a failure with the HTTP status code reported back. Connection errors return Success=false and StatusCode=0.
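The header format above can be produced and checked with the standard library as sketched below. The function names are hypothetical, and joining the two v1 entries with a comma is an assumption based on the Stripe-style format. A real receiver would normally also reject stale timestamps; that check is omitted here.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"strconv"
	"strings"
)

// mac returns hex(hmac-sha256(secret, "<t>.<body>")).
func mac(secret string, ts int64, body []byte) string {
	h := hmac.New(sha256.New, []byte(secret))
	h.Write([]byte(strconv.FormatInt(ts, 10)))
	h.Write([]byte("."))
	h.Write(body)
	return hex.EncodeToString(h.Sum(nil))
}

// signatureHeader builds the X-E2A-Signature value. When secretPrev is
// non-empty a second v1 entry is appended for the rotation grace window.
func signatureHeader(ts int64, body []byte, secret, secretPrev string) string {
	parts := []string{
		"t=" + strconv.FormatInt(ts, 10),
		"v1=" + mac(secret, ts, body),
	}
	if secretPrev != "" {
		parts = append(parts, "v1="+mac(secretPrev, ts, body))
	}
	return strings.Join(parts, ",")
}

// verify is the receiver side: it accepts the request if any v1 entry
// matches the MAC computed with the receiver's secret.
func verify(header string, body []byte, secret string) bool {
	var ts int64
	haveTS := false
	var sigs []string
	for _, part := range strings.Split(header, ",") {
		key, value, ok := strings.Cut(part, "=")
		if !ok {
			continue
		}
		switch key {
		case "t":
			n, err := strconv.ParseInt(value, 10, 64)
			if err != nil {
				return false
			}
			ts, haveTS = n, true
		case "v1":
			sigs = append(sigs, value)
		}
	}
	if !haveTS {
		return false
	}
	want := []byte(mac(secret, ts, body))
	for _, sig := range sigs {
		// hmac.Equal compares in constant time.
		if hmac.Equal([]byte(sig), want) {
			return true
		}
	}
	return false
}
```

Because the header carries both signatures during rotation, a receiver that still holds the previous secret and one that has already switched to the new secret both verify the same request.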
type SubscriberDelivery ¶ added in v0.3.0
type SubscriberDelivery struct {
ID string
WebhookID string
EventType string
EventPayload []byte // pre-marshalled envelope bytes; POSTed verbatim
MessageID *string
Status string // pending | delivered | failed
Attempts int
MaxAttempts int
LastError string
LastStatusCode *int
LastAttemptAt *time.Time
NextRetryAt time.Time
CreatedAt time.Time
ExpiresAt time.Time
}
SubscriberDelivery is one row in webhook_subscriber_deliveries. Distinct from the legacy Delivery struct (which is keyed by message_id and tracks legacy single-URL delivery state).
type SubscriberRetryWorker ¶ added in v0.3.0
type SubscriberRetryWorker struct {
// contains filtered or unexported fields
}
SubscriberRetryWorker drains webhook_subscriber_deliveries on a tick. Distinct from the legacy RetryWorker (which drains webhook_deliveries). The two workers can run side-by-side without stepping on each other because they read from disjoint tables.
Decisions reflected in the design:
- 8 global concurrent worker goroutines (design #6). One bad subscriber cannot pin all 8 slots because of the per-webhook inflight cap below.
- Per-webhook inflight cap of 1 (H2 from the design review). A sync.Map[webhookID]→sync.Mutex serializes attempts to the same subscriber; a slow customer's webhook backlogs onto its own queue without starving fast subscribers.
- 15s per-attempt HTTP timeout (carried by the SubscriberDeliverer).
Slice 1 doesn't add the auto-disable worker or the signing_secret_prev null-out pass; those land in slice 4.
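The two concurrency limits listed above can be sketched with a buffered channel as the global semaphore and a sync.Map of mutexes for the per-webhook cap. This is an illustration under assumptions, not the worker's code: job, processBatch, and demoCaps are hypothetical, and skipping a busy webhook with TryLock (leaving the row for a later tick) is one possible policy; the real worker may queue instead.

```go
package main

import (
	"sync"
	"time"
)

// job is a hypothetical pending delivery, reduced to its webhook ID.
type job struct{ webhookID string }

// processBatch runs attempt for each job with at most maxWorkers attempts in
// flight overall and at most one in flight per webhook ID. locks maps
// webhookID -> *sync.Mutex and is expected to outlive a single batch.
func processBatch(jobs []job, locks *sync.Map, maxWorkers int, attempt func(job)) {
	sem := make(chan struct{}, maxWorkers)
	var wg sync.WaitGroup
	for _, j := range jobs {
		sem <- struct{}{} // acquire a global slot
		wg.Add(1)
		go func(j job) {
			defer wg.Done()
			defer func() { <-sem }() // release the global slot
			v, _ := locks.LoadOrStore(j.webhookID, &sync.Mutex{})
			mu := v.(*sync.Mutex)
			if !mu.TryLock() {
				return // webhook already has an attempt in flight; skip for now
			}
			defer mu.Unlock()
			attempt(j)
		}(j)
	}
	wg.Wait()
}

// demoCaps runs 40 jobs spread over 10 webhooks and reports the highest
// observed global and per-webhook concurrency.
func demoCaps() (maxGlobal, maxPerWebhook int) {
	var mu sync.Mutex
	global := 0
	perWebhook := map[string]int{}
	jobs := make([]job, 0, 40)
	for i := 0; i < 40; i++ {
		jobs = append(jobs, job{webhookID: string(rune('a' + i%10))})
	}
	var locks sync.Map
	processBatch(jobs, &locks, 8, func(j job) {
		mu.Lock()
		global++
		perWebhook[j.webhookID]++
		if global > maxGlobal {
			maxGlobal = global
		}
		if perWebhook[j.webhookID] > maxPerWebhook {
			maxPerWebhook = perWebhook[j.webhookID]
		}
		mu.Unlock()
		time.Sleep(time.Millisecond) // simulate the HTTP attempt
		mu.Lock()
		global--
		perWebhook[j.webhookID]--
		mu.Unlock()
	})
	return
}
```

With TryLock, a goroutine that finds its webhook busy releases its global slot immediately, so a single slow subscriber holds at most one of the eight slots.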
func NewSubscriberRetryWorker ¶ added in v0.3.0
func NewSubscriberRetryWorker(store *SubscriberStore, deliverer *SubscriberDeliverer, identityStore *identity.Store) *SubscriberRetryWorker
func (*SubscriberRetryWorker) Start ¶ added in v0.3.0
func (w *SubscriberRetryWorker) Start(ctx context.Context)
Start blocks until ctx is cancelled; call it in its own goroutine. Same shape as the legacy RetryWorker.Start. Production wiring uses Start; tests call Tick directly so they don't have to wait on the 30s interval.
func (*SubscriberRetryWorker) Tick ¶ added in v0.3.0
func (w *SubscriberRetryWorker) Tick(ctx context.Context)
Tick processes one batch of pending deliveries. Exposed (not just the private processBatch) so tests can drive the worker synchronously without waiting on the ticker.
type SubscriberStore ¶ added in v0.3.0
type SubscriberStore struct {
// contains filtered or unexported fields
}
SubscriberStore manages webhook_subscriber_deliveries. Parallel to the legacy DeliveryStore (which manages webhook_deliveries).
func NewSubscriberStore ¶ added in v0.3.0
func NewSubscriberStore(pool *pgxpool.Pool) *SubscriberStore
func (*SubscriberStore) BumpNextRetry ¶ added in v0.3.0
func (s *SubscriberStore) BumpNextRetry(ctx context.Context, deliveryID string, after time.Duration) error
BumpNextRetry pushes the row's next_retry_at out by `after` so the row doesn't reappear in GetPending on every tick. Used by the worker when it skips a row without attempting delivery (e.g. webhook is disabled, waiting for re-enable). Status stays 'pending' so the row resumes processing once the deferred time arrives.
func (*SubscriberStore) DeleteExpiredSubscriberDeliveries ¶ added in v0.3.0
func (s *SubscriberStore) DeleteExpiredSubscriberDeliveries(ctx context.Context) (int, error)
DeleteExpiredSubscriberDeliveries removes rows whose expires_at has passed. Migration 025 sets a 30-day TTL on every row; without this janitor the table grows monotonically and query plans degrade. Mirrors DeliveryStore.DeleteExpiredDeliveries for the legacy table.
func (*SubscriberStore) GetPending ¶ added in v0.3.0
func (s *SubscriberStore) GetPending(ctx context.Context, limit int) ([]SubscriberDelivery, error)
GetPending leases up to `limit` rows whose next_retry_at is in the past. The lease is implemented as a single CTE that selects-for-update the candidate rows with SKIP LOCKED and pushes their next_retry_at forward by LeaseDuration, so concurrent workers (in-process OR across replicas) never return the same row. Mirrors the legacy DeliveryStore.GetPendingDeliveries pattern.
The lease window is what prevents double-POST in multi-replica deployments. If a worker crashes mid-attempt, the row reappears once the lease expires. In the normal case the caller invokes MarkDelivered or RecordAttemptFailure inside the lease window, which replaces the leased next_retry_at with the real retry schedule (or moves the row to a terminal status) before the lease expires.
func (*SubscriberStore) InsertPendingForTest ¶ added in v0.3.0
func (s *SubscriberStore) InsertPendingForTest(ctx context.Context, webhookID, eventType string, envelope []byte) (string, error)
InsertPendingForTest creates a single delivery row tied to the given webhook + event type with the supplied envelope bytes. The retry worker picks it up on the next tick. Used by the POST /api/v1/webhooks/{id}/test endpoint to schedule a one-off delivery without going through the publisher's filter-matching path.
func (*SubscriberStore) ListDeliveriesByWebhook ¶ added in v0.3.0
func (s *SubscriberStore) ListDeliveriesByWebhook(ctx context.Context, webhookID, status string, limit int) ([]SubscriberDelivery, error)
ListDeliveriesByWebhook returns up to `limit` delivery rows for the webhook, most-recent first. When status is non-empty, restricts to that status (pending|delivered|failed). Limit is bounded by the caller; this method does not enforce a cap.
func (*SubscriberStore) MarkDelivered ¶ added in v0.3.0
func (s *SubscriberStore) MarkDelivered(ctx context.Context, deliveryID string, statusCode int) error
MarkDelivered transitions a row to status='delivered' and stamps last_attempt_at + last_status_code. Also bumps webhooks.last_delivered_at in the same transaction so list views show the freshest activity.
func (*SubscriberStore) RecordAttemptFailure ¶ added in v0.3.0
func (s *SubscriberStore) RecordAttemptFailure(ctx context.Context, deliveryID, errMsg string, statusCode int) error
RecordAttemptFailure records a failed attempt, sets the next retry time, and decides whether to keep the row pending (more attempts remain) or transition to 'failed' (exhausted).
statusCode is 0 when the failure was a connection error / timeout (no HTTP response received).
SELECT and UPDATE run inside a single transaction with FOR UPDATE so two workers can't race on the same row (which could happen if the GetPending lease expires while an attempt is in flight). Without the row lock, the read-then-write pattern under-counts attempts on concurrent failure recording.
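The read-then-write hazard and the pending/failed decision can be modelled in memory as below. A mutex stands in for the transaction plus FOR UPDATE row lock. The types and function names are hypothetical, and the backoff schedule is an assumption, since the real schedule is not stated here.

```go
package main

import (
	"sync"
	"time"
)

// attemptRow is a hypothetical, trimmed-down delivery row.
type attemptRow struct {
	Attempts, MaxAttempts int
	Status                string
	LastError             string
	LastStatusCode        int
	NextRetryAt           time.Time
}

// memStore models the table; mu stands in for the row lock.
type memStore struct {
	mu   sync.Mutex
	rows map[string]*attemptRow
}

// backoff is an assumed schedule: 1m, 2m, 4m, ... capped at one hour.
func backoff(attempts int) time.Duration {
	if attempts < 1 {
		attempts = 1
	}
	if attempts > 6 {
		return time.Hour
	}
	return time.Minute << uint(attempts-1)
}

// recordAttemptFailure reads and updates the row under one lock, so
// concurrent callers cannot both read the same Attempts value.
func (s *memStore) recordAttemptFailure(id, errMsg string, statusCode int, now time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	r, ok := s.rows[id]
	if !ok || r.Status != "pending" {
		return
	}
	r.Attempts++
	r.LastError = errMsg
	r.LastStatusCode = statusCode // 0 when no HTTP response was received
	if r.Attempts >= r.MaxAttempts {
		r.Status = "failed" // exhausted
		return
	}
	r.NextRetryAt = now.Add(backoff(r.Attempts)) // more attempts remain
}

// exhaust records failures until the row stops accepting them.
func exhaust(maxAttempts int) (int, string) {
	s := &memStore{rows: map[string]*attemptRow{
		"d1": {MaxAttempts: maxAttempts, Status: "pending"},
	}}
	for i := 0; i < maxAttempts+2; i++ {
		s.recordAttemptFailure("d1", "HTTP 500", 500, time.Now())
	}
	return s.rows["d1"].Attempts, s.rows["d1"].Status
}

// concurrentFailures records n failures from n goroutines at once.
func concurrentFailures(n int) int {
	s := &memStore{rows: map[string]*attemptRow{
		"d1": {MaxAttempts: n + 1, Status: "pending"},
	}}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.recordAttemptFailure("d1", "timeout", 0, time.Now())
		}()
	}
	wg.Wait()
	return s.rows["d1"].Attempts
}
```

Without the lock, two goroutines could both read Attempts as k and both write k+1, which is the under-count the paragraph above warns about.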