Documentation
Overview
Package state defines the StateStore abstraction Murmur pipelines write aggregations to. Concrete implementations live in subpackages (state/dynamodb, state/valkey).
Murmur's invariant: DynamoDB is always the source of truth. Valkey, when configured, is a read-cache and sketch-accelerator — never trusted as ground truth. If Valkey is lost, every pipeline can rebuild its accelerator state from DynamoDB.
Index
- type Cache
- type Deduper
- type Instrumented
- func (s *Instrumented[V]) Close() error
- func (s *Instrumented[V]) Get(ctx context.Context, k Key) (V, bool, error)
- func (s *Instrumented[V]) GetMany(ctx context.Context, ks []Key) ([]V, []bool, error)
- func (s *Instrumented[V]) MergeUpdate(ctx context.Context, k Key, delta V, ttl time.Duration) error
- type InstrumentedCache
- func (c *InstrumentedCache[V]) Close() error
- func (c *InstrumentedCache[V]) Get(ctx context.Context, k Key) (V, bool, error)
- func (c *InstrumentedCache[V]) GetMany(ctx context.Context, ks []Key) ([]V, []bool, error)
- func (c *InstrumentedCache[V]) MergeUpdate(ctx context.Context, k Key, delta V, ttl time.Duration) error
- func (c *InstrumentedCache[V]) Repopulate(ctx context.Context, src Store[V], keys []Key) error
- type Key
- type OnMergeFunc
- type OnMergeOption
- type Store
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type Cache
type Cache[V any] interface {
	Store[V]
	// Repopulate rebuilds the cache from the authoritative Store, e.g. after a Valkey
	// node restart. Implementations may stream from Store and insert in batches.
	Repopulate(ctx context.Context, src Store[V], keys []Key) error
}
Cache is the read-side accelerator. It mirrors a subset of Store data and may serve reads with lower latency. Pipelines configured with a cache write through to both Store and Cache; on Cache miss, they fall back to Store.
A Cache is *never* a source of truth. Implementations should treat their data as repopulatable from the underlying Store at any time.
type Deduper
type Deduper interface {
	// MarkSeen atomically attempts to claim eventID. Returns firstSeen=true
	// if the caller is the first to mark this ID; firstSeen=false if it was
	// already claimed. Returns an error only on transient backend failures;
	// duplicates are NOT errors.
	MarkSeen(ctx context.Context, eventID string) (firstSeen bool, err error)
	// Close releases any underlying resources.
	Close() error
}
Deduper is the deduplication contract for at-least-once delivery. The streaming runtime calls MarkSeen with each Source.Record's EventID before applying the monoid Combine. On a duplicate (for example, a worker crashed mid-write and the source replayed the record), MarkSeen returns firstSeen=false and the runtime skips the merge.
Implementations must be:
- Atomic. Two concurrent calls with the same EventID must produce exactly one firstSeen=true and one firstSeen=false, never two of either.
- Bounded. EventIDs older than a configured retention period should expire so the dedup table does not grow forever. A typical TTL is hours to days, depending on how long messages can sit unacknowledged within the source's retention window.
type Instrumented
type Instrumented[V any] struct {
	// contains filtered or unexported fields
}
Instrumented wraps a Store[V] (or Cache[V]) with metrics.Recorder hooks for every operation. The wrapped store sees identical traffic; the recorder gets per-op latency and error counts.
Operation names follow the streaming runtime's convention:
- "<label>:store_get" (Get)
- "<label>:store_get_many" (GetMany)
- "<label>:store_merge_update" (MergeUpdate)
where `label` is the supplied namespace (typically the pipeline name). Use the same label as the streaming and query layers so latency histograms aggregate cleanly.
Usage:
	rawStore := mddb.NewInt64SumStore(client, "page_views")
	store := state.NewInstrumented[int64](rawStore, recorder, "page_views")
	pipeline.NewPipeline[Event, int64](...).StoreIn(store)
Cost: one time.Now() call on entry and one RecordLatency call on exit. That is sub-microsecond with a Noop recorder and roughly 100 ns plus lock contention with the InMemory recorder, which is negligible next to the cost of any DynamoDB or Valkey operation.
For Cache[V], use InstrumentedCache, which follows the same pattern and adds a latency op for Repopulate.
func (*Instrumented[V]) Get
func (s *Instrumented[V]) Get(ctx context.Context, k Key) (V, bool, error)
Get records latency under "<label>:store_get" and propagates the inner Store's result.
func (*Instrumented[V]) MergeUpdate
func (s *Instrumented[V]) MergeUpdate(ctx context.Context, k Key, delta V, ttl time.Duration) error
MergeUpdate records latency under "<label>:store_merge_update".
Note: the streaming runtime ALSO records "store_merge" latency from its own clock, capturing the runtime's view of the operation (including any wrapping the runtime applies). The two op names are distinct, so they aggregate into separate histograms that tell different stories: store-side cost versus runtime-side wrapped cost.
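type InstrumentedCache
type InstrumentedCache[V any] struct {
	// contains filtered or unexported fields
}
InstrumentedCache wraps a Cache[V] with recorder hooks. Operation names follow the same "<label>:<op>" scheme as Instrumented[V], with cache-specific op names: MergeUpdate records under "<label>:cache_merge_update", and Repopulate adds "<label>:cache_repopulate".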
func (*InstrumentedCache[V]) Close
func (c *InstrumentedCache[V]) Close() error
Close passes through.
func (*InstrumentedCache[V]) MergeUpdate
func (c *InstrumentedCache[V]) MergeUpdate(ctx context.Context, k Key, delta V, ttl time.Duration) error
MergeUpdate records latency under "<label>:cache_merge_update".
func (*InstrumentedCache[V]) Repopulate
func (c *InstrumentedCache[V]) Repopulate(ctx context.Context, src Store[V], keys []Key) error
Repopulate records latency under "<label>:cache_repopulate".
type Key
Key identifies a stored aggregation value. Entity is the user-supplied aggregation key (e.g. a page ID, customer ID). Bucket, when nonzero, is the time-bucket ID for windowed aggregations; bucket 0 means "no window / all-time."
type OnMergeFunc
OnMergeFunc is the post-merge hook signature. It is invoked after a successful MergeUpdate on the wrapped Store with the same key and delta that were applied. Hook implementations should be fast and non-blocking — they run on the streaming runtime's hot path.
Typical use cases:
- Emit a downstream "merged" event to Kafka/SQS so consumers can react without setting up a DDB Streams → Lambda chain.
- Update a secondary index or in-memory cache that mirrors merge activity.
- Drive change-data-capture style projections.
type OnMergeOption
type OnMergeOption func(*onMergeConfig)
OnMergeOption configures a WithOnMerge wrapper.
func WithHookErrorPropagation
func WithHookErrorPropagation(propagate bool) OnMergeOption
WithHookErrorPropagation controls whether a hook error is returned from MergeUpdate or merely logged. Default: false (log only). When true, a non-nil error from the hook is returned to the caller AFTER the inner MergeUpdate has already succeeded — the merge itself is NOT rolled back.
func WithHookLogger
func WithHookLogger(logger *slog.Logger) OnMergeOption
WithHookLogger overrides the slog.Logger used for hook-error log lines. Defaults to slog.Default().
type Store
type Store[V any] interface {
	// Get reads the current value at k. Returns the zero value of V and ok=false if missing.
	Get(ctx context.Context, k Key) (val V, ok bool, err error)
	// GetMany batches reads. Returns one entry per requested key; missing keys are
	// represented by the zero value of V at that index with ok=false.
	GetMany(ctx context.Context, ks []Key) (vals []V, ok []bool, err error)
	// MergeUpdate combines delta into the existing value at k via the monoid associated
	// with the pipeline. ttl, if nonzero, sets/extends the TTL on the underlying record
	// (used for windowed aggregations to expire old buckets).
	MergeUpdate(ctx context.Context, k Key, delta V, ttl time.Duration) error
	// Close releases any underlying resources (connection pools, batchers).
	Close() error
}
Store is the StateStore contract. Implementations must be safe for concurrent use.
MergeUpdate atomically applies the monoid Combine of the current value with delta and writes the result. Implementations may use native primitives (DynamoDB UpdateItem ADD, Valkey PFADD) when the monoid Kind permits, falling back to read-modify-write under a conditional write for non-native cases.
func NewInstrumented
NewInstrumented wraps store with recorder-driven latency reporting under the given label. If recorder is nil, it returns the inner store unwrapped (the zero-overhead path).
func WithOnMerge
func WithOnMerge[V any](inner Store[V], fn OnMergeFunc[V], opts ...OnMergeOption) Store[V]
WithOnMerge wraps inner with a post-write hook. fn is invoked after every successful MergeUpdate with the same key and delta that were applied. The hook is NOT invoked when MergeUpdate returns a non-nil error.
By default, hook errors are logged via slog and swallowed — the wrapper's MergeUpdate returns nil to its caller as long as the inner merge succeeded. Pass WithHookErrorPropagation(true) to surface hook errors to the caller instead. Either way, the inner merge is NOT rolled back: by the time the hook runs, the merge has already been durably written.
Nil fn is safe: WithOnMerge returns inner unwrapped (zero overhead).
All other Store methods (Get, GetMany, Close) pass through to inner unchanged.
Typical use case: count-core wants to emit a BotInteractionCountIntervalBackendEvent after each merge without setting up a DDB Streams → Lambda chain. The hook is fire-and-forget from the merge's perspective; durability of the merge itself is the inner Store's responsibility (DDB UpdateItem is atomic).
Directories
| Path | Synopsis |
|---|---|
| dynamodb | Package dynamodb provides a DynamoDB-backed implementation of state.Store, the source-of-truth state for Murmur pipelines. |
| valkey | Package valkey provides Valkey-backed cache implementations of state.Cache. |