identity

package
v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 4, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Constants

View Source
const (
	HITLMaxTTLSeconds        = 604800 // 7 days
	HITLDefaultTTLSeconds    = 604800
	HITLExpirationApprove    = "approve"
	HITLExpirationReject     = "reject"
	HITLDefaultExpirationAct = HITLExpirationReject
)

HITL constants mirror the CHECK constraints in migration 003_hitl.sql.

View Source
const (
	MessageStatusSent            = "sent"
	MessageStatusPendingApproval = "pending_approval"
	MessageStatusRejected        = "rejected"
	MessageStatusExpiredApproved = "expired_approved"
	MessageStatusExpiredRejected = "expired_rejected"
)

Message status values mirror the CHECK constraint in migration 003_hitl.sql.

View Source
const (
	AutoDisableThreshold = 10
	AutoDisableWindow    = 72 * time.Hour
)

AutoDisableThreshold is the consecutive-failed-events count over AutoDisableWindow that trips a webhook into the auto-disabled state. Tuned per design decision #12 (10 / 72h). The reviewer can re-enable via PATCH after the 5-min cooldown.

View Source
const ConversationListHardCap = 100

ConversationListHardCap is the maximum number of conversations a single list call returns. Higher requests are silently clamped. 100 covers the inbox-style use case; a deployment that needs more can either ask for higher (we'll bump it) or paginate (slice 2).

View Source
const DashboardDefaultWindowDays = 7

DashboardDefaultWindowDays is the lookback for the dashboard strip when the caller doesn't request a specific window.

View Source
const DashboardMaxWindowDays = 90

DashboardMaxWindowDays caps the lookback to keep the underlying SQL scan bounded. 90 days is generous for any UI surface we currently have and remains efficient given the per-user index on usage_summaries.
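A minimal sketch of how the default and the cap could combine when resolving a requested window. The helper name `effectiveWindowDays` is hypothetical, and treating a non-positive request as "no window requested" is an assumption rather than documented behaviour.

```go
package main

import "fmt"

const (
	DashboardDefaultWindowDays = 7
	DashboardMaxWindowDays     = 90
)

// effectiveWindowDays resolves the lookback actually used for a request.
// Assumption: requested <= 0 means the caller did not ask for a window.
func effectiveWindowDays(requested int) int {
	switch {
	case requested <= 0:
		return DashboardDefaultWindowDays
	case requested > DashboardMaxWindowDays:
		return DashboardMaxWindowDays
	default:
		return requested
	}
}

func main() {
	fmt.Println(effectiveWindowDays(0), effectiveWindowDays(30), effectiveWindowDays(365))
	// prints: 7 30 90
}
```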

View Source
const DefaultMaxWebhooks = 50

DefaultMaxWebhooks is the fallback per-user webhook cap. MaxWebhooksForUser returns the per-user cap from account_limits; users without an account_limits row default to 50, the column DEFAULT, which is mirrored here so the cap still works on dev installs that haven't seeded an account_limits row.

View Source
const MaxLabelsPerMessage = 100

MaxLabelsPerMessage is the post-add cap on the labels[] column. The per-operation cap (max items in add_labels / remove_labels) is enforced earlier at the handler. The two together bound the array at a size where GIN containment + JSON marshalling stay cheap.

View Source
const MaxSigningSecretsPerUser = 5

MaxSigningSecretsPerUser caps how many active signing secrets a user can hold at once. Two slots cover the standard rotation flow (create new, swap, delete old); the hard cap sits above that and mainly catches runaway scripts. Easy to raise later if real users need more.

View Source
const MessageTTL = 10 * 24 * time.Hour // 10 days

MessageTTL is the per-row lifetime for `messages`. The janitor at DeleteExpiredMessages drops rows whose expires_at has passed.

10 days is chosen to strictly exceed HITLMaxTTLSeconds (7 days) with a 3-day buffer. The buffer guarantees:

  • the HITL worker (60s cadence) always wins the race against the messages janitor (hourly cadence) on max-HITL pending rows;
  • terminal HITL rows retain ≥3 days of post-resolution audit visibility before the metadata row is dropped;
  • reply-composition can load a parent inbound up to 10 days old for quoting context.

If HITLMaxTTLSeconds is ever raised, raise this too — keep MessageTTL > HITLMaxTTLSeconds by at least 1 day.

View Source
const SendAttemptStaleWindow = 10 * time.Minute

SendAttemptStaleWindow is how long an 'attempting' send_attempts row stays "owned" by the original worker before the next caller is allowed to take it over. It is sized to outbound.SMTPRelay's worst-case retry envelope (~6.5min) plus headroom; a tighter window would risk concurrent SES sends if a real upstream stall happened.

View Source
const SessionTTL = 7 * 24 * time.Hour

Variables

View Source
var (
	ErrSigningSecretCapReached       = fmt.Errorf("at most %d signing secrets per user; delete one before creating another", MaxSigningSecretsPerUser)
	ErrCannotDeleteLastSigningSecret = errors.New("cannot delete the last signing secret; create a new one first")
	ErrSigningSecretNotFound         = errors.New("signing secret not found or not owned by user")
)

Sentinel errors so API handlers can map error → HTTP status with errors.Is rather than string-matching the message text. Tests can also assert against them directly.

View Source
var (
	ErrWebhookNotFound   = errors.New("webhook not found")
	ErrWebhookCapReached = errors.New("webhook count limit reached for this user")
)

Sentinel errors so API handlers can map error → HTTP status with errors.Is rather than string-matching.

View Source
var ErrDomainHasAgents = fmt.Errorf("cannot delete domain: agents still exist")

ErrDomainHasAgents is returned when a domain delete is blocked by existing agents.

View Source
var ErrDomainNotFound = fmt.Errorf("domain not found or not owned by user")

ErrDomainNotFound is returned when a domain is not found or not owned by the user.

View Source
var ErrLabelLimitExceeded = errors.New("label limit exceeded")

ErrLabelLimitExceeded reports that an add operation would push a message past MaxLabelsPerMessage. Mapped to HTTP 400 at the handler.

View Source
var ErrMessageNotFound = fmt.Errorf("message not found")

ErrMessageNotFound is returned when a message is not found for the given user (either the ID doesn't exist or the message belongs to another user's agent). Handlers map this to HTTP 404.

View Source
var ErrNotPendingApproval = fmt.Errorf("message is not pending approval")

ErrNotPendingApproval is returned when an approve or reject operation targets a message that is not (or is no longer) in pending_approval status. Handlers map this to HTTP 409 Conflict.

View Source
var ErrSendInProgress = errors.New("send already in progress for this message")

ErrSendInProgress is returned by ApproveAndSend (and the underlying ClaimSendAttempt) when a concurrent attempt for the same message is still in-flight at the SES layer. Callers should treat this as transient — the in-flight caller will either commit (the next retry sees status='sent' and replays) or time out (the row goes stale and the next retry takes over).

View Source
var ErrWebhookCooldown = errors.New("webhook was auto-disabled within the last 5 minutes; wait before re-enabling")

ErrWebhookCooldown is returned when a PATCH would re-enable a webhook that was auto-disabled within the last 5 minutes. Slice 4 adds the auto-disable worker; this error type lands now so the handler doesn't need to map magic strings later.

Functions

func NewMessageID

func NewMessageID() string

NewMessageID returns a fresh internal message ID. Callers can use this to generate the ID up-front when they need it before storing — for example, the SMTP relay generates the ID before signing auth headers so the ID is part of the canonical string fed to HMAC.

func NormalizeEmail added in v0.3.0

func NormalizeEmail(email string) string

NormalizeEmail returns the canonical lookup form of an email address: lower-cased, with surrounding whitespace stripped. Every external-input email used as a lookup key (path vars, form fields, OAuth consent choices, WebSocket subscriptions) must funnel through this so case variants ("Alice@x.com" vs "alice@x.com") resolve to the same row.

Per RFC 5321 §2.4 the local-part is technically case-sensitive — a small number of providers (most famously ProtonMail) preserve case. We collapse it anyway because consistency across HTTP path, JSON body, SMTP envelope, and dashboard URL is more important than spec purity for the agent-inbox use case. Document this if a user complains.
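The documented behaviour (lower-case, strip surrounding whitespace) amounts to something like the following sketch. `normalizeEmail` is a local stand-in; the real function may differ in details not described here.

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeEmail lower-cases the address and trims surrounding whitespace.
func normalizeEmail(email string) string {
	return strings.ToLower(strings.TrimSpace(email))
}

func main() {
	a := normalizeEmail("  Alice@X.com ")
	b := normalizeEmail("alice@x.com")
	fmt.Println(a == b) // prints: true
}
```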

func RunMigrations added in v0.3.0

func RunMigrations(ctx context.Context, pool *pgxpool.Pool, migrationsFS fs.FS, mode MigrationMode) error

RunMigrations applies every embedded migration that isn't yet recorded in the schema_migrations table. Migrations run in filename-sorted order, each in its own transaction (unless tagged with the "e2a:no-transaction" directive); on the first error the function returns without applying later migrations.

A Postgres session advisory lock serializes concurrent invocations from rolling restarts and multi-instance deploys. The lock is held for the duration of the function and auto-released on session disconnect (so a crashed binary doesn't leave the DB stuck).

All migrations should be written to be idempotent (CREATE/ALTER ... IF NOT EXISTS) so that re-runs are harmless even without the lock. The tracker is the source of truth for "should we attempt to run this one again"; idempotence and the lock are layered safety nets.

func ValidateHITLConfig

func ValidateHITLConfig(ttlSeconds int, expirationAction string) error

ValidateHITLConfig returns an error if the TTL or expiration action is invalid. The DB CHECK constraints are the final guard; this mirrors them for a clean, pre-query error path.
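A rough sketch of the pre-query check, built from the HITL constants above. The lower bound of 1 second is an assumption (the CHECK constraint itself is not shown here), and `validateHITLConfig` is a local stand-in, not the package's implementation.

```go
package main

import "fmt"

const (
	HITLMaxTTLSeconds     = 604800 // 7 days
	HITLExpirationApprove = "approve"
	HITLExpirationReject  = "reject"
)

// validateHITLConfig rejects out-of-range TTLs and unknown expiration actions.
// Assumption: the minimum valid TTL is 1 second.
func validateHITLConfig(ttlSeconds int, expirationAction string) error {
	if ttlSeconds < 1 || ttlSeconds > HITLMaxTTLSeconds {
		return fmt.Errorf("hitl ttl must be between 1 and %d seconds, got %d", HITLMaxTTLSeconds, ttlSeconds)
	}
	switch expirationAction {
	case HITLExpirationApprove, HITLExpirationReject:
		return nil
	default:
		return fmt.Errorf("hitl expiration action must be %q or %q, got %q",
			HITLExpirationApprove, HITLExpirationReject, expirationAction)
	}
}

func main() {
	fmt.Println(validateHITLConfig(3600, "reject") == nil)   // prints: true
	fmt.Println(validateHITLConfig(604801, "reject") == nil) // prints: false
	fmt.Println(validateHITLConfig(3600, "ignore") == nil)   // prints: false
}
```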

Types

type APIKey

type APIKey struct {
	ID           string    `json:"id"`
	UserID       string    `json:"user_id"`
	Name         string    `json:"name"`
	KeyPrefix    string    `json:"key_prefix"`
	PlaintextKey string    `json:"key,omitempty"` // only set once at creation, never stored
	CreatedAt    time.Time `json:"created_at"`
	// LastUsedAt is updated by GetUserByAPIKey on every successful
	// AuthenticateRequest. NULL on keys that have never been used.
	LastUsedAt *time.Time `json:"last_used_at,omitempty"`
	// ExpiresAt is the optional hard expiry. AuthenticateRequest rejects
	// keys whose expires_at has passed. NULL means "never expires"
	// (the backward-compatible default).
	ExpiresAt *time.Time `json:"expires_at,omitempty"`
}

type APIKeyExportEntry

type APIKeyExportEntry struct {
	ID         string     `json:"id"`
	Name       string     `json:"name"`
	KeyPrefix  string     `json:"key_prefix"`
	CreatedAt  time.Time  `json:"created_at"`
	LastUsedAt *time.Time `json:"last_used_at,omitempty"`
	RevokedAt  *time.Time `json:"revoked_at,omitempty"`

} // @name APIKeyExportEntry

APIKeyExportEntry is the API-key shape included in the export. We expose only metadata; the hash itself stays internal because (a) it's not useful to the user, and (b) if leaked, it is credential-equivalent material for offline dictionary attacks.

type AgentIdentity

type AgentIdentity struct {
	ID                   string    `json:"id"`
	Domain               string    `json:"domain"`
	Email                string    `json:"email"`
	Name                 string    `json:"name"`
	WebhookURL           string    `json:"webhook_url"`
	AgentMode            string    `json:"agent_mode"`
	DomainVerified       bool      `json:"domain_verified"`
	Public               bool      `json:"public"`
	CreatedAt            time.Time `json:"created_at"`
	UserID               string    `json:"user_id"`
	HITLEnabled          bool      `json:"hitl_enabled"`
	HITLTTLSeconds       int       `json:"hitl_ttl_seconds"`
	HITLExpirationAction string    `json:"hitl_expiration_action"`
	// Dashboard enrichment fields. Computed at read
	// time by ListAgentsByUser via correlated subqueries — other load
	// paths (GetAgentByID / GetAgentByEmail) leave them at zero values,
	// same pattern as Domain.AgentCount. Switch to denormalized columns
	// if the read cost ever bites.
	Inbound7d      int        `json:"inbound_7d"`
	Outbound7d     int        `json:"outbound_7d"`
	PendingCount   int        `json:"pending_count"`
	LastDeliveryAt *time.Time `json:"last_delivery_at,omitempty"`
	// WebhookHealthy is false iff there's been a failed webhook delivery
	// in the last 24h. Defaults to true for agents with no deliveries
	// yet — avoids painting fresh agents red. Meaningless for
	// agent_mode='local'; the frontend hides the badge in that case.
	WebhookHealthy bool `json:"webhook_healthy"`
}

func (*AgentIdentity) ActualDomain

func (a *AgentIdentity) ActualDomain() string

ActualDomain returns the DNS domain for the agent.

func (*AgentIdentity) EmailAddress

func (a *AgentIdentity) EmailAddress() string

EmailAddress returns the agent's email address (always the ID).

func (*AgentIdentity) IsSharedDomain

func (a *AgentIdentity) IsSharedDomain(sharedDomain string) bool

IsSharedDomain returns true if the agent's domain matches the configured shared domain (the host that backs slug-based registration). When sharedDomain is empty, the deployment has slug registration disabled and no agent can be on the shared domain.

type ConversationDetail added in v0.3.0

type ConversationDetail struct {
	ConversationSummary
	Participants []string  `json:"participants"`
	Labels       []string  `json:"labels"`
	Messages     []Message `json:"messages"`
}

ConversationDetail extends the summary with member messages and computed aggregates (participants set, label union). Messages are returned chronologically (oldest first) — the rendering convention for a thread view.

type ConversationListFilter added in v0.3.0

type ConversationListFilter struct {
	AgentID string
	Limit   int
	// Since / Until bracket the conversation's last_message_at —
	// "show me conversations that had activity in this window".
	// Zero values disable each bound.
	Since time.Time
	Until time.Time
}

ConversationListFilter is the input to ListConversationsByAgent. Limit is capped to ConversationListHardCap at the storage layer regardless of what the caller passes; pagination is intentionally not in this slice (most agents have dozens of conversations, not thousands) and can be added cursor-style if a deployment needs it.

type ConversationSummary added in v0.3.0

type ConversationSummary struct {
	ID             string    `json:"conversation_id"`
	LastMessageAt  time.Time `json:"last_message_at"`
	FirstMessageAt time.Time `json:"first_message_at"`
	MessageCount   int       `json:"message_count"`
	InboundCount   int       `json:"inbound_count"`
	OutboundCount  int       `json:"outbound_count"`
	HasUnread      bool      `json:"has_unread"`
	LatestSubject  string    `json:"latest_subject"`
	LatestSender   string    `json:"latest_sender"`
}

ConversationSummary is one row in the list endpoint. Aggregated counts + the "latest message" preview fields are enough to render an inbox-style conversation list without a per-row drill-down.

HasUnread is true iff at least one INBOUND member is in inbox_status='unread'. Outbound pending_approval doesn't count — the conversation list is the agent's mailbox view, not the reviewer's HITL queue.

type DashboardPendingStats added in v0.3.0

type DashboardPendingStats struct {
	Count         int `json:"count"`
	OldestSeconds int `json:"oldest_seconds"`
}

type DashboardStats added in v0.3.0

type DashboardStats struct {
	Today              DashboardTodayStats   `json:"today"`
	Pending            DashboardPendingStats `json:"pending"`
	DeliverySuccessPct float64               `json:"delivery_success_pct"`
	SampleWindowDays   int                   `json:"sample_window_days"`
	// InboundWindow / OutboundWindow are the totals over the same
	// SampleWindowDays as DeliverySuccessPct. The dashboard at-a-glance
	// strip uses Today.*; the settings page uses these window totals
	// at a 30-day window (?window=30). Sum over usage_summaries rows
	// in the lookback period.
	InboundWindow  int `json:"inbound_window"`
	OutboundWindow int `json:"outbound_window"`
}

DashboardStats is the workspace-level summary returned by GetDashboardStats. Each section corresponds to one of the cards on the redesigned dashboard's stats strip; null/zero values render as "—" in the UI, so deployments without E2A_USAGE_TRACKING enabled degrade gracefully.

type DashboardTodayStats added in v0.3.0

type DashboardTodayStats struct {
	Inbound          int `json:"inbound"`
	Outbound         int `json:"outbound"`
	InboundDeltaPct  int `json:"inbound_delta_pct"`
	OutboundDeltaPct int `json:"outbound_delta_pct"`
}

type DeleteUserDataResult

type DeleteUserDataResult struct {
	UsageEventsDeleted        int64 `json:"usage_events_deleted"`
	UsageSummariesDeleted     int64 `json:"usage_summaries_deleted"`
	MessagesDeleted           int64 `json:"messages_deleted"`
	AgentsDeleted             int64 `json:"agents_deleted"`
	DomainsDeleted            int64 `json:"domains_deleted"`
	APIKeysDeleted            int64 `json:"api_keys_deleted"`
	SessionsDeleted           int64 `json:"sessions_deleted"`
	OAuthAuthCodesDeleted     int64 `json:"oauth_auth_codes_deleted,omitempty"`
	OAuthAccessTokensDeleted  int64 `json:"oauth_access_tokens_deleted,omitempty"`
	OAuthRefreshTokensDeleted int64 `json:"oauth_refresh_tokens_deleted,omitempty"`
	UserDeleted               bool  `json:"user_deleted"`

} // @name DeleteUserDataResult

DeleteUserDataResult breaks out per-table row counts for audit logs. Operators receiving a deletion request often have to attest to what was removed; returning structured counts beats parsing a log line.

type Domain

type Domain struct {
	Domain            string     `json:"domain"`
	UserID            *string    `json:"user_id,omitempty"`
	Verified          bool       `json:"verified"`
	VerificationToken string     `json:"verification_token"`
	CreatedAt         time.Time  `json:"created_at"`
	VerifiedAt        *time.Time `json:"verified_at,omitempty"`
	// IsPrimary marks the user's default domain. At most one TRUE per
	// user (enforced by a partial unique index in migration 013).
	IsPrimary bool `json:"is_primary"`
	// LastCheckedAt is updated whenever the verification probe runs,
	// successful or not. NULL until the first probe — distinct from
	// "probed and failed" which is captured by `verified=false` + a
	// non-null LastCheckedAt.
	LastCheckedAt *time.Time `json:"last_checked_at,omitempty"`
	// AgentCount is computed at read time by ListDomainsByUser and is
	// not a persisted column. Single-domain LookupDomain leaves it at
	// the zero value — callers that need the count call the list path
	// (this column-versus-aggregate split avoids changing every store
	// signature to thread an agent-counter through).
	AgentCount int `json:"agent_count"`
	// DKIM keypair fields. The selector + public key
	// are user-facing — the dashboard shows them so users can copy the
	// DNS TXT record. The private key is intentionally NOT in the JSON
	// shape; it's only read by the outbound signer via
	// GetDKIMKey(domain). Domains created before migration 014 ran
	// keep all three NULL until the next ClaimOrCreate or backfill.
	DKIMSelector  string `json:"dkim_selector,omitempty"`
	DKIMPublicKey string `json:"dkim_public_key,omitempty"`
}

Domain represents a verified or unverified domain registered by a user.

type ExpirationCandidate

type ExpirationCandidate struct {
	MessageID        string
	AgentID          string
	ExpirationAction string // 'approve' or 'reject'
}

ExpirationCandidate is the minimal row the expiration worker needs to decide how to finalize an expired pending message.

type Message

type Message struct {
	ID                string            `json:"id"`
	AgentID           string            `json:"agent_id"`
	Direction         string            `json:"direction"`
	Sender            string            `json:"sender"`
	Recipient         string            `json:"recipient"`
	Subject           string            `json:"subject"`
	EmailMessageID    string            `json:"email_message_id,omitempty"`
	ProviderMessageID string            `json:"provider_message_id,omitempty"`
	Method            string            `json:"method,omitempty"`
	Type              string            `json:"type,omitempty"`
	RawMessage        []byte            `json:"raw_message,omitempty"`
	AuthHeaders       map[string]string `json:"auth_headers,omitempty"`
	ConversationID    string            `json:"conversation_id,omitempty"`
	DeliveryStatus    string            `json:"delivery_status,omitempty"`
	CreatedAt         time.Time         `json:"created_at"`
	ExpiresAt         time.Time         `json:"expires_at"`
	WebhookStatus     string            `json:"webhook_status,omitempty"`
	WebhookError      string            `json:"webhook_error,omitempty"`
	WebhookAttempts   int               `json:"webhook_attempts,omitempty"`
	// SizeBytes is the byte length of raw_message. Populated by load paths
	// that compute it (e.g. GetMessagesByAgent for the dashboard inbox).
	// Zero on load paths that don't — the inbox renders "—" in that case.
	SizeBytes int `json:"size_bytes,omitempty"`
	// InboxStatus mirrors messages.inbox_status ('unread' | 'read') for
	// inbound rows. Kept separate from DeliveryStatus (which currently
	// carries the same value under a confusing JSON key — see line 161)
	// so the dashboard's inbox can read it under a non-overloaded key.
	// Empty on outbound rows. Populated by GetMessagesByAgent.
	InboxStatus string `json:"inbox_status,omitempty"`

	// Multi-recipient fields. For outbound, these are the addressed
	// To/Cc/Bcc recipients of the send. For inbound, ToRecipients and CC
	// are the parsed To: and Cc: headers of the original message (the
	// per-delivery target for this row is in Recipient). BCC is
	// outbound-only.
	ToRecipients []string `json:"to_recipients,omitempty"`
	CC           []string `json:"cc,omitempty"`
	BCC          []string `json:"bcc,omitempty"`

	// ReplyTo is the parsed Reply-To: header on inbound messages — empty
	// when the header was absent. Distinct from Sender so consumers can
	// recover the original From: of forwarded / notification mail whose
	// Reply-To points at a different mailbox. Outbound-irrelevant.
	ReplyTo []string `json:"reply_to,omitempty"`

	// Labels are user-applied string tags (`urgent`, `follow-up`, …).
	// Always lowercase, charset `[a-z0-9:_-]+`, ≤ 64 chars per label,
	// capped at 100 per message. Empty slice means no labels — the DB
	// default is `'{}'` so this is never null on read. Labels with the
	// `e2a:` prefix are reserved for server-applied system labels;
	// caller writes that try to set them are rejected at the API layer.
	Labels []string `json:"labels,omitempty"`

	// HITL approval fields. Status defaults to 'sent'; body and attachments
	// are populated only while a message is in 'pending_approval', and are
	// scrubbed on any terminal transition.
	Status            string     `json:"status,omitempty"`
	ApprovalExpiresAt *time.Time `json:"approval_expires_at,omitempty"`
	ReviewedAt        *time.Time `json:"reviewed_at,omitempty"`
	// ReviewedByUserID identifies the human reviewer who approved or
	// rejected this message. NULL on worker-triggered transitions
	// (TTL auto-approve / auto-reject) — operator-visible signal "no
	// human looked at this." Set by ApproveAndSend and RejectPending,
	// left null by ExpireApproveAndSend / ExpireReject.
	ReviewedByUserID *string `json:"reviewed_by_user_id,omitempty"`
	// ReviewedByName is the JOIN'd display name from the reviewer's
	// users row, populated only by GetOutboundMessageForUser. List
	// endpoints leave this empty to avoid a join-per-row cost — the
	// pending-detail page is where reviewer attribution matters.
	ReviewedByName  *string         `json:"reviewed_by_name,omitempty"`
	RejectionReason string          `json:"rejection_reason,omitempty"`
	Edited          bool            `json:"edited,omitempty"`
	BodyText        string          `json:"body_text,omitempty"`
	BodyHTML        string          `json:"body_html,omitempty"`
	AttachmentsJSON json.RawMessage `json:"attachments,omitempty"`
}

type MessageListFilter added in v0.3.0

type MessageListFilter struct {
	AgentID    string
	Status     string // "unread" | "read" | "all"
	Direction  string // "inbound" | "outbound" | "all"
	Descending bool
	Limit      int
	AfterTime  time.Time
	AfterID    string
	// Optional search filters. Empty / zero means "no constraint".
	// From / SubjectContains are case-insensitive substring matches
	// (Postgres ILIKE) and bound to 200 chars at the handler layer.
	From            string
	SubjectContains string
	ConversationID  string    // exact match
	Since           time.Time // created_at >= Since
	Until           time.Time // created_at <  Until
	// Labels filters rows where ALL given labels are present on the
	// message (AND-match via Postgres @> array containment). Empty slice
	// means "no label constraint" — matches both labelled and unlabelled
	// rows. Handler-layer validates each entry against the same charset
	// rule used on writes so callers can't smuggle SQL through here.
	Labels []string
}

MessageListFilter bundles the params for GetMessagesByAgent. Zero values on the optional substring / time / ID filters mean "no constraint" — callers omit what they don't want to filter on.

type MigrationMode added in v0.3.0

type MigrationMode string

MigrationMode controls how RunMigrations handles pending migrations.

Set via E2A_MIGRATION_MODE env var. Default is ModeAuto.

  • ModeAuto: apply pending migrations in order. Fail (return error) if any apply errors. This is what the hosted deployment uses.
  • ModeVerify: do not apply anything. Return an error listing pending migrations. For cautious operators who want a separate manual migration step before binary rollout.
  • ModeSkip: do not apply or check. Log and proceed. For emergency surgery: deploy a binary that won't touch schema.

const (
	ModeAuto   MigrationMode = "auto"
	ModeVerify MigrationMode = "verify"
	ModeSkip   MigrationMode = "skip"
)

func ParseMigrationMode added in v0.3.0

func ParseMigrationMode(s string) (MigrationMode, error)

ParseMigrationMode reads a string (typically from env) and returns the matching mode, defaulting to ModeAuto on empty input. Returns an error for unknown values so a typo is loud, not silent.
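A sketch of the described parsing behaviour: empty input defaults to ModeAuto and unknown values are an error. The type and constants are redeclared locally, and exact (case-sensitive, untrimmed) matching is an assumption.

```go
package main

import "fmt"

type MigrationMode string

const (
	ModeAuto   MigrationMode = "auto"
	ModeVerify MigrationMode = "verify"
	ModeSkip   MigrationMode = "skip"
)

// parseMigrationMode maps a raw string to a mode. Empty input defaults to
// ModeAuto; anything unrecognised is an error so that typos are loud.
func parseMigrationMode(s string) (MigrationMode, error) {
	switch m := MigrationMode(s); m {
	case "":
		return ModeAuto, nil
	case ModeAuto, ModeVerify, ModeSkip:
		return m, nil
	default:
		return "", fmt.Errorf("unknown migration mode %q", s)
	}
}

func main() {
	for _, raw := range []string{"", "verify", "verfy"} {
		mode, err := parseMigrationMode(raw)
		fmt.Printf("%q -> %q, err=%v\n", raw, mode, err)
	}
}
```

A caller would typically pass the value of the E2A_MIGRATION_MODE environment variable and hand the result to RunMigrations.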

type OAuthConnectionEntry added in v0.3.0

type OAuthConnectionEntry struct {
	ClientID   string     `json:"client_id"`
	ClientName string     `json:"client_name"`
	AgentEmail string     `json:"agent_email"`
	Scope      string     `json:"scope"`
	IssuedAt   time.Time  `json:"issued_at"`
	ExpiresAt  *time.Time `json:"expires_at,omitempty"`
	RevokedAt  *time.Time `json:"revoked_at,omitempty"`

} // @name OAuthConnectionEntry

OAuthConnectionEntry is one OAuth/MCP client connection. The underlying token signatures are intentionally excluded — they are credential-equivalent. The agent_email is the per-grant binding captured at consent time.

type PendingApprovalEdit

type PendingApprovalEdit struct {
	Subject         *string
	BodyText        *string
	BodyHTML        *string
	To              []string
	CC              []string
	BCC             []string
	AttachmentsJSON []byte
	// AttachmentsSet must be true when the caller intends to override
	// AttachmentsJSON, since nil and empty [] are both valid overrides
	// (empty [] clears attachments; nil preserves).
	AttachmentsSet bool
}

PendingApprovalEdit holds optional overrides a reviewer can apply when approving a pending message. Pointer-typed strings distinguish "not provided" (nil) from "explicitly empty" (pointer to ""). Slice fields distinguish "unset" (nil) from "empty list" (non-nil zero-length slice).

func (PendingApprovalEdit) Apply

func (e PendingApprovalEdit) Apply(msg *Message) bool

Apply mutates msg to reflect any fields the reviewer changed. Returns true if any field was actually different from what msg already held (signals the edited flag should be set).
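The nil-versus-empty distinction is easiest to see on a cut-down version of the types. The `message` and `edit` structs below are hypothetical two-field reductions of Message and PendingApprovalEdit, and `apply` is a sketch of the described semantics, not the package's code.

```go
package main

import "fmt"

// message and edit are reduced stand-ins for Message and PendingApprovalEdit.
type message struct {
	Subject string
	To      []string
}

type edit struct {
	Subject *string  // nil = not provided; pointer to "" = explicitly empty
	To      []string // nil = unset; non-nil (even empty) = override
}

// equalStrings reports whether two slices hold the same elements in order.
func equalStrings(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// apply copies provided fields onto m and reports whether anything changed.
func (e edit) apply(m *message) bool {
	changed := false
	if e.Subject != nil && *e.Subject != m.Subject {
		m.Subject = *e.Subject
		changed = true
	}
	if e.To != nil && !equalStrings(e.To, m.To) {
		m.To = e.To
		changed = true
	}
	return changed
}

func main() {
	m := message{Subject: "Hello", To: []string{"a@x.com"}}

	fmt.Println(edit{}.apply(&m)) // prints: false (zero edit changes nothing)

	empty := ""
	fmt.Println(edit{Subject: &empty}.apply(&m), m.Subject == "", len(m.To))
	// prints: true true 1 (subject cleared, recipients preserved)
}
```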

type SendAttemptOutcome added in v0.3.0

type SendAttemptOutcome int

SendAttemptOutcome describes the result of trying to reserve a (message_id) slot for an upstream SES send.

const (
	// SendAttemptAcquired — caller now owns the slot and must follow
	// up with MarkSendSucceeded or MarkSendFailed.
	SendAttemptAcquired SendAttemptOutcome = iota
	// SendAttemptAlreadySent — a prior attempt for this message
	// already succeeded at SES; SendResult is populated with the
	// recorded provider id and recipient lists. Callers must NOT
	// re-invoke the upstream send.
	SendAttemptAlreadySent
	// SendAttemptInFlight — a concurrent caller holds the slot and
	// the row is not stale. Callers should surface ErrSendInProgress.
	SendAttemptInFlight
)

type SendAttemptResult added in v0.3.0

type SendAttemptResult struct {
	Outcome SendAttemptOutcome
	Sent    SendResult
}

SendAttemptResult bundles the outcome with the cached SendResult when the outcome is SendAttemptAlreadySent.

type SendResult

type SendResult struct {
	ProviderMessageID string
	Method            string
	To                []string
	CC                []string
	BCC               []string
}

SendResult carries the outcome of a sender.Send invocation back to the store for final persistence. Handlers wrap their sender.Send call in a closure that returns this type.

type SigningSecret added in v0.3.0

type SigningSecret struct {
	ID           string     `json:"id"`
	UserID       string     `json:"user_id"`
	Name         string     `json:"name"`
	Secret       string     `json:"secret,omitempty"`        // only on creation
	SecretPrefix string     `json:"secret_prefix,omitempty"` // first 12 chars, for list/get
	CreatedAt    time.Time  `json:"created_at"`
	LastSignedAt *time.Time `json:"last_signed_at,omitempty"`
}

SigningSecret is one of a user's HMAC secrets used to sign their agents' inbound webhook payloads and HITL approval magic-link tokens.

The plaintext Secret is set only in the response to a fresh CreateSigningSecret call (it is also what is persisted in the DB row); list operations omit it and surface a SecretPrefix preview instead.

type SigningSecretWithValue added in v0.3.0

type SigningSecretWithValue struct {
	ID     string
	Secret string
}

SigningSecretWithValue carries the plaintext Secret alongside the ID so the relay can both sign with the value and (asynchronously) update last_signed_at on the right row. Returned by GetUserSigningSecrets in most-recent-first order.

type Store

type Store struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(pool *pgxpool.Pool) *Store

func (*Store) ApproveAndSend

func (s *Store) ApproveAndSend(
	ctx context.Context,
	messageID, userID string,
	edits PendingApprovalEdit,
	send func(msg *Message) (SendResult, error),
) (*Message, error)

ApproveAndSend finalizes a pending_approval message by running it through a caller-supplied send function inside a transaction that holds a row lock on the pending row. If send returns an error the transaction rolls back and the message remains pending. On success the row is updated to 'sent' with the provider-assigned Message-ID and the body/attachments columns are scrubbed.

If any fields of edits are populated, they are applied to the in-memory message before send is called, and the 'edited' column is set to true when any field differs from what was stored. Approval-via-magic-link callers pass the zero edits value.

Ownership is enforced by the agent -> user join. Messages owned by another user return ErrMessageNotFound. Messages whose status is not 'pending_approval' return ErrNotPendingApproval. If another worker is already mid-send for this message (rare; only possible after the approval row lock was released without status changing — e.g. a pool drop mid-send), this returns ErrSendInProgress.

Concurrency / failure mode notes:

  • The row-level FOR NO KEY UPDATE lock is held on the messages row for the duration of the send callback. In practice that is bounded by outbound.SMTPRelay's per-attempt deadline (2min) plus its internal retry backoff (1s/5s/15s) — worst case ~6.5min of lock on this single row. Other rows are unaffected; deadlock is not possible because only one row is ever locked per call.

    Why NO KEY UPDATE rather than the stricter FOR UPDATE: the send_attempts INSERT below runs on a SEPARATE pool connection and needs a KEY SHARE lock on this messages row for FK enforcement. FOR UPDATE blocks KEY SHARE; FOR NO KEY UPDATE allows it. The downgrade is safe because nothing in this codebase mutates messages.id (the only key column) after creation — all UPDATEs touch non-key columns, which NO KEY UPDATE serializes against itself exactly like FOR UPDATE.

  • The old crash window where send() succeeded at SES but the subsequent UPDATE/Commit failed (DB blip, pool exhaustion) is now closed by the send_attempts table. Around send() we run two small auxiliary transactions that outlive the surrounding approval transaction: ClaimSendAttempt before send(), MarkSendSucceeded (or MarkSendFailed) after. If the approval tx rolls back AFTER send() succeeded, the next retry of ApproveAndSend reads send_attempts.status='sent', reuses the recorded SendResult, and skips the upstream send entirely.
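
The send_attempts gate described above can be sketched with an in-memory model. This is only an illustration under stated assumptions: attemptLog, gatedSend, and the single SendResult field are hypothetical names, the real code uses separate database transactions rather than maps, and staleness handling is left out.

```go
package main

import (
	"errors"
	"fmt"
)

// SendResult stands in for the package's SendResult; its single field is hypothetical.
type SendResult struct{ ProviderMessageID string }

var errSendInProgress = errors.New("send in progress")

// attemptLog is an in-memory stand-in for the send_attempts table.
type attemptLog struct {
	status map[string]string     // messageID -> "attempting", "sent", or "failed"
	result map[string]SendResult // recorded result for rows in status "sent"
}

func newAttemptLog() *attemptLog {
	return &attemptLog{status: map[string]string{}, result: map[string]SendResult{}}
}

// gatedSend wraps send in the claim / mark-succeeded / mark-failed steps.
// Staleness handling for "attempting" rows is omitted for brevity.
func gatedSend(a *attemptLog, messageID string, send func() (SendResult, error)) (SendResult, error) {
	switch a.status[messageID] {
	case "sent":
		return a.result[messageID], nil // reuse the recorded result, skip the upstream send
	case "attempting":
		return SendResult{}, errSendInProgress
	}
	a.status[messageID] = "attempting" // claim (covers both "no row" and "failed")
	res, err := send()
	if err != nil {
		a.status[messageID] = "failed" // a later call may take over and retry
		return SendResult{}, err
	}
	a.status[messageID] = "sent"
	a.result[messageID] = res
	return res, nil
}

func main() {
	attempts := newAttemptLog()
	sends := 0
	send := func() (SendResult, error) {
		sends++
		return SendResult{ProviderMessageID: fmt.Sprintf("provider-%d", sends)}, nil
	}
	// First call: the upstream send succeeds; imagine the approval tx then rolls back.
	first, _ := gatedSend(attempts, "msg-1", send)
	// Retry: the gate sees status "sent" and does not call send again.
	second, _ := gatedSend(attempts, "msg-1", send)
	fmt.Println(first == second, sends)
}
```

Because the recorded result lives outside the approval transaction, the retry returns the same SendResult and the upstream send runs only once.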

func (*Store) AutoDisableFailingWebhooks added in v0.3.0

func (s *Store) AutoDisableFailingWebhooks(ctx context.Context) (int, error)

AutoDisableFailingWebhooks scans for webhooks whose recent delivery history meets the failure criteria described below and flips them to enabled=false with auto_disabled_at = now(). It returns the count of webhooks newly disabled and is designed to be called periodically (e.g. every 5 minutes) from a janitor goroutine.

"Consecutive failed events" is interpreted as: in the last AutoDisableWindow, at least AutoDisableThreshold rows in webhook_subscriber_deliveries reached status='failed' AND zero rows reached status='delivered'. The zero-delivered guard prevents a noisy webhook that's still mostly working from being disabled.

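The rule for AutoDisableFailingWebhooks can be written as a pure predicate over one webhook's delivery rows. This is a sketch, not the actual query: the delivery struct, shouldAutoDisable, repeat, and exampleNow are hypothetical, and the real check runs in SQL against webhook_subscriber_deliveries.

```go
package main

import (
	"fmt"
	"time"
)

const (
	AutoDisableThreshold = 10
	AutoDisableWindow    = 72 * time.Hour
)

// delivery is a hypothetical in-memory view of one webhook_subscriber_deliveries row.
type delivery struct {
	Status string    // "failed", "delivered", or any other state
	At     time.Time // when the row reached that status
}

// shouldAutoDisable reports whether the rows inside the window contain at
// least AutoDisableThreshold failures and no successful delivery.
func shouldAutoDisable(rows []delivery, now time.Time) bool {
	cutoff := now.Add(-AutoDisableWindow)
	failed := 0
	for _, d := range rows {
		if d.At.Before(cutoff) {
			continue // outside the lookback window
		}
		switch d.Status {
		case "delivered":
			return false // zero-delivered guard
		case "failed":
			failed++
		}
	}
	return failed >= AutoDisableThreshold
}

// repeat builds n rows with the same status and timestamp (example helper).
func repeat(n int, status string, at time.Time) []delivery {
	rows := make([]delivery, n)
	for i := range rows {
		rows[i] = delivery{Status: status, At: at}
	}
	return rows
}

// exampleNow is a fixed reference time so the example is deterministic.
var exampleNow = time.Date(2026, time.June, 4, 12, 0, 0, 0, time.UTC)

func main() {
	recent := exampleNow.Add(-time.Hour)
	failing := repeat(10, "failed", recent)
	fmt.Println(shouldAutoDisable(failing, exampleNow))

	mixed := append(failing, delivery{Status: "delivered", At: recent})
	fmt.Println(shouldAutoDisable(mixed, exampleNow))
}
```

A single delivered row inside the window is enough to keep the webhook enabled, which is the zero-delivered guard in action.
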
func (*Store) BootstrapUser

func (s *Store) BootstrapUser(ctx context.Context, email string) (*User, error)

BootstrapUser finds a user by email, or creates one with a synthetic google_subject if none exists. Used by the -bootstrap-email CLI flag for self-host first-run, where there's no Google OAuth flow yet.

func (*Store) ClaimOrCreateDomain

func (s *Store) ClaimOrCreateDomain(ctx context.Context, domain, userID string) (*Domain, error)

ClaimOrCreateDomain implements the atomic create/claim logic from the design doc. Creates if new, returns the existing row when the same user already owns it (verified or not), and errors if a different user owns it. The verification_token and DKIM keypair are minted on first INSERT and remain stable across re-claims — a caller that has already published the TXT record on DNS (or has mail in flight signed with the DKIM key) isn't silently invalidated by a second call. A different user cannot take over an unverified row; that closes a squatting window where the new owner could verify against a TXT record the original owner already published. Callers are responsible for rejecting the configured shared domain before invoking this — the store has no concept of a reserved domain.

func (*Store) ClaimSendAttempt added in v0.3.0

func (s *Store) ClaimSendAttempt(ctx context.Context, messageID string) (SendAttemptResult, error)

ClaimSendAttempt atomically reserves the send_attempts row for messageID. Concurrency model mirrors internal/idempotency.Claim: one UPSERT with a stale-takeover WHERE clause; the loser does a follow-up SELECT to classify the existing row.

Allowed transitions:

(no row)                              → acquired (fresh INSERT)
status='failed'                       → acquired (UPSERT path takes over)
status='attempting' AND stale         → acquired (UPSERT path takes over)
status='attempting' AND NOT stale     → InFlight (refuse)
status='sent'                         → AlreadySent (reuse recorded result)

Called from ApproveAndSend in a SEPARATE small transaction so the claim row outlives any rollback of the surrounding approval transaction. That's what closes the documented double-send window: once status='sent' is committed here, a retry sees AlreadySent and skips the upstream send even if the messages-row UPDATE inside the approval tx never committed.
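
The transition table above can be read as a small classification function. The sketch below is illustrative only: claimOutcome, its constants, and classifyClaim are hypothetical names, staleness is passed in as a boolean because the staleness threshold is not stated here, and the real implementation does this with an UPSERT plus a follow-up SELECT.

```go
package main

import (
	"errors"
	"fmt"
)

type claimOutcome string

const (
	outcomeAcquired    claimOutcome = "acquired"
	outcomeInFlight    claimOutcome = "in_flight"
	outcomeAlreadySent claimOutcome = "already_sent"
)

var errUnknownStatus = errors.New("unknown send_attempts status")

// classifyClaim maps the state of the existing send_attempts row to an outcome.
// exists is false when there is no row; stale only matters for 'attempting'.
func classifyClaim(exists bool, status string, stale bool) (claimOutcome, error) {
	if !exists {
		return outcomeAcquired, nil // fresh INSERT
	}
	switch status {
	case "failed":
		return outcomeAcquired, nil // takeover after a recorded failure
	case "attempting":
		if stale {
			return outcomeAcquired, nil // takeover of an abandoned attempt
		}
		return outcomeInFlight, nil // refuse: someone else is mid-send
	case "sent":
		return outcomeAlreadySent, nil // reuse the recorded result
	}
	return "", fmt.Errorf("%w: %q", errUnknownStatus, status)
}

func main() {
	cases := []struct {
		exists bool
		status string
		stale  bool
	}{
		{false, "", false},
		{true, "failed", false},
		{true, "attempting", true},
		{true, "attempting", false},
		{true, "sent", false},
	}
	for _, c := range cases {
		out, err := classifyClaim(c.exists, c.status, c.stale)
		fmt.Println(c.exists, c.status, c.stale, out, err)
	}
}
```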

func (*Store) ClearExpiredPrevSecrets added in v0.3.0

func (s *Store) ClearExpiredPrevSecrets(ctx context.Context) (int64, error)

ClearExpiredPrevSecrets nulls signing_secret_prev / signing_secret_prev_expires_at on rows past their grace window. Idempotent. The worker already ignores expired prev secrets at signing time; this janitor is a hygiene pass so GET responses don't carry a meaningless prev_expires_at.

func (*Store) CountWebhooksByUser added in v0.3.0

func (s *Store) CountWebhooksByUser(ctx context.Context, userID string) (int, error)

CountWebhooksByUser returns the total number of webhooks (enabled + disabled) the user owns. Used by CreateWebhook to enforce the per-user cap from account_limits.max_webhooks.

func (*Store) CreateAPIKey

func (s *Store) CreateAPIKey(ctx context.Context, userID, name string, expiresAt *time.Time) (*APIKey, error)

CreateAPIKey issues a fresh API key for the user. expiresAt is the optional hard expiration; pass nil to issue a never-expiring key (the backward-compatible default).

func (*Store) CreateAgent

func (s *Store) CreateAgent(ctx context.Context, agentEmail, domain, name, webhookURL, agentMode, userID string) (*AgentIdentity, error)

CreateAgent inserts an agent with a domain FK. Does NOT check domain ownership — that's the API handler's responsibility (shared domain skips the check).

func (*Store) CreateAgentTx added in v0.3.0

func (s *Store) CreateAgentTx(ctx context.Context, tx pgx.Tx, agentEmail, domain, name, webhookURL, agentMode, userID string) (*AgentIdentity, error)

CreateAgentTx inserts an agent inside a caller-owned transaction. Used by the OAuth consent flow so the slug auto-create row and the authorization-code insert (in oauth_auth_codes) commit together — without this, a code-issue failure after the agent commit would leave a phantom inbox the user never authorized.

func (*Store) CreateInboundMessage

func (s *Store) CreateInboundMessage(ctx context.Context, id, agentID, senderEmail, recipient, emailMessageID, subject, conversationID, deliveryStatus string, rawMessage []byte, authHeaders map[string]string, toRecipients, cc, replyTo []string) (*Message, error)

CreateInboundMessage stores an inbound message. If id is empty a new one is generated; otherwise the caller's pre-generated ID is used so the upstream signer can bind auth headers to the same ID that gets stored. toRecipients and cc are the parsed To: and Cc: headers from the original RFC 2822 message; recipient is the per-delivery target for this row (may be one of the To: addresses, or absent from the header list when the agent was Bcc'd). replyTo is the parsed Reply-To: header (empty when absent — never silently falls back to sender).

func (*Store) CreateInboundMessageInTx added in v0.3.0

func (s *Store) CreateInboundMessageInTx(ctx context.Context, tx pgx.Tx, id, agentID, senderEmail, recipient, emailMessageID, subject, conversationID, deliveryStatus string, rawMessage []byte, authHeaders map[string]string, toRecipients, cc, replyTo []string) (*Message, error)

CreateInboundMessageInTx writes the messages row inside the caller's transaction. Used by the slice-3 relay refactor (per design §4.2) so the messages INSERT and the webhook_events outbox INSERT commit together, closing the at-least-once publish-loss window.

Mirrors the CreateAgentTx pattern at store.go:596-607 — same SQL body, executed against either *pgxpool.Pool or pgx.Tx via the messageExecutor interface below.

func (*Store) CreateOrGetUser

func (s *Store) CreateOrGetUser(ctx context.Context, email, name, googleSub string) (*User, error)

func (*Store) CreateOutboundMessage

func (s *Store) CreateOutboundMessage(ctx context.Context, agentID string, toRecipients []string, cc []string, bcc []string, subject, msgType, method, providerMessageID, conversationID string) (*Message, error)

CreateOutboundMessage stores an outbound message with multi-recipient support. The singular recipient column is kept for backward compatibility; toRecipients, cc, and bcc are the canonical outbound-only multi-recipient fields.

func (*Store) CreatePendingOutboundMessage

func (s *Store) CreatePendingOutboundMessage(ctx context.Context, agentID string, toRecipients, cc, bcc []string, subject, bodyText, bodyHTML string, attachmentsJSON []byte, msgType, conversationID, replyToEmailMessageID string, ttlSeconds int) (*Message, error)

CreatePendingOutboundMessage stores a fully composed outbound email in pending_approval status, including body_text, body_html, and attachments so that approval can reconstruct the original SendRequest (or accept edits) without the caller needing to retain it. ttlSeconds sets how long the message remains pending before the expiration worker resolves it.

replyToEmailMessageID is the RFC 5322 Message-ID of the inbound being replied to (e.g. "<abc@gmail.com>"), or "" for fresh sends and test emails. It reuses the email_message_id column, which is unused for outbound rows in every other path — the column semantically carries "the Message-ID this row references" in both directions.

attachmentsJSON must be a JSON array matching the public Attachment shape ([{filename, content_type, data}, ...]) or nil. Callers that already have an []outbound.Attachment slice should json.Marshal it before passing in.
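
A caller might prepare attachmentsJSON roughly as follows. The Attachment struct here is a hypothetical stand-in for outbound.Attachment that only mirrors the documented JSON keys; treating data as a base64 string and returning nil for an empty slice are assumptions of this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Attachment is a hypothetical stand-in for outbound.Attachment. Only the
// JSON keys (filename, content_type, data) come from the documentation.
type Attachment struct {
	Filename    string `json:"filename"`
	ContentType string `json:"content_type"`
	Data        string `json:"data"` // assumed to hold base64-encoded content
}

// marshalAttachments returns a JSON array for the slice, or nil when there
// is nothing to attach (nil is the documented "no attachments" value).
func marshalAttachments(atts []Attachment) ([]byte, error) {
	if len(atts) == 0 {
		return nil, nil
	}
	return json.Marshal(atts)
}

func main() {
	b, err := marshalAttachments([]Attachment{
		{Filename: "a.txt", ContentType: "text/plain", Data: "aGk="},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```

The resulting byte slice is what would be passed as the attachmentsJSON argument.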

func (*Store) CreateSigningSecret added in v0.3.0

func (s *Store) CreateSigningSecret(ctx context.Context, userID, name string) (*SigningSecret, error)

CreateSigningSecret mints a new secret for the user. The plaintext secret value is set on the returned struct exactly once; subsequent reads (List/Get) only see the prefix.

Returns ErrSigningSecretCapReached if the user is already at MaxSigningSecretsPerUser. Race-free under concurrent callers via the per-user advisory lock.

Empty `name` is normalized server-side to "unnamed" so the dashboard always has something to display.

func (*Store) CreateUserSession

func (s *Store) CreateUserSession(ctx context.Context, userID string) (string, error)

func (*Store) CreateWebhook added in v0.3.0

func (s *Store) CreateWebhook(ctx context.Context, userID, url, description string, events []string, filters WebhookFilters) (*Webhook, error)

CreateWebhook inserts a new row and returns it with the plaintext signing secret populated. The plaintext is only available on this response — subsequent GET/list calls scrub it.

Filters validation (charset, count caps, agent ownership) is the handler's job in slice 2; the storage layer only verifies the per-user count cap from account_limits.max_webhooks.

func (*Store) DeleteAPIKey

func (s *Store) DeleteAPIKey(ctx context.Context, keyID, userID string) error

func (*Store) DeleteAgent

func (s *Store) DeleteAgent(ctx context.Context, agentID, userID string) error

func (*Store) DeleteDomain

func (s *Store) DeleteDomain(ctx context.Context, domain, userID string) error

DeleteDomain deletes a domain only if owned by the user. The handler should check for existing agents first.

func (*Store) DeleteExpiredMessages

func (s *Store) DeleteExpiredMessages(ctx context.Context) (int64, error)

func (*Store) DeleteExpiredUserSessions

func (s *Store) DeleteExpiredUserSessions(ctx context.Context) (int64, error)

func (*Store) DeleteSigningSecret added in v0.3.0

func (s *Store) DeleteSigningSecret(ctx context.Context, secretID, userID string) error

DeleteSigningSecret removes a secret. Refuses to delete the user's last secret — every user must keep at least one so webhooks remain verifiable. Race-free under concurrent callers via the per-user row lock.

Check order matters: ownership first (so an attacker probing IDs they don't own gets 404, not "cannot delete last" leaking that the caller has only 1 secret), then the floor.
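
The check order can be illustrated with an in-memory model. Everything named here (checkDelete, the two error values, the map of secret owners) is hypothetical; the point is only that the ownership test runs before the last-secret floor.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical error values for this sketch.
var (
	errSecretNotFound = errors.New("signing secret not found")
	errLastSecret     = errors.New("cannot delete last signing secret")
)

// checkDelete decides whether userID may delete secretID.
// owners maps secretID -> owning userID.
func checkDelete(owners map[string]string, secretID, userID string) error {
	// 1. Ownership first: unknown and foreign IDs look identical to the caller.
	owner, ok := owners[secretID]
	if !ok || owner != userID {
		return errSecretNotFound
	}
	// 2. Floor second: only reached for secrets the caller really owns.
	count := 0
	for _, u := range owners {
		if u == userID {
			count++
		}
	}
	if count <= 1 {
		return errLastSecret
	}
	return nil
}

func main() {
	owners := map[string]string{"s1": "alice", "s2": "bob", "s3": "bob"}
	fmt.Println(checkDelete(owners, "s1", "bob"))   // foreign secret
	fmt.Println(checkDelete(owners, "s1", "alice")) // alice's only secret
	fmt.Println(checkDelete(owners, "s2", "bob"))   // bob has a spare
}
```

Probing another user's only secret yields the not-found error, so the caller learns nothing about how many secrets that user holds.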

func (*Store) DeleteUserData

func (s *Store) DeleteUserData(ctx context.Context, userID string) (*DeleteUserDataResult, error)

DeleteUserData wipes everything tied to a user in a single transaction.

Schema cascades cover most of it (user_sessions, domains, agent_identities, api_keys, usage_summaries all `ON DELETE CASCADE` from users; messages cascade through agent_identities; webhook_deliveries cascade through messages). The one row that doesn't is usage_events: its FK is `ON DELETE SET NULL` so analytics survives, which we explicitly override here for full deletion.

Per-table counts are returned to the caller so an operator can attest to what was removed in audit / compliance contexts.

func (*Store) DeleteUserSession

func (s *Store) DeleteUserSession(ctx context.Context, token string) error

func (*Store) DeleteWebhook added in v0.3.0

func (s *Store) DeleteWebhook(ctx context.Context, webhookID, userID string) error

DeleteWebhook removes a webhook owned by the user. Deleting a non-existent or cross-user webhook returns ErrWebhookNotFound rather than silently succeeding, so a repeated delete of the same ID reports not-found. The ON DELETE CASCADE on webhook_subscriber_deliveries.webhook_id drops pending delivery rows automatically, so no separate cleanup is needed.

Slice 1 includes this method (rather than deferring to slice 2's handler work) because tests need it for setup and teardown and the implementation is trivial.

func (*Store) EnsureSharedDomain

func (s *Store) EnsureSharedDomain(ctx context.Context, domain string) error

EnsureSharedDomain inserts a system row for the configured shared mail domain so slug-based agent registration can satisfy the agent_identities.domain → domains.domain foreign key. The row is owned by no user (user_id = NULL) and pre-verified — it represents infrastructure the operator runs, not user-claimed identity.

Called once at server startup. Idempotent via ON CONFLICT DO NOTHING, and a no-op when the operator has not configured a shared domain. Without this, any deployment whose shared_domain differs from the hardcoded migration seed (`agents.e2a.dev`) gets an FK violation the first time a user tries to register a slug-based agent.

func (*Store) EnsureUserHasSigningSecret added in v0.3.0

func (s *Store) EnsureUserHasSigningSecret(ctx context.Context, userID string) error

EnsureUserHasSigningSecret guarantees the user has at least one signing secret, creating a "default" one if not. Idempotent. Concurrent callers serialize via the per-user advisory lock so we can't accidentally insert two "default" rows.

func (*Store) ExpireApproveAndSend

func (s *Store) ExpireApproveAndSend(
	ctx context.Context,
	messageID string,
	send func(msg *Message) (SendResult, error),
) (*Message, error)

ExpireApproveAndSend is the worker-side counterpart to ApproveAndSend, with three differences. There is no user ownership check, because the caller is the system-scoped expiration worker. The row is selected with SELECT ... FOR NO KEY UPDATE SKIP LOCKED so concurrent workers don't race for the same row. The terminal status is 'expired_approved' instead of 'sent'. On send failure the transaction rolls back; the worker should then call ExpireReject to move the row to a final state so that it isn't picked up on every sweep.

Exactly-once guarantee: like ApproveAndSend, this method runs the send() callback under a send_attempts gate so a crash between SES acceptance and the surrounding tx commit does NOT cause the next worker poll to re-send. ClaimSendAttempt / MarkSendSucceeded / MarkSendFailed run in separate small transactions that outlive the approval tx; on retry, an AlreadySent verdict reuses the cached SendResult and skips the upstream send entirely. Without this, the polling-loop nature of the worker would guarantee a re-send on any commit failure — strictly worse than the human-approval path, where a re-send needs an explicit click.

SKIP LOCKED means multiple app instances can run the worker without contending on the same row. The row-level FOR NO KEY UPDATE lock on messages is held for the duration of the send callback (bounded by SMTPRelay timeouts); FOR NO KEY UPDATE rather than FOR UPDATE so the send_attempts INSERT in a separate connection can acquire its KEY SHARE lock for FK enforcement — see ApproveAndSend's docstring for the full rationale.

If a concurrent worker is mid-send for the same row (the send_attempts row is 'attempting' and not yet stale), returns ErrSendInProgress. The worker loop should treat this like ErrNotPendingApproval — skip silently and let the next poll handle it.
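
The worker-side handling described for ExpireApproveAndSend and ExpireReject might look like the sketch below. It is not the actual worker: the expirer interface uses simplified signatures (no context, no send callback), the error variables are local stand-ins for the package's sentinels, and resolveExpired, fakeStore, errUpstream, and the reason string are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the package's sentinel errors of the same names.
var (
	ErrNotPendingApproval = errors.New("message is not pending approval")
	ErrSendInProgress     = errors.New("send already in progress")
	errUpstream           = errors.New("upstream send failed") // example failure
)

// expirer is a hypothetical, simplified view of the two store methods.
type expirer interface {
	ExpireApproveAndSend(messageID string) error
	ExpireReject(messageID, reason string) error
}

// resolveExpired handles one expired row and reports what happened.
// action is the agent's hitl_expiration_action ("approve" or "reject").
func resolveExpired(s expirer, messageID, action string) (string, error) {
	if action == "approve" {
		err := s.ExpireApproveAndSend(messageID)
		if err == nil {
			return "approved", nil
		}
		if errors.Is(err, ErrNotPendingApproval) || errors.Is(err, ErrSendInProgress) {
			return "skipped", nil // another worker owns it, or it is already resolved
		}
		// Any other error: the send failed and the tx rolled back, so fall
		// through and reject to stop the row reappearing on every sweep.
	}
	err := s.ExpireReject(messageID, "expired") // the reason string is illustrative
	if errors.Is(err, ErrNotPendingApproval) {
		return "skipped", nil
	}
	if err != nil {
		return "", err
	}
	return "rejected", nil
}

// fakeStore is a test double that returns a preset error from the send path.
type fakeStore struct {
	approveErr error
	rejected   []string
}

func (f *fakeStore) ExpireApproveAndSend(string) error { return f.approveErr }

func (f *fakeStore) ExpireReject(messageID, _ string) error {
	f.rejected = append(f.rejected, messageID)
	return nil
}

func main() {
	busy := &fakeStore{approveErr: ErrSendInProgress}
	outcome, err := resolveExpired(busy, "msg-1", "approve")
	fmt.Println(outcome, err, len(busy.rejected))

	broken := &fakeStore{approveErr: errUpstream}
	outcome, err = resolveExpired(broken, "msg-2", "approve")
	fmt.Println(outcome, err, len(broken.rejected))
}
```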

func (*Store) ExpireReject

func (s *Store) ExpireReject(ctx context.Context, messageID, reason string) (*Message, error)

ExpireReject transitions a pending_approval message to expired_rejected and scrubs body columns. No user ownership check — this is the worker path. If the row is no longer pending (racing worker, already handled) returns ErrNotPendingApproval; caller can treat as a no-op.

func (*Store) ExportUserData

func (s *Store) ExportUserData(ctx context.Context, userID string) (*UserExport, error)

ExportUserData gathers everything a user owns into a single struct for the right-of-access flow. Reads run inside a REPEATABLE READ transaction so the snapshot is internally consistent even if writes arrive while the export is being assembled.

func (*Store) GetAgentByEmail

func (s *Store) GetAgentByEmail(ctx context.Context, email string) (*AgentIdentity, error)

GetAgentByEmail looks up an agent by email address (same as GetAgentByID since ID = email).

func (*Store) GetAgentByID

func (s *Store) GetAgentByID(ctx context.Context, id string) (*AgentIdentity, error)

GetAgentByID looks up an agent by its ID (full email) with domain verification status.

func (*Store) GetConversationByID added in v0.3.0

func (s *Store) GetConversationByID(ctx context.Context, agentID, conversationID string) (*ConversationDetail, error)

GetConversationByID returns the aggregate summary fields plus every member message, ordered oldest-first (chronological reading order). Returns ErrMessageNotFound when no non-expired messages exist for the given (agentID, conversationID), mirroring the "looks-like-not-found-on-cross-agent" convention used by single-message reads. The same code path handles "wrong agent" and "real non-existent": either way the agent has no business seeing it.

Participants are computed as the union of sender + recipient + each row's to_recipients / cc / bcc (when populated). Empty strings are dropped. Labels are the union of all members' labels[]; both are sorted lexicographically for stable output.
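
The participant union can be sketched in plain Go. The msgRow struct and the participants function are hypothetical; the real computation may happen in SQL, and this sketch compares addresses exactly as stored, without case folding.

```go
package main

import (
	"fmt"
	"sort"
)

// msgRow is a hypothetical subset of a message row's address fields.
type msgRow struct {
	Sender    string
	Recipient string
	To        []string
	Cc        []string
	Bcc       []string
}

// participants returns the sorted union of every address across the rows,
// dropping empty strings and duplicates.
func participants(rows []msgRow) []string {
	seen := map[string]struct{}{}
	add := func(addrs ...string) {
		for _, a := range addrs {
			if a != "" {
				seen[a] = struct{}{}
			}
		}
	}
	for _, r := range rows {
		add(r.Sender, r.Recipient)
		add(r.To...)
		add(r.Cc...)
		add(r.Bcc...)
	}
	out := make([]string, 0, len(seen))
	for a := range seen {
		out = append(out, a)
	}
	sort.Strings(out) // lexicographic order for stable output
	return out
}

func main() {
	rows := []msgRow{
		{Sender: "bob@example.com", Recipient: "agent@example.com", Cc: []string{"carol@example.com"}},
		{Sender: "agent@example.com", Recipient: "", To: []string{"bob@example.com"}},
	}
	fmt.Println(participants(rows))
}
```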

func (*Store) GetDKIMKeyInternal added in v0.3.0

func (s *Store) GetDKIMKeyInternal(ctx context.Context, domain string) (string, []byte, error)

GetDKIMKeyInternal returns the stored selector + private key bytes for a domain. The "Internal" suffix is load-bearing: this function does NOT scope by user — it takes a domain name and returns whoever owns that domain's signing key. ONLY call from server-internal codepaths where the domain has already been resolved from a trusted source (e.g. an outbound message's sender field, after the agent layer has authenticated the owner). A handler that ever takes a user-supplied domain string and feeds it to this function becomes a "sign as anyone" primitive: don't.

Returns ("", nil, nil) when the domain has no key — callers MUST treat this as "skip signing" and fall back to whatever the relay-level fallback does.

func (*Store) GetDashboardStats added in v0.3.0

func (s *Store) GetDashboardStats(ctx context.Context, userID string, windowDays int) (*DashboardStats, error)

GetDashboardStats returns workspace-level aggregates for the authenticated user, with a configurable lookback window. windowDays controls Inbound/Outbound totals AND the delivery-success ratio's sample period — passing 0 falls back to DashboardDefaultWindowDays (7), values above DashboardMaxWindowDays (90) are clamped.

Three independent reads — kept separate because the source tables have different indexes and one slow read shouldn't slow the others. All reads are O(rows-for-this-user-only) thanks to the existing per-user indexes.

Robust to missing data: deployments without usage tracking enabled (E2A_USAGE_TRACKING=false — the default for self-hosters) return zero counts rather than erroring. Same for users who have no messages yet. The UI renders zero values as "—".

Delta percentages compare today with yesterday on usage_summaries. When yesterday was zero the delta is reported as 0 to avoid a divide-by-zero. A 100% increase or decrease maps to ±100, and values are clipped at ±999 for integer width.
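
The window clamping and delta rules for GetDashboardStats can be sketched as two small functions. clampWindowDays and deltaPercent are hypothetical names; treating negative windows like 0 and truncating the percentage toward zero are assumptions of this sketch, not documented behavior.

```go
package main

import "fmt"

const (
	DashboardDefaultWindowDays = 7
	DashboardMaxWindowDays     = 90
)

// clampWindowDays applies the default and the cap to a requested window.
// Non-positive values fall back to the default (negative input is an assumption).
func clampWindowDays(days int) int {
	if days <= 0 {
		return DashboardDefaultWindowDays
	}
	if days > DashboardMaxWindowDays {
		return DashboardMaxWindowDays
	}
	return days
}

// deltaPercent returns the day-over-day change as an integer percentage,
// 0 when yesterday was zero, clipped to the range [-999, 999].
func deltaPercent(today, yesterday int64) int {
	if yesterday == 0 {
		return 0 // avoid divide-by-zero
	}
	pct := (today - yesterday) * 100 / yesterday // integer division truncates
	if pct > 999 {
		return 999
	}
	if pct < -999 {
		return -999
	}
	return int(pct)
}

func main() {
	fmt.Println(clampWindowDays(0), clampWindowDays(30), clampWindowDays(365))
	fmt.Println(deltaPercent(15, 10), deltaPercent(0, 10), deltaPercent(5, 0), deltaPercent(500, 1))
}
```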

func (*Store) GetInboundByEmailMessageID added in v0.3.0

func (s *Store) GetInboundByEmailMessageID(ctx context.Context, agentID, emailMessageID string) (*Message, error)

GetInboundByEmailMessageID looks up an inbound message by its RFC 5322 Message-ID for the given agent. Used by HITL flows to reach the parent inbound at approval time so the References chain can be rebuilt — the pending-outbound row only stores the parent's Message-ID, not its raw message. Scoped to agent_id to prevent any cross-agent reach across shared infra. Returns sql.ErrNoRows when the inbound has expired or was never persisted; callers must tolerate that and fall back to legacy single-id threading.

auth_headers is included in the SELECT so HITL review handlers can surface SPF/DKIM/DMARC provenance on the reply-context pane.

func (*Store) GetInboundMessage

func (s *Store) GetInboundMessage(ctx context.Context, id string) (*Message, error)

func (*Store) GetMessageWithContent

func (s *Store) GetMessageWithContent(ctx context.Context, messageID, agentID string) (*Message, error)

GetMessageWithContent returns a full message including raw_message and auth_headers. Marks the message as 'read' if it was 'unread'.

func (*Store) GetMessagesByAgent

func (s *Store) GetMessagesByAgent(ctx context.Context, f MessageListFilter) ([]Message, error)

GetMessagesByAgent returns messages for an agent, filtered by status, direction, and the optional search filters on the MessageListFilter struct.

  • direction: "inbound" (default for SDK polling), "outbound", or "all" (used by the dashboard inbox).
  • status: "unread" | "read" | "all" — only applies when direction selects inbound rows; ignored on pure outbound queries.
  • descending: cursor walks newest→oldest when true; oldest→newest when false (FIFO polling).
  • From / SubjectContains: case-insensitive substring (ILIKE).
  • ConversationID: exact match.
  • Since / Until: time-range bracket on created_at.

The SELECT includes columns both consumers need: the inbox needs `status` (outbound HITL lifecycle), `webhook_status`/`last_error` (outbound delivery), and `octet_length(raw_message)` (size column); the polling SDK ignores these fields and reads only the existing inbound-relevant ones from the Message struct.

func (*Store) GetOutboundMessageForUser

func (s *Store) GetOutboundMessageForUser(ctx context.Context, messageID, userID string) (*Message, error)

GetOutboundMessageForUser returns a full message row (including body, HITL fields, and attachments) if it exists and is owned by userID (via the agent the row belongs to). Inbound messages and cross-user access both return ErrMessageNotFound — the caller should not be able to distinguish "does not exist" from "belongs to someone else".

func (*Store) GetUserByAPIKey

func (s *Store) GetUserByAPIKey(ctx context.Context, apiKey string) (*User, error)

GetUserByAPIKey authenticates a bearer token and returns the owning user. Rejects revoked keys and time-expired keys; touches last_used_at only on the success path so the column stays a real "last successful authentication" signal (rather than "last attempt").

Expiration semantics: expires_at IS NULL means the key never expires (preserves the pre-migration default). A non-null expires_at must be in the future, evaluated against now() in the same query so there's no clock skew between row read and check.
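
The acceptance rule for a key can be written as a small predicate. This is a sketch only: keyUsable and exampleNow are hypothetical, and the real check is evaluated inside the SQL query against the database's now(), not in Go.

```go
package main

import (
	"fmt"
	"time"
)

// keyUsable mirrors the documented rule: revoked keys are rejected, a nil
// expiresAt never expires, and a non-nil expiresAt must be strictly in the future.
func keyUsable(revoked bool, expiresAt *time.Time, now time.Time) bool {
	if revoked {
		return false
	}
	if expiresAt == nil {
		return true
	}
	return expiresAt.After(now)
}

// exampleNow is a fixed reference time so the example is deterministic.
var exampleNow = time.Date(2026, time.June, 4, 12, 0, 0, 0, time.UTC)

func main() {
	tomorrow := exampleNow.AddDate(0, 0, 1)
	yesterday := exampleNow.AddDate(0, 0, -1)

	fmt.Println(keyUsable(false, nil, exampleNow))        // never-expiring key
	fmt.Println(keyUsable(false, &tomorrow, exampleNow))  // still valid
	fmt.Println(keyUsable(false, &yesterday, exampleNow)) // expired
	fmt.Println(keyUsable(true, nil, exampleNow))         // revoked
}
```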

func (*Store) GetUserByID

func (s *Store) GetUserByID(ctx context.Context, id string) (*User, error)

func (*Store) GetUserSession

func (s *Store) GetUserSession(ctx context.Context, token string) (*User, error)

func (*Store) GetUserSigningSecrets added in v0.3.0

func (s *Store) GetUserSigningSecrets(ctx context.Context, userID string) ([]SigningSecretWithValue, error)

GetUserSigningSecrets returns the plaintext secret values for a user (paired with their IDs), most-recent-first. The relay signs with [0] and asynchronously updates last_signed_at on that ID. The HITL token verifier tries each Secret in turn. Caller must NOT log the Secret values.

func (*Store) GetWebhookByID added in v0.3.0

func (s *Store) GetWebhookByID(ctx context.Context, webhookID, userID string) (*Webhook, error)

GetWebhookByID returns the webhook iff it's owned by userID. Cross-user reads (or missing rows) return ErrWebhookNotFound, the same not-found-on-cross-user convention used elsewhere in the codebase (conversation reads, message reads).

The returned Webhook has SigningSecret populated for the delivery worker's benefit; the public API layer scrubs this field before responding to GETs.

func (*Store) GetWebhookByIDInternal added in v0.3.0

func (s *Store) GetWebhookByIDInternal(ctx context.Context, webhookID string) (*Webhook, error)

GetWebhookByIDInternal returns the webhook by ID with no ownership check. INTERNAL USE ONLY — handler code MUST use GetWebhookByID which scopes by user_id. The retry worker uses this to look up the URL + signing secret for a delivery row whose ownership was already established when the publisher inserted it.

The suffix Internal mirrors the convention in dkim.GetDKIMKeyInternal: a method name that calls out "skipping the standard authorization check" so a reviewer doesn't have to read the body to know why.

func (*Store) HasAgentsOnDomain

func (s *Store) HasAgentsOnDomain(ctx context.Context, domain, userID string) (bool, error)

HasAgentsOnDomain checks whether the owned domain still has agents.

func (*Store) ListAPIKeys

func (s *Store) ListAPIKeys(ctx context.Context, userID string) ([]APIKey, error)

func (*Store) ListActivityByAgent

func (s *Store) ListActivityByAgent(ctx context.Context, agentID string, limit int) ([]Message, error)

func (*Store) ListAgentsByUser

func (s *Store) ListAgentsByUser(ctx context.Context, userID string) ([]AgentIdentity, error)

ListAgentsByUser returns all agents owned by the user, joined with domain verification AND enriched with per-agent stats for the dashboard. Five correlated subqueries compute inbound/outbound 7-day counts, pending approvals, last delivery, and webhook health in a single round-trip. Other load paths (GetAgentByID, GetAgentByEmail) intentionally don't compute these — only the dashboard needs them.

func (*Store) ListConversationsByAgent added in v0.3.0

func (s *Store) ListConversationsByAgent(ctx context.Context, f ConversationListFilter) ([]ConversationSummary, error)

ListConversationsByAgent groups the agent's non-expired messages by conversation_id and returns one row per conversation sorted by most-recent activity. Messages without a conversation_id are not included in any conversation — they remain individually visible via GetMessagesByAgent.

func (*Store) ListDomainsByUser

func (s *Store) ListDomainsByUser(ctx context.Context, userID string) ([]Domain, error)

ListDomainsByUser returns all domains owned by the user (excludes system rows). AgentCount is computed inline via a correlated subquery — one round-trip regardless of how many domains the user has, and the per-row count is cheap because (agent_identities.user_id, agent_identities.domain) is indexed via the existing idx_agent_identities_user.

func (*Store) ListEnabledWebhooksForRouting added in v0.3.0

func (s *Store) ListEnabledWebhooksForRouting(ctx context.Context, userID, eventType string) ([]Webhook, error)

ListEnabledWebhooksForRouting is the hot-path query used by the event publisher. Returns enabled webhooks for the user that subscribe to the given event type. The in-process publisher then applies filter matching in Go (cheaper than encoding the AND-across-types + OR-within-type rule in SQL at slice-1 scale).

The partial index idx_webhooks_user_enabled WHERE enabled = true keeps this O(log n) on the common case (a user has a small number of enabled webhooks).
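
The in-Go filter step mentioned for ListEnabledWebhooksForRouting (AND across filter types, OR within a type) could look like the sketch below. The shape is assumed: WebhookFilters is modeled as a map from filter type to allowed values, events carry one value per attribute, and an empty list is treated as "no constraint". None of this is confirmed by the documentation.

```go
package main

import "fmt"

// matchesFilters reports whether an event passes a webhook's filters.
// filters maps a filter type to its allowed values (hypothetical shape);
// attrs holds the event's value for each type.
func matchesFilters(filters map[string][]string, attrs map[string]string) bool {
	for typ, allowed := range filters {
		if len(allowed) == 0 {
			continue // assumed: an empty list places no constraint
		}
		got, ok := attrs[typ]
		if !ok {
			return false // the event lacks an attribute the filter requires
		}
		hit := false
		for _, v := range allowed { // OR within a type
			if v == got {
				hit = true
				break
			}
		}
		if !hit {
			return false // AND across types: every type must match
		}
	}
	return true
}

func main() {
	filters := map[string][]string{
		"agent_id":  {"a1@example.com", "a2@example.com"},
		"direction": {"inbound"},
	}
	fmt.Println(matchesFilters(filters, map[string]string{"agent_id": "a2@example.com", "direction": "inbound"}))
	fmt.Println(matchesFilters(filters, map[string]string{"agent_id": "a2@example.com", "direction": "outbound"}))
}
```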

func (*Store) ListExpiredPending

func (s *Store) ListExpiredPending(ctx context.Context, limit int) ([]ExpirationCandidate, error)

ListExpiredPending returns pending_approval messages whose approval_expires_at is in the past, joined with their agent's hitl_expiration_action. Ordered by approval_expires_at ASC so earliest-expired are handled first.

func (*Store) ListPendingOutboundForUser

func (s *Store) ListPendingOutboundForUser(ctx context.Context, userID string, limit int) ([]Message, error)

ListPendingOutboundForUser returns pending-approval messages across all of the user's agents, sorted by approval_expires_at ASC (expiring-soonest first). Body columns are not returned from this path — callers should use GetOutboundMessageForUser for detail.

func (*Store) ListSigningSecrets added in v0.3.0

func (s *Store) ListSigningSecrets(ctx context.Context, userID string) ([]SigningSecret, error)

ListSigningSecrets returns the user's secrets in most-recent-first order. Populates both Secret (plaintext) and SecretPrefix; callers that build a list shape for the dashboard get to choose which to surface.

func (*Store) ListWebhooksByUser added in v0.3.0

func (s *Store) ListWebhooksByUser(ctx context.Context, userID string) ([]Webhook, error)

ListWebhooksByUser returns every webhook owned by the user — used by the slice-2 GET /api/v1/webhooks endpoint. Storage layer surfaces enabled and disabled rows alike; filter at the handler if needed.

func (*Store) LookupConversationID

func (s *Store) LookupConversationID(ctx context.Context, agentID string, messageIDs []string) (string, error)

LookupConversationID finds a conversation_id by matching In-Reply-To / References message IDs against stored messages. Checks both email_message_id (inbound) and provider_message_id (outbound). Uses prefix matching because SES bare IDs stored in provider_message_id (e.g. <010f...>) may lack the @region.amazonses.com suffix that appears in the actual email headers sent to recipients.
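
The prefix-matching idea can be illustrated in Go. This is a guess at the intent rather than the stored query: bareID and refersTo are hypothetical, and requiring an "@" boundary after the stored ID is an assumption made here to avoid matching unrelated longer IDs.

```go
package main

import (
	"fmt"
	"strings"
)

// bareID strips surrounding whitespace and angle brackets from a Message-ID.
func bareID(id string) string {
	id = strings.TrimSpace(id)
	id = strings.TrimPrefix(id, "<")
	id = strings.TrimSuffix(id, ">")
	return id
}

// refersTo reports whether a header Message-ID refers to a stored ID, where
// the stored ID may lack the "@host" suffix present in the header.
func refersTo(headerID, storedID string) bool {
	h, s := bareID(headerID), bareID(storedID)
	if s == "" {
		return false
	}
	return h == s || strings.HasPrefix(h, s+"@")
}

func main() {
	// The IDs below are made up for illustration.
	fmt.Println(refersTo("<0100abc@us-east-1.amazonses.com>", "<0100abc>"))
	fmt.Println(refersTo("<0100abcdef@us-east-1.amazonses.com>", "<0100abc>"))
}
```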

func (*Store) LookupDomain

func (s *Store) LookupDomain(ctx context.Context, domain, userID string) (*Domain, error)

LookupDomain returns a domain if it exists and is owned by the given user.

func (*Store) MarkSendFailed added in v0.3.0

func (s *Store) MarkSendFailed(ctx context.Context, messageID, errMsg string) error

MarkSendFailed records that the upstream send failed for messageID, so the next ClaimSendAttempt is allowed to take over and retry. Idempotent against double-call: only updates rows still 'attempting'.

func (*Store) MarkSendSucceeded added in v0.3.0

func (s *Store) MarkSendSucceeded(ctx context.Context, messageID string, r SendResult) error

MarkSendSucceeded records the result of a successful upstream send. Idempotent against double-call: only updates rows still 'attempting' so a stray re-Mark from a buggy caller cannot overwrite a previous success or revive a failed row.

func (*Store) MaxWebhooksForUser added in v0.3.0

func (s *Store) MaxWebhooksForUser(ctx context.Context, userID string) (int, error)

MaxWebhooksForUser returns the per-user cap from account_limits. Users without an account_limits row default to DefaultMaxWebhooks (50), which mirrors the column DEFAULT so the cap also works on dev installs that haven't seeded an account_limits row.

func (*Store) ModifyMessageLabels added in v0.3.0

func (s *Store) ModifyMessageLabels(ctx context.Context, messageID, agentID string, add, remove []string) ([]string, error)

ModifyMessageLabels applies a delta — add then remove — to a message's labels[] in a single atomic statement. Returns the updated labels (deduplicated, sorted) so the caller can echo them back in the response without a second round-trip.

Inputs are assumed already normalized (lowercased, charset-validated, dedup'd within each list, e2a:* gated). The store layer:

  • applies adds first, then removes (so a label in both lists ends up removed)
  • rejects if the post-add total would exceed MaxLabelsPerMessage
  • returns ErrMessageNotFound if the row is missing / expired / cross-agent

The whole thing runs as one UPDATE so a concurrent PATCH from a second client can't observe a partial state.
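
The add-then-remove semantics can be modeled in memory. applyLabelDelta and errTooManyLabels are hypothetical, the cap is passed as a parameter because the value of MaxLabelsPerMessage is not given here, and the real method does all of this in a single UPDATE.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

var errTooManyLabels = errors.New("too many labels")

// applyLabelDelta applies adds, checks the cap on the post-add set, then
// applies removes, and returns the result deduplicated and sorted.
func applyLabelDelta(current, add, remove []string, maxLabels int) ([]string, error) {
	set := map[string]struct{}{}
	for _, l := range current {
		set[l] = struct{}{}
	}
	for _, l := range add {
		set[l] = struct{}{}
	}
	if len(set) > maxLabels {
		return nil, errTooManyLabels // cap is checked before removes apply
	}
	for _, l := range remove {
		delete(set, l) // a label in both lists ends up removed
	}
	out := make([]string, 0, len(set))
	for l := range set {
		out = append(out, l)
	}
	sort.Strings(out)
	return out, nil
}

func main() {
	labels, err := applyLabelDelta(
		[]string{"inbox", "urgent"},
		[]string{"billing", "urgent", "todo"},
		[]string{"todo", "inbox"},
		10,
	)
	fmt.Println(labels, err)
}
```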

func (*Store) RejectPending

func (s *Store) RejectPending(ctx context.Context, messageID, userID, reason string) (*Message, error)

RejectPending transitions a pending_approval message to rejected, records the reviewer's reason (empty string allowed), and scrubs body_text / body_html / attachments_json. Ownership checked; missing rows return ErrMessageNotFound. Non-pending rows return ErrNotPendingApproval.

func (*Store) ResolveOutboundOwner

func (s *Store) ResolveOutboundOwner(ctx context.Context, messageID string) (userID, agentID string, err error)

ResolveOutboundOwner looks up the user_id and agent_id for an outbound message without requiring the caller to know the user_id up-front. It exists for token-authenticated paths (magic-link approve/reject) where the HMAC token itself is the authorization and the handler just needs enough context to dispatch into the existing user-scoped store methods.

Returns ErrMessageNotFound if the message doesn't exist or isn't outbound. The returned user_id is guaranteed to own the message's agent (via the agent_identities.user_id join).

func (*Store) RotateSecret added in v0.3.0

func (s *Store) RotateSecret(ctx context.Context, webhookID, userID string) (newPlaintext string, prevExpiresAt time.Time, err error)

RotateSecret generates a new signing secret, moves the current secret into signing_secret_prev with a 24h expiry, and returns the new plaintext (shown once). During the 24h grace window the delivery worker dual-signs each request so receivers can verify with either secret while they update their handler.
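
From the receiver's side, dual-signing means a request stays valid as long as one of the presented signatures matches a secret the receiver still holds. The sketch below assumes hex-encoded HMAC-SHA256 over the raw body and that the signatures have already been parsed out of the header; the actual algorithm, encoding, and header layout are not specified in this section, and sign and verifyAny are hypothetical helpers.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
)

// sign returns a hex-encoded HMAC-SHA256 of body. The algorithm and the
// encoding are assumptions made for this sketch.
func sign(secret string, body []byte) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return hex.EncodeToString(mac.Sum(nil))
}

// verifyAny reports whether any presented signature matches any secret the
// receiver currently trusts. hmac.Equal compares in constant time.
func verifyAny(body []byte, presented []string, secrets ...string) bool {
	for _, s := range secrets {
		want := []byte(sign(s, body))
		for _, p := range presented {
			if hmac.Equal(want, []byte(p)) {
				return true
			}
		}
	}
	return false
}
```

A receiver that has not yet deployed the new secret keeps passing verifyAny with only the old one, and can switch to the new one at any point inside the grace window.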

func (*Store) SetDomainPrimary added in v0.3.0

func (s *Store) SetDomainPrimary(ctx context.Context, domain, userID string) error

SetDomainPrimary marks a domain as the user's primary in a single transaction: first clear any other primary belonging to the user, then set the requested domain. The partial unique index in migration 013 makes the clear-first step necessary — otherwise the two writes would race and one would fail with a unique violation.

Returns ErrDomainNotFound when the domain doesn't exist or isn't owned by the user.

func (*Store) TouchDomainLastChecked added in v0.3.0

func (s *Store) TouchDomainLastChecked(ctx context.Context, domain, userID string) error

TouchDomainLastChecked records that the verification probe ran. Call this from POST /api/v1/domains/{domain}/verify whether the probe succeeded or not — the LastCheckedAt column is "when did we last try", not "when did we last succeed" (the latter is verified_at).

func (*Store) TouchSigningSecretLastSigned added in v0.3.0

func (s *Store) TouchSigningSecretLastSigned(ctx context.Context, secretID string) error

TouchSigningSecretLastSigned records that the relay used this secret to sign a payload. Best-effort — failure is logged but does not block the actual signing operation.

func (*Store) UpdateAgentHITL

func (s *Store) UpdateAgentHITL(ctx context.Context, agentID, userID string, enabled bool, ttlSeconds int, expirationAction string) error

UpdateAgentHITL updates all three HITL settings on an agent owned by userID. The TTL and expiration action are validated against the same rules as the DB CHECK constraints so callers get a clean error rather than a raw SQL error.
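
A pre-flight check of this kind could look like the sketch below. The 7-day cap and the two action values come from the HITL constants; the lower bound of 1 second, the error wording, and the validateHITL name are assumptions, since the exact CHECK constraints are not reproduced here.

```go
package main

import "fmt"

// Local copies of the package's HITL constants.
const (
	hitlMaxTTLSeconds = 604800 // 7 days, mirrors HITLMaxTTLSeconds
	expirationApprove = "approve"
	expirationReject  = "reject"
)

// validateHITL rejects values the database would refuse, so the caller
// sees a readable error instead of a raw constraint violation.
func validateHITL(ttlSeconds int, expirationAction string) error {
	// Assumption: only the upper bound is documented; a minimum of 1 is
	// a guess made for this sketch.
	if ttlSeconds < 1 || ttlSeconds > hitlMaxTTLSeconds {
		return fmt.Errorf("ttl_seconds must be between 1 and %d, got %d", hitlMaxTTLSeconds, ttlSeconds)
	}
	switch expirationAction {
	case expirationApprove, expirationReject:
		return nil
	default:
		return fmt.Errorf("expiration_action must be %q or %q, got %q",
			expirationApprove, expirationReject, expirationAction)
	}
}
```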

func (*Store) UpdateAgentMode

func (s *Store) UpdateAgentMode(ctx context.Context, agentID, userID, agentMode, webhookURL string) error

func (*Store) UpdateAgentWebhook

func (s *Store) UpdateAgentWebhook(ctx context.Context, agentID, userID, webhookURL string) error

func (*Store) UpdateMessageDeliveryStatus

func (s *Store) UpdateMessageDeliveryStatus(ctx context.Context, messageID, agentID, status string) error

UpdateMessageDeliveryStatus sets the inbox_status on a message.

func (*Store) UpdateUserName added in v0.3.0

func (s *Store) UpdateUserName(ctx context.Context, userID, name string) (*User, error)

UpdateUserName persists a new display name on the user row and returns the updated User. Input validation (length, whitespace) is the caller's responsibility — this layer only enforces that the row exists.

func (*Store) UpdateWebhook added in v0.3.0

func (s *Store) UpdateWebhook(ctx context.Context, webhookID, userID string, u WebhookUpdate) (*Webhook, error)

UpdateWebhook applies a partial update to a webhook. Only fields with a non-nil pointer in WebhookUpdate are touched. Returns the updated row.

Validation (charset, count caps, agent ownership) is the handler's job; the storage layer enforces only the re-enable cooldown and the per-row CHECK constraints (events non-empty, url non-empty).

func (*Store) VerifyDomain

func (s *Store) VerifyDomain(ctx context.Context, domain, userID string) error

VerifyDomain marks a domain as verified, only if owned by the given user.

func (*Store) WithTx added in v0.3.0

func (s *Store) WithTx(ctx context.Context, fn func(tx pgx.Tx) error) error

WithTx opens a transaction, runs fn inside it, and commits if fn returns nil (or rolls back if fn returns an error). Used by the slice-3 relay refactor so the messages INSERT and the webhook_events outbox INSERT commit together, closing the at-least-once publish-loss window.

The relay handler is the primary v1 caller; future trigger sites (slice 4 outbound + HITL) reuse the same helper. It also keeps callers from having to import pgxpool directly.
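
The commit-or-rollback contract can be sketched without a database. In the example below, tx is a hypothetical two-method stand-in for pgx.Tx (whose real Commit and Rollback also take a context), fakeTx exists only to make the outcome observable, and withTx is an illustration of the pattern rather than the package's implementation.

```go
package main

import "errors"

// tx is a hypothetical stand-in for pgx.Tx, reduced to the two calls this
// pattern needs. The real methods also accept a context.Context.
type tx interface {
	Commit() error
	Rollback() error
}

// fakeTx records how the transaction ended. Demonstration only.
type fakeTx struct{ committed, rolledBack bool }

func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

// errOutbox simulates a failure of the second write inside fn.
var errOutbox = errors.New("outbox insert failed")

// withTx commits when fn returns nil and rolls back otherwise, so every
// write made inside fn lands together or not at all.
func withTx(begin func() (tx, error), fn func(tx) error) error {
	t, err := begin()
	if err != nil {
		return err
	}
	if err := fn(t); err != nil {
		_ = t.Rollback() // best effort; the caller needs fn's error
		return err
	}
	return t.Commit()
}
```

If fn performs the message insert and then fails on the outbox insert (errOutbox here), the rollback discards the message row too, which is the property that keeps a stored message from existing without its event.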

type UsageEventEntry

type UsageEventEntry struct {
	ID        string    `json:"id"`
	AgentID   string    `json:"agent_id"`
	Domain    string    `json:"domain"`
	Direction string    `json:"direction"`
	EventType string    `json:"event_type"`
	CreatedAt time.Time `json:"created_at"`

} // @name UsageEventEntry

UsageEventEntry is one row of the usage_events table for the user.

type User

type User struct {
	ID            string    `json:"id"`
	Email         string    `json:"email"`
	Name          string    `json:"name"`
	GoogleSubject string    `json:"-"`
	CreatedAt     time.Time `json:"created_at"`
}

type UserExport

type UserExport struct {
	GeneratedAt      time.Time              `json:"generated_at"`
	SchemaVersion    string                 `json:"schema_version"`
	User             UserExportUser         `json:"user"`
	Domains          []Domain               `json:"domains"`
	Agents           []AgentIdentity        `json:"agents"`
	APIKeys          []APIKeyExportEntry    `json:"api_keys"`
	Messages         []Message              `json:"messages"`
	UsageEvents      []UsageEventEntry      `json:"usage_events,omitempty"`
	OAuthConnections []OAuthConnectionEntry `json:"oauth_connections,omitempty"`

} // @name UserExport

UserExport is the structured dump returned by ExportUserData. It's designed to be a complete, machine-readable record of everything a single user owns in the system — the right-of-access counterpart to DeleteUserData below. Sensitive secrets are deliberately excluded:

  • API key plaintexts are not stored anywhere (only hashes), so they can't be exported even in principle.
  • google_subject is an internal OAuth identifier with no value to the user and is omitted on purpose.
  • User session tokens are transient and excluded.

type UserExportUser

type UserExportUser struct {
	ID        string    `json:"id"`
	Email     string    `json:"email"`
	Name      string    `json:"name"`
	CreatedAt time.Time `json:"created_at"`

} // @name UserExportUser

UserExportUser mirrors User but omits the google_subject internal identifier from the export payload.

type Webhook added in v0.3.0

type Webhook struct {
	ID                         string         `json:"id"`
	UserID                     string         `json:"user_id"`
	URL                        string         `json:"url"`
	Description                string         `json:"description"`
	Events                     []string       `json:"events"`
	Filters                    WebhookFilters `json:"filters"`
	SigningSecret              string         `json:"signing_secret,omitempty"`
	SigningSecretPrev          string         `json:"-"`
	SigningSecretPrevExpiresAt *time.Time     `json:"-"`
	Enabled                    bool           `json:"enabled"`
	AutoDisabledAt             *time.Time     `json:"auto_disabled_at,omitempty"`
	CreatedAt                  time.Time      `json:"created_at"`
	LastDeliveredAt            *time.Time     `json:"last_delivered_at,omitempty"`
}

Webhook is one row in the webhooks table.

SigningSecret carries the plaintext secret. It's populated on CreateWebhook responses (the caller's one chance to see the secret) and read by the delivery worker when signing the X-E2A-Signature header. Public API GET endpoints in slice 2 will scrub this field before responding so a stolen API key cannot exfiltrate webhook secrets via list/get.

SigningSecretPrev + SigningSecretPrevExpiresAt hold the previous secret during the 24h rotation grace window; slice 4 dual-signs using both during that window so receivers can roll forward.

type WebhookFilters added in v0.3.0

type WebhookFilters struct {
	AgentIDs        []string `json:"agent_ids,omitempty"`
	ConversationIDs []string `json:"conversation_ids,omitempty"`
	Labels          []string `json:"labels,omitempty"`
}

WebhookFilters is the structured form of webhooks.filters JSONB. Empty / nil slices mean "no constraint of that type" — a webhook with all-empty filters is a cross-cutting subscriber that matches every event of the right type for the owning user.
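
One plausible reading of these semantics is sketched below. The documentation only states that an empty slice imposes no constraint; treating values within one filter as "any of" and combining the three filter types with AND is an assumption, as are the event type and the matches helper.

```go
package main

// webhookFilters mirrors the fields of WebhookFilters.
type webhookFilters struct {
	AgentIDs        []string
	ConversationIDs []string
	Labels          []string
}

// event is a hypothetical, minimal view of what a filter is matched against.
type event struct {
	AgentID        string
	ConversationID string
	Labels         []string
}

func contains(list []string, v string) bool {
	for _, item := range list {
		if item == v {
			return true
		}
	}
	return false
}

// matches treats an empty filter slice as "no constraint". Assumption:
// values inside one slice are alternatives, and all non-empty slices
// must be satisfied.
func matches(f webhookFilters, e event) bool {
	if len(f.AgentIDs) > 0 && !contains(f.AgentIDs, e.AgentID) {
		return false
	}
	if len(f.ConversationIDs) > 0 && !contains(f.ConversationIDs, e.ConversationID) {
		return false
	}
	if len(f.Labels) > 0 {
		hit := false
		for _, l := range e.Labels {
			if contains(f.Labels, l) {
				hit = true
				break
			}
		}
		if !hit {
			return false
		}
	}
	return true
}
```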

type WebhookUpdate added in v0.3.0

type WebhookUpdate struct {
	URL         *string
	Description *string
	Events      *[]string
	Filters     *WebhookFilters
	Enabled     *bool
}

WebhookUpdate carries the fields a PATCH can change. All fields are pointers (or "set-or-leave" flags) so handlers can distinguish "field present, set to X" from "field not present, leave unchanged".

Per the design, url / events / filters are full-replace fields (the sent value is canonical when present). Enabled is a toggle. Re-enable has a 5-minute cooldown — UpdateWebhook returns ErrWebhookCooldown when the caller tries to flip Enabled true within 5 minutes of auto_disabled_at.
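
The pointer-field convention and the cooldown can be sketched together. The types below are trimmed-down, hypothetical copies of Webhook and WebhookUpdate, errWebhookCooldown stands in for ErrWebhookCooldown, and applyUpdate is an in-memory illustration rather than the store's SQL. Whether a successful re-enable also clears auto_disabled_at is not stated, so the sketch leaves that field untouched.

```go
package main

import (
	"errors"
	"time"
)

// errWebhookCooldown stands in for the package's ErrWebhookCooldown.
var errWebhookCooldown = errors.New("webhook is in re-enable cooldown")

// reenableCooldown is the documented 5-minute window.
const reenableCooldown = 5 * time.Minute

// webhook and webhookUpdate are reduced copies of Webhook and WebhookUpdate.
type webhook struct {
	URL            string
	Enabled        bool
	AutoDisabledAt *time.Time
	CreatedAt      time.Time
}

type webhookUpdate struct {
	URL     *string
	Enabled *bool
}

// applyUpdate touches only fields whose pointer is non-nil, and refuses to
// flip Enabled to true within the cooldown after an auto-disable.
func applyUpdate(w webhook, u webhookUpdate, now time.Time) (webhook, error) {
	reenabling := u.Enabled != nil && *u.Enabled && !w.Enabled
	if reenabling && w.AutoDisabledAt != nil && now.Sub(*w.AutoDisabledAt) < reenableCooldown {
		return w, errWebhookCooldown
	}
	if u.URL != nil {
		w.URL = *u.URL // full replace when present
	}
	if u.Enabled != nil {
		w.Enabled = *u.Enabled
	}
	return w, nil
}
```

A nil Enabled leaves the flag as it was, which is how a PATCH that only changes the URL avoids re-enabling a disabled webhook by accident.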
