orderstats

package
v0.1.0
Published: May 12, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

README

Example: Mongo CDC order-stats (Bootstrap → Live)

A full lambda-architecture-flavored pipeline that demonstrates the Bootstrap → Live transition that Kyle's adversarial review specifically called out. A Mongo collection holds the source of truth, a CDC stream (in production, Mongo Change Streams piped through Debezium into Kafka) carries every change that happens after the initial snapshot, and Murmur folds both into per-customer order totals.

┌──────────────┐
│  Mongo       │ orders collection (source of truth)
│  ─────────── │
│  rs0         │
└──────┬───────┘
       │ initial snapshot (bootstrap binary)
       ▼
┌──────────────┐    ┌─────────────────┐   ┌─────────────────┐
│ pkg/exec/    │───▶│ Sum monoid per  │──▶│ DDB             │
│ bootstrap    │    │ customer_id     │   │ order_totals    │
└──────────────┘    └─────────────────┘   └─────────────────┘
       │
       │ captures Change Stream resume token
       │ (handed to live consumer's StartAfter)
       ▼
┌──────────────┐
│ Mongo Change │ (in production: Debezium → Kafka)
│ Streams      │
└──────┬───────┘
       │ live CDC events
       ▼
┌──────────────┐    ┌─────────────────┐   ┌─────────────────┐
│ pkg/exec/    │───▶│ Same Sum monoid │──▶│ Same DDB        │
│ streaming    │    │ same key fn     │   │ order_totals    │
│ + WithDedup  │    │ same value fn   │   │ table           │
└──────────────┘    └─────────────────┘   └─────────────────┘

The two binaries share a single pipeline.go that defines the order schema, key extractor, value extractor, monoid, and store. BuildBootstrap adds a Mongo SnapshotSource; BuildLive adds a Kafka source plus an optional Deduper.

Why both modes are needed

  • Bootstrap. Mongo Change Streams only emit changes; they don't surface the documents that already existed when streaming started. Without the bootstrap pass the per-customer totals would only reflect orders placed after the consumer attached — anything older would be invisible. The bootstrap binary scans the collection once and folds every existing document through the same monoid Combine.
  • Live. After bootstrap finishes, the CDC stream takes over. The bootstrap binary captures a Change Stream resume token at the start of the scan (Debezium's standard "snapshot then stream" pattern); the deployment hands that token to the live consumer, so the first change observed there is the first change since the bootstrap began. The result is no gaps, and no duplicates beyond those WithDedup already absorbs.
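
The role of dedup in the live phase can be pictured with a small in-memory model. This is a sketch under stated assumptions, not Murmur's implementation: the `event` type, its `EventID` field, and the `deduper` map are hypothetical stand-ins, and the sketch assumes a redelivered record can be recognized by a per-event ID.

```go
package main

import "fmt"

// event is a hypothetical live CDC record carrying a unique ID.
type event struct {
	EventID    string
	CustomerID string
	Amount     int64
}

// deduper is an in-memory stand-in for a dedup table (hypothetical;
// not the type behind WithDedup).
type deduper map[string]struct{}

// firstSeen reports whether id has not been seen before, and records it.
func (d deduper) firstSeen(id string) bool {
	if _, ok := d[id]; ok {
		return false
	}
	d[id] = struct{}{}
	return true
}

// applyLive folds live events into totals, skipping redelivered ones.
func applyLive(totals map[string]int64, d deduper, events []event) {
	for _, e := range events {
		if !d.firstSeen(e.EventID) {
			continue // at-least-once redelivery: already folded
		}
		totals[e.CustomerID] += e.Amount
	}
}

func main() {
	// Totals as a bootstrap scan might leave them.
	totals := map[string]int64{"cust-a": 150, "cust-b": 25}
	live := []event{
		{"e1", "cust-a", 10},
		{"e1", "cust-a", 10}, // same event delivered twice
		{"e2", "cust-b", 5},
	}
	applyLive(totals, deduper{}, live)
	fmt.Println(totals["cust-a"], totals["cust-b"]) // prints: 160 30
}
```

Without the dedup check the redelivered `e1` would be summed twice, which matters because Sum is not idempotent.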

Production wiring

In production the live source is Mongo Change Streams piped through Debezium into Kafka. Debezium's MongoDB connector emits per-document JSON envelopes on a topic like dbserver.shop.orders; the Kafka source's Decode function unwraps the envelope's after key into an Order. The example's pipeline.go decoder assumes a flatter shape (raw Order JSON) so the docker-compose stack can drive it directly without standing up Debezium — swap in your envelope-unwrapping decoder when wiring against real Debezium.

Run locally

make compose-up        # mongo (replset), dynamodb-local, kafka, valkey, minio
make seed-ddb DDB_TABLE=order_totals

# Seed Mongo with some historical orders:
docker exec -i murmur-mongo mongosh --quiet shop <<'EOF'
db.orders.insertMany([
  {_id: "o1", customer_id: "cust-a", amount: 100},
  {_id: "o2", customer_id: "cust-a", amount: 50},
  {_id: "o3", customer_id: "cust-b", amount: 25},
])
EOF

# Bootstrap. Prints the resume token (hex) on stdout.
DDB_LOCAL_ENDPOINT=http://localhost:8000 \
  go run ./examples/mongo-cdc-orderstats/cmd/bootstrap

# Now run the live worker against a Kafka topic carrying additional CDC events.
DDB_LOCAL_ENDPOINT=http://localhost:8000 \
KAFKA_BROKERS=localhost:9092 \
DDB_DEDUP_TABLE=order_totals_dedup \
  go run ./examples/mongo-cdc-orderstats/cmd/worker

Test

The end-to-end test in test/e2e/mongo_cdc_orderstats_test.go exercises the full flow: it seeds Mongo, runs the bootstrap, produces additional events to Kafka, runs the live worker briefly, and verifies that the merged per-customer totals equal bootstrapTotals + liveDeltas to the cent.

Documentation

Overview

Package orderstats defines a Murmur pipeline that aggregates per-customer order totals from a Mongo collection (Bootstrap mode) and a Kafka CDC stream (Live mode). The same pipeline definition runs in both binaries — one imports it and runs bootstrap.Run, the other runs streaming.Run.

In production the Kafka source carries Mongo Change Stream output piped through Debezium (or any equivalent CDC tool). The bootstrap captures a Change Stream resume token; the deployment system hands that token to the live consumer's StartAfter so the first record after handoff is the first CDC change since the bootstrap began. No gaps, no duplicates beyond the at-least-once dedup window.

The pipeline DSL is execution-mode-agnostic; the runtimes (bootstrap.Run, streaming.Run) are what differ.

Functions

func BuildBootstrap

func BuildBootstrap(ctx context.Context, cfg Config) (*pipeline.Pipeline[Order, int64], *mongo.Source[Order], state.Store[int64], error)

BuildBootstrap constructs the pipeline for the Mongo-collection-scan bootstrap binary. It returns the pipeline, the SnapshotSource that the bootstrap.Run driver consumes, and the state store.

func BuildLive

BuildLive constructs the pipeline for the Kafka-driven streaming worker.

Note: the murmur.Counter preset hard-codes Value(func(T) int64 { return 1 }), which is correct for "count events per key." For monetary totals we want Value(func(o Order) int64 { return o.Amount }) instead, so we drop down to the lower-level pipeline.NewPipeline builder. Same Sum monoid; different value extractor.
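
The difference between the two value extractors is easy to see in isolation. The following sketch does not use Murmur's builder; `foldSum` is a hypothetical helper that sums an extracted value per key, shown once with a constant-1 extractor (counting) and once with the Amount extractor (monetary totals).

```go
package main

import "fmt"

// Order is a copy of the example's Order fields, without struct tags.
type Order struct {
	ID         string
	CustomerID string
	Amount     int64
}

// foldSum groups items by key(item) and sums value(item) per group.
// Hypothetical helper for illustration only.
func foldSum[T any](items []T, key func(T) string, value func(T) int64) map[string]int64 {
	out := make(map[string]int64)
	for _, it := range items {
		out[key(it)] += value(it)
	}
	return out
}

func main() {
	orders := []Order{
		{"o1", "cust-a", 100},
		{"o2", "cust-a", 50},
		{"o3", "cust-b", 25},
	}
	byCustomer := func(o Order) string { return o.CustomerID }

	// Counter-style extractor: every event contributes 1.
	counts := foldSum(orders, byCustomer, func(Order) int64 { return 1 })
	// Monetary extractor: every event contributes its amount.
	totals := foldSum(orders, byCustomer, func(o Order) int64 { return o.Amount })

	fmt.Println(counts["cust-a"], totals["cust-a"]) // prints: 2 150
}
```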

Types

type Config

type Config struct {
	// Mongo
	MongoURI        string
	MongoDB         string
	MongoCollection string

	// Kafka (live CDC source)
	KafkaBrokers  string // comma-separated
	KafkaTopic    string
	ConsumerGroup string

	// DynamoDB (state)
	DDBEndpoint string
	DDBTable    string
	DDBRegion   string

	// Optional dedup table for at-least-once idempotency
	DDBDedupTable string
}

Config bundles deployment-time settings shared across both binaries.

type Order

type Order struct {
	ID         string `bson:"_id" json:"_id"`
	CustomerID string `bson:"customer_id" json:"customer_id"`
	Amount     int64  `bson:"amount" json:"amount"`
}

Order is the shape stored in Mongo and emitted on the Kafka CDC stream. The example uses identical BSON and JSON tags so that both producers decode into the same struct; in real CDC the Kafka payload is whatever Debezium serializes (typically JSON envelopes with `before` / `after` keys, which your Decode would unwrap).

Directories

Path            Synopsis
cmd/bootstrap   Bootstrap binary for the mongo-cdc-orderstats example.
cmd/worker      Live streaming worker for the mongo-cdc-orderstats example.
