bootstrap

package
v0.1.0
Warning: This package is not in the latest version of its module.
Published: May 12, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package bootstrap drives a Murmur pipeline in Bootstrap mode: it reads from a SnapshotSource (Mongo collection scan, DynamoDB ParallelScan, etc.), applies the pipeline's key/value extractors and monoid Combine, and writes to the primary state store — populating initial state when the live event stream lacks full history.
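The extract-then-Combine step can be pictured as a fold over the snapshot records. The sketch below rests on assumptions: `Monoid`, `Order`, and `foldSnapshot` are hypothetical names rather than the package's types, and an in-memory map stands in for the primary state store.

```go
package main

import "fmt"

// Monoid is a hypothetical stand-in for the pipeline's monoid: an identity
// element plus an associative Combine.
type Monoid[V any] struct {
	Identity V
	Combine  func(a, b V) V
}

// Order is an illustrative snapshot record.
type Order struct {
	Customer string
	Amount   int
}

// foldSnapshot applies the key and value extractors to each record and folds
// the value into that key's current state with Combine. The map stands in
// for the primary state store.
func foldSnapshot[T any, V any](
	records []T,
	key func(T) string,
	value func(T) V,
	m Monoid[V],
	store map[string]V,
) {
	for _, r := range records {
		k := key(r)
		cur, ok := store[k]
		if !ok {
			cur = m.Identity // first time this key is seen
		}
		store[k] = m.Combine(cur, value(r))
	}
}

func main() {
	sum := Monoid[int]{Identity: 0, Combine: func(a, b int) int { return a + b }}
	snapshot := []Order{{"alice", 10}, {"bob", 5}, {"alice", 7}}
	store := map[string]int{}
	foldSnapshot(snapshot,
		func(o Order) string { return o.Customer },
		func(o Order) int { return o.Amount },
		sum, store)
	fmt.Println(store["alice"], store["bob"]) // 17 5
}
```

Because Combine is associative and has an identity, the order in which a scan visits records does not change the final per-key state.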

The pattern matches Debezium's "snapshot then stream":

  1. Capture a HandoffToken from the snapshot source. This is the resume position the live source should pick up from once bootstrap completes.
  2. Scan the snapshot to completion, applying each record through the pipeline.
  3. Return the captured token. The deployment system stores it and starts the live runtime configured to begin from that point.

At-least-once dedup at the state store handles re-emissions if bootstrap is retried. HandoffToken is opaque to the bootstrap runtime — the live source's NewSourceFromToken (or equivalent) interprets it.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Run

func Run[T any, V any](
	ctx context.Context,
	p *pipeline.Pipeline[T, V],
	src snapshot.Source[T],
	opts ...RunOption,
) (snapshot.HandoffToken, error)

Run drives the bootstrap and returns the captured HandoffToken on success. The deployment system persists the token and hands it to the live runtime on transition.

Types

type RunOption

type RunOption func(*runConfig)

RunOption configures Run.

func WithBatchTick

func WithBatchTick(d time.Duration) RunOption

WithBatchTick overrides the RecordBatch flush interval. The default of 1 s suits a multi-hour bootstrap, where dashboard granularity matters more than emission cost. Pass d <= 0 to disable periodic batch flushes; a single batch is still emitted at completion.
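The tick-or-disable behavior maps naturally onto a `select` over a ticker channel. In this sketch, `flushLoop` is hypothetical and a "batch" is just a count of records since the last flush; when d <= 0 no ticker is created, and receiving from a nil channel blocks forever, so only the completion flush fires.

```go
package main

import (
	"fmt"
	"time"
)

// flushLoop counts incoming records and emits the pending count as one batch
// on every tick. With tick <= 0 no ticker is created, so the nil channel never
// fires and only the final batch at completion is emitted.
func flushLoop(records <-chan int, tick time.Duration, emit func(batch int)) {
	var tickC <-chan time.Time
	if tick > 0 { // time.NewTicker panics on a non-positive duration
		t := time.NewTicker(tick)
		defer t.Stop()
		tickC = t.C
	}
	pending := 0
	for {
		select {
		case _, ok := <-records:
			if !ok {
				emit(pending) // completion: always emit one last batch
				return
			}
			pending++
		case <-tickC:
			if pending > 0 { // skip empty periodic batches (an assumption)
				emit(pending)
				pending = 0
			}
		}
	}
}

func main() {
	records := make(chan int)
	go func() {
		for i := 0; i < 5; i++ {
			records <- i
		}
		close(records)
	}()
	var batches []int
	// tick <= 0 disables periodic flushes: one batch at completion.
	flushLoop(records, 0, func(n int) { batches = append(batches, n) })
	fmt.Println(batches) // [5]
}
```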

func WithDedup

func WithDedup(d state.Deduper) RunOption

WithDedup installs a state.Deduper for snapshot-side dedup. This is useful when a bootstrap is re-run (operator retry, partial-failure recovery): without a Deduper every document is folded again on the re-run; with one, only documents not yet claimed are processed. The Deduper keys on the same EventID the SnapshotSource emits (for Mongo, the document _id), so re-running against the same collection is idempotent.
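To see why a re-run becomes idempotent, here is a minimal sketch with a hypothetical `Deduper` interface whose `Claim` returns true only the first time an ID is seen. The real `state.Deduper` API is not shown on this page and may differ; `memDeduper`, `doc`, and `runOnce` are illustrative names.

```go
package main

import "fmt"

// Deduper is a hypothetical shape; the real state.Deduper API may differ.
// Claim reports true only the first time an EventID is seen.
type Deduper interface {
	Claim(eventID string) (bool, error)
}

// memDeduper keeps claimed IDs in memory; a production one would be durable.
type memDeduper struct{ seen map[string]struct{} }

func newMemDeduper() *memDeduper { return &memDeduper{seen: map[string]struct{}{}} }

func (d *memDeduper) Claim(eventID string) (bool, error) {
	if _, ok := d.seen[eventID]; ok {
		return false, nil
	}
	d.seen[eventID] = struct{}{}
	return true, nil
}

// doc is an illustrative snapshot document; ID plays the role of Mongo's _id.
type doc struct {
	ID string
	N  int
}

// runOnce folds N into *sum for every document not yet claimed and returns
// how many documents it processed on this run.
func runOnce(docs []doc, d Deduper, sum *int) (int, error) {
	processed := 0
	for _, x := range docs {
		fresh, err := d.Claim(x.ID)
		if err != nil {
			return processed, err
		}
		if !fresh {
			continue // already folded by an earlier run
		}
		*sum += x.N
		processed++
	}
	return processed, nil
}

func main() {
	docs := []doc{{"a", 1}, {"b", 2}, {"c", 4}}
	d := newMemDeduper()
	sum := 0
	first, _ := runOnce(docs[:2], d, &sum) // partial run stops after two docs; memDeduper never errors
	second, _ := runOnce(docs, d, &sum)    // operator retry over the full collection
	fmt.Println(first, second, sum)        // 2 1 7
}
```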

func WithFailOnError

func WithFailOnError(v bool) RunOption

WithFailOnError configures whether the bootstrap aborts on the first dead-lettered record (after retries are exhausted). The default is false: the runtime records the error via metrics.Recorder and continues. Set it to true when the snapshot must complete fully or not at all (e.g., a one-shot migration where a missing entity is a correctness bug, not a tolerable blip). The streaming runtime never aborts on poison records, and bootstrap matches that behavior by default.

func WithMaxAttempts

func WithMaxAttempts(n int) RunOption

WithMaxAttempts sets the per-record retry budget for transient store failures. The default is 3. A bootstrap that fails fast on the first throttled DynamoDB write is unfit for snapshotting a large collection: a 30-minute scan needs retries to absorb intermittent backpressure.

func WithMetrics

func WithMetrics(r metrics.Recorder) RunOption

WithMetrics installs a metrics.Recorder. Defaults to metrics.Noop{}.

func WithRetryBackoff

func WithRetryBackoff(base, max time.Duration) RunOption

WithRetryBackoff configures the per-attempt sleep schedule: the delay starts at base, doubles after each failure up to max, and has full jitter applied. The defaults are 50 ms (base) and 5 s (max), the same as the streaming runtime.
