Documentation ¶
Overview ¶
Package compose provides higher-order monoids that combine simpler monoids:
- map[K]V (via MapMerge): merges two maps by combining the values of matching keys
- Tuple2[A, B] (via TupleMonoid2): merges two paired values by combining each component
These let a single pipeline emit multi-attribute aggregations per event instead of running a separate pipeline per attribute. Example: counting both views and unique users per page in one pipeline whose value type is Tuple2[int64, []byte] (Sum on the left, HLL on the right).
Index ¶
- func DecayedBytes(amount float64, t time.Time) []byte
- func DecayedBytesNow(amount float64) []byte
- func DecayedSum(halfLife time.Duration) monoid.Monoid[Decayed]
- func DecayedSumBytes(halfLife time.Duration) monoid.Monoid[[]byte]
- func EncodeDecayed(d Decayed) []byte
- func EvaluateAt(d Decayed, halfLife time.Duration, t time.Time) float64
- func MapMerge[K comparable, V any](m monoid.Monoid[V]) monoid.Monoid[map[K]V]
- func TupleMonoid2[A, B any](ma monoid.Monoid[A], mb monoid.Monoid[B]) monoid.Monoid[Tuple2[A, B]]
- type Decayed
- type Tuple2
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DecayedBytes ¶
DecayedBytes lifts a single (amount, time) observation to the wire form expected by DecayedSumBytes pipelines. Pipelines call this from their value extractor:
Value(func(e Event) []byte {
	return compose.DecayedBytes(weight(e), e.At)
})
func DecayedBytesNow ¶
DecayedBytesNow is DecayedBytes(amount, time.Now()).
func DecayedSum ¶
DecayedSum returns a monoid that decays older contributions toward the latest timestamp before summing. halfLife controls how fast contributions fade: pass 24*time.Hour for a "last day matters most" feel, or time.Hour for "last hour matters most".
To insert an event at processing time, lift it via DecayedAt(amount, time.Now()). The streaming runtime hands the resulting Decayed value through Combine, the same pattern used by HLL.Single and TopK.SingleN.
func DecayedSumBytes ¶
DecayedSumBytes wraps DecayedSum to operate on []byte values, suitable for plugging into pkg/state/dynamodb.NewBytesStore. Each Combine call decodes both sides, runs the typed Combine, and re-encodes the result. The cost is a few dozen nanoseconds per merge, which is negligible compared to a DynamoDB round-trip.
The wire format is the same as EncodeDecayed / DecodeDecayed; queries can decode the bytes returned by GetWindow / GetRange and evaluate the score "as of now" via EvaluateAt(d, halfLife, time.Now()).
func EncodeDecayed ¶
EncodeDecayed marshals a Decayed observation to its 17-byte wire form. Identity (Set=false) encodes as all zeros. This is intentional: DynamoDB `attribute_not_exists` reads return that shape, and DecodeDecayed maps it back to Identity.
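The documentation fixes the size (17 bytes) and the all-zero Identity, but not the field order or byte order. The sketch below is one layout that satisfies both constraints, assuming a big-endian float64, then a big-endian int64, then a one-byte Set flag. The names encode, decode, and wireSize are hypothetical and are not the package's implementation.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// Decayed mirrors the struct documented in the Types section.
type Decayed struct {
	Value float64
	T     int64 // Unix nanoseconds
	Set   bool
}

// wireSize is 8 bytes of Value + 8 bytes of T + 1 byte of Set.
const wireSize = 17

// encode is a hypothetical stand-in for EncodeDecayed.
// ASSUMPTION: big-endian fields in the order Value, T, Set.
func encode(d Decayed) []byte {
	buf := make([]byte, wireSize)
	if !d.Set {
		return buf // Identity encodes as all zeros
	}
	binary.BigEndian.PutUint64(buf[0:8], math.Float64bits(d.Value))
	binary.BigEndian.PutUint64(buf[8:16], uint64(d.T))
	buf[16] = 1
	return buf
}

// decode is a hypothetical stand-in for DecodeDecayed. Short input and a
// cleared Set byte both map back to Identity.
func decode(b []byte) Decayed {
	if len(b) < wireSize || b[16] == 0 {
		return Decayed{}
	}
	return Decayed{
		Value: math.Float64frombits(binary.BigEndian.Uint64(b[0:8])),
		T:     int64(binary.BigEndian.Uint64(b[8:16])),
		Set:   true,
	}
}

func main() {
	d := Decayed{Value: 2.5, T: 1700000000000000000, Set: true}
	wire := encode(d)
	fmt.Println(len(wire), decode(wire) == d) // 17 true
	fmt.Println(decode(nil) == Decayed{})     // true
}
```

Writing zeros for an unset value, regardless of what Value and T hold, is what makes a missing attribute and an encoded Identity indistinguishable to the decoder.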
func EvaluateAt ¶
EvaluateAt returns the value of d evaluated at time t. If t is in the future relative to d.T, the value is decayed forward; if t is in the past, the value is "scaled up" (rarely useful, supplied for completeness). Use this from the query layer when "the value as of now" matters more than the stored reference time.
Returns 0 for an unset Decayed.
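As a rough illustration of the behavior described above, the following sketch evaluates a stored value at another time by applying the half-life factor 2^(-(t - d.T)/halfLife). The functions evaluateAt and evaluateAtNanos are hypothetical re-implementations written for this example, not the package's code.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Decayed mirrors the struct documented in the Types section.
type Decayed struct {
	Value float64
	T     int64 // Unix nanoseconds
	Set   bool
}

// evaluateAtNanos scales d.Value by 2^(-(t - d.T)/halfLife).
// A later t shrinks the value; an earlier t scales it up.
func evaluateAtNanos(d Decayed, halfLife time.Duration, tNanos int64) float64 {
	if !d.Set {
		return 0 // unset observations evaluate to 0
	}
	elapsed := float64(tNanos-d.T) / float64(halfLife)
	return d.Value * math.Exp2(-elapsed)
}

// evaluateAt is a hypothetical stand-in for EvaluateAt.
func evaluateAt(d Decayed, halfLife time.Duration, t time.Time) float64 {
	return evaluateAtNanos(d, halfLife, t.UnixNano())
}

func main() {
	ref := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	d := Decayed{Value: 100, T: ref.UnixNano(), Set: true}

	fmt.Println(evaluateAt(d, time.Hour, ref.Add(time.Hour)))  // one half-life later: 50
	fmt.Println(evaluateAt(d, time.Hour, ref.Add(-time.Hour))) // one half-life earlier: 200
	fmt.Println(evaluateAt(Decayed{}, time.Hour, ref))         // unset: 0
}
```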
func MapMerge ¶
MapMerge returns a monoid that merges map[K]V values by Combining matching keys via the inner monoid m, taking the union of the two key sets.
Identity is a nil map; backends that materialize this on first write should treat nil and empty as equivalent. Combine is associative iff the inner monoid m is associative.
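The key-union merge can be sketched without the monoid package by passing the inner Combine as a plain function. The helper mapMerge and its signature are assumptions made for this illustration; the real MapMerge takes a monoid.Monoid[V] and returns a monoid rather than merging directly.

```go
package main

import "fmt"

// mapMerge is a hypothetical sketch of the Combine behind MapMerge.
// Keys present on both sides are combined; all other keys are copied over.
func mapMerge[K comparable, V any](a, b map[K]V, combine func(V, V) V) map[K]V {
	if len(a) == 0 && len(b) == 0 {
		return nil // nil and empty are treated as the same Identity
	}
	out := make(map[K]V, len(a)+len(b))
	for k, v := range a {
		out[k] = v
	}
	for k, v := range b {
		if prev, ok := out[k]; ok {
			out[k] = combine(prev, v)
		} else {
			out[k] = v
		}
	}
	return out
}

// sum plays the role of the inner monoid's Combine.
func sum(x, y int64) int64 { return x + y }

func main() {
	a := map[string]int64{"/home": 3, "/docs": 1}
	b := map[string]int64{"/home": 2, "/pricing": 7}
	fmt.Println(mapMerge(a, b, sum)) // map[/docs:1 /home:5 /pricing:7]

	var empty map[string]int64
	fmt.Println(mapMerge(empty, empty, sum) == nil) // true
}
```

The sketch copies into a fresh map instead of mutating either input, so merging the same operands twice gives the same result.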
func TupleMonoid2 ¶
TupleMonoid2 returns a monoid that merges Tuple2[A,B] componentwise via the inner monoids ma and mb. Useful for pipelines that aggregate multiple metrics in lockstep per key (e.g., view count + unique-visitor HLL).
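A componentwise merge might look like the sketch below. The local Tuple2 type, its field names First and Second, and the helper combineTuple are assumptions for illustration; the package's Tuple2 fields are not shown in this documentation. A running maximum stands in for the HLL component to keep the example short.

```go
package main

import "fmt"

// Tuple2 is a local stand-in; the field names are hypothetical.
type Tuple2[A, B any] struct {
	First  A
	Second B
}

// combineTuple applies one inner Combine per component, independently.
func combineTuple[A, B any](x, y Tuple2[A, B], ca func(A, A) A, cb func(B, B) B) Tuple2[A, B] {
	return Tuple2[A, B]{
		First:  ca(x.First, y.First),
		Second: cb(x.Second, y.Second),
	}
}

// sum aggregates the left component (for example a view count).
func sum(a, b int64) int64 { return a + b }

// maxFloat aggregates the right component (for example a peak latency).
func maxFloat(a, b float64) float64 {
	if a > b {
		return a
	}
	return b
}

func main() {
	x := Tuple2[int64, float64]{First: 10, Second: 0.8}
	y := Tuple2[int64, float64]{First: 5, Second: 1.2}
	fmt.Println(combineTuple(x, y, sum, maxFloat)) // {15 1.2}
}
```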
Types ¶
type Decayed ¶
type Decayed struct {
// Value is the current decayed sum at time T.
Value float64
// T is the reference timestamp (Unix nanoseconds).
T int64
// Set is true when this observation carries a real value; false for Identity.
Set bool
}
Decayed is a (value, time) observation under exponential decay. Combine takes the most recent timestamp's reference frame and decays the older value forward to it before adding. With an appropriate half-life, this implements time-weighted moving sums and averages without windowed bucketing.
Mathematically: Combine((v_a, t_a), (v_b, t_b)) where t_b ≥ t_a is
(v_a * 2^(-(t_b - t_a)/halfLife) + v_b, t_b)
Identity is the unset Decayed; the Set flag distinguishes "no value yet" from a legitimate (0, t=0) observation. This preserves the identity law: Combine(Identity, x) == x for all x.
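Associativity is exact in real arithmetic. In IEEE-754 floating point, different groupings of the same inputs typically agree to within rounding error (on the order of one ULP), but the results are not guaranteed to be bitwise identical.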
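The formula and the identity law above can be sketched in a few lines. The function combine below is a hypothetical stand-in for the Combine of DecayedSum(halfLife), written only from the description in this section; halfLife is passed as an argument here for brevity.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Decayed mirrors the struct documented above.
type Decayed struct {
	Value float64
	T     int64 // Unix nanoseconds
	Set   bool
}

// combine decays the older operand forward to the newer timestamp, then adds.
func combine(a, b Decayed, halfLife time.Duration) Decayed {
	// Identity law: an unset operand leaves the other side unchanged.
	if !a.Set {
		return b
	}
	if !b.Set {
		return a
	}
	// Order the operands so that b carries the later timestamp.
	if a.T > b.T {
		a, b = b, a
	}
	// Number of half-lives between the two observations.
	elapsed := float64(b.T-a.T) / float64(halfLife)
	return Decayed{
		Value: a.Value*math.Exp2(-elapsed) + b.Value,
		T:     b.T,
		Set:   true,
	}
}

func main() {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	a := Decayed{Value: 8, T: t0.UnixNano(), Set: true}
	b := Decayed{Value: 1, T: t0.Add(2 * time.Hour).UnixNano(), Set: true}

	// Two half-lives apart: 8 * 2^-2 + 1 = 3.
	fmt.Println(combine(a, b, time.Hour).Value) // 3

	// Combining with Identity returns the other operand unchanged.
	fmt.Println(combine(Decayed{}, a, time.Hour) == a) // true
}
```

Swapping the operands so the later timestamp always wins keeps the result independent of argument order, which matters when a runtime merges partial aggregates in arbitrary order.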
func DecayedAt ¶
DecayedAt builds a Decayed observation for use as a per-event delta. amount is the raw contribution at time t. The returned value has Set=true so it round-trips through Combine(Identity, ...) correctly.
func DecayedNow ¶
DecayedNow is equivalent to DecayedAt(amount, time.Now()).
func DecodeDecayed ¶
DecodeDecayed parses a 17-byte wire form back into a Decayed. Returns Identity (Set=false) on a short or empty input — both are treated as "no observation yet."