grpc

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package grpc serves Murmur's read-side query layer for an application data plane. The package name predates the Connect-RPC migration; the underlying implementation now uses Connect, which means a single mount-point speaks THREE protocols simultaneously:

  • gRPC — for Go / JVM / Rust / Python clients using the standard grpc-go / grpc-java / etc. clients
  • gRPC-Web — for browsers without a sidecar proxy
  • Connect (HTTP + JSON) — for browsers and curl, no transport setup

The wire contract is defined in proto/murmur/v1/query.proto. Anyone is welcome to point a different client at the same handler — Go's grpc-go, Connect's connect-go, browsers via @connectrpc/connect-web, and curl all hit the same routes.

Phase 1 ships a generic Value (bytes) shape: the server takes a pipeline-typed Store, monoid, and windowing config, plus an Encoder[V] that converts the typed value into wire bytes. Clients are responsible for matching encoding (Int64LE for Sum/Count, raw bytes for sketches, etc.). Phase 2 will codegen pipeline-typed responses (CounterResponse, HLLResponse) from the pipeline definition and remove the caller-side decoding burden.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config[V any] struct {
	Store  state.Store[V]
	Monoid monoid.Monoid[V]
	Window *windowed.Config // optional; required only for GetWindow / GetRange
	Encode Encoder[V]

	// Now, if non-nil, overrides the time.Now used for sliding-window queries. Useful
	// for tests with deterministic clocks.
	Now func() time.Time

	// Recorder, if non-nil, receives per-RPC latency, error, and event
	// metrics. The streaming runtime records "store_merge" / "cache_merge"
	// latency under the pipeline name; the query side records
	// "<pipeline>:query_get", "<pipeline>:query_get_many",
	// "<pipeline>:query_get_window", "<pipeline>:query_get_range",
	// "<pipeline>:query_get_window_many", "<pipeline>:query_get_range_many",
	// "<pipeline>:query_get_trailing", "<pipeline>:query_get_trailing_many".
	// Use a metrics.InMemory in development; a Prometheus / CloudWatch
	// adapter in production.
	Recorder metrics.Recorder

	// Pipeline names this query server's parent pipeline for metrics
	// labels. Defaults to "query" when unset; set explicitly when one
	// process serves multiple pipelines.
	Pipeline string
}

Config configures a query Server.

type Encoder

type Encoder[V any] func(V) []byte

Encoder converts a typed aggregation value to wire bytes. Common encoders are available as Int64LE / BytesIdentity.

func BytesIdentity

func BytesIdentity() Encoder[[]byte]

BytesIdentity encodes []byte values verbatim — for sketches whose marshaled form is already the desired wire format.

func Int64LE

func Int64LE() Encoder[int64]

Int64LE encodes int64 values as 8-byte little-endian.

type Server

type Server[V any] struct {
	// contains filtered or unexported fields
}

Server bridges the generated Connect QueryServiceHandler to a pipeline's Store + monoid. Mount it on an http.ServeMux via Handler().

Concurrent requests for the same (RPC, entity, bucket/window/range) are coalesced via a singleflight.Group keyed by the request shape, so a thousand simultaneous feed renders asking for the same hot counter become one underlying store call. The dedup window is the lifetime of the in-flight call — once the future resolves, the next request is fresh.

func NewServer

func NewServer[V any](cfg Config[V]) *Server[V]

NewServer constructs a query Server.

func (*Server[V]) Get

Get implements murmur.v1.QueryService/Get. Returns the all-time aggregation value for entity (non-windowed pipelines). On a missing key, returns {present: false, data: nil}; clients should branch on `present` rather than on len(data).

Concurrent identical Gets are coalesced via singleflight: under load on a hot entity, one underlying store.Get serves N waiters. Set `req.fresh_read = true` to bypass coalescing and force an authoritative read — used for read-your-writes ("user just liked this; show their like count").

func (*Server[V]) GetMany

GetMany implements murmur.v1.QueryService/GetMany. Same shape as Get but for many entities in one round-trip; the response preserves request order so clients can zip without an extra index map.

func (*Server[V]) GetRange

GetRange implements murmur.v1.QueryService/GetRange. Merges every bucket whose ID falls in [start_unix, end_unix] inclusive. Same not-windowed failure mode as GetWindow.

Coalesced via singleflight on (entity, start_unix, end_unix) — the range is fully specified by the caller, so identical concurrent ranges share work directly.

func (*Server[V]) GetRangeMany

GetRangeMany implements murmur.v1.QueryService/GetRangeMany. Same shape as GetWindowMany over an absolute [start_unix, end_unix] range.

func (*Server[V]) GetTrailing

GetTrailing implements murmur.v1.QueryService/GetTrailing. Semantically identical to GetWindow — both merge the most-recent buckets covering `duration_seconds` ending at the server's now — but exposed under a distinct RPC so callsites that think in "trailing windows" (last-7d, last-30d) don't have to translate intent. Same not-windowed precondition and same singleflight coalesce shape as GetWindow.

func (*Server[V]) GetTrailingMany

GetTrailingMany implements murmur.v1.QueryService/GetTrailingMany. Same shape as GetWindowMany; pairs with GetTrailing for the batched-trailing-windows case (e.g. trailing-7d engagement for 200 candidate posts in one round-trip).

func (*Server[V]) GetWindow

GetWindow implements murmur.v1.QueryService/GetWindow. Merges the N most-recent buckets covering `duration_seconds` ending at the server's now via the configured monoid. Returns CodeFailedPrecondition for non-windowed pipelines so clients can route to Get instead.

Concurrent identical GetWindows are coalesced via singleflight; the coalesce key includes the bucketed `now` so two requests one second apart can share work, while requests across a bucket boundary do not.

func (*Server[V]) GetWindowMany

GetWindowMany implements murmur.v1.QueryService/GetWindowMany. Batches windowed merges across many entities into a single underlying store fetch. Same windowed-pipeline precondition as GetWindow.

fresh_read bypasses singleflight. The default path coalesces concurrent identical requests at the (sorted-entities, duration, bucket) granularity.

func (*Server[V]) Handler

func (s *Server[V]) Handler() (string, http.Handler)

Handler returns the Connect HTTP handler and its mount path. Wire it into a net/http server with `mux.Handle(path, h)`. The path follows the Connect convention `/murmur.v1.QueryService/`.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL