Documentation
Overview
Package orderstats defines a Murmur pipeline that aggregates per-customer order totals from a Mongo collection (Bootstrap mode) and a Kafka CDC stream (Live mode). Both binaries share the same pipeline definition: one imports it and runs bootstrap.Run, the other imports it and runs streaming.Run.
In production, the Kafka source carries Mongo Change Stream output piped through Debezium (or any equivalent CDC tool). The bootstrap captures a Change Stream resume token when it begins. The deployment system passes that token to the live consumer's StartAfter option, so the first record after handoff is the first CDC change since the bootstrap began. The handoff therefore leaves no gaps, and the only duplicates are at-least-once redeliveries that fall within the dedup window.
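The exclusive-resume behavior the handoff depends on can be illustrated without a live MongoDB or Kafka cluster. The Go program below is a minimal simulation, not driver or deployment code: `changeEvent` and `resumeAfter` are hypothetical names, and resume tokens are modeled as increasing integers, whereas real Change Stream tokens are opaque documents that only the server interprets.

```go
package main

import "fmt"

// changeEvent is a hypothetical stand-in for one CDC record. Token models a
// Change Stream resume token as an increasing integer; real tokens are opaque
// and can only be interpreted by the server.
type changeEvent struct {
	Token      int
	CustomerID string
	Amount     int64
}

// resumeAfter returns the events strictly after token, mirroring the
// exclusive semantics of a StartAfter-style resume.
func resumeAfter(changes []changeEvent, token int) []changeEvent {
	var out []changeEvent
	for _, ev := range changes {
		if ev.Token > token {
			out = append(out, ev)
		}
	}
	return out
}

func main() {
	changes := []changeEvent{
		{Token: 1, CustomerID: "c1", Amount: 10},
		{Token: 2, CustomerID: "c1", Amount: 5},
		{Token: 3, CustomerID: "c2", Amount: 7},
	}
	// Suppose the bootstrap captured token 1 when it began scanning.
	captured := 1
	live := resumeAfter(changes, captured)
	fmt.Println(len(live), live[0].Token) // 2 2
}
```

Because the resume is exclusive, the change already reflected when the token was captured is not replayed, while every later change is.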
The pipeline DSL is execution-mode-agnostic; the runtimes (bootstrap.Run, streaming.Run) are what differ.
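A minimal Go sketch of that split follows. It does not use Murmur's API: `pipelineDef`, `runBootstrap`, and `runStreaming` are hypothetical stand-ins for the pipeline DSL and for bootstrap.Run / streaming.Run. One definition (key extractor, value extractor, Sum combine) is driven once over a finite snapshot slice and once from a channel, and both runtimes arrive at the same per-customer totals.

```go
package main

import "fmt"

// order mirrors the package's Order type for this self-contained sketch.
type order struct {
	CustomerID string
	Amount     int64
}

// pipelineDef is a hypothetical, mode-agnostic definition: it says how to
// key, extract, and combine values, but nothing about where records come from.
type pipelineDef struct {
	Key     func(order) string
	Value   func(order) int64
	Combine func(a, b int64) int64
}

// apply folds one record into the state map; the map's zero value (0) is the
// identity of the Sum monoid.
func (p pipelineDef) apply(state map[string]int64, o order) {
	k := p.Key(o)
	state[k] = p.Combine(state[k], p.Value(o))
}

// runBootstrap (hypothetical) drives the definition over a finite snapshot.
func runBootstrap(p pipelineDef, snapshot []order) map[string]int64 {
	state := map[string]int64{}
	for _, o := range snapshot {
		p.apply(state, o)
	}
	return state
}

// runStreaming (hypothetical) drives the same definition from a channel
// until the channel is closed.
func runStreaming(p pipelineDef, in <-chan order) map[string]int64 {
	state := map[string]int64{}
	for o := range in {
		p.apply(state, o)
	}
	return state
}

func main() {
	def := pipelineDef{
		Key:     func(o order) string { return o.CustomerID },
		Value:   func(o order) int64 { return o.Amount },
		Combine: func(a, b int64) int64 { return a + b },
	}
	orders := []order{{"c1", 10}, {"c2", 7}, {"c1", 5}}

	batch := runBootstrap(def, orders)

	ch := make(chan order, len(orders))
	for _, o := range orders {
		ch <- o
	}
	close(ch)
	live := runStreaming(def, ch)

	fmt.Println(batch["c1"], live["c1"]) // 15 15
	fmt.Println(batch["c2"], live["c2"]) // 7 7
}
```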
Constants
This section is empty.
Variables
This section is empty.
Functions
func BuildBootstrap
func BuildBootstrap(ctx context.Context, cfg Config) (*pipeline.Pipeline[Order, int64], *mongo.Source[Order], state.Store[int64], error)
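BuildBootstrap constructs the pipeline for the bootstrap binary, which scans the Mongo collection. It returns the pipeline, the Mongo snapshot source (the SnapshotSource that the bootstrap.Run driver consumes), and the state.Store[int64] that holds the per-customer totals.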
func BuildLive
func BuildLive(ctx context.Context, cfg Config) (*pipeline.Pipeline[Order, int64], state.Store[int64], state.Deduper, error)
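BuildLive constructs the pipeline for the Kafka-driven streaming worker. It also returns the state.Store[int64] that holds the totals and the state.Deduper used for at-least-once idempotency (see Config.DDBDedupTable).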
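BuildLive also returns a state.Deduper. Its methods are not documented on this page, so the Go sketch below assumes a hypothetical shape: `seenBefore(id, nowSecs)` reports whether an event ID was already recorded within a fixed window, and records it otherwise. An in-memory map stands in for the DynamoDB table named by Config.DDBDedupTable, and timestamps are plain Unix seconds to keep the example small.

```go
package main

import "fmt"

// windowDeduper is a hypothetical in-memory deduper. It remembers when each
// event ID was first recorded; a redelivery inside the window counts as a
// duplicate, and one arriving after the window has elapsed is treated as new.
// Entries are never evicted here; a real store would expire them.
type windowDeduper struct {
	windowSecs int64
	seen       map[string]int64 // event ID -> Unix seconds when recorded
}

func newWindowDeduper(windowSecs int64) *windowDeduper {
	return &windowDeduper{windowSecs: windowSecs, seen: map[string]int64{}}
}

// seenBefore reports whether id was already recorded within the window
// ending at nowSecs; if not, it records id at nowSecs.
func (d *windowDeduper) seenBefore(id string, nowSecs int64) bool {
	if t, ok := d.seen[id]; ok && nowSecs-t < d.windowSecs {
		return true
	}
	d.seen[id] = nowSecs
	return false
}

func main() {
	d := newWindowDeduper(600) // 10-minute window

	fmt.Println(d.seenBefore("evt-1", 1000)) // false: first delivery
	fmt.Println(d.seenBefore("evt-1", 1060)) // true: redelivered inside the window
	fmt.Println(d.seenBefore("evt-1", 5000)) // false: window has elapsed
}
```

A redelivery that arrives after the window has elapsed is not caught, which is why the dedup window bounds the at-least-once guarantee.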
Note: the murmur.Counter preset hard-codes Value(func(T) int64 { return 1 }), which is correct for "count events per key." For monetary totals we want Value(func(o Order) int64 { return o.Amount }) instead, so we drop down to the lower-level pipeline.NewPipeline builder. Same Sum monoid; different value extractor.
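The effect of swapping the value extractor can be shown in isolation. This Go sketch does not call murmur.Counter or pipeline.NewPipeline; `sumBy`, `countValue`, and `amountValue` are hypothetical helpers. The Sum fold stays the same, and only the function that maps an Order to its contribution changes.

```go
package main

import "fmt"

type Order struct {
	ID         string
	CustomerID string
	Amount     int64
}

// sumBy (hypothetical) folds orders per customer with the Sum monoid
// (identity 0, combine +), using value to decide what each order contributes.
func sumBy(orders []Order, value func(Order) int64) map[string]int64 {
	totals := map[string]int64{}
	for _, o := range orders {
		totals[o.CustomerID] += value(o)
	}
	return totals
}

// countValue mirrors the Counter preset's extractor: every event contributes 1.
func countValue(Order) int64 { return 1 }

// amountValue is the extractor a monetary total needs: each order contributes its Amount.
func amountValue(o Order) int64 { return o.Amount }

func main() {
	orders := []Order{
		{ID: "o1", CustomerID: "c1", Amount: 1250},
		{ID: "o2", CustomerID: "c1", Amount: 300},
		{ID: "o3", CustomerID: "c2", Amount: 999},
	}
	fmt.Println(sumBy(orders, countValue)["c1"])  // 2
	fmt.Println(sumBy(orders, amountValue)["c1"]) // 1550
}
```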
Types ¶
type Config
type Config struct {
	// Mongo
	MongoURI        string
	MongoDB         string
	MongoCollection string

	// Kafka (live CDC source)
	KafkaBrokers  string // comma-separated
	KafkaTopic    string
	ConsumerGroup string

	// DynamoDB (state)
	DDBEndpoint string
	DDBTable    string
	DDBRegion   string

	// Optional dedup table for at-least-once idempotency
	DDBDedupTable string
}
Config bundles deployment-time settings shared across both binaries.
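Two Config fields carry conventions that both binaries have to honor: KafkaBrokers is comma-separated and DDBDedupTable is optional. The Go sketch below shows one way to handle them; `brokerList` and `dedupEnabled` are hypothetical helpers, not part of orderstats, and the values in main are placeholders.

```go
package main

import (
	"fmt"
	"strings"
)

// Config repeats the fields documented above so the sketch compiles on its own.
type Config struct {
	// Mongo
	MongoURI        string
	MongoDB         string
	MongoCollection string

	// Kafka (live CDC source)
	KafkaBrokers  string // comma-separated
	KafkaTopic    string
	ConsumerGroup string

	// DynamoDB (state)
	DDBEndpoint string
	DDBTable    string
	DDBRegion   string

	// Optional dedup table for at-least-once idempotency
	DDBDedupTable string
}

// brokerList (hypothetical) splits KafkaBrokers on commas, trims whitespace,
// and drops empty entries, so "b1:9092, b2:9092," yields two addresses.
func brokerList(c Config) []string {
	var out []string
	for _, b := range strings.Split(c.KafkaBrokers, ",") {
		if b = strings.TrimSpace(b); b != "" {
			out = append(out, b)
		}
	}
	return out
}

// dedupEnabled (hypothetical) reports whether the optional dedup table is set.
func dedupEnabled(c Config) bool {
	return strings.TrimSpace(c.DDBDedupTable) != ""
}

func main() {
	cfg := Config{
		KafkaBrokers: "b1:9092, b2:9092,",
		KafkaTopic:   "orders.cdc",
		DDBTable:     "order_totals",
	}
	fmt.Println(brokerList(cfg))   // [b1:9092 b2:9092]
	fmt.Println(dedupEnabled(cfg)) // false
}
```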
type Order
type Order struct {
	ID         string `bson:"_id" json:"_id"`
	CustomerID string `bson:"customer_id" json:"customer_id"`
	Amount     int64  `bson:"amount" json:"amount"`
}
Order is the shape stored in Mongo and emitted on the Kafka CDC stream. The example gives each field matching BSON and JSON tags so the same struct decodes from both sources. In real CDC, the Kafka payload is whatever Debezium serializes (typically a JSON envelope with `before` / `after` keys), and your Decode would unwrap that envelope.
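The unwrapping step might look like the Go sketch below. It assumes a simplified envelope: a JSON object whose `op` field holds the Debezium operation code and whose `after` field holds the document as a nested JSON object. That is an assumption, not Debezium's exact wire format; depending on connector version and converter settings, the MongoDB connector can emit `after` as a string of MongoDB extended JSON, which would need a second decode. `cdcEnvelope`, `decodeOrder`, and `errNoAfterImage` are hypothetical names, not Murmur's Decode signature.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type Order struct {
	ID         string `bson:"_id" json:"_id"`
	CustomerID string `bson:"customer_id" json:"customer_id"`
	Amount     int64  `bson:"amount" json:"amount"`
}

// cdcEnvelope is a hypothetical, simplified Debezium-style envelope.
// Only the fields this sketch needs are declared; others are ignored.
type cdcEnvelope struct {
	Op    string          `json:"op"` // operation code, e.g. "c", "u", "d", "r"
	After json.RawMessage `json:"after"`
}

// errNoAfterImage signals a record (such as a delete) with no "after" document.
var errNoAfterImage = errors.New("cdc record has no after image")

// decodeOrder unwraps the envelope and decodes the "after" document into an Order.
func decodeOrder(payload []byte) (Order, error) {
	var env cdcEnvelope
	if err := json.Unmarshal(payload, &env); err != nil {
		return Order{}, fmt.Errorf("decode envelope: %w", err)
	}
	// A JSON null "after" arrives in a RawMessage as the literal bytes "null".
	if env.Op == "d" || len(env.After) == 0 || string(env.After) == "null" {
		return Order{}, errNoAfterImage
	}
	var o Order
	if err := json.Unmarshal(env.After, &o); err != nil {
		return Order{}, fmt.Errorf("decode after: %w", err)
	}
	return o, nil
}

func main() {
	msg := []byte(`{"op":"c","before":null,"after":{"_id":"o1","customer_id":"c1","amount":1250}}`)
	o, err := decodeOrder(msg)
	fmt.Println(o.CustomerID, o.Amount, err) // c1 1250 <nil>
}
```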