Documentation ¶
Overview ¶
Package pipeline is the Murmur DSL surface: the entry point users touch when defining an aggregation. A Pipeline composes a Source, a key-extraction function, a value-extraction function, a structural Monoid, an optional windowing config, a primary State store, and an optional read Cache. Built pipelines are deployed by the runtime (pkg/exec/...) onto streaming, bootstrap, and batch targets.
Type parameters:
- T: the record type emitted by the Source
- V: the aggregation value type (e.g. int64 for counters; []byte for sketch state)
Aggregation keys are always strings, matching the DynamoDB partition-key shape. Encoding composite keys is the user's responsibility; for example, return e.PageID + "|" + e.Region from the key extractor.
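A minimal sketch of such a composite-key extractor, assuming a hypothetical PageView event type (the pageRegionKey and splitPageRegionKey helpers are illustrative, not part of the package). The "|" separator is only safe if neither field can contain it; otherwise the parts need escaping or length-prefixing.

```go
package main

import (
	"fmt"
	"strings"
)

// PageView is a hypothetical event type used only for illustration.
type PageView struct {
	PageID string
	Region string
}

// pageRegionKey encodes a composite (page, region) key into the single
// string a key extractor must return. It assumes "|" never appears in
// either field.
func pageRegionKey(e PageView) string {
	return e.PageID + "|" + e.Region
}

// splitPageRegionKey reverses the encoding, e.g. for query-side display.
func splitPageRegionKey(k string) (page, region string, ok bool) {
	return strings.Cut(k, "|")
}

func main() {
	k := pageRegionKey(PageView{PageID: "home", Region: "eu-west-1"})
	fmt.Println(k) // home|eu-west-1
	page, region, _ := splitPageRegionKey(k)
	fmt.Println(page, region) // home eu-west-1
}
```

A function like pageRegionKey would be handed to Key; the decoding helper only matters if something downstream needs the parts back.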
Type arguments must be supplied explicitly when calling NewPipeline. This trades some ergonomics for a single builder type rather than a tower of stage-typed builders. Refining the DSL surface is a Phase 2 design task once we have real users.
Index ¶
- Variables
- type Pipeline
- func (p *Pipeline[T, V]) Aggregate(m monoid.Monoid[V], opts ...windowed.Config) *Pipeline[T, V]
- func (p *Pipeline[T, V]) Build() error
- func (p *Pipeline[T, V]) Cache(c state.Cache[V]) *Pipeline[T, V]
- func (p *Pipeline[T, V]) CacheStore() state.Cache[V]
- func (p *Pipeline[T, V]) From(s source.Source[T]) *Pipeline[T, V]
- func (p *Pipeline[T, V]) Key(fn func(T) string) *Pipeline[T, V]
- func (p *Pipeline[T, V]) KeyByMany(fn func(T) []string) *Pipeline[T, V]
- func (p *Pipeline[T, V]) KeyFn() func(T) string
- func (p *Pipeline[T, V]) KeysFn() func(T) []string
- func (p *Pipeline[T, V]) Monoid() monoid.Monoid[V]
- func (p *Pipeline[T, V]) Name() string
- func (p *Pipeline[T, V]) Query() QueryConfig
- func (p *Pipeline[T, V]) ServeOn(q QueryConfig) *Pipeline[T, V]
- func (p *Pipeline[T, V]) Source() source.Source[T]
- func (p *Pipeline[T, V]) Store() state.Store[V]
- func (p *Pipeline[T, V]) StoreIn(s state.Store[V]) *Pipeline[T, V]
- func (p *Pipeline[T, V]) Value(fn func(T) V) *Pipeline[T, V]
- func (p *Pipeline[T, V]) ValueFn() func(T) V
- func (p *Pipeline[T, V]) Window() *windowed.Config
- type QueryConfig
Constants ¶
This section is empty.
Variables ¶
var (
	ErrMissingSource  = errors.New("pipeline: source not set (use From)")
	ErrMissingKeyFn   = errors.New("pipeline: key function not set (use Key)")
	ErrMissingValueFn = errors.New("pipeline: value function not set (use Value)")
	ErrMissingMonoid  = errors.New("pipeline: monoid not set (use Aggregate)")
	ErrMissingStore   = errors.New("pipeline: state store not set (use StoreIn)")
)
Errors returned by Build.
Functions ¶
This section is empty.
Types ¶
type Pipeline ¶
Pipeline is the assembled aggregation definition. Use the builder methods (From, Key or KeyByMany, Value, Aggregate, StoreIn, Cache, ServeOn) to populate it, then call Build to validate and freeze it.
func NewPipeline ¶
NewPipeline begins constructing a pipeline with the given name. The name is used as the basis for state-table names, metrics labels, and the generated gRPC service name.
func (*Pipeline[T, V]) Aggregate ¶
func (p *Pipeline[T, V]) Aggregate(m monoid.Monoid[V], opts ...windowed.Config) *Pipeline[T, V]
Aggregate sets the structural monoid and an optional windowing config. Pass at most one windowed.Config; any additional configs are ignored.
func (*Pipeline[T, V]) Build ¶
func (p *Pipeline[T, V]) Build() error
Build validates the fields required by every execution mode: a key extractor (Key or KeyByMany), a value extractor, a monoid, and a state store. The source is mode-specific (Live needs a source.Source; Bootstrap needs a snapshot.Source), so it is checked by the runtime that consumes the pipeline rather than by Build.
func (*Pipeline[T, V]) Cache ¶
func (p *Pipeline[T, V]) Cache(c state.Cache[V]) *Pipeline[T, V]
Cache sets an optional read cache and sketch accelerator. Valkey is the typical choice. The cache is never the source of truth: its contents can be repopulated from the primary Store at any time.
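Because the Store is authoritative, a cache miss can always be served by reading the Store and refilling the cache. A minimal read-through sketch, assuming hypothetical Store and Cache interfaces with Get and Put (the real state.Store and state.Cache method sets are not reproduced here) and plain maps as backends.

```go
package main

import "fmt"

// Store and Cache are hypothetical minimal interfaces for illustration.
type Store[V any] interface {
	Get(key string) (V, bool)
}

type Cache[V any] interface {
	Get(key string) (V, bool)
	Put(key string, v V)
}

type mapStore[V any] map[string]V

func (m mapStore[V]) Get(k string) (V, bool) {
	v, ok := m[k]
	return v, ok
}

type mapCache[V any] map[string]V

func (m mapCache[V]) Get(k string) (V, bool) {
	v, ok := m[k]
	return v, ok
}

func (m mapCache[V]) Put(k string, v V) {
	m[k] = v
}

// readThrough serves from the cache when possible, otherwise reads the
// store (the source of truth) and repopulates the cache. A nil cache
// means every read goes straight to the store.
func readThrough[V any](c Cache[V], s Store[V], key string) (V, bool) {
	if c != nil {
		if v, ok := c.Get(key); ok {
			return v, true
		}
	}
	v, ok := s.Get(key)
	if ok && c != nil {
		c.Put(key, v)
	}
	return v, ok
}

func main() {
	store := mapStore[int64]{"post:1": 42}
	cache := mapCache[int64]{}
	v, _ := readThrough[int64](cache, store, "post:1") // miss: read store, fill cache
	fmt.Println(v, len(cache))                         // 42 1
}
```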
func (*Pipeline[T, V]) CacheStore ¶
func (p *Pipeline[T, V]) CacheStore() state.Cache[V]
CacheStore returns the optional cache (nil if none).
func (*Pipeline[T, V]) Key ¶
func (p *Pipeline[T, V]) Key(fn func(T) string) *Pipeline[T, V]
Key sets the function that extracts the aggregation key from an event. Composite keys should be encoded into a single string by the caller (e.g. "<page>|<region>").
Key and KeyByMany are mutually exclusive: if both are set, KeyByMany takes precedence and Key is ignored.
func (*Pipeline[T, V]) KeyByMany ¶
func (p *Pipeline[T, V]) KeyByMany(fn func(T) []string) *Pipeline[T, V]
KeyByMany sets a multi-key extractor: each event contributes its value to every key returned by fn. It is used for hierarchical rollups; for example, one "like" event might contribute to:
[]string{
	"post:" + e.PostID,                           // per-post total
	"post:" + e.PostID + "|country:" + e.Country, // per-post-per-country
	"country:" + e.Country,                       // per-country total
	"global",                                     // global total
}
Each emitted key triggers an independent state.MergeUpdate against the same store, so an N-level hierarchy costs N store writes per event. Weigh this against your query patterns: explicit rollups are cheap to query (one read per level) but expensive to write, while query-time rollups are the inverse.
Deduplication is applied once per event (against the EventID), not once per key: on a duplicate, all N per-key merges are skipped together.
Mutually exclusive with Key; if both are set, KeyByMany takes precedence.
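To make the fanout and deduplication semantics concrete, here is a toy in-memory model, assuming a hypothetical Like event type, a plain map in place of the state store, and a sum monoid. It is not the runtime's implementation; in particular, a real runtime has to keep the dedup record and the per-key writes consistent under failure, which this sketch ignores.

```go
package main

import "fmt"

// Like is a hypothetical event; EventID is what deduplication keys on.
type Like struct {
	EventID, PostID, Country string
}

// likeKeys is a KeyByMany-style extractor for a four-level rollup.
func likeKeys(e Like) []string {
	return []string{
		"post:" + e.PostID,
		"post:" + e.PostID + "|country:" + e.Country,
		"country:" + e.Country,
		"global",
	}
}

// apply models one event: dedup once per EventID, then merge v into every
// emitted key. writes counts the per-key merges, standing in for the
// MergeUpdate calls a real store would receive.
func apply(store map[string]int64, seen map[string]bool, e Like, v int64) (writes int) {
	if seen[e.EventID] {
		return 0 // duplicate: all per-key merges are skipped together
	}
	seen[e.EventID] = true
	for _, k := range likeKeys(e) {
		store[k] += v // sum monoid: combine(old, v)
		writes++
	}
	return writes
}

func main() {
	store, seen := map[string]int64{}, map[string]bool{}
	fmt.Println(apply(store, seen, Like{"e1", "p1", "DE"}, 1)) // 4
	fmt.Println(apply(store, seen, Like{"e1", "p1", "DE"}, 1)) // 0 (duplicate)
	fmt.Println(apply(store, seen, Like{"e2", "p1", "FR"}, 1)) // 4
	fmt.Println(store["post:p1"], store["country:DE"], store["global"]) // 2 1 2
}
```

Each accepted event costs four writes here, one per hierarchy level, while any single rollup level is answered by one map lookup.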
func (*Pipeline[T, V]) KeyFn ¶
func (p *Pipeline[T, V]) KeyFn() func(T) string
KeyFn returns the configured single-key extractor (nil if KeyByMany was used instead, or if no extractor was set).
func (*Pipeline[T, V]) KeysFn ¶
func (p *Pipeline[T, V]) KeysFn() func(T) []string
KeysFn returns a function that produces all keys an event should contribute to. If KeyByMany was set, it returns that function directly; otherwise it wraps the single-key Key extractor in a 1-element slice. It returns nil if neither was set.
Runtimes (streaming.Run, lambda handlers) should call KeysFn rather than KeyFn so multi-key fanout works transparently.
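The wrapping rule is small enough to sketch directly. The keysFn function below is a hypothetical re-creation of the documented behavior, taking the two extractors as arguments instead of reading them from a Pipeline; it is not the package's code.

```go
package main

import "fmt"

// keysFn mirrors the documented KeysFn rule: a multi-key extractor wins;
// otherwise a single-key extractor is wrapped in a one-element slice;
// with neither set the result is nil.
func keysFn[T any](key func(T) string, many func(T) []string) func(T) []string {
	if many != nil {
		return many
	}
	if key == nil {
		return nil
	}
	return func(e T) []string { return []string{key(e)} }
}

func main() {
	keys := keysFn(func(s string) string { return "k:" + s }, nil)
	// A runtime loop can treat single- and multi-key pipelines alike.
	for _, k := range keys("a") {
		fmt.Println(k) // k:a
	}
	fmt.Println(keysFn[string](nil, nil) == nil) // true
}
```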
func (*Pipeline[T, V]) Query ¶
func (p *Pipeline[T, V]) Query() QueryConfig
Query returns the gRPC query configuration.
func (*Pipeline[T, V]) ServeOn ¶
func (p *Pipeline[T, V]) ServeOn(q QueryConfig) *Pipeline[T, V]
ServeOn configures the auto-generated gRPC query service.
func (*Pipeline[T, V]) StoreIn ¶
func (p *Pipeline[T, V]) StoreIn(s state.Store[V]) *Pipeline[T, V]
StoreIn sets the primary state store, which is the source of truth for aggregations. DynamoDB is the recommended default.
func (*Pipeline[T, V]) Value ¶
func (p *Pipeline[T, V]) Value(fn func(T) V) *Pipeline[T, V]
Value sets the function that extracts the aggregation value from an event. For counters, this is typically `func(_ T) int64 { return 1 }`. For sketches that ingest arbitrary keys, the value function derives the byte slice that identifies the ingested item.
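For the counter case, a constant-1 value function combined with a sum monoid reduces to counting events per key. A minimal sketch, assuming a hypothetical Monoid interface with Identity and Combine (the real monoid.Monoid[V] method set may differ) and a hypothetical click event type; fold models what repeated merges into a single key amount to.

```go
package main

import "fmt"

// Monoid is a hypothetical shape for a structural monoid.
type Monoid[V any] interface {
	Identity() V
	Combine(a, b V) V
}

// sumInt64 is the counter monoid: identity 0, combine by addition.
type sumInt64 struct{}

func (sumInt64) Identity() int64 { return 0 }

func (sumInt64) Combine(a, b int64) int64 { return a + b }

type click struct{ UserID string }

// fold applies the value extractor to each event and combines the results.
func fold[T, V any](m Monoid[V], value func(T) V, events []T) V {
	acc := m.Identity()
	for _, e := range events {
		acc = m.Combine(acc, value(e))
	}
	return acc
}

func main() {
	count := func(_ click) int64 { return 1 } // counter value function
	events := []click{{"u1"}, {"u2"}, {"u1"}}
	fmt.Println(fold[click, int64](sumInt64{}, count, events)) // 3
}
```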
type QueryConfig ¶
type QueryConfig struct {
GRPCAddr string
HTTPAddr string // optional grpc-gateway HTTP/JSON listener (Phase 2)
}
QueryConfig describes how the auto-generated gRPC service should be served. Concrete generation lives in pkg/query/codegen.