ingest

package
v0.1.0
Published: Jun 4, 2026 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Overview

Package ingest implements the analytics write path: POST /api/v1/ingest/events → validated batch → in-memory channel → flusher → ClickHouse events_raw_v1.

Pipeline:

    handler ──► Submit (atomic pending++, chan <- env)
                          │
                          ▼
                  ┌───── chan batchEnvelope ─────┐
                  │                              │
                  ▼                              │
            flusher goroutine                    │
            (accumulate to FlushEvents or        │
             tick at FlushInterval)              │
                  │
            ┌─────┴──────┐
            │            │
            ▼            ▼
   CH INSERT ok    CH INSERT fails
            │            │
            │       InsertFailedEvent (DLQ row, batch as JSONB)
            │            │
            │       ┌────┴─────┐
            │       ▼          ▼
            │   PG ok      PG also fails
            │   │             │
            │   continue   SetFatal(true)
            ▼
          drain.atomic.pending -= len(batch)

DLQ drain goroutine (every FlushInterval):
    ListFailedEventsForDrain LIMIT N
          │
          ├── CH ok                 ──► DeleteFailedEvent
          └── CH fails              ──► IncrementFailedEventAttempt
                  attempt+1 ≥ 20    ──► QuarantineFailedEvent
                  age ≥ 24h         ──► QuarantineFailedEvent
    After pass: Flags.SetDLQDepth(CountActiveFailedEvents)
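
The per-row decision in the drain diagram can be sketched as a small pure function. This is one reading of the diagram, not the package's actual code: the names nextAction and drainAction are hypothetical, and the sketch assumes a failed replay either increments the attempt counter or quarantines the row, whereas the real drain may do both.

```go
package main

import (
	"fmt"
	"time"
)

// Thresholds copied from the drain diagram.
const (
	maxAttempts = 20
	maxAge      = 24 * time.Hour
)

// drainAction is a hypothetical type naming the three outcomes in the diagram.
type drainAction string

const (
	actionDelete     drainAction = "DeleteFailedEvent"
	actionIncrement  drainAction = "IncrementFailedEventAttempt"
	actionQuarantine drainAction = "QuarantineFailedEvent"
)

// nextAction decides what happens to one DLQ row after a replay attempt.
// attempts is the count stored before this attempt; age is how long the
// row has been waiting in the queue.
func nextAction(insertOK bool, attempts int, age time.Duration) drainAction {
	if insertOK {
		return actionDelete
	}
	if attempts+1 >= maxAttempts || age >= maxAge {
		return actionQuarantine
	}
	return actionIncrement
}

func main() {
	fmt.Println(nextAction(true, 3, time.Minute))   // replay succeeded
	fmt.Println(nextAction(false, 3, time.Minute))  // retry later
	fmt.Println(nextAction(false, 19, time.Minute)) // attempt limit reached
	fmt.Println(nextAction(false, 0, 25*time.Hour)) // row too old
}
```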

Token paths stay strictly separate. /api/v1/ingest/events uses LookupIngestToken against api_tokens (mere_pub_…); /api/v1/whoami, /api/v1/projects/*, and /mcp use oauth.Service.LookupActiveAccessToken against oauth_access_tokens. The two surfaces share no code, and the OAuth path rejects mere_pub_ at the prefix step.

Constants

This section is empty.

Variables

var ErrChannelFull = errors.New("ingest: channel full")

ErrChannelFull is returned by Submit when the in-flight event count would exceed the configured ceiling. Surfaced to clients as 503 + Retry-After: 1.
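
A handler might translate this error along the following lines. The sketch uses a local stand-in for ErrChannelFull so it compiles on its own; writeSubmitResult and statusFor are hypothetical helpers, and the 202 success status and the 500 fallback are assumptions, since the documentation only specifies the 503 + Retry-After: 1 case.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// errChannelFull is a local stand-in for ingest.ErrChannelFull.
var errChannelFull = errors.New("ingest: channel full")

// writeSubmitResult maps the error returned by Submit to an HTTP response.
// Only the 503 + Retry-After branch comes from the docs; the other two
// statuses are assumptions made for this sketch.
func writeSubmitResult(w http.ResponseWriter, err error) {
	switch {
	case err == nil:
		w.WriteHeader(http.StatusAccepted)
	case errors.Is(err, errChannelFull):
		w.Header().Set("Retry-After", "1")
		w.WriteHeader(http.StatusServiceUnavailable)
	default:
		w.WriteHeader(http.StatusInternalServerError)
	}
}

// statusFor runs writeSubmitResult against a recorder and reports the
// status code and Retry-After header it produced.
func statusFor(err error) (int, string) {
	rec := httptest.NewRecorder()
	writeSubmitResult(rec, err)
	return rec.Code, rec.Header().Get("Retry-After")
}

func main() {
	// errors.Is also matches when the sentinel is wrapped.
	code, retry := statusFor(fmt.Errorf("submit: %w", errChannelFull))
	fmt.Println(code, retry)
}
```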

Functions

func ValidateBatch

func ValidateBatch(events []Event) (valid []Event, rejected []ValidationError)

ValidateBatch returns a sanitized event slice + a per-index rejection list. Pure: no I/O, no clock reads on the request.

Rules — minimal, lenient:

  • `event` must be non-empty (the only required field besides timestamp).
  • `timestamp` must not be the zero time.
  • `properties` / `extras` default to `{}` if absent or empty so the ClickHouse String column never lands a NULL.

anonymous_id, user_id, and session_id stay optional (Nullable in events_raw_v1). Events without identity are queryable but do not contribute to persons.
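
The rules above can be sketched as follows. This is an illustration on trimmed local types, not the package source: the reason strings are invented, sampleBatch is a hypothetical fixture, and "empty" is assumed to mean a zero-length raw message.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// event and validationError are trimmed local stand-ins for Event and
// ValidationError; the identity fields are omitted because they are optional.
type event struct {
	Event      string
	Timestamp  time.Time
	Properties json.RawMessage
	Extras     json.RawMessage
}

type validationError struct {
	Index  int
	Reason string
}

// validateBatch keeps events that pass both required-field checks and
// records the original index of each rejected one.
func validateBatch(events []event) (valid []event, rejected []validationError) {
	for i, e := range events {
		switch {
		case e.Event == "":
			rejected = append(rejected, validationError{i, "event is required"})
			continue
		case e.Timestamp.IsZero():
			rejected = append(rejected, validationError{i, "timestamp is required"})
			continue
		}
		// Default the JSON columns so the store never receives a NULL.
		if len(e.Properties) == 0 {
			e.Properties = json.RawMessage(`{}`)
		}
		if len(e.Extras) == 0 {
			e.Extras = json.RawMessage(`{}`)
		}
		valid = append(valid, e)
	}
	return valid, rejected
}

// sampleBatch returns one valid event, one with no name, and one with a
// zero timestamp.
func sampleBatch() []event {
	ts := time.Unix(1700000000, 0)
	return []event{
		{Event: "page_view", Timestamp: ts},
		{Event: "", Timestamp: ts},
		{Event: "click"},
	}
}

func main() {
	valid, rejected := validateBatch(sampleBatch())
	fmt.Println(len(valid), string(valid[0].Properties), rejected)
}
```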

Types

type Event

type Event struct {
	Event       string          `json:"event"`
	AnonymousID *string         `json:"anonymous_id,omitempty"`
	UserID      *string         `json:"user_id,omitempty"`
	Timestamp   time.Time       `json:"timestamp"`
	SessionID   *string         `json:"session_id,omitempty"`
	Properties  json.RawMessage `json:"properties,omitempty"`
	Extras      json.RawMessage `json:"extras,omitempty"`
}

Event is the wire-stable representation of a single analytics event.

DO NOT rename fields. failed_events.batch_payload stores Event JSON captured at submission time, so a rename here would orphan in-flight DLQ rows on the next deploy.

func (*Event) UnmarshalJSON

func (e *Event) UnmarshalJSON(data []byte) error

UnmarshalJSON decodes an event with the "strict on required, lenient on extras" contract: event, timestamp, anonymous_id, user_id, session_id, and properties are first-class; every other top-level field is folded verbatim into extras, so callers can attach arbitrary fields without a schema migration and we never reject an unknown field. camelCase aliases (anonymousId, userId, sessionId) are also accepted for browser SDK payloads. An explicit "extras" object, if present, is the base that stray fields merge on top of (stray fields win on collision). timestamp accepts an ISO 8601 / RFC 3339 string or a number of epoch milliseconds.

A custom decoder (rather than struct tags) is what makes the leniency work: the standard decoder's DisallowUnknownFields, set on the ingest request body, does not reach into a json.Unmarshaler. Default marshaling round-trips this cleanly — extras serializes back as the "extras" field, which this decoder recognizes — so the DLQ replay path is unaffected.
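
A reduced sketch of this decoding contract is shown below. It is not the package's decoder: the event type is trimmed to three fields, timestamp parsing and the other aliases are left out, and the {"events": [...]} request envelope and the decodeStrict helper are assumptions made for illustration. It does show the two behaviours described above: stray fields merge over an explicit extras object, and DisallowUnknownFields on the outer decoder does not apply inside a json.Unmarshaler.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// event is a trimmed local stand-in for Event.
type event struct {
	Event  string
	UserID *string
	Extras json.RawMessage
}

// UnmarshalJSON pulls out the known fields and folds everything else
// into Extras, with stray top-level fields overriding an explicit "extras".
func (e *event) UnmarshalJSON(data []byte) error {
	var raw map[string]json.RawMessage
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	extras := map[string]json.RawMessage{}
	if base, ok := raw["extras"]; ok {
		if err := json.Unmarshal(base, &extras); err != nil {
			return fmt.Errorf("extras: %w", err)
		}
		if extras == nil { // "extras": null leaves the map nil
			extras = map[string]json.RawMessage{}
		}
		delete(raw, "extras")
	}
	if v, ok := raw["event"]; ok {
		if err := json.Unmarshal(v, &e.Event); err != nil {
			return fmt.Errorf("event: %w", err)
		}
		delete(raw, "event")
	}
	// Accept both the snake_case field and its camelCase alias.
	for _, key := range []string{"user_id", "userId"} {
		if v, ok := raw[key]; ok {
			if err := json.Unmarshal(v, &e.UserID); err != nil {
				return fmt.Errorf("%s: %w", key, err)
			}
			delete(raw, key)
		}
	}
	for k, v := range raw { // whatever is left is a stray field
		extras[k] = v
	}
	out, err := json.Marshal(extras)
	if err != nil {
		return err
	}
	e.Extras = out
	return nil
}

// decodeStrict decodes a request body with unknown fields disallowed at
// the envelope level only.
func decodeStrict(body string) ([]event, error) {
	dec := json.NewDecoder(strings.NewReader(body))
	dec.DisallowUnknownFields()
	var req struct {
		Events []event `json:"events"`
	}
	if err := dec.Decode(&req); err != nil {
		return nil, err
	}
	return req.Events, nil
}

func main() {
	evs, err := decodeStrict(`{"events":[{"event":"signup","userId":"u1","plan":"pro","extras":{"plan":"free","ref":"ad"}}]}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(evs[0].Event, *evs[0].UserID, string(evs[0].Extras))
}
```

In this sketch an unknown key next to "events" is still rejected by the outer decoder, while unknown keys inside an event are accepted and preserved.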

type Flags

type Flags struct {
	// contains filtered or unexported fields
}

Flags exposes the process-level ingest state: a fatal flag set when both CH and the PG dead-letter queue fail in the same flush, a disabled flag for the INGEST_DISABLED kill switch + SIGTERM phase 1, and a DLQ depth gauge refreshed once per drain pass.

All three are atomic; readers are on the hot path (every /api/v1/ingest/events request + /healthz poll).
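
A lock-free layout along these lines would satisfy that description. The field names are guesses (the real fields are unexported), healthzDown is a hypothetical helper, and the >= comparison against the threshold is an assumption about how /healthz uses the gauge.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// flags is a local stand-in for Flags built on sync/atomic types
// (available since Go 1.19), so hot-path readers never take a lock.
type flags struct {
	fatal    atomic.Bool
	disabled atomic.Bool
	dlqDepth atomic.Int64
}

func (f *flags) SetFatal(v bool)     { f.fatal.Store(v) }
func (f *flags) IsFatal() bool       { return f.fatal.Load() }
func (f *flags) SetDisabled(v bool)  { f.disabled.Store(v) }
func (f *flags) IsDisabled() bool    { return f.disabled.Load() }
func (f *flags) SetDLQDepth(v int64) { f.dlqDepth.Store(v) }
func (f *flags) DLQDepth() int64     { return f.dlqDepth.Load() }

// healthzDown is a hypothetical health check: down while fatal, or once
// the DLQ depth reaches the threshold (the exact comparison is assumed).
func healthzDown(f *flags, threshold int64) bool {
	return f.IsFatal() || f.DLQDepth() >= threshold
}

func main() {
	f := &flags{}
	fmt.Println(healthzDown(f, 1000)) // fresh process
	f.SetFatal(true)
	fmt.Println(healthzDown(f, 1000)) // after a CH+PG dual failure
	f.SetFatal(false)
	f.SetDLQDepth(1000)
	fmt.Println(healthzDown(f, 1000)) // DLQ backlog at the threshold
}
```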

func (*Flags) DLQDepth

func (f *Flags) DLQDepth() int64

DLQDepth returns the count refreshed by the drain goroutine at the end of each pass. /healthz compares it against DLQDepth503Threshold.

func (*Flags) IsDisabled

func (f *Flags) IsDisabled() bool

IsDisabled reports whether the kill switch is active. Triggered at boot by INGEST_DISABLED=true and by SIGTERM phase 1 immediately before HTTP shutdown.

func (*Flags) IsFatal

func (f *Flags) IsFatal() bool

IsFatal reports whether the flusher has hit a CH+PG dual failure since the last clean flush. While true, /api/v1/ingest/events returns 503 + /healthz reports down. Cleared by the first clean flush after CH recovers.

func (*Flags) SetDLQDepth

func (f *Flags) SetDLQDepth(v int64)

SetDLQDepth is called by the drain goroutine after each CountActiveFailedEvents call.

func (*Flags) SetDisabled

func (f *Flags) SetDisabled(v bool)

SetDisabled flips the kill switch.

func (*Flags) SetFatal

func (f *Flags) SetFatal(v bool)

SetFatal records or clears the fatal state.

type Options

type Options struct {
	EventBuffer          int
	FlushEvents          int
	FlushInterval        time.Duration
	ShutdownGrace        time.Duration
	Disabled             bool
	MaxBodyBytes         int64
	DLQDrainBatchLimit   int
	DLQDepth503Threshold int

	// UsageSink is the extension seam called after a batch durably lands in
	// ClickHouse, with (projectID, eventCount), so a wrapper can meter per-tenant
	// volume (see docs/extending.md). Nil defaults to the no-op extension.Discard,
	// so the open-source build counts nothing.
	UsageSink extension.UsageSink
}

Options configures a Service. Built from config.Config in cmd/server.

The fields:

  • EventBuffer is the atomic pending-event ceiling.
  • FlushEvents is the per-batch flush trigger.
  • FlushInterval is the time-based fallback.
  • ShutdownGrace bounds SIGTERM phase 3.
  • Disabled seeds the kill switch.
  • MaxBodyBytes is informational (the web layer enforces it).
  • DLQDrainBatchLimit caps each drain pass.
  • DLQDepth503Threshold is informational (the web layer enforces it).

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service owns the ingest pipeline's runtime state: the in-memory channel, the flusher + DLQ goroutines, the flags surface, and the handles to the two backing stores.

func NewService

func NewService(pool *pgxpool.Pool, ch *sql.DB, opts Options, logger *slog.Logger) *Service

NewService constructs a Service. Call Start to launch the flusher + DLQ goroutines and Shutdown on SIGTERM phase 3 to drain.

func (*Service) Flags

func (s *Service) Flags() *Flags

Flags exposes the process-level ingest state surface so handlers + the /healthz route can read fatal / disabled / DLQ depth without poking the Service's internals.

func (*Service) LookupIngestToken

func (s *Service) LookupIngestToken(ctx context.Context, plaintext string) (string, error)

LookupIngestToken resolves a plaintext mere_pub_… bearer to its project ID.

Returns ("", nil) for the three "no project" cases (missing prefix, no row, or soft-deleted project) — the middleware uniformly turns those into 401. Returns ("", err) on infrastructure failure (PG down, network blip); the middleware *must* propagate that as 500, never 401, so PG-outage telemetry doesn't look like an attack signal.
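
The contract the middleware has to honour can be sketched as a pure mapping from the lookup result to a status code. statusForLookup and errInfra are hypothetical names used only here; the 200 in the success branch stands for "continue to the handler".

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// errInfra is a hypothetical infrastructure error, e.g. Postgres being down.
var errInfra = errors.New("pg: connection refused")

// statusForLookup maps a LookupIngestToken result to the status the
// middleware should produce. The error check comes first so an outage
// is never reported as an authentication failure.
func statusForLookup(projectID string, err error) int {
	switch {
	case err != nil:
		return http.StatusInternalServerError // infrastructure failure
	case projectID == "":
		return http.StatusUnauthorized // missing prefix, no row, or soft-deleted project
	default:
		return http.StatusOK // token resolved; request proceeds
	}
}

func main() {
	fmt.Println(statusForLookup("proj_123", nil))
	fmt.Println(statusForLookup("", nil))
	fmt.Println(statusForLookup("", errInfra))
}
```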

func (*Service) OptionsView

func (s *Service) OptionsView() Options

OptionsView returns the configured option bag (mostly so the web layer can pull MaxBodyBytes and DLQDepth503Threshold off a single dependency).

func (*Service) Shutdown

func (s *Service) Shutdown(ctx context.Context) error

Shutdown is SIGTERM phase 3. It signals the goroutines to stop, closes the envelope channel so the flusher can drain remaining work, and waits up to the provided ctx deadline. If the deadline fires while envelopes are still in flight, the residual is captured into a single failed_events row so nothing is silently dropped (Issue 6A).

Safe to call more than once.

func (*Service) Start

func (s *Service) Start(ctx context.Context)

Start launches the flusher + DLQ drain goroutines. Safe to call more than once; only the first call has effect.

func (*Service) Submit

func (s *Service) Submit(_ context.Context, projectID string, events []Event) error

Submit hands a validated event slice to the in-memory channel. Returns ErrChannelFull when the in-flight event count would exceed EventBuffer; the handler translates that to 503 + Retry-After: 1.

Submit does not block on a full channel — backpressure is fail-fast so a slow flusher can't hold open client connections forever.
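
One way to get this fail-fast behaviour is to reserve capacity on an atomic counter before a non-blocking channel send. The sketch below is an assumption about the mechanism, not the package source: queue, envelope, submit, and ack are hypothetical names, events are reduced to strings, and an empty batch is treated as a no-op.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errChannelFull is a local stand-in for ingest.ErrChannelFull.
var errChannelFull = errors.New("ingest: channel full")

type envelope struct {
	projectID string
	events    []string
}

// queue pairs an atomic in-flight event counter with a buffered channel.
type queue struct {
	pending atomic.Int64
	ceiling int64
	ch      chan envelope
}

// newQueue sizes the channel to the event ceiling. Every envelope holds
// at least one event, so the counter trips before the channel can fill.
func newQueue(ceiling int) *queue {
	return &queue{ceiling: int64(ceiling), ch: make(chan envelope, ceiling)}
}

// submit reserves room for the batch, then enqueues without blocking.
func (q *queue) submit(projectID string, events []string) error {
	n := int64(len(events))
	if n == 0 {
		return nil
	}
	if q.pending.Add(n) > q.ceiling {
		q.pending.Add(-n) // roll back the reservation
		return errChannelFull
	}
	select {
	case q.ch <- envelope{projectID, events}:
		return nil
	default: // defensive: never block the caller
		q.pending.Add(-n)
		return errChannelFull
	}
}

// ack is what a flusher would call once n events have been handled.
func (q *queue) ack(n int) { q.pending.Add(-int64(n)) }

func main() {
	q := newQueue(3)
	fmt.Println(q.submit("p1", []string{"a", "b"})) // fits
	fmt.Println(q.submit("p1", []string{"c", "d"})) // would exceed the ceiling
	env := <-q.ch
	q.ack(len(env.events))
	fmt.Println(q.submit("p1", []string{"c", "d"})) // fits after the flush
}
```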

type ValidationError

type ValidationError struct {
	Index  int    `json:"index"`
	Reason string `json:"reason"`
}

ValidationError reports a per-event problem inside a submitted batch. Index is the position within the request's events array (0-based) so the client can correlate.
