Documentation
¶
Index ¶
- Constants
- Variables
- func NewMessageID() string
- func NormalizeEmail(email string) string
- func RunMigrations(ctx context.Context, pool *pgxpool.Pool, migrationsFS fs.FS, ...) error
- func ValidateHITLConfig(ttlSeconds int, expirationAction string) error
- type APIKey
- type APIKeyExportEntry
- type AgentIdentity
- type ConversationDetail
- type ConversationListFilter
- type ConversationSummary
- type DashboardPendingStats
- type DashboardStats
- type DashboardTodayStats
- type DeleteUserDataResult
- type Domain
- type ExpirationCandidate
- type Message
- type MessageListFilter
- type MigrationMode
- type OAuthConnectionEntry
- type PendingApprovalEdit
- type SendAttemptOutcome
- type SendAttemptResult
- type SendResult
- type SigningSecret
- type SigningSecretWithValue
- type Store
- func (s *Store) ApproveAndSend(ctx context.Context, messageID, userID string, edits PendingApprovalEdit, ...) (*Message, error)
- func (s *Store) AutoDisableFailingWebhooks(ctx context.Context) (int, error)
- func (s *Store) BootstrapUser(ctx context.Context, email string) (*User, error)
- func (s *Store) ClaimOrCreateDomain(ctx context.Context, domain, userID string) (*Domain, error)
- func (s *Store) ClaimSendAttempt(ctx context.Context, messageID string) (SendAttemptResult, error)
- func (s *Store) ClearExpiredPrevSecrets(ctx context.Context) (int64, error)
- func (s *Store) CountWebhooksByUser(ctx context.Context, userID string) (int, error)
- func (s *Store) CreateAPIKey(ctx context.Context, userID, name string, expiresAt *time.Time) (*APIKey, error)
- func (s *Store) CreateAgent(ctx context.Context, ...) (*AgentIdentity, error)
- func (s *Store) CreateAgentTx(ctx context.Context, tx pgx.Tx, ...) (*AgentIdentity, error)
- func (s *Store) CreateInboundMessage(ctx context.Context, ...) (*Message, error)
- func (s *Store) CreateInboundMessageInTx(ctx context.Context, tx pgx.Tx, ...) (*Message, error)
- func (s *Store) CreateOrGetUser(ctx context.Context, email, name, googleSub string) (*User, error)
- func (s *Store) CreateOutboundMessage(ctx context.Context, agentID string, toRecipients []string, cc []string, ...) (*Message, error)
- func (s *Store) CreatePendingOutboundMessage(ctx context.Context, agentID string, toRecipients, cc, bcc []string, ...) (*Message, error)
- func (s *Store) CreateSigningSecret(ctx context.Context, userID, name string) (*SigningSecret, error)
- func (s *Store) CreateUserSession(ctx context.Context, userID string) (string, error)
- func (s *Store) CreateWebhook(ctx context.Context, userID, url, description string, events []string, ...) (*Webhook, error)
- func (s *Store) DeleteAPIKey(ctx context.Context, keyID, userID string) error
- func (s *Store) DeleteAgent(ctx context.Context, agentID, userID string) error
- func (s *Store) DeleteDomain(ctx context.Context, domain, userID string) error
- func (s *Store) DeleteExpiredMessages(ctx context.Context) (int64, error)
- func (s *Store) DeleteExpiredUserSessions(ctx context.Context) (int64, error)
- func (s *Store) DeleteSigningSecret(ctx context.Context, secretID, userID string) error
- func (s *Store) DeleteUserData(ctx context.Context, userID string) (*DeleteUserDataResult, error)
- func (s *Store) DeleteUserSession(ctx context.Context, token string) error
- func (s *Store) DeleteWebhook(ctx context.Context, webhookID, userID string) error
- func (s *Store) EnsureSharedDomain(ctx context.Context, domain string) error
- func (s *Store) EnsureUserHasSigningSecret(ctx context.Context, userID string) error
- func (s *Store) ExpireApproveAndSend(ctx context.Context, messageID string, ...) (*Message, error)
- func (s *Store) ExpireReject(ctx context.Context, messageID, reason string) (*Message, error)
- func (s *Store) ExportUserData(ctx context.Context, userID string) (*UserExport, error)
- func (s *Store) GetAgentByEmail(ctx context.Context, email string) (*AgentIdentity, error)
- func (s *Store) GetAgentByID(ctx context.Context, id string) (*AgentIdentity, error)
- func (s *Store) GetConversationByID(ctx context.Context, agentID, conversationID string) (*ConversationDetail, error)
- func (s *Store) GetDKIMKeyInternal(ctx context.Context, domain string) (string, []byte, error)
- func (s *Store) GetDashboardStats(ctx context.Context, userID string, windowDays int) (*DashboardStats, error)
- func (s *Store) GetInboundByEmailMessageID(ctx context.Context, agentID, emailMessageID string) (*Message, error)
- func (s *Store) GetInboundMessage(ctx context.Context, id string) (*Message, error)
- func (s *Store) GetMessageWithContent(ctx context.Context, messageID, agentID string) (*Message, error)
- func (s *Store) GetMessagesByAgent(ctx context.Context, f MessageListFilter) ([]Message, error)
- func (s *Store) GetOutboundMessageForUser(ctx context.Context, messageID, userID string) (*Message, error)
- func (s *Store) GetUserByAPIKey(ctx context.Context, apiKey string) (*User, error)
- func (s *Store) GetUserByID(ctx context.Context, id string) (*User, error)
- func (s *Store) GetUserSession(ctx context.Context, token string) (*User, error)
- func (s *Store) GetUserSigningSecrets(ctx context.Context, userID string) ([]SigningSecretWithValue, error)
- func (s *Store) GetWebhookByID(ctx context.Context, webhookID, userID string) (*Webhook, error)
- func (s *Store) GetWebhookByIDInternal(ctx context.Context, webhookID string) (*Webhook, error)
- func (s *Store) HasAgentsOnDomain(ctx context.Context, domain, userID string) (bool, error)
- func (s *Store) ListAPIKeys(ctx context.Context, userID string) ([]APIKey, error)
- func (s *Store) ListActivityByAgent(ctx context.Context, agentID string, limit int) ([]Message, error)
- func (s *Store) ListAgentsByUser(ctx context.Context, userID string) ([]AgentIdentity, error)
- func (s *Store) ListConversationsByAgent(ctx context.Context, f ConversationListFilter) ([]ConversationSummary, error)
- func (s *Store) ListDomainsByUser(ctx context.Context, userID string) ([]Domain, error)
- func (s *Store) ListEnabledWebhooksForRouting(ctx context.Context, userID, eventType string) ([]Webhook, error)
- func (s *Store) ListExpiredPending(ctx context.Context, limit int) ([]ExpirationCandidate, error)
- func (s *Store) ListPendingOutboundForUser(ctx context.Context, userID string, limit int) ([]Message, error)
- func (s *Store) ListSigningSecrets(ctx context.Context, userID string) ([]SigningSecret, error)
- func (s *Store) ListWebhooksByUser(ctx context.Context, userID string) ([]Webhook, error)
- func (s *Store) LookupConversationID(ctx context.Context, agentID string, messageIDs []string) (string, error)
- func (s *Store) LookupDomain(ctx context.Context, domain, userID string) (*Domain, error)
- func (s *Store) MarkSendFailed(ctx context.Context, messageID, errMsg string) error
- func (s *Store) MarkSendSucceeded(ctx context.Context, messageID string, r SendResult) error
- func (s *Store) MaxWebhooksForUser(ctx context.Context, userID string) (int, error)
- func (s *Store) ModifyMessageLabels(ctx context.Context, messageID, agentID string, add, remove []string) ([]string, error)
- func (s *Store) RejectPending(ctx context.Context, messageID, userID, reason string) (*Message, error)
- func (s *Store) ResolveOutboundOwner(ctx context.Context, messageID string) (userID, agentID string, err error)
- func (s *Store) RotateSecret(ctx context.Context, webhookID, userID string) (newPlaintext string, prevExpiresAt time.Time, err error)
- func (s *Store) SetDomainPrimary(ctx context.Context, domain, userID string) error
- func (s *Store) TouchDomainLastChecked(ctx context.Context, domain, userID string) error
- func (s *Store) TouchSigningSecretLastSigned(ctx context.Context, secretID string) error
- func (s *Store) UpdateAgentHITL(ctx context.Context, agentID, userID string, enabled bool, ttlSeconds int, ...) error
- func (s *Store) UpdateAgentMode(ctx context.Context, agentID, userID, agentMode, webhookURL string) error
- func (s *Store) UpdateAgentWebhook(ctx context.Context, agentID, userID, webhookURL string) error
- func (s *Store) UpdateMessageDeliveryStatus(ctx context.Context, messageID, agentID, status string) error
- func (s *Store) UpdateUserName(ctx context.Context, userID, name string) (*User, error)
- func (s *Store) UpdateWebhook(ctx context.Context, webhookID, userID string, u WebhookUpdate) (*Webhook, error)
- func (s *Store) VerifyDomain(ctx context.Context, domain, userID string) error
- func (s *Store) WithTx(ctx context.Context, fn func(tx pgx.Tx) error) error
- type UsageEventEntry
- type User
- type UserExport
- type UserExportUser
- type Webhook
- type WebhookFilters
- type WebhookUpdate
Constants ¶
// HITL limits and expiration actions. These values mirror the CHECK
// constraints in migration 003_hitl.sql.
const (
	HITLMaxTTLSeconds     = 604800 // 7 days
	HITLDefaultTTLSeconds = 604800

	// Allowed values for an agent's expiration action.
	HITLExpirationApprove = "approve"
	HITLExpirationReject  = "reject"

	// HITLDefaultExpirationAct is the action used when none is chosen.
	HITLDefaultExpirationAct = HITLExpirationReject
)
HITL constants mirror the CHECK constraints in migration 003_hitl.sql.
// Message status values. They mirror the CHECK constraint in migration
// 003_hitl.sql.
const (
	MessageStatusSent            = "sent"
	MessageStatusPendingApproval = "pending_approval"
	MessageStatusRejected        = "rejected"
	MessageStatusExpiredApproved = "expired_approved"
	MessageStatusExpiredRejected = "expired_rejected"
)
Message status values mirror the CHECK constraint in migration 003_hitl.sql.
// Webhook auto-disable tuning (10 failures / 72h).
const (
	// AutoDisableThreshold is the consecutive-failed-events count over
	// AutoDisableWindow that trips a webhook into the auto-disabled state.
	AutoDisableThreshold = 10
	// AutoDisableWindow is the period over which failed events are counted.
	AutoDisableWindow = 72 * time.Hour
)
AutoDisableThreshold is the consecutive-failed-events count over AutoDisableWindow that trips a webhook into the auto-disabled state. Tuned per design decision #12 (10 / 72h). The reviewer can re-enable via PATCH after the 5-min cooldown.
const ConversationListHardCap = 100
ConversationListHardCap is the maximum number of conversations a single list call returns. Higher requests are silently clamped. 100 covers the inbox-style use case; a deployment that needs more can either ask for a higher cap (we'll bump it) or paginate (slice 2).
const DashboardDefaultWindowDays = 7
DashboardDefaultWindowDays is the lookback for the dashboard strip when the caller doesn't request a specific window.
const DashboardMaxWindowDays = 90
DashboardMaxWindowDays caps the lookback to keep the underlying SQL scan bounded. 90 days is generous for any UI surface we currently have and remains efficient given the per-user index on usage_summaries.
const DefaultMaxWebhooks = 50
DefaultMaxWebhooks is the fallback per-user webhook cap. MaxWebhooksForUser returns the per-user cap from account_limits; users without an account_limits row default to 50 — the column DEFAULT, mirrored here as a fallback so the cap works on dev installs that haven't seeded an account_limits row.
const MaxLabelsPerMessage = 100
MaxLabelsPerMessage is the post-add cap on the labels[] column. The per-operation cap (max items in add_labels / remove_labels) is enforced earlier at the handler. The two together bound the array at a size where GIN containment + JSON marshalling stay cheap.
const MaxSigningSecretsPerUser = 5
MaxSigningSecretsPerUser caps how many active signing secrets a user can hold at once. Two slots cover the standard rotation flow (create new, swap, delete old); a hard cap higher than that mostly catches runaway scripts. Easy to raise later if real users need more.
const MessageTTL = 10 * 24 * time.Hour // 10 days
MessageTTL is the per-row lifetime for `messages`. The janitor at DeleteExpiredMessages drops rows whose expires_at has passed.
10 days is chosen to strictly exceed HITLMaxTTLSeconds (7 days) with a 3-day buffer. The buffer guarantees:
- the HITL worker (60s cadence) always wins the race against the messages janitor (hourly cadence) on max-HITL pending rows;
- terminal HITL rows retain ≥3 days of post-resolution audit visibility before the metadata row is dropped;
- reply-composition can load a parent inbound up to 10 days old for quoting context.
If HITLMaxTTLSeconds is ever raised, raise this too — keep MessageTTL > HITLMaxTTLSeconds by at least 1 day.
const SendAttemptStaleWindow = 10 * time.Minute
SendAttemptStaleWindow is how long an 'attempting' send_attempts row stays "owned" by the original worker before the next caller is allowed to take it over. Bounded above by outbound.SMTPRelay's worst-case retry envelope (~6.5min) plus headroom — keeping it tighter would risk concurrent SES sends if a real upstream stall happened.
// SessionTTL is a fixed duration of 7 days.
// NOTE(review): presumably the lifetime of a user session (cf.
// CreateUserSession / DeleteExpiredUserSessions) — confirm against the
// session code, which is not shown here.
const SessionTTL = 7 * 24 * time.Hour
Variables ¶
var ( ErrSigningSecretCapReached = fmt.Errorf("at most %d signing secrets per user; delete one before creating another", MaxSigningSecretsPerUser) ErrCannotDeleteLastSigningSecret = errors.New("cannot delete the last signing secret; create a new one first") ErrSigningSecretNotFound = errors.New("signing secret not found or not owned by user") )
Sentinel errors so API handlers can map error → HTTP status with errors.Is rather than string-matching the message text. Tests can also assert against them directly.
// Sentinel errors for webhook operations. API handlers can map them to
// HTTP statuses with errors.Is rather than string-matching.
var (
	ErrWebhookNotFound   = errors.New("webhook not found")
	ErrWebhookCapReached = errors.New("webhook count limit reached for this user")
)
Sentinel errors so API handlers can map error → HTTP status with errors.Is rather than string-matching.
// ErrDomainHasAgents is returned when a domain delete is blocked by
// existing agents. The message has no format verbs, so errors.New is used
// in place of fmt.Errorf; the error text is unchanged.
var ErrDomainHasAgents = errors.New("cannot delete domain: agents still exist")
ErrDomainHasAgents is returned when a domain delete is blocked by existing agents.
// ErrDomainNotFound is returned when a domain is not found or not owned
// by the user. The message has no format verbs, so errors.New is used in
// place of fmt.Errorf; the error text is unchanged.
var ErrDomainNotFound = errors.New("domain not found or not owned by user")
ErrDomainNotFound is returned when a domain is not found or not owned by the user.
var ErrLabelLimitExceeded = errors.New("label limit exceeded")
ErrLabelLimitExceeded reports that an add operation would push a message past MaxLabelsPerMessage. Mapped to HTTP 400 at the handler.
// ErrMessageNotFound is returned when a message is not found for the
// given user: either the ID doesn't exist or the message belongs to
// another user's agent. Handlers map this to HTTP 404. The message has no
// format verbs, so errors.New is used in place of fmt.Errorf.
var ErrMessageNotFound = errors.New("message not found")
ErrMessageNotFound is returned when a message is not found for the given user (either the ID doesn't exist or the message belongs to another user's agent). Handlers map this to HTTP 404.
// ErrNotPendingApproval is returned when an approve or reject operation
// targets a message that is not (or is no longer) in pending_approval
// status. Handlers map this to HTTP 409 Conflict. The message has no
// format verbs, so errors.New is used in place of fmt.Errorf.
var ErrNotPendingApproval = errors.New("message is not pending approval")
ErrNotPendingApproval is returned when an approve or reject operation targets a message that is not (or is no longer) in pending_approval status. Handlers map this to HTTP 409 Conflict.
var ErrSendInProgress = errors.New("send already in progress for this message")
ErrSendInProgress is returned by ApproveAndSend (and the underlying ClaimSendAttempt) when a concurrent attempt for the same message is still in-flight at the SES layer. Callers should treat this as transient — the in-flight caller will either commit (the next retry sees status='sent' and replays) or time out (the row goes stale and the next retry takes over).
var ErrWebhookCooldown = errors.New("webhook was auto-disabled within the last 5 minutes; wait before re-enabling")
ErrWebhookCooldown is returned when a PATCH would re-enable a webhook that was auto-disabled within the last 5 minutes. Slice 4 adds the auto-disable worker; this error type lands now so the handler doesn't need to map magic strings later.
Functions ¶
func NewMessageID ¶
func NewMessageID() string
NewMessageID returns a fresh internal message ID. Callers can use this to generate the ID up-front when they need it before storing — for example, the SMTP relay generates the ID before signing auth headers so the ID is part of the canonical string fed to HMAC.
func NormalizeEmail ¶ added in v0.3.0
NormalizeEmail returns the canonical lookup form of an email address: lower-cased, with surrounding whitespace stripped. Every external-input email used as a lookup key (path vars, form fields, OAuth consent choices, WebSocket subscriptions) must funnel through this so case variants ("Alice@x.com" vs "alice@x.com") resolve to the same row.
Per RFC 5321 §2.4 the local-part is technically case-sensitive — a small number of providers (most famously ProtonMail) preserve case. We collapse it anyway because consistency across HTTP path, JSON body, SMTP envelope, and dashboard URL is more important than spec purity for the agent-inbox use case. Document this if a user complains.
func RunMigrations ¶ added in v0.3.0
func RunMigrations(ctx context.Context, pool *pgxpool.Pool, migrationsFS fs.FS, mode MigrationMode) error
RunMigrations applies every embedded migration that isn't yet recorded in the schema_migrations table. Migrations run in filename-sorted order, each in its own transaction (unless tagged with the "e2a:no-transaction" directive); on the first error the function returns without applying later migrations.
A Postgres session advisory lock serializes concurrent invocations from rolling restarts and multi-instance deploys. The lock is held for the duration of the function and auto-released on session disconnect (so a crashed binary doesn't leave the DB stuck).
All migrations should be written to be idempotent (CREATE/ALTER ... IF NOT EXISTS) so that re-runs are harmless even without the lock. The tracker is the source of truth for "should we attempt to run this one again"; idempotence + the lock are layered safety nets.
func ValidateHITLConfig ¶
ValidateHITLConfig returns an error if the TTL or expiration action is invalid. The DB CHECK constraints are the final guard; this mirrors them for a clean, pre-query error path.
Types ¶
type APIKey ¶
// APIKey is the JSON shape of a user's API key.
type APIKey struct {
	ID        string `json:"id"`
	UserID    string `json:"user_id"`
	Name      string `json:"name"`
	KeyPrefix string `json:"key_prefix"`

	// PlaintextKey is set exactly once, at creation; it is never stored.
	PlaintextKey string `json:"key,omitempty"`

	CreatedAt time.Time `json:"created_at"`

	// LastUsedAt is refreshed by GetUserByAPIKey each time
	// AuthenticateRequest succeeds. It stays NULL for a key that has
	// never been used.
	LastUsedAt *time.Time `json:"last_used_at,omitempty"`

	// ExpiresAt is an optional hard expiry: once expires_at has passed,
	// AuthenticateRequest rejects the key. NULL — the
	// backward-compatible default — means the key never expires.
	ExpiresAt *time.Time `json:"expires_at,omitempty"`
}
type APIKeyExportEntry ¶
// APIKeyExportEntry is the API-key shape included in the export. Only
// metadata is exposed; the key hash is deliberately absent.
type APIKeyExportEntry struct {
	ID         string     `json:"id"`
	Name       string     `json:"name"`
	KeyPrefix  string     `json:"key_prefix"`
	CreatedAt  time.Time  `json:"created_at"`
	LastUsedAt *time.Time `json:"last_used_at,omitempty"`
	RevokedAt  *time.Time `json:"revoked_at,omitempty"`
} // @name APIKeyExportEntry
APIKeyExportEntry is the API-key shape included in the export. We expose only metadata; the hash itself stays internal because (a) it's not useful to the user, (b) it's a credential equivalent for offline dictionary attacks if leaked.
type AgentIdentity ¶
// AgentIdentity is the JSON shape of an agent, including its HITL settings
// and the read-time dashboard enrichment fields.
type AgentIdentity struct {
	ID                   string    `json:"id"`
	Domain               string    `json:"domain"`
	Email                string    `json:"email"`
	Name                 string    `json:"name"`
	WebhookURL           string    `json:"webhook_url"`
	AgentMode            string    `json:"agent_mode"`
	DomainVerified       bool      `json:"domain_verified"`
	Public               bool      `json:"public"`
	CreatedAt            time.Time `json:"created_at"`
	UserID               string    `json:"user_id"`
	HITLEnabled          bool      `json:"hitl_enabled"`
	HITLTTLSeconds       int       `json:"hitl_ttl_seconds"`
	HITLExpirationAction string    `json:"hitl_expiration_action"`

	// Dashboard enrichment. ListAgentsByUser computes these at read time
	// with correlated subqueries; the other load paths (GetAgentByID,
	// GetAgentByEmail) leave them zero, the same pattern as
	// Domain.AgentCount. If the read cost ever bites, switch to
	// denormalized columns.
	Inbound7d      int        `json:"inbound_7d"`
	Outbound7d     int        `json:"outbound_7d"`
	PendingCount   int        `json:"pending_count"`
	LastDeliveryAt *time.Time `json:"last_delivery_at,omitempty"`

	// WebhookHealthy is false iff a webhook delivery failed within the
	// last 24h. Agents with no deliveries yet default to true so fresh
	// agents are not painted red. The value is meaningless for
	// agent_mode='local'; the frontend hides the badge in that case.
	WebhookHealthy bool `json:"webhook_healthy"`
}
func (*AgentIdentity) ActualDomain ¶
func (a *AgentIdentity) ActualDomain() string
ActualDomain returns the DNS domain for the agent.
func (*AgentIdentity) EmailAddress ¶
func (a *AgentIdentity) EmailAddress() string
EmailAddress returns the agent's email address (always the ID).
func (*AgentIdentity) IsSharedDomain ¶
func (a *AgentIdentity) IsSharedDomain(sharedDomain string) bool
IsSharedDomain returns true if the agent's domain matches the configured shared domain (the host that backs slug-based registration). When sharedDomain is empty, the deployment has slug registration disabled and no agent can be on the shared domain.
type ConversationDetail ¶ added in v0.3.0
// ConversationDetail embeds the summary and adds the member messages plus
// the computed aggregates: the participants set and the label union.
// Messages come back oldest first, the rendering convention for a thread
// view.
type ConversationDetail struct {
	ConversationSummary

	Participants []string  `json:"participants"`
	Labels       []string  `json:"labels"`
	Messages     []Message `json:"messages"`
}
ConversationDetail extends the summary with member messages and computed aggregates (participants set, label union). Messages are returned chronologically (oldest first) — the rendering convention for a thread view.
type ConversationListFilter ¶ added in v0.3.0
// ConversationListFilter is the input to ListConversationsByAgent. The
// storage layer caps Limit at ConversationListHardCap whatever the caller
// passes.
type ConversationListFilter struct {
	AgentID string
	Limit   int

	// Since and Until bracket the conversation's last_message_at, i.e.
	// "show me conversations that had activity in this window". A zero
	// value disables that bound.
	Since time.Time
	Until time.Time
}
ConversationListFilter is the input to ListConversationsByAgent. Limit is capped to ConversationListHardCap at the storage layer regardless of what the caller passes; pagination is intentionally not in this slice (most agents have dozens of conversations, not thousands) and can be added cursor-style if a deployment needs it.
type ConversationSummary ¶ added in v0.3.0
// ConversationSummary is one row in the conversation list endpoint:
// aggregated counts plus "latest message" preview fields.
type ConversationSummary struct {
	ID             string    `json:"conversation_id"`
	LastMessageAt  time.Time `json:"last_message_at"`
	FirstMessageAt time.Time `json:"first_message_at"`
	MessageCount   int       `json:"message_count"`
	InboundCount   int       `json:"inbound_count"`
	OutboundCount  int       `json:"outbound_count"`
	// HasUnread is true iff at least one inbound member is in
	// inbox_status='unread'; outbound pending_approval does not count.
	HasUnread     bool   `json:"has_unread"`
	LatestSubject string `json:"latest_subject"`
	LatestSender  string `json:"latest_sender"`
}
ConversationSummary is one row in the list endpoint. Aggregated counts + the "latest message" preview fields are enough to render an inbox-style conversation list without a per-row drill-down.
HasUnread is true iff at least one INBOUND member is in inbox_status='unread'. Outbound pending_approval doesn't count — the conversation list is the agent's mailbox view, not the reviewer's HITL queue.
type DashboardPendingStats ¶ added in v0.3.0
type DashboardStats ¶ added in v0.3.0
// DashboardStats is the workspace-level summary returned by
// GetDashboardStats.
type DashboardStats struct {
	Today              DashboardTodayStats   `json:"today"`
	Pending            DashboardPendingStats `json:"pending"`
	DeliverySuccessPct float64               `json:"delivery_success_pct"`
	SampleWindowDays   int                   `json:"sample_window_days"`

	// InboundWindow and OutboundWindow are totals over the same
	// SampleWindowDays as DeliverySuccessPct, summed from the
	// usage_summaries rows in the lookback period. The dashboard
	// at-a-glance strip uses Today.*; the settings page uses these window
	// totals at a 30-day window (?window=30).
	InboundWindow  int `json:"inbound_window"`
	OutboundWindow int `json:"outbound_window"`
}
DashboardStats is the workspace-level summary returned by GetDashboardStats. Each section corresponds to one of the cards on the redesigned dashboard's stats strip; null/zero values render as "—" in the UI, so deployments without E2A_USAGE_TRACKING enabled degrade gracefully.
type DashboardTodayStats ¶ added in v0.3.0
type DeleteUserDataResult ¶
// DeleteUserDataResult reports per-table row counts for a user-data
// deletion, so audit logs get structured numbers rather than a log line
// to parse.
type DeleteUserDataResult struct {
	UsageEventsDeleted        int64 `json:"usage_events_deleted"`
	UsageSummariesDeleted     int64 `json:"usage_summaries_deleted"`
	MessagesDeleted           int64 `json:"messages_deleted"`
	AgentsDeleted             int64 `json:"agents_deleted"`
	DomainsDeleted            int64 `json:"domains_deleted"`
	APIKeysDeleted            int64 `json:"api_keys_deleted"`
	SessionsDeleted           int64 `json:"sessions_deleted"`
	OAuthAuthCodesDeleted     int64 `json:"oauth_auth_codes_deleted,omitempty"`
	OAuthAccessTokensDeleted  int64 `json:"oauth_access_tokens_deleted,omitempty"`
	OAuthRefreshTokensDeleted int64 `json:"oauth_refresh_tokens_deleted,omitempty"`
	UserDeleted               bool  `json:"user_deleted"`
} // @name DeleteUserDataResult
DeleteUserDataResult breaks out per-table row counts for audit logs. Operators receiving a deletion request often have to attest to what was removed; returning structured counts beats parsing a log line.
type Domain ¶
// Domain represents a verified or unverified domain registered by a user.
type Domain struct {
	Domain            string     `json:"domain"`
	UserID            *string    `json:"user_id,omitempty"`
	Verified          bool       `json:"verified"`
	VerificationToken string     `json:"verification_token"`
	CreatedAt         time.Time  `json:"created_at"`
	VerifiedAt        *time.Time `json:"verified_at,omitempty"`

	// IsPrimary flags the user's default domain. A partial unique index
	// from migration 013 allows at most one TRUE per user.
	IsPrimary bool `json:"is_primary"`

	// LastCheckedAt is bumped every time the verification probe runs,
	// whether or not it succeeds. It is NULL until the first probe, which
	// is distinct from "probed and failed" (`verified=false` together
	// with a non-null LastCheckedAt).
	LastCheckedAt *time.Time `json:"last_checked_at,omitempty"`

	// AgentCount is not a persisted column: ListDomainsByUser computes it
	// at read time. Single-domain LookupDomain leaves it zero, so callers
	// that need the count use the list path. This column-versus-aggregate
	// split avoids threading an agent counter through every store
	// signature.
	AgentCount int `json:"agent_count"`

	// DKIM keypair fields. The selector and public key are user-facing:
	// the dashboard shows them so users can copy the DNS TXT record. The
	// private key is intentionally NOT part of the JSON shape; only the
	// outbound signer reads it, via GetDKIMKey(domain). Domains created
	// before migration 014 ran keep all three NULL until the next
	// ClaimOrCreate or backfill.
	// NOTE(review): the store method listed in this package's index is
	// GetDKIMKeyInternal — confirm which accessor is meant.
	DKIMSelector  string `json:"dkim_selector,omitempty"`
	DKIMPublicKey string `json:"dkim_public_key,omitempty"`
}
Domain represents a verified or unverified domain registered by a user.
type ExpirationCandidate ¶
// ExpirationCandidate is the minimal row the expiration worker needs to
// decide how to finalize an expired pending message.
type ExpirationCandidate struct {
	MessageID string
	AgentID   string
	// ExpirationAction is either 'approve' or 'reject'.
	ExpirationAction string
}
ExpirationCandidate is the minimal row the expiration worker needs to decide how to finalize an expired pending message.
type Message ¶
// Message is the JSON shape of a stored message row, covering both inbound
// and outbound directions as well as the HITL approval fields.
type Message struct {
	ID                string            `json:"id"`
	AgentID           string            `json:"agent_id"`
	Direction         string            `json:"direction"`
	Sender            string            `json:"sender"`
	Recipient         string            `json:"recipient"`
	Subject           string            `json:"subject"`
	EmailMessageID    string            `json:"email_message_id,omitempty"`
	ProviderMessageID string            `json:"provider_message_id,omitempty"`
	Method            string            `json:"method,omitempty"`
	Type              string            `json:"type,omitempty"`
	RawMessage        []byte            `json:"raw_message,omitempty"`
	AuthHeaders       map[string]string `json:"auth_headers,omitempty"`
	ConversationID    string            `json:"conversation_id,omitempty"`
	DeliveryStatus    string            `json:"delivery_status,omitempty"`
	CreatedAt         time.Time         `json:"created_at"`
	ExpiresAt         time.Time         `json:"expires_at"`
	WebhookStatus     string            `json:"webhook_status,omitempty"`
	WebhookError      string            `json:"webhook_error,omitempty"`
	WebhookAttempts   int               `json:"webhook_attempts,omitempty"`

	// SizeBytes is the byte length of raw_message. Only load paths that
	// compute it fill it in (e.g. GetMessagesByAgent for the dashboard
	// inbox); elsewhere it is zero and the inbox renders "—".
	SizeBytes int `json:"size_bytes,omitempty"`

	// InboxStatus mirrors messages.inbox_status ('unread' | 'read') for
	// inbound rows. DeliveryStatus currently carries the same value under
	// a confusing JSON key, so this separate field gives the dashboard's
	// inbox a non-overloaded key to read. Empty on outbound rows;
	// populated by GetMessagesByAgent.
	InboxStatus string `json:"inbox_status,omitempty"`

	// Multi-recipient fields. On outbound rows these are the addressed
	// To/Cc/Bcc recipients of the send. On inbound rows ToRecipients and
	// CC are the parsed To: and Cc: headers of the original message,
	// while the per-delivery target for the row lives in Recipient. BCC
	// is outbound-only.
	ToRecipients []string `json:"to_recipients,omitempty"`
	CC           []string `json:"cc,omitempty"`
	BCC          []string `json:"bcc,omitempty"`

	// ReplyTo is the parsed Reply-To: header of an inbound message and is
	// empty when the header was absent. It is kept apart from Sender so
	// consumers can recover the original From: of forwarded or
	// notification mail whose Reply-To points at a different mailbox.
	// Irrelevant for outbound.
	ReplyTo []string `json:"reply_to,omitempty"`

	// Labels are user-applied string tags (`urgent`, `follow-up`, …):
	// always lowercase, charset `[a-z0-9:_-]+`, ≤ 64 chars per label, and
	// capped at 100 per message. An empty slice means no labels; the DB
	// default is `'{}'`, so this is never null on read. The `e2a:` prefix
	// is reserved for server-applied system labels, and the API layer
	// rejects caller writes that try to set them.
	Labels []string `json:"labels,omitempty"`

	// HITL approval fields. Status defaults to 'sent'. Body and
	// attachments are populated only while the message is in
	// 'pending_approval' and are scrubbed on any terminal transition.
	Status            string     `json:"status,omitempty"`
	ApprovalExpiresAt *time.Time `json:"approval_expires_at,omitempty"`
	ReviewedAt        *time.Time `json:"reviewed_at,omitempty"`

	// ReviewedByUserID identifies the human reviewer who approved or
	// rejected the message. ApproveAndSend and RejectPending set it;
	// worker-triggered transitions (TTL auto-approve / auto-reject via
	// ExpireApproveAndSend / ExpireReject) leave it NULL, which is the
	// operator-visible signal that "no human looked at this."
	ReviewedByUserID *string `json:"reviewed_by_user_id,omitempty"`

	// ReviewedByName is the JOIN'd display name from the reviewer's users
	// row. Only GetOutboundMessageForUser populates it; list endpoints
	// leave it empty to avoid a join-per-row cost, since the
	// pending-detail page is where reviewer attribution matters.
	ReviewedByName  *string         `json:"reviewed_by_name,omitempty"`
	RejectionReason string          `json:"rejection_reason,omitempty"`
	Edited          bool            `json:"edited,omitempty"`
	BodyText        string          `json:"body_text,omitempty"`
	BodyHTML        string          `json:"body_html,omitempty"`
	AttachmentsJSON json.RawMessage `json:"attachments,omitempty"`
}
type MessageListFilter ¶ added in v0.3.0
// MessageListFilter bundles the parameters for GetMessagesByAgent. Zero
// values on the optional substring / time / ID filters mean "no
// constraint".
type MessageListFilter struct {
	AgentID    string
	Status     string // "unread" | "read" | "all"
	Direction  string // "inbound" | "outbound" | "all"
	Descending bool
	Limit      int
	AfterTime  time.Time
	AfterID    string

	// Optional search filters; empty / zero means "no constraint".
	// From and SubjectContains are case-insensitive substring matches
	// (Postgres ILIKE), bounded to 200 chars at the handler layer.
	From            string
	SubjectContains string
	ConversationID  string    // exact match
	Since           time.Time // created_at >= Since
	Until           time.Time // created_at < Until

	// Labels keeps only rows that carry ALL of the given labels
	// (AND-match via Postgres @> array containment). An empty slice means
	// "no label constraint" and matches labelled and unlabelled rows
	// alike. The handler layer validates each entry against the same
	// charset rule used on writes, so callers can't smuggle SQL through
	// here.
	Labels []string
}
MessageListFilter bundles the params for GetMessagesByAgent. Zero values on the optional substring / time / ID filters mean "no constraint" — callers omit what they don't want to filter on.
type MigrationMode ¶ added in v0.3.0
type MigrationMode string
MigrationMode controls how RunMigrations handles pending migrations.
Set via E2A_MIGRATION_MODE env var. Default is ModeAuto.
- ModeAuto: apply pending migrations in order. Fail (return error) if any apply errors. This is what the hosted deployment uses.
- ModeVerify: do not apply anything. Return an error listing pending migrations. For cautious operators who want a separate manual migration step before binary rollout.
- ModeSkip: do not apply or check. Log and proceed. For emergency surgery — deploy a binary that won't touch schema.
const ( ModeAuto MigrationMode = "auto" ModeVerify MigrationMode = "verify" ModeSkip MigrationMode = "skip" )
func ParseMigrationMode ¶ added in v0.3.0
func ParseMigrationMode(s string) (MigrationMode, error)
ParseMigrationMode reads a string (typically from env) and returns the matching mode, defaulting to ModeAuto on empty input. Returns an error for unknown values so a typo is loud, not silent.
type OAuthConnectionEntry ¶ added in v0.3.0
type OAuthConnectionEntry struct {
ClientID string `json:"client_id"`
ClientName string `json:"client_name"`
AgentEmail string `json:"agent_email"`
Scope string `json:"scope"`
IssuedAt time.Time `json:"issued_at"`
ExpiresAt *time.Time `json:"expires_at,omitempty"`
RevokedAt *time.Time `json:"revoked_at,omitempty"`
} // @name OAuthConnectionEntry
OAuthConnectionEntry is one OAuth/MCP client connection. The underlying token signatures are intentionally excluded — they are credential-equivalent. The agent_email is the per-grant binding captured at consent time.
type PendingApprovalEdit ¶
type PendingApprovalEdit struct {
Subject *string
BodyText *string
BodyHTML *string
To []string
CC []string
BCC []string
AttachmentsJSON []byte
// AttachmentsSet must be true when the caller intends to override
// AttachmentsJSON, since nil and empty [] are both valid overrides
// (empty [] clears attachments; nil preserves).
AttachmentsSet bool
}
PendingApprovalEdit holds optional overrides a reviewer can apply when approving a pending message. Pointer-typed strings distinguish "not provided" (nil) from "explicitly empty" (pointer to ""). Slice fields distinguish "unset" (nil) from "empty list" (non-nil zero-length slice).
func (PendingApprovalEdit) Apply ¶
func (e PendingApprovalEdit) Apply(msg *Message) bool
Apply mutates msg to reflect any fields the reviewer changed. Returns true if any field was actually different from what msg already held (signals the edited flag should be set).
type SendAttemptOutcome ¶ added in v0.3.0
type SendAttemptOutcome int
SendAttemptOutcome describes the result of trying to reserve a (message_id) slot for an upstream SES send.
const ( // SendAttemptAcquired — caller now owns the slot and must follow // up with MarkSendSucceeded or MarkSendFailed. SendAttemptAcquired SendAttemptOutcome = iota // SendAttemptAlreadySent — a prior attempt for this message // already succeeded at SES; SendResult is populated with the // recorded provider id and recipient lists. Callers must NOT // re-invoke the upstream send. SendAttemptAlreadySent // SendAttemptInFlight — a concurrent caller holds the slot and // the row is not stale. Callers should surface ErrSendInProgress. SendAttemptInFlight )
type SendAttemptResult ¶ added in v0.3.0
type SendAttemptResult struct {
Outcome SendAttemptOutcome
Sent SendResult
}
SendAttemptResult bundles the outcome with the cached SendResult when the outcome is SendAttemptAlreadySent.
type SendResult ¶
type SendResult struct {
ProviderMessageID string
Method string
To []string
CC []string
BCC []string
}
SendResult carries the outcome of a sender.Send invocation back to the store for final persistence. Handlers wrap their sender.Send call in a closure that returns this type.
type SigningSecret ¶ added in v0.3.0
type SigningSecret struct {
ID string `json:"id"`
UserID string `json:"user_id"`
Name string `json:"name"`
Secret string `json:"secret,omitempty"` // only on creation
SecretPrefix string `json:"secret_prefix,omitempty"` // first 12 chars, for list/get
CreatedAt time.Time `json:"created_at"`
LastSignedAt *time.Time `json:"last_signed_at,omitempty"`
}
SigningSecret is one of a user's HMAC secrets used to sign their agents' inbound webhook payloads and HITL approval magic-link tokens.
The plaintext Secret is only set in the response to a fresh CreateSigningSecret call (and what's persisted in the DB row); list operations omit it and surface a SecretPrefix preview instead.
type SigningSecretWithValue ¶ added in v0.3.0
SigningSecretWithValue carries the plaintext Secret alongside the ID so the relay can both sign with the value and (asynchronously) update last_signed_at on the right row. Returned by GetUserSigningSecrets in most-recent-first order.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
func (*Store) ApproveAndSend ¶
func (s *Store) ApproveAndSend( ctx context.Context, messageID, userID string, edits PendingApprovalEdit, send func(msg *Message) (SendResult, error), ) (*Message, error)
ApproveAndSend finalizes a pending_approval message by running it through a caller-supplied send function inside a transaction that holds a row lock on the pending row. If send returns an error the transaction rolls back and the message remains pending. On success the row is updated to 'sent' with the provider-assigned Message-ID and the body/attachments columns are scrubbed.
edits, if any fields are populated, are applied to the in-memory message before send is called and the 'edited' column is set to true when any field differs from what was stored. Approval-via-magic-link callers pass the zero edits value.
Ownership is enforced by the agent -> user join. Messages owned by another user return ErrMessageNotFound. Messages whose status is not 'pending_approval' return ErrNotPendingApproval. If another worker is already mid-send for this message (rare; only possible after the approval row lock was released without status changing — e.g. a pool drop mid-send), this returns ErrSendInProgress.
Concurrency / failure mode notes:
The row-level FOR NO KEY UPDATE lock is held on the messages row for the duration of the send callback. In practice that is bounded by outbound.SMTPRelay's per-attempt deadline (2min) plus its internal retry backoff (1s/5s/15s) — worst case ~6.5min of lock on this single row. Other rows are unaffected; deadlock is not possible because only one row is ever locked per call.
Why NO KEY UPDATE rather than the stricter FOR UPDATE: the send_attempts INSERT below runs on a SEPARATE pool connection and needs a KEY SHARE lock on this messages row for FK enforcement. FOR UPDATE blocks KEY SHARE; FOR NO KEY UPDATE allows it. The downgrade is safe because nothing in this codebase mutates messages.id (the only key column) after creation — all UPDATEs touch non-key columns, which NO KEY UPDATE serializes against itself exactly like FOR UPDATE.
The old crash window where send() succeeded at SES but the subsequent UPDATE/Commit failed (DB blip, pool exhaustion) is now closed by the send_attempts table. Around send() we run two small auxiliary transactions that outlive the surrounding approval transaction: ClaimSendAttempt before send(), MarkSendSucceeded (or MarkSendFailed) after. If the approval tx rolls back AFTER send() succeeded, the next retry of ApproveAndSend reads send_attempts.status='sent', reuses the recorded SendResult, and skips the upstream send entirely.
func (*Store) AutoDisableFailingWebhooks ¶ added in v0.3.0
AutoDisableFailingWebhooks scans for webhooks whose recent delivery history exceeds the failure threshold and flips them to enabled=false with auto_disabled_at = now(). Returns the count of webhooks newly disabled. Designed to be called periodically (e.g. every 5 minutes) from a janitor goroutine.
"Consecutive failed events" is interpreted as: in the last AutoDisableWindow, at least AutoDisableThreshold rows in webhook_subscriber_deliveries reached status='failed' AND zero rows reached status='delivered'. The zero-delivered guard prevents a noisy webhook that's still mostly working from being disabled.
func (*Store) BootstrapUser ¶
BootstrapUser finds a user by email, or creates one with a synthetic google_subject if none exists. Used by the -bootstrap-email CLI flag for self-host first-run, where there's no Google OAuth flow yet.
func (*Store) ClaimOrCreateDomain ¶
ClaimOrCreateDomain implements the atomic create/claim logic from the design doc. Creates if new, returns the existing row when the same user already owns it (verified or not), and errors if a different user owns it. The verification_token and DKIM keypair are minted on first INSERT and remain stable across re-claims — a caller that has already published the TXT record on DNS (or has mail in flight signed with the DKIM key) isn't silently invalidated by a second call. A different user cannot take over an unverified row; that closes a squatting window where the new owner could verify against a TXT record the original owner already published. Callers are responsible for rejecting the configured shared domain before invoking this — the store has no concept of a reserved domain.
func (*Store) ClaimSendAttempt ¶ added in v0.3.0
ClaimSendAttempt atomically reserves the send_attempts row for messageID. Concurrency model mirrors internal/idempotency.Claim: one UPSERT with a stale-takeover WHERE clause; the loser does a follow-up SELECT to classify the existing row.
Allowed transitions:
(no row) → acquired (fresh INSERT)
status='failed' → acquired (UPSERT path takes over)
status='attempting' AND stale → acquired (UPSERT path takes over)
status='attempting' AND NOT stale → InFlight (refuse)
status='sent' → AlreadySent (reuse recorded result)
Called from ApproveAndSend in a SEPARATE small transaction so the claim row outlives any rollback of the surrounding approval transaction. That's what closes the documented double-send window: once status='sent' is committed here, a retry sees AlreadySent and skips the upstream send even if the messages-row UPDATE inside the approval tx never committed.
func (*Store) ClearExpiredPrevSecrets ¶ added in v0.3.0
ClearExpiredPrevSecrets nulls signing_secret_prev / signing_secret_prev_expires_at on rows past their grace window. Idempotent. The worker already ignores expired prev secrets at signing time; this janitor is a hygiene pass so GET responses don't carry a meaningless prev_expires_at.
func (*Store) CountWebhooksByUser ¶ added in v0.3.0
CountWebhooksByUser returns the total number of webhooks (enabled + disabled) the user owns. Used by CreateWebhook to enforce the per-user cap from account_limits.max_webhooks.
func (*Store) CreateAPIKey ¶
func (s *Store) CreateAPIKey(ctx context.Context, userID, name string, expiresAt *time.Time) (*APIKey, error)
CreateAPIKey issues a fresh API key for the user. expiresAt is the optional hard expiration; pass nil to issue a never-expiring key (the backward-compatible default).
func (*Store) CreateAgent ¶
func (s *Store) CreateAgent(ctx context.Context, agentEmail, domain, name, webhookURL, agentMode, userID string) (*AgentIdentity, error)
CreateAgent inserts an agent with a domain FK. Does NOT check domain ownership — that's the API handler's responsibility (shared domain skips the check).
func (*Store) CreateAgentTx ¶ added in v0.3.0
func (s *Store) CreateAgentTx(ctx context.Context, tx pgx.Tx, agentEmail, domain, name, webhookURL, agentMode, userID string) (*AgentIdentity, error)
CreateAgentTx inserts an agent inside a caller-owned transaction. Used by the OAuth consent flow so the slug auto-create row and the authorization-code insert (in oauth_auth_codes) commit together — without this, a code-issue failure after the agent commit would leave a phantom inbox the user never authorized.
func (*Store) CreateInboundMessage ¶
func (s *Store) CreateInboundMessage(ctx context.Context, id, agentID, senderEmail, recipient, emailMessageID, subject, conversationID, deliveryStatus string, rawMessage []byte, authHeaders map[string]string, toRecipients, cc, replyTo []string) (*Message, error)
CreateInboundMessage stores an inbound message. If id is empty a new one is generated; otherwise the caller's pre-generated ID is used so the upstream signer can bind auth headers to the same ID that gets stored. toRecipients and cc are the parsed To: and Cc: headers from the original RFC 2822 message; recipient is the per-delivery target for this row (may be one of the To: addresses, or absent from the header list when the agent was Bcc'd). replyTo is the parsed Reply-To: header (empty when absent — never silently falls back to sender).
func (*Store) CreateInboundMessageInTx ¶ added in v0.3.0
func (s *Store) CreateInboundMessageInTx(ctx context.Context, tx pgx.Tx, id, agentID, senderEmail, recipient, emailMessageID, subject, conversationID, deliveryStatus string, rawMessage []byte, authHeaders map[string]string, toRecipients, cc, replyTo []string) (*Message, error)
CreateInboundMessageInTx writes the messages row inside the caller's transaction. Used by the slice-3 relay refactor (per design §4.2) so the messages INSERT and the webhook_events outbox INSERT commit together, closing the at-least-once publish-loss window.
Mirrors the CreateAgentTx pattern at store.go:596-607 — same SQL body, executed against either *pgxpool.Pool or pgx.Tx via the messageExecutor interface below.
func (*Store) CreateOrGetUser ¶
func (*Store) CreateOutboundMessage ¶
func (s *Store) CreateOutboundMessage(ctx context.Context, agentID string, toRecipients []string, cc []string, bcc []string, subject, msgType, method, providerMessageID, conversationID string) (*Message, error)
CreateOutboundMessage stores an outbound message with multi-recipient support. The signature takes no separate singular recipient param (the singular recipient column exists only for backward compat); toRecipients, cc, and bcc are the canonical outbound-only multi-recipient fields.
func (*Store) CreatePendingOutboundMessage ¶
func (s *Store) CreatePendingOutboundMessage(ctx context.Context, agentID string, toRecipients, cc, bcc []string, subject, bodyText, bodyHTML string, attachmentsJSON []byte, msgType, conversationID, replyToEmailMessageID string, ttlSeconds int) (*Message, error)
CreatePendingOutboundMessage stores a fully composed outbound email in pending_approval status, including body_text, body_html, and attachments so that approval can reconstruct the original SendRequest (or accept edits) without the caller needing to retain it. ttlSeconds sets how long the message remains pending before the expiration worker resolves it.
replyToEmailMessageID is the RFC 5322 Message-ID of the inbound being replied to (e.g. "<abc@gmail.com>"), or "" for fresh sends and test emails. It reuses the email_message_id column, which is unused for outbound rows in every other path — the column semantically carries "the Message-ID this row references" in both directions.
attachmentsJSON must be a JSON array matching the public Attachment shape ([{filename, content_type, data}, ...]) or nil. Callers that already have an []outbound.Attachment slice should json.Marshal it before passing in.
func (*Store) CreateSigningSecret ¶ added in v0.3.0
func (s *Store) CreateSigningSecret(ctx context.Context, userID, name string) (*SigningSecret, error)
CreateSigningSecret mints a new secret for the user. The plaintext secret value is set on the returned struct exactly once; subsequent reads (List/Get) only see the prefix.
Returns ErrSigningSecretCapReached if the user is already at MaxSigningSecretsPerUser. Race-free under concurrent callers via the per-user advisory lock.
Empty `name` is normalized server-side to "unnamed" so the dashboard always has something to display.
func (*Store) CreateUserSession ¶
func (*Store) CreateWebhook ¶ added in v0.3.0
func (s *Store) CreateWebhook(ctx context.Context, userID, url, description string, events []string, filters WebhookFilters) (*Webhook, error)
CreateWebhook inserts a new row and returns it with the plaintext signing secret populated. The plaintext is only available on this response — subsequent GET/list calls scrub it.
Filters validation (charset, count caps, agent ownership) is the handler's job in slice 2; the storage layer only verifies the per-user count cap from account_limits.max_webhooks.
func (*Store) DeleteAPIKey ¶
func (*Store) DeleteAgent ¶
func (*Store) DeleteDomain ¶
DeleteDomain deletes a domain only if owned by the user. The handler should check for existing agents first.
func (*Store) DeleteExpiredMessages ¶
func (*Store) DeleteExpiredUserSessions ¶
func (*Store) DeleteSigningSecret ¶ added in v0.3.0
DeleteSigningSecret removes a secret. Refuses to delete the user's last secret — every user must keep at least one so webhooks remain verifiable. Race-free under concurrent callers via the per-user row lock.
Check order matters: ownership first (so an attacker probing IDs they don't own gets 404, not "cannot delete last" leaking that the caller has only 1 secret), then the floor.
func (*Store) DeleteUserData ¶
DeleteUserData wipes everything tied to a user in a single transaction.
Schema cascades cover most of it (user_sessions, domains, agent_identities, api_keys, usage_summaries all `ON DELETE CASCADE` from users; messages cascade through agent_identities; webhook_deliveries cascade through messages). The one row that doesn't is usage_events: its FK is `ON DELETE SET NULL` so analytics survives, which we explicitly override here for full deletion.
Per-table counts are returned to the caller so an operator can attest to what was removed in audit / compliance contexts.
func (*Store) DeleteUserSession ¶
func (*Store) DeleteWebhook ¶ added in v0.3.0
DeleteWebhook removes a webhook owned by the user. Deleting a non-existent or cross-user webhook returns ErrWebhookNotFound rather than silently succeeding, so a repeated delete is harmless but is reported as not-found. The ON DELETE CASCADE on webhook_subscriber_deliveries.webhook_id drops pending delivery rows automatically — no separate cleanup needed.
Slice 1 includes this method (rather than deferring to slice 2's handler work) because tests need it for setup teardown and the implementation is trivial.
func (*Store) EnsureSharedDomain ¶
EnsureSharedDomain inserts a system row for the configured shared mail domain so slug-based agent registration can satisfy the agent_identities.domain → domains.domain foreign key. The row is owned by no user (user_id = NULL) and pre-verified — it represents infrastructure the operator runs, not user-claimed identity.
Called once at server startup. Idempotent via ON CONFLICT DO NOTHING, and a no-op when the operator has not configured a shared domain. Without this, any deployment whose shared_domain differs from the hardcoded migration seed (`agents.e2a.dev`) gets an FK violation the first time a user tries to register a slug-based agent.
func (*Store) EnsureUserHasSigningSecret ¶ added in v0.3.0
EnsureUserHasSigningSecret guarantees the user has at least one signing secret, creating a "default" one if not. Idempotent. Concurrent callers serialize via the per-user advisory lock so we can't accidentally insert two "default" rows.
func (*Store) ExpireApproveAndSend ¶
func (s *Store) ExpireApproveAndSend( ctx context.Context, messageID string, send func(msg *Message) (SendResult, error), ) (*Message, error)
ExpireApproveAndSend is the worker-side counterpart to ApproveAndSend: no user ownership check (the caller is the expiration worker, which is system-scoped), SELECT ... FOR NO KEY UPDATE SKIP LOCKED so concurrent workers don't race for the same row, and the terminal status is 'expired_approved' instead of 'sent'. On send failure the transaction rolls back; the worker should then call ExpireReject to move the row to a final state so the row doesn't get picked up on every sweep.
Exactly-once guarantee: like ApproveAndSend, this method runs the send() callback under a send_attempts gate so a crash between SES acceptance and the surrounding tx commit does NOT cause the next worker poll to re-send. ClaimSendAttempt / MarkSendSucceeded / MarkSendFailed run in separate small transactions that outlive the approval tx; on retry, an AlreadySent verdict reuses the cached SendResult and skips the upstream send entirely. Without this, the polling-loop nature of the worker would guarantee a re-send on any commit failure — strictly worse than the human-approval path, where a re-send needs an explicit click.
SKIP LOCKED means multiple app instances can run the worker without contending on the same row. The row-level FOR NO KEY UPDATE lock on messages is held for the duration of the send callback (bounded by SMTPRelay timeouts); FOR NO KEY UPDATE rather than FOR UPDATE so the send_attempts INSERT in a separate connection can acquire its KEY SHARE lock for FK enforcement — see ApproveAndSend's docstring for the full rationale.
If a concurrent worker is mid-send for the same row (the send_attempts row is 'attempting' and not yet stale), returns ErrSendInProgress. The worker loop should treat this like ErrNotPendingApproval — skip silently and let the next poll handle it.
func (*Store) ExpireReject ¶
ExpireReject transitions a pending_approval message to expired_rejected and scrubs body columns. No user ownership check — this is the worker path. If the row is no longer pending (racing worker, already handled) returns ErrNotPendingApproval; caller can treat as a no-op.
func (*Store) ExportUserData ¶
ExportUserData gathers everything a user owns into a single struct for the right-of-access flow. Reads run inside a REPEATABLE READ transaction so the snapshot is internally consistent even if writes arrive while the export is being assembled.
func (*Store) GetAgentByEmail ¶
GetAgentByEmail looks up an agent by email address (same as GetAgentByID since ID = email).
func (*Store) GetAgentByID ¶
GetAgentByID looks up an agent by its ID (full email) with domain verification status.
func (*Store) GetConversationByID ¶ added in v0.3.0
func (s *Store) GetConversationByID(ctx context.Context, agentID, conversationID string) (*ConversationDetail, error)
GetConversationByID returns the aggregate summary fields plus every member message, ordered oldest-first (chronological reading order). Returns ErrMessageNotFound when no non-expired messages exist for the given (agentID, conversationID) — mirrors the "looks-like-not-found-on-cross-agent" convention used by single-message reads. The same code path handles "wrong agent" and "real non-existent": either way the agent has no business seeing it.
Participants are computed as the union of sender + recipient + each row's to_recipients / cc / bcc (when populated). Empty strings are dropped. Labels are the union of all members' labels[]; both are sorted lexicographically for stable output.
func (*Store) GetDKIMKeyInternal ¶ added in v0.3.0
GetDKIMKeyInternal returns the stored selector + private key bytes for a domain. The "Internal" suffix is load-bearing: this function does NOT scope by user — it takes a domain name and returns whoever owns that domain's signing key. ONLY call from server-internal codepaths where the domain has already been resolved from a trusted source (e.g. an outbound message's sender field, after the agent layer has authenticated the owner). A handler that ever takes a user-supplied domain string and feeds it to this function becomes a "sign as anyone" primitive: don't.
Returns ("", nil, nil) when the domain has no key — callers MUST treat this as "skip signing" and fall back to whatever the relay-level fallback does.
func (*Store) GetDashboardStats ¶ added in v0.3.0
func (s *Store) GetDashboardStats(ctx context.Context, userID string, windowDays int) (*DashboardStats, error)
GetDashboardStats returns workspace-level aggregates for the authenticated user, with a configurable lookback window. windowDays controls Inbound/Outbound totals AND the delivery-success ratio's sample period — passing 0 falls back to DashboardDefaultWindowDays (7), values above DashboardMaxWindowDays (90) are clamped.
Three independent reads — kept separate because the source tables have different indexes and one slow read shouldn't slow the others. All reads are O(rows-for-this-user-only) thanks to the existing per-user indexes.
Robust to missing data: deployments without usage tracking enabled (E2A_USAGE_TRACKING=false — the default for self-hosters) return zero counts rather than erroring. Same for users who have no messages yet. The UI renders zero values as "—".
Delta percentages: today vs yesterday on usage_summaries. Avoids divide-by-zero when yesterday was zero by returning 0. A 100% increase/decrease maps to ±100; values are clipped at ±999 for integer width.
func (*Store) GetInboundByEmailMessageID ¶ added in v0.3.0
func (s *Store) GetInboundByEmailMessageID(ctx context.Context, agentID, emailMessageID string) (*Message, error)
GetInboundByEmailMessageID looks up an inbound message by its RFC 5322 Message-ID for the given agent. Used by HITL flows to reach the parent inbound at approval time so the References chain can be rebuilt — the pending-outbound row only stores the parent's Message-ID, not its raw message. Scoped to agent_id to prevent any cross-agent reach across shared infra. Returns sql.ErrNoRows when the inbound has expired or was never persisted; callers must tolerate that and fall back to legacy single-id threading.
auth_headers is included in the SELECT so HITL review handlers can surface SPF/DKIM/DMARC provenance on the reply-context pane.
func (*Store) GetInboundMessage ¶
func (*Store) GetMessageWithContent ¶
func (s *Store) GetMessageWithContent(ctx context.Context, messageID, agentID string) (*Message, error)
GetMessageWithContent returns a full message including raw_message and auth_headers. Marks the message as 'read' if it was 'unread'.
func (*Store) GetMessagesByAgent ¶
GetMessagesByAgent returns messages for an agent, filtered by status, direction, and the optional search filters on the MessageListFilter struct.
- direction: "inbound" (default for SDK polling), "outbound", or "all" (used by the dashboard inbox).
- status: "unread" | "read" | "all" — only applies when direction selects inbound rows; ignored on pure outbound queries.
- descending: cursor walks newest→oldest when true; oldest→newest when false (FIFO polling).
- From / SubjectContains: case-insensitive substring (ILIKE).
- ConversationID: exact match.
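- Since / Until: half-open time-range bracket on created_at (created_at >= Since AND created_at < Until).
- Labels: AND-match — only rows carrying every listed label are returned; an empty slice applies no label constraint.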
The SELECT includes columns both consumers need: the inbox needs `status` (outbound HITL lifecycle), `webhook_status`/`last_error` (outbound delivery), and `octet_length(raw_message)` (size column); the polling SDK ignores these fields and reads only the existing inbound-relevant ones from the Message struct.
func (*Store) GetOutboundMessageForUser ¶
func (s *Store) GetOutboundMessageForUser(ctx context.Context, messageID, userID string) (*Message, error)
GetOutboundMessageForUser returns a full message row (including body, HITL fields, and attachments) if it exists and is owned by userID (via the agent the row belongs to). Inbound messages and cross-user access both return ErrMessageNotFound — the caller should not be able to distinguish "does not exist" from "belongs to someone else".
func (*Store) GetUserByAPIKey ¶
GetUserByAPIKey authenticates a bearer token and returns the owning user. Rejects revoked keys and time-expired keys; touches last_used_at only on the success path so the column stays a real "last successful authentication" signal (rather than "last attempt").
Expiration semantics: expires_at IS NULL means the key never expires (preserves the pre-migration default). A non-null expires_at must be in the future, evaluated against now() in the same query so there's no clock skew between row read and check.
func (*Store) GetUserByID ¶
func (*Store) GetUserSession ¶
func (*Store) GetUserSigningSecrets ¶ added in v0.3.0
func (s *Store) GetUserSigningSecrets(ctx context.Context, userID string) ([]SigningSecretWithValue, error)
GetUserSigningSecrets returns the plaintext secret values for a user (paired with their IDs), most-recent-first. The relay signs with [0] and asynchronously updates last_signed_at on that ID. The HITL token verifier tries each Secret in turn. Caller must NOT log the Secret values.
func (*Store) GetWebhookByID ¶ added in v0.3.0
GetWebhookByID returns the webhook iff it's owned by userID. Cross-user reads (or missing rows) return ErrWebhookNotFound — same not-found-on-cross-user convention used elsewhere in the codebase (conversation reads, message reads).
The returned Webhook has SigningSecret populated for the delivery worker's benefit; the public API layer scrubs this field before responding to GETs.
func (*Store) GetWebhookByIDInternal ¶ added in v0.3.0
GetWebhookByIDInternal returns the webhook by ID with no ownership check. INTERNAL USE ONLY — handler code MUST use GetWebhookByID which scopes by user_id. The retry worker uses this to look up the URL + signing secret for a delivery row whose ownership was already established when the publisher inserted it.
The suffix Internal mirrors the convention in Store.GetDKIMKeyInternal: a method name that calls out "skipping the standard authorization check" so a reviewer doesn't have to read the body to know why.
func (*Store) HasAgentsOnDomain ¶
HasAgentsOnDomain checks whether the owned domain still has agents.
func (*Store) ListAPIKeys ¶
func (*Store) ListActivityByAgent ¶
func (*Store) ListAgentsByUser ¶
ListAgentsByUser returns all agents owned by the user, joined with domain verification AND enriched with per-agent stats for the dashboard. Five correlated subqueries compute inbound/outbound 7-day counts, pending approvals, last delivery, and webhook health in a single round-trip. Other load paths (GetAgentByID, GetAgentByEmail) intentionally don't compute these — only the dashboard needs them.
func (*Store) ListConversationsByAgent ¶ added in v0.3.0
func (s *Store) ListConversationsByAgent(ctx context.Context, f ConversationListFilter) ([]ConversationSummary, error)
ListConversationsByAgent groups the agent's non-expired messages by conversation_id and returns one row per conversation sorted by most-recent activity. Messages without a conversation_id are not included in any conversation — they remain individually visible via GetMessagesByAgent.
func (*Store) ListDomainsByUser ¶
ListDomainsByUser returns all domains owned by the user (excludes system rows). AgentCount is computed inline via a correlated subquery — one round-trip regardless of how many domains the user has, and the per-row count is cheap because (agent_identities.user_id, agent_identities.domain) is indexed via the existing idx_agent_identities_user.
func (*Store) ListEnabledWebhooksForRouting ¶ added in v0.3.0
func (s *Store) ListEnabledWebhooksForRouting(ctx context.Context, userID, eventType string) ([]Webhook, error)
ListEnabledWebhooksForRouting is the hot-path query used by the event publisher. Returns enabled webhooks for the user that subscribe to the given event type. The in-process publisher then applies filter matching in Go (cheaper than encoding the AND-across-types + OR-within-type rule in SQL at slice-1 scale).
The partial index idx_webhooks_user_enabled WHERE enabled = true keeps this O(log n) on the common case (a user has a small number of enabled webhooks).
func (*Store) ListExpiredPending ¶
ListExpiredPending returns pending_approval messages whose approval_expires_at is in the past, joined with their agent's hitl_expiration_action. Ordered by approval_expires_at ASC so earliest-expired are handled first.
func (*Store) ListPendingOutboundForUser ¶
func (s *Store) ListPendingOutboundForUser(ctx context.Context, userID string, limit int) ([]Message, error)
ListPendingOutboundForUser returns pending-approval messages across all of the user's agents, sorted by approval_expires_at ASC (expiring-soonest first). Body columns are not returned from this path — callers should use GetOutboundMessageForUser for detail.
func (*Store) ListSigningSecrets ¶ added in v0.3.0
ListSigningSecrets returns the user's secrets in most-recent-first order. Populates both Secret (plaintext) and SecretPrefix; callers that build a list shape for the dashboard get to choose which to surface.
func (*Store) ListWebhooksByUser ¶ added in v0.3.0
ListWebhooksByUser returns every webhook owned by the user — used by the slice-2 GET /api/v1/webhooks endpoint. Storage layer surfaces enabled and disabled rows alike; filter at the handler if needed.
func (*Store) LookupConversationID ¶
func (s *Store) LookupConversationID(ctx context.Context, agentID string, messageIDs []string) (string, error)
LookupConversationID finds a conversation_id by matching In-Reply-To / References message IDs against stored messages. Checks both email_message_id (inbound) and provider_message_id (outbound). Uses prefix matching because SES bare IDs stored in provider_message_id (e.g. <010f...>) may lack the @region.amazonses.com suffix that appears in the actual email headers sent to recipients.
func (*Store) LookupDomain ¶
LookupDomain returns a domain if it exists and is owned by the given user.
func (*Store) MarkSendFailed ¶ added in v0.3.0
MarkSendFailed records that the upstream send failed for messageID, so the next ClaimSendAttempt is allowed to take over and retry. Idempotent against double-call: only updates rows still 'attempting'.
func (*Store) MarkSendSucceeded ¶ added in v0.3.0
MarkSendSucceeded records the result of a successful upstream send. Idempotent against double-call: only updates rows still 'attempting' so a stray re-Mark from a buggy caller cannot overwrite a previous success or revive a failed row.
func (*Store) MaxWebhooksForUser ¶ added in v0.3.0
func (*Store) ModifyMessageLabels ¶ added in v0.3.0
func (s *Store) ModifyMessageLabels(ctx context.Context, messageID, agentID string, add, remove []string) ([]string, error)
ModifyMessageLabels applies a delta — add then remove — to a message's labels[] in a single atomic statement. Returns the updated labels (deduplicated, sorted) so the caller can echo them back in the response without a second round-trip.
Inputs are assumed already normalized (lowercased, charset-validated, dedup'd within each list, e2a:* gated). The store layer:
- applies adds first, then removes (so a label in both lists ends up removed)
- rejects if the post-add total would exceed MaxLabelsPerMessage
- returns ErrMessageNotFound if the row is missing / expired / cross-agent
The whole thing runs as one UPDATE so a concurrent PATCH from a second client can't observe a partial state.
func (*Store) RejectPending ¶
func (s *Store) RejectPending(ctx context.Context, messageID, userID, reason string) (*Message, error)
RejectPending transitions a pending_approval message to rejected, records the reviewer's reason (empty string allowed), and scrubs body_text / body_html / attachments_json. Ownership checked; missing rows return ErrMessageNotFound. Non-pending rows return ErrNotPendingApproval.
func (*Store) ResolveOutboundOwner ¶
func (s *Store) ResolveOutboundOwner(ctx context.Context, messageID string) (userID, agentID string, err error)
ResolveOutboundOwner looks up the user_id and agent_id for an outbound message without requiring the caller to know the user_id up-front. It exists for token-authenticated paths (magic-link approve/reject) where the HMAC token itself is the authorization and the handler just needs enough context to dispatch into the existing user-scoped store methods.
Returns ErrMessageNotFound if the message doesn't exist or isn't outbound. The returned user_id is guaranteed to own the message's agent (via the agent_identities.user_id join).
func (*Store) RotateSecret ¶ added in v0.3.0
func (s *Store) RotateSecret(ctx context.Context, webhookID, userID string) (newPlaintext string, prevExpiresAt time.Time, err error)
RotateSecret generates a new signing secret, moves the current secret into signing_secret_prev with a 24h expiry, and returns the new plaintext (shown once). During the 24h grace window the delivery worker dual-signs each request so receivers can verify with either secret while they update their handler.
func (*Store) SetDomainPrimary ¶ added in v0.3.0
SetDomainPrimary marks a domain as the user's primary in a single transaction: first clear any other primary belonging to the user, then set the requested domain. The partial unique index in migration 013 makes the clear-first step necessary — otherwise the two writes would race and one would fail with a unique violation.
Returns ErrDomainNotFound when the domain doesn't exist or isn't owned by the user.
func (*Store) TouchDomainLastChecked ¶ added in v0.3.0
TouchDomainLastChecked records that the verification probe ran. Call this from POST /api/v1/domains/{domain}/verify whether the probe succeeded or not — the LastCheckedAt column is "when did we last try", not "when did we last succeed" (the latter is verified_at).
func (*Store) TouchSigningSecretLastSigned ¶ added in v0.3.0
TouchSigningSecretLastSigned records that the relay used this secret to sign a payload. Best-effort — failure is logged but does not block the actual signing operation.
func (*Store) UpdateAgentHITL ¶
func (s *Store) UpdateAgentHITL(ctx context.Context, agentID, userID string, enabled bool, ttlSeconds int, expirationAction string) error
UpdateAgentHITL updates all three HITL settings on an agent owned by userID. The TTL and expiration action are validated against the same rules as the DB CHECK constraints so callers get a clean error rather than a raw SQL error.
func (*Store) UpdateAgentMode ¶
func (*Store) UpdateAgentWebhook ¶
func (*Store) UpdateMessageDeliveryStatus ¶
func (s *Store) UpdateMessageDeliveryStatus(ctx context.Context, messageID, agentID, status string) error
UpdateMessageDeliveryStatus sets the inbox_status on a message.
func (*Store) UpdateUserName ¶ added in v0.3.0
UpdateUserName persists a new display name on the user row and returns the updated User. Input validation (length, whitespace) is the caller's responsibility — this layer only enforces that the row exists.
func (*Store) UpdateWebhook ¶ added in v0.3.0
func (s *Store) UpdateWebhook(ctx context.Context, webhookID, userID string, u WebhookUpdate) (*Webhook, error)
UpdateWebhook applies a partial update to a webhook. Only fields with a non-nil pointer in WebhookUpdate are touched. Returns the updated row.
Validation (charset, count caps, agent ownership) is the handler's job; the storage layer enforces only the re-enable cooldown and the per-row CHECK constraints (events non-empty, url non-empty).
func (*Store) VerifyDomain ¶
VerifyDomain marks a domain as verified, only if owned by the given user.
func (*Store) WithTx ¶ added in v0.3.0
WithTx opens a transaction, runs fn inside it, and commits if fn returns nil (or rolls back if fn returns an error). Used by the slice-3 relay refactor so the messages INSERT and the webhook_events outbox INSERT commit together, closing the at-least-once publish-loss window.
The relay handler is the primary v1 caller; future trigger sites (slice 4 outbound + HITL) reuse the same helper. Keeps callers from having to import pgxpool directly.
type UsageEventEntry ¶
// UsageEventEntry is one row of the usage_events table for the user.
// Every field is always emitted when marshaled to JSON (no omitempty tags).
type UsageEventEntry struct {
	ID        string    `json:"id"`
	AgentID   string    `json:"agent_id"`
	Domain    string    `json:"domain"`
	Direction string    `json:"direction"`
	EventType string    `json:"event_type"`
	CreatedAt time.Time `json:"created_at"`
} // @name UsageEventEntry
UsageEventEntry is one row of the usage_events table for the user.
type UserExport ¶
// UserExport is the structured dump returned by ExportUserData: a
// machine-readable record of everything a single user owns in the system.
// Sensitive secrets (API key plaintexts, google_subject, session tokens)
// are deliberately not part of this shape.
type UserExport struct {
	GeneratedAt   time.Time `json:"generated_at"`
	SchemaVersion string    `json:"schema_version"`

	// User is the export-safe view of the user row; it omits the
	// google_subject internal identifier.
	User UserExportUser `json:"user"`

	Domains []Domain        `json:"domains"`
	Agents  []AgentIdentity `json:"agents"`

	// APIKeys never carries key plaintexts: only hashes are stored, so
	// plaintexts cannot be exported even in principle.
	APIKeys []APIKeyExportEntry `json:"api_keys"`

	Messages []Message `json:"messages"`

	// The two collections below are dropped from the JSON output when
	// empty (omitempty).
	UsageEvents      []UsageEventEntry      `json:"usage_events,omitempty"`
	OAuthConnections []OAuthConnectionEntry `json:"oauth_connections,omitempty"`
} // @name UserExport
UserExport is the structured dump returned by ExportUserData. It's designed to be a complete, machine-readable record of everything a single user owns in the system — the right-of-access counterpart to DeleteUserData below. Sensitive secrets are deliberately excluded:
- API key plaintexts are not stored anywhere (only hashes), so they can't be exported even in principle.
- google_subject is an internal OAuth identifier with no value to the user and is omitted on purpose.
- User session tokens are transient and excluded.
type UserExportUser ¶
// UserExportUser mirrors User but omits the google_subject internal
// identifier from the export payload.
type UserExportUser struct {
	ID        string    `json:"id"`
	Email     string    `json:"email"`
	Name      string    `json:"name"`
	CreatedAt time.Time `json:"created_at"`
} // @name UserExportUser
UserExportUser mirrors User but omits the google_subject internal identifier from the export payload.
type Webhook ¶ added in v0.3.0
// Webhook is one row in the webhooks table.
type Webhook struct {
	ID          string         `json:"id"`
	UserID      string         `json:"user_id"`
	URL         string         `json:"url"`
	Description string         `json:"description"`
	Events      []string       `json:"events"`
	Filters     WebhookFilters `json:"filters"`

	// SigningSecret carries the plaintext secret. It is populated on
	// CreateWebhook responses (the caller's one chance to see it) and read
	// by the delivery worker when signing the X-E2A-Signature header.
	// omitempty keeps it out of the JSON output once it has been cleared.
	SigningSecret string `json:"signing_secret,omitempty"`

	// SigningSecretPrev and SigningSecretPrevExpiresAt hold the previous
	// secret during the 24h rotation grace window. Both are tagged
	// json:"-" and therefore never appear in marshaled output.
	SigningSecretPrev          string     `json:"-"`
	SigningSecretPrevExpiresAt *time.Time `json:"-"`

	Enabled         bool       `json:"enabled"`
	AutoDisabledAt  *time.Time `json:"auto_disabled_at,omitempty"`
	CreatedAt       time.Time  `json:"created_at"`
	LastDeliveredAt *time.Time `json:"last_delivered_at,omitempty"`
}
Webhook is one row in the webhooks table.
SigningSecret carries the plaintext secret. It's populated on CreateWebhook responses (the caller's one chance to see the secret) and read by the delivery worker when signing the X-E2A-Signature header. Public API GET endpoints in slice 2 will scrub this field before responding so a stolen API key cannot exfiltrate webhook secrets via list/get.
SigningSecretPrev + SigningSecretPrevExpiresAt hold the previous secret during the 24h rotation grace window; slice 4 dual-signs using both during that window so receivers can roll forward.
type WebhookFilters ¶ added in v0.3.0
// WebhookFilters is the structured form of the webhooks.filters JSONB
// column. An empty or nil slice means "no constraint of that type", so a
// value with every slice empty matches every event of the right type for
// the owning user. Matching is AND across filter types and OR within one.
type WebhookFilters struct {
	AgentIDs        []string `json:"agent_ids,omitempty"`
	ConversationIDs []string `json:"conversation_ids,omitempty"`
	Labels          []string `json:"labels,omitempty"`
}
WebhookFilters is the structured form of webhooks.filters JSONB. Empty / nil slices mean "no constraint of that type" — a webhook with all-empty filters is a cross-cutting subscriber that matches every event of the right type for the owning user.
type WebhookUpdate ¶ added in v0.3.0
// WebhookUpdate carries the fields a PATCH can change. Every field is a
// pointer so a handler can tell "field present, set to X" apart from
// "field not present, leave unchanged"; UpdateWebhook only touches fields
// whose pointer is non-nil.
type WebhookUpdate struct {
	// URL, Events and Filters are full-replace fields: when present, the
	// sent value is canonical.
	URL         *string
	Description *string
	Events      *[]string
	Filters     *WebhookFilters

	// Enabled is a toggle. Flipping it to true within 5 minutes of
	// auto_disabled_at makes UpdateWebhook return ErrWebhookCooldown.
	Enabled *bool
}
WebhookUpdate carries the fields a PATCH can change. All fields are pointers (or "set-or-leave" flags) so handlers can distinguish "field present, set to X" from "field not present, leave unchanged".
Per the design, url / events / filters are full-replace fields (the sent value is canonical when present). Enabled is a toggle. Re-enable has a 5-minute cooldown — UpdateWebhook returns ErrWebhookCooldown when the caller tries to flip Enabled true within 5 minutes of auto_disabled_at.