Ergo Extensions (system)
This repository provides a small set of building blocks to add distributed process discovery and daemon orchestration to an Ergo-based cluster. It ships a supervisor that wires together:
- `extensions_whereis`: a process that periodically inspects local processes and maintains a distributed sharded directory for process discovery.
- `extensions_daemon`: a process that (on the elected leader) recovers and launches daemon processes across nodes using consistent hashing.
- `extensions_cron`: a cron-like scheduler that triggers messages on a node or across the cluster.
- `AddressBook`: a thread-safe, eventually-consistent cache of nodes and their registered processes, with node picking via consistent hashing.
Core components live under the system package and are designed to integrate with ergo.services/ergo and a registrar (Zookeeper via github.com/qjpcpu/registrar, or the built-in in-memory registrar used by app.StartSimpleNode when no registrar is provided). The app package provides a small helper to start a node with these components wired in.
Features
- Distributed process discovery and naming via a shared address book
- Periodic local inspection and cluster-wide sync of process snapshots
- Leader-driven daemon recovery and remote spawn requests
- Node- or cluster-scoped cron jobs (`extensions_cron`) with stable placement
- Consistent hashing (`xxhash` + `buraksezer/consistent`) for stable node selection
- Simple APIs to register daemon launchers and spawn named processes
Requirements
- `ergo.services/ergo` v1.999.320
- A network registrar; the default code expects Zookeeper via `github.com/qjpcpu/registrar`
The module’s go.mod includes a replace directive to use github.com/qjpcpu/registrar/zk for the registrar.
Install
```shell
go get github.com/qjpcpu/ergo-extensions@latest
```
Import the system package:
```go
import "github.com/qjpcpu/ergo-extensions/system"
```
Architecture
- `Supervisor` (`extensions_sup`) starts three children:
  - `WhereIsProcess` (`extensions_whereis`):
    - Inspects local processes periodically (default: every 3 seconds)
    - Maintains PID→Name and Name→PID maps
    - Stores a snapshot (`ProcessInfoList`) in memory and updates the local `AddressBook`
    - Distributes process location information across nodes using a sharded directory model
    - Answers calls: `MessageLocate{Name}` → node, `MessageGetAddressBook{}` → `MessageAddressBook{Book IAddressBook}`
  - `DaemonMonitorProcess` (`extensions_daemon`):
    - Subscribes to registrar events for leader election and membership changes
    - On the leader, scans registered `Launcher` recovery iterators and launches daemons on selected nodes
    - Sends remote spawn requests when the target node is not local
  - `CronJobProcess` (`extensions_cron`):
    - Triggers cron jobs either on a single node or cluster-wide
- `AddressBook`:
  - Tracks available nodes and per-node registered processes
  - Picks a node for a process name using a consistent hashing ring (PartitionCount: 10240, ReplicationFactor: 40)
  - Supports global process lookups via `IAddressBookQuery` (returned by `book.QueryBy(caller)`)
- `app.StartSimpleNode`:
  - Starts an Ergo node and loads the `system.Supervisor` with a shared `IAddressBook`
  - Returns an `app.Node` with helpers: `LocateProcess`, `ForwardCall`, `ForwardSend`, `ForwardSpawn`, `WaitPID`, etc.
Quick Start
- Add the supervisor to your application members:
```go
spec := gen.ApplicationSpec{
	Members: []gen.ApplicationMemberSpec{
		system.ApplicationMemberSpec(system.ApplicationMemberSpecOptions{}),
	},
}
// Wire this application spec into your Ergo node environment/startup as usual.
```
Or start a node with everything wired in (uses Zookeeper registrar when Endpoints is set, otherwise falls back to an in-memory single-node registrar):
```go
n, err := app.StartSimpleNode(app.SimpleNodeOptions{
	NodeName: "node-1",
	Options: zk.Options{
		Endpoints: []string{"127.0.0.1:2181"},
	},
	CronJobs: []app.CronJob{
		{
			Name:           gen.Atom("job.ping"),
			Spec:           "* * * * *",
			Location:       system.CronJobLocationUTC,
			TriggerProcess: gen.Atom("ping"),
			Scope:          system.CronJobScopeCluster,
		},
	},
	NodeForwardWorker:   8,
	SyncProcessInterval: time.Second * 3,
})
_ = n
_ = err
```
- Register a launcher for your daemon processes (during init or startup):
```go
var WorkerLauncher = system.Launcher{
	Factory: func() gen.ProcessBehavior { return &Worker{} },
	Option:  gen.ProcessOptions{EnableRemote: true},
	RecoveryScanner: func() system.DaemonIterator {
		// Provide desired daemons to recover when the cluster leader starts/restarts.
		jobs := []system.DaemonProcess{
			{ProcessName: gen.Atom("worker.A")},
			{ProcessName: gen.Atom("worker.B")},
		}
		i := 0
		return func() ([]system.DaemonProcess, bool, error) {
			if i == 0 {
				i++
				return jobs, false, nil
			}
			return nil, false, nil
		}
	},
}

func init() {
	_ = system.RegisterLauncher(gen.Atom("worker"), WorkerLauncher)
}
```
- Spawn a named daemon using a `Spawner`:
```go
sp := system.NewSpawner(self, gen.Atom("worker"))
pid, err := sp.SpawnRegister(gen.Atom("worker.A") /* args... */)
```
- Locate a process by its registered name:
Using the `app.Node` helper:

