idempotency

package
v0.4.0
Published: Jun 4, 2026 | License: Apache-2.0 | Imports: 7 | Imported by: 0

Documentation

Overview

Package idempotency implements the storage backing the Idempotency-Key header on the outbound send endpoints.

Stripe-style semantics:

  • Caller sends `Idempotency-Key: <string>` on a POST that has a side effect (creating an outbound email).
  • Server scopes the key by (user_id, key) and remembers it for TTL.
  • Same key + same request body hash → replay the cached response; do NOT redo the side effect.
  • Same key + different request body hash → 422 (mismatch).
  • Same key, prior request still in-flight → 409.
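The rules above can be read as a small decision function. The following is a minimal sketch, not the package's implementation: the `row` type, the `decide` function, and the short outcome names are hypothetical stand-ins, and stale-claim takeover is left out for brevity.

```go
package main

// Outcome is a local stand-in for the package's ClaimOutcome.
type Outcome int

const (
	Acquired Outcome = iota // key not seen within TTL: caller performs the side effect
	Replay                  // completed row, same hash: serve the cached response
	Mismatch                // completed row, different hash: respond 422
	InFlight                // prior request still running: respond 409
)

// row is a hypothetical view of what the server remembers for one (user_id, key).
type row struct {
	bodyHash  string
	completed bool
}

// decide applies the bullet rules. prior is nil when the key has not been
// seen within the TTL. An unfinished prior request wins over the hash check,
// matching the documented meaning of the in-flight case.
func decide(prior *row, bodyHash string) Outcome {
	switch {
	case prior == nil:
		return Acquired
	case !prior.completed:
		return InFlight
	case prior.bodyHash == bodyHash:
		return Replay
	default:
		return Mismatch
	}
}
```

For example, `decide(&row{bodyHash: "h1", completed: true}, "h2")` yields `Mismatch`, the case the server answers with 422.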

Why (user_id, key) and not (api_key_id, key): the request path's authenticator (internal/agent.API.authenticateUser) returns only the owning user, and threading the credential ID through every handler would be invasive. With UUIDv4 keys, a collision between two of one user's keys is negligibly unlikely, and the body-hash check catches even that pathological case explicitly with a 422. Stripe scopes at the account level for the same reason.

Constants

const MaxKeyLength = 255

MaxKeyLength caps the length of Idempotency-Key header values. Longer values are rejected at the request boundary with a 400. Matches the upper bound that Stripe documents.
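A boundary check along these lines would enforce the cap. This is a sketch under assumptions: `validateKey` and `errKeyTooLong` are hypothetical names, and the length is measured in bytes, which the documentation does not specify.

```go
package main

import "errors"

// MaxKeyLength mirrors the documented constant.
const MaxKeyLength = 255

// errKeyTooLong is a hypothetical sentinel; the HTTP layer would map it to a 400.
var errKeyTooLong = errors.New("idempotency key exceeds maximum length")

// validateKey rejects keys longer than MaxKeyLength.
// Assumption: length is counted in bytes (len), not in runes.
func validateKey(key string) error {
	if len(key) > MaxKeyLength {
		return errKeyTooLong
	}
	return nil
}
```

A handler would call this on the raw header value before touching the store and answer 400 on error.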

const StaleClaimWindow = 5 * time.Minute

StaleClaimWindow is the age past which an in_progress row is treated as the leftover of a crashed handler and may be taken over by the next caller. Bounded above by SMTPRelay's worst-case retry envelope (~6.5min) but kept tighter so legitimate stalls produce a loud 409 to the second caller rather than silently double-sending.

const SweepInterval = 1 * time.Hour

SweepInterval is the cadence the cmd/e2a hourly cleanup loop uses when invoking Sweep. Exposed as a constant so the loop and the docstring stay consistent.

const TTL = 24 * time.Hour

TTL is how long a completed row stays cached (and continues rejecting body-mismatched replays) before the sweep removes it.

Variables

This section is empty.

Functions

func HashBody

func HashBody(body []byte) string

HashBody returns the hex-encoded SHA-256 digest of the raw request body. Exposed so callers (the HTTP layer) can compute it once from the bytes they already read, rather than re-reading or re-encoding.
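A function with this contract can be written in a few lines with the standard library. The sketch below uses a lowercase name to make clear it is an illustration rather than the package's source; lowercase hex output is an assumption.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
)

// hashBody returns the SHA-256 digest of body as lowercase hex.
// Illustrative stand-in for HashBody; the real encoding details may differ.
func hashBody(body []byte) string {
	sum := sha256.Sum256(body)
	return hex.EncodeToString(sum[:])
}
```

The result is always 64 hex characters, regardless of body size.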

func HashRequest

func HashRequest(path string, body []byte) string

HashRequest mixes the request path into the body hash so the same Idempotency-Key cannot accidentally replay a cached response across different routes (e.g. reusing a key set on a reply to message A while replying to message B). The path is included on its own line before the body so the hash is unambiguous about where path ends and body begins.

Callers compute this once with the raw body bytes they already have and pass the result to Store.Claim.
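One way to read "path on its own line before the body" is to hash the path, a newline, and then the body bytes. The sketch below makes that assumption explicit; the exact byte layout used by HashRequest is not documented here, and `hashRequest` is a hypothetical stand-in.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
)

// hashRequest hashes path + "\n" + body and returns lowercase hex.
// Assumed layout: route paths contain no newline, so the first newline
// marks where the path ends and the body begins.
func hashRequest(path string, body []byte) string {
	h := sha256.New()
	h.Write([]byte(path)) // hash.Hash.Write never returns an error
	h.Write([]byte{'\n'})
	h.Write(body)
	return hex.EncodeToString(h.Sum(nil))
}
```

With this layout, the same body sent to two different routes produces two different hashes, so a key reused across routes fails the hash comparison instead of replaying the wrong response.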

Types

type CachedResponse

type CachedResponse struct {
	StatusCode  int
	ContentType string
	Body        []byte
}

CachedResponse is what the server replays for an OutcomeReplay. Content-Type is captured separately so the replay is wire-faithful.

type ClaimOutcome

type ClaimOutcome int

ClaimOutcome describes what happened when a caller tried to claim (user_id, key) for a fresh request.

const (
	// OutcomeAcquired — caller owns the row and MUST follow up with
	// Complete (success path) or Release (caller-side abort that
	// did not produce a side-effect, e.g. early validation failure).
	OutcomeAcquired ClaimOutcome = iota
	// OutcomeReplay — a previous request with this key completed
	// successfully and the body hash matches. Serve the cached
	// response verbatim.
	OutcomeReplay
	// OutcomeMismatch — a previous completed request used this key
	// with a different body. Refuse with 422.
	OutcomeMismatch
	// OutcomeInFlight — a concurrent request with this key is still
	// being processed (and the row is not stale). Refuse with 409.
	OutcomeInFlight
)

type ClaimResult

type ClaimResult struct {
	Outcome ClaimOutcome
	Cached  CachedResponse
}

ClaimResult bundles the outcome with the cached response when the outcome is OutcomeReplay. Other outcomes leave Cached zero-valued.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is the postgres-backed idempotency store.

func NewStore

func NewStore(pool *pgxpool.Pool) *Store

NewStore wraps a pgx pool. The pool must already have the schema from migrations/015_idempotency_and_send_attempts.sql applied.

func (*Store) Claim

func (s *Store) Claim(ctx context.Context, userID, key, path, bodyHash string) (ClaimResult, error)

Claim atomically reserves (userID, key) for the caller. The outcome determines what the HTTP layer should do next:

  • OutcomeAcquired → run the side effect, then Complete on success or Release on caller-side abort.
  • OutcomeReplay → write Cached to the response, do NOT re-run the side effect.
  • OutcomeMismatch → respond 422.
  • OutcomeInFlight → respond 409.
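The mapping from outcome to HTTP-layer action might look like the sketch below. The types are redeclared locally so the snippet compiles on its own; `plan`, `errorResponse`, and the error body text are hypothetical and not part of the package.

```go
package main

import "net/http"

// Local mirrors of the documented types, so the sketch is self-contained.
type ClaimOutcome int

const (
	OutcomeAcquired ClaimOutcome = iota
	OutcomeReplay
	OutcomeMismatch
	OutcomeInFlight
)

type CachedResponse struct {
	StatusCode  int
	ContentType string
	Body        []byte
}

type ClaimResult struct {
	Outcome ClaimOutcome
	Cached  CachedResponse
}

// errorResponse builds a plain-text error reply (wording is illustrative).
func errorResponse(status int, msg string) CachedResponse {
	return CachedResponse{
		StatusCode:  status,
		ContentType: "text/plain; charset=utf-8",
		Body:        []byte(msg + "\n"),
	}
}

// plan reports whether the handler should run the side effect. When proceed
// is false, resp is what the handler should write back instead.
func plan(res ClaimResult) (proceed bool, resp CachedResponse) {
	switch res.Outcome {
	case OutcomeAcquired:
		return true, CachedResponse{}
	case OutcomeReplay:
		return false, res.Cached
	case OutcomeMismatch:
		return false, errorResponse(http.StatusUnprocessableEntity, "idempotency key reused with a different request")
	case OutcomeInFlight:
		return false, errorResponse(http.StatusConflict, "a request with this idempotency key is still in progress")
	default:
		return false, errorResponse(http.StatusInternalServerError, "unknown claim outcome")
	}
}
```

When `proceed` is true, the handler still owes the store a Complete or Release call once the side effect has succeeded or been abandoned.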

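path is the request route (e.g. "/api/v1/send") and is stored only for diagnostics. It is NOT part of the dedup key: rows are keyed by (userID, key) alone, so reusing the same idempotency key on a different route hits the existing row rather than creating a new one (and, when bodyHash comes from HashRequest, fails the hash comparison). The same key therefore cannot be reused across different routes. Stripe documents the same behavior; it surfaces caller bugs faster than a silent allow.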

Concurrency: the claim is decided in a single UPSERT with RETURNING. Two concurrent callers can never both observe OutcomeAcquired, because only one of the racing statements has its row materialized in RETURNING. The other either hits the conflict path (DO UPDATE WHERE not stale, so the row is skipped in RETURNING) or, if the row was stale, serializes behind the takeover on the unique-index lock and sees a non-stale in_progress row on its follow-up read.
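The guarantee (at most one winner per key, including on stale takeover) can be modelled in memory. Note that this sketch swaps the Postgres UPSERT for a mutex-guarded map, so it illustrates the observable behavior, not the SQL mechanism; `memStore`, `raceClaims`, and `epoch` are hypothetical names, and only in_progress rows are modelled.

```go
package main

import (
	"sync"
	"sync/atomic"
	"time"
)

// staleClaimWindow mirrors the documented StaleClaimWindow.
const staleClaimWindow = 5 * time.Minute

// epoch is an arbitrary fixed start time so the example is deterministic.
var epoch = time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC)

// memStore maps key -> time the current in_progress claim was taken.
type memStore struct {
	mu     sync.Mutex
	claims map[string]time.Time
}

func newMemStore() *memStore {
	return &memStore{claims: make(map[string]time.Time)}
}

// claim reports whether the caller acquired key at time now. The mutex
// stands in for the unique-index lock that serializes racing statements.
func (s *memStore) claim(key string, now time.Time) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	claimedAt, exists := s.claims[key]
	if exists && now.Sub(claimedAt) <= staleClaimWindow {
		return false // live claim: the caller would see OutcomeInFlight
	}
	s.claims[key] = now // fresh insert, or takeover of a stale claim
	return true
}

// raceClaims runs n goroutines that all claim key at epoch+elapsed and
// returns how many of them acquired it.
func raceClaims(s *memStore, key string, elapsed time.Duration, n int) int {
	now := epoch.Add(elapsed)
	var acquired int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.claim(key, now) {
				atomic.AddInt64(&acquired, 1)
			}
		}()
	}
	wg.Wait()
	return int(atomic.LoadInt64(&acquired))
}
```

In this model the takeover refreshes the claim time, which is why the losers of a stale-row race observe a non-stale claim and back off.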

func (*Store) Complete

func (s *Store) Complete(ctx context.Context, userID, key string, resp CachedResponse) error

Complete records the response for an OutcomeAcquired claim. After this point any subsequent Claim with the same key either replays (body matches) or 422s (body differs), until the row is swept.

Idempotent against double-call: only updates rows still marked in_progress, so a stray re-Complete from a buggy caller cannot overwrite an already-cached response.

func (*Store) Release

func (s *Store) Release(ctx context.Context, userID, key string) error

Release drops an OutcomeAcquired claim without recording a response. Use when the caller decided not to perform the side effect (e.g. the request failed validation before any external work happened), so the next caller with the same key can try again with a fresh payload rather than getting OutcomeMismatch on the second attempt.

Only deletes in_progress rows so it cannot accidentally wipe a completed cache entry.
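Complete and Release share the same guard: both act only on rows still marked in_progress. The sketch below models that guard with an in-memory map; `table`, `entry`, and the boolean return values are hypothetical (the real methods return only an error).

```go
package main

type status int

const (
	inProgress status = iota
	completed
)

// entry is a hypothetical in-memory row.
type entry struct {
	state status
	body  []byte // cached response body once completed
}

// table maps an idempotency key to its row.
type table map[string]*entry

// complete stores body only if the row is still in_progress and reports
// whether it wrote. A second call is a no-op, so the first response sticks.
func (t table) complete(key string, body []byte) bool {
	e, ok := t[key]
	if !ok || e.state != inProgress {
		return false
	}
	e.state, e.body = completed, body
	return true
}

// release deletes the row only if it is still in_progress, so it can never
// wipe a completed cache entry.
func (t table) release(key string) bool {
	e, ok := t[key]
	if !ok || e.state != inProgress {
		return false
	}
	delete(t, key)
	return true
}
```

After a successful release the key is free again, so the next request with that key starts from a clean slate.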

func (*Store) Sweep

func (s *Store) Sweep(ctx context.Context) (int64, error)

Sweep removes completed rows older than TTL. Returns the count deleted. Wire this into the cmd/e2a hourly cleanup loop.

Does NOT delete in_progress rows past StaleClaimWindow on its own — those are taken over by the next Claim via the UPSERT WHERE clause, which keeps the takeover path concentrated in one place (the concurrency model lives in Claim, not split across two functions).
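A cleanup loop that invokes a sweep on a fixed cadence could be sketched as follows. This is not the cmd/e2a loop: `runSweepLoop` is a hypothetical helper, and it takes the sweep as a plain function and a stop channel so the snippet stays free of the database dependency.

```go
package main

import (
	"log"
	"time"
)

// runSweepLoop calls sweep once per interval until stop is closed.
// Errors are logged and the loop keeps going, so one failed sweep does
// not end cleanup for the life of the process.
func runSweepLoop(stop <-chan struct{}, interval time.Duration, sweep func() (int64, error)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			n, err := sweep()
			if err != nil {
				log.Printf("idempotency sweep failed: %v", err)
				continue
			}
			log.Printf("idempotency sweep removed %d rows", n)
		}
	}
}
```

A caller holding a context could pass `ctx.Done()` as stop, `SweepInterval` as the interval, and wrap the store as `func() (int64, error) { return store.Sweep(ctx) }`.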
