murmur

package
v0.1.0
Published: May 12, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package murmur is a thin facade over pkg/pipeline for the most common pipeline shapes. The verbose lower-level builder (pkg/pipeline) is still available for pipelines that don't fit a preset; the facades here trade flexibility for dramatically less boilerplate on the 90% case.

Presets:

Counter[T]        - Sum monoid; each event contributes 1
UniqueCount[T]    - HLL monoid; one element per event
TopN[T]           - TopK monoid; one (key, 1) per event
Trending[T]       - DecayedSumBytes monoid; one scored observation per event

Each preset builds a pkg/pipeline.Pipeline you can then hand to streaming.Run, bootstrap.Run, or replay.Run — same lifecycle as a hand-built pipeline.

Constants

This section is empty.

Variables

This section is empty.

Functions

func DynamoDBStreamsHandler

func DynamoDBStreamsHandler[T any, V any](
	p *pipeline.Pipeline[T, V],
	decode dynamodbstreams.Decoder[T],
	cfg LambdaConfig,
) (dynamodbstreams.Handler, error)

DynamoDBStreamsHandler builds a DynamoDB Streams Lambda handler with the same standard option set as KinesisHandler.

func KinesisHandler

func KinesisHandler[T any, V any](
	p *pipeline.Pipeline[T, V],
	decode kinesis.Decoder[T],
	cfg LambdaConfig,
) (kinesis.Handler, error)

KinesisHandler builds a Kinesis Lambda handler from a pipeline plus a Decoder. Equivalent to manually constructing kinesis.NewHandler with the standard production option set:

  • WithMetrics(cfg.Recorder)
  • WithDedup(cfg.Dedup) when non-nil
  • WithDecodeErrorCallback(...) that logs via cfg.Logger

The returned handler is ready for lambda.Start.

func MustHandler

func MustHandler[H any](h H, err error) H

MustHandler is the panic-on-error variant for the typical Lambda main pattern where any error during handler construction is fatal anyway:

handler := murmur.MustHandler(murmur.KinesisHandler(pipe, decode, cfg))
lambda.Start(handler)

Cuts the boilerplate from 4 lines (declare, check err, log.Fatal, lambda.Start) to 2.
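
As a rough illustration of the panic-on-error pattern described above, here is a minimal stdlib-only sketch. The names must and newHandler are hypothetical stand-ins, not part of the package; the real MustHandler may format its panic differently.

```go
package main

import (
	"errors"
	"fmt"
)

// must is a hypothetical stand-in for murmur.MustHandler: it returns h
// unchanged when err is nil and panics otherwise.
func must[H any](h H, err error) H {
	if err != nil {
		panic(fmt.Errorf("handler construction failed: %w", err))
	}
	return h
}

// newHandler is a hypothetical constructor that rejects empty names.
func newHandler(name string) (func() string, error) {
	if name == "" {
		return nil, errors.New("empty handler name")
	}
	return func() string { return "handling " + name }, nil
}

func main() {
	// The (value, error) pair flows straight into must, so no err check
	// is needed at the call site.
	h := must(newHandler("page_views"))
	fmt.Println(h())
}
```

Because the helper is generic over H, the same function can wrap any constructor that returns a (value, error) pair.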

func RunStreamingWorker

func RunStreamingWorker[T any, V any](ctx context.Context, p *pipeline.Pipeline[T, V], opts ...streaming.RunOption) int

RunStreamingWorker is the boilerplate-eating entry point for a streaming worker binary. It wires SIGINT/SIGTERM to the runtime context, runs streaming.Run with any RunOptions you pass, and returns the exit code (0 for clean shutdown, 1 for runtime error, 2 for invalid pipeline). Suitable for `os.Exit(murmur.RunStreamingWorker(ctx, pipe, opts...))`.

Use this when you have a single pipeline per binary and want graceful-shutdown semantics. For multi-pipeline workers or custom error handling, drive streaming.Run yourself.

Example

ExampleRunStreamingWorker shows the boilerplate-eating worker entry point. The metrics-recorder and max-attempts options are forwarded to streaming.Run so a typical production main is ~15 lines.

package main

import (
	"context"
	"time"

	"github.com/gallowaysoftware/murmur/pkg/exec/streaming"
	"github.com/gallowaysoftware/murmur/pkg/metrics"
	"github.com/gallowaysoftware/murmur/pkg/murmur"
	"github.com/gallowaysoftware/murmur/pkg/source"
	"github.com/gallowaysoftware/murmur/pkg/state"
)

// memStore is a tiny in-memory state.Store[int64] used only by the examples
// below so they are self-contained on pkg.go.dev. Production code uses
// pkg/state/dynamodb.Int64SumStore.
type memStore struct{ m map[state.Key]int64 }

func newMemStore() *memStore { return &memStore{m: map[state.Key]int64{}} }
func (s *memStore) Get(_ context.Context, k state.Key) (int64, bool, error) {
	v, ok := s.m[k]
	return v, ok, nil
}
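func (s *memStore) GetMany(_ context.Context, ks []state.Key) ([]int64, []bool, error) {
	// Look up each key in order; missing keys report (0, false).
	vals, oks := make([]int64, len(ks)), make([]bool, len(ks))
	for i, k := range ks {
		vals[i], oks[i] = s.m[k]
	}
	return vals, oks, nil
}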
func (s *memStore) MergeUpdate(_ context.Context, k state.Key, d int64, _ time.Duration) error {
	s.m[k] += d
	return nil
}
func (s *memStore) Close() error { return nil }

// nopSource emits every configured event from a single Read call, then
// returns. Stand-in for a real kafka.Source / kinesis.Source so the example
// doesn't need a broker running.
type nopEvent struct{ K string }
type nopSource struct{ events []nopEvent }

func (s *nopSource) Read(_ context.Context, out chan<- source.Record[nopEvent]) error {
	for i, e := range s.events {
		out <- source.Record[nopEvent]{
			EventID: "evt-" + e.K + string(rune('0'+i)),
			Value:   e,
			Ack:     func() error { return nil },
		}
	}
	return nil
}
func (*nopSource) Name() string { return "nop" }
func (*nopSource) Close() error { return nil }

func main() {
	store := newMemStore()
	src := &nopSource{events: []nopEvent{{"a"}}}

	pipe := murmur.Counter[nopEvent]("page_views").
		From(src).KeyBy(func(e nopEvent) string { return e.K }).
		StoreIn(store).Build()

	rec := metrics.NewInMemory()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	_ = murmur.RunStreamingWorker(ctx, pipe,
		streaming.WithMetrics(rec),
		streaming.WithMaxAttempts(5),
	)
	// Real code would also pass streaming.WithDedup(deduper) and
	// streaming.WithDeadLetter(fn) for production-grade at-least-once.
}

func SQSHandler

func SQSHandler[T any, V any](
	p *pipeline.Pipeline[T, V],
	decode sqs.Decoder[T],
	cfg LambdaConfig,
) (sqs.Handler, error)

SQSHandler builds an SQS Lambda handler with the same standard option set as KinesisHandler / DynamoDBStreamsHandler.

Type parameters note: SQS's HandlerOption is parameterized over T (the input record type) so the option helpers can carry the type through to NewHandler. The wrapper preserves that — the [T] type parameter at the call site is required.

Types

type CounterBuilder

type CounterBuilder[T any] struct {
	// contains filtered or unexported fields
}

CounterBuilder builds a Counter pipeline. Required: KeyBy or KeyByMany, StoreIn, and From when the pipeline runs under streaming.Run. Optional: Daily/Hourly/Trailing windowing, Cache.

func Counter

func Counter[T any](name string) *CounterBuilder[T]

Counter is a Sum-monoid counter pipeline preset. Each event contributes 1; the pipeline's value type is int64.

Example

ExampleCounter shows the canonical Murmur counter pipeline — one event contributes 1 to a per-key Sum, daily windowing, write-through to a state store, run via streaming.Run.

package main

import (
	"context"
	"time"

	"github.com/gallowaysoftware/murmur/pkg/exec/streaming"
	"github.com/gallowaysoftware/murmur/pkg/murmur"
	"github.com/gallowaysoftware/murmur/pkg/source"
	"github.com/gallowaysoftware/murmur/pkg/state"
)

// memStore is a tiny in-memory state.Store[int64] used only by the examples
// below so they are self-contained on pkg.go.dev. Production code uses
// pkg/state/dynamodb.Int64SumStore.
type memStore struct{ m map[state.Key]int64 }

func newMemStore() *memStore { return &memStore{m: map[state.Key]int64{}} }
func (s *memStore) Get(_ context.Context, k state.Key) (int64, bool, error) {
	v, ok := s.m[k]
	return v, ok, nil
}
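func (s *memStore) GetMany(_ context.Context, ks []state.Key) ([]int64, []bool, error) {
	// Look up each key in order; missing keys report (0, false).
	vals, oks := make([]int64, len(ks)), make([]bool, len(ks))
	for i, k := range ks {
		vals[i], oks[i] = s.m[k]
	}
	return vals, oks, nil
}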
func (s *memStore) MergeUpdate(_ context.Context, k state.Key, d int64, _ time.Duration) error {
	s.m[k] += d
	return nil
}
func (s *memStore) Close() error { return nil }

// nopSource emits every configured event from a single Read call, then
// returns. Stand-in for a real kafka.Source / kinesis.Source so the example
// doesn't need a broker running.
type nopEvent struct{ K string }
type nopSource struct{ events []nopEvent }

func (s *nopSource) Read(_ context.Context, out chan<- source.Record[nopEvent]) error {
	for i, e := range s.events {
		out <- source.Record[nopEvent]{
			EventID: "evt-" + e.K + string(rune('0'+i)),
			Value:   e,
			Ack:     func() error { return nil },
		}
	}
	return nil
}
func (*nopSource) Name() string { return "nop" }
func (*nopSource) Close() error { return nil }

func main() {
	store := newMemStore()

	pipe := murmur.Counter[nopEvent]("page_views").
		From(&nopSource{events: []nopEvent{{"a"}, {"a"}, {"b"}}}).
		KeyBy(func(e nopEvent) string { return e.K }).
		Daily(7 * 24 * time.Hour).
		StoreIn(store).
		Build()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	_ = streaming.Run(ctx, pipe)

	// store.m now holds {a: 2, b: 1} keyed by today's bucket. Production
	// code reads through query.GetWindow / GetRange rather than the store
	// directly.
}

func (*CounterBuilder[T]) Build

func (b *CounterBuilder[T]) Build() *pipeline.Pipeline[T, int64]

Build returns the assembled Pipeline. The returned pipeline is ready for streaming.Run / bootstrap.Run / replay.Run; Build itself does not execute it.

func (*CounterBuilder[T]) Cache

func (b *CounterBuilder[T]) Cache(c state.Cache[int64]) *CounterBuilder[T]

Cache sets the optional read accelerator (typically a Valkey-backed cache). Writes are mirrored to the cache after the primary store has accepted them; reads can short-circuit through the cache. Cache loss never affects correctness — see pkg/state/valkey.
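
The ordering described above (store first, cache second, cache loss harmless) can be sketched with two plain maps. This is an illustrative model only: the layered type is hypothetical, and repopulating the cache on a read miss is an assumption rather than documented behavior.

```go
package main

import "fmt"

// layered is a hypothetical model of a primary store plus a read cache.
type layered struct {
	store map[string]int64 // source of truth
	cache map[string]int64 // optional accelerator; may be wiped at any time
}

// add applies the delta to the store, then mirrors the new total to the cache.
func (l *layered) add(key string, delta int64) {
	l.store[key] += delta
	l.cache[key] = l.store[key]
}

// get serves from the cache when possible and falls back to the store.
func (l *layered) get(key string) int64 {
	if v, ok := l.cache[key]; ok {
		return v
	}
	v := l.store[key]
	l.cache[key] = v // assumption: a miss repopulates the cache
	return v
}

func main() {
	l := &layered{store: map[string]int64{}, cache: map[string]int64{}}
	l.add("page:home", 3)

	l.cache = map[string]int64{} // simulate losing the cache entirely
	fmt.Println(l.get("page:home"))
}
```

Because the store is always written first, wiping the cache only costs a slower read; the value returned is unchanged.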

func (*CounterBuilder[T]) Daily

func (b *CounterBuilder[T]) Daily(retention time.Duration) *CounterBuilder[T]

Daily configures daily tumbling buckets with the given retention.

func (*CounterBuilder[T]) From

func (b *CounterBuilder[T]) From(s source.Source[T]) *CounterBuilder[T]

From sets the live event source. Required for streaming.Run; can be omitted when the same builder is used to construct a pipeline that will only run in bootstrap or replay mode.

func (*CounterBuilder[T]) Hourly

func (b *CounterBuilder[T]) Hourly(retention time.Duration) *CounterBuilder[T]

Hourly configures hourly tumbling buckets with the given retention.

func (*CounterBuilder[T]) KeyBy

func (b *CounterBuilder[T]) KeyBy(fn func(T) string) *CounterBuilder[T]

KeyBy sets the function that derives the aggregation key from each event. The returned string is the entity used in the state store and any queries. Required (or use KeyByMany for hierarchical rollups).

func (*CounterBuilder[T]) KeyByMany

func (b *CounterBuilder[T]) KeyByMany(fn func(T) []string) *CounterBuilder[T]

KeyByMany sets a multi-key extractor — each event contributes its count to every key returned by fn. Use for hierarchical rollups where one event counts against many aggregation keys at once:

murmur.Counter[Like]("likes").
    KeyByMany(func(e Like) []string {
        return []string{
            "post:" + e.PostID,
            "post:" + e.PostID + "|country:" + e.Country,
            "country:" + e.Country,
            "global",
        }
    })

Mutually exclusive with KeyBy. Dedup applies once per event regardless of how many keys are emitted.
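
The fan-out and once-per-event dedup described above can be modeled with plain maps. The sketch is a simplified, stdlib-only illustration: the like type, keysFor, and apply are hypothetical, and the in-memory seen set merely stands in for whatever deduper the pipeline is configured with.

```go
package main

import "fmt"

// like is a hypothetical event type with a unique ID used for dedup.
type like struct{ ID, PostID, Country string }

// keysFor returns every aggregation key one event counts against.
func keysFor(e like) []string {
	return []string{
		"post:" + e.PostID,
		"post:" + e.PostID + "|country:" + e.Country,
		"country:" + e.Country,
		"global",
	}
}

// apply checks the event ID once, then adds 1 to every key for the event.
func apply(counts map[string]int64, seen map[string]bool, e like) {
	if seen[e.ID] {
		return // redelivered event: skip all of its keys
	}
	seen[e.ID] = true
	for _, k := range keysFor(e) {
		counts[k]++
	}
}

func main() {
	counts, seen := map[string]int64{}, map[string]bool{}
	events := []like{
		{ID: "1", PostID: "p1", Country: "US"},
		{ID: "2", PostID: "p1", Country: "DE"},
		{ID: "2", PostID: "p1", Country: "DE"}, // at-least-once redelivery
	}
	for _, e := range events {
		apply(counts, seen, e)
	}
	fmt.Println(counts["post:p1"], counts["country:US"], counts["global"])
}
```

Checking the event ID before the loop, rather than per key, is what keeps a redelivered event from being counted against some keys and not others.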

func (*CounterBuilder[T]) StoreIn

func (b *CounterBuilder[T]) StoreIn(s state.Store[int64]) *CounterBuilder[T]

StoreIn sets the state store the pipeline writes through. Required. Typically pkg/state/dynamodb.Int64SumStore in production, or an in-memory fake in tests.

func (*CounterBuilder[T]) Trailing

func (b *CounterBuilder[T]) Trailing(durations ...time.Duration) *CounterBuilder[T]

Trailing records the trailing-window durations the pipeline will be queried for and configures Daily windowing with retention sized to the largest requested window plus one bucket of slack.

Use it instead of Daily when the consumer side is known to query fixed trailing windows:

murmur.Counter[Like]("post_likes").
    KeyBy(func(e Like) string { return e.PostID }).
    Trailing(24*time.Hour, 7*24*time.Hour, 30*24*time.Hour).
    StoreIn(store).
    Build()

The pipeline above keeps 31 days of daily buckets and is ready to answer trailing-1d / trailing-7d / trailing-30d queries.

Zero or negative durations are silently dropped; passing no durations at all is a no-op (the builder stays un-windowed).

Trailing supersedes any prior Daily / Hourly call on the same builder. Multiple Trailing calls re-set the trailing-window set from scratch rather than accumulating across calls.

func (*CounterBuilder[T]) TrailingWindows

func (b *CounterBuilder[T]) TrailingWindows() []time.Duration

TrailingWindows returns the trailing-window durations recorded on the builder via Trailing, in the order they were supplied (with zero / negative values filtered out). Returns nil if Trailing was never called.

Used by downstream tooling that wants to discover the canonical query windows for a pipeline — e.g., a query server that wants to pre-register trailing-7d / trailing-30d as named convenience endpoints, or a dashboard that wants to render exactly the trailing windows the pipeline was designed for.

type LambdaConfig

type LambdaConfig struct {
	// Recorder receives events / errors / latencies. Defaults to
	// metrics.Noop. For production wire a CloudWatch / Prometheus /
	// Datadog adapter.
	Recorder metrics.Recorder

	// Dedup, when set, makes at-least-once delivery idempotent at the
	// monoid layer. Strongly recommended for any non-idempotent monoid
	// (Sum, HLL, TopK).
	Dedup state.Deduper

	// Logger is used by the default decode-error callback. Defaults to
	// slog.Default(). Wire a structured logger for production.
	Logger *slog.Logger
}

LambdaConfig is the unified configuration shared across the three Lambda runtime helpers. It plays the same boilerplate-eating role for Lambda handlers that RunStreamingWorker plays for streaming workers.

Wire it once; pass it to KinesisHandler / DynamoDBStreamsHandler / SQSHandler — each builds the matching handler with sensible defaults (slog-driven decode-error logging, metrics.Noop unless overridden, dedup hooked up).

type TopNBuilder

type TopNBuilder[T any] struct {
	// contains filtered or unexported fields
}

TopNBuilder builds a TopN (Misra-Gries) pipeline. Required: KeyBy, StoreIn. Optional: From, Daily windowing.

func TopN

func TopN[T any](name string, k uint32, elementFn func(T) string) *TopNBuilder[T]

TopN is a TopK-monoid pipeline preset. Each event contributes one (key, 1) observation to the running top-K sketch.
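
For intuition about what a "(key, 1) observation" does to a Misra-Gries summary, here is a textbook single-stream version. It is a simplified sketch, not the package's topk implementation: the summary type and observe method are hypothetical, and the merge step the query layer would need is omitted.

```go
package main

import "fmt"

// summary is a textbook Misra-Gries summary holding at most k counters.
type summary struct {
	k      int
	counts map[string]uint64
}

func newSummary(k int) *summary {
	return &summary{k: k, counts: map[string]uint64{}}
}

// observe records one (key, 1) observation.
func (s *summary) observe(key string) {
	// Known key, or room for a new counter: just increment.
	if _, ok := s.counts[key]; ok || len(s.counts) < s.k {
		s.counts[key]++
		return
	}
	// Summary is full and the key is new: decrement every counter and
	// drop the ones that reach zero.
	for k, c := range s.counts {
		if c <= 1 {
			delete(s.counts, k)
		} else {
			s.counts[k] = c - 1
		}
	}
}

func main() {
	s := newSummary(2)
	for _, key := range []string{"a", "a", "a", "b", "c", "a"} {
		s.observe(key)
	}
	fmt.Println(s.counts)
}
```

The counts are lower bounds rather than exact totals: infrequent keys are evicted, while a heavy hitter such as "a" survives.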

Example

ExampleTopN shows the Misra-Gries TopK preset — useful for "top N most active X over a window."

package main

import (
	"time"

	"github.com/gallowaysoftware/murmur/pkg/murmur"
	"github.com/gallowaysoftware/murmur/pkg/state"
)

// nopEvent is a minimal event type; K carries the entity id.
type nopEvent struct{ K string }

func main() {
	// Real code substitutes a state.Store[[]byte] backed by DDB BytesStore.
	var store state.Store[[]byte]

	_ = murmur.TopN[nopEvent]("recently_interacted", 10, func(e nopEvent) string {
		return e.K // the entity id
	}).
		KeyBy(func(e nopEvent) string { return "global" }).
		Daily(7 * 24 * time.Hour).
		StoreIn(store).
		Build()
}

func (*TopNBuilder[T]) Build

func (b *TopNBuilder[T]) Build() *pipeline.Pipeline[T, []byte]

Build assembles a *pipeline.Pipeline ready for streaming.Run / bootstrap.Run / replay.Run. The value extractor lifts each event's elementFn output into a one-element TopK sketch via topk.SingleN at the configured K.

func (*TopNBuilder[T]) Daily

func (b *TopNBuilder[T]) Daily(retention time.Duration) *TopNBuilder[T]

Daily configures daily tumbling buckets with the given retention. Useful for "today's top 10" / "last 7 days' top 10" — each bucket gets its own Misra-Gries summary; the query layer merges N adjacent buckets.

func (*TopNBuilder[T]) From

func (b *TopNBuilder[T]) From(s source.Source[T]) *TopNBuilder[T]

From sets the live event source.

func (*TopNBuilder[T]) KeyBy

func (b *TopNBuilder[T]) KeyBy(fn func(T) string) *TopNBuilder[T]

KeyBy sets the function that derives the aggregation key from each event (e.g. "global" for a single Top-N over all events, or a per-tenant ID).

func (*TopNBuilder[T]) StoreIn

func (b *TopNBuilder[T]) StoreIn(s state.Store[[]byte]) *TopNBuilder[T]

StoreIn sets the state store. TopK sketch state is stored as []byte; pair with pkg/state/dynamodb.BytesStore.

type TrendingBuilder

type TrendingBuilder[T any] struct {
	// contains filtered or unexported fields
}

TrendingBuilder builds a Trending (DecayedSumBytes) pipeline. Required: KeyBy, StoreIn. Optional: From, Daily/Hourly windowing, Amount (override the default per-event score), Clock (override time.Now for tests).

func Trending

func Trending[T any](name string, halfLife time.Duration) *TrendingBuilder[T]

Trending is a time-decayed-sum pipeline preset. Each event contributes a configurable score (default 1.0) to a per-key running sum in which older contributions decay exponentially relative to the most recent observation. The result is a score of how much activity has happened recently, with older activity counting less: the canonical building block for "hot" / "trending" feeds.

Half-life sets the decay rate. Pass 1*time.Hour for "last hour matters most" / "fast-burning trends"; 6*time.Hour for "today's hot"; 24*time.Hour for "yesterday and today both matter."

Caveat: this is NOT Reddit's `log(votes) + (time/factor)` formula directly — Reddit's formula isn't a monoid. Trending implements time-weighted summation, which is the building block; if you want Reddit-style log-shape ranking, apply `log(EvaluateAt(d, halfLife, now))` at query time.

State type is []byte (DecayedSumBytes wire format); pair with pkg/state/dynamodb.BytesStore. Decode with compose.DecodeDecayed and evaluate at query time via compose.EvaluateAt.

func (*TrendingBuilder[T]) Amount

func (b *TrendingBuilder[T]) Amount(fn func(T) float64) *TrendingBuilder[T]

Amount overrides the per-event contribution score. Default is 1.0 per event. Use this for weighted trending — e.g., a like from a verified account contributes 5.0 instead of 1.0:

murmur.Trending[Like]("hot_posts", time.Hour).
    Amount(func(l Like) float64 {
        if l.UserVerified { return 5.0 }
        return 1.0
    })

func (*TrendingBuilder[T]) Build

func (b *TrendingBuilder[T]) Build() *pipeline.Pipeline[T, []byte]

Build assembles a *pipeline.Pipeline. The value extractor lifts each event's amountFn output to a Decayed observation timestamped at the configured clock (default time.Now), encoded via compose.DecayedBytes.

func (*TrendingBuilder[T]) Clock

func (b *TrendingBuilder[T]) Clock(now func() time.Time) *TrendingBuilder[T]

Clock overrides time.Now for the per-event timestamp. Useful for tests with deterministic clocks; production code should leave this unset.

func (*TrendingBuilder[T]) Daily

func (b *TrendingBuilder[T]) Daily(retention time.Duration) *TrendingBuilder[T]

Daily configures daily tumbling buckets with the given retention. Useful for "today's hottest" / "this week's hottest" — each bucket is a separate decayed-sum row; queries merge N adjacent buckets via the monoid's Combine.

func (*TrendingBuilder[T]) From

func (b *TrendingBuilder[T]) From(s source.Source[T]) *TrendingBuilder[T]

From sets the live event source.

func (*TrendingBuilder[T]) Hourly

func (b *TrendingBuilder[T]) Hourly(retention time.Duration) *TrendingBuilder[T]

Hourly configures hourly tumbling buckets with the given retention, typical for fast-burning "trending right now" feeds.

func (*TrendingBuilder[T]) KeyBy

func (b *TrendingBuilder[T]) KeyBy(fn func(T) string) *TrendingBuilder[T]

KeyBy sets the function that derives the aggregation key from each event (e.g. post ID for "trending posts" or hashtag for "trending hashtags"). Required.

func (*TrendingBuilder[T]) StoreIn

func (b *TrendingBuilder[T]) StoreIn(s state.Store[[]byte]) *TrendingBuilder[T]

StoreIn sets the state store. Pair with pkg/state/dynamodb.NewBytesStore using compose.DecayedSumBytes(halfLife) as the monoid.

type UniqueCountBuilder

type UniqueCountBuilder[T any] struct {
	// contains filtered or unexported fields
}

UniqueCountBuilder builds a UniqueCount (HLL) pipeline. Required: KeyBy, StoreIn. Optional: From, Daily/Hourly windowing.

func UniqueCount

func UniqueCount[T any](name string, elementFn func(T) []byte) *UniqueCountBuilder[T]

UniqueCount is an HLL-monoid unique-cardinality pipeline preset. Each event contributes one element to the sketch — supplied by the elementFn argument (e.g., return e.UserID for unique-visitors-per-page).

Example

ExampleUniqueCount shows the HLL preset — one element per event lifted into a per-key cardinality sketch. The store value type is []byte so a custom store is needed; production code uses pkg/state/dynamodb.BytesStore.

package main

import (
	"time"

	"github.com/gallowaysoftware/murmur/pkg/murmur"
	"github.com/gallowaysoftware/murmur/pkg/state"
)

// nopEvent is a minimal event type; K carries the visitor identifier.
type nopEvent struct{ K string }

func main() {
	// Real code substitutes a state.Store[[]byte] backed by DDB BytesStore.
	var store state.Store[[]byte]

	_ = murmur.UniqueCount[nopEvent]("unique_visitors", func(e nopEvent) []byte {
		return []byte(e.K) // the visitor identifier
	}).
		KeyBy(func(e nopEvent) string { return "global" }).
		Hourly(24 * time.Hour).
		StoreIn(store).
		Build()
}

func (*UniqueCountBuilder[T]) Build

func (b *UniqueCountBuilder[T]) Build() *pipeline.Pipeline[T, []byte]

Build assembles a *pipeline.Pipeline ready for streaming.Run / bootstrap.Run / replay.Run. The HLL value extractor is wired automatically: each event's elementFn output is lifted into a one-element sketch via hll.Single.

func (*UniqueCountBuilder[T]) Daily

func (b *UniqueCountBuilder[T]) Daily(retention time.Duration) *UniqueCountBuilder[T]

Daily configures daily tumbling buckets with the given retention.

func (*UniqueCountBuilder[T]) From

func (b *UniqueCountBuilder[T]) From(s source.Source[T]) *UniqueCountBuilder[T]

From sets the live event source. Same semantics as CounterBuilder.From.

func (*UniqueCountBuilder[T]) Hourly

func (b *UniqueCountBuilder[T]) Hourly(retention time.Duration) *UniqueCountBuilder[T]

Hourly configures hourly tumbling buckets with the given retention.

func (*UniqueCountBuilder[T]) KeyBy

func (b *UniqueCountBuilder[T]) KeyBy(fn func(T) string) *UniqueCountBuilder[T]

KeyBy sets the function that derives the aggregation key from each event.

func (*UniqueCountBuilder[T]) StoreIn

func (b *UniqueCountBuilder[T]) StoreIn(s state.Store[[]byte]) *UniqueCountBuilder[T]

StoreIn sets the state store. HLL sketch state is stored as []byte; pair with pkg/state/dynamodb.BytesStore for production or a synthetic store for tests.
