Overview ¶
Package dynamodb provides a DynamoDB-backed implementation of state.Store, the source-of-truth state for Murmur pipelines.
The store dispatches on monoid Kind to choose between native DDB primitives and optimistic-concurrency CAS:
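- KindSum, KindCount with int64 / float64 values: atomic UpdateItem ADD. No read, no contention, full DDB throughput. Ships in this package as Int64SumStore.
- Max over int64 values: a single conditional UpdateItem that writes only when the new value is greater. No read, no retry loop. Ships in this package as Int64MaxStore.
- All other kinds: read-modify-write with a conditional write on a version attribute. CAS retries up to MaxRetries on conflict. Ships in this package as BytesStore, for byte-encoded monoids such as sketches.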
Table schema:
- pk (S), partition key: the entity key
- sk (N), sort key: the bucket ID (0 for non-windowed aggregations)
- v (N or B): the value
- ttl (N): optional Unix-epoch-seconds TTL (DDB native TTL attribute)
- ver (N): optimistic-concurrency version (CAS path only)
Index ¶
- Variables
- func CreateBytesTable(ctx context.Context, client *dynamodb.Client, table string) error
- func CreateDedupTable(ctx context.Context, client *dynamodb.Client, table string) error
- func CreateInt64Table(ctx context.Context, client *dynamodb.Client, table string) error
- type BytesStore
- func (s *BytesStore) Close() error
- func (s *BytesStore) Get(ctx context.Context, k state.Key) ([]byte, bool, error)
- func (s *BytesStore) GetMany(ctx context.Context, ks []state.Key) ([][]byte, []bool, error)
- func (s *BytesStore) MergeUpdate(ctx context.Context, k state.Key, delta []byte, ttl time.Duration) error
- type Deduper
- type Int64MaxStore
- func NewInt64MaxStore(client *dynamodb.Client, table string) *Int64MaxStore
- func (s *Int64MaxStore) Close() error
- func (s *Int64MaxStore) Get(ctx context.Context, k state.Key) (int64, bool, error)
- func (s *Int64MaxStore) GetMany(ctx context.Context, ks []state.Key) ([]int64, []bool, error)
- func (s *Int64MaxStore) MergeUpdate(ctx context.Context, k state.Key, delta int64, ttl time.Duration) error
- type Int64SumStore
- func NewInt64SumStore(client *dynamodb.Client, table string) *Int64SumStore
- func (s *Int64SumStore) Close() error
- func (s *Int64SumStore) Get(ctx context.Context, k state.Key) (int64, bool, error)
- func (s *Int64SumStore) GetMany(ctx context.Context, ks []state.Key) ([]int64, []bool, error)
- func (s *Int64SumStore) MergeUpdate(ctx context.Context, k state.Key, delta int64, ttl time.Duration) error
Variables ¶
var ErrMaxRetriesExceeded = errors.New("dynamodb BytesStore: max CAS retries exceeded")
ErrMaxRetriesExceeded is returned when CAS contention prevents a successful merge within MaxRetries attempts. Callers should retry at a higher level or shed load.
Functions ¶
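func CreateBytesTable ¶
func CreateBytesTable(ctx context.Context, client *dynamodb.Client, table string) error
CreateBytesTable creates a table with the schema expected by BytesStore. The schema is identical to Int64SumStore's; the only difference is the type of the value attribute (number vs. binary), and that needs no declaration because DynamoDB only fixes the types of key attributes in a table's schema.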
func CreateDedupTable ¶
func CreateDedupTable(ctx context.Context, client *dynamodb.Client, table string) error
CreateDedupTable is a test/dev helper that creates a dedup table with the schema NewDeduper expects. Production should provision via Terraform with TTL enabled on the `ttl` attribute.
func CreateInt64Table ¶
func CreateInt64Table(ctx context.Context, client *dynamodb.Client, table string) error
CreateInt64Table is a test/dev helper that creates a DDB table with the schema expected by Int64SumStore. Production tables should be created via Terraform; this exists to keep integration tests self-contained.
Types ¶
type BytesStore ¶
type BytesStore struct {
// contains filtered or unexported fields
}
BytesStore is a state.Store[[]byte] that uses optimistic-concurrency CAS for monoidal merge. Suitable for sketches (HLL, TopK, Bloom) and any other byte-encoded monoid that can't be expressed as a single DDB UpdateExpression.
Update protocol:
1. GetItem to read the current value and version.
2. Combine in-process via the user's monoid.
3. Conditional PutItem: insert if attribute_not_exists(ver), else update if ver == expected. Increment ver atomically on success.
4. On ConditionalCheckFailedException, retry from step 1 up to MaxRetries.
Throughput is lower than on the atomic-ADD path, and CAS contention grows with the per-key write rate. For very hot keys, configure a Valkey cache as an accelerator (the cache absorbs the read-modify-write storm; we periodically snapshot back to DDB).
func NewBytesStore ¶
NewBytesStore returns a CAS-backed Store[[]byte] for the given monoid. The table must already exist; CreateBytesTable is the test helper for the schema.
func (*BytesStore) Close ¶
func (s *BytesStore) Close() error
Close is a no-op; the underlying client is owned by the caller.
func (*BytesStore) GetMany ¶
func (s *BytesStore) GetMany(ctx context.Context, ks []state.Key) ([][]byte, []bool, error)
GetMany batches reads via BatchGetItem. Loops on UnprocessedKeys with bounded exponential backoff so a throttled batch doesn't silently truncate results.
func (*BytesStore) MergeUpdate ¶
func (s *BytesStore) MergeUpdate(ctx context.Context, k state.Key, delta []byte, ttl time.Duration) error
MergeUpdate performs the read-combine-conditional-write loop. CAS contention (ConditionalCheckFailedException) is retried with exponential backoff plus full jitter, the same retry policy used for BatchGetItem. Without backoff, N concurrent writers on the same hot key would all retry at once and spin in lockstep, burning CPU and DDB request capacity; under enough contention they would never make forward progress.
type Deduper ¶
type Deduper struct {
// contains filtered or unexported fields
}
Deduper is a DynamoDB-backed implementation of state.Deduper. It uses a dedicated table whose only job is to claim EventIDs atomically: the streaming runtime calls MarkSeen with each Source.Record's EventID before applying the monoid Combine, and a duplicate (a record re-delivered after a crash) is short-circuited cleanly.
Schema:
- pk (S): the EventID
- ttl (N): Unix-epoch seconds when the entry should be evicted (DDB native TTL)
Atomic claim: PutItem with ConditionExpression "attribute_not_exists(pk)". When two workers claim the same EventID concurrently, exactly one PutItem succeeds (returns nil) and that claim counts as first-seen; the other fails with ConditionalCheckFailedException, which the wrapper translates into firstSeen=false.
func NewDeduper ¶
NewDeduper constructs a Deduper backed by the named table. ttl is how long each EventID claim is retained before DDB's TTL feature evicts it; pick a value > the source's max delivery latency. 24h is a reasonable default for Kafka with bounded retention; longer for Kinesis with extended retention.
type Int64MaxStore ¶
type Int64MaxStore struct {
// contains filtered or unexported fields
}
Int64MaxStore is a state.Store[int64] specialized for the **monotonic counter** pattern: each `MergeUpdate(k, v)` sets the stored value to `v` only if `v > current`. Out-of-order events with values lower than what's already stored are silently dropped. Equivalent to the `SetCountIfGreater` pattern from count-core's review.
Mechanism: a single DDB UpdateItem with a conditional expression:
    UpdateExpression:    "SET #v = :v"
    ConditionExpression: "attribute_not_exists(#v) OR #v < :v"
On condition failure (existing value >= new), DDB returns ConditionalCheckFailedException — the store catches it and treats the operation as a no-op success. From the caller's perspective the MergeUpdate succeeded; the stored value is the higher of the two.
When to use this vs Int64SumStore ¶
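- Int64SumStore: events carry DELTAS. `MergeUpdate(k, +1)` adds 1 to the running sum. Atomic ADD; high throughput. Sum's commutativity makes it correct under reordering, but under at-least-once delivery a redelivered event is counted twice unless duplicates are filtered upstream (see Deduper).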
- Int64MaxStore: events carry ABSOLUTE VALUES. `MergeUpdate(k, 42)` sets the stored value to 42 if the current is < 42, otherwise ignores. The natural fit for cache-fill / "I just observed this value" / version-stamped counter patterns where the producer knows the absolute count and downstream consumers might process out of order.
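A producer that has counted three events can report them as three +1 deltas or as the running counts 1, 2, 3. The hypothetical helpers `applySum` and `applyMax` below fold a sequence of writes the way each store does, to show why the event shape picks the store: Sum over the deltas and Max over the absolute counts both yield 3, even when the counts arrive out of order (3, 1, 2), while summing the absolute counts would yield 6.

```go
package main

// applySum folds writes the way Int64SumStore does: each event is a delta
// and the stored value is the running total.
func applySum(deltas []int64) int64 {
	var total int64
	for _, d := range deltas {
		total += d
	}
	return total
}

// applyMax folds writes the way Int64MaxStore does: each event is an
// absolute observation, and only a strictly greater value replaces the
// stored one. ok is false when no event was applied.
func applyMax(observations []int64) (stored int64, ok bool) {
	for _, v := range observations {
		if !ok || v > stored {
			stored, ok = v, true
		}
	}
	return stored, ok
}
```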
Pairs with the Max monoid ¶
Pipelines using `core.Max[int64]()` semantically expect this store rather than Int64SumStore. The Murmur monoid framework's MergeUpdate contract is "combine delta into existing"; for Max semantics, "delta" IS "the new candidate value" and "combine" is "take the bigger one" — which is exactly what the conditional UpdateItem implements at the DDB level.
Out-of-order safety ¶
Two workers processing different events for the same key can write in any order; the higher value wins. This is the behavior count-core builds with its SetCountIfGreater idiom; using Int64MaxStore inherits the same guarantee at the storage layer.
Cost ¶
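One DDB UpdateItem per MergeUpdate, conditional. A successful write of an item up to 1 KB is charged 1 WCU. A failed condition is not free: per the DynamoDB docs, a conditional write whose condition evaluates to false still consumes write capacity based on the existing item's size, with a minimum of 1 WCU. So each event costs ~1 WCU whether or not the condition passes, the same as Int64SumStore.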
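The arithmetic can be sketched as below, assuming standard (non-transactional) writes and item sizes already measured the way DynamoDB counts them (attribute names plus values). `writeUnits` and `conditionalUpdateUnits` are hypothetical helpers, not part of this package.

```go
package main

// writeUnits returns the standard write capacity units for writing an item
// of itemBytes: 1 WCU per 1 KB, rounded up, minimum 1.
func writeUnits(itemBytes int) int {
	if itemBytes <= 0 {
		return 1
	}
	return (itemBytes + 1023) / 1024
}

// conditionalUpdateUnits estimates one conditional UpdateItem. A successful
// update is charged on the larger of the before and after item sizes; a
// failed condition is charged on the existing item's size (minimum 1).
func conditionalUpdateUnits(beforeBytes, afterBytes int, conditionPassed bool) int {
	if !conditionPassed || beforeBytes >= afterBytes {
		return writeUnits(beforeBytes)
	}
	return writeUnits(afterBytes)
}
```

For the small pk/sk/v/ttl rows these stores write, both helpers return 1, which is where the ~1 WCU/event figure comes from.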
func NewInt64MaxStore ¶
func NewInt64MaxStore(client *dynamodb.Client, table string) *Int64MaxStore
NewInt64MaxStore constructs an Int64MaxStore against the named table. The table must exist with the standard schema (pk:S, sk:N, v:N) — use CreateInt64Table from this package as the test/DDB-local helper.
func (*Int64MaxStore) Close ¶
func (s *Int64MaxStore) Close() error
Close is a no-op; the underlying DDB client is owned by the caller.
func (*Int64MaxStore) Get ¶
func (s *Int64MaxStore) Get(ctx context.Context, k state.Key) (int64, bool, error)
Get returns the stored value at k. Identical implementation to Int64SumStore.Get — the storage shape is the same; only MergeUpdate differs.
func (*Int64MaxStore) GetMany ¶
func (s *Int64MaxStore) GetMany(ctx context.Context, ks []state.Key) ([]int64, []bool, error)
GetMany delegates to the same BatchGetItem-with-UnprocessedKeys retry loop Int64SumStore uses. Implementation is identical.
func (*Int64MaxStore) MergeUpdate ¶
func (s *Int64MaxStore) MergeUpdate(ctx context.Context, k state.Key, delta int64, ttl time.Duration) error
MergeUpdate sets the stored value at k to delta IF delta is strictly greater than the current stored value (or if no value yet exists). Otherwise it's a no-op success — the higher value already wins.
Note: `delta` here is misleading wording carried over from the Store interface. For Int64MaxStore it's the new ABSOLUTE candidate value, not a delta-to-add. Callers passing a counter from the producer side (count-core's typical shape) pass the count itself.
type Int64SumStore ¶
type Int64SumStore struct {
// contains filtered or unexported fields
}
Int64SumStore is a state.Store[int64] specialized for the KindSum monoid. MergeUpdate uses DynamoDB's atomic ADD UpdateExpression: no read, no CAS retry, no application-side race conditions. The fastest path for high-frequency counter pipelines.
func NewInt64SumStore ¶
func NewInt64SumStore(client *dynamodb.Client, table string) *Int64SumStore
NewInt64SumStore returns a Store backed by the given DDB table. The table must already exist with schema (pk: S, sk: N) — see CreateInt64Table for a helper used in tests.
func (*Int64SumStore) Close ¶
func (s *Int64SumStore) Close() error
Close is a no-op — the underlying client is owned by the caller.
func (*Int64SumStore) GetMany ¶
func (s *Int64SumStore) GetMany(ctx context.Context, ks []state.Key) ([]int64, []bool, error)
GetMany batches reads via BatchGetItem. Returns one entry per requested key in order; missing keys return zero value and ok=false at that index.
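BatchGetItem returns items in no particular order and accepts at most 100 keys per request, so a GetMany like this one has to chunk the request and then map the results back onto request order. A sketch of those two steps, assuming a hypothetical `key` struct in place of state.Key and a map of returned values keyed by (pk, sk); `chunk`, `alignResults`, and `maxBatchGetKeys` are illustrative names, and the DynamoDB calls themselves are omitted.

```go
package main

// key is a hypothetical stand-in for state.Key: the pk/sk pair.
type key struct {
	PK string
	SK int64
}

// maxBatchGetKeys is DynamoDB's per-request key limit for BatchGetItem.
const maxBatchGetKeys = 100

// chunk splits ks into request-sized slices.
func chunk(ks []key, size int) [][]key {
	if size <= 0 {
		panic("chunk size must be positive")
	}
	var out [][]key
	for len(ks) > size {
		out = append(out, ks[:size])
		ks = ks[size:]
	}
	if len(ks) > 0 {
		out = append(out, ks)
	}
	return out
}

// alignResults maps an unordered set of returned items back onto request
// order: vals[i] and oks[i] describe ks[i]; missing keys get 0 and false.
func alignResults(ks []key, found map[key]int64) ([]int64, []bool) {
	vals := make([]int64, len(ks))
	oks := make([]bool, len(ks))
	for i, k := range ks {
		v, ok := found[k]
		vals[i], oks[i] = v, ok
	}
	return vals, oks
}
```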
func (*Int64SumStore) MergeUpdate ¶
func (s *Int64SumStore) MergeUpdate(ctx context.Context, k state.Key, delta int64, ttl time.Duration) error
MergeUpdate atomically adds delta to the value at k via DynamoDB's ADD UpdateExpression. The DDB call is a single atomic operation, so no application-side CAS is required. ADD itself is not idempotent: under at-least-once delivery the result is only correct when duplicates are filtered upstream (for example by Deduper).
If ttl is nonzero, sets the ttl attribute to now + ttl (Unix epoch seconds). DynamoDB's TTL feature evicts the row asynchronously when ttl elapses; useful for windowed aggregations to retire old buckets.