Documentation ¶
Overview ¶
Package dynamodbstreams hosts a Murmur pipeline behind an AWS Lambda DynamoDB Streams trigger. This is the natural CDC pattern for shops whose source of truth is a DynamoDB table — a common Murmur use case is aggregating activity over a DDB-table-backed application's change history.
The package is the symmetric peer of pkg/exec/lambda/kinesis: same retry / dedup / metrics / partial-batch-failure semantics; the only difference is the input event shape.
Wire it up:
func main() {
	pipe := buildPipeline()
	handler, err := dynamodbstreams.NewHandler(pipe,
		// The Decoder reads the change record and projects to the
		// pipeline's T. Inspect rec.EventName ("INSERT" / "MODIFY" /
		// "REMOVE") to decide how to handle each operation.
		func(rec *events.DynamoDBEventRecord) (Order, error) {
			if rec.EventName == "REMOVE" {
				return Order{}, dynamodbstreams.ErrSkipRecord // Don't aggregate deletes.
			}
			return decodeOrder(rec.Change.NewImage)
		},
		dynamodbstreams.WithDedup(deduper),
	)
	if err != nil {
		log.Fatal(err)
	}
	lambda.Start(handler)
}
Decoder pattern ¶
DDB Streams records do NOT carry a raw byte payload — they carry a `Change` with `NewImage` / `OldImage` / `Keys` as `map[string]events.DynamoDBAttributeValue`. The Decoder receives the whole record (not just bytes) so callers can:
- branch on EventName to ignore deletes, or treat MODIFY as a delta
- read OldImage to detect which fields changed
- dig into Keys when the partition key alone is enough
Return ErrSkipRecord from the decoder to skip a record cleanly (counts as processed, no BatchItemFailure entry). Any other error is treated as a poison pill: counted via metrics.RecordError, surfaced via WithDecodeErrorCallback, and skipped — same poison-pill semantics as the Kinesis handler.
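A minimal decoder following this pattern might look like the sketch below. It uses simplified stand-in types (`Record`, `StreamRecord`, `AttributeValue`, `Order`, and a local `ErrSkipRecord`) in place of the real `events.DynamoDBEventRecord` machinery so the example is self-contained; the field names `id` and `status` are illustrative, not part of any real schema.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the aws-lambda-go events types (illustrative only).
type AttributeValue struct{ S string }

type StreamRecord struct {
	NewImage map[string]AttributeValue
	OldImage map[string]AttributeValue
}

type Record struct {
	EventName string
	Change    StreamRecord
}

var ErrSkipRecord = errors.New("skip record")

type Order struct{ ID, Status string }

// decodeOrder branches on EventName: REMOVE is skipped outright, and a
// MODIFY whose status field did not change is also skipped, by comparing
// OldImage against NewImage.
func decodeOrder(rec *Record) (Order, error) {
	if rec.EventName == "REMOVE" {
		return Order{}, ErrSkipRecord
	}
	if rec.EventName == "MODIFY" &&
		rec.Change.OldImage["status"].S == rec.Change.NewImage["status"].S {
		return Order{}, ErrSkipRecord // status unchanged — nothing to aggregate
	}
	return Order{
		ID:     rec.Change.NewImage["id"].S,
		Status: rec.Change.NewImage["status"].S,
	}, nil
}

func main() {
	rec := &Record{
		EventName: "MODIFY",
		Change: StreamRecord{
			OldImage: map[string]AttributeValue{"id": {S: "o-1"}, "status": {S: "pending"}},
			NewImage: map[string]AttributeValue{"id": {S: "o-1"}, "status": {S: "shipped"}},
		},
	}
	o, err := decodeOrder(rec)
	fmt.Println(o.Status, err) // shipped <nil>
}
```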
Partial-batch failure handling ¶
Records that exhaust their retry budget are reported via BatchItemFailures with the DDB Streams `eventID` as ItemIdentifier. Configure your event-source mapping with `FunctionResponseTypes=["ReportBatchItemFailures"]` so Lambda only redelivers the failures (or, in shard-order replay mode, all records from the earliest failure forward).
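The response shape this produces can be sketched with local stand-ins for `events.DynamoDBEventResponse` and `events.DynamoDBBatchItemFailure` (the real types live in aws-lambda-go; the eventID value here is made up):

```go
package main

import "fmt"

// Local stand-ins mirroring the aws-lambda-go response types
// (illustrative only).
type BatchItemFailure struct{ ItemIdentifier string }
type Response struct{ BatchItemFailures []BatchItemFailure }

func main() {
	// Suppose record "evt-7" exhausted its retry budget this invocation.
	failed := []string{"evt-7"}

	var resp Response
	for _, id := range failed {
		// The DDB Streams eventID becomes the ItemIdentifier, so Lambda
		// redelivers only the failed records on the next invocation.
		resp.BatchItemFailures = append(resp.BatchItemFailures,
			BatchItemFailure{ItemIdentifier: id})
	}
	fmt.Printf("%+v\n", resp)
}
```

With `ReportBatchItemFailures` unset on the event-source mapping, Lambda ignores this field and retries the whole batch on any error, which is why the mapping configuration matters.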
Index ¶
- Variables
- type Decoder
- type Handler
- type HandlerOption
- func WithClock(now func() time.Time) HandlerOption
- func WithDecodeErrorCallback(fn func(rec *events.DynamoDBEventRecord, err error)) HandlerOption
- func WithDedup(d state.Deduper) HandlerOption
- func WithMaxAttempts(n int) HandlerOption
- func WithMetrics(r metrics.Recorder) HandlerOption
- func WithRetryBackoff(base, max time.Duration) HandlerOption
Constants ¶
This section is empty.
Variables ¶
var ErrSkipRecord = errors.New("dynamodbstreams: skip record")
ErrSkipRecord is the sentinel a Decoder returns to indicate the record should be skipped without being treated as a poison pill. The handler counts it as processed and does not add it to BatchItemFailures.
Use this when a DDB change shouldn't drive an aggregation — typically a REMOVE event, or a MODIFY whose changed fields don't matter for this pipeline.
Functions ¶
This section is empty.
Types ¶
type Decoder ¶
type Decoder[T any] func(*events.DynamoDBEventRecord) (T, error)
Decoder converts a DynamoDB Streams change record to the pipeline's input type T. Return ErrSkipRecord to skip cleanly; any other error is recorded as a decode failure and the record is dropped.
type Handler ¶
type Handler = func(context.Context, events.DynamoDBEvent) (events.DynamoDBEventResponse, error)
Handler is the Lambda handler signature for DynamoDB Streams triggers — pass the returned value directly to lambda.Start.
func NewHandler ¶
func NewHandler[T any, V any](p *pipeline.Pipeline[T, V], decode Decoder[T], opts ...HandlerOption) (Handler, error)
NewHandler builds a Lambda handler that drives a Murmur pipeline from a DynamoDB Streams trigger. The pipeline must be Build-validated; the supplied Decoder converts each DDB Streams change record to the pipeline's input type T. The pipeline's Source field is unused — Lambda owns polling.
Returns an error if the pipeline's required fields (Key, Value, Aggregate, StoreIn) are not set, or if the decode function is nil.
type HandlerOption ¶
type HandlerOption func(*handlerConfig)
HandlerOption configures NewHandler.
func WithClock ¶
func WithClock(now func() time.Time) HandlerOption
WithClock overrides time.Now for windowed-bucket assignment. Useful for tests with deterministic clocks; production code should leave this unset.
func WithDecodeErrorCallback ¶
func WithDecodeErrorCallback(fn func(rec *events.DynamoDBEventRecord, err error)) HandlerOption
WithDecodeErrorCallback installs a callback for records whose Decoder returned a non-ErrSkipRecord error. Default behavior is to drop silently and continue with the next record.
Decode failures are NOT redelivered — the same record will fail to decode on the next pass. Wire this to a DLQ producer to move poison pills off the hot path.
func WithDedup ¶
func WithDedup(d state.Deduper) HandlerOption
WithDedup installs a state.Deduper. Each DynamoDB Streams record's `eventID` (already globally unique within the stream's history) is claimed via MarkSeen before the merge runs; on a duplicate, the merge is skipped and the record is counted as processed.
Strongly recommended in production: Lambda's BatchItemFailures pattern can redeliver records adjacent to a failure even if those records had already been merged successfully on the prior invocation.
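The claim-before-merge flow can be sketched with a toy in-memory deduper. This assumes a `MarkSeen`-style contract (claim an ID, report whether it was already claimed); the real `state.Deduper` interface may differ, and production dedup needs durable storage since Lambda sandboxes are recycled.

```go
package main

import (
	"fmt"
	"sync"
)

// memDeduper is a toy in-memory stand-in for state.Deduper.
type memDeduper struct {
	mu   sync.Mutex
	seen map[string]bool
}

// MarkSeen claims id and reports whether it was already claimed.
func (d *memDeduper) MarkSeen(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[id] {
		return true
	}
	d.seen[id] = true
	return false
}

func main() {
	d := &memDeduper{seen: map[string]bool{}}
	// evt-1 is redelivered, e.g. after a partial-batch failure replay.
	ids := []string{"evt-1", "evt-2", "evt-1"}
	merged := 0
	for _, id := range ids {
		if d.MarkSeen(id) {
			continue // duplicate: skip the merge, count as processed
		}
		merged++
	}
	fmt.Println("merged:", merged) // merged: 2
}
```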
func WithMaxAttempts ¶
func WithMaxAttempts(n int) HandlerOption
WithMaxAttempts sets the per-record retry budget. Defaults to 3.
func WithMetrics ¶
func WithMetrics(r metrics.Recorder) HandlerOption
WithMetrics installs a metrics.Recorder. Defaults to metrics.Noop{}. The handler records events under the pipeline's Name; retries under "<name>:retry"; dedup skips under "<name>:dedup_skip"; dead letters under "<name>:dead_letter"; and skipped records (ErrSkipRecord) under "<name>:skip" — same conventions as the streaming runtime.
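The key convention is simple string composition; for a pipeline named "orders" (an invented name for illustration) the recorded metric names would be:

```go
package main

import "fmt"

func main() {
	name := "orders" // the pipeline's Name
	// Each counter is recorded under "<name>:<suffix>", plus the bare
	// pipeline name for processed events.
	fmt.Println(name)
	for _, suffix := range []string{"retry", "dedup_skip", "dead_letter", "skip"} {
		fmt.Println(name + ":" + suffix)
	}
}
```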
func WithRetryBackoff ¶
func WithRetryBackoff(base, max time.Duration) HandlerOption
WithRetryBackoff configures the per-attempt sleep schedule. Doubles after each failure starting from base, capped at max, with full jitter. Defaults to 50 ms / 5 s.