state

package
v0.0.0-...-84a1714 Latest
Warning

This package is not in the latest version of its module.

Published: May 8, 2026 License: Apache-2.0 Imports: 3 Imported by: 0

Documentation

Overview

Package state defines the StateStore abstraction Murmur pipelines write aggregations to. Concrete implementations live in subpackages (state/dynamodb, state/valkey).

Murmur's invariant: DynamoDB is always the source of truth. Valkey, when configured, is a read-cache and sketch-accelerator — never trusted as ground truth. If Valkey is lost, every pipeline can rebuild its accelerator state from DynamoDB.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cache

type Cache[V any] interface {
	Store[V]
	// Repopulate rebuilds the cache from the authoritative Store, e.g. after a Valkey
	// node restart. Implementations may stream from Store and insert in batches.
	Repopulate(ctx context.Context, src Store[V], keys []Key) error
}

Cache is the read-side accelerator. It mirrors a subset of Store data and may serve reads with lower latency. Pipelines configured with a cache write through to both Store and Cache; on Cache miss, they fall back to Store.

A Cache is *never* a source of truth. Implementations should treat their data as repopulatable from the underlying Store at any time.

func NewInstrumentedCache

func NewInstrumentedCache[V any](inner Cache[V], recorder metrics.Recorder, label string) Cache[V]

NewInstrumentedCache wraps inner with recorder-driven latency reporting under the given label. When recorder is nil, it returns inner unwrapped.

type Deduper

type Deduper interface {
	// MarkSeen atomically attempts to claim eventID. Returns firstSeen=true
	// if the caller is the first to mark this ID; firstSeen=false if it was
	// already claimed. Returns an error only on transient backend failures
	// — duplicates are NOT errors.
	MarkSeen(ctx context.Context, eventID string) (firstSeen bool, err error)

	// Close releases any underlying resources.
	Close() error
}

Deduper is the at-least-once dedup contract. The streaming runtime calls MarkSeen with each Source.Record's EventID before applying the monoid Combine; on a duplicate (worker crashed mid-write, source replays the record) MarkSeen returns firstSeen=false and the runtime skips the merge.

Implementations must be:

  • Atomic. Two concurrent calls with the same EventID must produce exactly one firstSeen=true and one firstSeen=false — never two of either.
  • Bounded. EventIDs older than a configured retention should fall out so the dedup table doesn't grow forever. Typical TTL is hours to days depending on how long messages can sit unacked in the source's retention window.

type Instrumented

type Instrumented[V any] struct {
	// contains filtered or unexported fields
}

Instrumented wraps a Store[V] (or Cache[V]) with metrics.Recorder hooks for every operation. The wrapped store sees identical traffic; the recorder gets per-op latency and error counts.

Operation names follow the streaming runtime's convention:

  • "<label>:store_get" (Get)
  • "<label>:store_get_many" (GetMany)
  • "<label>:store_merge_update" (MergeUpdate)

where `label` is the supplied namespace (typically the pipeline name). Use the same label as the streaming/query layer so that latency histograms aggregate cleanly.

Usage:

rawStore := mddb.NewInt64SumStore(client, "page_views")
store := state.NewInstrumented[int64](rawStore, recorder, "page_views")
pipeline.NewPipeline[Event, int64](...).StoreIn(store)

Cost: one time.Now() call at operation entry plus one RecordLatency call at exit. That is sub-microsecond with the Noop recorder and roughly 100ns plus lock contention with the InMemory recorder, negligible next to the cost of any DynamoDB or Valkey operation.

For Cache[V], see InstrumentedCache below — same pattern, adds the Repopulate latency op.

func (*Instrumented[V]) Close

func (s *Instrumented[V]) Close() error

Close passes through.

func (*Instrumented[V]) Get

func (s *Instrumented[V]) Get(ctx context.Context, k Key) (V, bool, error)

Get records latency under "<label>:store_get" and propagates the inner Store's result.

func (*Instrumented[V]) GetMany

func (s *Instrumented[V]) GetMany(ctx context.Context, ks []Key) ([]V, []bool, error)

GetMany records latency under "<label>:store_get_many".

func (*Instrumented[V]) MergeUpdate

func (s *Instrumented[V]) MergeUpdate(ctx context.Context, k Key, delta V, ttl time.Duration) error

MergeUpdate records latency under "<label>:store_merge_update".

Note: the streaming runtime ALSO records "store_merge" latency from its own clock, capturing the runtime's view of the operation (including any wrapping the runtime applies). The two op names are distinct so they aggregate to separate histograms — each tells a different story (store-side cost vs runtime-side wrapped cost).

type InstrumentedCache

type InstrumentedCache[V any] struct {
	// contains filtered or unexported fields
}

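InstrumentedCache wraps a Cache[V] with recorder hooks. Operation names follow the same scheme as Instrumented[V] but use a cache_ prefix ("<label>:cache_get", "<label>:cache_get_many", "<label>:cache_merge_update"), plus "<label>:cache_repopulate" for Repopulate.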

func (*InstrumentedCache[V]) Close

func (c *InstrumentedCache[V]) Close() error

Close passes through.

func (*InstrumentedCache[V]) Get

func (c *InstrumentedCache[V]) Get(ctx context.Context, k Key) (V, bool, error)

Get records latency under "<label>:cache_get".

func (*InstrumentedCache[V]) GetMany

func (c *InstrumentedCache[V]) GetMany(ctx context.Context, ks []Key) ([]V, []bool, error)

GetMany records latency under "<label>:cache_get_many".

func (*InstrumentedCache[V]) MergeUpdate

func (c *InstrumentedCache[V]) MergeUpdate(ctx context.Context, k Key, delta V, ttl time.Duration) error

MergeUpdate records latency under "<label>:cache_merge_update".

func (*InstrumentedCache[V]) Repopulate

func (c *InstrumentedCache[V]) Repopulate(ctx context.Context, src Store[V], keys []Key) error

Repopulate records latency under "<label>:cache_repopulate".

type Key

type Key struct {
	Entity string
	Bucket int64
}

Key identifies a stored aggregation value. Entity is the user-supplied aggregation key (e.g. a page ID, customer ID). Bucket, when nonzero, is the time-bucket ID for windowed aggregations; bucket 0 means "no window / all-time."

type Store

type Store[V any] interface {
	// Get reads the current value at k. Returns zero value of V and ok=false if missing.
	Get(ctx context.Context, k Key) (val V, ok bool, err error)

	// GetMany batches reads. Returns one entry per requested key; missing keys are
	// represented by the zero value of V at that index with ok=false.
	GetMany(ctx context.Context, ks []Key) (vals []V, ok []bool, err error)

	// MergeUpdate combines delta into the existing value at k via the monoid associated
	// with the pipeline. ttl, if nonzero, sets/extends the TTL on the underlying record
	// (used for windowed aggregations to expire old buckets).
	MergeUpdate(ctx context.Context, k Key, delta V, ttl time.Duration) error

	// Close releases any underlying resources (connection pools, batchers).
	Close() error
}

Store is the StateStore contract. Implementations must be safe for concurrent use.

MergeUpdate atomically applies the monoid Combine of the current value with delta and writes the result. Implementations may use native primitives (DynamoDB UpdateItem ADD, Valkey PFADD) when the monoid Kind permits, falling back to read-modify-write under a conditional write for non-native cases.

func NewInstrumented

func NewInstrumented[V any](inner Store[V], recorder metrics.Recorder, label string) Store[V]

NewInstrumented wraps inner with recorder-driven latency reporting under the given label. When recorder is nil, it returns inner unwrapped (the zero-overhead path).

Directories

Path        Synopsis
dynamodb    Package dynamodb provides a DynamoDB-backed implementation of state.Store, the source-of-truth state for Murmur pipelines.
valkey      Package valkey provides Valkey-backed cache implementations of state.Cache.
