Documentation ¶
Overview ¶
Package dynamodb provides a snapshot.Source backed by DynamoDB ParallelScan, suitable for bootstrapping a Murmur pipeline from a DDB-resident OLTP table.
The companion live source for the same shop is the DDB Streams Lambda runtime (pkg/exec/lambda/dynamodbstreams). Together they give you the canonical AWS-native CDC story:
this Source → bootstrap.Run → populate initial state
DDB Streams → dynamodbstreams.Lambda → keep state live
ParallelScan ¶
DDB ParallelScan partitions the table into N segments and reads them concurrently. For a 100M-row table, parallelism cuts the scan from "an afternoon" to "a few hours" depending on RCU provisioning. The implementation here owns N goroutines, each scanning one segment and emitting records into the shared output channel.
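The fan-out described above can be sketched with the standard library alone. This is a minimal illustration, not the package's actual implementation: `page`, `scanPageFn`, `fakeTable`, and `scanAll` are hypothetical names, and the in-memory table stands in for a real Scan call that would pass the segment number and segment count to DynamoDB.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
)

// page stands in for one Scan response: items plus a continuation key
// that is empty once the segment is exhausted.
type page struct {
	Items   []string
	NextKey string
}

// scanPageFn is a hypothetical stand-in for a paged Scan call limited to
// one segment out of total.
type scanPageFn func(ctx context.Context, segment, total int, startKey string) (page, error)

// parallelScan runs one goroutine per segment, sends every item to out,
// and returns the first error once all goroutines have stopped.
func parallelScan(ctx context.Context, total int, scan scanPageFn, out chan<- string) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	fail := func(err error) {
		once.Do(func() {
			firstErr = err
			cancel() // stop the other segments early
		})
	}
	for seg := 0; seg < total; seg++ {
		wg.Add(1)
		go func(seg int) {
			defer wg.Done()
			startKey := ""
			for {
				p, err := scan(ctx, seg, total, startKey)
				if err != nil {
					fail(err)
					return
				}
				for _, it := range p.Items {
					select {
					case out <- it:
					case <-ctx.Done():
						fail(ctx.Err())
						return
					}
				}
				if p.NextKey == "" {
					return // segment exhausted
				}
				startKey = p.NextKey
			}
		}(seg)
	}
	wg.Wait()
	return firstErr
}

// fakeTable serves n in-memory items, two per page. Item i belongs to
// segment i % total, so segments are disjoint and cover the table.
func fakeTable(n int) scanPageFn {
	return func(_ context.Context, segment, total int, startKey string) (page, error) {
		i := segment
		if startKey != "" {
			i, _ = strconv.Atoi(startKey) // keys are produced below, always numeric
		}
		var p page
		for ; i < n && len(p.Items) < 2; i += total {
			p.Items = append(p.Items, "item-"+strconv.Itoa(i))
		}
		if i < n {
			p.NextKey = strconv.Itoa(i)
		}
		return p, nil
	}
}

// scanAll drains the shared output channel while the segments run.
func scanAll(n, segments int) ([]string, error) {
	out := make(chan string)
	done := make(chan struct{})
	var items []string
	go func() {
		defer close(done)
		for it := range out {
			items = append(items, it)
		}
	}()
	err := parallelScan(context.Background(), segments, fakeTable(n), out)
	close(out)
	<-done
	return items, err
}

func main() {
	items, err := scanAll(10, 4)
	fmt.Println(len(items), err) // 10 <nil>
}
```

Items arrive in no particular order because the segments interleave on the shared channel; a consumer that needs ordering has to impose it downstream.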
Handoff token ¶
CaptureHandoff returns a DDB Streams shard-iterator position captured before the scan starts. The deployment system stores it and hands it to the live Lambda's event-source mapping as `StartingPositionTimestamp` (or equivalent).
Streams shard iterators expire about 15 minutes after they are issued, so the handoff token here is the shard ID plus a timestamp, not the iterator itself; the live runtime calls GetShardIterator with the captured timestamp at startup.
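To make the "shard ID plus timestamp" idea concrete, here is a small sketch of a token that can be stored by a deployment system and decoded later. The struct layout, field names, and JSON encoding are assumptions for illustration; the real snapshot.HandoffToken format is not shown on this page, and the ARN and shard ID below are made-up values.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// handoffToken is a hypothetical layout. It records where the live reader
// should start, rather than a shard iterator that would expire before a
// long scan finishes.
type handoffToken struct {
	StreamARN  string    `json:"stream_arn"`
	ShardID    string    `json:"shard_id"`
	CapturedAt time.Time `json:"captured_at"`
}

// encodeToken serializes the token into opaque bytes for storage.
func encodeToken(t handoffToken) ([]byte, error) { return json.Marshal(t) }

// decodeToken restores a token previously produced by encodeToken.
func decodeToken(b []byte) (handoffToken, error) {
	var t handoffToken
	err := json.Unmarshal(b, &t)
	return t, err
}

// roundTrip builds a token, encodes it, and decodes it again.
func roundTrip(arn, shardID string, unixSec int64) (handoffToken, error) {
	b, err := encodeToken(handoffToken{
		StreamARN:  arn,
		ShardID:    shardID,
		CapturedAt: time.Unix(unixSec, 0).UTC(),
	})
	if err != nil {
		return handoffToken{}, err
	}
	return decodeToken(b)
}

func main() {
	// Illustrative values only.
	t, err := roundTrip("arn:aws:dynamodb:us-east-1:123456789012:table/orders/stream/example",
		"shardId-00000001", 1700000000)
	fmt.Println(t.ShardID, t.CapturedAt.Unix(), err)
}
```

Because the token holds only durable facts (which shard, what time), it stays valid no matter how long the bootstrap scan takes.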
Resumption ¶
Each segment scan returns a `LastEvaluatedKey` per page. Saving these per segment is straightforward but adds a checkpoint write per batch; for moderate-size tables (<= 100M rows), the simpler "restart from the beginning, dedup catches it" approach is what production teams pick. Resume() is provided for symmetry with the snapshot.Source contract, but it currently ignores the resume token and restarts the scan from the beginning (segment 0). At-least-once dedup at the bootstrap.Run level absorbs the duplicate emissions.
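The "restart from the beginning, dedup catches it" behavior can be illustrated with an in-memory seen-set. This is only a sketch of the idea: `deduper`, `record`, and `apply` are hypothetical names, and the actual dedup store used by bootstrap.Run is not shown here. It also assumes event IDs are stable across runs (for example, derived from the primary key).

```go
package main

import "fmt"

// record is a stand-in for a scanned row with a stable event ID.
type record struct {
	EventID string
	Value   int
}

// deduper remembers which event IDs have already been applied.
type deduper struct{ seen map[string]struct{} }

func newDeduper() *deduper { return &deduper{seen: make(map[string]struct{})} }

// firstTime reports whether id is new, and marks it as seen.
func (d *deduper) firstTime(id string) bool {
	if _, ok := d.seen[id]; ok {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

// apply folds records into a running sum, skipping IDs seen before, and
// returns how many records were actually applied.
func apply(d *deduper, sum *int, recs []record) (applied int) {
	for _, r := range recs {
		if d.firstTime(r.EventID) {
			*sum += r.Value
			applied++
		}
	}
	return applied
}

func main() {
	table := []record{{"pk#1", 10}, {"pk#2", 20}, {"pk#3", 30}}
	d, sum := newDeduper(), 0

	// First attempt stops after two records (simulated failure).
	first := apply(d, &sum, table[:2])
	// Restart from the beginning: the whole table is emitted again.
	second := apply(d, &sum, table)

	fmt.Println(first, second, sum) // 2 1 60
}
```

The second pass re-emits all three rows, but only the one that was never applied changes the state, so the final sum matches a single clean scan.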
Index ¶
- type Config
- type Decoder
- type EventIDFn
- type Source
- func (s *Source[T]) CaptureHandoff(ctx context.Context) (snapshot.HandoffToken, error)
- func (s *Source[T]) Close() error
- func (s *Source[T]) Name() string
- func (s *Source[T]) Resume(ctx context.Context, _ []byte, out chan<- source.Record[T]) error
- func (s *Source[T]) Scan(ctx context.Context, out chan<- source.Record[T]) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config[T any] struct {
	// Client is an AWS SDK v2 DynamoDB client. The Source does not own
	// the client's lifecycle.
	Client *dynamodb.Client

	// StreamsClient is the SDK v2 DynamoDB Streams client used to
	// capture the live-source handoff token at scan start. Required when
	// the table has Streams enabled and you want a
	// gap-and-duplicate-free bootstrap → live transition. May be nil to
	// skip handoff capture; in that case CaptureHandoff returns nil token.
	StreamsClient *dynamodbstreams.Client

	// TableName is the DDB table to scan.
	TableName string

	// StreamARN, when non-empty, identifies the Streams ARN whose shard
	// iterator the handoff token should reference. Only meaningful when
	// StreamsClient is also set.
	StreamARN string

	// Segments is the number of parallel scan segments. DDB caps at
	// 1,000,000; in practice 4–32 is the right range for most tables.
	// For a 100M-row table at 1k RCU/s/partition with
	// eventually-consistent reads, segments=8 typically saturates the
	// scan budget. Defaults to 1 (sequential scan).
	Segments int

	// Decode converts a DDB item to T. Required.
	Decode Decoder[T]

	// EventID derives the dedup key per record. Default: synthetic
	// segment:index — unique within one scan but NOT across re-runs.
	// For idempotent re-runs, return the natural primary key.
	EventID EventIDFn[T]

	// PageSize caps Items per Scan call. Defaults to 1000 (DDB max).
	// Lower for memory-constrained workers consuming wide rows.
	PageSize int32

	// FilterExpression, when non-empty, is applied server-side. Saves
	// network bytes and processing for tables where bootstrap should
	// only see a subset of rows.
	FilterExpression string

	// FilterValues / FilterNames support FilterExpression substitutions.
	FilterValues map[string]types.AttributeValue
	FilterNames  map[string]string

	// OnDecodeError, when non-nil, is invoked for items that fail to
	// decode. Default behavior: drop silently. Wire to a DLQ for
	// poison-row visibility.
	OnDecodeError func(item map[string]types.AttributeValue, err error)
}
Config configures a DDB-table snapshot.Source.
type Decoder ¶
type Decoder[T any] func(map[string]types.AttributeValue) (T, error)
Decoder converts a DDB item (the AttributeValue map from a scan response) into the pipeline's input type T. The most common shape is to call attributevalue.UnmarshalMap (from github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue) to decode into a typed struct.
type EventIDFn ¶
type EventIDFn[T any] func(decoded T, item map[string]types.AttributeValue) string
EventIDFn extracts a stable per-record dedup key from the decoded value. For tables with a single partition key, returning the partition key's string form is the canonical choice — that way a re-run of the same scan folds idempotently under bootstrap.WithDedup. Default: a synthetic "<segment>:<index>" pair per record, which is unique within a single scan but NOT across re-runs (because segment work distribution can shift).
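A toy model shows why the synthetic default is not stable across re-runs while a key-derived ID is. The assumptions here are loud ones: items are assigned to segments round-robin, and the "shift" between runs is modeled by changing the segment count. Real DynamoDB segment assignment works differently; `syntheticIDs`, `naturalID`, and `changedIDs` are hypothetical helpers, not part of this package.

```go
package main

import "fmt"

// syntheticIDs assigns "<segment>:<index>" IDs. Item i is placed in
// segment i % segments, a stand-in for however a scan splits the work.
func syntheticIDs(pks []string, segments int) map[string]string {
	next := make([]int, segments) // next free index within each segment
	ids := make(map[string]string, len(pks))
	for i, pk := range pks {
		seg := i % segments
		ids[pk] = fmt.Sprintf("%d:%d", seg, next[seg])
		next[seg]++
	}
	return ids
}

// naturalID derives the ID from the partition key, so it depends only on
// the row and not on how the scan was split.
func naturalID(pk string) string { return "pk#" + pk }

// changedIDs counts rows whose ID differs between two runs.
func changedIDs(a, b map[string]string) int {
	n := 0
	for pk, id := range a {
		if b[pk] != id {
			n++
		}
	}
	return n
}

func main() {
	pks := []string{"u1", "u2", "u3", "u4"}
	run1 := syntheticIDs(pks, 2)
	run2 := syntheticIDs(pks, 3)

	fmt.Println(run1["u3"], run2["u3"]) // 0:1 2:0
	fmt.Println(changedIDs(run1, run2)) // 2
	fmt.Println(naturalID("u3"))        // pk#u3
}
```

In this model, two of the four rows get a different synthetic ID on the second run, so a dedup store keyed on those IDs would treat them as new records. The key-derived ID is the same in both runs.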
type Source ¶
type Source[T any] struct {
	// contains filtered or unexported fields
}
Source implements snapshot.Source against a DDB ParallelScan.
func (*Source[T]) CaptureHandoff ¶
CaptureHandoff captures the DDB Streams shard iterator position the live consumer should resume from. Called once at bootstrap start.
If StreamsClient or StreamARN is unset, CaptureHandoff returns a nil token. The caller should then ensure the live source has its own way to skip past the already-bootstrapped data (e.g., starting from LATEST and accepting the duplicate-event window covered by dedup).