valkey

package
v0.1.0
Warning

This package is not in the latest version of its module.

Published: May 12, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package valkey provides Valkey-backed cache implementations of state.Cache.

Murmur's invariant: DynamoDB is the source of truth, Valkey is a read-cache and sketch-accelerator. If Valkey is lost, every pipeline can rebuild its accelerator state from DDB via Repopulate. The cache must never be trusted as ground truth.

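Int64Cache is the Phase 1 write-through cache for KindSum / KindCount pipelines, using atomic INCRBY. BytesCache caches any byte-encoded monoid state (sketches included) as opaque bytes, and HLLCache is a separate Valkey-native HyperLogLog accelerator. Native Valkey HLL/Bloom acceleration on the BytesStore path itself is deferred to a later phase, because the encoding boundary between axiomhq HLL and Valkey-native PFADD/PFCOUNT requires conversion logic flagged in the architecture doc.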

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BytesCache

type BytesCache struct {
	// contains filtered or unexported fields
}

BytesCache is a state.Cache[[]byte] backed by Valkey, suitable for any pipeline whose state is a byte-encoded monoid value (HLL, TopK, Bloom, DecayedSumBytes, custom).

Operationally:

  • Get / GetMany: Valkey GET / MGET, with single-digit-ms latency. This is the read-path acceleration BytesStore lacks on its own (a DDB cold read is 5–15 ms).
  • MergeUpdate: read-modify-write through the supplied monoid, GET → Combine(current, delta) → SET, with the Combine running in Go. The sequence is not atomic, so concurrent writers on the same key can drop a contribution from the cache (never from the BytesStore); see MergeUpdate below. Each update costs two Valkey round trips.
  • Repopulate: rebuild from the authoritative Store. Used after a Valkey node restart or cold start; reads are bulk-fetched via Store.GetMany and bulk-written via Valkey MSET.

Why not Valkey-native HLL/Bloom acceleration? Valkey's PFADD/PFCOUNT/PFMERGE use Redis's encoding, which is incompatible with axiomhq's HLL bytes (different bit layout, different encoding). The same problem applies to Bloom and to TopK (Valkey has no native TopK). Bridging requires either:

(a) accept a portable encoding and convert at the BytesStore ↔
    Valkey boundary, OR
(b) standardize on Valkey-native everywhere and accept the format
    lock-in.

BytesCache takes neither path: it stores the SAME bytes the BytesStore stores, using Valkey purely as a key-value cache. The Combine runs in Go via the supplied monoid. This buys read latency without forcing a runtime-incompatible encoding choice; native Valkey acceleration is left to a future Phase 3 on top of the same interface.

func NewBytesCache

func NewBytesCache(cfg BytesConfig) (*BytesCache, error)

NewBytesCache constructs a BytesCache. The returned Cache owns the underlying client; call Close to disconnect.

func (*BytesCache) Close

func (c *BytesCache) Close() error

Close releases the underlying Valkey client.

func (*BytesCache) Get

func (c *BytesCache) Get(ctx context.Context, k state.Key) ([]byte, bool, error)

Get returns the cached bytes at k. Missing keys return nil, false, nil.

func (*BytesCache) GetMany

func (c *BytesCache) GetMany(ctx context.Context, ks []state.Key) ([][]byte, []bool, error)

GetMany batches reads via MGET.

func (*BytesCache) MergeUpdate

func (c *BytesCache) MergeUpdate(ctx context.Context, k state.Key, delta []byte, ttl time.Duration) error

MergeUpdate fetches the current cached value, runs the monoid Combine in Go, and writes the merged result back via SET.

This is intentionally NOT a CAS path. The cache is "lossy by design": the BytesStore is the source of truth. Two concurrent writers on the same key may interleave so that one writer's contribution drops out of the cache, but BOTH contributions still land on the BytesStore. The cost of a lost cache write is that the cached value under-reports that contribution (later merges build on the incomplete value) until the entry expires or Repopulate rewrites it from the store.
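The lost update can be replayed deterministically. This sketch replaces the real monoid from BytesConfig.Monoid with a toy set-union `combine` over single bytes, an assumption made purely for illustration. Both writers GET before either SETs, and both deltas are applied to a stand-in store.

```go
package main

import "fmt"

// combine is a toy monoid (set union over single-byte elements) standing in
// for an HLL/Bloom merge. Hypothetical; not the configured Monoid.
func combine(a, b []byte) []byte {
	seen := map[byte]bool{}
	var out []byte
	for _, x := range append(append([]byte{}, a...), b...) {
		if !seen[x] {
			seen[x] = true
			out = append(out, x)
		}
	}
	return out
}

// interleave replays two writers doing GET -> Combine -> SET on one cache
// key, with both GETs landing before either SET. The store applies each
// delta through its own (assumed conflict-free) merge path.
func interleave(initial, deltaA, deltaB []byte) (cache, store []byte) {
	cache, store = initial, initial
	readA := cache                 // writer A: GET
	readB := cache                 // writer B: GET, sees the same value as A
	cache = combine(readA, deltaA) // writer A: SET
	cache = combine(readB, deltaB) // writer B: SET overwrites A's result
	store = combine(combine(store, deltaA), deltaB)
	return cache, store
}

func main() {
	cache, store := interleave([]byte("x"), []byte("a"), []byte("b"))
	fmt.Printf("cache=%s store=%s\n", cache, store) // cache=xb store=xab
}
```

The cache ends up as "xb" while the store holds "xab". Writer A's contribution is gone from the cache, and any later MergeUpdate on that key builds on the incomplete value.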

Lua-scripted server-side merge isn't viable: the Combine is user Go code, not a Valkey-supported operation. A future Phase 3 might standardize on Valkey-native sketch types (PFADD/PFCOUNT for HLL, TopK module commands) which DO permit server-side combining, at the cost of an encoding-conversion boundary against axiomhq/hyperloglog and the hand-rolled Misra-Gries / Bloom encodings.

func (*BytesCache) Repopulate

func (c *BytesCache) Repopulate(ctx context.Context, src state.Store[[]byte], keys []state.Key) error

Repopulate rebuilds cache entries from the authoritative store. Used after a Valkey restart or cold start.

type BytesConfig

type BytesConfig struct {
	// Address is the Valkey server endpoint. Defaults to "localhost:6379".
	Address string

	// KeyPrefix is the namespace prefix for cache keys. Required.
	KeyPrefix string

	// Monoid is the byte-monoid used for the read-modify-write Combine.
	// Must match the monoid used by the underlying BytesStore — otherwise
	// MergeUpdate will produce values that disagree across the cache /
	// store boundary.
	Monoid monoid.Monoid[[]byte]

	// Extra lets callers append additional valkey-go options (TLS, auth, etc.).
	Extra []valkey.ClientOption
}

BytesConfig configures a BytesCache.

type Config

type Config struct {
	// Address is the Valkey server endpoint (e.g. "localhost:6379"). For ElastiCache
	// Serverless or cluster mode use the cluster endpoint.
	Address string

	// KeyPrefix is the namespace prefix for cache keys (e.g. "page_views"). Allows
	// multiple pipelines to share a Valkey instance without colliding.
	KeyPrefix string

	// Extra lets callers append additional valkey-go options.
	Extra []valkey.ClientOption
}

Config configures an Int64Cache.

type HLLCache

type HLLCache struct {
	// contains filtered or unexported fields
}

HLLCache is a Valkey-native HyperLogLog accelerator, exposing the PFADD / PFCOUNT / PFMERGE primitives as a per-entity API.

Why this exists

Murmur's BytesStore-authoritative HLL pipeline marshals axiomhq/hyperloglog sketches as opaque bytes into DDB. Reading cardinality requires:

  1. DDB GetItem (5–15ms cold)
  2. axiomhq UnmarshalBinary
  3. Estimate()

For dashboard-class queries this is fine. For hot-path queries (e.g. "show the unique-visitor count for this page on today's dashboard, refreshed every 2s"), it is not. Valkey-native PFCOUNT is single-digit ms and runs entirely server-side.

The side-by-side pattern

Pipelines using the accelerator emit each event element to BOTH the BytesStore (as an axiomhq Single() sketch merged via MergeUpdate) AND to this HLLCache via Add. The two HLLs are independent estimators of the same set. Each carries HLL's relative standard error of 1.04/√m for m registers (0.81% for Valkey's fixed 16384-register HLL; for the axiomhq sketch it depends on the configured precision), with INDEPENDENT error realizations, so the two estimates will usually differ slightly. This is acceptable for approximate-cardinality queries. If your application requires the two paths to agree bit-for-bit, this accelerator is not the right tool; use BytesCache instead, which stores the same axiomhq bytes the BytesStore stores and does the merge in Go.

Recovery

On Valkey loss, the accelerator can only be repopulated by re-feeding events. There is no axiomhq → Valkey HLL byte-conversion path (axiomhq doesn't expose its register array, and Valkey's HLL encoding is incompatible with axiomhq's). If your pipeline cannot replay events, treat the accelerator as a best-effort cache: route hot reads to it opportunistically and fall back to the BytesStore on a Valkey miss or outage.

Key layout

Cache keys are "<keyPrefix>:<entity>:<bucket>" — same scheme as Int64Cache and BytesCache. This means a Windowed[HLL] pipeline's per-bucket accelerator keys are stable across restarts.
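The following sketches the "<keyPrefix>:<entity>:<bucket>" layout. `cacheKey` and `parseKey` are hypothetical helpers, and formatting the bucket as a base-10 integer is an assumption. What matters is that the key is a pure function of (prefix, entity, bucket), which is what keeps accelerator keys stable across restarts.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// cacheKey builds "<keyPrefix>:<entity>:<bucket>". Rendering the bucket as a
// base-10 integer is an assumption of this sketch.
func cacheKey(prefix, entity string, bucket int64) string {
	return prefix + ":" + entity + ":" + strconv.FormatInt(bucket, 10)
}

// parseKey inverts cacheKey for a known prefix. Splitting on the LAST colon
// keeps the bucket unambiguous even if an entity ID itself contains ':'.
func parseKey(prefix, key string) (string, int64, error) {
	if !strings.HasPrefix(key, prefix+":") {
		return "", 0, fmt.Errorf("key %q lacks prefix %q", key, prefix)
	}
	rest := strings.TrimPrefix(key, prefix+":")
	i := strings.LastIndex(rest, ":")
	if i < 0 {
		return "", 0, fmt.Errorf("key %q has no bucket segment", key)
	}
	bucket, err := strconv.ParseInt(rest[i+1:], 10, 64)
	if err != nil {
		return "", 0, err
	}
	return rest[:i], bucket, nil
}

func main() {
	k := cacheKey("page_unique_visitors", "page:42", 20260512)
	fmt.Println(k) // page_unique_visitors:page:42:20260512
	entity, bucket, err := parseKey("page_unique_visitors", k)
	fmt.Println(entity, bucket, err) // page:42 20260512 <nil>
}
```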

func NewHLLCache

func NewHLLCache(cfg HLLConfig) (*HLLCache, error)

NewHLLCache constructs an HLLCache. The returned cache owns the underlying Valkey client; call Close to disconnect.

func (*HLLCache) Add

func (c *HLLCache) Add(ctx context.Context, k state.Key, element []byte, ttl time.Duration) error

Add records that element was observed for entity k via PFADD. Concurrent Adds on the same key are safe — Valkey serializes them internally. If ttl > 0, the entry's expiration is set via EXPIRE (best-effort; failures don't propagate, since the next Add will re-set it).

Mirrors the pipeline-runtime contract: the streaming worker calls Add per event after a successful BytesStore.MergeUpdate, in the same hot path. A failed Add is non-fatal (the BytesStore is the source of truth) but should be logged and counted via metrics.

func (*HLLCache) AddMany

func (c *HLLCache) AddMany(ctx context.Context, k state.Key, elements [][]byte, ttl time.Duration) error

AddMany records multiple elements for the same entity in one round trip. PFADD is variadic in the wire protocol, so this is one command — useful for back-pressured batched workers and for bootstrap drivers that fold many events per key.

func (*HLLCache) Cardinality

func (c *HLLCache) Cardinality(ctx context.Context, k state.Key) (uint64, bool, error)

Cardinality returns PFCOUNT for entity k. Missing keys return (0, false, nil) — Valkey's PFCOUNT on a missing key returns 0 without error, but we surface the present/absent distinction explicitly via EXISTS.

func (*HLLCache) CardinalityMany

func (c *HLLCache) CardinalityMany(ctx context.Context, ks []state.Key) ([]uint64, []bool, error)

CardinalityMany pipelines per-key PFCOUNT calls. Use CardinalityOver if you want the union cardinality across keys rather than per-key estimates.

func (*HLLCache) CardinalityOver

func (c *HLLCache) CardinalityOver(ctx context.Context, ks []state.Key) (uint64, error)

CardinalityOver returns PFCOUNT k1 k2 ..., Valkey's union cardinality across multiple keys. The union is computed server-side in a temporary HLL that is never stored, so no rollup key is written. The canonical use case: "how many distinct visitors across this entity's last N daily buckets" against a Windowed[HLL] pipeline.

Returns 0 (without error) if all keys are missing — Valkey's PFCOUNT semantics treat missing keys as empty sketches.

func (*HLLCache) Close

func (c *HLLCache) Close() error

Close releases the underlying Valkey client.

func (*HLLCache) MergeInto

func (c *HLLCache) MergeInto(ctx context.Context, dst state.Key, srcs []state.Key, ttl time.Duration) error

MergeInto computes PFMERGE dst <- srcs..., storing the merged HLL at dst. If ttl > 0, an EXPIRE is set on dst.

Use case: pre-compute a "last-7-days" rollup HLL once per day from 7 daily buckets, then serve dashboard reads against the rollup with one PFCOUNT instead of 7. The trade-off is staleness — the rollup is N hours behind real-time depending on rebuild cadence.

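Note: Valkey's PFMERGE does not replace an existing HLL at dst; it treats dst as one more source and folds its previous contents into the result. Listing dst among srcs is therefore harmless, but a rolling rollup (such as last-7-days) rebuilt in place with a bare PFMERGE keeps counting elements from buckets that have aged out. Delete dst, or merge into a fresh key, before rebuilding.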
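PFMERGE's treatment of an existing destination can be modeled with exact sets. Union behaves the same way there, and only the counts stop being approximate. In this sketch `pfmerge` keeps dst's previous members, while `rebuild` deletes dst first. Both are hypothetical helpers, and this documentation does not state whether MergeInto itself clears dst.

```go
package main

import (
	"fmt"
	"sort"
)

type set map[string]bool

// pfmerge models Valkey's PFMERGE with exact sets: an existing dst is
// treated as an extra source, so its old contents survive the merge.
func pfmerge(db map[string]set, dst string, srcs ...string) {
	out := set{}
	for e := range db[dst] {
		out[e] = true
	}
	for _, s := range srcs {
		for e := range db[s] {
			out[e] = true
		}
	}
	db[dst] = out
}

// rebuild recomputes a rollup from scratch by deleting dst first (like DEL).
func rebuild(db map[string]set, dst string, srcs ...string) {
	delete(db, dst)
	pfmerge(db, dst, srcs...)
}

// members returns a set's elements in sorted order for printing.
func members(s set) []string {
	out := make([]string, 0, len(s))
	for e := range s {
		out = append(out, e)
	}
	sort.Strings(out)
	return out
}

func main() {
	db := map[string]set{
		"day1": {"alice": true},
		"day2": {"bob": true},
		"day3": {"carol": true},
	}
	pfmerge(db, "last2", "day1", "day2") // yesterday's rollup
	pfmerge(db, "last2", "day2", "day3") // in-place rebuild still holds alice
	fmt.Println(members(db["last2"]))    // [alice bob carol]
	rebuild(db, "last2", "day2", "day3")
	fmt.Println(members(db["last2"])) // [bob carol]
}
```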

type HLLConfig

type HLLConfig struct {
	// Address is the Valkey server endpoint. Defaults to "localhost:6379".
	Address string

	// KeyPrefix namespaces this accelerator's keys (e.g. "page_unique_visitors").
	// Required.
	KeyPrefix string

	// Extra lets callers append additional valkey-go options (TLS, auth, etc.).
	Extra []valkey.ClientOption
}

HLLConfig configures an HLLCache.

type Int64Cache

type Int64Cache struct {
	// contains filtered or unexported fields
}

Int64Cache is a state.Cache[int64] backed by Valkey. MergeUpdate uses INCRBY (atomic at the Valkey side); Get / GetMany use GET / MGET. The cache key encodes the (entity, bucket) pair as "<keyPrefix>:<entity>:<bucket>".

func NewInt64Cache

func NewInt64Cache(cfg Config) (*Int64Cache, error)

NewInt64Cache constructs an Int64Cache. The returned Cache owns the underlying client; call Close to disconnect.

func (*Int64Cache) Close

func (c *Int64Cache) Close() error

Close releases the underlying Valkey client.

func (*Int64Cache) Get

func (c *Int64Cache) Get(ctx context.Context, k state.Key) (int64, bool, error)

Get returns the cached value at k. Missing keys return 0, false, nil.

func (*Int64Cache) GetMany

func (c *Int64Cache) GetMany(ctx context.Context, ks []state.Key) ([]int64, []bool, error)

GetMany batches reads via MGET.

func (*Int64Cache) MergeUpdate

func (c *Int64Cache) MergeUpdate(ctx context.Context, k state.Key, delta int64, ttl time.Duration) error

MergeUpdate atomically adds delta via INCRBY. If ttl > 0, also sets EXPIRE so the cached row decays naturally for windowed buckets.

func (*Int64Cache) Repopulate

func (c *Int64Cache) Repopulate(ctx context.Context, src state.Store[int64], keys []state.Key) error

Repopulate rebuilds cache entries from the authoritative store. Used after Valkey node loss or cold-start to rehydrate hot keys before serving queries.
