clstr-go module v0.30.2
Published: Feb 4, 2026 · License: MIT

clstr

A distributed event sourcing framework for Go, combining three powerful patterns into a cohesive toolkit for building scalable, event-driven systems.

Why clstr?

Building distributed systems is hard. You need event persistence, concurrent message handling, and cluster coordination—each complex on its own, even more so together. clstr provides production-ready implementations of these patterns that are designed to work seamlessly as a unit.

flowchart LR
    subgraph app["Your Application"]
        direction LR
        Cluster["Cluster<br/>(routing)"] --> Actor["Actor<br/>(processing)"] --> ES["Event Sourcing<br/>(persistence)"]
    end

The Three Pillars

Event Sourcing — Capture Every Change

Store your domain state as a sequence of immutable events rather than mutable records. This gives you:

  • Complete audit trail — Every state change is preserved forever
  • Time travel — Reconstruct state at any point in history
  • Event replay — Rebuild read models or fix bugs by replaying events
  • Optimistic concurrency — Conflict detection without locks
  • Snapshots — Performance optimization for aggregates with many events

// Define your aggregate
type Account struct {
    es.BaseAggregate
    balance int
}

func (a *Account) Deposit(amount int) {
    a.Raise(&Deposited{Amount: amount})
}

func (a *Account) Apply(event es.Event) {
    switch e := event.Data.(type) {
    case *Deposited:
        a.balance += e.Amount
    }
}

// Use it with the repository
repo := es.NewTypedRepositoryFrom[*Account](log, env.Repository())
account, _ := repo.GetByID(ctx, "acc-123")
account.Deposit(100)
repo.Save(ctx, account)

Actor Model — Scalable Concurrency

Process messages through isolated actors with mailbox-based delivery. Each actor handles one message at a time, eliminating race conditions:

  • Mailbox isolation — No shared state, no locks, no races
  • Type-safe handlers — Register handlers by message type with automatic JSON marshaling
  • Bounded concurrency — Control parallelism with configurable task schedulers
  • Panic containment — Failing handlers don't crash the system
  • Request/Response — Built-in synchronous communication between actors

// Define typed handlers
type GreetHandler struct{}

func (h *GreetHandler) Handle(ctx *actor.HandlerCtx, msg *Greet) (*Greeting, error) {
    return &Greeting{Message: "Hello, " + msg.Name}, nil
}

// Create an actor with handlers
a := actor.New(
    actor.WithHandler[*Greet, *Greeting](&GreetHandler{}),
)
a.Start(ctx)

// Send messages
response, _ := actor.Request[*Greeting](ctx, a, &Greet{Name: "World"})

Cluster — Distributed Coordination

Route requests to the right node using consistent hashing. Scale horizontally by adding nodes:

  • Consistent hashing — Deterministic key-to-shard mapping with minimal reshuffling
  • Smart routing — Clients automatically route to the correct node
  • Topology awareness — Nodes know about each other and their shard ownership
  • Transport abstraction — Swap between in-memory (testing) and NATS (production)

// Create a cluster node
node := cluster.NewNode(
    cluster.WithNodeID("node-1"),
    cluster.WithTransport(transport),
    cluster.WithHandler(actorHandler),
)
node.Start(ctx)

// Route requests by key
client := cluster.NewClient(transport, cluster.WithNumShards(256))
response, _ := client.RequestKey(ctx, "user-123", "GetProfile", payload)

Better Together

The real power comes from combining all three:

| Pattern | Standalone | Combined |
|---|---|---|
| Event Sourcing | Persistence & audit trail | Events trigger actor messages across the cluster |
| Actor Model | Concurrent processing | Actors are distributed across nodes via sharding |
| Cluster | Request routing | Routes to actors that persist via event sourcing |

Example flow:

  1. Client sends CreateOrder to cluster, keyed by order-123
  2. Cluster routes to the node owning that shard
  3. Actor receives the message, loads the Order aggregate
  4. Event Sourcing replays events to rebuild state
  5. Actor executes business logic, raises OrderCreated event
  6. Event Sourcing persists the event
  7. Consumers react to the event (send email, update read model, etc.)

