compose

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2026 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Package compose provides higher-order monoids that combine simpler monoids:

  • Map[K]V — merge two maps by Combining values per key
  • Tuple{A,B} — merge two paired values by Combining each component

These let pipelines emit per-event multi-attribute aggregations without separate pipelines per attribute. Example: counting both views *and* unique users per page in one pipeline whose value type is Tuple[int64, []byte] (Sum on the left, HLL on the right).

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DecayedBytes

func DecayedBytes(amount float64, t time.Time) []byte

DecayedBytes lifts a single (amount, time) observation to the wire form expected by DecayedSumBytes pipelines. Pipelines call this from their value extractor:

Value(func(e Event) []byte {
    return compose.DecayedBytes(weight(e), e.At)
})

func DecayedBytesNow

func DecayedBytesNow(amount float64) []byte

DecayedBytesNow is DecayedBytes(amount, time.Now()).

func DecayedSum

func DecayedSum(halfLife time.Duration) monoid.Monoid[Decayed]

DecayedSum returns a monoid that decays older contributions toward the latest timestamp before summing. halfLife controls how fast contributions fade — pass 24*time.Hour for a "last day matters most" feel; 1h for "last hour matters most".

To insert an event at processing time, lift it via DecayedAt(amount, time.Now()). The streaming runtime hands the resulting Decayed value through Combine, the same pattern used by HLL.Single and TopK.SingleN.

func DecayedSumBytes

func DecayedSumBytes(halfLife time.Duration) monoid.Monoid[[]byte]

DecayedSumBytes wraps DecayedSum to operate on []byte values, suitable for plugging into pkg/state/dynamodb.NewBytesStore. Each Combine call decodes both sides, runs the typed Combine, and re-encodes; the cost is a few dozen ns per merge — negligible compared to a DDB round-trip.

The wire format is the same as EncodeDecayed / DecodeDecayed; queries can decode the bytes returned by GetWindow / GetRange and evaluate the score "as of now" via EvaluateAt(d, halfLife, time.Now()).

func EncodeDecayed

func EncodeDecayed(d Decayed) []byte

EncodeDecayed marshals a Decayed observation to its 17-byte wire form. Identity (Set=false) encodes as zeros, which is intentional — DDB `attribute_not_exists` reads return that shape and DecodeDecayed maps it back to Identity.

func EvaluateAt

func EvaluateAt(d Decayed, halfLife time.Duration, t time.Time) float64

EvaluateAt returns the value of d evaluated at time t. If t is in the future relative to d.T, the value is decayed forward; if t is in the past, the value is "scaled up" (rarely useful, supplied for completeness). Use this from the query layer when "the value as of now" matters more than the stored reference time.

Returns 0 for an unset Decayed.

func MapMerge

func MapMerge[K comparable, V any](m monoid.Monoid[V]) monoid.Monoid[map[K]V]

MapMerge returns a monoid that merges map[K]V values by Combining matching keys via the inner monoid m, taking the union of the two key sets.

Identity is a nil map; backends that materialize this on first write should treat nil and empty as equivalent. Combine is associative iff the inner monoid m is associative.

func TupleMonoid2

func TupleMonoid2[A, B any](ma monoid.Monoid[A], mb monoid.Monoid[B]) monoid.Monoid[Tuple2[A, B]]

TupleMonoid2 returns a monoid that merges Tuple2[A,B] componentwise via the inner monoids ma and mb. Useful for pipelines that aggregate multiple metrics in lockstep per key (e.g., view count + unique-visitor HLL).

Types

type Decayed

type Decayed struct {
	// Value is the current decayed sum at time T.
	Value float64
	// T is the reference timestamp (Unix nanoseconds).
	T int64
	// Set is true when this observation carries a real value; false for Identity.
	Set bool
}

Decayed is a (value, time) observation under exponential decay. Combine takes the most recent timestamp's reference frame and decays the older value forward to it before adding. With an appropriate half-life, this implements time-weighted moving sums and averages without windowed bucketing.

Mathematically: Combine((v_a, t_a), (v_b, t_b)) where t_b ≥ t_a is

(v_a * 2^(-(t_b - t_a)/halfLife) + v_b, t_b)

Identity is the unset Decayed; the Set flag distinguishes "no value yet" from a legitimate (0, t=0) observation. This preserves the identity law: Combine(Identity, x) == x for all x.

Associativity is exact in real arithmetic; in IEEE-754 floats it holds within ULP for typical inputs but is not bitwise.

func DecayedAt

func DecayedAt(amount float64, t time.Time) Decayed

DecayedAt builds a Decayed observation for use as a per-event delta. amount is the raw contribution at time t. The returned value has Set=true so it round-trips through Combine(Identity, ...) correctly.

func DecayedNow

func DecayedNow(amount float64) Decayed

DecayedNow is equivalent to DecayedAt(amount, time.Now()).

func DecodeDecayed

func DecodeDecayed(b []byte) Decayed

DecodeDecayed parses a 17-byte wire form back into a Decayed. Returns Identity (Set=false) on a short or empty input — both are treated as "no observation yet."

type Tuple2

type Tuple2[A, B any] struct {
	A A
	B B
}

Tuple2 is a paired value with two components.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL