mongo

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package mongo provides a Mongo-backed implementation of snapshot.Source for Murmur's Bootstrap execution mode.

Pattern (Debezium-style):

  1. CaptureHandoff opens a Change Stream against the database (no iteration), reads the start-of-stream resume token, and closes. The token is the "where to pick up live streaming from after bootstrap" marker — handed to the live source on transition.
  2. Scan iterates the entire collection via Find with default batch size. Each document is decoded into T and emitted as a source.Record with EventID = the document's _id (so re-runs are idempotent under at-least-once dedup).
  3. Resume re-runs Scan; at-least-once dedup at the state store handles duplicates.

The Mongo replica set must be initiated (rs.initiate) before Change Streams work. Standalone Mongo is fine for plain Find but does not support resume tokens.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config[T any] struct {
	URI        string
	Database   string
	Collection string
	// Filter, if non-nil, is passed as the Find filter and the Watch matchStage. Use
	// for partial snapshots (e.g., {"tenant": "x"}). nil means scan everything.
	Filter bson.M
	// Decode converts a raw BSON document to T. Use BSONDecoder[T]() for the default
	// driver-based decoding (struct tags); supply your own for custom shapes.
	Decode Decoder[T]
	// OnDecodeError, if non-nil, is called for every document whose Decode returned
	// an error. Default behavior is to drop silently.
	OnDecodeError func(raw bson.Raw, err error)
}

Config configures a Mongo snapshot source.

type Decoder

type Decoder[T any] func(bson.Raw) (T, error)

Decoder converts a raw BSON document to a typed Record value.

func BSONDecoder

func BSONDecoder[T any]() Decoder[T]

BSONDecoder returns a Decoder that uses the Mongo driver's reflection-based unmarshaling. T should carry `bson:"..."` tags as needed.

type Source

type Source[T any] struct {
	// contains filtered or unexported fields
}

Source implements snapshot.Source for a single Mongo collection.

func NewSource

func NewSource[T any](ctx context.Context, cfg Config[T]) (*Source[T], error)

NewSource connects to the Mongo cluster and returns a SnapshotSource for the given collection. The returned Source owns the underlying mongo.Client; call Close to disconnect.

func (*Source[T]) CaptureHandoff

func (s *Source[T]) CaptureHandoff(ctx context.Context) (snapshot.HandoffToken, error)

CaptureHandoff opens a Change Stream against the collection (no iteration) just to read the start-of-stream resume token. Returns the token as a HandoffToken (BSON bytes). Requires the Mongo deployment to be a replica set.

func (*Source[T]) Close

func (s *Source[T]) Close() error

Close disconnects the underlying client.

func (*Source[T]) Name

func (s *Source[T]) Name() string

Name returns "mongo:<db>.<collection>".

func (*Source[T]) Resume

func (s *Source[T]) Resume(ctx context.Context, marker []byte, out chan<- source.Record[T]) error

Resume re-runs Scan from the beginning. At-least-once dedup at the state-store level (per-event-ID) handles re-emitted documents safely. A future enhancement could thread a primary-key watermark through marker for true incremental snapshots.

func (*Source[T]) Scan

func (s *Source[T]) Scan(ctx context.Context, out chan<- source.Record[T]) error

Scan reads every document from the collection (filtered by cfg.Filter if set), decodes to T, and emits source.Records into out. The cursor's batch size is left at the driver default; callers concerned about memory or backpressure can wrap out in a buffered channel sized to taste. Each Record's Ack is a no-op — bootstrap state is committed by the Bootstrap runtime, not per-record at the source.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL