state

package
v0.1.0 Latest
Published: May 12, 2026 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Package state defines the StateStore abstraction Murmur pipelines write aggregations to. Concrete implementations live in subpackages (state/dynamodb, state/valkey).

Murmur's invariant: DynamoDB is always the source of truth. Valkey, when configured, is a read-cache and sketch-accelerator — never trusted as ground truth. If Valkey is lost, every pipeline can rebuild its accelerator state from DynamoDB.

Types

type Cache

type Cache[V any] interface {
	Store[V]
	// Repopulate rebuilds the cache from the authoritative Store, e.g. after a Valkey
	// node restart. Implementations may stream from Store and insert in batches.
	Repopulate(ctx context.Context, src Store[V], keys []Key) error
}

Cache is the read-side accelerator. It mirrors a subset of Store data and may serve reads with lower latency. Pipelines configured with a cache write through to both Store and Cache; on Cache miss, they fall back to Store.

A Cache is *never* a source of truth. Implementations should treat their data as repopulatable from the underlying Store at any time.
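The cache-miss fallback described above can be sketched roughly as follows. This is an illustration, not Murmur's runtime: Key and getter are local stand-ins that mirror the signatures on this page, mapStore and readThrough are hypothetical names, and treating a cache error like a miss is an assumption of the sketch.

```go
package main

import (
	"context"
	"fmt"
)

// Key mirrors state.Key from this page (local stand-in).
type Key struct {
	Entity string
	Bucket int64
}

// getter is the Get subset of Store[V] used by this sketch.
type getter[V any] interface {
	Get(ctx context.Context, k Key) (V, bool, error)
}

// mapStore is a hypothetical in-memory backend, used here for both roles.
type mapStore map[Key]int64

func (m mapStore) Get(_ context.Context, k Key) (int64, bool, error) {
	v, ok := m[k]
	return v, ok, nil
}

// readThrough tries the cache first and falls back to the store on a miss.
// Assumption: a cache error is handled like a miss, since the cache is never
// the source of truth.
func readThrough[V any](ctx context.Context, cache, store getter[V], k Key) (V, bool, error) {
	if v, ok, err := cache.Get(ctx, k); err == nil && ok {
		return v, true, nil
	}
	return store.Get(ctx, k)
}

// demoReadThrough reads a key that exists only in the store.
func demoReadThrough() int64 {
	cache := mapStore{}
	store := mapStore{{Entity: "page-1"}: 42}
	v, _, _ := readThrough[int64](context.Background(), cache, store, Key{Entity: "page-1"})
	return v
}

func main() {
	fmt.Println(demoReadThrough())
}
```

Because the store answers whenever the cache cannot, losing the cache costs latency but not correctness.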

func NewInstrumentedCache

func NewInstrumentedCache[V any](inner Cache[V], recorder metrics.Recorder, label string) Cache[V]

NewInstrumentedCache wraps inner with recorder-driven latency reporting under the given label. When recorder is nil, it returns inner unwrapped.

type Deduper

type Deduper interface {
	// MarkSeen atomically attempts to claim eventID. Returns firstSeen=true
	// if the caller is the first to mark this ID; firstSeen=false if it was
	// already claimed. Returns an error only on transient backend failures
	// — duplicates are NOT errors.
	MarkSeen(ctx context.Context, eventID string) (firstSeen bool, err error)

	// Close releases any underlying resources.
	Close() error
}

Deduper is the at-least-once dedup contract. The streaming runtime calls MarkSeen with each Source.Record's EventID before applying the monoid Combine; on a duplicate (worker crashed mid-write, source replays the record) MarkSeen returns firstSeen=false and the runtime skips the merge.

Implementations must be:

  • Atomic. Two concurrent calls with the same EventID must produce exactly one firstSeen=true and one firstSeen=false — never two of either.
  • Bounded. EventIDs older than a configured retention should fall out so the dedup table doesn't grow forever. Typical TTL is hours to days depending on how long messages can sit unacked in the source's retention window.
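To make the two requirements concrete, here is a minimal single-process sketch. memDeduper is a hypothetical type, not one of Murmur's implementations: a mutex provides the atomic claim, and a per-ID expiry plus a periodic sweep keeps the table bounded.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// memDeduper is a hypothetical in-memory Deduper for illustration only.
type memDeduper struct {
	mu   sync.Mutex
	seen map[string]time.Time // eventID -> expiry time
	ttl  time.Duration
}

func newMemDeduper(ttl time.Duration) *memDeduper {
	return &memDeduper{seen: make(map[string]time.Time), ttl: ttl}
}

// MarkSeen claims eventID under the mutex, so concurrent callers with the
// same ID observe exactly one firstSeen=true.
func (d *memDeduper) MarkSeen(_ context.Context, eventID string) (bool, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	now := time.Now()
	if exp, ok := d.seen[eventID]; ok && now.Before(exp) {
		return false, nil // duplicate: reported via firstSeen, not as an error
	}
	d.seen[eventID] = now.Add(d.ttl)
	return true, nil
}

// sweep deletes expired IDs so the map does not grow forever; a caller would
// run it periodically.
func (d *memDeduper) sweep() {
	d.mu.Lock()
	defer d.mu.Unlock()
	now := time.Now()
	for id, exp := range d.seen {
		if !now.Before(exp) {
			delete(d.seen, id)
		}
	}
}

// Close has nothing to release for an in-memory map.
func (d *memDeduper) Close() error { return nil }

// demoConcurrentClaims races n goroutines on one ID and counts the winners.
func demoConcurrentClaims(n int) int {
	d := newMemDeduper(time.Hour)
	var wg sync.WaitGroup
	var mu sync.Mutex
	winners := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if first, _ := d.MarkSeen(context.Background(), "evt-1"); first {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	d.sweep()
	return winners
}

func main() {
	fmt.Println(demoConcurrentClaims(8))
}
```

A shared backend would need the same claim to be a single atomic operation on the server side, since a process-local mutex does not coordinate multiple workers.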

type Instrumented

type Instrumented[V any] struct {
	// contains filtered or unexported fields
}

Instrumented wraps a Store[V] (or Cache[V]) with metrics.Recorder hooks for every operation. The wrapped store sees identical traffic; the recorder gets per-op latency and error counts.

Operation names follow the streaming runtime's convention:

  • "<label>:store_get" (Get)
  • "<label>:store_get_many" (GetMany)
  • "<label>:store_merge_update" (MergeUpdate)

where `label` is the supplied namespace (typically the pipeline name). Use the same label that the streaming/query layer uses so that latency histograms aggregate cleanly.

Usage:

rawStore := mddb.NewInt64SumStore(client, "page_views")
store := state.NewInstrumented[int64](rawStore, recorder, "page_views")
pipeline.NewPipeline[Event, int64](...).StoreIn(store)

Cost: one time.Now() at op entry + one RecordLatency at op exit. Sub-microsecond when the recorder is a Noop; ~100ns + lock contention for the InMemory recorder. Negligible against any DDB / Valkey operation cost.

For Cache[V], see InstrumentedCache below — same pattern, adds the Repopulate latency op.
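The cost model above (one time.Now() at entry, one RecordLatency at exit) can be pictured with a small sketch. The recorder interface below is a hypothetical stand-in, since metrics.Recorder is not shown on this page; timedGet, opLog, and constStore are likewise illustrative names, and only the Get path is shown.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Key mirrors state.Key from this page (local stand-in).
type Key struct {
	Entity string
	Bucket int64
}

// recorder is a hypothetical stand-in for metrics.Recorder.
type recorder interface {
	RecordLatency(op string, d time.Duration)
}

// opLog remembers which op names were reported.
type opLog struct{ ops []string }

func (l *opLog) RecordLatency(op string, _ time.Duration) { l.ops = append(l.ops, op) }

// getter is the Get subset of Store[V].
type getter[V any] interface {
	Get(ctx context.Context, k Key) (V, bool, error)
}

// constStore is a trivial inner store that always returns 7.
type constStore struct{}

func (constStore) Get(context.Context, Key) (int64, bool, error) { return 7, true, nil }

// timedGet wraps inner.Get with one clock read at entry and one
// RecordLatency call at exit, and returns the inner result unchanged.
type timedGet[V any] struct {
	inner getter[V]
	rec   recorder
	label string
}

func (t timedGet[V]) Get(ctx context.Context, k Key) (V, bool, error) {
	start := time.Now()
	defer func() { t.rec.RecordLatency(t.label+":store_get", time.Since(start)) }()
	return t.inner.Get(ctx, k)
}

// demoOpName performs one Get and returns the op name that was recorded.
func demoOpName() string {
	rec := &opLog{}
	s := timedGet[int64]{inner: constStore{}, rec: rec, label: "page_views"}
	_, _, _ = s.Get(context.Background(), Key{Entity: "home"})
	return rec.ops[0]
}

func main() {
	fmt.Println(demoOpName())
}
```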

func (*Instrumented[V]) Close

func (s *Instrumented[V]) Close() error

Close passes through.

func (*Instrumented[V]) Get

func (s *Instrumented[V]) Get(ctx context.Context, k Key) (V, bool, error)

Get records latency under "<label>:store_get" and propagates the inner Store's result.

func (*Instrumented[V]) GetMany

func (s *Instrumented[V]) GetMany(ctx context.Context, ks []Key) ([]V, []bool, error)

GetMany records latency under "<label>:store_get_many".

func (*Instrumented[V]) MergeUpdate

func (s *Instrumented[V]) MergeUpdate(ctx context.Context, k Key, delta V, ttl time.Duration) error

MergeUpdate records latency under "<label>:store_merge_update".

Note: the streaming runtime ALSO records "store_merge" latency from its own clock, capturing the runtime's view of the operation (including any wrapping the runtime applies). The two op names are distinct so they aggregate to separate histograms — each tells a different story (store-side cost vs runtime-side wrapped cost).

type InstrumentedCache

type InstrumentedCache[V any] struct {
	// contains filtered or unexported fields
}

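InstrumentedCache wraps a Cache[V] with recorder hooks. Op names follow the Instrumented[V] scheme with a cache_ prefix in place of store_ ("<label>:cache_get", "<label>:cache_get_many", "<label>:cache_merge_update"), plus "<label>:cache_repopulate" for Repopulate.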

func (*InstrumentedCache[V]) Close

func (c *InstrumentedCache[V]) Close() error

Close passes through.

func (*InstrumentedCache[V]) Get

func (c *InstrumentedCache[V]) Get(ctx context.Context, k Key) (V, bool, error)

Get records latency under "<label>:cache_get".

func (*InstrumentedCache[V]) GetMany

func (c *InstrumentedCache[V]) GetMany(ctx context.Context, ks []Key) ([]V, []bool, error)

GetMany records latency under "<label>:cache_get_many".

func (*InstrumentedCache[V]) MergeUpdate

func (c *InstrumentedCache[V]) MergeUpdate(ctx context.Context, k Key, delta V, ttl time.Duration) error

MergeUpdate records latency under "<label>:cache_merge_update".

func (*InstrumentedCache[V]) Repopulate

func (c *InstrumentedCache[V]) Repopulate(ctx context.Context, src Store[V], keys []Key) error

Repopulate records latency under "<label>:cache_repopulate".

type Key

type Key struct {
	Entity string
	Bucket int64
}

Key identifies a stored aggregation value. Entity is the user-supplied aggregation key (e.g. a page ID, customer ID). Bucket, when nonzero, is the time-bucket ID for windowed aggregations; bucket 0 means "no window / all-time."
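As a small illustration of windowed versus all-time keys, the sketch below builds both for the same entity. The bucketID helper is an assumption: this page does not specify how Murmur numbers buckets, so whole windows since the Unix epoch is used only as a plausible example.

```go
package main

import (
	"fmt"
	"time"
)

// Key mirrors state.Key from this page (local stand-in).
type Key struct {
	Entity string
	Bucket int64
}

// bucketID is a hypothetical numbering scheme: whole windows elapsed since
// the Unix epoch. The window must be at least one second.
func bucketID(t time.Time, window time.Duration) int64 {
	return t.Unix() / int64(window/time.Second)
}

// demoKeys builds an hourly-window key and an all-time key for one entity.
func demoKeys() (hourly, allTime Key) {
	t := time.Unix(7200, 0) // two hours after the epoch
	hourly = Key{Entity: "page-1", Bucket: bucketID(t, time.Hour)}
	allTime = Key{Entity: "page-1"} // Bucket 0: no window / all-time
	return hourly, allTime
}

func main() {
	h, a := demoKeys()
	// Key is a comparable struct, so it works with == and as a map key.
	fmt.Println(h.Bucket, a.Bucket, h == a)
}
```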

type OnMergeFunc

type OnMergeFunc[V any] func(ctx context.Context, key Key, delta V) error

OnMergeFunc is the post-merge hook signature. It is invoked after a successful MergeUpdate on the wrapped Store with the same key and delta that were applied. Hook implementations should be fast and non-blocking — they run on the streaming runtime's hot path.

Typical use cases:

  • Emit a downstream "merged" event to Kafka/SQS so consumers can react without setting up a DDB Streams → Lambda chain.
  • Update a secondary index or in-memory cache that mirrors merge activity.
  • Drive change-data-capture style projections.
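One way to keep a hook fast and non-blocking is to hand the event to a buffered channel and let a separate goroutine publish it. The sketch below assumes that approach; mergedEvent, enqueueHook, and errQueueFull are hypothetical names, and OnMergeFunc is redeclared locally to mirror the signature above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Key mirrors state.Key from this page (local stand-in).
type Key struct {
	Entity string
	Bucket int64
}

// OnMergeFunc mirrors the hook signature above (local stand-in).
type OnMergeFunc[V any] func(ctx context.Context, key Key, delta V) error

// mergedEvent is a hypothetical payload for downstream consumers.
type mergedEvent struct {
	Key   Key
	Delta int64
}

var errQueueFull = errors.New("merge event queue full")

// enqueueHook returns a hook that never blocks: if the buffer is full it
// returns an error instead of waiting for a consumer.
func enqueueHook(out chan<- mergedEvent) OnMergeFunc[int64] {
	return func(_ context.Context, key Key, delta int64) error {
		select {
		case out <- mergedEvent{Key: key, Delta: delta}:
			return nil
		default:
			return errQueueFull
		}
	}
}

// demoHook fires the hook twice against a one-slot buffer.
func demoHook() (queued int, overflow error) {
	out := make(chan mergedEvent, 1)
	hook := enqueueHook(out)
	ctx := context.Background()
	_ = hook(ctx, Key{Entity: "page-1"}, 1)
	overflow = hook(ctx, Key{Entity: "page-1"}, 1)
	return len(out), overflow
}

func main() {
	fmt.Println(demoHook())
}
```

Dropping on overflow trades completeness of the downstream event stream for a bounded hot-path cost; whether that trade is acceptable depends on the consumer.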

type OnMergeOption

type OnMergeOption func(*onMergeConfig)

OnMergeOption configures a WithOnMerge wrapper.

func WithHookErrorPropagation

func WithHookErrorPropagation(propagate bool) OnMergeOption

WithHookErrorPropagation controls whether a hook error is returned from MergeUpdate or merely logged. Default: false (log only). When true, a non-nil error from the hook is returned to the caller AFTER the inner MergeUpdate has already succeeded — the merge itself is NOT rolled back.

func WithHookLogger

func WithHookLogger(logger *slog.Logger) OnMergeOption

WithHookLogger overrides the slog.Logger used for hook-error log lines. Defaults to slog.Default().

type Store

type Store[V any] interface {
	// Get reads the current value at k. Returns zero value of V and ok=false if missing.
	Get(ctx context.Context, k Key) (val V, ok bool, err error)

	// GetMany batches reads. Returns one entry per requested key; missing keys are
	// represented by the zero value of V at that index with ok=false.
	GetMany(ctx context.Context, ks []Key) (vals []V, ok []bool, err error)

	// MergeUpdate combines delta into the existing value at k via the monoid associated
	// with the pipeline. ttl, if nonzero, sets/extends the TTL on the underlying record
	// (used for windowed aggregations to expire old buckets).
	MergeUpdate(ctx context.Context, k Key, delta V, ttl time.Duration) error

	// Close releases any underlying resources (connection pools, batchers).
	Close() error
}

Store is the StateStore contract. Implementations must be safe for concurrent use.

MergeUpdate atomically applies the monoid Combine of the current value with delta and writes the result. Implementations may use native primitives (DynamoDB UpdateItem ADD, Valkey PFADD) when the monoid Kind permits, falling back to read-modify-write under a conditional write for non-native cases.
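For intuition about the contract, here is a minimal in-memory sketch whose monoid is int64 addition. sumStore is a hypothetical type, not a Murmur implementation; it makes MergeUpdate atomic with a mutex and ignores the ttl argument.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Key mirrors state.Key from this page (local stand-in).
type Key struct {
	Entity string
	Bucket int64
}

// sumStore is a hypothetical in-memory store; Combine is int64 addition.
type sumStore struct {
	mu sync.Mutex
	m  map[Key]int64
}

func newSumStore() *sumStore { return &sumStore{m: make(map[Key]int64)} }

func (s *sumStore) Get(_ context.Context, k Key) (int64, bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.m[k]
	return v, ok, nil
}

// GetMany returns one entry per key; missing keys yield 0 with ok=false.
func (s *sumStore) GetMany(_ context.Context, ks []Key) ([]int64, []bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	vals, oks := make([]int64, len(ks)), make([]bool, len(ks))
	for i, k := range ks {
		vals[i], oks[i] = s.m[k]
	}
	return vals, oks, nil
}

// MergeUpdate combines delta into the current value while holding the lock,
// so concurrent merges are never lost. TTL handling is omitted in this sketch.
func (s *sumStore) MergeUpdate(_ context.Context, k Key, delta int64, _ time.Duration) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[k] += delta
	return nil
}

func (s *sumStore) Close() error { return nil }

// demoConcurrentMerge applies n concurrent +1 merges and reads the total.
func demoConcurrentMerge(n int) int64 {
	s := newSumStore()
	k := Key{Entity: "page-1"}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = s.MergeUpdate(context.Background(), k, 1, 0)
		}()
	}
	wg.Wait()
	v, _, _ := s.Get(context.Background(), k)
	return v
}

func main() {
	fmt.Println(demoConcurrentMerge(100))
}
```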

func NewInstrumented

func NewInstrumented[V any](inner Store[V], recorder metrics.Recorder, label string) Store[V]

NewInstrumented wraps inner with recorder-driven latency reporting under the given label. When recorder is nil, it returns inner unwrapped (zero-overhead path).

func WithOnMerge

func WithOnMerge[V any](inner Store[V], fn OnMergeFunc[V], opts ...OnMergeOption) Store[V]

WithOnMerge wraps inner with a post-write hook. fn is invoked after every successful MergeUpdate with the same key and delta that were applied. The hook is NOT invoked when the inner MergeUpdate returns a non-nil error.

By default, hook errors are logged via slog and swallowed — the wrapper's MergeUpdate returns nil to its caller as long as the inner merge succeeded. Pass WithHookErrorPropagation(true) to surface hook errors to the caller instead. Either way, the inner merge is NOT rolled back: by the time the hook runs, the merge has already been durably written.

Nil fn is safe: WithOnMerge returns inner unwrapped (zero overhead).

All other Store methods (Get, GetMany, Close) pass through to inner unchanged.

Typical use case: count-core wants to emit a BotInteractionCountIntervalBackendEvent after each merge without setting up a DDB Streams → Lambda chain. The hook is fire-and-forget from the merge's perspective; durability of the merge itself is the inner Store's responsibility (DDB UpdateItem is atomic).
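The error-handling rules above can be summarized in a short sketch. This is a model of the documented semantics, not Murmur's wrapper: hooked, merger, and okStore are hypothetical names, and the propagate field stands in for WithHookErrorPropagation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"time"
)

// Key mirrors state.Key from this page (local stand-in).
type Key struct {
	Entity string
	Bucket int64
}

// merger is the MergeUpdate subset of Store[V].
type merger[V any] interface {
	MergeUpdate(ctx context.Context, k Key, delta V, ttl time.Duration) error
}

// okStore is a trivial inner store whose merges always succeed.
type okStore struct{}

func (okStore) MergeUpdate(context.Context, Key, int64, time.Duration) error { return nil }

// hooked models the documented behavior of a post-merge hook wrapper.
type hooked[V any] struct {
	inner     merger[V]
	fn        func(ctx context.Context, k Key, delta V) error
	propagate bool
	logger    *slog.Logger
}

func (h hooked[V]) MergeUpdate(ctx context.Context, k Key, delta V, ttl time.Duration) error {
	if err := h.inner.MergeUpdate(ctx, k, delta, ttl); err != nil {
		return err // merge failed: the hook is not invoked
	}
	if err := h.fn(ctx, k, delta); err != nil {
		if h.propagate {
			return err // the merge is already written and is not rolled back
		}
		h.logger.Error("on-merge hook failed", "entity", k.Entity, "err", err)
	}
	return nil
}

var errHook = errors.New("hook failed")

// demoHookErrors runs a failing hook with propagation off, then on.
func demoHookErrors() (swallowed, propagated error) {
	fail := func(context.Context, Key, int64) error { return errHook }
	quiet := slog.New(slog.NewTextHandler(io.Discard, nil))
	k := Key{Entity: "page-1"}
	ctx := context.Background()
	swallowed = hooked[int64]{okStore{}, fail, false, quiet}.MergeUpdate(ctx, k, 1, 0)
	propagated = hooked[int64]{okStore{}, fail, true, quiet}.MergeUpdate(ctx, k, 1, 0)
	return swallowed, propagated
}

func main() {
	fmt.Println(demoHookErrors())
}
```

A caller that enables propagation should treat a returned hook error as "merge applied, side effect failed" and avoid retrying the merge itself, or the delta would be applied twice.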

Directories

Path      Synopsis
dynamodb  Package dynamodb provides a DynamoDB-backed implementation of state.Store, the source-of-truth state for Murmur pipelines.
valkey    Package valkey provides Valkey-backed cache implementations of state.Cache.
