Documentation
Overview ¶
Package grpc serves Murmur's read-side query layer for an application data plane. The package name predates the Connect-RPC migration; the underlying implementation now uses Connect, which means a single mount point speaks three protocols simultaneously:
- gRPC — for Go / JVM / Rust / Python clients using the standard grpc-go / grpc-java / etc. clients
- gRPC-Web — for browsers without a sidecar proxy
- Connect (HTTP + JSON) — for browsers and curl, no transport setup
The wire contract is defined in proto/murmur/v1/query.proto. Anyone is welcome to point a different client at the same handler — Go's grpc-go, Connect's connect-go, browsers via @connectrpc/connect-web, and curl all hit the same routes.
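As a rough illustration of the "browsers and curl" path, the sketch below issues a Connect-style unary call as a plain HTTP POST with a JSON body. It assumes the conventional Connect route shape /<package>.<Service>/<Method> and a request field named "entity"; both are assumptions, and proto/murmur/v1/query.proto remains the authority. The stubServer function is a hypothetical stand-in so the example runs without the real handler.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// getPath assumes the conventional Connect route /<package>.<Service>/<Method>.
const getPath = "/murmur.v1.QueryService/Get"

// callGet POSTs a JSON body, which is how a Connect unary call looks over plain HTTP.
// The "entity" field name is an assumption; check query.proto for the real schema.
func callGet(baseURL, entity string) (map[string]any, error) {
	body, err := json.Marshal(map[string]string{"entity": entity})
	if err != nil {
		return nil, err
	}
	resp, err := http.Post(baseURL+getPath, "application/json", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("status %d: %s", resp.StatusCode, raw)
	}
	var out map[string]any
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, err
	}
	return out, nil
}

// stubServer is a hypothetical stand-in for the real handler; it always
// answers as if the key were missing.
func stubServer() *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc(getPath, func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		io.WriteString(w, `{"present":false}`)
	})
	return httptest.NewServer(mux)
}

func main() {
	srv := stubServer()
	defer srv.Close()
	out, err := callGet(srv.URL, "post:123")
	fmt.Println(out, err)
}
```

The same request could be sent with curl by posting the JSON body with a Content-Type of application/json; no gRPC framing is involved on this path.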
Phase 1 ships a generic Value (bytes) shape: the server takes a pipeline-typed Store, monoid, and windowing config, plus an Encoder[V] that converts the typed value into wire bytes. Clients are responsible for decoding with the matching encoding (Int64LE for Sum/Count, raw bytes for sketches, etc.). Phase 2 will codegen pipeline-typed responses (CounterResponse, HLLResponse) from the pipeline definition and remove the caller-side decoding burden.
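To make the Phase 1 decoding burden concrete, here is a minimal sketch of what a client might do for Sum/Count pipelines. It assumes Int64LE means a fixed 8-byte little-endian integer; the helper names encodeInt64LE and decodeInt64LE are hypothetical and not part of this package.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeInt64LE mirrors what an Int64LE-style encoder is assumed to emit:
// exactly 8 bytes, little-endian.
func encodeInt64LE(v int64) []byte {
	buf := make([]byte, 8)
	binary.LittleEndian.PutUint64(buf, uint64(v))
	return buf
}

// decodeInt64LE is the client-side inverse. It rejects any payload that is
// not exactly 8 bytes instead of guessing at a different encoding.
func decodeInt64LE(data []byte) (int64, error) {
	if len(data) != 8 {
		return 0, errors.New("int64le: want exactly 8 bytes")
	}
	return int64(binary.LittleEndian.Uint64(data)), nil
}

func main() {
	fmt.Println(decodeInt64LE(encodeInt64LE(42)))
	fmt.Println(decodeInt64LE([]byte{1, 2, 3}))
}
```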
Index ¶
- type Config
- type Encoder
- type Server
- func (s *Server[V]) Get(ctx context.Context, req *connect.Request[pb.GetRequest]) (*connect.Response[pb.GetResponse], error)
- func (s *Server[V]) GetMany(ctx context.Context, req *connect.Request[pb.GetManyRequest]) (*connect.Response[pb.GetManyResponse], error)
- func (s *Server[V]) GetRange(ctx context.Context, req *connect.Request[pb.GetRangeRequest]) (*connect.Response[pb.GetRangeResponse], error)
- func (s *Server[V]) GetRangeMany(ctx context.Context, req *connect.Request[pb.GetRangeManyRequest]) (*connect.Response[pb.GetRangeManyResponse], error)
- func (s *Server[V]) GetTrailing(ctx context.Context, req *connect.Request[pb.GetTrailingRequest]) (*connect.Response[pb.GetTrailingResponse], error)
- func (s *Server[V]) GetTrailingMany(ctx context.Context, req *connect.Request[pb.GetTrailingManyRequest]) (*connect.Response[pb.GetTrailingManyResponse], error)
- func (s *Server[V]) GetWindow(ctx context.Context, req *connect.Request[pb.GetWindowRequest]) (*connect.Response[pb.GetWindowResponse], error)
- func (s *Server[V]) GetWindowMany(ctx context.Context, req *connect.Request[pb.GetWindowManyRequest]) (*connect.Response[pb.GetWindowManyResponse], error)
- func (s *Server[V]) Handler() (string, http.Handler)
Types ¶
type Config ¶
type Config[V any] struct {
	Store  state.Store[V]
	Monoid monoid.Monoid[V]
	Window *windowed.Config // optional; required only for GetWindow / GetRange
	Encode Encoder[V]

	// Now, if non-nil, overrides the time.Now used for sliding-window queries. Useful
	// for tests with deterministic clocks.
	Now func() time.Time

	// Recorder, if non-nil, receives per-RPC latency, error, and event
	// metrics. The streaming runtime records "store_merge" / "cache_merge"
	// latency under the pipeline name; the query side records
	// "<pipeline>:query_get", "<pipeline>:query_get_many",
	// "<pipeline>:query_get_window", "<pipeline>:query_get_range",
	// "<pipeline>:query_get_window_many", "<pipeline>:query_get_range_many",
	// "<pipeline>:query_get_trailing", "<pipeline>:query_get_trailing_many".
	// Use a metrics.InMemory in development; a Prometheus / CloudWatch
	// adapter in production.
	Recorder metrics.Recorder

	// Pipeline names this query server's parent pipeline for metrics
	// labels. Defaults to "query" when unset; set explicitly when one
	// process serves multiple pipelines.
	Pipeline string
}
Config configures a query Server.
type Encoder ¶
Encoder converts a typed aggregation value to wire bytes. Common encoders are available as Int64LE / BytesIdentity.
func BytesIdentity ¶
BytesIdentity encodes []byte values verbatim — for sketches whose marshaled form is already the desired wire format.
type Server ¶
type Server[V any] struct {
	// contains filtered or unexported fields
}
Server bridges the generated Connect QueryServiceHandler to a pipeline's Store + monoid. Mount it on an http.ServeMux via Handler().
Concurrent requests for the same (RPC, entity, bucket/window/range) are coalesced via a singleflight.Group keyed by the request shape, so a thousand simultaneous feed renders asking for the same hot counter become one underlying store call. The dedup window is the lifetime of the in-flight call — once the future resolves, the next request is fresh.
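The coalescing behaviour described above can be illustrated with a deliberately simplified, standard-library-only stand-in for singleflight.Group. The coalescer type, the key string, and the simulate harness are all hypothetical; the real server's key layout and error handling are not shown here.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// call is one in-flight fetch that later callers can wait on.
type call struct {
	done    chan struct{}
	val     int64
	waiters int // callers that joined this call; guarded by coalescer.mu
}

// coalescer is a simplified stand-in for singleflight.Group.
type coalescer struct {
	mu       sync.Mutex
	inflight map[string]*call
}

// do runs fetch at most once per key while a call is in flight.
// Callers that arrive meanwhile block and share the leader's result.
func (c *coalescer) do(key string, fetch func() int64) int64 {
	c.mu.Lock()
	if cl, ok := c.inflight[key]; ok {
		cl.waiters++
		c.mu.Unlock()
		<-cl.done
		return cl.val
	}
	cl := &call{done: make(chan struct{})}
	c.inflight[key] = cl
	c.mu.Unlock()

	cl.val = fetch()

	c.mu.Lock()
	delete(c.inflight, key) // the dedup window ends with the call
	c.mu.Unlock()
	close(cl.done)
	return cl.val
}

// joined reports how many callers are waiting on key, or -1 if nothing is in flight.
func (c *coalescer) joined(key string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cl, ok := c.inflight[key]; ok {
		return cl.waiters
	}
	return -1
}

// simulate fires n concurrent identical requests (n >= 1) and returns how
// many times the underlying "store" was actually called.
func simulate(n int) int64 {
	c := &coalescer{inflight: map[string]*call{}}
	var storeCalls int64
	release := make(chan struct{})
	fetch := func() int64 {
		atomic.AddInt64(&storeCalls, 1)
		<-release // hold the call open until every other caller has joined
		return 42
	}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.do("get|post:123", fetch)
		}()
	}
	for c.joined("get|post:123") != n-1 {
		runtime.Gosched()
	}
	close(release)
	wg.Wait()
	return atomic.LoadInt64(&storeCalls)
}

func main() {
	fmt.Println("store calls for 1000 concurrent Gets:", simulate(1000))
}
```

Because the entry is deleted as soon as the leader finishes, a request arriving afterwards starts a fresh store call, matching the "lifetime of the in-flight call" dedup window.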
func (*Server[V]) Get ¶
func (s *Server[V]) Get(ctx context.Context, req *connect.Request[pb.GetRequest]) (*connect.Response[pb.GetResponse], error)
Get implements murmur.v1.QueryService/Get. Returns the all-time aggregation value for entity (non-windowed pipelines). On a missing key, returns {present: false, data: nil}; clients should branch on `present` rather than on len(data).
Concurrent identical Gets are coalesced via singleflight: under load on a hot entity, one underlying store.Get serves N waiters. Set `req.fresh_read = true` to bypass coalescing and force an authoritative read — used for read-your-writes ("user just liked this; show their like count").
func (*Server[V]) GetMany ¶
func (s *Server[V]) GetMany(ctx context.Context, req *connect.Request[pb.GetManyRequest]) (*connect.Response[pb.GetManyResponse], error)
GetMany implements murmur.v1.QueryService/GetMany. Same shape as Get but for many entities in one round-trip; the response preserves request order so clients can zip without an extra index map.
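A client can rely on the order guarantee roughly as follows. The value struct is a hypothetical mirror of one response entry (a present flag plus bytes), not the generated protobuf type.

```go
package main

import "fmt"

// value is a hypothetical mirror of one GetMany result entry.
type value struct {
	Present bool
	Data    []byte
}

// zip pairs entities with values by position. It relies on the response
// preserving request order and fails loudly if the lengths disagree.
func zip(entities []string, values []value) (map[string]value, error) {
	if len(entities) != len(values) {
		return nil, fmt.Errorf("got %d values for %d entities", len(values), len(entities))
	}
	out := make(map[string]value, len(entities))
	for i, e := range entities {
		out[e] = values[i]
	}
	return out, nil
}

func main() {
	entities := []string{"post:1", "post:2"}
	values := []value{{Present: true, Data: []byte{7}}, {Present: false}}
	byEntity, err := zip(entities, values)
	if err != nil {
		panic(err)
	}
	for _, e := range entities {
		// Branch on Present, not on len(Data), to detect missing keys.
		fmt.Println(e, byEntity[e].Present)
	}
}
```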
func (*Server[V]) GetRange ¶
func (s *Server[V]) GetRange(ctx context.Context, req *connect.Request[pb.GetRangeRequest]) (*connect.Response[pb.GetRangeResponse], error)
GetRange implements murmur.v1.QueryService/GetRange. Merges every bucket whose ID falls in [start_unix, end_unix] inclusive. Same not-windowed failure mode as GetWindow.
Coalesced via singleflight on (entity, start_unix, end_unix) — the range is fully specified by the caller, so identical concurrent ranges share work directly.
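The inclusive range merge can be sketched with integer addition standing in for the configured monoid. The map-of-buckets layout and the idea that bucket IDs are directly comparable to start_unix and end_unix are assumptions made for illustration only.

```go
package main

import "fmt"

// mergeRange folds every bucket whose ID lies in [start, end], both ends
// inclusive, using integer addition as a stand-in Sum monoid.
func mergeRange(buckets map[int64]int64, start, end int64) int64 {
	var total int64 // identity element of the Sum monoid
	for id, v := range buckets {
		if id >= start && id <= end {
			total += v
		}
	}
	return total
}

func main() {
	// Hypothetical buckets keyed by their start time in Unix seconds.
	buckets := map[int64]int64{100: 1, 160: 2, 220: 4, 280: 8}
	fmt.Println(mergeRange(buckets, 160, 220))
}
```

Map iteration order does not matter here because addition is commutative; a non-commutative monoid would need the buckets visited in ID order.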
func (*Server[V]) GetRangeMany ¶
func (s *Server[V]) GetRangeMany(ctx context.Context, req *connect.Request[pb.GetRangeManyRequest]) (*connect.Response[pb.GetRangeManyResponse], error)
GetRangeMany implements murmur.v1.QueryService/GetRangeMany. Same shape as GetWindowMany over an absolute [start_unix, end_unix] range.
func (*Server[V]) GetTrailing ¶
func (s *Server[V]) GetTrailing(ctx context.Context, req *connect.Request[pb.GetTrailingRequest]) (*connect.Response[pb.GetTrailingResponse], error)
GetTrailing implements murmur.v1.QueryService/GetTrailing. Semantically identical to GetWindow — both merge the most-recent buckets covering `duration_seconds` ending at the server's now — but exposed under a distinct RPC so callsites that think in "trailing windows" (last-7d, last-30d) don't have to translate intent. Same not-windowed precondition and same singleflight coalesce shape as GetWindow.
func (*Server[V]) GetTrailingMany ¶
func (s *Server[V]) GetTrailingMany(ctx context.Context, req *connect.Request[pb.GetTrailingManyRequest]) (*connect.Response[pb.GetTrailingManyResponse], error)
GetTrailingMany implements murmur.v1.QueryService/GetTrailingMany. Same shape as GetWindowMany; pairs with GetTrailing for the batched-trailing-windows case (e.g. trailing-7d engagement for 200 candidate posts in one round-trip).
func (*Server[V]) GetWindow ¶
func (s *Server[V]) GetWindow(ctx context.Context, req *connect.Request[pb.GetWindowRequest]) (*connect.Response[pb.GetWindowResponse], error)
GetWindow implements murmur.v1.QueryService/GetWindow. Merges the N most-recent buckets covering `duration_seconds` ending at the server's now via the configured monoid. Returns CodeFailedPrecondition for non-windowed pipelines so clients can route to Get instead.
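One way to picture "the N most-recent buckets covering duration_seconds" is shown below. The sketch assumes bucket IDs are the Unix time divided by the bucket width and that N is the duration rounded up to whole buckets; the package's actual bucket arithmetic may differ, for example in how it treats the partially filled current bucket.

```go
package main

import "fmt"

// recentBuckets returns the IDs of the n most recent buckets, newest first,
// where n is durationSec rounded up to whole buckets. bucketSec must be > 0.
func recentBuckets(nowUnix, durationSec, bucketSec int64) []int64 {
	n := (durationSec + bucketSec - 1) / bucketSec // ceiling division
	current := nowUnix / bucketSec                 // assumed bucket ID scheme
	ids := make([]int64, 0, n)
	for i := int64(0); i < n; i++ {
		ids = append(ids, current-i)
	}
	return ids
}

func main() {
	// Hourly buckets, a two-hour window, ten seconds into hour 5.
	fmt.Println(recentBuckets(5*3600+10, 7200, 3600))
}
```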
Concurrent identical GetWindows are coalesced via singleflight; the coalesce key includes the bucketed `now` so two requests one second apart can share work, while requests across a bucket boundary do not.
func (*Server[V]) GetWindowMany ¶
func (s *Server[V]) GetWindowMany(ctx context.Context, req *connect.Request[pb.GetWindowManyRequest]) (*connect.Response[pb.GetWindowManyResponse], error)
GetWindowMany implements murmur.v1.QueryService/GetWindowMany. Batches windowed merges across many entities into a single underlying store fetch. Same windowed-pipeline precondition as GetWindow.
fresh_read bypasses singleflight. The default path coalesces concurrent identical requests at the (sorted-entities, duration, bucket) granularity.
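The (sorted-entities, duration, bucket) granularity suggests that two batches listing the same entities in a different order share work. A minimal sketch of such a key, with a hypothetical manyKey helper and key layout, might look like this:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// manyKey builds a hypothetical coalesce key for a batched window query.
// It sorts a copy of the entities so the caller's slice, and therefore the
// response order, is left untouched.
func manyKey(entities []string, durationSeconds, bucketID int64) string {
	sorted := append([]string(nil), entities...)
	sort.Strings(sorted)
	// Assumes entity IDs never contain a comma; a real key would need an
	// unambiguous separator or length prefixes.
	return fmt.Sprintf("window_many|%s|%d|%d", strings.Join(sorted, ","), durationSeconds, bucketID)
}

func main() {
	a := manyKey([]string{"post:2", "post:1"}, 3600, 42)
	b := manyKey([]string{"post:1", "post:2"}, 3600, 42)
	fmt.Println(a == b)
}
```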