Overview
Package valkey provides Valkey-backed cache implementations of state.Cache.
Murmur's invariant: DynamoDB (DDB) is the source of truth; Valkey is a read cache and sketch accelerator. If Valkey is lost, the state.Cache implementations (Int64Cache, BytesCache) can rebuild their state from DDB via Repopulate. HLLCache cannot; it must be rebuilt by replaying events (see its Recovery notes). The cache must never be trusted as ground truth.
Int64Cache is the Phase 1 write-through cache for KindSum / KindCount pipelines, using atomic INCRBY. The package also provides BytesCache, which caches the same byte-encoded monoid values the BytesStore holds, and HLLCache, a Valkey-native HyperLogLog accelerator. Neither type converts between axiomhq HLL bytes and Valkey-native PFADD/PFCOUNT encoding; that boundary requires conversion logic flagged in the architecture doc.
Index
- type BytesCache
- func (c *BytesCache) Close() error
- func (c *BytesCache) Get(ctx context.Context, k state.Key) ([]byte, bool, error)
- func (c *BytesCache) GetMany(ctx context.Context, ks []state.Key) ([][]byte, []bool, error)
- func (c *BytesCache) MergeUpdate(ctx context.Context, k state.Key, delta []byte, ttl time.Duration) error
- func (c *BytesCache) Repopulate(ctx context.Context, src state.Store[[]byte], keys []state.Key) error
- type BytesConfig
- type Config
- type HLLCache
- func (c *HLLCache) Add(ctx context.Context, k state.Key, element []byte, ttl time.Duration) error
- func (c *HLLCache) AddMany(ctx context.Context, k state.Key, elements [][]byte, ttl time.Duration) error
- func (c *HLLCache) Cardinality(ctx context.Context, k state.Key) (uint64, bool, error)
- func (c *HLLCache) CardinalityMany(ctx context.Context, ks []state.Key) ([]uint64, []bool, error)
- func (c *HLLCache) CardinalityOver(ctx context.Context, ks []state.Key) (uint64, error)
- func (c *HLLCache) Close() error
- func (c *HLLCache) MergeInto(ctx context.Context, dst state.Key, srcs []state.Key, ttl time.Duration) error
- type HLLConfig
- type Int64Cache
- func (c *Int64Cache) Close() error
- func (c *Int64Cache) Get(ctx context.Context, k state.Key) (int64, bool, error)
- func (c *Int64Cache) GetMany(ctx context.Context, ks []state.Key) ([]int64, []bool, error)
- func (c *Int64Cache) MergeUpdate(ctx context.Context, k state.Key, delta int64, ttl time.Duration) error
- func (c *Int64Cache) Repopulate(ctx context.Context, src state.Store[int64], keys []state.Key) error
Types
type BytesCache
type BytesCache struct {
// contains filtered or unexported fields
}
BytesCache is a state.Cache[[]byte] backed by Valkey, suitable for any pipeline whose state is a byte-encoded monoid value (HLL, TopK, Bloom, DecayedSumBytes, custom).
Operationally:
- Get / GetMany — Valkey GET / MGET; single-digit ms latency. This is the read-path acceleration BytesStore lacks on its own (DDB cold read is 5–15 ms).
- MergeUpdate — read-modify-write through the supplied monoid: GET → Combine(current, delta) → SET, with the Combine running in Go. The sequence is not atomic: two concurrent writers on the same key can interleave so that one contribution drops out of the cache, while both still land on the BytesStore. Each update costs two Valkey round trips (GET and SET).
- Repopulate — rebuild from the authoritative Store. Used after a Valkey node restart or cold start; reads are bulk-fetched via Store.GetMany and bulk-written via Valkey MSET.
Why not Valkey-native HLL/Bloom acceleration? Valkey's PFADD/PFCOUNT/PFMERGE use Redis's encoding, which is incompatible with axiomhq's HLL bytes (different bit layout, different encoding). The same problem applies to Bloom and to TopK (Valkey has no native TopK). Bridging requires either:
(a) accepting a portable encoding and converting at the BytesStore ↔ Valkey boundary, OR
(b) standardizing on Valkey-native encodings everywhere and accepting the format lock-in.
BytesCache takes neither path: it stores the SAME bytes the BytesStore stores, using Valkey purely as a key-value cache. The Combine runs in Go via the supplied monoid. This gives cache-level read latency without forcing a runtime-incompatible encoding choice; native Valkey acceleration is a future Phase 3 on top of the same interface.
func NewBytesCache
func NewBytesCache(cfg BytesConfig) (*BytesCache, error)
NewBytesCache constructs a BytesCache. The returned Cache owns the underlying client; call Close to disconnect.
func (*BytesCache) Close
func (c *BytesCache) Close() error
Close releases the underlying Valkey client.
func (*BytesCache) MergeUpdate
func (c *BytesCache) MergeUpdate(ctx context.Context, k state.Key, delta []byte, ttl time.Duration) error
MergeUpdate fetches the current cached value, runs the monoid Combine in Go, and writes the merged result back via SET.
This is intentionally NOT a CAS path. The cache is "lossy by design": the BytesStore is the source of truth. Two concurrent writers on the same key may interleave such that one's contribution drops out of the cache, but BOTH contributions still land on the BytesStore. The cost of a lost cache write is that the cached value omits that contribution, so cached reads for the key are stale until Repopulate rebuilds the entry from the BytesStore.
Lua-scripted server-side merge isn't viable: the Combine is user Go code, not a Valkey-supported operation. A future Phase 3 might standardize on Valkey-native sketch types (PFADD/PFCOUNT for HLL, TopK module commands) which DO permit server-side combining, at the cost of an encoding-conversion boundary against axiomhq/hyperloglog and the hand-rolled Misra-Gries / Bloom encodings.
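To make the lost-update case concrete, the sketch below replays two client-side GET → Combine → SET updates in the worst-case interleaving, where both GETs complete before either SET. Monoid is a hypothetical stand-in for monoid.Monoid[[]byte], and concatenation is used only because its result makes the dropped delta easy to see.

```go
package main

import "fmt"

// Monoid is a hypothetical stand-in for monoid.Monoid[[]byte]; only Combine
// matters for this illustration.
type Monoid interface {
	Combine(a, b []byte) []byte
}

// concat is an illustrative byte monoid (concatenation). Any byte monoid
// exhibits the same race.
type concat struct{}

func (concat) Combine(a, b []byte) []byte {
	out := make([]byte, 0, len(a)+len(b))
	return append(append(out, a...), b...)
}

// interleavedMerge replays two GET -> Combine -> SET updates in the
// worst-case order: both GETs complete before either SET.
func interleavedMerge(m Monoid, cached, deltaA, deltaB []byte) (afterA, final []byte) {
	readA := cached                   // writer A: GET
	readB := cached                   // writer B: GET, before A has written
	afterA = m.Combine(readA, deltaA) // writer A: SET
	final = m.Combine(readB, deltaB)  // writer B: SET overwrites A's value
	return afterA, final
}

func main() {
	m := concat{}
	afterA, final := interleavedMerge(m, []byte("x"), []byte("a"), []byte("b"))
	// The authoritative store applies both deltas.
	store := m.Combine(m.Combine([]byte("x"), []byte("a")), []byte("b"))
	fmt.Printf("cache after A: %s, cache final: %s, store: %s\n", afterA, final, store)
	// Output: cache after A: xa, cache final: xb, store: xab
}
```

The cache ends at "xb" while the store holds "xab": A's contribution is missing from the cache but not from the source of truth.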
type BytesConfig
type BytesConfig struct {
// Address is the Valkey server endpoint. Defaults to "localhost:6379".
Address string
// KeyPrefix is the namespace prefix for cache keys. Required.
KeyPrefix string
// Monoid is the byte-monoid used for the read-modify-write Combine.
// Must match the monoid used by the underlying BytesStore — otherwise
// MergeUpdate will produce values that disagree across the cache /
// store boundary.
Monoid monoid.Monoid[[]byte]
// Extra lets callers append additional valkey-go options (TLS, auth, etc).
Extra []valkey.ClientOption
}
BytesConfig configures a BytesCache.
type Config
type Config struct {
// Address is the Valkey server endpoint (e.g. "localhost:6379"). For ElastiCache
// Serverless or cluster mode use the cluster endpoint.
Address string
// KeyPrefix is the namespace prefix for cache keys (e.g. "page_views"). Allows
// multiple pipelines to share a Valkey instance without colliding.
KeyPrefix string
// Extra lets callers append additional valkey-go options.
Extra []valkey.ClientOption
}
Config configures an Int64Cache.
type HLLCache
type HLLCache struct {
// contains filtered or unexported fields
}
HLLCache is a Valkey-native HyperLogLog accelerator, exposing the PFADD / PFCOUNT / PFMERGE primitives as a per-entity API.
Why this exists
Murmur's BytesStore-authoritative HLL pipeline marshals axiomhq/hyperloglog sketches as opaque bytes into DDB. Reading cardinality requires:
- DDB GetItem (5–15ms cold)
- axiomhq UnmarshalBinary
- Estimate()
For dashboard-class queries this is fine. For hot-path queries (e.g. "show the unique-visitor count for this page on today's dashboard, refreshed every 2s"), it is not. Valkey-native PFCOUNT is single-digit ms and runs entirely server-side.
The side-by-side pattern
Pipelines using the accelerator emit each event element to BOTH the BytesStore (as an axiomhq Single() sketch merged via MergeUpdate) AND to this HLLCache via Add. The two HLLs are independent estimators of the same set. Each has HLL's relative standard error of about 1.04/√m for m registers (Valkey uses 16384 registers, about 0.81%; the axiomhq figure depends on its configured precision), and their error realizations are INDEPENDENT, so the two estimates will generally differ slightly. This is acceptable for approximate-cardinality queries. If your application requires the two paths to agree bit-for-bit, this accelerator is not the right tool; use BytesCache instead, which stores the same axiomhq bytes the BytesStore stores and does the merge in Go.
Recovery
On Valkey loss, the accelerator can only be repopulated by re-feeding events. There is no axiomhq → Valkey HYLL byte conversion path (axiomhq doesn't expose its register array, and Valkey's HLL encoding is incompatible with axiomhq's). If your pipeline cannot replay events, treat this as a best-effort cache: route hot reads to the accelerator opportunistically, falling back to the BytesStore on a Valkey miss or unavailability.
Key layout
Cache keys are "<keyPrefix>:<entity>:<bucket>" — same scheme as Int64Cache and BytesCache. This means a Windowed[HLL] pipeline's per-bucket accelerator keys are stable across restarts.
func NewHLLCache
NewHLLCache constructs an HLLCache. The returned cache owns the underlying Valkey client; call Close to disconnect.
func (*HLLCache) Add
func (c *HLLCache) Add(ctx context.Context, k state.Key, element []byte, ttl time.Duration) error
Add records that element was observed for entity k via PFADD. Concurrent Adds on the same key are safe — Valkey serializes them internally. If ttl > 0, the entry's expiration is set via EXPIRE (best-effort; failures don't propagate, since the next Add will re-set it).
Mirrors the pipeline-runtime contract: the streaming worker calls Add per event after a successful BytesStore.MergeUpdate, in the same hot path. A failed Add is non-fatal (the BytesStore is the source of truth) but should be logged and counted via metrics.
func (*HLLCache) AddMany
func (c *HLLCache) AddMany(ctx context.Context, k state.Key, elements [][]byte, ttl time.Duration) error
AddMany records multiple elements for the same entity in one round trip. PFADD is variadic in the wire protocol, so this is one command — useful for back-pressured batched workers and for bootstrap drivers that fold many events per key.
func (*HLLCache) Cardinality
func (c *HLLCache) Cardinality(ctx context.Context, k state.Key) (uint64, bool, error)
Cardinality returns PFCOUNT for entity k. Missing keys return (0, false, nil) — Valkey's PFCOUNT on a missing key returns 0 without error, but we surface the present/absent distinction explicitly via EXISTS.
func (*HLLCache) CardinalityMany
func (c *HLLCache) CardinalityMany(ctx context.Context, ks []state.Key) ([]uint64, []bool, error)
CardinalityMany pipelines per-key PFCOUNT calls. Use CardinalityOver if you want the union cardinality across keys rather than per-key estimates.
func (*HLLCache) CardinalityOver
func (c *HLLCache) CardinalityOver(ctx context.Context, ks []state.Key) (uint64, error)
CardinalityOver returns PFCOUNT k1 k2 ...: Valkey's union cardinality across multiple keys, computed server-side in one command without writing a merged key. The canonical use case: "how many distinct visitors across this entity's last N daily buckets" against a Windowed[HLL] pipeline.
Returns 0 (without error) if all keys are missing — Valkey's PFCOUNT semantics treat missing keys as empty sketches.
func (*HLLCache) MergeInto
func (c *HLLCache) MergeInto(ctx context.Context, dst state.Key, srcs []state.Key, ttl time.Duration) error
MergeInto computes PFMERGE dst <- srcs..., storing the merged HLL at dst. If ttl > 0, an EXPIRE is set on dst.
Use case: pre-compute a "last-7-days" rollup HLL once per day from 7 daily buckets, then serve dashboard reads against the rollup with one PFCOUNT instead of 7. The trade-off is staleness: the rollup lags real time by up to one rebuild interval.
Note: Valkey's PFMERGE does not replace an existing dst; it treats dst as one more source and folds its registers into the result. Listing dst among srcs is therefore harmless, but a rollup rebuilt in place keeps elements from buckets that have left the window unless dst is deleted before the merge.
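Valkey's PFMERGE documentation states that an existing destination key is treated as one of the source sets. The sketch below models that behavior with exact sets in place of HLLs (pfmerge is a hypothetical in-memory model, not a client call) to show how an in-place rebuild of a rolling rollup keeps members from a bucket that has left the window, and how deleting dst first avoids it.

```go
package main

import (
	"fmt"
	"sort"
)

// set is an exact stand-in for an HLL; PFMERGE approximates set union.
type set map[string]bool

// pfmerge models PFMERGE dst src...: an existing dst is treated as one more
// source, so its previous contents are kept in the result.
func pfmerge(db map[string]set, dst string, srcs ...string) {
	out := set{}
	for e := range db[dst] {
		out[e] = true
	}
	for _, s := range srcs {
		for e := range db[s] {
			out[e] = true
		}
	}
	db[dst] = out
}

// members returns the set's elements in sorted order.
func members(s set) []string {
	out := make([]string, 0, len(s))
	for e := range s {
		out = append(out, e)
	}
	sort.Strings(out)
	return out
}

func main() {
	db := map[string]set{
		"d1": {"alice": true},
		"d2": {"bob": true},
		"d3": {"carol": true},
	}
	pfmerge(db, "rollup", "d1", "d2") // yesterday's 2-day window
	pfmerge(db, "rollup", "d2", "d3") // today's window, rebuilt in place
	fmt.Println(members(db["rollup"])) // [alice bob carol]: alice leaked in from d1

	delete(db, "rollup") // the equivalent of DEL before rebuilding
	pfmerge(db, "rollup", "d2", "d3")
	fmt.Println(members(db["rollup"])) // [bob carol]
}
```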
type HLLConfig
type HLLConfig struct {
// Address is the Valkey server endpoint. Defaults to "localhost:6379".
Address string
// KeyPrefix namespaces this accelerator's keys (e.g. "page_unique_visitors").
// Required.
KeyPrefix string
// Extra lets callers append additional valkey-go options (TLS, auth, etc).
Extra []valkey.ClientOption
}
HLLConfig configures an HLLCache.
type Int64Cache
type Int64Cache struct {
// contains filtered or unexported fields
}
Int64Cache is a state.Cache[int64] backed by Valkey. MergeUpdate uses INCRBY (atomic at the Valkey side); Get / GetMany use GET / MGET. The cache key encodes the (entity, bucket) pair as "<keyPrefix>:<entity>:<bucket>".
func NewInt64Cache
func NewInt64Cache(cfg Config) (*Int64Cache, error)
NewInt64Cache constructs an Int64Cache. The returned Cache owns the underlying client; call Close to disconnect.
func (*Int64Cache) Close
func (c *Int64Cache) Close() error
Close releases the underlying Valkey client.