Documentation ¶
Overview ¶
Package bootstrap drives a Murmur pipeline in Bootstrap mode: it reads from a SnapshotSource (Mongo collection scan, DynamoDB ParallelScan, etc.), applies the pipeline's key/value extractors and monoid Combine, and writes to the primary state store — populating initial state when the live event stream lacks full history.
The pattern matches Debezium's "snapshot then stream":
- Capture a HandoffToken from the snapshot source. This is the resume position the live source should pick up from once bootstrap completes.
- Scan the snapshot to completion, applying each record through the pipeline.
- Return the captured token. The deployment system stores it and starts the live runtime configured to begin from that point.
Bootstrap delivery is at-least-once: if a bootstrap is retried, dedup at the state store absorbs the re-emitted records. HandoffToken is opaque to the bootstrap runtime; the live source's NewSourceFromToken (or equivalent) interprets it.
Functions ¶
func Run ¶
func Run[T any, V any](
	ctx context.Context,
	p *pipeline.Pipeline[T, V],
	src snapshot.Source[T],
	opts ...RunOption,
) (snapshot.HandoffToken, error)
Run drives the bootstrap. On success it returns the captured HandoffToken; the deployment system persists the token and hands it to the live runtime on transition.
Types ¶
type RunOption ¶
type RunOption func(*runConfig)
RunOption configures Run.
func WithBatchTick ¶
WithBatchTick overrides the RecordBatch flush interval. The default 1 s fits a multi-hour bootstrap where dashboard granularity matters more than emission cost. Pass d <= 0 to disable periodic batch flushes (a single batch is still emitted at completion).
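One way to get the "d <= 0 disables periodic flushes" behavior is to leave the ticker channel nil, since a nil channel never becomes ready in a select. The sketch below assumes a batch is simply a count of records processed since the last flush; drain and its flush callback are hypothetical names, not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// drain counts records and calls flush every d. With d <= 0 the tick
// channel stays nil, so only the final flush at completion happens.
func drain(records <-chan int, d time.Duration, flush func(n int)) {
	var tick <-chan time.Time // nil: this select case never fires
	if d > 0 {
		t := time.NewTicker(d)
		defer t.Stop()
		tick = t.C
	}
	pending := 0
	for {
		select {
		case _, ok := <-records:
			if !ok {
				flush(pending) // a batch is always emitted at completion
				return
			}
			pending++
		case <-tick:
			if pending > 0 {
				flush(pending)
				pending = 0
			}
		}
	}
}

func main() {
	records := make(chan int, 3)
	for i := 0; i < 3; i++ {
		records <- i
	}
	close(records)
	// Periodic flushing disabled: exactly one batch of 3 is reported.
	drain(records, 0, func(n int) { fmt.Println("flushed", n) })
}
```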
func WithDedup ¶
WithDedup installs a state.Deduper for snapshot-side dedup. Useful when a bootstrap is re-run (operator retry, partial-failure recovery): without it every document is folded again on re-run; with it only documents not yet claimed are processed. The Deduper sees the same EventID derivation the SnapshotSource emits — for Mongo that's the document _id, so re-running against the same collection is idempotent.
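The effect of snapshot-side dedup on a re-run can be shown with an in-memory stand-in. The real state.Deduper interface is not shown on this page, so memDeduper and its Claim method are hypothetical; the sketch only illustrates the claim-then-fold idea, keyed by a document ID.

```go
package main

import (
	"fmt"
	"sync"
)

// memDeduper is a hypothetical in-memory stand-in for a state.Deduper.
type memDeduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newMemDeduper() *memDeduper {
	return &memDeduper{seen: make(map[string]struct{})}
}

// Claim marks id as processed and reports whether this call was the
// first to claim it.
func (d *memDeduper) Claim(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[id]; ok {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

type doc struct {
	ID    string
	Count int
}

// foldOnce adds a document's Count to total only if its ID has not
// been claimed by an earlier run.
func foldOnce(d *memDeduper, total int, docs []doc) int {
	for _, x := range docs {
		if d.Claim(x.ID) {
			total += x.Count
		}
	}
	return total
}

func main() {
	docs := []doc{{ID: "a", Count: 1}, {ID: "b", Count: 2}}
	d := newMemDeduper()
	total := foldOnce(d, 0, docs)
	total = foldOnce(d, total, docs) // simulated re-run: nothing folds twice
	fmt.Println(total)               // prints 3
}
```

A production Deduper would need durable storage shared across runs; the in-memory map here survives only within one process.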
func WithFailOnError ¶
WithFailOnError configures whether the bootstrap aborts on the first dead-lettered record (after retries exhausted). Default false — the runtime records the error via metrics.Recorder and continues. Set true when the snapshot must complete fully or not at all (e.g., a one-shot migration where a missing entity is a correctness bug, not a tolerable blip). The streaming runtime never aborts on poison records; bootstrap is permissive by default for the same reason.
func WithMaxAttempts ¶
WithMaxAttempts sets the per-record retry budget for transient store failures. Defaults to 3. A bootstrap that fails fast on the first throttled DynamoDB write is unfit for snapshotting a large collection: a 30-minute scan needs retries to absorb intermittent backpressure.
func WithMetrics ¶
WithMetrics installs a metrics.Recorder. Defaults to metrics.Noop{}.
func WithRetryBackoff ¶
WithRetryBackoff configures the per-attempt sleep schedule. The delay starts at base, doubles after each failure, is capped at max, and has full jitter applied. Defaults to a 50 ms base and a 5 s max, the same as the streaming runtime.