Documentation ¶
Overview ¶
Package replay drives a Murmur pipeline in Replay mode: it consumes historical events from a replay.Driver (typically S3-archived Firehose/Kafka-Connect output, or a Kafka offset range under tiered storage) and applies the same monoid Combine the live runtime would.
The standard backfill pattern: configure the pipeline's StoreIn to point at a SHADOW state table (separate from the live one), run replay to completion, then atomically swap the live query layer's pointer to the shadow. The framework treats the shadow-table swap as a deployment concern; the runtime here just drives records through the pipeline.
Delivery is at-least-once; deduplication at the state-store level makes re-runs safe.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Run ¶
func Run[T any, V any](
	ctx context.Context,
	p *pipeline.Pipeline[T, V],
	drv replay.Driver[T],
	opts ...RunOption,
) error
Run drives the replay to completion. It returns nil when the driver exhausts its source and all records have been processed. It returns a non-nil error on a fatal error, or on the first dead-lettered record when WithFailOnError(true) is set.
Types ¶
type RunOption ¶
type RunOption func(*runConfig)
RunOption configures Run.
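The declared type, func(*runConfig), is the functional-options pattern. The sketch below shows how options of that shape are commonly applied. It is self-contained and illustrative only: the contents of runConfig are not documented here, so the struct fields and the lowercase helper names are hypothetical. Only the two defaults (3 attempts, fail-on-error off) are taken from this page.

```go
package main

import "fmt"

// runConfig is a hypothetical stand-in; the real struct's fields are not documented.
type runConfig struct {
	maxAttempts int
	failOnError bool
}

// runOption mirrors the shape of RunOption: a function that mutates the config.
type runOption func(*runConfig)

// withMaxAttempts and withFailOnError are illustrative helpers,
// not the package's exported functions.
func withMaxAttempts(n int) runOption {
	return func(c *runConfig) { c.maxAttempts = n }
}

func withFailOnError(b bool) runOption {
	return func(c *runConfig) { c.failOnError = b }
}

// newRunConfig starts from the documented defaults and applies options in
// order, so a later option overrides an earlier one.
func newRunConfig(opts ...runOption) runConfig {
	cfg := runConfig{maxAttempts: 3, failOnError: false}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", newRunConfig())
	fmt.Printf("%+v\n", newRunConfig(withMaxAttempts(5), withFailOnError(true)))
}
```

Because every option is an ordinary function value, callers can build the option slice conditionally and pass it with opts... at the call site.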
func WithBatchTick ¶
WithBatchTick overrides the RecordBatch flush interval. The default of 1 s suits a multi-day replay, where dashboard granularity matters more than emission cost. Pass d <= 0 to disable periodic batch flushes; a single batch is still emitted at completion.
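One way to get the "d <= 0 disables periodic flushes" behavior in Go is to leave the tick channel nil, since a receive from a nil channel never proceeds. The sketch below is an assumption about how such a loop could look, not the package's implementation; drain, flushSizes, and the int record type are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// drain counts records from in and calls flush with the size of each batch.
// A non-positive d leaves tick nil; a receive from a nil channel blocks
// forever, so only the final flush at completion fires.
func drain(in <-chan int, d time.Duration, flush func(n int)) {
	var tick <-chan time.Time
	if d > 0 {
		t := time.NewTicker(d)
		defer t.Stop()
		tick = t.C
	}
	pending := 0
	for {
		select {
		case _, ok := <-in:
			if !ok {
				flush(pending) // final batch, emitted once the source is exhausted
				return
			}
			pending++
		case <-tick:
			if pending > 0 {
				flush(pending)
				pending = 0
			}
		}
	}
}

// flushSizes feeds n records through drain and returns the size of every
// batch that was flushed.
func flushSizes(n int, d time.Duration) []int {
	in := make(chan int, n)
	for i := 0; i < n; i++ {
		in <- i
	}
	close(in)
	var sizes []int
	drain(in, d, func(k int) { sizes = append(sizes, k) })
	return sizes
}

func main() {
	fmt.Println(flushSizes(5, 0)) // periodic flushes disabled: one batch of 5
}
```

With a positive d the number and size of batches depend on timing, but the batch sizes always sum to the number of records read.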
func WithDedup ¶
WithDedup installs a state.Deduper. The replay driver typically emits stable per-event IDs (S3 archive line position, Kafka offset), so a re-run of the same archive folds idempotently when a Deduper is wired.
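To illustrate why stable event IDs make a re-run idempotent, here is a toy fold that uses integer addition as the monoid Combine and an in-memory set as the deduper. All names (event, seenSet, fold) are hypothetical, and a real state.Deduper would presumably persist its state rather than hold it in a map.

```go
package main

import "fmt"

// event is a hypothetical replayed record: a stable ID plus a value to fold.
type event struct {
	id    string // e.g. an archive line position or a Kafka offset
	value int
}

// seenSet is a toy in-memory deduper keyed by event ID.
type seenSet map[string]struct{}

// firstSeen reports whether id is new, and marks it as seen.
func (s seenSet) firstSeen(id string) bool {
	if _, dup := s[id]; dup {
		return false
	}
	s[id] = struct{}{}
	return true
}

// fold applies an integer-sum Combine to every event whose ID is new.
func fold(total int, seen seenSet, events []event) int {
	for _, e := range events {
		if seen.firstSeen(e.id) {
			total += e.value
		}
	}
	return total
}

func main() {
	archive := []event{{"part-0:1", 2}, {"part-0:2", 3}, {"part-0:3", 5}}
	seen := seenSet{}
	total := fold(0, seen, archive)
	total = fold(total, seen, archive) // re-run of the same archive adds nothing
	fmt.Println(total)                 // prints 10
}
```

Without the seen check, the second pass would double the total to 20, which is the failure mode dedup is meant to prevent.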
func WithFailOnError ¶
WithFailOnError configures the replay to surface the first dead-lettered record's error to the caller and abort. Default false: dead-lettered records are recorded via the metrics.Recorder and the replay continues.
For shadow-table backfills with atomic swap, an aborted replay leaves the shadow incomplete and the swap blocked. That is usually the right behavior, but it is expensive when the failure is one bad row in a 30-day archive. Choose per workload.
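The two policies can be contrasted with a small stand-alone loop. This is a sketch under assumptions: process, replayAll, and the rule that negative values are dead-lettered are all hypothetical, and the counter stands in for whatever the metrics.Recorder would record.

```go
package main

import (
	"errors"
	"fmt"
)

// errBadRow is a hypothetical per-record failure.
var errBadRow = errors.New("bad row")

// process stands in for applying one record; negative values are dead-lettered.
func process(v int) error {
	if v < 0 {
		return fmt.Errorf("record %d: %w", v, errBadRow)
	}
	return nil
}

// replayAll returns how many records were applied, how many were
// dead-lettered, and the first dead-letter error when failOnError is set.
func replayAll(records []int, failOnError bool) (applied, deadLettered int, err error) {
	for _, r := range records {
		if perr := process(r); perr != nil {
			deadLettered++
			if failOnError {
				return applied, deadLettered, perr // abort on the first dead letter
			}
			continue // default: count it and keep going
		}
		applied++
	}
	return applied, deadLettered, nil
}

func main() {
	records := []int{1, -2, 3, 4}

	applied, dead, err := replayAll(records, false)
	fmt.Println(applied, dead, err) // prints: 3 1 <nil>

	applied, dead, err = replayAll(records, true)
	fmt.Println(applied, dead, err) // prints: 1 1 record -2: bad row
}
```

In the fail-fast run, records 3 and 4 are never applied, which is what leaves a shadow table incomplete.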
func WithMaxAttempts ¶
WithMaxAttempts sets the per-record retry budget for transient store failures. Defaults to 3. Replay over a multi-day archive needs retries to absorb intermittent backpressure — without them, a single throttled DDB write fails the whole replay.
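A per-record retry budget can be sketched as a bounded loop around the store write. The helper names below (writeWithRetry, flakyStore, errThrottled) are hypothetical, the sleep between attempts is omitted, and the sketch retries every error, whereas the documented budget applies to transient store failures only.

```go
package main

import (
	"errors"
	"fmt"
)

// errThrottled is a hypothetical transient store error.
var errThrottled = errors.New("throttled")

// writeWithRetry calls write up to maxAttempts times and stops at the first
// success. It returns the number of attempts used and the last error, if any.
func writeWithRetry(maxAttempts int, write func() error) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1 // always try at least once
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = write(); err == nil {
			return attempt, nil
		}
	}
	return maxAttempts, err
}

// flakyStore returns a write function that fails its first n calls.
func flakyStore(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errThrottled
		}
		return nil
	}
}

func main() {
	attempts, err := writeWithRetry(3, flakyStore(2))
	fmt.Println(attempts, err) // prints: 3 <nil>

	attempts, err = writeWithRetry(1, flakyStore(2))
	fmt.Println(attempts, err) // prints: 1 throttled
}
```

With a budget of 3, two consecutive throttles are absorbed; with a budget of 1, the first throttle surfaces as a failure.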
func WithMetrics ¶
WithMetrics installs a metrics.Recorder. Defaults to metrics.Noop{}.
func WithRetryBackoff ¶
WithRetryBackoff configures the per-attempt sleep schedule. The delay starts at base, doubles after each failure, is capped at max, and has full jitter applied. Defaults to base = 50 ms and max = 5 s.
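As a rough illustration of that schedule, the sketch below computes the un-jittered ceiling for each failure and then draws the actual sleep uniformly from zero up to that ceiling, which is the usual meaning of "full jitter". The function names are hypothetical, and the exact indexing (the first failure waits up to base) is an assumption.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffCeiling returns the un-jittered delay after the given number of
// failures (1 for the first): base doubled once per earlier failure, capped at max.
func backoffCeiling(base, max time.Duration, failures int) time.Duration {
	d := base
	for i := 1; i < failures && d < max; i++ {
		d *= 2
	}
	if d > max {
		d = max
	}
	return d
}

// fullJitter picks a uniformly random sleep in [0, ceiling].
func fullJitter(ceiling time.Duration) time.Duration {
	if ceiling <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(ceiling) + 1))
}

func main() {
	base, max := 50*time.Millisecond, 5*time.Second // the documented defaults
	for failures := 1; failures <= 9; failures++ {
		ceiling := backoffCeiling(base, max, failures)
		fmt.Printf("failure %d: ceiling %v, sleep %v\n", failures, ceiling, fullJitter(ceiling))
	}
}
```

With the defaults, the ceilings run 50 ms, 100 ms, 200 ms, 400 ms, 800 ms, 1.6 s, 3.2 s, and then stay at 5 s. Full jitter spreads concurrent retries across the whole interval instead of clustering them at the ceiling.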