rohyperloglog

package module
v0.0.0-...-a6ee939 Latest
Published: Jan 10, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

README

HyperLogLog Plugin

The HyperLogLog plugin provides operators for approximate distinct counting using the HyperLogLog algorithm.
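
To make "approximate distinct counting" concrete, here is a toy dense HyperLogLog written against the standard library only. It is a sketch of the general technique, not the plugin's implementation: the names `sketch`, `hash64`, and `fill` are hypothetical, the hash is the first 8 bytes of SHA-256, and the bias constant used is only valid for precision 7 and above.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"math"
	"math/bits"
)

// sketch is a toy dense HyperLogLog (illustrative, not the plugin's type).
type sketch struct {
	p    uint8   // precision: number of bits used to pick a register
	regs []uint8 // 2^p registers, each holding the largest rank seen
}

func newSketch(p uint8) *sketch {
	return &sketch{p: p, regs: make([]uint8, 1<<p)}
}

// hash64 uses the first 8 bytes of SHA-256 so that all 64 bits are well mixed.
func hash64(s string) uint64 {
	sum := sha256.Sum256([]byte(s))
	return binary.BigEndian.Uint64(sum[:8])
}

func (s *sketch) add(item string) {
	h := hash64(item)
	idx := h >> (64 - s.p) // top p bits choose the register
	// Remaining bits, with a guard bit so the rank never exceeds 64-p+1.
	rest := h<<s.p | uint64(1)<<(s.p-1)
	rank := uint8(bits.LeadingZeros64(rest)) + 1
	if rank > s.regs[idx] {
		s.regs[idx] = rank
	}
}

func (s *sketch) estimate() float64 {
	m := float64(len(s.regs))
	sum, zeros := 0.0, 0
	for _, r := range s.regs {
		sum += math.Ldexp(1, -int(r)) // 2^-r
		if r == 0 {
			zeros++
		}
	}
	alpha := 0.7213 / (1 + 1.079/m) // bias constant, valid for m >= 128
	raw := alpha * m * m / sum
	if raw <= 2.5*m && zeros > 0 {
		// Small-range correction: linear counting on empty registers.
		return m * math.Log(m/float64(zeros))
	}
	return raw
}

// fill adds n distinct synthetic items ("item-0" ... "item-(n-1)").
func fill(s *sketch, n int) {
	for i := 0; i < n; i++ {
		s.add(fmt.Sprintf("item-%d", i))
	}
}

func main() {
	s := newSketch(12)
	fill(s, 1000)
	fill(s, 1000) // duplicates do not change the registers
	fmt.Printf("estimate: %.0f (true distinct count: 1000)\n", s.estimate())
}
```

Memory stays fixed at 2^p registers no matter how many items are added, which is the trade-off the algorithm makes against exactness.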

Installation

go get github.com/samber/ro/plugins/hyperloglog

Usage

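package main

import (
    "github.com/samber/ro"
    rohyperloglog "github.com/samber/ro/plugins/hyperloglog"
)

func main() {
    // Count distinct strings
    observable := ro.Pipe1(
        ro.Just("a", "b", "a", "c", "b", "a"),
        rohyperloglog.CountDistinct[string](
            8,    // precision (must be between 4 and 18)
            true, // sparse representation
            rohyperloglog.StringHash.FNV64a(),
        ),
    )

    // The operator returns an observable, so subscribe to receive the estimate.
    subscription := observable.Subscribe(ro.PrintObserver[uint64]())
    defer subscription.Unsubscribe()
    // Expected output: "Next: 3" followed by "Completed"
}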

Hash Algorithm Benchmarks

The plugin includes comprehensive benchmarks for all hash algorithms. Results show performance and memory usage for both string and bytes hashing.

Running Benchmarks

To run all benchmarks:

cd plugins/hyperloglog
go test -bench=. -benchmem

To run specific benchmark suites:

# String hashers
go test -bench=BenchmarkStringHashers -benchmem

# Bytes hashers
go test -bench=BenchmarkBytesHashers -benchmem

Sample Results

Recent benchmark results (Apple M3, Go 1.24):

String Hashers (1000 items per iteration):

  • MapHash: ~122,622 ops/sec, 0 B/op
  • Adler32: ~110,823 ops/sec, 0 B/op
  • Loselose: ~89,503 ops/sec, 0 B/op
  • CRC32: ~65,228 ops/sec, 48 KB/op
  • DJB2: ~57,729 ops/sec, 0 B/op
  • FNV64a: ~20,853 ops/sec, 56 KB/op

Bytes Hashers (1000 items per iteration):

  • CRC32: ~209,274 ops/sec, 0 B/op
  • DJB2: ~190,988 ops/sec, 0 B/op
  • Adler32: ~186,105 ops/sec, 0 B/op
  • Loselose: ~174,127 ops/sec, 0 B/op
  • Jenkins: ~147,613 ops/sec, 0 B/op
  • SDBM: ~139,095 ops/sec, 0 B/op

Collision Analysis

Theoretical collision probabilities based on hash size and birthday paradox:

64-bit Hashes (FNV64a, FNV64, CRC64, MapHash):

  • 50% collision probability: ~5.1 billion items (about 1.18 × 2^32)
  • 1% collision probability: ~610 million items
  • 0.1% collision probability: ~190 million items

32-bit Hashes (FNV32a, FNV32, CRC32, Adler32, Jenkins, DJB2, SDBM):

  • 50% collision probability: ~77,000 items (about 1.18 × 2^16)
  • 1% collision probability: ~9,300 items
  • 0.1% collision probability: ~2,900 items

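Cryptographic Hashes (SHA256, SHA512, SHA1, MD5), for the full digest (the hashers in this package return uint64, so at most 64 bits reach the sketch):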

  • SHA256 (256-bit): 50% collision probability at ~2^128 items
  • SHA512 (512-bit): 50% collision probability at ~2^256 items
  • SHA1 (160-bit): 50% collision probability at ~2^80 items
  • MD5 (128-bit): 50% collision probability at ~2^64 items
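
Thresholds of this kind come from the birthday approximation n ≈ sqrt(2 · 2^b · ln(1/(1−p))) for a uniformly distributed b-bit hash. The helper below (`itemsForCollision`, a name chosen for this sketch) evaluates it, so the figures can be recomputed for any hash width and probability.

```go
package main

import (
	"fmt"
	"math"
)

// itemsForCollision returns the approximate number of uniformly random
// hashBits-bit hashes needed to see at least one collision with probability p.
func itemsForCollision(hashBits int, p float64) float64 {
	space := math.Ldexp(1, hashBits) // 2^hashBits possible hash values
	return math.Sqrt(2 * space * math.Log(1/(1-p)))
}

func main() {
	for _, b := range []int{32, 64} {
		for _, p := range []float64{0.5, 0.01, 0.001} {
			fmt.Printf("%d-bit, p=%.1f%%: ~%.3g items\n",
				b, p*100, itemsForCollision(b, p))
		}
	}
}
```

The approximation assumes an ideal hash; weak functions collide far earlier than these bounds suggest.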

Loselose Hash:

  • Extremely poor collision resistance - known to have massive collision rates
  • In practice: 56% collision rate on 1000 unique strings, 33% on 1000 unique bytes
  • Avoid for any serious application
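
The reason for the poor collision behaviour is easy to see from the classic definition of "lose lose", which simply sums the byte values. The sketch below assumes the plugin follows that textbook definition (the source does not show its implementation); `loseLose` is a name chosen for this example.

```go
package main

import "fmt"

// loseLose sums the byte values of s (the textbook "lose lose" hash).
// Assumption: the plugin's Loselose hasher follows this definition.
func loseLose(s string) uint64 {
	var h uint64
	for i := 0; i < len(s); i++ {
		h += uint64(s[i])
	}
	return h
}

func main() {
	// Any permutation of the same bytes collides.
	fmt.Println(loseLose("listen") == loseLose("silent")) // true
	// So do unrelated strings whose byte sums match: 97+100 == 98+99.
	fmt.Println(loseLose("ad") == loseLose("bc")) // true
}
```

Because short inputs can only produce small sums, the output also occupies a tiny part of the 64-bit range.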

Recommendations

  • Fastest: MapHash and Adler32 for strings, CRC32 and DJB2 for bytes (per the sample results above)
  • Best Balance: FNV64a offers good speed and collision resistance (64-bit space)
  • Cryptographic Security: SHA256/SHA512 for security-critical applications
  • Avoid: Loselose has extremely poor collision resistance
  • General Purpose: FNV64a or MapHash for most use cases

Documentation

Overview

Example (CompareDifferentHashers)
// Compare different hash functions on the same data
data := ro.Just("alice", "bob", "charlie", "alice", "bob", "david")

// Using FNV-1a
fnvObservable := ro.Pipe1(data, CountDistinct[string](8, false, StringHash.FNV64a()))
fnvSub := fnvObservable.Subscribe(ro.NewObserver(
	func(value uint64) { fmt.Printf("FNV-1a: %d\n", value) },
	func(err error) { fmt.Printf("FNV-1a error: %v\n", err) },
	func() { fmt.Println("FNV-1a completed") },
))
defer fnvSub.Unsubscribe()

// Using SHA-256
shaObservable := ro.Pipe1(data, CountDistinct[string](8, false, StringHash.SHA256()))
shaSub := shaObservable.Subscribe(ro.NewObserver(
	func(value uint64) { fmt.Printf("SHA-256: %d\n", value) },
	func(err error) { fmt.Printf("SHA-256 error: %v\n", err) },
	func() { fmt.Println("SHA-256 completed") },
))
defer shaSub.Unsubscribe()
Output:

FNV-1a: 4
FNV-1a completed
SHA-256: 4
SHA-256 completed
Example (UsingAllStringHashers)
// Demonstrate all available string hash functions
data := ro.Just("alice", "bob", "charlie", "alice", "bob", "david")

hashers := []struct {
	name string
	hash func(string) uint64
}{
	{"FNV64a", StringHash.FNV64a()},
	{"FNV64", StringHash.FNV64()},
	{"FNV32a", StringHash.FNV32a()},
	{"FNV32", StringHash.FNV32()},
	{"SHA256", StringHash.SHA256()},
	{"SHA1", StringHash.SHA1()},
	{"SHA512", StringHash.SHA512()},
	{"MD5", StringHash.MD5()},
	// Note: MapHash is non-deterministic and results may vary
	{"MapHash", StringHash.MapHash()},
}

for _, h := range hashers {
	observable := ro.Pipe1(data, CountDistinct[string](8, false, h.hash))
	subscription := observable.Subscribe(ro.NewObserver(
		func(value uint64) { fmt.Printf("%s: %d\n", h.name, value) },
		func(err error) { fmt.Printf("%s error: %v\n", h.name, err) },
		func() { fmt.Printf("%s completed\n", h.name) },
	))
	defer subscription.Unsubscribe()
}
Output:

FNV64a: 4
FNV64a completed
FNV64: 4
FNV64 completed
FNV32a: 1
FNV32a completed
FNV32: 1
FNV32 completed
SHA256: 4
SHA256 completed
SHA1: 4
SHA1 completed
SHA512: 4
SHA512 completed
MD5: 4
MD5 completed
MapHash: 6
MapHash completed
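
One possible reading of the FNV32a and FNV32 results above, offered as an assumption rather than something the source confirms: if a 32-bit hash is zero-extended to uint64 and the sketch picks its register from the most significant bits, every item lands in the same register. The snippet below only demonstrates the uncontroversial part, namely that the top bits of a widened 32-bit value are always zero; `widened32` and `topBits` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// widened32 computes FNV-1a 32-bit and zero-extends it to uint64.
// Assumption: this mirrors how a 32-bit hasher could fill a uint64 result.
func widened32(s string) uint64 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return uint64(h.Sum32())
}

// topBits returns the p most significant bits of h.
func topBits(h uint64, p uint) uint64 {
	return h >> (64 - p)
}

func main() {
	for _, name := range []string{"alice", "bob", "charlie", "david"} {
		// The upper 32 bits are zero, so the top 8 bits are always 0.
		fmt.Printf("%-8s top 8 bits: %d\n", name, topBits(widened32(name), 8))
	}
}
```

If that reading is right, 32-bit hashers would need their output spread across all 64 bits before being useful here.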

Constants

This section is empty.

Variables

var (
	// StringHash provides hash functions for strings
	StringHash = StringHashers{}
	// BytesHash provides hash functions for byte slices
	BytesHash = BytesHashers{}
)

Convenience variables for easy access

var ErrInvalidPrecision = fmt.Errorf("rohyperloglog.CountDistinct: precision has to be >= 4 and <= 18")
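
For orientation on what the 4 to 18 range means: in the classic HyperLogLog formulation, precision p selects 2^p registers and the relative standard error is roughly 1.04 / sqrt(2^p). The sketch below tabulates that relationship; it assumes the plugin's precision parameter has this usual meaning, and `registers` and `stdError` are names invented for the example.

```go
package main

import (
	"fmt"
	"math"
)

// registers returns the number of registers for a given precision (2^p).
func registers(precision uint8) int {
	return 1 << precision
}

// stdError returns the approximate relative standard error of the classic
// HyperLogLog estimator, 1.04 / sqrt(m), for m = 2^precision registers.
func stdError(precision uint8) float64 {
	return 1.04 / math.Sqrt(float64(registers(precision)))
}

func main() {
	for _, p := range []uint8{4, 8, 12, 16, 18} {
		fmt.Printf("precision %2d: %6d registers, ~%.2f%% standard error\n",
			p, registers(p), stdError(p)*100)
	}
}
```

Each extra bit of precision doubles memory and shrinks the error by a factor of about 1.41.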

Functions

func CountDistinct

func CountDistinct[T comparable](precision uint8, sparse bool, hashFunc func(input T) uint64) func(ro.Observable[T]) ro.Observable[uint64]
Example
// Count distinct strings using hyperloglog
observable := ro.Pipe1(
	ro.Just("alice", "bob", "charlie", "alice", "bob", "david"),
	CountDistinct[string](8, false, func(input string) uint64 {
		h := fnv.New64a()
		h.Write([]byte(input))
		return h.Sum64()
	}),
)

subscription := observable.Subscribe(ro.PrintObserver[uint64]())
defer subscription.Unsubscribe()
Output:

Next: 4
Completed
Example (WithError)
// Count distinct with potential errors
// Note: This demonstrates what happens with invalid precision

// This will panic when the operator is created
defer func() {
	if r := recover(); r != nil {
		fmt.Println("Panic caught:", r)
	}
}()

// Precision 20 is invalid (the valid range is 4-18)
_ = CountDistinct[string](20, false, func(input string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(input))
	return h.Sum64()
})
Output:

Panic caught: rohyperloglog.CountDistinct: precision has to be >= 4 and <= 18
Example (WithLargeDataset)
// Count distinct in a large dataset
observable := ro.Pipe1(
	ro.Just("user1", "user2", "user3", "user4", "user5", "user6", "user7", "user8", "user9", "user10"),
	CountDistinct[string](16, false, func(input string) uint64 {
		h := fnv.New64a()
		h.Write([]byte(input))
		return h.Sum64()
	}),
)

subscription := observable.Subscribe(ro.PrintObserver[uint64]())
defer subscription.Unsubscribe()
Output:

Next: 2
Completed
Example (WithPrecision)
// Count distinct at the lowest supported precision (4)
observable := ro.Pipe1(
	ro.Just("a", "b", "c", "d", "e", "f", "g", "h", "i", "j"),
	CountDistinct[string](4, false, func(input string) uint64 {
		h := fnv.New64a()
		h.Write([]byte(input))
		return h.Sum64()
	}),
)

subscription := observable.Subscribe(ro.PrintObserver[uint64]())
defer subscription.Unsubscribe()
Output:

Next: 1
Completed
Example (WithSHA256)
// Count distinct using SHA256 hash
observable := ro.Pipe1(
	ro.Just("alice", "bob", "charlie", "alice", "bob", "david"),
	CountDistinct[string](12, false, func(input string) uint64 {
		hash := sha256.Sum256([]byte(input))
		return uint64(hash[0])<<56 | uint64(hash[1])<<48 | uint64(hash[2])<<40 | uint64(hash[3])<<32 |
			uint64(hash[4])<<24 | uint64(hash[5])<<16 | uint64(hash[6])<<8 | uint64(hash[7])
	}),
)

subscription := observable.Subscribe(ro.PrintObserver[uint64]())
defer subscription.Unsubscribe()
Output:

Next: 4
Completed
Example (WithSparse)
// Count distinct with sparse representation
observable := ro.Pipe1(
	ro.Just("alice", "bob", "charlie", "david", "eve"),
	CountDistinct[string](8, true, func(input string) uint64 {
		h := fnv.New64a()
		h.Write([]byte(input))
		return h.Sum64()
	}),
)

subscription := observable.Subscribe(ro.PrintObserver[uint64]())
defer subscription.Unsubscribe()
Output:

Next: 5
Completed
Example (WithStructs)
// Count distinct structs using hyperloglog
type User struct {
	ID   int
	Name string
}

observable := ro.Pipe1(
	ro.Just(
		User{ID: 1, Name: "Alice"},
		User{ID: 2, Name: "Bob"},
		User{ID: 1, Name: "Alice"}, // Duplicate
		User{ID: 3, Name: "Charlie"},
		User{ID: 2, Name: "Bob"}, // Duplicate
	),
	CountDistinct[User](10, false, func(input User) uint64 {
		h := fnv.New64a()
		h.Write([]byte(input.Name))
		return h.Sum64()
	}),
)

subscription := observable.Subscribe(ro.PrintObserver[uint64]())
defer subscription.Unsubscribe()
Output:

Next: 3
Completed

func CountDistinctReduce

func CountDistinctReduce[T comparable](precision uint8, sparse bool, hashFunc func(input T) uint64) func(ro.Observable[T]) ro.Observable[uint64]
Example
// Count distinct with incremental updates
observable := ro.Pipe1(
	ro.Just("alice", "bob", "charlie", "alice", "bob", "david"),
	CountDistinctReduce[string](8, false, func(input string) uint64 {
		h := fnv.New64a()
		h.Write([]byte(input))
		return h.Sum64()
	}),
)

subscription := observable.Subscribe(ro.PrintObserver[uint64]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 3
Next: 3
Next: 4
Completed
Example (WithStringHash)
// Count distinct with incremental updates using StringHash
observable := ro.Pipe1(
	ro.Just("alice", "bob", "charlie", "alice", "bob", "david"),
	CountDistinctReduce[string](8, false, StringHash.FNV64a()),
)

subscription := observable.Subscribe(ro.PrintObserver[uint64]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 3
Next: 3
Next: 3
Next: 4
Completed
Example (WithStructs)
// Count distinct structs with incremental updates
type User struct {
	ID   int
	Name string
}

observable := ro.Pipe1(
	ro.Just(
		User{ID: 1, Name: "Alice"},
		User{ID: 2, Name: "Bob"},
		User{ID: 1, Name: "Alice"}, // Duplicate
		User{ID: 3, Name: "Charlie"},
		User{ID: 2, Name: "Bob"}, // Duplicate
	),
	CountDistinctReduce[User](10, false, func(input User) uint64 {
		h := fnv.New64a()
		h.Write([]byte(input.Name))
		return h.Sum64()
	}),
)

subscription := observable.Subscribe(ro.PrintObserver[uint64]())
defer subscription.Unsubscribe()
Output:

Next: 1
Next: 2
Next: 2
Next: 3
Next: 3
Completed
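
Judging from the example outputs, CountDistinct emits one value when the source completes, while CountDistinctReduce emits an updated value after every item. The plain-Go analogy below reproduces that shape with an exact map instead of a HyperLogLog sketch, so it illustrates the emission pattern only; `runningDistinct` is a hypothetical helper.

```go
package main

import "fmt"

// runningDistinct returns the exact distinct count after each item,
// standing in for the per-item emissions of a reduce-style operator.
func runningDistinct[T comparable](items []T) []int {
	seen := make(map[T]struct{})
	out := make([]int, 0, len(items))
	for _, it := range items {
		seen[it] = struct{}{}
		out = append(out, len(seen))
	}
	return out
}

func main() {
	items := []string{"alice", "bob", "charlie", "alice", "bob", "david"}
	running := runningDistinct(items)
	fmt.Println(running)                 // [1 2 3 3 3 4]
	fmt.Println(running[len(running)-1]) // 4, the single final value
}
```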

Types

type BytesHashers

type BytesHashers struct{}

BytesHashers provides common hash functions for byte slices

func (BytesHashers) Adler32

func (BytesHashers) Adler32() func([]byte) uint64

Adler32 returns a hash function using Adler-32 for byte slices

func (BytesHashers) CRC32

func (BytesHashers) CRC32() func([]byte) uint64

CRC32 returns a hash function using CRC-32 for byte slices

func (BytesHashers) CRC64

func (BytesHashers) CRC64() func([]byte) uint64

CRC64 returns a hash function using CRC-64 for byte slices

func (BytesHashers) DJB2

func (BytesHashers) DJB2() func([]byte) uint64

DJB2 returns a hash function using DJB2 algorithm for byte slices
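
For reference, the textbook DJB2 recurrence is h = h·33 + c starting from 5381. The sketch below uses a 32-bit accumulator widened to uint64, in line with the README listing DJB2 among the 32-bit hashes; the plugin's exact variant is an assumption.

```go
package main

import "fmt"

// djb2 is Bernstein's hash: h = h*33 + c, starting from 5381.
// Assumption: 32-bit accumulator, widened to uint64 on return.
func djb2(data []byte) uint64 {
	var h uint32 = 5381
	for _, c := range data {
		h = h*33 + uint32(c)
	}
	return uint64(h)
}

func main() {
	fmt.Println(djb2(nil))         // 5381
	fmt.Println(djb2([]byte("a"))) // 177670 (5381*33 + 97)
}
```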

func (BytesHashers) FNV32

func (BytesHashers) FNV32() func([]byte) uint64

FNV32 returns a hash function using FNV-1 32-bit for byte slices

func (BytesHashers) FNV32a

func (BytesHashers) FNV32a() func([]byte) uint64

FNV32a returns a hash function using FNV-1a 32-bit for byte slices

func (BytesHashers) FNV64

func (BytesHashers) FNV64() func([]byte) uint64

FNV64 returns a hash function using FNV-1 64-bit for byte slices

func (BytesHashers) FNV64a

func (BytesHashers) FNV64a() func([]byte) uint64

FNV64a returns a hash function using FNV-1a 64-bit for byte slices

func (BytesHashers) Jenkins

func (BytesHashers) Jenkins() func([]byte) uint64

Jenkins returns a hash function using Jenkins hash (one-at-a-time) for byte slices
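
As a reference point, the well-known one-at-a-time algorithm by Bob Jenkins looks like this in Go. This is the published 32-bit algorithm widened to uint64; whether the plugin matches it bit for bit is an assumption.

```go
package main

import "fmt"

// oneAtATime implements Jenkins's one-at-a-time hash (32-bit),
// widened to uint64 on return.
func oneAtATime(data []byte) uint64 {
	var h uint32
	for _, c := range data {
		h += uint32(c)
		h += h << 10
		h ^= h >> 6
	}
	// Final avalanche steps.
	h += h << 3
	h ^= h >> 11
	h += h << 15
	return uint64(h)
}

func main() {
	fmt.Printf("%#x\n", oneAtATime([]byte("a"))) // 0xca2e9442
}
```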

func (BytesHashers) Loselose

func (BytesHashers) Loselose() func([]byte) uint64

Loselose returns a hash function using the "lose lose" algorithm for byte slices

func (BytesHashers) MD5

func (BytesHashers) MD5() func([]byte) uint64

MD5 returns a hash function using MD5 for byte slices

func (BytesHashers) MapHash

func (BytesHashers) MapHash() func([]byte) uint64

MapHash returns a hash function using maphash for byte slices
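
The examples note that MapHash is non-deterministic. With the standard library's hash/maphash, that comes from the seed, which is random per call to maphash.MakeSeed. The sketch below shows one way such a hasher could be built, with one seed shared by all calls so equal inputs agree within a run; how the plugin actually manages its seed is not stated in the source, and `newSeededHasher` is a hypothetical name.

```go
package main

import (
	"fmt"
	"hash/maphash"
)

// newSeededHasher returns a hash function bound to one random seed.
// Values are stable within the returned function, but differ across
// seeds and therefore across program runs.
func newSeededHasher() func([]byte) uint64 {
	seed := maphash.MakeSeed()
	return func(b []byte) uint64 {
		return maphash.Bytes(seed, b)
	}
}

func main() {
	h := newSeededHasher()
	a, b := h([]byte("alice")), h([]byte("alice"))
	fmt.Println(a == b) // true: same seed, same input
}
```

Since the values change between runs, sketches built with such a hasher cannot be compared or merged across processes.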

func (BytesHashers) SDBM

func (BytesHashers) SDBM() func([]byte) uint64

SDBM returns a hash function using SDBM algorithm for byte slices
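
For comparison, the textbook SDBM recurrence is h = c + (h << 6) + (h << 16) − h. The sketch uses a 32-bit accumulator widened to uint64, matching the README's grouping of SDBM with the 32-bit hashes; the plugin's exact variant is an assumption.

```go
package main

import "fmt"

// sdbm implements the classic SDBM hash with a 32-bit accumulator,
// widened to uint64 on return.
func sdbm(data []byte) uint64 {
	var h uint32
	for _, c := range data {
		h = uint32(c) + (h << 6) + (h << 16) - h
	}
	return uint64(h)
}

func main() {
	fmt.Println(sdbm([]byte("a")))  // 97
	fmt.Println(sdbm([]byte("ab"))) // 6363201
}
```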

func (BytesHashers) SHA1

func (BytesHashers) SHA1() func([]byte) uint64

SHA1 returns a hash function using SHA-1 for byte slices

func (BytesHashers) SHA256

func (BytesHashers) SHA256() func([]byte) uint64

SHA256 returns a hash function using SHA-256 for byte slices

func (BytesHashers) SHA512

func (BytesHashers) SHA512() func([]byte) uint64

SHA512 returns a hash function using SHA-512 for byte slices

type StringHashers

type StringHashers struct{}

StringHashers provides common hash functions for strings

func (StringHashers) Adler32

func (StringHashers) Adler32() func(string) uint64

Adler32 returns a hash function using Adler-32

func (StringHashers) CRC32

func (StringHashers) CRC32() func(string) uint64

CRC32 returns a hash function using CRC-32

func (StringHashers) CRC64

func (StringHashers) CRC64() func(string) uint64

CRC64 returns a hash function using CRC-64

func (StringHashers) DJB2

func (StringHashers) DJB2() func(string) uint64

DJB2 returns a hash function using DJB2 algorithm

func (StringHashers) FNV32

func (StringHashers) FNV32() func(string) uint64

FNV32 returns a hash function using FNV-1 32-bit

func (StringHashers) FNV32a

func (StringHashers) FNV32a() func(string) uint64

FNV32a returns a hash function using FNV-1a 32-bit

func (StringHashers) FNV64

func (StringHashers) FNV64() func(string) uint64

FNV64 returns a hash function using FNV-1 64-bit

func (StringHashers) FNV64a

func (StringHashers) FNV64a() func(string) uint64

FNV64a returns a hash function using FNV-1a 64-bit

func (StringHashers) Jenkins

func (StringHashers) Jenkins() func(string) uint64

Jenkins returns a hash function using Jenkins hash (one-at-a-time)

func (StringHashers) Loselose

func (StringHashers) Loselose() func(string) uint64

Loselose returns a hash function using the "lose lose" algorithm

func (StringHashers) MD5

func (StringHashers) MD5() func(string) uint64

MD5 returns a hash function using MD5

func (StringHashers) MapHash

func (StringHashers) MapHash() func(string) uint64

MapHash returns a hash function using maphash

func (StringHashers) SDBM

func (StringHashers) SDBM() func(string) uint64

SDBM returns a hash function using SDBM algorithm

func (StringHashers) SHA1

func (StringHashers) SHA1() func(string) uint64

SHA1 returns a hash function using SHA-1

func (StringHashers) SHA256

func (StringHashers) SHA256() func(string) uint64

SHA256 returns a hash function using SHA-256

func (StringHashers) SHA512

func (StringHashers) SHA512() func(string) uint64

SHA512 returns a hash function using SHA-512
