Documentation
¶
Index ¶
- Constants
- Variables
- func EnsureTables(ctx context.Context, client *dynamodb.Client, ...) error
- func ProvideOpaqueStore(client dynamoclient.Client, table AggregatesTableName) *opaquedynamo.Store
- type AggregatesTableName
- type DynamoDBStore
- func (s *DynamoDBStore) Load(ctx context.Context, aggregateID string) (*historyv1.History, error)
- func (s *DynamoDBStore) LoadTail(ctx context.Context, aggregateID string, n int) (*historyv1.History, error)
- func (s *DynamoDBStore) Save(ctx context.Context, aggregateID string, records ...*recordv1.Record) error
- func (s *DynamoDBStore) SaveAggregate(ctx context.Context, aggregate proto.Message) error
- type EventsTableName
- type Option
Constants ¶
const (
DefaultEventsTable = "events"
)
DynamoDB attribute names are kept to single characters to minimize read/write costs. DynamoDB charges per byte for both reads and writes, and attribute names are included in every item's stored and transferred size. At scale (millions of events), the savings from "a" vs "aggregate_id" are material.
const NumGSIs = 20
NumGSIs is the number of GSI pairs on the aggregates table. The opaquedata single-table design projects every annotated field into one of these 20 slots.
Variables ¶
var ProviderSet = wire.NewSet( ProvideOpaqueStore, ProvideStore, wire.Bind(new(protosource.Store), new(*DynamoDBStore)), wire.Bind(new(protosource.AggregateStore), new(*DynamoDBStore)), )
ProviderSet provides the DynamoDB event store, opaque store, and binds to protosource interfaces. The consumer must provide a dynamoclient.Client and the table name types.
Functions ¶
func EnsureTables ¶ added in v0.1.5
func EnsureTables(ctx context.Context, client *dynamodb.Client, eventsTable, aggregatesTable string) error
EnsureTables idempotently creates the events and aggregates tables. If a table already exists it is left alone. Tables are created with PAY_PER_REQUEST billing, deletion protection enabled, TTL on attribute "t", and PITR enabled.
PITR and TTL enablement are best-effort — DynamoDB Local does not always support them, so failures are silently ignored to keep local development and test runs working.
func ProvideOpaqueStore ¶
func ProvideOpaqueStore(client dynamoclient.Client, table AggregatesTableName) *opaquedynamo.Store
Types ¶
type AggregatesTableName ¶
type AggregatesTableName string
AggregatesTableName is a named type for Wire to distinguish table name strings.
type DynamoDBStore ¶
type DynamoDBStore struct {
// contains filtered or unexported fields
}
DynamoDBStore implements the protosource Store, AggregateStore, and SnapshotTailStore interfaces backed by DynamoDB.
func New ¶
func New(client dynamoclient.Client, opts ...Option) (*DynamoDBStore, error)
New creates a new DynamoDBStore. The client must not be nil.
func ProvideStore ¶
func ProvideStore(client dynamoclient.Client, opaqueStore *opaquedynamo.Store, table EventsTableName) (*DynamoDBStore, error)
func (*DynamoDBStore) Load ¶
Load retrieves the full event history for the given aggregate ID in ascending version order. Paginates automatically if DynamoDB returns partial results.
func (*DynamoDBStore) LoadTail ¶
func (s *DynamoDBStore) LoadTail(ctx context.Context, aggregateID string, n int) (*historyv1.History, error)
LoadTail returns the last n events for the given aggregate, ordered by version ascending. It queries DynamoDB in descending order with a per-page limit of n, paginating until n records are collected or no more pages remain, then reverses the results.
If n <= 0, an empty History is returned immediately.
func (*DynamoDBStore) Save ¶
func (s *DynamoDBStore) Save(ctx context.Context, aggregateID string, records ...*recordv1.Record) error
Save stores records for the given aggregate ID. Each batch of up to 100 records is written atomically using TransactWriteItems with condition expressions to prevent duplicate versions.
When len(records) exceeds 100, Save splits the work into multiple transactions. Atomicity is guaranteed within each batch, but NOT across batches: if a later batch fails, earlier batches are already committed. Callers that require all-or-nothing semantics for large writes should pre-validate or limit batch size upstream.
Saving zero records is a no-op.
func (*DynamoDBStore) SaveAggregate ¶
SaveAggregate persists the materialized aggregate state via the OpaqueStore. The aggregate must implement opaquedata.AutoPKSK (generated from proto annotations) and an OpaqueStore must be configured via WithOpaqueStore. The aggregates table uses pk/sk keys with GSIs for query access patterns.
type EventsTableName ¶
type EventsTableName string
EventsTableName is a named type for Wire to distinguish table name strings.
type Option ¶
type Option func(*DynamoDBStore)
Option configures a DynamoDBStore.
func WithEventsTable ¶
WithEventsTable sets the DynamoDB table name used for event storage.
func WithOpaqueStore ¶
func WithOpaqueStore(store opaquedata.OpaqueStore) Option
WithOpaqueStore sets the OpaqueStore used by SaveAggregate to persist materialized aggregates. The aggregates table uses pk/sk (String/String) keys with up to 20 GSIs for query access patterns. All aggregates must implement opaquedata.AutoPKSK to be materialized.
func WithTTL ¶
WithTTL sets a time-to-live duration for event records. When set, each saved event includes a TTL attribute ("t") containing the Unix epoch second at which the record should expire. The DynamoDB table must have TTL enabled on the "t" attribute for automatic deletion to occur.
A zero or negative duration disables TTL (the default).