Documentation ¶
Overview ¶
options.go — functional options for Pool construction.
Change log ¶
- v0.1: Initial set: WithAutoScale, WithTTL, WithCrashHandler, WithHealthInterval.
Pattern ¶
All public options follow the functional-options pattern (Dave Cheney, 2014). Each option is a function that mutates an internal `config` struct. They are applied in order inside New[C], before any goroutines are started, so there are no synchronization requirements here.
Adding a new option ¶
- Add the field to `config`.
- Set a sensible default in `defaultConfig()`.
- Write a `WithXxx` function below.
- Document the zero-value behaviour in the comment.
pool.go — Session-affine process pool with singleflight Acquire.
Core invariant ¶
1 sessionID → 1 Worker, for the lifetime of the session.
Concurrency model (read this before touching Acquire) ¶
There are three maps protected by a single mutex (p.mu):
p.sessions map[string]Worker[C] — live sessionID → worker bindings
p.inflight map[string]chan struct{} — in-progress Acquire for a session
p.lastAccessed map[string]time.Time — idle-TTL timestamps (see pool_ttl.go)
And one lock-free channel:
p.available chan Worker[C] — free workers ready to be assigned
The singleflight guarantee for Acquire(ctx, sessionID):
1. Lock → check sessions → if found: unlock, return (FAST PATH).
2. Lock → check inflight → if pending: grab chan, unlock, wait on it, then restart from step 1 when the chan closes.
3. Lock → create inflight[sessionID] = make(chan struct{}) → unlock.
4. Block on <-p.available (or ctx cancel).
5. Call w.Healthy(ctx). If unhealthy: discard, close inflight chan, return error.
6. Lock → sessions[sessionID]=w, delete inflight[sid], close(ch) → unlock. Closing ch broadcasts to all goroutines waiting in step 2.
7. Return &Session[C]{…}
Why a chan struct{} instead of a sync.Mutex per session?
- A mutex would only let one waiter proceed. We need ALL waiters to unblock when the acquiring goroutine completes (step 6 closes ch, which broadcasts to every receiver at once, with no data copied).
- Closing a channel is always non-blocking, and it is safe as long as it happens exactly once.
Session.Release ¶
Session.Release removes the entry from p.sessions and pushes the worker back onto p.available. It does NOT close or kill the worker — the binary keeps running and will be assigned to the next caller.
Crash path ¶
processWorker.monitor() calls p.onCrash(sessionID) when it detects that the subprocess exited while holding a session. onCrash:
- Removes the session from p.sessions.
- If there is a pending inflight chan for the same sessionID, closes it so any goroutine waiting in step 2 of Acquire unblocks (they will then get an error because Acquire finds neither a session nor a valid inflight).
- Calls the user-supplied crashHandler if set.
factory.go — WorkerFactory implementations.
What lives here ¶
- ProcessFactory: the default factory. Spawns any OS binary, assigns it an OS-allocated TCP port, and polls GET <address>/health until 200 OK. Users pass this to New[C] so they never have to implement WorkerFactory themselves for the common HTTP case.
Port allocation ¶
Ports are assigned by the OS (net.Listen("tcp", "127.0.0.1:0")). The binary receives its port via the PORT environment variable AND via any arg that contains the literal string "{{.Port}}" — that token is replaced with the actual port number at spawn time.
Health polling ¶
After the process starts, ProcessFactory polls GET <address>/health every 200ms, up to 30 attempts (6 seconds total). If the worker never responds with 200 OK, Spawn returns an error and kills the process. The concrete port + binary are logged at startup.
Crash monitoring ¶
A background goroutine calls cmd.Wait(). On exit, if the worker still holds a sessionID the pool's onCrash callback is invoked so the session affinity map is cleaned up.
pool_ttl.go — Idle session TTL sweeper for Pool[C].
Why a separate file? ¶
pool.go owns the concurrency model (Acquire, Release, onCrash). This file owns time-based session lifecycle — kept separate so each file has one job and can be read/reviewed in isolation.
How it works ¶
Every session that enters p.sessions also gets an entry in p.lastAccessed (a map[string]time.Time guarded by the same p.mu lock). Every call to Acquire that hits the fast path "touches" the session by updating its timestamp. The sweeper goroutine wakes up every ttl/2 and evicts sessions whose timestamp is older than cfg.ttl, calling release() on each so the worker is returned to the available channel.
Concurrency ¶
p.lastAccessed is always read/written under p.mu — the same lock that guards p.sessions and p.inflight. No extra synchronization is needed.
Disabling TTL ¶
Pass WithTTL(0) to New[C]. The sweep goroutine returns immediately when cfg.ttl == 0, so this file's sweeper never runs.
Index ¶
- Variables
- func NewLocalRegistry[C any]() *localRegistry[C]
- type Option
- type Pool
- func (p *Pool[C]) Acquire(ctx context.Context, sessionID string) (*Session[C], error)
- func (p *Pool[C]) GetSession(ctx context.Context, sessionID string) (*Session[C], error)
- func (p *Pool[C]) KillSession(sessionID string) error
- func (p *Pool[C]) Shutdown(ctx context.Context) error
- func (p *Pool[C]) Stats() PoolStats
- type PoolStats
- type ProcessFactory
- func (f *ProcessFactory) Spawn(ctx context.Context) (Worker[*http.Client], error)
- func (f *ProcessFactory) WithCPULimit(cores float64) *ProcessFactory
- func (f *ProcessFactory) WithEnv(kv string) *ProcessFactory
- func (f *ProcessFactory) WithHealthPath(path string) *ProcessFactory
- func (f *ProcessFactory) WithInsecureSandbox() *ProcessFactory
- func (f *ProcessFactory) WithMemoryLimit(bytes int64) *ProcessFactory
- func (f *ProcessFactory) WithPIDsLimit(n int64) *ProcessFactory
- func (f *ProcessFactory) WithStartHealthCheckDelay(d time.Duration) *ProcessFactory
- func (f *ProcessFactory) WithStartTimeout(d time.Duration) *ProcessFactory
- type ProcessWorker
- type Session
- type SessionRegistry
- type Worker
- type WorkerFactory
Constants ¶
This section is empty.
Variables ¶
var ErrWorkerDead = errors.New("worker process has died")
Functions ¶
func NewLocalRegistry ¶
func NewLocalRegistry[C any]() *localRegistry[C]
Types ¶
type Option ¶
type Option func(*config)
Option is a functional option for New[C].
func WithAutoScale ¶
WithAutoScale sets the minimum and maximum number of live workers.
- min: the pool always keeps at least this many workers healthy, even with zero active sessions.
- max: hard cap on concurrent workers; Acquire blocks once this limit is reached until a worker becomes available.
Panics if min < 1 or max < min.
func WithCrashHandler ¶
WithCrashHandler registers a callback invoked when a worker's subprocess exits unexpectedly while it holds an active session.
fn receives the sessionID that was lost. Use it to:
- Delete session-specific state in your database
- Return an error to the end user ("your session was interrupted")
- Trigger a re-run of the failed job
fn is called from a background monitor goroutine. It must not block for extended periods; spawn a goroutine if you need to do heavy work.
If WithCrashHandler is not set, crashes are only logged.
func WithHealthInterval ¶
WithHealthInterval sets how often the pool's background health-check loop calls Worker.Healthy() on every live worker.
Shorter intervals detect unhealthy workers faster but add more HTTP/RPC overhead. The default (5s) is a good balance for most workloads.
Set d = 0 to disable background health checks entirely. Workers are still checked once during Acquire (the Healthy call, step 5 of the singleflight protocol).
func WithStartHealthCheckDelay ¶
WithStartHealthCheckDelay delays the first health check, giving the process time to start up and settle before it is polled.
func WithTTL ¶
WithTTL sets the idle-session timeout.
A session is considered idle when no Acquire call has touched it within d. When the TTL fires, the session is removed from the affinity map and its worker is returned to the available pool.
Set d = 0 to disable TTL (sessions live until explicitly Released or the pool shuts down). This is useful for REPL-style processes where the caller owns the session lifetime.
func WithWorkerReuse ¶
WithWorkerReuse controls whether a worker is recycled when its session's TTL expires. If true (the default), the worker is returned to the available pool to serve new sessions. If false, the worker process is killed when the session expires, and a fresh worker is spawned to maintain the minimum pool capacity.
type Pool ¶
type Pool[C any] struct {
	// contains filtered or unexported fields
}
Pool manages a set of workers and routes requests by sessionID. Create one with New[C].
func New ¶
func New[C any](factory WorkerFactory[C], opts ...Option) (*Pool[C], error)
New creates a pool backed by factory, applies opts, and starts min workers. Returns an error if any of the initial workers fail to start.
func (*Pool[C]) Acquire ¶
Acquire returns the Worker pinned to sessionID.
If sessionID already has a worker, it is returned immediately (fast path). If sessionID is new, a free worker is popped from the available channel, health-checked, and pinned to the session.
If another goroutine is currently acquiring the same sessionID, this call blocks until that acquisition completes and then returns the same worker (singleflight guarantee — no two goroutines can pin different workers to the same sessionID simultaneously).
Blocks until a worker is available or ctx is cancelled.
func (*Pool[C]) GetSession ¶
GetSession returns the Session pinned to sessionID if it exists.
Unlike Acquire, this does NOT pop a worker from the available channel or enter the singleflight slow path; it is purely a lookup. It updates the last-accessed time and active connection counts just like Acquire's fast path.
Returns (nil, nil) if no session exists for this ID.
func (*Pool[C]) KillSession ¶
KillSession forcefully tears down the worker pinned to sessionID. This is used by daemon control-plane EOF cleanup to prevent orphaned stateful workers when a client disconnects unexpectedly. If the session does not exist, KillSession returns nil.
func (*Pool[C]) Shutdown ¶
Shutdown gracefully stops the pool. It closes all background goroutines and then kills every worker. In-flight Acquire calls will receive a context cancellation error if the caller's ctx is tied to the application lifetime.
Two signals are sent deliberately:
- p.cancel() cancels p.ctx, which unblocks any in-flight addWorker goroutines that are blocking on factory.Spawn (they use a context.WithTimeout derived from p.ctx).
- close(p.done) signals the healthCheckLoop and runTTLSweep goroutines to exit their ticker loops cleanly.
type PoolStats ¶
type PoolStats struct {
// TotalWorkers is the number of workers currently registered in the pool
// (starting + healthy + busy). Does not count workers being scaled down.
TotalWorkers int
// AvailableWorkers is the number of idle workers ready to accept a new session.
AvailableWorkers int
// ActiveSessions is the number of sessionID → worker bindings currently live.
ActiveSessions int
// InflightAcquires is the number of Acquire calls currently in the "slow path"
// (waiting for a worker to become available). Useful for queue-depth alerting.
InflightAcquires int
// Node is a snapshot of host-level resource availability (memory, CPU idle).
// Populated by observer.PollNodeStats(); zero-valued on non-Linux platforms
// or if the poll fails.
//
// Note: on Linux, Stats() blocks for ~100 ms to measure CPU idle.
Node observer.NodeStats
}
PoolStats is a point-in-time snapshot of pool state for dashboards / alerts.
type ProcessFactory ¶
type ProcessFactory struct {
// contains filtered or unexported fields
}
ProcessFactory is the default WorkerFactory[*http.Client]. It spawns `binary` as a subprocess, allocates a free OS port, and polls GET <address>/health until the worker reports healthy.
Use NewProcessFactory to create one; pass it directly to New[C]:
pool, err := herd.New(herd.NewProcessFactory("./my-binary", "--port", "{{.Port}}"))
func NewProcessFactory ¶
func NewProcessFactory(binary string, args ...string) *ProcessFactory
NewProcessFactory returns a ProcessFactory that spawns the given binary.
Any arg containing the literal string "{{.Port}}" is replaced with the OS-assigned port number at spawn time. The port is also injected via the PORT environment variable for binaries that prefer env-based config.
factory := herd.NewProcessFactory("./ollama", "serve", "--port", "{{.Port}}")
func (*ProcessFactory) Spawn ¶
Spawn implements WorkerFactory[*http.Client]. It allocates a free port, starts the binary, and blocks until the worker passes a /health check or ctx is cancelled.
func (*ProcessFactory) WithCPULimit ¶
func (f *ProcessFactory) WithCPULimit(cores float64) *ProcessFactory
WithCPULimit sets the cgroup CPU quota in cores for each spawned worker. For example, 0.5 means half a CPU and 2 means two CPUs. A value of 0 disables the limit.
func (*ProcessFactory) WithEnv ¶
func (f *ProcessFactory) WithEnv(kv string) *ProcessFactory
WithEnv appends an extra KEY=VALUE environment variable that is injected into every worker spawned by this factory. The literal string "{{.Port}}" is replaced with the worker's allocated port number, which is useful for binaries that accept the listen address via an env var rather than a flag.
factory := herd.NewProcessFactory("ollama", "serve").
WithEnv("OLLAMA_HOST=127.0.0.1:{{.Port}}").
WithEnv("OLLAMA_MODELS=/tmp/shared-ollama-models")
func (*ProcessFactory) WithHealthPath ¶
func (f *ProcessFactory) WithHealthPath(path string) *ProcessFactory
WithHealthPath sets the HTTP path that herd polls to decide whether a worker is ready. The path must return HTTP 200 when the process is healthy.
Default: "/health"
Use this for binaries that expose liveness on a non-standard path:
factory := herd.NewProcessFactory("ollama", "serve").
WithHealthPath("/") // ollama serves GET / → 200 "Ollama is running"
func (*ProcessFactory) WithInsecureSandbox ¶
func (f *ProcessFactory) WithInsecureSandbox() *ProcessFactory
WithInsecureSandbox disables the namespace/cgroup sandbox. Use only for local debugging on non-Linux systems or when you explicitly trust the spawned processes.
func (*ProcessFactory) WithMemoryLimit ¶
func (f *ProcessFactory) WithMemoryLimit(bytes int64) *ProcessFactory
WithMemoryLimit sets the cgroup memory limit, in bytes, for each spawned worker. A value of 0 disables the memory limit.
func (*ProcessFactory) WithPIDsLimit ¶
func (f *ProcessFactory) WithPIDsLimit(n int64) *ProcessFactory
WithPIDsLimit sets the cgroup PID limit for each spawned worker. Pass -1 for unlimited. Values of 0 or less than -1 are invalid.
func (*ProcessFactory) WithStartHealthCheckDelay ¶
func (f *ProcessFactory) WithStartHealthCheckDelay(d time.Duration) *ProcessFactory
WithStartHealthCheckDelay delays the first health check, giving the process time to start up and settle before it is polled.
func (*ProcessFactory) WithStartTimeout ¶
func (f *ProcessFactory) WithStartTimeout(d time.Duration) *ProcessFactory
WithStartTimeout sets the maximum duration herd will poll the worker's health endpoint after spawning the process before giving up and killing it.
Default: 30 seconds
type ProcessWorker ¶
type ProcessWorker struct {
// contains filtered or unexported fields
}
ProcessWorker implements Worker[*http.Client]. It is the value returned by ProcessFactory.Spawn.
func (*ProcessWorker) Address ¶
func (w *ProcessWorker) Address() string
func (*ProcessWorker) Client ¶
func (w *ProcessWorker) Client() *http.Client
func (*ProcessWorker) Close ¶
func (w *ProcessWorker) Close() error
Close drains and kills the subprocess. After Close returns, the process is guaranteed to be gone.
func (*ProcessWorker) Healthy ¶
func (w *ProcessWorker) Healthy(ctx context.Context) error
Healthy performs a GET <address><healthPath> and returns nil on 200 OK. ctx controls the timeout of this single request.
func (*ProcessWorker) ID ¶
func (w *ProcessWorker) ID() string
func (*ProcessWorker) OnCrash ¶
func (w *ProcessWorker) OnCrash(fn func(string))
OnCrash sets a callback invoked when the worker process exits unexpectedly.
type Session ¶
type Session[C any] struct {
	// ID is the sessionID that was passed to Pool.Acquire.
	ID string

	// Worker is the underlying worker pinned to this session.
	// Use Worker.Client() to talk to the subprocess.
	Worker Worker[C]
	// contains filtered or unexported fields
}
Session is a scoped handle returned by Pool.Acquire.
It binds one sessionID to one worker for the duration of the session. Call Release when the session is done — this frees the worker so it can be assigned to the next sessionID. Failing to call Release leaks a worker.
A Session is NOT safe for concurrent use by multiple goroutines. Multiple HTTP requests for the same sessionID should each call Acquire independently; the pool guarantees they always receive the same underlying worker.
func (*Session[C]) ConnRelease ¶
func (s *Session[C]) ConnRelease()
type SessionRegistry ¶
type SessionRegistry[C any] interface {
	// Get returns the worker pinned to sessionID.
	// Returns (nil, nil) if no session exists for this ID.
	Get(ctx context.Context, sessionID string) (Worker[C], error)

	// Put pins a worker to a sessionID.
	Put(ctx context.Context, sessionID string, w Worker[C]) error

	// Delete removes the pinning for sessionID.
	Delete(ctx context.Context, sessionID string) error

	// List returns a snapshot of all currently active sessions.
	// Primarily used for background health checks and cleanup.
	List(ctx context.Context) (map[string]Worker[C], error)

	// Len returns the number of active sessions.
	Len() int
}
SessionRegistry tracks which workers are pinned to which session IDs. In a distributed setup (Enterprise), this registry is shared across multiple nodes.
type Worker ¶
type Worker[C any] interface {
	// ID returns a stable, unique identifier for this worker (e.g. "worker-3").
	// Never reused — not even after a crash and restart.
	ID() string

	// Address returns the internal network URI the worker
	// is listening on (e.g., '127.0.0.1:54321').
	Address() string

	// Client returns the typed connection to the worker process.
	// For most users this is *http.Client; gRPC users return their stub here.
	Client() C

	// Healthy performs a liveness check against the subprocess.
	// Returns nil if the worker is accepting requests; non-nil otherwise.
	// Pool.Acquire calls this before handing a worker to a new session,
	// so a stale or crashed worker is never returned to a caller.
	Healthy(ctx context.Context) error

	// OnCrash sets a callback triggered when the worker process exits unexpectedly.
	OnCrash(func(sessionID string))

	// Close performs graceful shutdown of the worker process.
	// Called by the pool during scale-down or Pool.Shutdown.
	io.Closer
}
Worker represents one running subprocess managed by the pool.
C is the typed client the caller uses to talk to the subprocess — for example *http.Client, a gRPC connection, or a custom struct. The type parameter is constrained to "any" so the pool is fully generic.
type WorkerFactory ¶
type WorkerFactory[C any] interface {
	// Spawn starts one new worker and blocks until it is healthy.
	// If ctx is cancelled before the worker becomes healthy, Spawn must
	// kill the process and return a non-nil error.
	Spawn(ctx context.Context) (Worker[C], error)
}
WorkerFactory knows how to spawn one worker process and return a typed Worker[C] that is ready to accept requests (i.e. Healthy returns nil).
Most users never implement this interface — they use NewProcessFactory instead. Implement WorkerFactory only if you need custom spawn logic (e.g. Firecracker microVM, Docker container, remote SSH process).
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| cmd | |
| herd | command |
| internal | |
| observer | Package observer provides lightweight, OS-level resource sampling. |
| proto | |
| proxy | Package proxy provides NewReverseProxy — the one-liner that turns a session-affine process pool into an HTTP gateway. |