Documentation ¶
Overview ¶
Package query provides the read-side merge logic Murmur uses to assemble sliding-window query results from per-bucket state.
In Phase 1 these are in-process helpers callers invoke directly. In Phase 2 the auto-generated gRPC service in pkg/query/codegen will dispatch to these same functions, so the merge semantics live in one place.
Lambda-mode merge (batch_view ⊕ realtime_delta) is a Phase 2 addition — currently the helpers read directly from the primary state.Store, matching kappa-mode semantics.
Index ¶
- func Get[V any](ctx context.Context, store state.Store[V], entity string) (V, bool, error)
- func GetRange[V any](ctx context.Context, store state.Store[V], m monoid.Monoid[V], ...) (V, error)
- func GetRangeMany[V any](ctx context.Context, store state.Store[V], m monoid.Monoid[V], ...) ([]V, error)
- func GetTrailing[V any](ctx context.Context, store state.Store[V], m monoid.Monoid[V], ...) (V, error)
- func GetTrailingMany[V any](ctx context.Context, store state.Store[V], m monoid.Monoid[V], ...) ([]V, error)
- func GetWindow[V any](ctx context.Context, store state.Store[V], m monoid.Monoid[V], ...) (V, error)
- func GetWindowMany[V any](ctx context.Context, store state.Store[V], m monoid.Monoid[V], ...) ([]V, error)
- func WarmupNonWindowed[V any](ctx context.Context, cache state.Cache[V], store state.Store[V], ...) (int, error)
- func WarmupWindowed[V any](ctx context.Context, cache state.Cache[V], store state.Store[V], ...) (int, error)
- type LambdaQuery
- func (q LambdaQuery[V]) Get(ctx context.Context, entity string) (V, bool, error)
- func (q LambdaQuery[V]) GetRange(ctx context.Context, w windowed.Config, entity string, start, end time.Time) (V, error)
- func (q LambdaQuery[V]) GetWindow(ctx context.Context, w windowed.Config, entity string, duration time.Duration, ...) (V, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Get ¶
Get returns the all-time aggregation value for entity. Used for non-windowed pipelines (Bucket = 0).
func GetRange ¶
func GetRange[V any]( ctx context.Context, store state.Store[V], m monoid.Monoid[V], w windowed.Config, entity string, start, end time.Time, ) (V, error)
GetRange returns the aggregation merged over the bucket range covering [start, end] under the given Windowed config.
func GetRangeMany ¶
func GetRangeMany[V any]( ctx context.Context, store state.Store[V], m monoid.Monoid[V], w windowed.Config, entities []string, start, end time.Time, ) ([]V, error)
GetRangeMany is the absolute-range counterpart to GetWindowMany.
func GetTrailing ¶
func GetTrailing[V any]( ctx context.Context, store state.Store[V], m monoid.Monoid[V], w windowed.Config, entity string, duration time.Duration, now time.Time, ) (V, error)
GetTrailing returns the aggregation merged across the trailing window covering [now-duration, now] under the given Windowed config. It is a thin wrapper over GetWindow whose name makes the "trailing window" intent explicit at the call site.
Use this for queries such as "last 7d unique viewers", "trailing 30d likes", or "last 24h impressions", the dominant shape for counter queries.
func GetTrailingMany ¶
func GetTrailingMany[V any]( ctx context.Context, store state.Store[V], m monoid.Monoid[V], w windowed.Config, entities []string, duration time.Duration, now time.Time, ) ([]V, error)
GetTrailingMany batches GetTrailing across many entities in a single fanned-out fetch. Same single-store-call contract as GetWindowMany.
func GetWindow ¶
func GetWindow[V any]( ctx context.Context, store state.Store[V], m monoid.Monoid[V], w windowed.Config, entity string, duration time.Duration, now time.Time, ) (V, error)
GetWindow returns the aggregation merged over the bucket range covering [now-duration, now] under the given Windowed config.
Implementation: compute the inclusive bucket-ID range, BatchGetItem all buckets in one or more requests, then fold via the monoid Combine in stable order. Missing buckets are treated as the monoid Identity.
Cost note: for fine-grained windows the bucket count can be large (e.g., a "last 30 days" query on hourly buckets reads 30 × 24 = 720 items). For sketches this means 720 in-process sketch merges. If query latency becomes a bottleneck, the Phase 2 plan calls for pre-rolled rollup buckets or a Valkey-cached pre-merged window.
func GetWindowMany ¶
func GetWindowMany[V any]( ctx context.Context, store state.Store[V], m monoid.Monoid[V], w windowed.Config, entities []string, duration time.Duration, now time.Time, ) ([]V, error)
GetWindowMany returns the windowed merge for each of `entities` in a single fanned-out fetch. Equivalent to calling GetWindow per entity, but uses ONE underlying store.GetMany over (N entities × M buckets) keys instead of N separate GetMany calls of M keys each.
For ML rerank with windowed counter features over N=200 candidates and M=7 daily buckets, the fetch covers 200 × 7 = 1,400 keys. At DynamoDB's limit of 100 keys per BatchGetItem request, that is 14 calls issued in parallel (~20ms p99) instead of 200 sequential GetWindow calls (~4 seconds).
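To make the call count concrete, the sketch below enumerates the (entity × bucket) keys and splits them into batches of 100, the documented per-request key limit of DynamoDB's BatchGetItem. The key type, fanOutKeys, and chunk are hypothetical names for illustration; how the store actually lays out and batches keys is not shown in this documentation.

```go
package main

import "fmt"

// key is a hypothetical (entity, bucket) pair; the real store key layout may differ.
type key struct {
	Entity string
	Bucket int64
}

// fanOutKeys builds the entity-major key list for buckets first..last inclusive.
// It assumes first <= last.
func fanOutKeys(entities []string, first, last int64) []key {
	keys := make([]key, 0, len(entities)*int(last-first+1))
	for _, e := range entities {
		for b := first; b <= last; b++ {
			keys = append(keys, key{Entity: e, Bucket: b})
		}
	}
	return keys
}

// chunk splits keys into batches of at most size elements.
func chunk(keys []key, size int) [][]key {
	var out [][]key
	for len(keys) > size {
		out = append(out, keys[:size])
		keys = keys[size:]
	}
	if len(keys) > 0 {
		out = append(out, keys)
	}
	return out
}

func main() {
	entities := make([]string, 200)
	for i := range entities {
		entities[i] = fmt.Sprintf("item-%d", i)
	}
	keys := fanOutKeys(entities, 100, 106) // 7 daily buckets per entity
	batches := chunk(keys, 100)            // at most 100 keys per BatchGetItem request
	fmt.Println(len(keys), len(batches))   // 1400 14
}
```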
Returns one value per entity in input order. Missing entities (no bucket data within the range) return the monoid Identity rather than an error — callers branch on Identity, not on a separate "present" flag, since "merged-empty" is a legitimate windowed result.
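A caller-side view of this contract might look like the sketch below. It is illustrative only: mergeMany is a hypothetical stand-in for the windowed merge over an already-fetched key set, specialised to an integer sum whose identity is 0, and the key type is assumed rather than taken from the package.

```go
package main

import "fmt"

// key is a hypothetical (entity, bucket) pair; the real store key layout may differ.
type key struct {
	Entity string
	Bucket int64
}

// mergeMany folds buckets first..last for each entity and returns the results
// in input order. An entity with no fetched buckets comes back as 0, the
// identity of the sum monoid, rather than as an error.
func mergeMany(entities []string, fetched map[key]int64, first, last int64) []int64 {
	out := make([]int64, len(entities))
	for i, e := range entities {
		var acc int64 // identity of the sum monoid
		for b := first; b <= last; b++ {
			acc += fetched[key{Entity: e, Bucket: b}] // a missing key reads as 0
		}
		out[i] = acc
	}
	return out
}

func main() {
	fetched := map[key]int64{
		{Entity: "a", Bucket: 1}: 3,
		{Entity: "a", Bucket: 2}: 4,
		{Entity: "c", Bucket: 2}: 5,
	}
	for i, v := range mergeMany([]string{"a", "b", "c"}, fetched, 1, 2) {
		if v == 0 { // branch on the identity, not on a "present" flag
			fmt.Println(i, "no activity in window")
			continue
		}
		fmt.Println(i, v)
	}
}
```

For value types that are not comparable with ==, such as sketches, the caller would need whatever emptiness check that type provides.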
func WarmupNonWindowed ¶
func WarmupNonWindowed[V any]( ctx context.Context, cache state.Cache[V], store state.Store[V], entities []string, ) (int, error)
WarmupNonWindowed prefetches all-time entries for the given entities into the cache. Same shape as WarmupWindowed but for pipelines without a time-bucket dimension.
func WarmupWindowed ¶
func WarmupWindowed[V any]( ctx context.Context, cache state.Cache[V], store state.Store[V], w windowed.Config, entities []string, duration time.Duration, now time.Time, ) (int, error)
WarmupWindowed prefetches the (entity × bucket) keys covering the most-recent `duration` window for each entity into the cache, sourcing fresh values from the authoritative store via cache.Repopulate.
Use case: cold-cache p99 inflation. After a Valkey restart or a fresh query-service deployment, the first batch of GetWindow / GetWindowMany queries falls through to DDB and pays full latency. A pre-flight WarmupWindowed at process start (or on a known traffic pattern) loads the hot keys before user requests arrive.
Cost is `len(entities) × bucketCount` cache writes plus one batched store.GetMany over the same key set. For ML-rerank-style workloads where the candidate set is bounded (typically N=200 popular content items × M=7 daily buckets = 1400 keys), the warmup completes in a few hundred milliseconds against typical DDB capacity and is a straightforward step in the service-start path.
Returns the count of successfully warmed entries (entities × buckets that the store had data for) so callers can log meaningful progress.
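The counting behaviour can be sketched with plain maps standing in for the cache and the store. The warm function and key type are hypothetical; the real implementation goes through cache.Repopulate and a batched store.GetMany, whose signatures are not shown here.

```go
package main

import "fmt"

// key is a hypothetical (entity, bucket) pair; the real store key layout may differ.
type key struct {
	Entity string
	Bucket int64
}

// warm copies every (entity, bucket) key the store has data for into the
// cache and returns how many entries were copied. Keys with no stored data
// are skipped and do not count.
func warm(cache, store map[key]int64, entities []string, first, last int64) int {
	warmed := 0
	for _, e := range entities {
		for b := first; b <= last; b++ {
			k := key{Entity: e, Bucket: b}
			if v, ok := store[k]; ok {
				cache[k] = v
				warmed++
			}
		}
	}
	return warmed
}

func main() {
	store := map[key]int64{
		{Entity: "a", Bucket: 1}: 3,
		{Entity: "a", Bucket: 2}: 4,
		{Entity: "b", Bucket: 2}: 5,
	}
	cache := map[key]int64{}
	n := warm(cache, store, []string{"a", "b"}, 1, 2)
	fmt.Printf("warmed %d of %d keys\n", n, 2*2) // warmed 3 of 4 keys
}
```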
Types ¶
type LambdaQuery ¶
LambdaQuery merges a precomputed batch view with a realtime delta at query time. This is Murmur's lambda-architecture mode: a periodic batch job writes the authoritative view; the streaming runtime writes only the delta accumulated since the last batch checkpoint; queries combine the two via the user's monoid.
Two-store contract:
- view : the precomputed result of the batch job over historical events. Updated atomically by the batch worker (typically swap.SetActive after writing into a fresh shadow table).
- delta : the streaming accumulator over events since the batch checkpoint. Reset to identity when a new batch view is promoted.
Merge semantics: at query time the result is `view.Get ⊕ delta.Get` for each key (or each bucket, in the windowed case). Both stores must use the same monoid; the merge is the monoid's Combine.
At-least-once deduplication remains a per-event-ID concern of the streaming runtime; the lambda layer adds no duplicate-suppression semantics of its own.
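For the non-windowed case, the merge can be sketched as below. lambdaGet is a hypothetical helper using maps in place of the two stores. It assumes that a side with no entry contributes the identity and that the result counts as present when either side has an entry; the actual presence semantics of LambdaQuery.Get are not specified in this documentation.

```go
package main

import "fmt"

// lambdaGet combines a batch-view lookup with a streaming-delta lookup.
// Assumption: a missing side contributes identity, and the value is reported
// as present when either side holds an entry.
func lambdaGet[V any](view, delta map[string]V, identity V, combine func(a, b V) V, entity string) (V, bool) {
	v, inView := view[entity]
	if !inView {
		v = identity
	}
	d, inDelta := delta[entity]
	if !inDelta {
		d = identity
	}
	return combine(v, d), inView || inDelta
}

func main() {
	add := func(a, b int64) int64 { return a + b }
	view := map[string]int64{"post-1": 100}             // written by the batch job
	delta := map[string]int64{"post-1": 3, "post-2": 1} // accumulated since the checkpoint

	fmt.Println(lambdaGet(view, delta, 0, add, "post-1")) // 103 true
	fmt.Println(lambdaGet(view, delta, 0, add, "post-2")) // 1 true
	fmt.Println(lambdaGet(view, delta, 0, add, "post-3")) // 0 false
}
```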
func (LambdaQuery[V]) Get ¶
Get returns the lambda-merged value for entity (non-windowed pipelines).
func (LambdaQuery[V]) GetRange ¶
func (q LambdaQuery[V]) GetRange( ctx context.Context, w windowed.Config, entity string, start, end time.Time, ) (V, error)
GetRange returns the lambda-merged value over the absolute time range [start, end].
func (LambdaQuery[V]) GetWindow ¶
func (q LambdaQuery[V]) GetWindow( ctx context.Context, w windowed.Config, entity string, duration time.Duration, now time.Time, ) (V, error)
GetWindow returns the lambda-merged value for the rolling window of duration ending at now. Both stores are queried for the same bucket range; the per-bucket merge order is: view-bucket-N ⊕ delta-bucket-N for each N, then fold all merged buckets via Combine.
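The merge order described above can be illustrated with a small sketch. lambdaWindow is a hypothetical helper over two maps keyed by bucket ID, and string concatenation is used as the monoid only because it makes the ordering visible in the output; treating a missing bucket as the identity is an assumption carried over from the GetWindow description.

```go
package main

import "fmt"

// lambdaWindow merges view and delta per bucket (view first, then delta) and
// folds the merged buckets in ascending bucket-ID order. A bucket missing
// from either side contributes the identity.
func lambdaWindow[V any](view, delta map[int64]V, identity V, combine func(a, b V) V, first, last int64) V {
	acc := identity
	for id := first; id <= last; id++ {
		v, ok := view[id]
		if !ok {
			v = identity
		}
		d, ok := delta[id]
		if !ok {
			d = identity
		}
		acc = combine(acc, combine(v, d)) // (view-N ⊕ delta-N) folded into the running result
	}
	return acc
}

func main() {
	concat := func(a, b string) string { return a + b }
	view := map[int64]string{1: "a", 2: "c"}
	delta := map[int64]string{1: "b", 2: "d", 3: "e"} // bucket 3 exists only in the delta
	fmt.Println(lambdaWindow(view, delta, "", concat, 1, 3)) // abcde
}
```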
Directories ¶
| Path | Synopsis |
|---|---|
| | Package grpc serves Murmur's read-side query layer for an application data plane. |
| | Package typed provides typed wrappers around the generic Murmur QueryService: the building block for application services that want to expose typed protos to their callers rather than the generic Value{bytes} shape. |