Documentation ¶
Overview ¶
Package processor is the shared per-record processing core that every Murmur runtime delegates to. It owns the at-least-once-with-dedup contract, the retry-with-backoff loop, and the metrics surface — three concerns that would otherwise be duplicated across pkg/exec/streaming, pkg/exec/lambda/*, and any future driver (SQS, SNS-fronted EventBridge, etc.).
The package is exported so out-of-tree drivers can sit on the same retry / dedup contract without forking the logic. The API surface is intentionally small: one Config struct and a handful of merge entry points, with MergeOne as the per-record entry point and MergeMany, MergeBatch, and FlushBatch covering multi-key and batched flows. Drivers wire their record decoding to MergeOne and decide what to do with non-nil returns (the Kinesis Lambda adds the record to BatchItemFailures; the streaming runtime Acks past the poison record after dead-lettering it).
Stability: the API tracks pkg/exec/streaming and pkg/exec/lambda/*; expect the same experimental-pre-1.0 churn as those packages.
Index ¶
- Constants
- func FlushBatch[V any](ctx context.Context, cfg *Config, pipelineName string, items []FlushItem[V], ...) error
- func MergeBatch[T any, V any](ctx context.Context, cfg *Config, coalesceCfg CoalesceConfig, ...) error
- func MergeMany[V any](ctx context.Context, cfg *Config, pipelineName string, eventID string, ...) error
- func MergeOne[T any, V any](ctx context.Context, cfg *Config, pipelineName string, eventID string, ...) error
- type CoalesceConfig
- type CoalesceOption
- type Coalescer
- type Config
- type FlushError
- type FlushItem
- type KeyFailure
Constants ¶
const (
	// DefaultMaxKeys caps the in-memory map size to bound memory under a hot-key
	// batch. ~10k keys × small int64 deltas is single-digit-MB scratch space.
	DefaultMaxKeys = 10_000

	// DefaultFlushTick triggers a periodic flush during long batches (e.g. a Mongo
	// snapshot scan that runs for minutes). Lambda batches typically complete well
	// inside this window so the tick rarely fires there.
	DefaultFlushTick = 1 * time.Second
)
Default coalescing knobs. Exposed so callers can reason about budgets without importing internal constants.
Variables ¶
This section is empty.
Functions ¶
func FlushBatch ¶
func FlushBatch[V any](
	ctx context.Context,
	cfg *Config,
	pipelineName string,
	items []FlushItem[V],
	store state.Store[V],
	cache state.Cache[V],
	window *windowed.Config,
	concurrency int,
) error
FlushBatch applies every FlushItem in `items` to (store, cache) using the processor's retry/backoff/metrics surface, spreading the work across at most `concurrency` goroutines.
Semantics:
- concurrency <= 1: items are applied sequentially in the order given, same as a plain `for _, it := range items { mergeKeyWithRetry(...) }`. This is the default path and preserves the pre-concurrency behavior of every caller.
- concurrency > 1: a worker pool of size `min(concurrency, len(items))` drains a channel of FlushItems and runs MergeUpdate in parallel. **Each FlushItem must address a distinct (entity, bucket) Key.** Callers are expected to coalesce deltas per key BEFORE calling FlushBatch: for additive monoids, coalescing has already collapsed all contributions to one delta per key, so parallelizing the per-key flushes is safe regardless of arrival order. FlushBatch does not enforce distinctness; passing duplicate keys with concurrency > 1 races the store's MergeUpdate semantics and the result is undefined.
At-least-once preservation: if any worker returns a non-nil error from mergeKeyWithRetry, FlushBatch records the first such error and returns it after all in-flight items finish. Subsequent items that haven't been dispatched yet are skipped (cancellation propagates through the worker context). The caller must treat a non-nil return as "this batch failed" — earlier-completed items have already written to the store, but the batch as a whole has not been Ack'd so the source will redeliver everything. Idempotent-merge + dedup keeps the redelivered batch correct.
Ordering: because the worker pool is unordered, callers must NOT rely on items being applied in slice order. This is the explicit tradeoff that unlocks parallelism for additive monoids; for non-additive monoids (First/Last/Min/Max), per-key ordering still matters at the routing layer (see streaming.WithConcurrency), but within a single coalesced flush the deltas are already collapsed so there is at most one delta per key in `items`.
FlushBatch always invokes mergeKeyWithRetry for each item — the same retry budget, backoff schedule, dedup-skip handling, and metrics that MergeMany uses. Pass the same *Config you'd pass to MergeMany.
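The dispatch behavior described above (sequential when concurrency <= 1, otherwise a bounded worker pool that records the first error and stops dispatching further items) can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: `item`, `flushAll`, `runDemo`, and the `apply` callback are hypothetical stand-ins for FlushItem, FlushBatch, and the per-key merge-with-retry.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// item is a hypothetical stand-in for FlushItem: one coalesced delta per key.
type item struct {
	Key   string
	Delta int64
}

// flushAll applies every item with at most `concurrency` workers and returns
// the first error observed. apply stands in for the per-key merge-with-retry.
func flushAll(ctx context.Context, items []item, concurrency int, apply func(context.Context, item) error) error {
	if concurrency <= 1 {
		// Sequential path: slice order is preserved.
		for _, it := range items {
			if err := apply(ctx, it); err != nil {
				return err
			}
		}
		return nil
	}
	if concurrency > len(items) {
		concurrency = len(items)
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	work := make(chan item)
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for w := 0; w < concurrency; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range work {
				if err := apply(ctx, it); err != nil {
					// Keep only the first error and stop further dispatch.
					once.Do(func() {
						firstErr = err
						cancel()
					})
				}
			}
		}()
	}

dispatch:
	for _, it := range items {
		select {
		case work <- it:
		case <-ctx.Done():
			break dispatch // undispatched items are skipped
		}
	}
	close(work)
	wg.Wait() // in-flight items finish before we report
	if firstErr != nil {
		return firstErr
	}
	return ctx.Err()
}

// runDemo flushes three distinct keys with two workers; failKey injects a failure.
func runDemo(failKey string) (map[string]int64, error) {
	var mu sync.Mutex
	store := map[string]int64{}
	items := []item{{"a", 1}, {"b", 2}, {"c", 3}}
	err := flushAll(context.Background(), items, 2, func(_ context.Context, it item) error {
		if it.Key == failKey {
			return errors.New("merge failed for key " + it.Key)
		}
		mu.Lock()
		defer mu.Unlock()
		store[it.Key] += it.Delta
		return nil
	})
	return store, err
}

func main() {
	store, err := runDemo("")
	fmt.Println(store, err)
}
```

Because the keys are distinct, the only shared state the workers touch in this sketch is the demo map, which is why a single mutex is enough here.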
func MergeBatch ¶
func MergeBatch[T any, V any](
	ctx context.Context,
	cfg *Config,
	coalesceCfg CoalesceConfig,
	pipelineName string,
	mon monoid.Monoid[V],
	store state.Store[V],
	cache state.Cache[V],
	window *windowed.Config,
	events []T,
	eventIDFn func(T) string,
	eventTimeFn func(T) time.Time,
	keysFn func(T) []string,
	valueFn func(T) V,
) error
MergeBatch is a convenience wrapper around NewCoalescer + a loop over events + Flush. It is the single-line entrypoint drivers can call when they hold an entire source batch in memory (Kinesis Lambda Records, bootstrap snapshot chunk, etc).
The events slice is iterated in order; each event's keys are derived via keysFn, each delta via valueFn. After the loop, MergeBatch calls Flush. Returns nil on full success, or a wrapped *FlushError listing failed keys.
MergeBatch is a zero-allocation passthrough for non-additive monoids (it just loops over MergeMany), so drivers can call it unconditionally regardless of the pipeline's monoid choice.
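To make the benefit of the loop-then-flush shape concrete, the following sketch folds a batch into one combined delta per key and compares the number of store writes with and without coalescing. It assumes int64 addition as the additive monoid's Combine; `event`, `coalesce`, `writeCounts`, and `sampleEvents` are hypothetical names, and dedup, windowing, and retries are deliberately left out.

```go
package main

import "fmt"

// event is a hypothetical record: one delta fanned out to several keys.
type event struct {
	Keys  []string
	Delta int64
}

// coalesce folds events, in order, into one combined delta per key.
// Addition is associative and commutative, so the fold order does not
// change the result.
func coalesce(events []event) map[string]int64 {
	pending := make(map[string]int64)
	for _, ev := range events {
		for _, k := range ev.Keys {
			pending[k] += ev.Delta
		}
	}
	return pending
}

// writeCounts reports how many store writes the batch needs when every
// (event, key) pair is written individually versus once per unique key.
func writeCounts(events []event) (uncoalesced, coalesced int) {
	for _, ev := range events {
		uncoalesced += len(ev.Keys)
	}
	return uncoalesced, len(coalesce(events))
}

// sampleEvents is a small batch of "like" events with a global rollup key.
func sampleEvents() []event {
	return []event{
		{Keys: []string{"post:1", "global"}, Delta: 1},
		{Keys: []string{"post:1", "global"}, Delta: 1},
		{Keys: []string{"post:2", "global"}, Delta: 1},
	}
}

func main() {
	events := sampleEvents()
	before, after := writeCounts(events)
	fmt.Println(coalesce(events), before, after)
}
```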
func MergeMany ¶
func MergeMany[V any](
	ctx context.Context,
	cfg *Config,
	pipelineName string,
	eventID string,
	eventTime time.Time,
	keys []string,
	delta V,
	store state.Store[V],
	cache state.Cache[V],
	window *windowed.Config,
) error
MergeMany is the multi-key entry point. It claims the EventID via Dedup (once, regardless of how many keys), then for each key runs the store and cache MergeUpdate with the supplied delta. Used by hierarchical-rollup pipelines where one event contributes to many aggregation keys (e.g. "likes for this post", "likes for this post per country", "global likes": three keys, one delta).
Failure semantics:
- All-or-nothing per record (with respect to dead-lettering): if ANY key's merge fails after retries, MergeMany returns an error. Earlier keys that succeeded keep their writes — there is no rollback. The idempotent-merge contract (with Dedup) ensures a redelivery folds correctly: the dedup row prevents the keys-that-succeeded from double-counting on retry, while the keys-that-failed get another attempt.
Pass a one-element keys slice for the single-key case; that's exactly what MergeOne does internally. The retry/backoff loop is per-key — each key gets its own MaxAttempts budget.
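A rough sketch of the shape described above: claim the event ID once, then merge each key with its own attempt budget, leaving earlier successes in place when a later key fails. Everything here is an assumption for illustration (`memStore`, `mergeMany`, the injected-failure map, and stopping at the first failed key); backoff sleeps are omitted, and the sketch does not model how the dedup claim is treated on failure or redelivery.

```go
package main

import (
	"errors"
	"fmt"
)

// memStore is a hypothetical in-memory stand-in for the store plus Dedup.
type memStore struct {
	seen    map[string]bool  // claimed event IDs
	counts  map[string]int64 // aggregated values per key
	failing map[string]int   // key -> remaining injected failures
}

func newMemStore() *memStore {
	return &memStore{seen: map[string]bool{}, counts: map[string]int64{}, failing: map[string]int{}}
}

func (s *memStore) mergeUpdate(key string, delta int64) error {
	if s.failing[key] > 0 {
		s.failing[key]--
		return errors.New("transient store error")
	}
	s.counts[key] += delta
	return nil
}

// mergeMany claims eventID once, then merges every key with a per-key
// attempt budget. There is no rollback of keys that already succeeded.
func mergeMany(s *memStore, eventID string, keys []string, delta int64, maxAttempts int) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	if s.seen[eventID] {
		return nil // duplicate: no merge, no error
	}
	s.seen[eventID] = true
	for _, k := range keys {
		var err error
		for attempt := 0; attempt < maxAttempts; attempt++ {
			if err = s.mergeUpdate(k, delta); err == nil {
				break
			}
		}
		if err != nil {
			return fmt.Errorf("merge key %q: %w", k, err)
		}
	}
	return nil
}

// demo runs one successful fan-out, one duplicate, and one failing event.
func demo() (map[string]int64, error) {
	s := newMemStore()
	keys := []string{"post:1", "post:1:US", "global"}
	s.failing["post:1:US"] = 2 // recovers within the budget of 3
	if err := mergeMany(s, "e1", keys, 1, 3); err != nil {
		return s.counts, err
	}
	if err := mergeMany(s, "e1", keys, 1, 3); err != nil { // duplicate skip
		return s.counts, err
	}
	s.failing["global"] = 5 // exceeds the budget of 3
	return s.counts, mergeMany(s, "e2", []string{"post:1", "global"}, 1, 3)
}

func main() {
	counts, err := demo()
	fmt.Println(counts, err)
}
```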
func MergeOne ¶
func MergeOne[T any, V any](
	ctx context.Context,
	cfg *Config,
	pipelineName string,
	eventID string,
	eventTime time.Time,
	value T,
	keyFn func(T) string,
	valueFn func(T) V,
	store state.Store[V],
	cache state.Cache[V],
	window *windowed.Config,
) error
MergeOne is the single-key per-record processing entry point. It applies the keyFn / valueFn extractors, claims the EventID via Dedup (if configured), runs the store and cache MergeUpdates, and retries on transient failure with exponential backoff.
Return semantics:
- nil: the record was processed successfully OR was a duplicate skip. The caller should Ack the record (for sources that have an Ack) or do nothing (for Lambda handlers that report only failures).
- non-nil: every retry was exhausted. The error wraps the last underlying failure. The caller decides what to do — typically: add to BatchItemFailures (Lambda) or record + Ack-and-skip (streaming).
MergeOne also short-circuits to a non-nil error on context cancellation during a retry backoff so the caller can return promptly rather than burning the remaining budget.
MergeOne does NOT call Ack. Sources with an Ack callback should invoke it themselves on a nil return.
Implementation note: MergeOne is now a thin wrapper around MergeMany — it extracts the single key + value and delegates. Use MergeMany directly for hierarchical-rollup pipelines where one record contributes to many keys.
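From the caller's side, the return contract reduces to "nil means done, non-nil means report the record". The sketch below shows a Lambda-style driver loop that collects the IDs of records whose processing returned a non-nil error. `record`, `processFn`, `failedItems`, and `demo` are hypothetical; in a real driver `processFn` would be a closure around MergeOne, and the returned IDs would feed the batch-item-failures response.

```go
package main

import (
	"errors"
	"fmt"
)

// record is a hypothetical decoded source record.
type record struct {
	ID      string
	Payload string
}

// processFn stands in for a closure that decodes the record and calls MergeOne.
type processFn func(r record) error

// failedItems runs every record through process and returns the IDs of the
// records that came back non-nil. Nil returns (success or duplicate skip)
// need no further action in a failures-only reporting model.
func failedItems(records []record, process processFn) []string {
	var failed []string
	for _, r := range records {
		if err := process(r); err != nil {
			failed = append(failed, r.ID)
		}
	}
	return failed
}

// demo simulates a batch in which one record exhausts its retries.
func demo() []string {
	records := []record{{"r1", "ok"}, {"r2", "poison"}, {"r3", "ok"}}
	return failedItems(records, func(r record) error {
		if r.Payload == "poison" {
			return errors.New("retries exhausted")
		}
		return nil
	})
}

func main() {
	fmt.Println(demo())
}
```

A streaming driver would use the same loop shape but, on a non-nil return, record the failure and Ack past the record instead of collecting its ID.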
Types ¶
type CoalesceConfig ¶
type CoalesceConfig struct {
// Enabled toggles coalescing. When false, runtimes route every event through
// processor.MergeMany directly. Default false (no behavior change).
Enabled bool
// MaxKeys caps the size of the in-memory coalescing map before an auto-flush.
// Defaults to DefaultMaxKeys when unset (zero or negative).
MaxKeys int
// FlushTick triggers a periodic flush during long batches. Zero or negative
// disables periodic flush; defaults to DefaultFlushTick when unset.
FlushTick time.Duration
}
CoalesceConfig bundles the user-tunable coalescing knobs.
func DefaultCoalesceConfig ¶
func DefaultCoalesceConfig() CoalesceConfig
DefaultCoalesceConfig returns an enabled CoalesceConfig with the canonical defaults. Used by Pipeline.WithCoalesce when called without options.
type CoalesceOption ¶
type CoalesceOption func(*CoalesceConfig)
CoalesceOption mutates a CoalesceConfig. Used by the pipeline DSL's WithCoalesce.
func WithCoalesceFlushTick ¶
func WithCoalesceFlushTick(d time.Duration) CoalesceOption
WithCoalesceFlushTick sets the time-based auto-flush trigger. Pass 0 to disable.
func WithCoalesceMaxKeys ¶
func WithCoalesceMaxKeys(n int) CoalesceOption
WithCoalesceMaxKeys sets the max-keys auto-flush trigger.
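The option functions follow the usual Go functional-options pattern. The sketch below mirrors the documented declarations locally so it compiles on its own; the option bodies and the `buildConfig` helper are assumptions about how a caller such as the pipeline DSL might apply them, not the package's code.

```go
package main

import (
	"fmt"
	"time"
)

// Local mirrors of the documented constants and types.
const (
	DefaultMaxKeys   = 10_000
	DefaultFlushTick = 1 * time.Second
)

type CoalesceConfig struct {
	Enabled   bool
	MaxKeys   int
	FlushTick time.Duration
}

type CoalesceOption func(*CoalesceConfig)

// WithCoalesceMaxKeys sets the max-keys auto-flush trigger (assumed body).
func WithCoalesceMaxKeys(n int) CoalesceOption {
	return func(c *CoalesceConfig) { c.MaxKeys = n }
}

// WithCoalesceFlushTick sets the time-based trigger; 0 disables it (assumed body).
func WithCoalesceFlushTick(d time.Duration) CoalesceOption {
	return func(c *CoalesceConfig) { c.FlushTick = d }
}

// buildConfig is a hypothetical helper: start from enabled defaults, then
// apply each option in order so later options override earlier ones.
func buildConfig(opts ...CoalesceOption) CoalesceConfig {
	cfg := CoalesceConfig{Enabled: true, MaxKeys: DefaultMaxKeys, FlushTick: DefaultFlushTick}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", buildConfig())
	fmt.Printf("%+v\n", buildConfig(WithCoalesceMaxKeys(500), WithCoalesceFlushTick(0)))
}
```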
type Coalescer ¶
type Coalescer[V any] struct {
	// contains filtered or unexported fields
}
Coalescer accumulates (entity, bucket) → combined-delta contributions for a single source batch and emits one MergeUpdate per unique key at flush time. Not safe for concurrent use — callers should hold one Coalescer per worker goroutine. See package doc for the failure-semantics contract.
func NewCoalescer ¶
func NewCoalescer[V any](
	cfg *Config,
	coalesceCfg CoalesceConfig,
	pipelineName string,
	mon monoid.Monoid[V],
	store state.Store[V],
	cache state.Cache[V],
	window *windowed.Config,
) *Coalescer[V]
NewCoalescer constructs a Coalescer bound to a Pipeline's storage + monoid. The cfg argument is the shared processor.Config (recorder, retry budget, dedup); coalesceCfg is the per-batch coalescing tuning.
If the monoid is not additive (per monoid.IsAdditiveMonoid), the returned Coalescer short-circuits AddMany straight to MergeMany — no buffering, no flush savings. Callers can still use it as a uniform interface.
func (*Coalescer[V]) AddMany ¶
func (c *Coalescer[V]) AddMany(
	ctx context.Context,
	eventID string,
	eventTime time.Time,
	keys []string,
	delta V,
) error
AddMany folds an event's (keys, delta) into the pending map. Dedup is applied once up-front. If the monoid is not additive, AddMany delegates straight to MergeMany so every event hits the store exactly as before — no behavior change. Otherwise the delta is Combined into the per-key accumulator and a synchronous auto-flush may fire if MaxKeys or FlushTick triggers.
Returns nil on success. A non-nil error is returned only when an auto-flush ran and some flushed key exhausted its retry budget — the error wraps the underlying FlushError.
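The two auto-flush triggers can be illustrated with a small stand-alone accumulator. This is a sketch under stated assumptions: `coalescer`, `newCoalescer`, `addMany`, and `flush` are hypothetical lowercase stand-ins, the key-count trigger is assumed to fire at `>= maxKeys`, the time trigger is checked synchronously on each add, and failed keys are only reported, not retried or kept.

```go
package main

import (
	"fmt"
	"time"
)

// coalescer buffers one summed delta per key and flushes when a trigger fires.
type coalescer struct {
	maxKeys   int
	flushTick time.Duration // <= 0 disables the time trigger
	now       func() time.Time
	lastFlush time.Time
	pending   map[string]int64
	sink      func(key string, delta int64) error // stand-in for the store write
}

func newCoalescer(maxKeys int, tick time.Duration, now func() time.Time, sink func(string, int64) error) *coalescer {
	return &coalescer{
		maxKeys: maxKeys, flushTick: tick, now: now,
		lastFlush: now(), pending: map[string]int64{}, sink: sink,
	}
}

// addMany folds the delta into every key, then flushes synchronously if the
// map is full or the last flush is older than flushTick.
func (c *coalescer) addMany(keys []string, delta int64) error {
	for _, k := range keys {
		c.pending[k] += delta
	}
	full := len(c.pending) >= c.maxKeys
	stale := c.flushTick > 0 && c.now().Sub(c.lastFlush) >= c.flushTick
	if full || stale {
		return c.flush()
	}
	return nil
}

// flush writes one delta per unique key and reports the first failure.
func (c *coalescer) flush() error {
	var firstErr error
	for k, d := range c.pending {
		if err := c.sink(k, d); err != nil && firstErr == nil {
			firstErr = fmt.Errorf("flush key %q: %w", k, err)
		}
	}
	c.pending = map[string]int64{}
	c.lastFlush = c.now()
	return firstErr
}

// demo adds four events over two keys with maxKeys = 2 and no time trigger.
func demo() (store map[string]int64, writes int, pendingLeft int) {
	store = map[string]int64{}
	c := newCoalescer(2, 0, time.Now, func(k string, d int64) error {
		store[k] += d
		writes++
		return nil
	})
	for _, k := range []string{"a", "a", "a", "b"} {
		if err := c.addMany([]string{k}, 1); err != nil {
			panic(err)
		}
	}
	return store, writes, len(c.pending)
}

func main() {
	fmt.Println(demo())
}
```

In the demo, four events collapse into two store writes: the first three adds only grow the delta for one key, and the fourth add brings the map to two keys and triggers the flush.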
func (*Coalescer[V]) Flush ¶
Flush commits all pending (key, delta) pairs by issuing one MergeUpdate per unique key (plus the matching cache MergeUpdate). Each per-key write runs through the processor's retry/backoff loop. Successful keys remain successful even if a sibling key fails — there is no rollback, and the idempotent-merge contract handles a re-delivery.
Returns nil when every key succeeded. Returns a *FlushError when one or more keys exhausted their retry budget; cancellation errors from the context propagate directly. Flush is safe to call when the pending map is empty (returns nil with no work).
func (*Coalescer[V]) IsAdditive ¶
IsAdditive reports whether the bound monoid was eligible for coalescing. Drivers can inspect this to surface a metric or log line on startup.
func (*Coalescer[V]) PendingKeys ¶
PendingKeys returns the number of unique (entity, bucket) pairs currently buffered. Exposed for tests and operational metrics.
type Config ¶
type Config struct {
// Recorder is the metrics.Recorder; defaults to metrics.Noop{}.
// Events fire under PipelineName, retries under "<name>:retry", dedup
// skips under "<name>:dedup_skip", dead letters under "<name>:dead_letter".
Recorder metrics.Recorder
// MaxAttempts is the per-record retry budget. MergeOne returns the last
// non-cancellation error after this many failed tries.
MaxAttempts int
// BackoffBase is the first sleep between retries. Doubles per attempt
// up to BackoffMax with full jitter.
BackoffBase time.Duration
// BackoffMax caps the per-retry sleep.
BackoffMax time.Duration
// Dedup, if non-nil, is consulted before MergeOne does any work. A
// duplicate is a no-op (no merge, no retry, no error) — MergeOne
// returns nil and records a "<name>:dedup_skip" event.
Dedup state.Deduper
}
Config bundles the shared retry / dedup / observability knobs every runtime uses. Construct with sensible defaults via Defaults() and override fields as needed.
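The interaction of MaxAttempts, BackoffBase, and BackoffMax can be sketched as a generic retry loop: the sleep ceiling doubles per retry, is capped at the maximum, and the actual sleep is drawn uniformly from zero up to that ceiling (full jitter). `backoffCeiling`, `retry`, and `demo` are hypothetical helpers written for this illustration; returning `ctx.Err()` on cancellation is an assumption about the exact error shape.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// backoffCeiling returns min(max, base * 2^retry), guarding against overflow.
func backoffCeiling(base, max time.Duration, retry int) time.Duration {
	d := base
	for i := 0; i < retry; i++ {
		if d > max/2 {
			return max
		}
		d *= 2
	}
	if d > max {
		return max
	}
	return d
}

// retry calls op up to maxAttempts times, sleeping a uniformly random
// duration in [0, ceiling] between attempts. Cancellation during a sleep
// returns promptly instead of burning the remaining budget.
func retry(ctx context.Context, maxAttempts int, base, max time.Duration, op func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		if attempt == maxAttempts-1 {
			break // no sleep after the final attempt
		}
		var sleep time.Duration
		if ceiling := backoffCeiling(base, max, attempt); ceiling > 0 {
			sleep = time.Duration(rand.Int63n(int64(ceiling) + 1))
		}
		timer := time.NewTimer(sleep)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		}
	}
	return fmt.Errorf("exhausted %d attempts: %w", maxAttempts, err)
}

// demo runs an operation that fails `failures` times before succeeding and
// returns how many attempts were made.
func demo(failures int) (int, error) {
	attempts := 0
	err := retry(context.Background(), 5, time.Millisecond, 4*time.Millisecond, func() error {
		attempts++
		if attempts <= failures {
			return errors.New("transient")
		}
		return nil
	})
	return attempts, err
}

func main() {
	fmt.Println(demo(2))
	fmt.Println(backoffCeiling(100*time.Millisecond, time.Second, 3))
}
```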
type FlushError ¶
type FlushError struct {
FailedKeys []KeyFailure
}
FlushError is returned (wrapped) when one or more keys fail to merge after retries. FailedKeys preserves per-key failure info; drivers that need per-event failure reporting (e.g. Kinesis BatchItemFailures) can fan out KeyFailure.ContributingIDs to the original record IDs they kept alongside the coalescer.
func (*FlushError) Unwrap ¶
func (e *FlushError) Unwrap() error
Unwrap exposes the first per-key error for errors.Is/As walks.
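A driver that receives a wrapped *FlushError can recover it with errors.As and fan the failed keys out to record IDs, while errors.Is still reaches the first per-key cause through Unwrap. The sketch mirrors the types locally so it runs on its own; the `Key` and `Err` fields of KeyFailure, the `Error` method body, and the `failedRecordIDs` helper are assumptions, since only FailedKeys, ContributingIDs, and Unwrap are described in this page.

```go
package main

import (
	"errors"
	"fmt"
)

// KeyFailure mirrors the documented type. Key and Err are assumed fields.
type KeyFailure struct {
	Key             string
	Err             error
	ContributingIDs []string
}

// FlushError mirrors the documented type.
type FlushError struct {
	FailedKeys []KeyFailure
}

func (e *FlushError) Error() string {
	return fmt.Sprintf("flush failed for %d key(s)", len(e.FailedKeys))
}

// Unwrap exposes the first per-key error for errors.Is/As walks.
func (e *FlushError) Unwrap() error {
	if len(e.FailedKeys) == 0 {
		return nil
	}
	return e.FailedKeys[0].Err
}

// failedRecordIDs is a hypothetical driver helper: it extracts the
// FlushError from a wrapped error and returns the de-duplicated record IDs
// that contributed to any failed key, in first-seen order.
func failedRecordIDs(err error) []string {
	var fe *FlushError
	if !errors.As(err, &fe) {
		return nil
	}
	seen := map[string]bool{}
	var ids []string
	for _, kf := range fe.FailedKeys {
		for _, id := range kf.ContributingIDs {
			if !seen[id] {
				seen[id] = true
				ids = append(ids, id)
			}
		}
	}
	return ids
}

var errThrottled = errors.New("throttled")

// demo wraps a FlushError the way a batch entry point might and inspects it.
func demo() (ids []string, throttled bool) {
	wrapped := fmt.Errorf("merge batch: %w", &FlushError{FailedKeys: []KeyFailure{
		{Key: "post:1", Err: errThrottled, ContributingIDs: []string{"r1", "r2"}},
		{Key: "global", Err: errors.New("timeout"), ContributingIDs: []string{"r2", "r3"}},
	}})
	return failedRecordIDs(wrapped), errors.Is(wrapped, errThrottled)
}

func main() {
	fmt.Println(demo())
}
```

Note that only the first per-key error is reachable through the Unwrap chain, so a check such as errors.Is against the second key's cause would need to walk FailedKeys explicitly.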
type FlushItem ¶
FlushItem is one pending (key, delta) merge ready to apply to the store. Aggregators / batch flushers build a slice of these — typically AFTER per-key delta coalescing has already collapsed many input events down to one Combine'd delta per Key — and then hand the slice to FlushBatch for parallel execution.
EventTime is the timestamp used to bucket the merge under a windowed configuration; for non-windowed pipelines it is ignored but the field is still required so callers don't accidentally drop windowing context.