core

package
v0.1.0
Published: May 12, 2026 | License: Apache-2.0 | Imports: 2 | Imported by: 0

Documentation

Overview

Package core provides the standard numeric and basic monoids: Sum, Count, Min, Max, First, Last, Set. These are structural — backend executors recognize their Kind and translate them to native operations.

Constants

This section is empty.

Variables

This section is empty.

Functions

func Count

func Count() monoid.Monoid[int64]

Count returns a monoid that counts events. Each event contributes 1, and the contributions are summed. Pair it with the pipeline's Value(func(T) int64 { return 1 }), or have the source emit unit values.

func First

func First[V any]() monoid.Monoid[Stamped[V]]

First returns a monoid that keeps the earliest-timestamped value seen. On ties, the first-arrived value wins (left side of Combine).

func Last

func Last[V any]() monoid.Monoid[Stamped[V]]

Last returns a monoid that keeps the latest-timestamped value seen. On ties, the most-recently-arrived value wins (right side of Combine).
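
The tie-breaking rules for First and Last can be illustrated with a minimal sketch. The functions `combineFirst` and `combineLast` below are hypothetical stand-ins written for this example, not the package's implementation; they assume that an unset Stamped acts as the identity and that ties resolve to the left operand for First and the right operand for Last, as described above.

```go
package main

import "fmt"

// Stamped mirrors the documented type: a value, its event time in Unix
// nanoseconds, and a Set flag separating "no value" from an event at t=0.
type Stamped[V any] struct {
	Time  int64
	Value V
	Set   bool
}

// combineFirst is a hypothetical stand-in for First's Combine: keep the
// earlier timestamp; on a tie keep the left (first-arrived) operand.
func combineFirst[V any](a, b Stamped[V]) Stamped[V] {
	if !a.Set {
		return b
	}
	if !b.Set {
		return a
	}
	if b.Time < a.Time {
		return b
	}
	return a
}

// combineLast is a hypothetical stand-in for Last's Combine: keep the later
// timestamp; on a tie keep the right (most recently arrived) operand.
func combineLast[V any](a, b Stamped[V]) Stamped[V] {
	if !a.Set {
		return b
	}
	if !b.Set {
		return a
	}
	if b.Time >= a.Time {
		return b
	}
	return a
}

func main() {
	x := Stamped[string]{Time: 10, Value: "x", Set: true}
	y := Stamped[string]{Time: 10, Value: "y", Set: true} // same timestamp as x
	z := Stamped[string]{Time: 5, Value: "z", Set: true}

	fmt.Println(combineFirst(x, y).Value)                // tie: left operand wins, prints x
	fmt.Println(combineLast(x, y).Value)                 // tie: right operand wins, prints y
	fmt.Println(combineFirst(x, z).Value)                // z is earlier, prints z
	fmt.Println(combineLast(Stamped[string]{}, z).Value) // identity on the left, prints z
}
```

Because ties depend on operand order, both operations are associative but not commutative when timestamps collide.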

func Max

func Max[V cmp.Ordered]() monoid.Monoid[Bounded[V]]

Max returns a monoid that tracks the running maximum of V. Identity is the unset Bounded[V]; this satisfies the monoid identity law for all V.

func Min

func Min[V cmp.Ordered]() monoid.Monoid[Bounded[V]]

Min returns a monoid that tracks the running minimum of V. Identity is the unset Bounded[V]; this satisfies the monoid identity law for all V.

Lift per-event observations via NewBounded(v) (or core.Bounded[V]{Value: v, Set: true}).
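
As an illustration of why the unset Bounded[V] works as Identity, here is a minimal sketch of a running minimum. `combineMin` is a hypothetical stand-in for Min's Combine, and the local `Bounded` and `NewBounded` definitions only mirror the documented shapes so the example is self-contained.

```go
package main

import (
	"cmp"
	"fmt"
)

// Bounded mirrors the documented type: a value plus a Set flag.
type Bounded[V any] struct {
	Value V
	Set   bool
}

// NewBounded lifts a single observation, as the documented helper does.
func NewBounded[V any](v V) Bounded[V] {
	return Bounded[V]{Value: v, Set: true}
}

// combineMin is a hypothetical stand-in for Min's Combine: an unset operand
// is ignored; otherwise the smaller value is kept.
func combineMin[V cmp.Ordered](a, b Bounded[V]) Bounded[V] {
	switch {
	case !a.Set:
		return b
	case !b.Set:
		return a
	case b.Value < a.Value:
		return b
	default:
		return a
	}
}

func main() {
	acc := Bounded[int]{} // Identity: no observation yet
	for _, v := range []int{7, 3, 9} {
		acc = combineMin(acc, NewBounded(v))
	}
	fmt.Println(acc.Value, acc.Set) // prints: 3 true

	// Folding no events leaves the accumulator unset, so "no data" is
	// distinguishable from an observed minimum of 0.
	empty := combineMin(Bounded[int]{}, Bounded[int]{})
	fmt.Println(empty.Set) // prints: false
}
```

Had the fold started from the plain zero value `0` instead, the inputs 7, 3, 9 would have produced a minimum of 0, which was never observed.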

func Monotonic

func Monotonic[V cmp.Ordered](identity V) monoid.Monoid[V]

Monotonic returns a monoid where Combine takes the larger of its inputs by V's natural ordering. The caller supplies Identity explicitly because `cmp.Ordered` has no portable minimum.

When to use this vs. Max

`core.Max[V]()` is the right monoid for "the maximum observed value over the events seen so far" when each event contributes a single observation. Its value type is `Bounded[V]` (so Identity can mean "no observation yet"), which means callers lift each event via `NewBounded(v)`.

`Monotonic[V]` is the right monoid for the SetCountIfGreater pattern: upstream services emit absolute values (e.g. "user X has 47 likes"), and the pipeline must accept a new value only if it is greater, so that an out-of-order earlier emission ("user X has 32 likes") doesn't roll the value backwards. Pair it with `pkg/state/dynamodb.Int64MaxStore` (DynamoDB conditional UpdateItem `attribute_not_exists OR #v < :v`) for the canonical end-to-end shape: the in-process Combine collapses a batch via `max`, and the store performs the final conditional write.

pipe := pipeline.NewPipeline[Event, int64]("likes_total").
    From(src).
    Key(func(e Event) string { return "user:" + e.UserID }).
    Value(func(e Event) int64 { return e.AbsoluteCount }).
    Aggregate(core.Monotonic[int64](math.MinInt64)).
    StoreIn(maxStore)

Why caller-supplied Identity

For `cmp.Ordered`, there's no portable "minimum" value. For signed integers it's `math.MinInt*`; for unsigned integers, `0`; for floats, negative infinity (`math.Inf(-1)`); for `string`, `""`. Note that `time.Time` is a struct and does not satisfy `cmp.Ordered`, so timestamps must be converted to an ordered representation (for example Unix nanoseconds as `int64`) before they can be used here. Rather than hard-code one value or require a new constraint, callers pass the appropriate minimum explicitly. In Int64MaxStore-backed pipelines Identity is largely vestigial: the DynamoDB conditional handles the "no value yet" case via `attribute_not_exists`, so even an obviously wrong Identity value would still produce correct stored state. The in-process law `Combine(Identity, x) = x` still fails if you pick wrong.

func Set

func Set[V comparable]() monoid.Monoid[map[V]struct{}]

Set returns a monoid that unions sets of comparable elements. The set representation is a Go map[V]struct{}. For high-cardinality unique-element tracking, prefer the HLL (HyperLogLog) sketch monoid, which trades exactness for bounded memory.
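
A union over the map[V]struct{} representation might look like the following sketch. `unionSets` is a hypothetical stand-in for Set's Combine; it assumes the result is a freshly allocated map and that a nil map is treated as the empty set, neither of which is confirmed by the package documentation.

```go
package main

import "fmt"

// unionSets is a hypothetical stand-in for Set's Combine. It copies both
// inputs into a new map so neither operand is mutated; ranging over a nil
// map is a no-op, so nil behaves as the empty set (the identity).
func unionSets[V comparable](a, b map[V]struct{}) map[V]struct{} {
	out := make(map[V]struct{}, len(a)+len(b))
	for k := range a {
		out[k] = struct{}{}
	}
	for k := range b {
		out[k] = struct{}{}
	}
	return out
}

func main() {
	a := map[string]struct{}{"u1": {}, "u2": {}}
	b := map[string]struct{}{"u2": {}, "u3": {}}

	u := unionSets(a, b)
	fmt.Println(len(u)) // prints: 3 ("u2" appears in both inputs)

	_, ok := u["u3"]
	fmt.Println(ok) // prints: true

	fmt.Println(len(unionSets(nil, a))) // prints: 2 (nil acts as the empty set)
}
```

Memory grows with the number of distinct elements, which is the cost the sketch-based alternative avoids.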

func Sum

func Sum[V Numeric]() monoid.Monoid[V]

Sum returns a monoid that adds values. Combine is +, Identity is the zero value.

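Note: Sum is a true monoid only when V's zero value is the additive identity, which holds for every `Numeric` type in Go. Floating-point addition is not exactly associative, so sums over float types can differ slightly depending on the order in which partial results are combined.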
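
To make the "Combine is +, Identity is the zero value" description concrete, here is a minimal sketch of a fold over a `Numeric` slice. The `fold` helper is invented for this example; the local `Numeric` constraint copies the documented definition so the block is self-contained.

```go
package main

import "fmt"

// Numeric copies the documented constraint.
type Numeric interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~float32 | ~float64
}

// fold is a hypothetical helper: it starts from the zero value of V (the
// additive identity) and combines each element with +.
func fold[V Numeric](values []V) V {
	var acc V
	for _, v := range values {
		acc += v
	}
	return acc
}

func main() {
	fmt.Println(fold([]int64{3, 4, 5}))     // prints: 12
	fmt.Println(fold([]float64{0.5, 0.25})) // prints: 0.75
	fmt.Println(fold([]int64{}))            // prints: 0 (identity for an empty input)

	// Counting is the same fold with every event mapped to 1.
	fmt.Println(fold([]int64{1, 1, 1})) // prints: 3
}
```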

Types

type Bounded

type Bounded[V any] struct {
	Value V
	Set   bool
}

Bounded wraps a value V with a Set flag, used by Min/Max so that Identity can be represented as "no value yet" rather than the zero value of V (which would otherwise break the monoid identity law for any V where the zero is a legitimate input).

Use NewBounded to lift a single observation; the Set flag is then true. The zero Bounded[V] is Identity for both Min and Max.

func NewBounded

func NewBounded[V any](v V) Bounded[V]

NewBounded lifts a single observation into a Bounded wrapper.

type Numeric

type Numeric interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~float32 | ~float64
}

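Numeric is the constraint for monoids over orderable numeric values: the built-in integer and floating-point types, plus any types whose underlying type is one of them. Complex types are not included.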

type Stamped

type Stamped[V any] struct {
	Time  int64 // Unix nanoseconds
	Value V
	// Set indicates whether this Stamped carries a real value. The zero Stamped is the
	// identity for First/Last and must be distinguishable from a real event at t=0.
	Set bool
}

Stamped wraps a value V with an event time. First and Last operate on Stamped[V].
