Overview
Package streaming runs a Murmur Pipeline in Live mode: it reads records from the pipeline's Source, applies the user's key/value extractors and monoid Combine, writes to the primary state store (and optionally a cache), and Acks the source record.
Phase 1 is intentionally a single-goroutine sequential loop by default (WithConcurrency(1)). Per-partition parallelism is a Phase 2 optimization gated on real benchmarks. franz-go batches fetches under the hood, so for counter-class workloads at moderate rates, single-goroutine processing is not the bottleneck.
Index
- func Run[T any, V any](ctx context.Context, p *pipeline.Pipeline[T, V], opts ...RunOption) error
- func RunFanout[T any](ctx context.Context, src source.Source[T], pipes []Bound[T], ...) error
- type Bound
- type FanoutOption
- func WithFanoutBuffer(n int) FanoutOption
- type RunOption
- func WithBatchTick(d time.Duration) RunOption
- func WithBatchWindow(window time.Duration, maxBatch int) RunOption
- func WithConcurrency(n int) RunOption
- func WithDeadLetter(fn func(eventID string, err error)) RunOption
- func WithDedup(d state.Deduper) RunOption
- func WithKeyDebounce(window time.Duration, maxKeys int) RunOption
- func WithMaxAttempts(n int) RunOption
- func WithMetrics(r metrics.Recorder) RunOption
- func WithRetryBackoff(base, max time.Duration) RunOption
- func WithValueDebounce(window time.Duration, maxKeys int) RunOption
Functions
func Run
func Run[T any, V any](ctx context.Context, p *pipeline.Pipeline[T, V], opts ...RunOption) error
Run executes the pipeline in Live mode until ctx is canceled or the source returns. Returns nil on graceful shutdown.
Per-record processing errors are retried up to MaxAttempts with exponential backoff, then dead-lettered (WithDeadLetter callback plus recorder.RecordError) and skipped. Run itself returns early only when the source returns or ctx is canceled; a persistently failing downstream does not take the worker down with it.
The pipeline must have been configured with From / Key / Value / Aggregate / StoreIn before Run is called; Build is invoked internally to validate.
func RunFanout
func RunFanout[T any](ctx context.Context, src source.Source[T], pipes []Bound[T], opts ...FanoutOption) error
RunFanout runs N pipelines against ONE shared source. Each pipeline drains its own buffered channel; the source pump tees every record to all channels.
Use case: count-core-style "many counters per event." One Kafka topic of like-events drives:
- a per-post counter pipeline
- a per-user counter pipeline
- a global trending pipeline (Misra-Gries TopK)
- a Decayed-sum "hot" score pipeline
All four consume the same records. Without fanout, you'd open four Kafka consumer-group connections from one process (4× the broker load) or run four worker processes (4× the deployment surface). With fanout, one source pump feeds all four.
Ack semantics: counted teeing
Each record's source-side Ack is wrapped so it fires the underlying source.Ack only after EVERY pipeline has called .Ack() on its copy. This means:
- The slowest pipeline gates source offset advancement.
- On worker restart the source replays from the last fully-acked offset; pipelines that had already processed those records see duplicates, which dedup (WithDedup on the Bound pipeline) catches.
- A stuck pipeline pins the source — same backpressure semantics as a single-pipeline streaming.Run, but multiplied across N consumers.
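A minimal sketch of the counted Ack wrapper, assuming the source hands each record a plain `func()` ack. `countedAck` is a hypothetical helper, not the package's unexported implementation: the underlying source ack fires exactly once, when the last of the N pipeline copies has acked.

```go
package main

import "sync/atomic"

// countedAck wraps sourceAck so it fires once, after n pipeline copies of
// the record have each called the returned func. Hypothetical helper.
func countedAck(n int, sourceAck func()) func() {
	var remaining atomic.Int32
	remaining.Store(int32(n))
	return func() {
		// The last pipeline to ack drives the counter to zero and releases
		// the source offset; any extra acks go negative and never re-fire.
		if remaining.Add(-1) == 0 {
			sourceAck()
		}
	}
}
```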
Per-pipeline backpressure
Each pipeline has a buffered channel (default 1024 records). When it fills, the source pump blocks on the slow pipeline. Other pipelines continue draining their own channels until the slow one catches up. This is the right shape: Kafka offset advancement is already gated on the slow pipeline's Ack, so blocking the pump matches that constraint.
Failure model
- Source error: returned to the caller; pipelines drain their remaining channel content and exit cleanly.
- Pipeline error: surfaces from RunFanout's joined error. Other pipelines continue.
- Ctx cancel: all pipelines stop after draining; final aggregator flush runs under a 30-second detached context.
Types
type Bound
type Bound[T any] struct { // contains filtered or unexported fields }
Bound is one pipeline registered for fanout. Use Bind[T, V](pipe, opts...) to construct one and pass it to RunFanout. The V type parameter is hidden behind the closure so the multi-runtime can hold pipelines of heterogeneous aggregation types in a single slice.
func Bind
Bind ties a pipeline to a set of streaming RunOptions and returns a Bound[T] suitable for RunFanout. The pipeline's Source field is IGNORED — RunFanout supplies a single shared source upstream and tees records to every Bound consumer.
All pipelines in one fanout must share the same input record type T; each pipeline can have its own aggregation type V.
type FanoutOption
type FanoutOption func(*fanoutConfig)
FanoutOption configures RunFanout.
func WithFanoutBuffer
func WithFanoutBuffer(n int) FanoutOption
WithFanoutBuffer sets the per-pipeline channel buffer size. Default 1024. Larger buffers absorb slow-pipeline backpressure at the cost of memory; the slowest pipeline gates source-side Ack progress (see "Per-pipeline backpressure" in the package doc), so the buffer is the slack a fast pipeline can run ahead before the slow one stalls everyone.
type RunOption
type RunOption func(*runConfig)
RunOption configures Run.
func WithBatchTick
func WithBatchTick(d time.Duration) RunOption
WithBatchTick overrides the RecordBatch flush interval. The default 1 s fits most production deployments; raise it to reduce metrics chatter for very slow pipelines, or pass <= 0 to disable periodic batch flushes entirely.
func WithBatchWindow
func WithBatchWindow(window time.Duration, maxBatch int) RunOption
WithBatchWindow enables write aggregation: per-(entity, bucket) deltas are accumulated in memory for `window` time, then flushed as a SINGLE store.MergeUpdate per key. This is the only way to keep up with hot keys at scale — a celebrity post taking 50k like-events/sec lands as ONE atomic-ADD per flush window per worker instead of 50k.
Tradeoffs:
- Latency: a record's contribution is invisible to readers for up to `window` time after acceptance. Default unset (no batching) gives immediate-merge semantics; common production values are 500ms–5s depending on read-staleness tolerance.
- Durability under crash: records are Ack'd to the source only AFTER their batch flushes, so a worker crash can lose at most `window`-worth of unflushed, in-memory contributions. None of those records were Acked, so the source replays them on restart (dedup catches the redelivery).
- Memory: at most `maxBatch` records per (entity, bucket) before forced flush. Default 1024 if unset. The number of concurrent keys in flight is unbounded — for high-cardinality pipelines (per-user keys), keep `window` short so each batch stays small.
Pass window = 0 (or omit the option) to disable batching entirely; records flow through processor.MergeMany one at a time.
Strongly recommended for any pipeline with a known hot-key distribution.
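A minimal sketch of write aggregation for a Sum-style monoid, assuming int64 deltas and a hypothetical `merge` callback standing in for store.MergeUpdate. Each (entity, bucket) folds its records into one delta, a key that reaches `maxBatch` records flushes early, and `flushAll` is what the end of each window would call. `batchKey`, `aggregator` and its methods are illustrative names only.

```go
package main

// batchKey identifies one (entity, bucket) cell.
type batchKey struct {
	entity string
	bucket int64
}

// aggregator coalesces int64 deltas per (entity, bucket). merge is a
// hypothetical stand-in for a single store.MergeUpdate per key.
type aggregator struct {
	maxBatch int
	deltas   map[batchKey]int64
	counts   map[batchKey]int
	merge    func(k batchKey, delta int64)
}

func newAggregator(maxBatch int, merge func(batchKey, int64)) *aggregator {
	return &aggregator{maxBatch: maxBatch, deltas: map[batchKey]int64{},
		counts: map[batchKey]int{}, merge: merge}
}

// add folds one record's delta in; a key holding maxBatch records is
// flushed immediately instead of waiting for the window to end.
func (a *aggregator) add(k batchKey, delta int64) {
	a.deltas[k] += delta
	a.counts[k]++
	if a.counts[k] >= a.maxBatch {
		a.flushKey(k)
	}
}

func (a *aggregator) flushKey(k batchKey) {
	a.merge(k, a.deltas[k])
	delete(a.deltas, k)
	delete(a.counts, k)
}

// flushAll runs at the end of each window: one merge per pending key.
func (a *aggregator) flushAll() {
	for k := range a.deltas {
		a.flushKey(k) // deleting during range is safe in Go
	}
}
```

With the sketch's numbers, 50,000 records on one hot key become a single store write when maxBatch is large enough. At maxBatch = 1024 the early flushes make it 49 writes (48 forced at 1,024 records each, plus one for the remaining 848), still roughly 1,000× fewer than one write per record.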
func WithConcurrency
func WithConcurrency(n int) RunOption
WithConcurrency configures the runtime's intra-worker parallelism. Default 1 (the historical single-goroutine loop). The value is reused in two places:
**Source-dispatch routing.** When no aggregator is configured (no WithBatchWindow), N worker goroutines drain the source channel and records are routed by hash(first-emitted-key) — so SAME-key records always land on the same worker. This preserves per-key arrival order for non-additive monoids (First, Last, Min, Max) where ordering still matters.
**Aggregator flush fan-out.** When WithBatchWindow IS configured, each (entity, bucket) batch has already coalesced its contributions into ONE delta at accept-time, so per-key flushes touch distinct store keys. flushAll dispatches the per-key MergeUpdates across N worker goroutines (bounded by the number of distinct keys in the flush). This is the change that unlocks real I/O parallelism inside a single worker instance: a flush touching 1000 keys against a 5ms-per-call store completes in ~5 s at concurrency=1 (1000 sequential calls × 5 ms) vs ~315 ms at concurrency=16 (⌈1000/16⌉ = 63 calls per goroutine × 5 ms).
When to raise it:
- The single-goroutine ceiling (~5–10k events/s/worker against DDB-local; documented in README) is your bottleneck.
- The aggregator flush is bottlenecked on store latency (large coalesced-key set per window, DDB UpdateItem at ~5ms each).
- Fan-out via additional Kafka partitions isn't an option (e.g., downstream key cardinality demands a single partition).
When NOT to raise it:
- Your store is CAS-heavy (BytesStore on a hot key). Key-hash routing removes cross-worker CAS contention, but every record for a hot key still lands on one worker, so that worker stays the bottleneck no matter how many workers you add.
- You have very few distinct keys per flush window — the flush fan-out is bounded by len(keys), so concurrency above that produces no extra speedup.
The source-dispatch router routes by first-key hash. Hierarchical-rollup pipelines (KeyByMany) partition on the FIRST emitted key, so a record's "post:X" delta lands on the same worker as another "post:X" record, even if the records also fan out to "country:Y" / "global" keys that happen to hash differently. This keeps the per-key ordering guarantee meaningful where it matters most.
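A minimal sketch of first-key routing. FNV-1a is an assumption (the hash function is not documented here) and `workerFor` is a hypothetical helper. Only `emittedKeys[0]` feeds the hash, so extra rollup keys never change which worker a record lands on.

```go
package main

import "hash/fnv"

// workerFor maps a record to one of n workers using only its FIRST
// emitted key; records sharing a first key share a worker.
func workerFor(emittedKeys []string, n int) int {
	if len(emittedKeys) == 0 || n <= 1 {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(emittedKeys[0])) // hash.Hash writes never return an error
	return int(h.Sum32() % uint32(n))
}
```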
func WithDeadLetter
func WithDeadLetter(fn func(eventID string, err error)) RunOption
WithDeadLetter installs a callback invoked when a record fails every attempt. Use it to push the record's EventID (and the underlying error) to a DLQ table / topic / structured-log sink. Default is a no-op — the record is silently dropped and the runtime moves on. The runtime ALSO calls recorder.RecordError so DLQ counts surface in /api metrics regardless.
func WithDedup
func WithDedup(d state.Deduper) RunOption
WithDedup installs a state.Deduper. Each record's EventID is claimed via MarkSeen before the merge runs; if MarkSeen reports the EventID was already claimed (a duplicate from a worker crash mid-write), the runtime skips the merge and Acks the record so the source advances.
This makes at-least-once delivery idempotent at the monoid layer for any pipeline whose Combine is not idempotent (e.g. Sum, Count), where a redelivered record would otherwise be counted twice. Pipelines whose Combine is already idempotent (Set, Min, Max, HLL) don't strictly need a Deduper, but adding one is harmless.
The Deduper itself is typically backed by a small DDB table with TTL — see pkg/state/dynamodb.NewDeduper.
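The claim-then-merge flow can be sketched with an in-memory set. `seenSet`, `markSeen` and `process` are hypothetical stand-ins: the real state.Deduper method set is not reproduced here, and a production Deduper persists claims (e.g. the DDB table with TTL) rather than holding them in process memory.

```go
package main

import "sync"

// seenSet is an in-memory stand-in for a Deduper.
type seenSet struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

// markSeen reports true when eventID was newly claimed, false for a duplicate.
func (s *seenSet) markSeen(eventID string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, dup := s.ids[eventID]; dup {
		return false
	}
	s.ids[eventID] = struct{}{}
	return true
}

// process claims the event ID before merging. A duplicate skips the merge
// but is still acked, so the source offset advances.
func process(d *seenSet, eventID string, merge func(), ack func()) {
	if d.markSeen(eventID) {
		merge()
	}
	ack()
}
```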
func WithKeyDebounce
func WithKeyDebounce(window time.Duration, maxKeys int) RunOption
WithKeyDebounce drops records whose first-emitted key was already seen within `window`. The dropped record's Ack still fires (so the source advances) but no processor work is done — same shape as the dedup-skip path, but keyed by entity rather than EventID.
**Use case.** This is the producer-side debounce primitive count-core asked for in their integration review. The canonical pattern: a service emits a cache-fill event per cache miss; bursts of misses on the same hot key at top-of-hour produce N redundant events, all carrying the same value. With WithKeyDebounce(5*time.Second, 10000), the first event lands and the next N-1 within 5 s are dropped. The underlying source advances (Ack fires); the store is touched once.
**Critical safety constraint.** Debouncing CHANGES SEMANTICS for non-idempotent monoids:
- Safe with: Max, Min, Set, Last (cache-fill / absolute-value patterns where N events for the same key carry the same value).
- Safe with: HLL, Bloom (idempotent under duplicate inputs).
- **NOT SAFE with: Sum, Count, TopK** (dropping deltas UNDER-COUNTS).
There's no runtime check on the monoid kind — the option is shipped with the assumption the caller knows what they're doing. Pair with `Int64MaxStore` (this package) for the count-core SetCountIfGreater pattern.
**Differences from WithDedup.** WithDedup skips records whose EventID has already been claimed (per-event idempotency, typically backed by DDB with TTL). WithKeyDebounce skips records whose KEY has been seen within an in-process time window — much cheaper (no DDB roundtrip, no TTL writes), much narrower (only safe for idempotent / absolute-value monoids).
**Differences from WithBatchWindow.** WithBatchWindow accumulates per-key deltas into a single store write per flush: it KEEPS all the records' contributions, just merged. WithKeyDebounce DROPS all but the first record per key per window. The two compose: a pipeline can use both, where debounce eliminates duplicate-cache-miss noise and the batch window then collapses any remaining writes per key.
func WithMaxAttempts
func WithMaxAttempts(n int) RunOption
WithMaxAttempts sets the per-record retry budget. Defaults to 3. A record that errors all the way through MaxAttempts is dead-lettered (see WithDeadLetter) and the runtime continues with the next record rather than crashing.
Set to 1 to disable retries (any error → dead-letter immediately).
func WithMetrics
func WithMetrics(r metrics.Recorder) RunOption
WithMetrics installs a metrics.Recorder on the runtime. Defaults to metrics.Noop{}.
func WithRetryBackoff
func WithRetryBackoff(base, max time.Duration) RunOption
WithRetryBackoff sets the per-attempt sleep schedule. Backoff doubles after each failure starting from base, capped at max, with full jitter. Defaults to 50 ms base / 5 s max.
func WithValueDebounce
func WithValueDebounce(window time.Duration, maxKeys int) RunOption
WithValueDebounce drops records when the same (entityKey, value) pair was already processed within `window`. Backed by an in-process bounded cache (2× maxKeys); on overflow, oldest entries past the window are evicted.
**Use case.** A service emits absolute-value records on every change notification, but most events carry the SAME value as the previous emission (e.g. Mongo CDC fires for every document touch including no-op updates). WithValueDebounce drops those unchanged emissions before they hit the processor + store.
**Compared to WithKeyDebounce.** WithKeyDebounce drops every record after the first for the same key in the window — works only if every event for a given key carries the same logical value. WithValueDebounce drops only when the VALUE also matches; a value change on the same key always passes through.
**Safety.** Like WithKeyDebounce, debouncing changes semantics for non-idempotent monoids:
- Safe with: Max, Min, Set, Last, Monotonic — when an event re-emits the same absolute value, that's a semantic no-op.
- Safe with: HLL, Bloom — idempotent under duplicate inputs.
- **NOT SAFE with: Sum, Count, TopK** — duplicate inputs accumulate; dropping under-counts.
Pair with `Int64MaxStore` + `core.Monotonic[int64]` for the SetCountIfGreater pattern: the monoid logic accepts only rising values; this debounce strips repeated identical emissions BEFORE they reach the store, saving DDB writes.
**Composes with WithKeyDebounce / WithBatchWindow / WithConcurrency.** All four can be combined. Order of application: WithKeyDebounce first (drops same-key duplicates regardless of value), then WithValueDebounce (drops same-(key,value) within window), then per-EventID dedup, then batch window.
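The documented order can be sketched as a short-circuiting filter chain: each stage either passes a record on or drops it (the record is still acked), so the cheap in-process debounces run before the Deduper's store round trip and survivors continue to the batch window. `stage` and `chain` are hypothetical names.

```go
package main

// stage is one pre-processing filter; returning false drops the record.
type stage[R any] func(rec R) bool

// chain applies stages in order (key debounce, value debounce, per-EventID
// dedup) and stops at the first stage that drops the record.
func chain[R any](stages ...stage[R]) stage[R] {
	return func(rec R) bool {
		for _, s := range stages {
			if !s(rec) {
				return false // later, costlier stages never run
			}
		}
		return true
	}
}
```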