replay

package replay, version v0.1.0

Warning: This package is not in the latest version of its module.
Published: May 12, 2026 | License: Apache-2.0 | Imports: 9 | Imported by: 0

Documentation

Overview

Package replay drives a Murmur pipeline in Replay mode: it consumes historical events from a replay.Driver (typically S3-archived Firehose/Kafka-Connect output, or a Kafka offset range under tiered storage) and applies the same monoid Combine that the live runtime would.
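To illustrate why reusing the same monoid Combine matters, here is a minimal sketch. The Monoid interface, its Identity/Combine methods, and the Sum type are hypothetical stand-ins, not this package's API. Because Combine is associative with an identity, folding archived events in separate chunks during replay yields the same aggregate as folding them live in one pass.

```go
package main

import "fmt"

// Monoid is a hypothetical stand-in for the framework's monoid abstraction:
// an associative Combine with an Identity element.
type Monoid[V any] interface {
	Identity() V
	Combine(a, b V) V
}

// Sum is a counter monoid over int64.
type Sum struct{}

func (Sum) Identity() int64          { return 0 }
func (Sum) Combine(a, b int64) int64 { return a + b }

// Fold applies Combine left to right, starting from the identity.
func Fold[V any](m Monoid[V], vs []V) V {
	acc := m.Identity()
	for _, v := range vs {
		acc = m.Combine(acc, v)
	}
	return acc
}

func main() {
	events := []int64{3, 1, 4, 1, 5}
	m := Sum{}

	// Live: fold the whole stream in one pass.
	live := Fold[int64](m, events)

	// Replay: fold two archive chunks separately, then combine the partials.
	// Associativity guarantees the same result.
	replayed := m.Combine(Fold[int64](m, events[:2]), Fold[int64](m, events[2:]))

	fmt.Println(live, replayed) // 14 14
}
```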

The standard backfill pattern is to configure the pipeline's StoreIn to point at a SHADOW state table (separate from the live one), run the replay to completion, and then atomically swap the live query layer's pointer to the shadow table. The framework treats the shadow-table swap as a deployment concern; this runtime only drives records through the pipeline.
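The swap itself lives outside this package. As a minimal sketch of the query-layer side, assuming a hypothetical QueryLayer that resolves every read through an atomically swappable table name (sync/atomic.Pointer, Go 1.19+), the cut-over after a successful replay is a single atomic Swap. The table names are illustrative.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// QueryLayer is a hypothetical read path that resolves every query against
// whichever state table the pointer currently names.
type QueryLayer struct {
	table atomic.Pointer[string]
}

func NewQueryLayer(live string) *QueryLayer {
	q := &QueryLayer{}
	q.table.Store(&live)
	return q
}

// Table returns the state table that queries currently read from.
func (q *QueryLayer) Table() string { return *q.table.Load() }

// SwapTo atomically repoints reads at the shadow table and returns the
// previous table name (useful for rollback or cleanup).
func (q *QueryLayer) SwapTo(shadow string) string {
	return *q.table.Swap(&shadow)
}

func main() {
	q := NewQueryLayer("counts_live")

	// ... replay.Run writes into "counts_shadow" and returns nil ...

	old := q.SwapTo("counts_shadow")
	fmt.Println(old, "->", q.Table()) // counts_live -> counts_shadow
}
```

Readers never observe a half-switched state: each Load sees either the old name or the new one.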

Under at-least-once delivery, re-runs are safe when deduplication is enabled at the state-store level (see WithDedup).

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Run

func Run[T any, V any](
	ctx context.Context,
	p *pipeline.Pipeline[T, V],
	drv replay.Driver[T],
	opts ...RunOption,
) error

Run drives the replay to completion. It returns nil once the driver exhausts its source and all records have been processed. It returns a non-nil error on a fatal error or, when WithFailOnError(true) is set, on the first dead-lettered record.

Types

type RunOption

type RunOption func(*runConfig)

RunOption configures Run.

func WithBatchTick

func WithBatchTick(d time.Duration) RunOption

WithBatchTick overrides the RecordBatch flush interval. The default of 1 s suits a multi-day replay, where dashboard granularity matters more than emission cost. Pass d <= 0 to disable periodic batch flushes; a single batch is still emitted at completion.

func WithDedup

func WithDedup(d state.Deduper) RunOption

WithDedup installs a state.Deduper. Replay drivers typically emit stable per-event IDs (such as an S3 archive line position or a Kafka offset), so when a Deduper is installed, re-running the same archive folds idempotently.
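A minimal in-memory sketch of why stable IDs make re-runs idempotent. The Deduper interface and its Seen method are hypothetical: the method set of state.Deduper is not documented on this page, and a production deduper would persist IDs in the state store rather than in a map.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduper is a hypothetical interface: Seen reports whether id was already
// applied, and records it if not.
type Deduper interface {
	Seen(id string) bool
}

type memDeduper struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

func newMemDeduper() *memDeduper { return &memDeduper{ids: map[string]struct{}{}} }

func (d *memDeduper) Seen(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.ids[id]; ok {
		return true
	}
	d.ids[id] = struct{}{}
	return false
}

type event struct {
	ID    string // stable per-event ID, e.g. "topic/partition/offset"
	Value int64
}

// apply folds events into a running sum, skipping IDs the deduper has seen.
func apply(total int64, evs []event, d Deduper) int64 {
	for _, e := range evs {
		if d.Seen(e.ID) {
			continue
		}
		total += e.Value
	}
	return total
}

func main() {
	archive := []event{{"orders/0/100", 2}, {"orders/0/101", 3}, {"orders/0/102", 5}}
	d := newMemDeduper()

	total := apply(0, archive, d)
	total = apply(total, archive, d) // re-run of the same archive adds nothing
	fmt.Println(total)               // 10
}
```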

func WithFailOnError

func WithFailOnError(v bool) RunOption

WithFailOnError configures the replay to surface the first dead-lettered record's error to the caller and abort. Default false: dead-lettered records are recorded via the metrics.Recorder and the replay continues.

For shadow-table backfills with an atomic swap, an aborted replay leaves the shadow table incomplete and blocks the swap. That is usually the right behavior, but it is expensive when the failure is a single bad row in a 30-day archive. Choose per workload.

func WithMaxAttempts

func WithMaxAttempts(n int) RunOption

WithMaxAttempts sets the per-record retry budget for transient store failures. It defaults to 3. A replay over a multi-day archive needs retries to absorb intermittent backpressure; without them, a single throttled DynamoDB write fails the whole replay.

func WithMetrics

func WithMetrics(r metrics.Recorder) RunOption

WithMetrics installs a metrics.Recorder. Defaults to metrics.Noop{}.

func WithRetryBackoff

func WithRetryBackoff(base, max time.Duration) RunOption

WithRetryBackoff configures the per-attempt sleep schedule. The delay starts at base, doubles after each failure, and is capped at max, with full jitter applied. The defaults are base = 50 ms and max = 5 s.
