processor

package
v0.0.0-...-5247288
Published: May 8, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package processor is the shared per-record processing core that every Murmur runtime delegates to. It owns the at-least-once-with-dedup contract, the retry-with-backoff loop, and the metrics surface — three concerns that would otherwise be duplicated across pkg/exec/streaming, pkg/exec/lambda/*, and any future driver (SQS, SNS-fronted EventBridge, etc.).

The package is exported so out-of-tree drivers can sit on the same retry / dedup contract without forking the logic. The API surface is intentionally small: one Config struct, one MergeOne function. Drivers wire their record-decoding to MergeOne and decide what to do with non-nil returns (the Kinesis Lambda adds the record to BatchItemFailures; the streaming runtime Acks past the poison record after dead-lettering it).
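To make the wiring concrete, here is a minimal sketch of the driver side of that contract. The `record` type and the `mergeOne` stub are hypothetical stand-ins for a decoded source record and `processor.MergeOne`; only the nil/non-nil handling mirrors the contract described above.

```go
package main

import (
	"errors"
	"fmt"
)

// record stands in for a decoded source record (hypothetical).
type record struct {
	ID      string
	Payload string
}

// mergeOne is a stub for processor.MergeOne: nil means processed (or a
// duplicate skip) and the driver should Ack; non-nil means retries were
// exhausted and the driver decides how to surface the failure.
func mergeOne(r record) error {
	if r.Payload == "poison" {
		return errors.New("retries exhausted")
	}
	return nil
}

func main() {
	batch := []record{{"e1", "ok"}, {"e2", "poison"}, {"e3", "ok"}}

	// Kinesis-Lambda-style driver: report only the failures.
	var batchItemFailures []string
	for _, r := range batch {
		if err := mergeOne(r); err != nil {
			batchItemFailures = append(batchItemFailures, r.ID)
			continue
		}
		// A streaming driver would Ack the record here instead.
	}
	fmt.Println(batchItemFailures) // prints [e2]
}
```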

Stability: the API tracks pkg/exec/streaming and pkg/exec/lambda/*; expect the same experimental-pre-1.0 churn as those packages.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MergeMany

func MergeMany[V any](
	ctx context.Context,
	cfg *Config,
	pipelineName string,
	eventID string,
	eventTime time.Time,
	keys []string,
	delta V,
	store state.Store[V],
	cache state.Cache[V],
	window *windowed.Config,
) error

MergeMany is the multi-key entry point. It claims the EventID via Dedup (once, regardless of how many keys), then for each key runs the store and cache MergeUpdate with the supplied delta. Used by hierarchical-rollup pipelines where one event contributes to many aggregation keys (e.g. "likes for this post", "likes for this post per country", "global likes" — three keys, one delta).

Failure semantics:

  • All-or-nothing per record (with respect to dead-lettering): if ANY key's merge fails after retries, MergeMany returns an error. Earlier keys that succeeded keep their writes — there is no rollback. The idempotent-merge contract (with Dedup) ensures a redelivery folds correctly: the dedup row prevents the keys-that-succeeded from double-counting on retry, while the keys-that-failed get another attempt.

Pass a one-element keys slice for the single-key case; that's exactly what MergeOne does internally. The retry/backoff loop is per-key — each key gets its own MaxAttempts budget.
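A deliberately simplified model of the claim-once, merge-per-key shape (the partial-failure redelivery path and the retry loop are elided). `fakeStore` and `mergeMany` are illustrative names, not the package's API; the `seen` set stands in for `state.Deduper`.

```go
package main

import "fmt"

// fakeStore models a keyed counter store plus a dedup set (hypothetical).
type fakeStore struct {
	counts map[string]int
	seen   map[string]bool // stands in for state.Deduper
}

// mergeMany claims the EventID once, then merges the delta into every key.
// A duplicate delivery is a no-op, so no key is double-counted.
func (s *fakeStore) mergeMany(eventID string, keys []string, delta int) error {
	if s.seen[eventID] { // duplicate: skip, no error
		return nil
	}
	s.seen[eventID] = true
	for _, k := range keys {
		s.counts[k] += delta
	}
	return nil
}

func main() {
	s := &fakeStore{counts: map[string]int{}, seen: map[string]bool{}}
	keys := []string{"post:42", "post:42:country:DE", "global"}

	// First delivery, then a redelivery of the same event.
	s.mergeMany("evt-1", keys, 1)
	s.mergeMany("evt-1", keys, 1)

	fmt.Println(s.counts["post:42"], s.counts["global"]) // prints 1 1
}
```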

func MergeOne

func MergeOne[T any, V any](
	ctx context.Context,
	cfg *Config,
	pipelineName string,
	eventID string,
	eventTime time.Time,
	value T,
	keyFn func(T) string,
	valueFn func(T) V,
	store state.Store[V],
	cache state.Cache[V],
	window *windowed.Config,
) error

MergeOne is the single-key per-record processing entry point. It applies the keyFn / valueFn extractors, claims the EventID via Dedup (if configured), runs the store and cache MergeUpdates, and retries on transient failure with exponential backoff.

Return semantics:

  • nil: the record was processed successfully OR was a duplicate skip. The caller should Ack the record (for sources that have an Ack) or do nothing (for Lambda handlers that report only failures).
  • non-nil: every retry was exhausted. The error wraps the last underlying failure. The caller decides what to do — typically: add to BatchItemFailures (Lambda) or record + Ack-and-skip (streaming).

MergeOne also short-circuits to a non-nil error on context cancellation during a retry backoff so the caller can return promptly rather than burning the remaining budget.

MergeOne does NOT call Ack. Sources with an Ack callback should invoke it themselves on a nil return.

Implementation note: MergeOne is now a thin wrapper around MergeMany — it extracts the single key + value and delegates. Use MergeMany directly for hierarchical-rollup pipelines where one record contributes to many keys.
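The delegation described above can be sketched as follows. These `mergeOne` / `mergeMany` functions and the `like` event type are illustrative stand-ins, not the package's signatures; only the one-element-slice shape mirrors the real relationship.

```go
package main

import "fmt"

// like is a hypothetical decoded event.
type like struct{ PostID string }

// mergeMany applies the delta to every key (stand-in for MergeMany).
func mergeMany(keys []string, delta int, counts map[string]int) error {
	for _, k := range keys {
		counts[k] += delta
	}
	return nil
}

// mergeOne extracts the single key and value via the supplied extractors,
// then delegates with a one-element keys slice.
func mergeOne(ev like, keyFn func(like) string, valueFn func(like) int, counts map[string]int) error {
	return mergeMany([]string{keyFn(ev)}, valueFn(ev), counts)
}

func main() {
	counts := map[string]int{}
	mergeOne(like{PostID: "42"},
		func(l like) string { return "post:" + l.PostID },
		func(like) int { return 1 },
		counts)
	fmt.Println(counts["post:42"]) // prints 1
}
```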

Types

type Config

type Config struct {
	// Recorder is the metrics.Recorder; defaults to metrics.Noop{}.
	// Events fire under PipelineName, retries under "<name>:retry", dedup
	// skips under "<name>:dedup_skip", dead letters under "<name>:dead_letter".
	Recorder metrics.Recorder

	// MaxAttempts is the per-record retry budget. MergeOne returns the last
	// non-cancellation error after this many failed tries.
	MaxAttempts int

	// BackoffBase is the first sleep between retries. Doubles per attempt
	// up to BackoffMax with full jitter.
	BackoffBase time.Duration

	// BackoffMax caps the per-retry sleep.
	BackoffMax time.Duration

	// Dedup, if non-nil, is consulted before MergeOne does any work. A
	// duplicate is a no-op (no merge, no retry, no error) — MergeOne
	// returns nil and records a "<name>:dedup_skip" event.
	Dedup state.Deduper
}

Config bundles the shared retry / dedup / observability knobs every runtime uses. Construct with sensible defaults via Defaults() and override fields as needed.

func Defaults

func Defaults() Config

Defaults returns a Config with the conventional defaults: Noop recorder, MaxAttempts=3, 50 ms / 5 s backoff, no Dedup. Override fields after construction; WithX functions also exist for fluent builder-style configuration.
