Documentation
Overview
Package recentlyinteracted defines a TopK "recently interacted" pipeline fed by two sources at once: a Segment-style Kinesis stream (consumed via an AWS Lambda trigger) and an internal Kafka topic (consumed by a long-lived ECS streaming worker). Both drivers share a single pipeline definition and a single DynamoDB-backed TopK state. The result is a unified Top-N of the most-interacted entities across the two channels, queryable through the auto-generated query service.
Why two sources?
- Kinesis is the canonical destination for managed analytics ingest (Segment, AWS-native event buses); Lambda's Kinesis trigger is the operationally cheap path for it (per-shard fan-out, BatchItemFailures for partial-batch retry).
- Kafka is the canonical destination for internal service events (CDC streams, application events). A long-running ECS worker with franz-go is the right fit: a persistent consumer is cheap to keep open, and Kafka offset commits pair well with at-least-once delivery plus dedup.
Murmur's pipeline DSL is execution-mode-agnostic, so the same TopK definition (Misra-Gries summary, daily windowing, DDB BytesStore) can run behind both drivers at once with no separate query-time merge logic. Both writers MergeUpdate against the same DDB row, and the Misra-Gries Combine collapses duplicates correctly across sources.
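To make the merge step concrete, here is a minimal sketch of how two Misra-Gries summaries can be combined into one. It is not Murmur's implementation: the `Summary` type and the `NewSummary`, `Add`, and `Combine` names are hypothetical, and the real sketch's encoding and Combine may differ. It uses the standard mergeable-summary rule: add the counters, then subtract the (K+1)-th largest count and drop anything that falls to zero.

```go
package main

import (
	"fmt"
	"sort"
)

// Summary is a hypothetical Misra-Gries summary holding at most K counters.
type Summary struct {
	K      int
	Counts map[string]uint64
}

// NewSummary returns an empty summary with capacity k.
func NewSummary(k int) *Summary {
	return &Summary{K: k, Counts: make(map[string]uint64)}
}

// Add records one occurrence of id.
func (s *Summary) Add(id string) {
	if _, ok := s.Counts[id]; ok || len(s.Counts) < s.K {
		s.Counts[id]++
		return
	}
	// Summary is full and id is untracked: decrement every counter,
	// dropping those that reach zero. The new id itself is not stored.
	for key, c := range s.Counts {
		if c == 1 {
			delete(s.Counts, key)
		} else {
			s.Counts[key] = c - 1
		}
	}
}

// Combine merges a and b into a new summary of capacity a.K.
func Combine(a, b *Summary) *Summary {
	out := NewSummary(a.K)
	for id, c := range a.Counts {
		out.Counts[id] += c
	}
	for id, c := range b.Counts {
		out.Counts[id] += c
	}
	if len(out.Counts) <= out.K {
		return out
	}
	// Too many counters: subtract the (K+1)-th largest count from all
	// of them and keep only the strictly positive remainders.
	counts := make([]uint64, 0, len(out.Counts))
	for _, c := range out.Counts {
		counts = append(counts, c)
	}
	sort.Slice(counts, func(i, j int) bool { return counts[i] > counts[j] })
	cut := counts[out.K]
	for id, c := range out.Counts {
		if c <= cut {
			delete(out.Counts, id)
		} else {
			out.Counts[id] = c - cut
		}
	}
	return out
}

func main() {
	kinesis := NewSummary(2)
	for _, id := range []string{"p1", "p1", "p1", "p2"} {
		kinesis.Add(id)
	}
	kafka := NewSummary(2)
	for _, id := range []string{"p3", "p3", "p3", "p3", "p1", "p1"} {
		kafka.Add(id)
	}
	fmt.Println(Combine(kinesis, kafka).Counts)
}
```

With K = 2, the combined summary keeps p1 and p3 and drops p2. The surviving counts are lower bounds on the true frequencies, which is the usual Misra-Gries trade-off for bounded memory.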
Constants
const PipelineName = "recently_interacted"
PipelineName is the canonical pipeline identifier. It appears in metrics, the admin UI, and the auto-generated query service.
Functions
func AttachKafkaSource
func AttachKafkaSource(pipe *pipeline.Pipeline[Interaction, []byte], cfg Config) (*pipeline.Pipeline[Interaction, []byte], error)
AttachKafkaSource wires a franz-go Kafka source into pipe. Used by the ECS worker binary; the Lambda binary leaves Source nil because Lambda owns shard polling.
Returns the pipeline (chained for fluent style) so the caller can pass it directly to streaming.Run.
func Build
func Build(ctx context.Context, cfg Config) (*pipeline.Pipeline[Interaction, []byte], state.Store[[]byte], state.Deduper, error)
Build constructs the shared pipeline. The driver-specific source is wired in by the caller: the Lambda binary leaves Source nil (Lambda owns polling), the Kafka worker binary calls AttachKafkaSource on the returned pipeline.
Returns the pipeline, the underlying state store (so the caller can Close it on shutdown), and an optional Deduper (nil when DDBDedupTable is empty).
Types
type Config
type Config struct {
	// DynamoDB
	DDBEndpoint string // empty for real AWS; "http://localhost:8000" for dynamodb-local
	DDBTable    string // TopK byte-store table
	DDBRegion   string

	// Optional dedup table, shared across both drivers. Strongly recommended
	// in production: Lambda BatchItemFailures may redeliver records adjacent
	// to a failure, and Kafka rebalances may re-emit unacked records.
	DDBDedupTable string
	DedupTTL      time.Duration

	// Kafka (live source; used only by the ECS worker binary)
	KafkaBrokers  string // comma-separated
	KafkaTopic    string
	ConsumerGroup string

	// TopK parameters
	K               uint32        // sketch size (memory ~K; default 32 if zero)
	WindowRetention time.Duration // daily-bucket retention (default 30d)
}
Config bundles deployment-time settings. The Lambda binary uses a subset (DDB + dedup + TopK params); the ECS worker also needs Kafka settings.
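The field comments mention two defaults (K = 32 when zero, 30 days of window retention) and a comma-separated broker list. The sketch below shows what that normalization could look like on the caller's side. It is an illustration under assumptions, not the package's code: the `Config` here is a trimmed local stand-in, `withDefaults` and `brokerList` are hypothetical helpers, and treating a zero `WindowRetention` as "unset" is an assumption.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

const (
	defaultK               uint32 = 32
	defaultWindowRetention        = 30 * 24 * time.Hour
)

// Config is a trimmed stand-in for the package's Config, kept local so
// the example compiles on its own.
type Config struct {
	KafkaBrokers    string
	K               uint32
	WindowRetention time.Duration
}

// withDefaults (hypothetical) fills in the documented defaults for
// fields left at their zero value.
func withDefaults(cfg Config) Config {
	if cfg.K == 0 {
		cfg.K = defaultK
	}
	if cfg.WindowRetention == 0 { // assumption: zero means "unset"
		cfg.WindowRetention = defaultWindowRetention
	}
	return cfg
}

// brokerList (hypothetical) splits the comma-separated KafkaBrokers
// string, trimming whitespace and skipping empty entries.
func brokerList(s string) []string {
	var out []string
	for _, b := range strings.Split(s, ",") {
		if b = strings.TrimSpace(b); b != "" {
			out = append(out, b)
		}
	}
	return out
}

func main() {
	cfg := withDefaults(Config{KafkaBrokers: "broker-1:9092, broker-2:9092"})
	fmt.Println(cfg.K, cfg.WindowRetention, brokerList(cfg.KafkaBrokers))
}
```

The Lambda binary would only need the TopK fields from a config like this; the broker list matters only to the ECS worker.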
type Interaction
type Interaction struct {
	// EntityID is the thing being interacted with (a product ID, a content
	// ID, etc.). The TopK sketch ranks entities by interaction frequency.
	EntityID string `json:"entity_id"`

	// UserID is the actor; carried through for diagnostics but not used by
	// the aggregation key.
	UserID string `json:"user_id"`

	// Source is "kinesis" or "kafka"; useful for breaking down "where the
	// interaction came from" in downstream analytics. Not part of the
	// aggregation key.
	Source string `json:"source,omitempty"`
}
Interaction is the cross-source event shape. In this example both the Kinesis (Segment-emitted) and Kafka (internal) producers normalize their payloads to this shape. In production you'd usually keep a separate per-source struct and project both into a common Interaction in your decoder; the example keeps it simple.
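A rough sketch of the per-source projection described above might look like the following. The payload shapes are assumptions made up for illustration: `segmentEvent` and `internalEvent`, their JSON field names, and the `decodeKinesis` / `decodeKafka` helpers are hypothetical and are not part of this package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Interaction mirrors the package's cross-source event shape.
type Interaction struct {
	EntityID string `json:"entity_id"`
	UserID   string `json:"user_id"`
	Source   string `json:"source,omitempty"`
}

// segmentEvent is a hypothetical Segment-style payload read from Kinesis.
type segmentEvent struct {
	UserID     string `json:"userId"`
	Properties struct {
		EntityID string `json:"entity_id"`
	} `json:"properties"`
}

// internalEvent is a hypothetical internal payload read from Kafka.
type internalEvent struct {
	Entity string `json:"entity"`
	Actor  string `json:"actor"`
}

var errNoEntity = errors.New("event has no entity ID")

// decodeKinesis projects a Segment-style record into an Interaction.
func decodeKinesis(data []byte) (Interaction, error) {
	var ev segmentEvent
	if err := json.Unmarshal(data, &ev); err != nil {
		return Interaction{}, fmt.Errorf("decode kinesis record: %w", err)
	}
	if ev.Properties.EntityID == "" {
		return Interaction{}, errNoEntity
	}
	return Interaction{EntityID: ev.Properties.EntityID, UserID: ev.UserID, Source: "kinesis"}, nil
}

// decodeKafka projects an internal event into an Interaction.
func decodeKafka(data []byte) (Interaction, error) {
	var ev internalEvent
	if err := json.Unmarshal(data, &ev); err != nil {
		return Interaction{}, fmt.Errorf("decode kafka record: %w", err)
	}
	if ev.Entity == "" {
		return Interaction{}, errNoEntity
	}
	return Interaction{EntityID: ev.Entity, UserID: ev.Actor, Source: "kafka"}, nil
}

func main() {
	k, err := decodeKinesis([]byte(`{"userId":"u1","properties":{"entity_id":"p42"}}`))
	fmt.Printf("%+v %v\n", k, err)
	f, err := decodeKafka([]byte(`{"entity":"p7","actor":"u2"}`))
	fmt.Printf("%+v %v\n", f, err)
}
```

Keeping the source-specific structs private to each decoder means a schema change on one channel does not ripple into the shared aggregation key, which only reads EntityID.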
Directories
| Path | Synopsis |
|---|---|
| cmd/lambda (command) | Lambda binary for the recently-interacted-topk example. |
| cmd/query (command) | Connect-RPC query server for the recently-interacted-topk example. |
| cmd/worker (command) | Streaming worker binary for the recently-interacted-topk example. |