Documentation ¶
Overview ¶
Package ingest implements the analytics write path: POST /api/v1/ingest/events → validated batch → in-memory channel → flusher → ClickHouse events_raw_v1.
Pipeline:
    handler ──► Submit (atomic pending++, chan <- env)
                   │
                   ▼
    ┌───── chan batchEnvelope ─────┐
    │                              │
    ▼                              │
    flusher goroutine              │
    (accumulate to FlushEvents or  │
     tick at FlushInterval)        │
          │
    ┌─────┴──────┐
    │            │
    ▼            ▼
    CH INSERT ok CH INSERT fails
    │            │
    │            InsertFailedEvent (DLQ row, batch as JSONB)
    │            │
    │       ┌────┴─────┐
    │       ▼          ▼
    │     PG ok     PG also fails
    │       │          │
    │    continue   SetFatal(true)
    ▼
    drain.atomic.pending -= len(batch)
DLQ drain goroutine (every FlushInterval):
    ListFailedEventsForDrain LIMIT N
        │
        ├── CH ok    ──► DeleteFailedEvent
        └── CH fails ──► IncrementFailedEventAttempt
                           attempt+1 ≥ 20 ──► QuarantineFailedEvent
                           age ≥ 24h      ──► QuarantineFailedEvent
    After pass: Flags.SetDLQDepth(CountActiveFailedEvents)
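The per-row outcome of a drain pass can be read as a small decision function. The sketch below is one way to express it, not the package's actual code: drainAction, decideDrain, and the constant names are hypothetical, and only the two thresholds (20 attempts, 24h) come from the diagram. It also assumes quarantine replaces the increment rather than following it, which the diagram does not settle.

```go
package main

import "time"

// drainAction is a hypothetical enum naming the three outcomes in the diagram.
type drainAction int

const (
	actionDelete     drainAction = iota // CH insert succeeded: DeleteFailedEvent
	actionIncrement                     // CH insert failed: IncrementFailedEventAttempt
	actionQuarantine                    // retry budget or age exhausted: QuarantineFailedEvent
)

const (
	maxAttempts = 20             // from the diagram: attempt+1 ≥ 20
	maxAge      = 24 * time.Hour // from the diagram: age ≥ 24h
)

// decideDrain picks the action for one DLQ row after a replay attempt.
// attempts is the count stored on the row before this attempt.
func decideDrain(chOK bool, attempts int, age time.Duration) drainAction {
	if chOK {
		return actionDelete
	}
	if attempts+1 >= maxAttempts || age >= maxAge {
		return actionQuarantine
	}
	return actionIncrement
}
```

Keeping the decision pure makes the thresholds easy to unit test without a ClickHouse or Postgres connection.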
Token paths stay strictly separate. /api/v1/ingest/events uses LookupIngestToken against api_tokens (mere_pub_…); /api/v1/whoami, /api/v1/projects/*, and /mcp use oauth.Service.LookupActiveAccessToken against oauth_access_tokens. The two surfaces share no code, and the OAuth path rejects mere_pub_ at the prefix step.
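The prefix step that keeps the two surfaces apart might look like the following. This is an illustrative sketch only: ingestPathAccepts and oauthPathAccepts are hypothetical names, and the real code paths share nothing, so each would carry its own check.

```go
package main

import "strings"

// ingestPrefix is the documented prefix of ingest tokens; the rest of the
// token is opaque to this sketch.
const ingestPrefix = "mere_pub_"

// ingestPathAccepts reports whether a bearer is even eligible for the
// api_tokens lookup (hypothetical helper).
func ingestPathAccepts(bearer string) bool {
	return strings.HasPrefix(bearer, ingestPrefix)
}

// oauthPathAccepts reports whether a bearer may proceed to the
// oauth_access_tokens lookup; ingest tokens are rejected before any query runs
// (hypothetical helper).
func oauthPathAccepts(bearer string) bool {
	return !strings.HasPrefix(bearer, ingestPrefix)
}
```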
Index ¶
- Variables
- func ValidateBatch(events []Event) (valid []Event, rejected []ValidationError)
- type Event
- type Flags
- type Options
- type Service
- func (s *Service) Flags() *Flags
- func (s *Service) LookupIngestToken(ctx context.Context, plaintext string) (string, error)
- func (s *Service) OptionsView() Options
- func (s *Service) Shutdown(ctx context.Context) error
- func (s *Service) Start(ctx context.Context)
- func (s *Service) Submit(_ context.Context, projectID string, events []Event) error
- type ValidationError
Variables ¶
var ErrChannelFull = errors.New("ingest: channel full")
ErrChannelFull is returned by Submit when the in-flight event count would exceed the configured ceiling. Surfaced to clients as 503 + Retry-After: 1.
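A handler could translate the error along these lines. This is a sketch under assumptions: errChannelFull stands in for ingest.ErrChannelFull so the block compiles on its own, submitStatus is a hypothetical helper, and the 202 and 500 statuses for the other branches are guesses that this page does not confirm.

```go
package main

import (
	"errors"
	"net/http"
)

// errChannelFull stands in for ingest.ErrChannelFull.
var errChannelFull = errors.New("ingest: channel full")

// submitStatus maps a Submit result onto a status code and a Retry-After
// header value (empty when the header should be omitted).
func submitStatus(err error) (status int, retryAfter string) {
	switch {
	case err == nil:
		return http.StatusAccepted, "" // assumed success status
	case errors.Is(err, errChannelFull):
		return http.StatusServiceUnavailable, "1" // documented: 503 + Retry-After: 1
	default:
		return http.StatusInternalServerError, "" // assumed fallback
	}
}
```

Using errors.Is rather than == keeps the mapping correct if a caller wraps the sentinel with extra context.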
Functions ¶
func ValidateBatch ¶
func ValidateBatch(events []Event) (valid []Event, rejected []ValidationError)
ValidateBatch returns a sanitized event slice and a per-index rejection list. It is pure: it performs no I/O and no clock reads on the request path.
Rules — minimal, lenient:
- `event` must be non-empty (the only required field besides timestamp).
- `timestamp` must not be the zero time.
- `properties` / `extras` default to `{}` if absent or empty so the ClickHouse String column never lands a NULL.
anonymous_id, user_id, and session_id stay optional (Nullable in events_raw_v1). Events without identity are queryable but do not contribute to persons.
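The rules above can be sketched as a single pass over the batch. The code below is an approximation, not the package source: event and validationError are trimmed stand-ins, the Reason field and its messages are hypothetical, and sampleTime exists only for the usage example.

```go
package main

import (
	"encoding/json"
	"time"
)

// event is a trimmed stand-in for Event with only the fields the rules touch.
type event struct {
	Event      string
	Timestamp  time.Time
	Properties json.RawMessage
	Extras     json.RawMessage
}

// validationError mirrors the documented Index; Reason is hypothetical.
type validationError struct {
	Index  int
	Reason string
}

// sampleTime is an arbitrary non-zero timestamp for usage examples.
var sampleTime = time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC)

// validateBatch applies the documented rules without I/O or clock reads.
func validateBatch(events []event) (valid []event, rejected []validationError) {
	for i, ev := range events {
		switch {
		case ev.Event == "":
			rejected = append(rejected, validationError{Index: i, Reason: "event is required"})
			continue
		case ev.Timestamp.IsZero():
			rejected = append(rejected, validationError{Index: i, Reason: "timestamp is required"})
			continue
		}
		// Default absent or empty JSON blobs to {} so no NULL reaches ClickHouse.
		if len(ev.Properties) == 0 {
			ev.Properties = json.RawMessage(`{}`)
		}
		if len(ev.Extras) == 0 {
			ev.Extras = json.RawMessage(`{}`)
		}
		valid = append(valid, ev)
	}
	return valid, rejected
}
```

For example, a batch of {Event: "signup", Timestamp: sampleTime}, an event with an empty name, and an event with a zero timestamp yields one valid event and rejections at indexes 1 and 2.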
Types ¶
type Event ¶
type Event struct {
Event string `json:"event"`
AnonymousID *string `json:"anonymous_id,omitempty"`
UserID *string `json:"user_id,omitempty"`
Timestamp time.Time `json:"timestamp"`
SessionID *string `json:"session_id,omitempty"`
Properties json.RawMessage `json:"properties,omitempty"`
Extras json.RawMessage `json:"extras,omitempty"`
}
Event is the wire-stable representation of a single analytics event.
DO NOT rename fields. failed_events.batch_payload stores Event JSON captured at submission time, so a rename here would orphan in-flight DLQ rows on the next deploy.
func (*Event) UnmarshalJSON ¶
UnmarshalJSON decodes an event with the "strict on required, lenient on extras" contract: event, timestamp, anonymous_id, user_id, session_id, and properties are first-class; every other top-level field is folded verbatim into extras, so callers can attach arbitrary fields without a schema migration and we never reject an unknown field. camelCase aliases (anonymousId, userId, sessionId) are also accepted for browser SDK payloads. An explicit "extras" object, if present, is the base that stray fields merge on top of (stray fields win on collision). timestamp accepts an ISO 8601 / RFC 3339 string or a number of epoch milliseconds.
A custom decoder (rather than struct tags) is what makes the leniency work: the standard decoder's DisallowUnknownFields, set on the ingest request body, does not reach into a json.Unmarshaler. Default marshaling round-trips this cleanly — extras serializes back as the "extras" field, which this decoder recognizes — so the DLQ replay path is unaffected.
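A decoder with this contract could be structured roughly as follows. Treat it as a sketch: the event type is a stand-in, parseTimestamp and decodeEvent are hypothetical helpers, required-field checks are left to validation, and the behaviour when both a snake_case key and its camelCase alias appear is unspecified here (map iteration order decides).

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// event mirrors the documented Event fields.
type event struct {
	Event       string
	AnonymousID *string
	UserID      *string
	SessionID   *string
	Timestamp   time.Time
	Properties  json.RawMessage
	Extras      json.RawMessage
}

// parseTimestamp accepts an RFC 3339 string or a number of epoch milliseconds.
func parseTimestamp(raw json.RawMessage) (time.Time, error) {
	var s string
	if err := json.Unmarshal(raw, &s); err == nil {
		return time.Parse(time.RFC3339, s)
	}
	var ms float64
	if err := json.Unmarshal(raw, &ms); err != nil {
		return time.Time{}, errors.New("want RFC 3339 string or epoch milliseconds")
	}
	return time.UnixMilli(int64(ms)), nil
}

// UnmarshalJSON keeps first-class fields and folds every other key into Extras.
func (e *event) UnmarshalJSON(data []byte) error {
	var raw map[string]json.RawMessage
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	// An explicit "extras" object is the base that stray fields merge onto.
	var extras map[string]json.RawMessage
	if base, ok := raw["extras"]; ok {
		if err := json.Unmarshal(base, &extras); err != nil {
			return fmt.Errorf("extras: %w", err)
		}
	}
	if extras == nil { // absent or JSON null
		extras = map[string]json.RawMessage{}
	}
	var out event
	for key, val := range raw {
		var err error
		switch key {
		case "event":
			err = json.Unmarshal(val, &out.Event)
		case "anonymous_id", "anonymousId":
			err = json.Unmarshal(val, &out.AnonymousID)
		case "user_id", "userId":
			err = json.Unmarshal(val, &out.UserID)
		case "session_id", "sessionId":
			err = json.Unmarshal(val, &out.SessionID)
		case "timestamp":
			out.Timestamp, err = parseTimestamp(val)
		case "properties":
			out.Properties = val
		case "extras":
			// already used as the merge base
		default:
			extras[key] = val // stray field wins over the base on collision
		}
		if err != nil {
			return fmt.Errorf("%s: %w", key, err)
		}
	}
	merged, err := json.Marshal(extras)
	if err != nil {
		return err
	}
	out.Extras = merged
	*e = out
	return nil
}

// decodeEvent is a usage helper that goes through the standard entry point.
func decodeEvent(data string) (event, error) {
	var ev event
	err := json.Unmarshal([]byte(data), &ev)
	return ev, err
}
```

Decoding into map[string]json.RawMessage first is what lets unknown keys survive verbatim; json.Marshal then emits the merged extras with sorted keys.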
type Flags ¶
type Flags struct {
// contains filtered or unexported fields
}
Flags exposes the process-level ingest state: a fatal flag set when both CH and the PG dead-letter queue fail in the same flush, a disabled flag for the INGEST_DISABLED kill switch + SIGTERM phase 1, and a DLQ depth gauge refreshed once per drain pass.
All three are atomic; readers are on the hot path (every /api/v1/ingest/events request + /healthz poll).
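A lock-free layout for these three values might look like the sketch below. The field names are guesses (the real fields are unexported), the typed atomics require Go 1.19 or later, and acceptingIngest is a hypothetical helper that assumes the kill switch rejects ingestion the same way the fatal flag does.

```go
package main

import "sync/atomic"

// flags is a stand-in for Flags built on typed atomics.
type flags struct {
	fatal    atomic.Bool
	disabled atomic.Bool
	dlqDepth atomic.Int64
}

func (f *flags) SetFatal(v bool)      { f.fatal.Store(v) }
func (f *flags) IsFatal() bool        { return f.fatal.Load() }
func (f *flags) SetDisabled(v bool)   { f.disabled.Store(v) }
func (f *flags) IsDisabled() bool     { return f.disabled.Load() }
func (f *flags) SetDLQDepth(n int64)  { f.dlqDepth.Store(n) }
func (f *flags) DLQDepth() int64      { return f.dlqDepth.Load() }

// acceptingIngest is a hypothetical hot-path check: two atomic loads, no lock.
func (f *flags) acceptingIngest() bool {
	return !f.IsFatal() && !f.IsDisabled()
}
```

The zero value is usable as is, so a Service can embed the struct without a constructor.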
func (*Flags) DLQDepth ¶
DLQDepth returns the count refreshed by the drain goroutine at the end of each pass. /healthz compares it against DLQDepth503Threshold.
func (*Flags) IsDisabled ¶
IsDisabled reports whether the kill switch is active. Triggered at boot by INGEST_DISABLED=true and by SIGTERM phase 1 immediately before HTTP shutdown.
func (*Flags) IsFatal ¶
IsFatal reports whether the flusher has hit a CH+PG dual failure since the last clean flush. While true, /api/v1/ingest/events returns 503 and /healthz reports down. Cleared by the first clean flush after CH recovers.
func (*Flags) SetDLQDepth ¶
SetDLQDepth is called by the drain goroutine after a CountActiveFailedEvents.
type Options ¶
type Options struct {
EventBuffer int
FlushEvents int
FlushInterval time.Duration
ShutdownGrace time.Duration
Disabled bool
MaxBodyBytes int64
DLQDrainBatchLimit int
DLQDepth503Threshold int
// UsageSink is the extension seam called after a batch durably lands in
// ClickHouse, with (projectID, eventCount), so a wrapper can meter per-tenant
// volume (see docs/extending.md). Nil defaults to the no-op extension.Discard,
// so the open-source build counts nothing.
UsageSink extension.UsageSink
}
Options configures a Service. Built from config.Config in cmd/server.
EventBuffer is the ceiling on the atomic pending-event counter; FlushEvents is the per-batch flush trigger; FlushInterval is the time-based fallback; ShutdownGrace bounds SIGTERM phase 3; Disabled seeds the kill switch; MaxBodyBytes is informational (the web layer enforces it); DLQDrainBatchLimit caps the rows handled in each drain pass; DLQDepth503Threshold is informational (the web layer enforces it).
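How FlushEvents and FlushInterval interact can be illustrated with a minimal flusher loop. This is a simplified sketch, not the package's flusher: runFlusher is a hypothetical name, events are plain strings, and the flush callback stands in for the ClickHouse insert and its DLQ fallback.

```go
package main

import "time"

// runFlusher accumulates events from in and calls flush when the batch
// reaches flushEvents or when the ticker fires with a non-empty batch.
// It returns after in is closed and the remainder has been flushed.
// flushInterval must be positive, since time.NewTicker panics otherwise.
func runFlusher(in <-chan []string, flushEvents int, flushInterval time.Duration, flush func([]string)) {
	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	var batch []string
	emit := func() {
		if len(batch) == 0 {
			return
		}
		flush(batch)
		batch = nil // start a fresh slice so flush may keep the old one
	}

	for {
		select {
		case evs, ok := <-in:
			if !ok {
				emit() // channel closed: drain what is left
				return
			}
			batch = append(batch, evs...)
			if len(batch) >= flushEvents {
				emit() // size trigger
			}
		case <-ticker.C:
			emit() // time trigger
		}
	}
}
```

With flushEvents set to 3, submitting slices of 2, 2, and 1 events and then closing the channel produces two flushes: one of 4 events when the size trigger is crossed and one of 1 event on drain.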
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service owns the ingest pipeline's runtime state: the in-memory channel, the flusher + DLQ goroutines, the flags surface, and the handles to the two backing stores.
func NewService ¶
NewService constructs a Service. Call Start to launch the flusher + DLQ goroutines and Shutdown on SIGTERM phase 3 to drain.
func (*Service) Flags ¶
Flags exposes the process-level ingest state surface so handlers + the /healthz route can read fatal / disabled / DLQ depth without poking the Service's internals.
func (*Service) LookupIngestToken ¶
LookupIngestToken resolves a plaintext mere_pub_… bearer to its project ID.
Returns ("", nil) for the three "no project" cases (missing prefix, no row, or soft-deleted project) — the middleware uniformly turns those into 401. Returns ("", err) on infrastructure failure (PG down, network blip); the middleware *must* propagate that as 500, never 401, so PG-outage telemetry doesn't look like an attack signal.
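The middleware contract described above reduces to a three-way mapping. The sketch below assumes a hypothetical authStatus helper and a stand-in errStoreDown error; the 200 result simply means "continue to the handler".

```go
package main

import (
	"errors"
	"net/http"
)

// errStoreDown is a stand-in for any infrastructure failure from the lookup.
var errStoreDown = errors.New("pg: connection refused")

// authStatus maps a LookupIngestToken result onto the middleware's response.
// The error check comes first so an outage is never reported as 401.
func authStatus(projectID string, err error) int {
	switch {
	case err != nil:
		return http.StatusInternalServerError
	case projectID == "":
		return http.StatusUnauthorized // missing prefix, no row, or soft-deleted project
	default:
		return http.StatusOK
	}
}
```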
func (*Service) OptionsView ¶
OptionsView returns the configured option bag (mostly for the web layer to pull MaxBodyBytes and DLQDepth503Threshold off a single dependency).
func (*Service) Shutdown ¶
Shutdown is SIGTERM phase 3. It signals the goroutines to stop, closes the envelope channel so the flusher can drain remaining work, and waits up to the provided ctx deadline. If the deadline fires while envelopes are still in flight, the residual is captured into a single failed_events row so nothing is silently dropped (Issue 6A).
Safe to call more than once.
func (*Service) Start ¶
Start launches the flusher + DLQ drain goroutines. Safe to call more than once; only the first call has effect.
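Idempotent start and stop are commonly built on sync.Once. The sketch below shows that pattern under simplifying assumptions: service, newService, and Stop are hypothetical, the worker only sums integers, and the ctx deadline and residual capture that Shutdown performs are omitted. Stop blocks forever if Start was never called.

```go
package main

import "sync"

// service is a stand-in that owns one worker goroutine.
type service struct {
	startOnce sync.Once
	stopOnce  sync.Once
	ch        chan int
	done      chan struct{}
	starts    int // how many times the worker was actually launched
	flushed   int // written only by the worker; read after done is closed
}

func newService(buffer int) *service {
	return &service{ch: make(chan int, buffer), done: make(chan struct{})}
}

// Start launches the worker; only the first call has any effect.
func (s *service) Start() {
	s.startOnce.Do(func() {
		s.starts++
		go func() {
			defer close(s.done)
			for n := range s.ch { // exits once ch is closed and drained
				s.flushed += n
			}
		}()
	})
}

// Stop closes the channel exactly once and waits for the worker to drain.
func (s *service) Stop() {
	s.stopOnce.Do(func() { close(s.ch) })
	<-s.done
}
```

Guarding close with a Once is what makes the second call safe, since closing an already closed channel panics.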
func (*Service) Submit ¶
Submit hands a validated event slice to the in-memory channel. Returns ErrChannelFull when the in-flight event count would exceed EventBuffer; the handler translates that to 503 + Retry-After: 1.
Submit does not block on a full channel — backpressure is fail-fast so a slow flusher can't hold open client connections forever.
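Fail-fast backpressure of this kind can be sketched with an atomic counter and a non-blocking send. Everything named here is a stand-in: submitter, envelope, and errChannelFull are hypothetical mirrors of the real types, and the real Submit also takes a context.

```go
package main

import (
	"errors"
	"sync/atomic"
)

var errChannelFull = errors.New("ingest: channel full")

// envelope pairs a batch with its project, loosely mirroring batchEnvelope.
type envelope struct {
	projectID string
	events    []string
}

type submitter struct {
	pending atomic.Int64 // events accepted but not yet flushed
	ceiling int64        // plays the role of EventBuffer
	ch      chan envelope
}

// Submit reserves room for the batch, then tries a non-blocking send.
// Either failure releases the reservation and returns errChannelFull.
func (s *submitter) Submit(projectID string, events []string) error {
	n := int64(len(events))
	if s.pending.Add(n) > s.ceiling {
		s.pending.Add(-n)
		return errChannelFull
	}
	select {
	case s.ch <- envelope{projectID: projectID, events: events}:
		return nil
	default: // channel buffer full: never block the request goroutine
		s.pending.Add(-n)
		return errChannelFull
	}
}
```

Counting events rather than envelopes means one oversized batch cannot hide behind a mostly empty channel; the flusher would subtract len(batch) after each flush, as in the pipeline diagram.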
type ValidationError ¶
ValidationError reports a per-event problem inside a submitted batch. Index is the position within the request's events array (0-based) so the client can correlate.