```go
node := n.LocateProcess(gen.Atom("worker.A"))
```
Or via the `AddressBook` for a distributed lookup:
```go
respAny, err := self.Call(gen.ProcessID{Name: system.WhereIsProcess}, system.MessageGetAddressBook{})
if err != nil { /* handle */ }
book := respAny.(system.MessageAddressBook).Book
node, err := book.QueryBy(self, system.QueryOption{Timeout: 5}).Locate(gen.Atom("worker.A"))
```
- Access the shared `AddressBook` if you need more control (e.g., a local pick):
```go
respAny, err := self.Call(gen.ProcessID{Name: system.WhereIsProcess}, system.MessageGetAddressBook{})
if err != nil { /* handle */ }
book := respAny.(system.MessageAddressBook).Book
picked := book.PickNode(gen.Atom("worker.A")) // pick based on consistent hashing
```
Public API (selected)
- Constants: `system.Supervisor`, `system.WhereIsProcess`, `system.DaemonMonitorProcess`, `system.CronJobProcess`
- Supervisor helpers:
  - `system.ApplicationMemberSpec(opts system.ApplicationMemberSpecOptions) gen.ApplicationMemberSpec`
  - `system.FactorySystemSup(opts system.ApplicationMemberSpecOptions) gen.ProcessFactory`
- SimpleNode helpers (`app.Node`):
  - `LocateProcess(name gen.Atom) gen.Atom`
  - `ForwardCall(to string, msg any) (any, error)`
  - `ForwardSend(to string, msg any) error`
  - `ForwardSpawn(fac gen.ProcessFactory, args ...any) error`
  - `WaitPID(pid gen.PID) error`
  - `AddressBook() IAddressBook`
- Daemon orchestration:
  - `system.RegisterLauncher(name gen.Atom, launcher system.Launcher) error`
  - `system.NewSpawner(parent gen.Process, launcher gen.Atom) system.Spawner`
  - `Spawner.SpawnRegister(processName gen.Atom, args ...any) (gen.PID, error)`
  - `system.SingletonDaemon(name gen.Atom, args []any) system.DaemonIteratorFactory`
  - `Launcher{ Factory, Option, RecoveryScanner }`
  - `DaemonProcess{ ProcessName gen.Atom, Args []any }`
- Discovery & address book:
  - Call `extensions_whereis` with `MessageLocate{Name gen.Atom}` → `gen.Atom` (node)
  - Call `extensions_whereis` with `MessageGetAddressBook{}` → `MessageAddressBook{Book IAddressBook}`
  - `IAddressBook` provides: `PickNode`, `GetAvailableNodes`, `QueryBy(caller, opts)` → `IAddressBookQuery`
  - `IAddressBookQuery` provides: `Locate(processName)` → `gen.Atom` (node)
Registrar & Events
The code expects a working registrar from the Ergo network. With Zookeeper (github.com/qjpcpu/registrar), the following events are handled:
- Leadership changes: `EventNodeSwitchedToLeader`, `EventNodeSwitchedToFollower`
- Membership changes: `EventNodeJoined`, `EventNodeLeft`
`extensions_cron` rebalances jobs when nodes join or leave; `extensions_daemon` re-plans launches on node departures and failover, and triggers recovery when the local node becomes leader. `extensions_whereis` syncs periodically and does not depend on registrar events.
Design Notes
- Consistency: the `AddressBook` is eventually consistent; broadcasts retry on failures.
- Locate semantics: if multiple nodes report the same process name, `LocateLocal` picks the oldest instance; ties are resolved deterministically.
- Hashing: the consistent hashing ring uses `xxhash` and `buraksezer/consistent` to spread process names across nodes.
- Scheduling: `whereis` inspects periodically (default: 3s); `extensions_daemon` schedules recovery with small delays to absorb churn.
- Safety: remote spawns are issued via `SendImportant` to target nodes.
Limitations
- `MessageLocate` returns a node, not a PID; ask the address book or the node itself for details.
- Recovery scanners are user-supplied; ensure they are idempotent and resilient.
- Broadcasts are best-effort; transient network issues may delay convergence.
Development
- Code lives in `system/` and `app/`
- No external binaries; integrate directly with your Ergo application
- Linting/formatting follow your project’s standards
License
MIT License. See LICENSE for details.