dynamodbstore

package
v0.1.5 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 13, 2026 License: MIT Imports: 17 Imported by: 1

Documentation

Constants

const (
	DefaultEventsTable = "events"
)

DynamoDB attribute names are kept to single characters to minimize read and write costs. DynamoDB bills reads and writes by item size, and attribute names count toward every item's stored and transferred size. At scale (millions of events), the savings from "a" versus "aggregate_id" are material.
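As a rough illustration of the size argument, the sketch below computes the attribute-name overhead avoided by renaming a single attribute. The helper name and the 10 million item workload are assumptions made for the example, not part of this package.

```go
package main

import "fmt"

// nameBytesSaved returns the attribute-name bytes avoided across items when
// one attribute is renamed from long to short. It only counts name length;
// it does not model DynamoDB's capacity-unit rounding.
func nameBytesSaved(long, short string, items int) int {
	return (len(long) - len(short)) * items
}

func main() {
	// Hypothetical workload: 10 million events, one attribute renamed.
	saved := nameBytesSaved("aggregate_id", "a", 10_000_000)
	fmt.Printf("%d bytes saved\n", saved) // prints "110000000 bytes saved"
}
```

The figure applies per renamed attribute, so an item with several shortened names saves proportionally more.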

const NumGSIs = 20

NumGSIs is the number of GSI pairs on the aggregates table. The opaquedata single-table design projects every annotated field into one of these 20 slots.

Variables

ProviderSet provides the DynamoDB event store and the opaque store, and binds them to the protosource interfaces. The consumer must provide a dynamoclient.Client and values for the table name types.

Functions

func EnsureTables added in v0.1.5

func EnsureTables(ctx context.Context, client *dynamodb.Client, eventsTable, aggregatesTable string) error

EnsureTables idempotently creates the events and aggregates tables. If a table already exists, it is left alone. Tables are created with PAY_PER_REQUEST billing, deletion protection enabled, TTL on attribute "t", and point-in-time recovery (PITR) enabled.

PITR and TTL enablement are best-effort: DynamoDB Local does not always support them, so failures are silently ignored to keep local development and test runs working.
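The create-if-missing and best-effort behavior can be sketched with an in-memory stand-in. Everything here (fakeDB, errExists, ensureTable) is hypothetical and only imitates the control flow described above; it is not the package's implementation and does not call the AWS SDK.

```go
package main

import (
	"errors"
	"fmt"
)

// errExists stands in for the "table already exists" error of a real client.
var errExists = errors.New("table already exists")

// fakeDB is a hypothetical in-memory stand-in for a DynamoDB client.
type fakeDB struct {
	tables map[string]bool
}

func (db *fakeDB) createTable(name string) error {
	if db.tables[name] {
		return errExists
	}
	db.tables[name] = true
	return nil
}

// enableTTL always fails, imitating a local emulator without TTL support.
func (db *fakeDB) enableTTL(name, attr string) error {
	return errors.New("not supported")
}

// ensureTable creates the table if it is missing. An existing table is left
// alone, and a TTL failure on a new table is deliberately ignored.
func ensureTable(db *fakeDB, name string) error {
	err := db.createTable(name)
	if errors.Is(err, errExists) {
		return nil // already present: nothing to do
	}
	if err != nil {
		return err
	}
	_ = db.enableTTL(name, "t") // best-effort
	return nil
}

func main() {
	db := &fakeDB{tables: map[string]bool{}}
	fmt.Println(ensureTable(db, "events")) // first call creates the table
	fmt.Println(ensureTable(db, "events")) // second call is a no-op
}
```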

func ProvideOpaqueStore

func ProvideOpaqueStore(client dynamoclient.Client, table AggregatesTableName) *opaquedynamo.Store

Types

type AggregatesTableName

type AggregatesTableName string

AggregatesTableName is a named type for Wire to distinguish table name strings.

type DynamoDBStore

type DynamoDBStore struct {
	// contains filtered or unexported fields
}

DynamoDBStore implements the protosource Store, AggregateStore, and SnapshotTailStore interfaces backed by DynamoDB.

func New

func New(client dynamoclient.Client, opts ...Option) (*DynamoDBStore, error)

New creates a new DynamoDBStore. The client must not be nil.

func ProvideStore

func ProvideStore(client dynamoclient.Client, opaqueStore *opaquedynamo.Store, table EventsTableName) (*DynamoDBStore, error)

func (*DynamoDBStore) Load

func (s *DynamoDBStore) Load(ctx context.Context, aggregateID string) (*historyv1.History, error)

Load retrieves the full event history for the given aggregate ID in ascending version order. It paginates automatically when DynamoDB returns partial results.
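A paginated read of this kind usually follows a cursor until the source reports no further pages (in DynamoDB, the cursor is the LastEvaluatedKey of a Query response). The sketch below shows that loop over an in-memory slice; the page type and the query and loadAll functions are hypothetical, and limit is assumed to be greater than zero.

```go
package main

import "fmt"

// page is a hypothetical query result: some items plus a cursor.
type page struct {
	items []int
	next  int // index to resume from; -1 means no more pages
}

// query imitates a paged read returning at most limit items from start.
func query(data []int, start, limit int) page {
	end := start + limit
	if end >= len(data) {
		return page{items: data[start:], next: -1}
	}
	return page{items: data[start:end], next: end}
}

// loadAll follows the cursor until no pages remain. limit must be > 0.
func loadAll(data []int, limit int) []int {
	var out []int
	for start := 0; start != -1; {
		p := query(data, start, limit)
		out = append(out, p.items...)
		start = p.next
	}
	return out
}

func main() {
	versions := []int{1, 2, 3, 4, 5, 6, 7}
	fmt.Println(loadAll(versions, 3)) // prints "[1 2 3 4 5 6 7]"
}
```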

func (*DynamoDBStore) LoadTail

func (s *DynamoDBStore) LoadTail(ctx context.Context, aggregateID string, n int) (*historyv1.History, error)

LoadTail returns the last n events for the given aggregate, ordered by version ascending. It queries DynamoDB in descending order with a per-page limit of n, paginating until n records are collected or no more pages remain, then reverses the results.

If n <= 0, an empty History is returned immediately.
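The descending-read-then-reverse approach can be sketched over a plain slice of version numbers. The tail function is a hypothetical illustration of the ordering logic only; it performs no DynamoDB queries.

```go
package main

import "fmt"

// tail returns the last n versions in ascending order. versions must already
// be sorted ascending; the backward walk imitates a descending query.
func tail(versions []int, n int) []int {
	if n <= 0 {
		return nil
	}
	out := make([]int, 0, n)
	// Collect newest-first, stopping once n items are gathered.
	for i := len(versions) - 1; i >= 0 && len(out) < n; i-- {
		out = append(out, versions[i])
	}
	// Reverse in place to restore ascending order.
	for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
		out[i], out[j] = out[j], out[i]
	}
	return out
}

func main() {
	fmt.Println(tail([]int{1, 2, 3, 4, 5}, 3)) // prints "[3 4 5]"
	fmt.Println(tail([]int{1, 2}, 5))          // prints "[1 2]"
	fmt.Println(tail([]int{1, 2}, 0))          // prints "[]"
}
```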

func (*DynamoDBStore) Save

func (s *DynamoDBStore) Save(ctx context.Context, aggregateID string, records ...*recordv1.Record) error

Save stores records for the given aggregate ID. Each batch of up to 100 records is written atomically using TransactWriteItems with condition expressions to prevent duplicate versions.

When len(records) exceeds 100, Save splits the work into multiple transactions. Atomicity is guaranteed within each batch, but NOT across batches: if a later batch fails, earlier batches are already committed. Callers that require all-or-nothing semantics for large writes should pre-validate or limit batch size upstream.

Saving zero records is a no-op.
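To make the batching boundary concrete, the following sketch splits a slice into consecutive chunks of at most 100 elements. The batches helper and maxBatch constant are hypothetical names; in the real store each chunk would correspond to one transaction, which is why a failure in a later chunk leaves earlier chunks committed.

```go
package main

import "fmt"

// maxBatch mirrors the per-transaction limit of 100 described above.
const maxBatch = 100

// batches splits items into consecutive slices of at most size elements.
// size must be > 0.
func batches(items []int, size int) [][]int {
	var out [][]int
	for len(items) > 0 {
		n := size
		if len(items) < n {
			n = len(items)
		}
		out = append(out, items[:n])
		items = items[n:]
	}
	return out
}

func main() {
	records := make([]int, 250) // stand-in for 250 records
	for i, b := range batches(records, maxBatch) {
		fmt.Printf("batch %d: %d records\n", i, len(b))
	}
}
```

With 250 records this yields batches of 100, 100, and 50. A caller that needs all-or-nothing behavior could reject inputs longer than maxBatch before calling Save.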

func (*DynamoDBStore) SaveAggregate

func (s *DynamoDBStore) SaveAggregate(ctx context.Context, aggregate proto.Message) error

SaveAggregate persists the materialized aggregate state via the OpaqueStore. The aggregate must implement opaquedata.AutoPKSK (generated from proto annotations) and an OpaqueStore must be configured via WithOpaqueStore. The aggregates table uses pk/sk keys with GSIs for query access patterns.

type EventsTableName

type EventsTableName string

EventsTableName is a named type for Wire to distinguish table name strings.
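The reason for distinct named types can be seen in a small standalone sketch. The types are redeclared locally so the example compiles on its own, and the describe function is hypothetical; the point is that two string-based types cannot be swapped by accident, which is also what lets a dependency injector such as Wire tell them apart.

```go
package main

import "fmt"

// Local redeclarations for illustration only.
type EventsTableName string
type AggregatesTableName string

// describe is a hypothetical consumer that needs both table names.
func describe(events EventsTableName, aggregates AggregatesTableName) string {
	return fmt.Sprintf("events=%s aggregates=%s", events, aggregates)
}

func main() {
	events := EventsTableName("events")
	aggregates := AggregatesTableName("aggregates")
	fmt.Println(describe(events, aggregates))
	// describe(aggregates, events) would not compile: the types differ
	// even though both have string as their underlying type.
}
```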

type Option

type Option func(*DynamoDBStore)

Option configures a DynamoDBStore.

func WithEventsTable

func WithEventsTable(name string) Option

WithEventsTable sets the DynamoDB table name used for event storage.

func WithOpaqueStore

func WithOpaqueStore(store opaquedata.OpaqueStore) Option

WithOpaqueStore sets the OpaqueStore used by SaveAggregate to persist materialized aggregates. The aggregates table uses pk/sk (String/String) keys with up to 20 GSIs for query access patterns. All aggregates must implement opaquedata.AutoPKSK to be materialized.

func WithTTL

func WithTTL(ttl time.Duration) Option

WithTTL sets a time-to-live duration for event records. When set, each saved event includes a TTL attribute ("t") containing the Unix epoch second at which the record should expire. The DynamoDB table must have TTL enabled on the "t" attribute for automatic deletion to occur.

A zero or negative duration disables TTL (the default).
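The option pattern and the expiry calculation can be sketched together. The store, option, withTTL, newStore, and expiryUnix names below are hypothetical stand-ins, not the package's unexported internals; the sketch only assumes that the expiry is the save time plus the configured duration, expressed in Unix epoch seconds.

```go
package main

import (
	"fmt"
	"time"
)

// store is a hypothetical stand-in for a store configured with options.
type store struct {
	ttl time.Duration
}

// option mutates a store during construction.
type option func(*store)

func withTTL(ttl time.Duration) option {
	return func(s *store) { s.ttl = ttl }
}

func newStore(opts ...option) *store {
	s := &store{} // zero ttl: TTL disabled by default
	for _, o := range opts {
		o(s)
	}
	return s
}

// expiryUnix returns the epoch second to write as the TTL attribute and
// whether TTL is enabled. A zero or negative ttl disables it.
func (s *store) expiryUnix(nowUnix int64) (int64, bool) {
	if s.ttl <= 0 {
		return 0, false
	}
	return time.Unix(nowUnix, 0).Add(s.ttl).Unix(), true
}

func main() {
	s := newStore(withTTL(24 * time.Hour))
	at, ok := s.expiryUnix(1_700_000_000)
	fmt.Println(at, ok) // prints "1700086400 true"

	at, ok = newStore().expiryUnix(1_700_000_000)
	fmt.Println(at, ok) // prints "0 false"
}
```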
