recentlyinteracted

package
v0.1.0
Published: May 12, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

README

Example: recently-interacted Top-N (Kinesis + Kafka, one pipeline)

A Murmur pipeline that maintains a daily-windowed Top-N "recently interacted entities" sketch fed by two sources at once:

  • Kinesis, consumed via an AWS Lambda Kinesis trigger (cmd/lambda). This is the operationally cheap path for managed analytics ingest (Segment-style event buses, AWS-native event sources): Lambda owns shard polling, fan-out, and partial-batch retry.
  • Kafka (or MSK), consumed by a long-running ECS Fargate streaming worker (cmd/worker). This is the right fit for internal application events and CDC streams.

Both binaries share one pipeline.go definition and write to the same DynamoDB row. The Misra-Gries Combine folds counts for the same entity into a single counter regardless of source, so a single query against cmd/query returns the merged Top-N over all incoming interactions, whichever channel they arrived on.
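To make the merge step concrete, here is a minimal sketch of a textbook Misra-Gries combine over two summaries. It is not Murmur's topk implementation: the Summary type, the Combine signature, and the map representation are assumptions chosen for illustration, and the real package works on serialized bytes.

```go
package main

import (
	"fmt"
	"sort"
)

// Summary is a toy Misra-Gries summary: item -> approximate count,
// holding at most k counters. (Hypothetical type, not Murmur's.)
type Summary map[string]uint64

// Combine adds the counters of a and b, then prunes back to at most k
// counters by subtracting the (k+1)-th largest count from every counter.
func Combine(a, b Summary, k int) Summary {
	out := make(Summary, len(a)+len(b))
	for item, n := range a {
		out[item] += n
	}
	for item, n := range b {
		out[item] += n
	}
	if len(out) <= k {
		return out
	}
	counts := make([]uint64, 0, len(out))
	for _, n := range out {
		counts = append(counts, n)
	}
	sort.Slice(counts, func(i, j int) bool { return counts[i] > counts[j] })
	cut := counts[k] // the (k+1)-th largest count
	for item, n := range out {
		if n <= cut {
			delete(out, item) // counter would drop to zero or below
		} else {
			out[item] = n - cut
		}
	}
	return out
}

func main() {
	kinesis := Summary{"sku-1": 5, "sku-2": 3}
	kafka := Summary{"sku-1": 2, "sku-3": 1}
	merged := Combine(kinesis, kafka, 2)
	fmt.Println(merged["sku-1"], merged["sku-2"], len(merged)) // 6 2 2
}
```

In this sketch, sku-1 appears in both inputs and ends up under one counter, which is the property that lets two writers share a single row. The pruning step is why the counts are approximate: every surviving counter is reduced by the same cut.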

Why two sources?

Real systems frequently have one event channel for managed third-party data (Segment → Kinesis is the canonical case) and a separate channel for internal events (a Kafka topic). Murmur's structural-monoid pipeline DSL is execution-mode-agnostic: the same Aggregate(topk.New(K), windowed.Daily(...)) runs unchanged behind both drivers. Single source of truth, two ingest paths.

Schema

type Interaction struct {
    EntityID string  // ranked thing — product ID, content ID, etc.
    UserID   string  // actor (carried through; not part of the agg key)
    Source   string  // "kinesis" or "kafka" — diagnostics only
}

Pipeline (shared by all three binaries)

pipeline.NewPipeline[Interaction, []byte]("recently_interacted").
    Key(func(Interaction) string { return "global" }).
    Value(func(e Interaction) []byte {
        return topk.SingleN(K, e.EntityID, 1)
    }).
    Aggregate(topk.New(K), windowed.Daily(30*24*time.Hour)).
    StoreIn(dynamodb.NewBytesStore(...))

The Lambda binary leaves Source unset (Lambda owns polling). The Kafka worker calls AttachKafkaSource(pipe, cfg) to attach a franz-go source.

Run locally

Stand up dependencies:

cd ../..
docker compose up -d kafka dynamodb-local

Create the DDB tables (first time only):

aws --endpoint-url=http://localhost:8000 dynamodb create-table \
    --table-name recently_interacted \
    --attribute-definitions AttributeName=pk,AttributeType=S AttributeName=sk,AttributeType=N \
    --key-schema AttributeName=pk,KeyType=HASH AttributeName=sk,KeyType=RANGE \
    --billing-mode PAY_PER_REQUEST

aws --endpoint-url=http://localhost:8000 dynamodb create-table \
    --table-name recently_interacted_dedup \
    --attribute-definitions AttributeName=pk,AttributeType=S \
    --key-schema AttributeName=pk,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST

Run the Kafka worker:

export DDB_ENDPOINT=http://localhost:8000
export DDB_DEDUP_TABLE=recently_interacted_dedup
go run ./examples/recently-interacted-topk/cmd/worker

The Lambda binary is meant for AWS. Locally, you can drive it via sam local invoke with a synthetic Kinesis event payload, or run the tests under ./test/..., which exercise the same NewKinesisHandler against a fake DynamoDB.

Run the query server in a separate terminal:

go run ./examples/recently-interacted-topk/cmd/query

Then query the merged Top-N:

# All time (single bucket if non-windowed)
grpcurl -plaintext -d '{"entity":"global"}' \
    localhost:50051 murmur.v1.QueryService/Get

# Last 7 days (merges 7 daily Misra-Gries summaries)
grpcurl -plaintext -d '{"entity":"global","duration_seconds":604800}' \
    localhost:50051 murmur.v1.QueryService/GetWindow

The response's data is a serialized Misra-Gries summary (raw bytes); decode it via pkg/monoid/sketch/topk.Decode, or use the embedded admin UI, which renders the items and counts directly.

Production deployment

  • The Lambda binary builds for provided.al2 / arm64 — see the comment block in cmd/lambda/main.go for the exact go build invocation.
  • Configure the event-source mapping with FunctionResponseTypes=["ReportBatchItemFailures"] so that records still failing after the in-handler retry budget are reported individually instead of causing the whole batch to be redelivered.
  • The Kafka worker runs as a standard Murmur ECS Fargate service.
  • Both processes use the same DDB_TABLE and DDB_DEDUP_TABLE. The dedup table is strongly recommended in production — Lambda BatchItemFailures may redeliver records adjacent to a failure, and Kafka rebalances may re-emit unacked records. Without dedup, a redelivery spuriously increments the Misra-Gries count.

Documentation

Overview

Package recentlyinteracted defines a TopK "recently interacted" pipeline fed by TWO sources at once: a Segment-style Kinesis stream (consumed via an AWS Lambda trigger) and an internal Kafka topic (consumed by a long-lived ECS streaming worker). Both drivers share a single pipeline definition and a single DynamoDB-backed TopK state — the result is a unified Top-N of the most-interacted entities across the two channels, queryable through the auto-generated query service.

Why two sources?

  • Kinesis is the canonical destination for managed analytics ingest (Segment, AWS-native event buses); Lambda's Kinesis trigger is the operationally cheap path for it (per-shard fan-out, BatchItemFailures for partial-batch retry).
  • Kafka is the canonical destination for internal service events (CDC streams, application events). A long-running ECS worker with franz-go is the right fit — it's cheap to keep open and Kafka offset commits behave nicely with at-least-once dedup.

Murmur's pipeline DSL is execution-mode-agnostic, so the same TopK definition (Misra-Gries summary, daily windowing, DDB BytesStore) can run behind both drivers at once with no separate query-time merge logic: both writers MergeUpdate against the same DDB row, and the Misra-Gries Combine folds counts for the same entity into a single counter regardless of source.

Constants

const PipelineName = "recently_interacted"

PipelineName is the canonical pipeline identifier — surfaces in metrics, the admin UI, and the auto-generated query service.

Functions

func AttachKafkaSource

func AttachKafkaSource(pipe *pipeline.Pipeline[Interaction, []byte], cfg Config) (*pipeline.Pipeline[Interaction, []byte], error)

AttachKafkaSource wires a franz-go Kafka source into pipe. Used by the ECS worker binary; the Lambda binary leaves Source nil because Lambda owns shard polling.

Returns the pipeline (chained for fluent style) so the caller can pass it directly to streaming.Run.

func Build

Build constructs the shared pipeline. The driver-specific source is wired in by the caller: the Lambda binary leaves Source nil (Lambda owns polling), the Kafka worker binary calls AttachKafkaSource on the returned pipeline.

Returns the pipeline, the underlying state store (so the caller can Close it on shutdown), and an optional Deduper (nil when DDBDedupTable is empty).

Types

type Config

type Config struct {
	// DynamoDB
	DDBEndpoint string // empty for real AWS; "http://localhost:8000" for dynamodb-local
	DDBTable    string // TopK byte-store table
	DDBRegion   string

	// Optional dedup table, shared across both drivers. Strongly recommended
	// in production: Lambda BatchItemFailures may redeliver records adjacent
	// to a failure, and Kafka rebalances may re-emit unacked records.
	DDBDedupTable string
	DedupTTL      time.Duration

	// Kafka (live source — used only by the ECS worker binary)
	KafkaBrokers  string // comma-separated
	KafkaTopic    string
	ConsumerGroup string

	// TopK parameters
	K               uint32        // sketch size (memory ~K; default 32 if zero)
	WindowRetention time.Duration // daily-bucket retention (default 30d)
}

Config bundles deployment-time settings. The Lambda binary uses a subset (DDB + dedup + TopK params); the ECS worker also needs Kafka settings.

type Interaction

type Interaction struct {
	// EntityID is the thing being interacted with (a product ID, a content
	// ID, etc.). The TopK sketch ranks entities by interaction frequency.
	EntityID string `json:"entity_id"`

	// UserID is the actor; carried through for diagnostics but not used by
	// the aggregation key.
	UserID string `json:"user_id"`

	// Source is "kinesis" or "kafka" — useful for breaking down "where the
	// interaction came from" in downstream analytics. Not part of the
	// aggregation key.
	Source string `json:"source,omitempty"`
}

Interaction is the cross-source event shape. Both Kinesis (Segment-emitted) and Kafka (internal) producers normalize their payloads to this shape — in production you'd usually keep a separate per-source struct and project both into a common Interaction in your decoder. For the example we keep it simple.
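The per-source projection mentioned above might look like the following sketch. The segmentTrack and internalEvent payload shapes, their JSON field names, and the DecodeKinesis and DecodeKafka helpers are all hypothetical; only the Interaction struct is taken from this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Interaction matches the struct documented above.
type Interaction struct {
	EntityID string `json:"entity_id"`
	UserID   string `json:"user_id"`
	Source   string `json:"source,omitempty"`
}

// segmentTrack is a hypothetical Segment-style payload arriving via Kinesis.
type segmentTrack struct {
	UserID     string `json:"userId"`
	Properties struct {
		ProductID string `json:"product_id"`
	} `json:"properties"`
}

// internalEvent is a hypothetical internal event read from Kafka.
type internalEvent struct {
	Actor  string `json:"actor"`
	Target string `json:"target"`
}

// DecodeKinesis projects a Segment-style payload into an Interaction.
func DecodeKinesis(b []byte) (Interaction, error) {
	var t segmentTrack
	if err := json.Unmarshal(b, &t); err != nil {
		return Interaction{}, fmt.Errorf("decode kinesis record: %w", err)
	}
	return Interaction{EntityID: t.Properties.ProductID, UserID: t.UserID, Source: "kinesis"}, nil
}

// DecodeKafka projects an internal event into an Interaction.
func DecodeKafka(b []byte) (Interaction, error) {
	var e internalEvent
	if err := json.Unmarshal(b, &e); err != nil {
		return Interaction{}, fmt.Errorf("decode kafka record: %w", err)
	}
	return Interaction{EntityID: e.Target, UserID: e.Actor, Source: "kafka"}, nil
}

func main() {
	a, _ := DecodeKinesis([]byte(`{"userId":"u1","properties":{"product_id":"sku-1"}}`))
	b, _ := DecodeKafka([]byte(`{"actor":"u2","target":"sku-1"}`))
	fmt.Println(a.EntityID == b.EntityID, a.Source, b.Source) // true kinesis kafka
}
```

Keeping the wire formats separate means either producer can change its payload without touching the shared pipeline definition, as long as the projection still yields the same EntityID.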

Directories

Path                  Synopsis
cmd/lambda (command)  Lambda binary for the recently-interacted-topk example.
cmd/query (command)   Connect-RPC query server for the recently-interacted-topk example.
cmd/worker (command)  Streaming worker binary for the recently-interacted-topk example.
