dynamodb

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 12, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package dynamodb provides a snapshot.Source backed by DynamoDB ParallelScan, suitable for bootstrapping a Murmur pipeline from a DDB-resident OLTP table.

The companion live source for the same table is the DDB Streams Lambda runtime (pkg/exec/lambda/dynamodbstreams). Together they provide the canonical AWS-native CDC setup:

this Source  → bootstrap.Run          → populate initial state
DDB Streams  → dynamodbstreams.Lambda → keep state live

ParallelScan

A DDB parallel scan (Scan calls with TotalSegments = N and Segment = 0 through N-1) divides the table into N logical segments that can be read concurrently. For a 100M-row table, parallelism can bring a scan that would otherwise run much longer down to a few hours, depending on RCU provisioning. The implementation here runs N goroutines, each scanning one segment and emitting records into the shared output channel.

Handoff token

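CaptureHandoff returns a DDB Streams shard-iterator position captured before the scan starts. The deployment system stores it and hands it to the live Lambda's event-source mapping as `StartingPositionTimestamp` (or equivalent). Note that Lambda accepts `StartingPositionTimestamp` only with the `AT_TIMESTAMP` starting position, which DynamoDB Streams event-source mappings do not support, so for DDB Streams the equivalent has to be handled by the live runtime.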

DynamoDB Streams shard iterators expire 15 minutes after they are issued, so the handoff token here stores the shard ID plus a timestamp rather than the iterator itself; the live runtime calls GetShardIterator with the captured timestamp at startup.

Resumption

Each segment scan returns a `LastEvaluatedKey` per page. Saving these per segment is straightforward, but it adds a checkpoint write per page. For moderate-size tables (up to roughly 100M rows), the simpler "restart from the beginning and let dedup catch the repeats" approach is usually the better trade-off. Resume() exists for symmetry with the snapshot.Source contract, but it currently restarts the whole scan from the beginning; at-least-once dedup at the bootstrap.Run level absorbs the duplicate emissions.
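The restart-and-dedup trade-off can be illustrated with a hypothetical in-memory applier standing in for bootstrap.Run's dedup. The sketch assumes event IDs are stable across runs (for example, the natural primary key), which is what makes a full restart safe:

```go
package main

// dedupApplier is a hypothetical, in-memory stand-in for at-least-once
// dedup: records whose event ID was already applied are dropped, so
// replaying a scan from the beginning leaves state unchanged.
type dedupApplier struct {
	seen  map[string]bool
	state map[string]int
}

func newDedupApplier() *dedupApplier {
	return &dedupApplier{seen: map[string]bool{}, state: map[string]int{}}
}

// apply folds a record into state unless its event ID was already applied.
// It reports whether the record was applied.
func (d *dedupApplier) apply(eventID, key string, delta int) bool {
	if d.seen[eventID] {
		return false
	}
	d.seen[eventID] = true
	d.state[key] += delta
	return true
}
```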

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config[T any] struct {
	// Client is an AWS SDK v2 DynamoDB client. The Source does not own
	// the client's lifecycle.
	Client *dynamodb.Client

	// StreamsClient is the SDK v2 DynamoDB Streams client used to
	// capture the live-source handoff token at scan start. Required when
	// the table has Streams enabled and you want a gap-and-duplicate-
	// free bootstrap → live transition. May be nil to skip handoff
	// capture; in that case CaptureHandoff returns a nil token.
	StreamsClient *dynamodbstreams.Client

	// TableName is the DDB table to scan.
	TableName string

	// StreamARN, when non-empty, identifies the Streams ARN whose shard
	// iterator the handoff token should reference. Only meaningful when
	// StreamsClient is also set.
	StreamARN string

	// Segments is the number of parallel scan segments. DDB caps at
	// 1,000,000; in practice 4–32 is the right range for most tables.
	// For a 100M-row table at 1k RCU/s/partition with eventually-
	// consistent reads, segments=8 typically saturates the scan budget.
	// Defaults to 1 (sequential scan).
	Segments int

	// Decode converts a DDB item to T. Required.
	Decode Decoder[T]

	// EventID derives the dedup key per record. Default: synthetic
	// segment:index — unique within one scan but NOT across re-runs.
	// For idempotent re-runs, return the natural primary key.
	EventID EventIDFn[T]

	// PageSize caps Items per Scan call. Defaults to 1000. DDB also
	// limits each Scan page to 1 MB of data, so wide rows may yield
	// fewer items per page. Lower it for memory-constrained workers
	// consuming wide rows.
	PageSize int32

	// FilterExpression, when non-empty, is applied server-side. Saves
	// network bytes and processing for tables where bootstrap should
	// only see a subset of rows.
	FilterExpression string

	// FilterValues / FilterNames support FilterExpression substitutions.
	FilterValues map[string]types.AttributeValue
	FilterNames  map[string]string

	// OnDecodeError, when non-nil, is invoked for items that fail to
	// decode. Default behavior: drop silently. Wire to a DLQ for
	// poison-row visibility.
	OnDecodeError func(item map[string]types.AttributeValue, err error)
}

Config configures a DDB-table snapshot.Source.

type Decoder

type Decoder[T any] func(map[string]types.AttributeValue) (T, error)

Decoder converts a DDB item (the AttributeValue map from a scan response) into the pipeline's input type T. The most common approach is to decode into a typed struct with attributevalue.UnmarshalMap from github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue.

type EventIDFn

type EventIDFn[T any] func(decoded T, item map[string]types.AttributeValue) string

EventIDFn extracts a stable per-record dedup key from the decoded value. For tables whose primary key is a partition key alone (no sort key), returning the partition key's string form is the canonical choice, so that a re-run of the same scan folds idempotently under bootstrap.WithDedup; for composite keys, the ID must include the sort key as well. Default: a synthetic "<segment>:<index>" pair per record, which is unique within a single scan but NOT across re-runs (because segment work distribution can shift).
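To make the key choice concrete, here is a sketch with a hypothetical `order` row keyed by customer (partition key) and order (sort key). `Item` stands in for the SDK's AttributeValue map, and the `#` separator is an arbitrary choice:

```go
package main

import "fmt"

// Item is a simplified stand-in for map[string]types.AttributeValue.
type Item map[string]string

// EventIDFn mirrors the documented shape, with Item in place of the SDK map.
type EventIDFn[T any] func(decoded T, item Item) string

// order is a hypothetical decoded row from a composite-key table.
type order struct {
	CustomerID string // partition key
	OrderID    string // sort key
}

// byPartitionKey is only correct for tables keyed by partition key alone;
// on this composite-key table it collapses a customer's orders into one ID.
var byPartitionKey EventIDFn[order] = func(o order, _ Item) string { return o.CustomerID }

// byFullKey combines partition and sort key (assumes neither contains '#').
var byFullKey EventIDFn[order] = func(o order, _ Item) string {
	return fmt.Sprintf("%s#%s", o.CustomerID, o.OrderID)
}
```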

type Source

type Source[T any] struct {
	// contains filtered or unexported fields
}

Source implements snapshot.Source against a DDB ParallelScan.

func NewSource

func NewSource[T any](cfg Config[T]) (*Source[T], error)

NewSource constructs the Source and validates its required fields.
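The exact checks are not listed here. As a hypothetical sketch, the documented defaults (Segments 1, PageSize 1000) and DynamoDB's TotalSegments range of 1 to 1,000,000 could be enforced as below; `scanSettings` and `withDefaults` are illustrative names, and treating TableName as required is an assumption:

```go
package main

import (
	"errors"
	"fmt"
)

// scanSettings mirrors a few Config fields; it is a hypothetical stand-in,
// not the package's Config type.
type scanSettings struct {
	TableName string
	Segments  int
	PageSize  int32
	HasDecode bool // stands in for Decode != nil
}

// withDefaults applies the documented defaults and rejects values outside
// the ranges DynamoDB accepts.
func withDefaults(c scanSettings) (scanSettings, error) {
	if c.TableName == "" {
		return c, errors.New("config: TableName is required")
	}
	if !c.HasDecode {
		return c, errors.New("config: Decode is required")
	}
	if c.Segments == 0 {
		c.Segments = 1 // sequential scan
	}
	if c.Segments < 0 || c.Segments > 1_000_000 {
		return c, fmt.Errorf("config: Segments=%d outside 1..1000000", c.Segments)
	}
	if c.PageSize == 0 {
		c.PageSize = 1000
	}
	if c.PageSize < 0 {
		return c, fmt.Errorf("config: PageSize=%d must be positive", c.PageSize)
	}
	return c, nil
}
```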

func (*Source[T]) CaptureHandoff

func (s *Source[T]) CaptureHandoff(ctx context.Context) (snapshot.HandoffToken, error)

CaptureHandoff captures the DDB Streams shard iterator position the live consumer should resume from. Called once at bootstrap start.

If StreamsClient or StreamARN is unset, CaptureHandoff returns a nil token. The caller must then make sure the live source has its own way to skip past the already-bootstrapped data (e.g., starting from LATEST and relying on dedup to cover the duplicate-event window).

func (*Source[T]) Close

func (s *Source[T]) Close() error

Close is a no-op; the SDK clients are owned by the caller.

func (*Source[T]) Name

func (s *Source[T]) Name() string

Name returns "ddb-snapshot:<table>".

func (*Source[T]) Resume

func (s *Source[T]) Resume(ctx context.Context, _ []byte, out chan<- source.Record[T]) error

Resume restarts the scan from the beginning; see the package doc. At-least-once dedup at bootstrap.Run absorbs the duplicate emissions. A future variant could decode the marker argument (currently ignored) into per-segment LastEvaluatedKey state for true mid-scan resumption.
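A per-segment marker for such a future variant might look like the following. The layout is entirely hypothetical, and string-valued keys stand in for the SDK's AttributeValue maps:

```go
package main

import "encoding/json"

// Item is a simplified stand-in for a LastEvaluatedKey
// (map[string]types.AttributeValue in the SDK).
type Item map[string]string

// resumeMarker is a hypothetical marker: the last key seen per segment,
// plus which segments have finished.
type resumeMarker struct {
	Total int          `json:"total"`
	Last  map[int]Item `json:"last"`
	Done  map[int]bool `json:"done"`
}

func (m resumeMarker) encode() ([]byte, error) { return json.Marshal(m) }

func decodeMarker(b []byte) (resumeMarker, error) {
	var m resumeMarker
	err := json.Unmarshal(b, &m)
	return m, err
}

// remaining maps each unfinished segment to its ExclusiveStartKey;
// a nil key means the segment restarts from its beginning.
func (m resumeMarker) remaining() map[int]Item {
	todo := map[int]Item{}
	for seg := 0; seg < m.Total; seg++ {
		if m.Done[seg] {
			continue
		}
		todo[seg] = m.Last[seg] // nil if the segment never checkpointed
	}
	return todo
}
```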

func (*Source[T]) Scan

func (s *Source[T]) Scan(ctx context.Context, out chan<- source.Record[T]) error

Scan runs the parallel scan, emitting records via out. Returns nil when all segments complete; non-nil on the first segment error.
