query

package
v0.1.0
Warning

This package is not in the latest version of its module.

Published: May 12, 2026 | License: Apache-2.0 | Imports: 6 | Imported by: 0

Documentation

Overview

Package query provides the read-side merge logic Murmur uses to assemble sliding-window query results from per-bucket state.

In Phase 1 these are in-process helpers callers invoke directly. In Phase 2 the auto-generated gRPC service in pkg/query/codegen will dispatch to these same functions, so the merge semantics live in one place.

Lambda-mode merge (batch_view ⊕ realtime_delta) is a Phase 2 addition — currently the helpers read directly from the primary state.Store, matching kappa-mode semantics.

Functions

func Get

func Get[V any](
	ctx context.Context,
	store state.Store[V],
	entity string,
) (V, bool, error)

Get returns the all-time aggregation value for entity. Used for non-windowed pipelines (Bucket = 0).

func GetRange

func GetRange[V any](
	ctx context.Context,
	store state.Store[V],
	m monoid.Monoid[V],
	w windowed.Config,
	entity string,
	start, end time.Time,
) (V, error)

GetRange returns the aggregation merged over the bucket range covering [start, end] under the given Windowed config.

func GetRangeMany

func GetRangeMany[V any](
	ctx context.Context,
	store state.Store[V],
	m monoid.Monoid[V],
	w windowed.Config,
	entities []string,
	start, end time.Time,
) ([]V, error)

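GetRangeMany is the absolute-range counterpart to GetWindowMany: it merges the buckets covering [start, end] for each of `entities`, rather than a window anchored at now.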

func GetTrailing

func GetTrailing[V any](
	ctx context.Context,
	store state.Store[V],
	m monoid.Monoid[V],
	w windowed.Config,
	entity string,
	duration time.Duration,
	now time.Time,
) (V, error)

GetTrailing returns the aggregation merged across the trailing window covering [now-duration, now] under the given Windowed config. It is a thin wrapper over GetWindow whose name documents the "trailing window" intent.

Use this for "last 7d unique viewers", "trailing 30d likes", "last 24h impressions" — the dominant shape for counter queries.

func GetTrailingMany

func GetTrailingMany[V any](
	ctx context.Context,
	store state.Store[V],
	m monoid.Monoid[V],
	w windowed.Config,
	entities []string,
	duration time.Duration,
	now time.Time,
) ([]V, error)

GetTrailingMany batches GetTrailing across many entities in a single fanned-out fetch. It follows the same single-store-call contract as GetWindowMany.

func GetWindow

func GetWindow[V any](
	ctx context.Context,
	store state.Store[V],
	m monoid.Monoid[V],
	w windowed.Config,
	entity string,
	duration time.Duration,
	now time.Time,
) (V, error)

GetWindow returns the aggregation merged over the bucket range covering [now-duration, now] under the given Windowed config.

Implementation: compute the inclusive bucket-ID range, fetch all buckets with one or more BatchGetItem requests, then fold them via the monoid's Combine in a stable order. Missing buckets are treated as the monoid Identity.
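The implementation steps above can be sketched in isolation. This is a minimal illustration, not Murmur's code: the Monoid struct is a hypothetical stand-in for monoid.Monoid[V], a plain map stands in for the fetched buckets, and the flooring bucketID scheme is an assumption about how bucket IDs are derived.

```go
package main

import (
	"fmt"
	"time"
)

// Monoid is a hypothetical stand-in for monoid.Monoid[V]: an identity value
// plus an associative Combine.
type Monoid[V any] struct {
	Identity V
	Combine  func(a, b V) V
}

// bucketID maps a timestamp to a bucket index for a fixed bucket width.
// Flooring the Unix time is an assumption, not Murmur's confirmed layout.
func bucketID(t time.Time, width time.Duration) int64 {
	return t.UnixNano() / int64(width)
}

// foldWindow merges every bucket in the inclusive ID range covering
// [now-duration, now]. Buckets absent from the map are skipped, which is
// equivalent to combining with Identity.
func foldWindow[V any](buckets map[int64]V, m Monoid[V], width, duration time.Duration, now time.Time) V {
	first := bucketID(now.Add(-duration), width)
	last := bucketID(now, width)
	acc := m.Identity
	for id := first; id <= last; id++ { // ascending IDs give a stable fold order
		if v, ok := buckets[id]; ok {
			acc = m.Combine(acc, v)
		}
	}
	return acc
}

// demoFold sums hourly buckets 8..10; bucket 9 is missing and bucket 6
// falls outside the window.
func demoFold() int64 {
	sum := Monoid[int64]{Identity: 0, Combine: func(a, b int64) int64 { return a + b }}
	now := time.Unix(10*3600, 0) // start of hourly bucket 10
	buckets := map[int64]int64{6: 100, 8: 5, 10: 2}
	return foldWindow(buckets, sum, time.Hour, 2*time.Hour, now)
}

func main() {
	fmt.Println(demoFold()) // 7
}
```

Folding in ascending bucket order keeps the result deterministic even when Combine is associative but not commutative.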

Cost note: for fine-grained windows the bucket count can be large (e.g., a "last 30 days" query on hourly buckets reads 720 items). For sketches this means 720 sketch merges in process. If query latency becomes a bottleneck, the Phase 2 plan calls for pre-rolled rollup buckets or a Valkey-cached pre-merged window.
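The bucket-count arithmetic in the cost note can be checked with a small helper. The function name bucketCount is hypothetical, and it assumes the window is aligned to bucket boundaries.

```go
package main

import (
	"fmt"
	"time"
)

// bucketCount returns ceil(duration / width): the number of buckets a
// bucket-aligned window spans. A window that is not aligned to bucket
// boundaries can touch one additional bucket.
func bucketCount(duration, width time.Duration) int64 {
	return int64((duration + width - 1) / width)
}

// Precomputed so the two figures are easy to inspect.
var (
	hourlyOver30Days = bucketCount(30*24*time.Hour, time.Hour)
	dailyOver7Days   = bucketCount(7*24*time.Hour, 24*time.Hour)
)

func main() {
	fmt.Println(hourlyOver30Days, dailyOver7Days) // 720 7
}
```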

func GetWindowMany

func GetWindowMany[V any](
	ctx context.Context,
	store state.Store[V],
	m monoid.Monoid[V],
	w windowed.Config,
	entities []string,
	duration time.Duration,
	now time.Time,
) ([]V, error)

GetWindowMany returns the windowed merge for each of `entities` in a single fanned-out fetch. Equivalent to calling GetWindow per entity, but uses ONE underlying store.GetMany over (N entities × M buckets) keys instead of N separate GetMany calls of M keys each.

For ML rerank with windowed counter features over N=200 candidates with M=7 daily buckets (1,400 keys), this is 14 BatchGetItem calls of up to 100 keys each, issued in parallel (~20ms p99), instead of 200 sequential GetWindow calls (~4 seconds).

Returns one value per entity in input order. Missing entities (no bucket data within the range) return the monoid Identity rather than an error — callers branch on Identity, not on a separate "present" flag, since "merged-empty" is a legitimate windowed result.
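The fan-out described above can be sketched as three steps: enumerate the N × M keys, split them into batches, and regroup the fetched values per entity in input order. The key struct and all function names here are hypothetical, a sum stands in for the monoid, and the batch size of 100 reflects the DynamoDB BatchGetItem per-request item limit.

```go
package main

import "fmt"

// key is a hypothetical (entity, bucket) composite; Murmur's real key type
// may differ.
type key struct {
	Entity string
	Bucket int64
}

// fanOutKeys lists every (entity, bucket) pair for the inclusive bucket
// range, grouped by entity.
func fanOutKeys(entities []string, first, last int64) []key {
	keys := make([]key, 0, len(entities)*int(last-first+1))
	for _, e := range entities {
		for b := first; b <= last; b++ {
			keys = append(keys, key{e, b})
		}
	}
	return keys
}

// chunk splits keys into batches of at most size keys.
func chunk(keys []key, size int) [][]key {
	var out [][]key
	for len(keys) > size {
		out = append(out, keys[:size])
		keys = keys[size:]
	}
	if len(keys) > 0 {
		out = append(out, keys)
	}
	return out
}

// mergePerEntity folds fetched values into one sum per entity, in input
// order. Entities with no fetched buckets keep the identity (0 for a sum).
func mergePerEntity(entities []string, first, last int64, fetched map[key]int64) []int64 {
	out := make([]int64, len(entities))
	for i, e := range entities {
		for b := first; b <= last; b++ {
			out[i] += fetched[key{e, b}] // a missing key reads as 0
		}
	}
	return out
}

func main() {
	entities := make([]string, 200)
	for i := range entities {
		entities[i] = fmt.Sprintf("item-%d", i)
	}
	keys := fanOutKeys(entities, 0, 6)
	fmt.Println(len(keys), len(chunk(keys, 100))) // 1400 14

	fetched := map[key]int64{{"a", 0}: 3, {"a", 1}: 4}
	fmt.Println(mergePerEntity([]string{"a", "b"}, 0, 1, fetched)) // [7 0]
}
```

Entity "b" has no bucket data, so its slot holds the identity rather than signalling an error, which matches the contract described above.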

func WarmupNonWindowed

func WarmupNonWindowed[V any](
	ctx context.Context,
	cache state.Cache[V],
	store state.Store[V],
	entities []string,
) (int, error)

WarmupNonWindowed prefetches all-time entries for the given entities into the cache. Same shape as WarmupWindowed but for pipelines without a time-bucket dimension.

func WarmupWindowed

func WarmupWindowed[V any](
	ctx context.Context,
	cache state.Cache[V],
	store state.Store[V],
	w windowed.Config,
	entities []string,
	duration time.Duration,
	now time.Time,
) (int, error)

WarmupWindowed prefetches the (entity × bucket) keys covering the most-recent `duration` window for each entity into the cache, sourcing fresh values from the authoritative store via cache.Repopulate.

Use case: cold-cache p99 inflation. After a Valkey restart or a fresh query-service deployment, the first batch of GetWindow / GetWindowMany queries falls through to DynamoDB (DDB) and pays full latency. A pre-flight WarmupWindowed at process start (or on a known traffic pattern) loads the hot keys before user requests arrive.

Cost is `len(entities) × bucketCount` cache writes plus one batched store.GetMany over the same key set. For ML-rerank-style workloads where the candidate set is bounded (typically N=200 popular content items × M=7 daily buckets = 1400 keys), the warmup completes in a few hundred milliseconds against typical DDB capacity and is a straightforward step in the service-start path.

Returns the count of successfully warmed entries (entities × buckets that the store had data for) so callers can log meaningful progress.
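As a rough illustration of the returned count, the sketch below copies every (entity, bucket) entry that the store holds into the cache and counts the copies. Plain maps stand in for state.Cache and state.Store, and the key struct and warm function are hypothetical; the real implementation goes through cache.Repopulate and a batched store.GetMany.

```go
package main

import "fmt"

// key is a hypothetical (entity, bucket) composite.
type key struct {
	Entity string
	Bucket int64
}

// warm copies each (entity, bucket) value present in store into cache for
// the inclusive bucket range and returns how many entries were written.
// Keys the store has no data for are not counted.
func warm(cache, store map[key]int64, entities []string, first, last int64) int {
	warmed := 0
	for _, e := range entities {
		for b := first; b <= last; b++ {
			k := key{e, b}
			if v, ok := store[k]; ok {
				cache[k] = v
				warmed++
			}
		}
	}
	return warmed
}

func main() {
	store := map[key]int64{{"a", 0}: 1, {"a", 1}: 2, {"b", 1}: 3}
	cache := map[key]int64{}
	n := warm(cache, store, []string{"a", "b", "c"}, 0, 1)
	fmt.Println(n, len(cache)) // 3 3
}
```

Here six keys are requested (3 entities × 2 buckets) but only three are warmed, so logging the count alongside the requested key total shows how sparse the hot set is.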

Types

type LambdaQuery

type LambdaQuery[V any] struct {
	View   state.Store[V]
	Delta  state.Store[V]
	Monoid monoid.Monoid[V]
}

LambdaQuery merges a precomputed batch view with a realtime delta at query time. This is Murmur's lambda-architecture mode: a periodic batch job writes the authoritative view; the streaming runtime writes only the delta accumulated since the last batch checkpoint; queries combine the two via the user's monoid.

Two-store contract:

  • view : the precomputed result of the batch job over historical events. Updated atomically by the batch worker (typically swap.SetActive after writing into a fresh shadow table).
  • delta : the streaming accumulator over events since the batch checkpoint. Reset to identity when a new batch view is promoted.

Merge semantics: at query time the result is `view.Get ⊕ delta.Get` for each key (or each bucket, in the windowed case). Both stores must use the same monoid; the merge is the monoid's Combine.
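A minimal sketch of the per-key merge, using maps in place of the two stores. The Monoid struct and lambdaGet are hypothetical, and reporting a key as present when either store holds it is an assumption about the contract, not something the documentation states.

```go
package main

import "fmt"

// Monoid is a hypothetical stand-in for monoid.Monoid[V].
type Monoid[V any] struct {
	Identity V
	Combine  func(a, b V) V
}

// lambdaGet returns view ⊕ delta for one entity. A side that lacks the key
// contributes Identity. The bool is true when either side has the key
// (an assumed convention).
func lambdaGet[V any](view, delta map[string]V, m Monoid[V], entity string) (V, bool) {
	v, okV := view[entity]
	d, okD := delta[entity]
	if !okV {
		v = m.Identity
	}
	if !okD {
		d = m.Identity
	}
	return m.Combine(v, d), okV || okD
}

func main() {
	sum := Monoid[int64]{Identity: 0, Combine: func(a, b int64) int64 { return a + b }}
	view := map[string]int64{"a": 100}        // written by the batch job
	delta := map[string]int64{"a": 5, "b": 2} // accumulated since the checkpoint

	fmt.Println(lambdaGet(view, delta, sum, "a")) // 105 true
	fmt.Println(lambdaGet(view, delta, sum, "b")) // 2 true
	fmt.Println(lambdaGet(view, delta, sum, "c")) // 0 false
}
```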

At-least-once dedup remains a per-event-ID concern in the streaming runtime; the lambda layer adds no duplicate-suppression semantics of its own.

func (LambdaQuery[V]) Get

func (q LambdaQuery[V]) Get(ctx context.Context, entity string) (V, bool, error)

Get returns the lambda-merged value for entity (non-windowed pipelines).

func (LambdaQuery[V]) GetRange

func (q LambdaQuery[V]) GetRange(
	ctx context.Context,
	w windowed.Config,
	entity string,
	start, end time.Time,
) (V, error)

GetRange returns the lambda-merged value over the absolute time range [start, end].

func (LambdaQuery[V]) GetWindow

func (q LambdaQuery[V]) GetWindow(
	ctx context.Context,
	w windowed.Config,
	entity string,
	duration time.Duration,
	now time.Time,
) (V, error)

GetWindow returns the lambda-merged value for the rolling window of duration ending at now. Both stores are queried for the same bucket range; the per-bucket merge order is: view-bucket-N ⊕ delta-bucket-N for each N, then fold all merged buckets via Combine.
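The merge order described above can be made visible with a non-commutative monoid. In this sketch string concatenation plays that role, maps stand in for the two stores, and Monoid and lambdaWindow are hypothetical names rather than Murmur's API.

```go
package main

import "fmt"

// Monoid is a hypothetical stand-in for monoid.Monoid[V].
type Monoid[V any] struct {
	Identity V
	Combine  func(a, b V) V
}

// lambdaWindow merges view-bucket-N ⊕ delta-bucket-N for each bucket in the
// inclusive range, then folds the merged buckets in ascending order.
func lambdaWindow[V any](view, delta map[int64]V, m Monoid[V], first, last int64) V {
	get := func(s map[int64]V, id int64) V {
		if v, ok := s[id]; ok {
			return v
		}
		return m.Identity // a missing bucket contributes Identity
	}
	acc := m.Identity
	for id := first; id <= last; id++ {
		merged := m.Combine(get(view, id), get(delta, id))
		acc = m.Combine(acc, merged)
	}
	return acc
}

func main() {
	concat := Monoid[string]{Identity: "", Combine: func(a, b string) string { return a + b }}
	view := map[int64]string{1: "v1", 2: "v2"}
	delta := map[int64]string{2: "d2", 3: "d3"}
	fmt.Println(lambdaWindow(view, delta, concat, 1, 3)) // v1v2d2d3
}
```

The output shows each bucket's view value preceding its delta value, with buckets folded in ascending order.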

Directories

Path Synopsis
Package grpc serves Murmur's read-side query layer for an application data plane.
Package typed provides typed wrappers around the generic Murmur QueryService — the building block for application services that want to expose typed protos to their callers rather than the generic Value{bytes} shape.