Storage Backend

clstr uses NATS JetStream as its primary storage backend, providing:

  • Durable event streams with configurable retention
  • Key-value store for snapshots and checkpoints
  • Pub/sub transport for cluster communication
  • Exactly-once delivery semantics

// Connect to NATS
conn := nats.MustConnect("nats://localhost:4222")

// Create the environment with NATS backend
env := es.NewEnv(
    es.WithEventStore(natses.NewEventStore(conn)),
    es.WithSnapshotter(natses.NewSnapshotter(conn)),
)

Supporting Packages

  • cache — LRU cache with TTL support for aggregate caching
  • perkey — Per-key serialization (sequential per key, parallel across keys)
  • ds — Generic data structures (ordered Set)
  • metrics — Pluggable metrics interfaces (Counter, Gauge, Histogram, Timer)

Observability

clstr provides optional Prometheus metrics for all three pillars through a pluggable interface pattern:

import promadapter "github.com/codewandler/clstr-go/adapters/prometheus"

// Initialize metrics for all pillars
metrics := promadapter.NewAllMetrics(prometheus.DefaultRegisterer)

// Use with ES
env := es.NewEnv(es.WithMetrics(metrics.ES), ...)

// Use with Actor
a := actor.New(actor.Options{Metrics: metrics.Actor, ...}, handler)

// Use with Cluster
client, _ := cluster.NewClient(cluster.ClientOptions{Metrics: metrics.Cluster, ...})

Available Metrics:

| Pillar | Metrics |
|---|---|
| ES | Store/repo latencies, events appended, cache hits/misses, consumer lag, concurrency conflicts |
| Actor | Message duration, messages processed, panics, mailbox depth, scheduler inflight |
| Cluster | Request/handler duration, transport errors, shards owned |

See the Prometheus adapter for detailed documentation.

Performance

Benchmark results from the counter example (7-node cluster, NATS in Docker):

| Metric | Value |
|---|---|
| Throughput | 43,000 req/sec |
| p99 latency | 20 ms |
| Actor handler | ~8 μs per message |
| Memory | ~12 MB heap after 500k requests |

Every request is correctly routed, processed exactly once, and the counter value always matches the request count.

Quick Start

# Install
go get github.com/codewandler/clstr-go

# Run tests
task test

# Run the cluster demo
task loadtest

See the examples directory for complete working examples.

Design Principles

  • Explicit over implicit — No magic, clear data flow
  • Composition over inheritance — Small, focused interfaces
  • Testing first — In-memory implementations for every abstraction
  • Production ready — Battle-tested with NATS JetStream

License

MIT

Directories

Path Synopsis
adapters
api
prometheus
Package prometheus provides Prometheus implementations of the metrics interfaces for all three pillars (ES, Actor, Cluster).
core
actor/v2
Package actor provides a mailbox-based actor model implementation for building concurrent, message-driven systems.
app
Package app provides a high-level API for building cluster applications that combine actors with sharded message routing.
cache
Package cache provides a simple key-value cache interface with LRU eviction and TTL support.
cluster
Package cluster provides distributed shard-based message routing with consistent hashing for building scalable, partitioned systems.
ds
Package ds provides generic data structures for use in event sourcing systems.
es
Package es provides an event sourcing framework for building event-driven applications.
metrics
Package metrics provides abstract metrics interfaces that allow pluggable instrumentation backends (Prometheus, StatsD, etc.) without coupling the core packages to any specific implementation.
perkey
Package perkey provides a scheduler that serializes work per key while allowing work for different keys to execute concurrently.
reflector
Package reflector provides type reflection utilities with caching.
sf
Package sf provides a generic single-flight mechanism for deduplicating concurrent function calls with the same key.
examples
loadtest command
Package main is a performance benchmark for the Event Sourcing pillar.
ports
kv
