Versions in this module

v0

v0.1.0 - May 12, 2026

Changes in this version

+ func DynamoDBStreamsHandler[T any, V any](p *pipeline.Pipeline[T, V], decode dynamodbstreams.Decoder[T], ...) (dynamodbstreams.Handler, error)
+ func KinesisHandler[T any, V any](p *pipeline.Pipeline[T, V], decode kinesis.Decoder[T], cfg LambdaConfig) (kinesis.Handler, error)
+ func MustHandler[H any](h H, err error) H
+ func RunStreamingWorker[T any, V any](ctx context.Context, p *pipeline.Pipeline[T, V], opts ...streaming.RunOption) int
+ func SQSHandler[T any, V any](p *pipeline.Pipeline[T, V], decode sqs.Decoder[T], cfg LambdaConfig) (sqs.Handler, error)
+ type CounterBuilder struct
    + func Counter[T any](name string) *CounterBuilder[T]
    + func (b *CounterBuilder[T]) Build() *pipeline.Pipeline[T, int64]
    + func (b *CounterBuilder[T]) Cache(c state.Cache[int64]) *CounterBuilder[T]
    + func (b *CounterBuilder[T]) Daily(retention time.Duration) *CounterBuilder[T]
    + func (b *CounterBuilder[T]) From(s source.Source[T]) *CounterBuilder[T]
    + func (b *CounterBuilder[T]) Hourly(retention time.Duration) *CounterBuilder[T]
    + func (b *CounterBuilder[T]) KeyBy(fn func(T) string) *CounterBuilder[T]
    + func (b *CounterBuilder[T]) KeyByMany(fn func(T) []string) *CounterBuilder[T]
    + func (b *CounterBuilder[T]) StoreIn(s state.Store[int64]) *CounterBuilder[T]
    + func (b *CounterBuilder[T]) Trailing(durations ...time.Duration) *CounterBuilder[T]
    + func (b *CounterBuilder[T]) TrailingWindows() []time.Duration
+ type LambdaConfig struct
    + Dedup state.Deduper
    + Logger *slog.Logger
    + Recorder metrics.Recorder
+ type TopNBuilder struct
    + func TopN[T any](name string, k uint32, elementFn func(T) string) *TopNBuilder[T]
    + func (b *TopNBuilder[T]) Build() *pipeline.Pipeline[T, []byte]
    + func (b *TopNBuilder[T]) Daily(retention time.Duration) *TopNBuilder[T]
    + func (b *TopNBuilder[T]) From(s source.Source[T]) *TopNBuilder[T]
    + func (b *TopNBuilder[T]) KeyBy(fn func(T) string) *TopNBuilder[T]
    + func (b *TopNBuilder[T]) StoreIn(s state.Store[[]byte]) *TopNBuilder[T]
+ type TrendingBuilder struct
    + func Trending[T any](name string, halfLife time.Duration) *TrendingBuilder[T]
    + func (b *TrendingBuilder[T]) Amount(fn func(T) float64) *TrendingBuilder[T]
    + func (b *TrendingBuilder[T]) Build() *pipeline.Pipeline[T, []byte]
    + func (b *TrendingBuilder[T]) Clock(now func() time.Time) *TrendingBuilder[T]
    + func (b *TrendingBuilder[T]) Daily(retention time.Duration) *TrendingBuilder[T]
    + func (b *TrendingBuilder[T]) From(s source.Source[T]) *TrendingBuilder[T]
    + func (b *TrendingBuilder[T]) Hourly(retention time.Duration) *TrendingBuilder[T]
    + func (b *TrendingBuilder[T]) KeyBy(fn func(T) string) *TrendingBuilder[T]
    + func (b *TrendingBuilder[T]) StoreIn(s state.Store[[]byte]) *TrendingBuilder[T]
+ type UniqueCountBuilder struct
    + func UniqueCount[T any](name string, elementFn func(T) []byte) *UniqueCountBuilder[T]
    + func (b *UniqueCountBuilder[T]) Build() *pipeline.Pipeline[T, []byte]
    + func (b *UniqueCountBuilder[T]) Daily(retention time.Duration) *UniqueCountBuilder[T]
    + func (b *UniqueCountBuilder[T]) From(s source.Source[T]) *UniqueCountBuilder[T]
    + func (b *UniqueCountBuilder[T]) Hourly(retention time.Duration) *UniqueCountBuilder[T]
    + func (b *UniqueCountBuilder[T]) KeyBy(fn func(T) string) *UniqueCountBuilder[T]
    + func (b *UniqueCountBuilder[T]) StoreIn(s state.Store[[]byte]) *UniqueCountBuilder[T]