dynamodb

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 12, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package dynamodb provides a DynamoDB-backed implementation of state.Store, the source-of-truth state for Murmur pipelines.

The store dispatches on monoid Kind to choose between native DDB primitives and optimistic-concurrency CAS:

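  • KindSum, KindCount with int64 / float64 values: atomic UpdateItem ADD. No read, no contention, full DDB throughput. Ships in this package as Int64SumStore (int64 values).
  • Max over int64 values (core.Max[int64]()): a single conditional UpdateItem that keeps the larger value, with no read and no version attribute. Ships in this package as Int64MaxStore.
  • All other kinds: read-modify-write with a conditional write on a version attribute, retrying up to MaxRetries times on conflict. Ships in this package as BytesStore.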

Table schema:

  pk   (S)       partition key: the entity key
  sk   (N)       sort key: the bucket ID (0 for non-windowed aggregations)
  v    (N or B)  the value
  ttl  (N)       optional Unix-epoch-seconds TTL (DDB native TTL attribute)
  ver  (N)       optimistic-concurrency version (CAS path only)

Index

Constants

This section is empty.

Variables

var ErrMaxRetriesExceeded = errors.New("dynamodb BytesStore: max CAS retries exceeded")

ErrMaxRetriesExceeded is returned by BytesStore.MergeUpdate when CAS contention prevents a successful merge within MaxRetries attempts. Callers should retry at a higher level or shed load.

Functions

func CreateBytesTable

func CreateBytesTable(ctx context.Context, client *dynamodb.Client, table string) error

CreateBytesTable creates a table with the schema expected by BytesStore. The key schema is identical to Int64SumStore's; the only difference is the type of the value attribute (number vs binary). DynamoDB does not declare non-key attributes in the table definition, so the same table layout accepts either type.

func CreateDedupTable

func CreateDedupTable(ctx context.Context, client *dynamodb.Client, table string) error

CreateDedupTable is a test/dev helper that creates a dedup table with the schema NewDeduper expects. Production tables should be provisioned via Terraform, with TTL enabled on the `ttl` attribute.

func CreateInt64Table

func CreateInt64Table(ctx context.Context, client *dynamodb.Client, table string) error

CreateInt64Table is a test/dev helper that creates a DDB table with the schema expected by Int64SumStore. Production tables should be created via Terraform; this exists to keep integration tests self-contained.

Types

type BytesStore

type BytesStore struct {
	// contains filtered or unexported fields
}

BytesStore is a state.Store[[]byte] that uses optimistic-concurrency CAS for monoidal merge. Suitable for sketches (HLL, TopK, Bloom) and any other byte-encoded monoid that can't be expressed as a single DDB UpdateExpression.

Update protocol:

  1. GetItem to read current value + version.
  2. Combine in-process via the user's monoid.
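  3. Conditional PutItem of the combined value with ver set to the expected version + 1. The condition is attribute_not_exists(ver) if the item did not exist, else ver == expected, so the value write and the version bump succeed or fail together.
  4. On ConditionalCheckFailedException, back off and retry from step 1, up to MaxRetries attempts; after that, return ErrMaxRetriesExceeded.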

Throughput is lower than on the atomic-ADD path: every merge costs a read plus a conditional write, and CAS contention grows with the per-key write rate. For very hot keys, configure a Valkey cache as an accelerator: the cache absorbs the read-modify-write traffic, and its state is periodically snapshotted back to DDB.

func NewBytesStore

func NewBytesStore(client *dynamodb.Client, table string, m monoid.Monoid[[]byte]) *BytesStore

NewBytesStore returns a CAS-backed Store[[]byte] for the given monoid. The table must already exist; CreateBytesTable is the test helper for the schema.

func (*BytesStore) Close

func (s *BytesStore) Close() error

Close is a no-op; the underlying client is owned by the caller.

func (*BytesStore) Get

func (s *BytesStore) Get(ctx context.Context, k state.Key) ([]byte, bool, error)

Get reads the current encoded value (for example, sketch bytes) at k.

func (*BytesStore) GetMany

func (s *BytesStore) GetMany(ctx context.Context, ks []state.Key) ([][]byte, []bool, error)

GetMany batches reads via BatchGetItem. Loops on UnprocessedKeys with bounded exponential backoff so a throttled batch doesn't silently truncate results.

func (*BytesStore) MergeUpdate

func (s *BytesStore) MergeUpdate(ctx context.Context, k state.Key, delta []byte, ttl time.Duration) error

MergeUpdate performs the read-combine-conditional-write loop. On CAS contention (ConditionalCheckFailedException) it retries with exponential backoff and full jitter, the same policy GetMany applies to BatchGetItem retries. Without backoff, N concurrent writers on the same hot key would all retry at the same moment, spending CPU and DDB request capacity in lockstep; only one of them can win each round, so under enough contention most attempts are wasted and writers exhaust MaxRetries. When that happens, MergeUpdate returns ErrMaxRetriesExceeded.

type Deduper

type Deduper struct {
	// contains filtered or unexported fields
}

Deduper is a DynamoDB-backed implementation of state.Deduper. It uses a dedicated table whose only job is to claim EventIDs atomically: the streaming runtime calls MarkSeen with each Source.Record's EventID before applying the monoid Combine, so a duplicate (a record re-delivered after a crash) is skipped instead of being combined a second time.

Schema:

  pk   (S)  partition key: the EventID
  ttl  (N)  Unix-epoch seconds when the entry should be evicted (DDB native TTL)

Atomic claim: PutItem with ConditionExpression "attribute_not_exists(pk)". When two workers claim the same EventID concurrently, exactly one PutItem succeeds and MarkSeen returns firstSeen=true; the other fails with ConditionalCheckFailedException, which MarkSeen translates into firstSeen=false with a nil error.

func NewDeduper

func NewDeduper(client *dynamodb.Client, table string, ttl time.Duration) *Deduper

NewDeduper constructs a Deduper backed by the named table. ttl is how long each EventID claim is retained before DDB's TTL feature evicts it; pick a value longer than the source's maximum redelivery window (the longest time after first delivery at which a duplicate can still arrive). 24h is a reasonable default for Kafka with bounded retention; use longer for Kinesis with extended retention.

func (*Deduper) Close

func (d *Deduper) Close() error

Close is a no-op; the underlying client is owned by the caller.

func (*Deduper) MarkSeen

func (d *Deduper) MarkSeen(ctx context.Context, eventID string) (bool, error)

MarkSeen claims eventID. firstSeen=true means the caller wins and should proceed with processing; firstSeen=false means the EventID was already claimed by some prior call (i.e. the record is a duplicate).

type Int64MaxStore

type Int64MaxStore struct {
	// contains filtered or unexported fields
}

Int64MaxStore is a state.Store[int64] specialized for the **monotonic counter** pattern: each `MergeUpdate(k, v)` sets the stored value to `v` only if `v > current`. Out-of-order events whose value is less than or equal to what's already stored are dropped without error. Equivalent to the `SetCountIfGreater` pattern from count-core's review.

Mechanism: a single DDB UpdateItem with a conditional expression:

UpdateExpression:    "SET #v = :v"
ConditionExpression: "attribute_not_exists(#v) OR #v < :v"

On condition failure (existing value >= new), DDB returns ConditionalCheckFailedException — the store catches it and treats the operation as a no-op success. From the caller's perspective the MergeUpdate succeeded; the stored value is the higher of the two.

When to use this vs Int64SumStore

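  • Int64SumStore: events carry DELTAS. `MergeUpdate(k, +1)` adds 1 to the running sum. Atomic ADD; high throughput. Sum's commutativity makes it safe under reordering but not under redelivery: at-least-once sources need upstream dedup (e.g. Deduper), or a re-delivered delta is counted twice.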
  • Int64MaxStore: events carry ABSOLUTE VALUES. `MergeUpdate(k, 42)` sets the stored value to 42 if the current value is < 42 and otherwise ignores it. Because max is idempotent, a re-delivered event is harmless too. The natural fit for cache-fill / "I just observed this value" / version-stamped counter patterns where the producer knows the absolute count and downstream consumers might process out of order.
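The difference under at-least-once delivery can be shown with two fold functions. sumApply, maxApply and replay are hypothetical helpers; the example assumes non-negative counts, so 0 is a safe starting value for max.

```go
package main

// sumApply models Int64SumStore: the event is a delta to add.
func sumApply(cur, delta int64) int64 { return cur + delta }

// maxApply models Int64MaxStore: the event is an absolute candidate value.
func maxApply(cur, candidate int64) int64 {
	if candidate > cur {
		return candidate
	}
	return cur
}

// replay folds a delivery sequence (possibly reordered or duplicated)
// into a starting value of 0.
func replay(apply func(cur, x int64) int64, deliveries []int64) int64 {
	var v int64
	for _, x := range deliveries {
		v = apply(v, x)
	}
	return v
}
```

Three +1 deltas with one redelivery sum to 4, not 3; absolute counts 1, 2, 3 delivered as 1, 3, 2, 3 still settle on 3.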

Pairs with the Max monoid

Pipelines using `core.Max[int64]()` semantically expect this store rather than Int64SumStore. The Murmur monoid framework's MergeUpdate contract is "combine delta into existing"; for Max semantics, "delta" IS "the new candidate value" and "combine" is "take the bigger one" — which is exactly what the conditional UpdateItem implements at the DDB level.

Out-of-order safety

Two workers processing different events for the same key can write in any order; the higher value wins. This is the behavior count-core builds with its SetCountIfGreater idiom; using Int64MaxStore inherits the same guarantee at the storage layer.

Cost

One conditional DDB UpdateItem per MergeUpdate. Per the AWS docs, a write whose condition check fails still consumes write capacity (at least 1 WCU, more for items over 1 KB), just as a successful write does. So the cost is about 1 WCU per event for small items whether or not the stored value changes: the same as Int64SumStore.
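A rough capacity estimate, assuming standard (non-transactional) writes billed at 1 WCU per started 1 KB of item size, with failed condition checks billed like successful writes. The helper names are hypothetical; for UpdateItem, the size that counts is the larger of the item before and after the update.

```go
package main

// wcuPerWrite returns the WCUs one standard write consumes for an item of
// itemBytes: 1 WCU per started 1 KB, with a minimum of 1.
func wcuPerWrite(itemBytes int) int {
	if itemBytes <= 1024 {
		return 1
	}
	return (itemBytes + 1023) / 1024
}

// wcuPerSecond estimates sustained write capacity for eventsPerSec MergeUpdate
// calls. Every call is counted, because a failed condition check is billed
// like a successful write.
func wcuPerSecond(eventsPerSec, itemBytes int) int {
	return eventsPerSec * wcuPerWrite(itemBytes)
}
```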

func NewInt64MaxStore

func NewInt64MaxStore(client *dynamodb.Client, table string) *Int64MaxStore

NewInt64MaxStore constructs an Int64MaxStore against the named table. The table must exist with the standard schema (pk:S, sk:N, v:N) — use CreateInt64Table from this package as the test/DDB-local helper.

func (*Int64MaxStore) Close

func (s *Int64MaxStore) Close() error

Close is a no-op; the underlying DDB client is owned by the caller.

func (*Int64MaxStore) Get

func (s *Int64MaxStore) Get(ctx context.Context, k state.Key) (int64, bool, error)

Get returns the stored value at k. Identical implementation to Int64SumStore.Get — the storage shape is the same; only MergeUpdate differs.

func (*Int64MaxStore) GetMany

func (s *Int64MaxStore) GetMany(ctx context.Context, ks []state.Key) ([]int64, []bool, error)

GetMany delegates to the same BatchGetItem-with-UnprocessedKeys retry loop Int64SumStore uses. Implementation is identical.

func (*Int64MaxStore) MergeUpdate

func (s *Int64MaxStore) MergeUpdate(ctx context.Context, k state.Key, delta int64, ttl time.Duration) error

MergeUpdate sets the stored value at k to delta IF delta is strictly greater than the current stored value (or if no value yet exists). Otherwise it's a no-op success — the higher value already wins.

Note: `delta` here is misleading wording carried over from the Store interface. For Int64MaxStore it's the new ABSOLUTE candidate value, not a delta-to-add. Callers passing a counter from the producer side (count-core's typical shape) pass the count itself.

type Int64SumStore

type Int64SumStore struct {
	// contains filtered or unexported fields
}

Int64SumStore is a state.Store[int64] specialized for the KindSum monoid. MergeUpdate uses DynamoDB's atomic ADD UpdateExpression: no read, no CAS retry, no application-side race conditions. The fastest path for high-frequency counter pipelines.

func NewInt64SumStore

func NewInt64SumStore(client *dynamodb.Client, table string) *Int64SumStore

NewInt64SumStore returns a Store backed by the given DDB table. The table must already exist with schema (pk: S, sk: N) — see CreateInt64Table for a helper used in tests.

func (*Int64SumStore) Close

func (s *Int64SumStore) Close() error

Close is a no-op — the underlying client is owned by the caller.

func (*Int64SumStore) Get

func (s *Int64SumStore) Get(ctx context.Context, k state.Key) (int64, bool, error)

Get returns the current sum for k. Missing keys return 0, false, nil.

func (*Int64SumStore) GetMany

func (s *Int64SumStore) GetMany(ctx context.Context, ks []state.Key) ([]int64, []bool, error)

GetMany batches reads via BatchGetItem, looping on UnprocessedKeys so a throttled batch doesn't silently truncate results. Returns one entry per requested key, in request order; missing keys return the zero value and ok=false at that index.
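BatchGetItem returns items in no particular order, so GetMany has to map responses back onto the request. A sketch of that step, assuming hypothetical key and rawItem types in place of state.Key and the SDK's attribute maps; DynamoDB number attributes travel as decimal strings, hence the parse.

```go
package main

import "strconv"

// key is a hypothetical (pk, sk) pair standing in for state.Key.
type key struct {
	PK string
	SK int64
}

// rawItem is a hypothetical decoded BatchGetItem response item.
type rawItem struct {
	PK string
	SK int64
	V  string // N attributes arrive as decimal strings
}

// assemble maps unordered responses back onto request order.
func assemble(ks []key, resp []rawItem) ([]int64, []bool, error) {
	byKey := make(map[key]string, len(resp))
	for _, it := range resp {
		byKey[key{it.PK, it.SK}] = it.V
	}
	vals := make([]int64, len(ks))
	oks := make([]bool, len(ks))
	for i, k := range ks {
		s, found := byKey[k]
		if !found {
			continue // missing key: zero value, ok=false
		}
		n, err := strconv.ParseInt(s, 10, 64)
		if err != nil {
			return nil, nil, err
		}
		vals[i], oks[i] = n, true
	}
	return vals, oks, nil
}
```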

func (*Int64SumStore) MergeUpdate

func (s *Int64SumStore) MergeUpdate(ctx context.Context, k state.Key, delta int64, ttl time.Duration) error

MergeUpdate atomically adds delta to the value at k via DynamoDB's ADD UpdateExpression. The DDB call is a single atomic operation, so no application-side CAS is required. ADD itself is not idempotent: a re-delivered delta is added again, so correct totals under at-least-once delivery depend on dedup applied upstream (e.g. Deduper).

If ttl is nonzero, sets the ttl attribute to now + ttl (Unix epoch seconds). DynamoDB's TTL feature deletes the row asynchronously some time after that timestamp passes, not at the exact moment; useful for windowed aggregations to retire old buckets.
