sup

package module
v0.0.35 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2026 License: MIT Imports: 10 Imported by: 0

README

sup

Go Reference Test License

sup is a high-performance, low-allocation Actor Model library for Go.

It provides a robust foundation for building highly concurrent, distributed, and fault-tolerant stateful applications. It achieves zero-allocation for asynchronous messages (Cast) and minimizes overhead for synchronous requests (Call) by utilizing internal resource pooling. It embraces standard Go idioms (select, channels, and context) rather than hiding them behind heavy frameworks.

Features

  • Idiomatic Go — Actors are just goroutines running a select loop. No magic interfaces, no reflection, no global registries.
  • OTP-style supervision — Erlang-inspired supervisor trees with Permanent, Transient, and Temporary restart policies.
  • Panic recovery — Panics are caught, wrapped with a stack trace, and reported via WithOnError. The actor is then restarted according to the policy.
  • Restart limits — Optionally cap restarts within a sliding time window with WithRestartLimit.
  • No goroutine leakscontext.Context integration ensures all actors shut down cleanly when the parent context is canceled.
  • Supervisor observers — Lightweight lifecycle hooks so you can collect metrics, log events, or build diagnostics without changing supervision semantics.
  • Supervisor trees — Supervisors implement the Actor interface, so they can be nested inside other supervisors.

Installation

go get github.com/webermarci/sup

Quick Start

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/webermarci/sup"
)

// 1. Define internal messages
type incrementMsg struct{ amount int }
type getCountMsg struct{}

// 2. Define your Actor
type Counter struct {
	*sup.BaseActor
	mailbox *sup.Mailbox
	count int
}

func NewCounter() *Counter {
	return &Counter{
		BaseActor: sup.NewBaseActor("counter"),
		mailbox: sup.NewMailbox(10),
	}
}

// 3. Clean public API — callers never interact with the mailbox directly
func (c *Counter) Increment(amount int) {
	_ = sup.Cast(c.mailbox, incrementMsg{amount: amount})
}

func (c *Counter) Get() (int, error) {
	return sup.Call[getCountMsg, int](c.mailbox, getCountMsg{})
}

// 4. The Run loop is a standard Go select statement
func (c *Counter) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg := <-c.mailbox.Receive():
			switch m := msg.(type) {
			case sup.CastRequest[incrementMsg]:
				c.count += m.Payload().amount
			case sup.CallRequest[getCountMsg, int]:
				m.Reply(c.count, nil)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	counter := NewCounter()

	supervisor := sup.NewSupervisor("root",
		sup.WithActor(counter),
		sup.WithPolicy(sup.Permanent),
		sup.WithRestartDelay(time.Second),
		sup.WithRestartLimit(5, 10*time.Second),
		sup.WithOnError(func(actor sup.Actor, err error) {
			fmt.Printf("Actor %s failed with error: %v\n", actor.Name(), err)
		}),
	)

	go supervisor.Run(ctx)

	counter.Increment(10)
	counter.Increment(32)

	count, err := counter.Get()
	if err != nil {
		panic(err)
	}

	fmt.Printf("Final count: %d\n", count) // Final count: 42

	cancel()
	supervisor.Wait()
}

Restart Policies

Policy Clean exit (nil) Error or panic
Permanent Restarts Restarts
Transient Stops Restarts
Temporary Stops Stops

Mailbox

A Mailbox is the actor's message queue. Messages are sent using Cast (fire-and-forget) or Call (request-reply), and received inside the actor's Run loop via Receive().

Sending variants
Function Behaviour on full mailbox Behaviour on closed mailbox
Cast Blocks until space is available Returns ErrMailboxClosed
CastContext Blocks until space or context expires Returns ErrMailboxClosed
TryCast Returns ErrMailboxFull immediately Returns ErrMailboxClosed
Call Blocks until reply is received Returns ErrMailboxClosed
CallContext Blocks until reply or context expires Returns ErrMailboxClosed
TryCall Returns ErrMailboxFull immediately Returns ErrMailboxClosed

Supervisor Trees

Supervisors implement the Actor interface, so they compose naturally into trees. When the root context is canceled, shutdown propagates recursively through the entire tree.

dbActor := NewDatabaseActor()
cacheActor := NewCacheActor()

// Child supervisor manages data-layer actors
dataSup := sup.NewSupervisor("data_supervisor",
	sup.WithActors(dbActor, cacheActor),
	sup.WithPolicy(sup.Permanent),
	sup.WithRestartDelay(500*time.Millisecond),
)

// Root supervisor treats the child supervisor as an actor
root := sup.NewSupervisor("root",
	sup.WithActor(dataSup),
	sup.WithPolicy(sup.Permanent),
)

go root.Run(ctx)

Stateless Actors

For actors that don't need a mailbox or internal state, use ActorFunc:

healthCheck := sup.ActorFunc("health", func(ctx context.Context) error {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := ping(); err != nil {
				return err // supervisor will restart based on policy
			}
		}
	}
})

sup.NewSupervisor("health_supervisor",
	sup.WithActor(healthCheck),
	sup.WithPolicy(sup.Transient),
).Run(ctx)

Dynamic Spawning

Use Spawn to start actors dynamically after the supervisor is already running:

supervisor := sup.NewSupervisor("job_supervisor",
	sup.WithPolicy(sup.Temporary),
)
go supervisor.Run(ctx)

// Later, spawn actors on demand
for _, job := range jobs {
	supervisor.Spawn(ctx, newJobActor(job))
}

supervisor.Wait()

Observability

sup exposes a minimal, flexible observer mechanism via SupervisorObserver and the WithObserver option. Observers are small collections of optional callbacks that receive lifecycle events from the supervisor:

  • OnActorRegistered(actor Actor) — called when Spawn is invoked for an actor.
  • OnActorStarted(actor Actor) — called immediately before actor.Run(ctx) for each run.
  • OnActorStopped(actor Actor, err error) — called after actor.Run returns (error may be nil for clean exits).
  • OnActorRestarting(actor Actor, restartCount int, lastErr error) — called when the supervisor decides to restart an actor.
  • OnSupervisorTerminal(err error) — called when the supervisor escalates to a terminal error (e.g. restart limits exceeded).

Design notes:

  • All callbacks are optional — provide only the ones you need.
  • Callbacks are invoked asynchronously (each runs in its own goroutine) and any panic inside an observer is recovered. Observers cannot block or crash the supervisor.
  • Observers receive the Actor value so they may type-assert to access actor-specific fields (for example, bus actors may expose a Mailbox() accessor to inspect queue size).
package main

import (
    "fmt"

    "github.com/webermarci/sup"
)

func main() {
	observer := &sup.SupervisorObserver{
		OnActorRegistered: func(a sup.Actor) {
			fmt.Printf("registered: %s\n", a.Name())
		},
		OnActorStarted: func(a sup.Actor) {
			fmt.Printf("started: %s\n", a.Name())
		},
		OnActorStopped: func(a sup.Actor, err error) {
			fmt.Printf("stopped: %s err=%v\n", a.Name(), err)
		},
		OnActorRestarting: func(a sup.Actor, count int, lastErr error) {
			fmt.Printf("restarting: %s count=%d lastErr=%v\n", a.Name(), count, lastErr)
		},
		OnSupervisorTerminal: func(err error) {
			fmt.Printf("supervisor terminal: err=%v\n", err)
		},
	}

	supervisor := sup.NewSupervisor("root",
		sup.WithObserver(observer),
	)
}

Packages

  • sup — Core supervisor and mailbox implementation
  • sup/bus — Higher-level abstractions for polling and controlling with automatic scheduling and change notifications
  • sup/exec — Actor wrapper around os/exec for managing external processes as actors
  • sup/hub — Generic load balancer and distribution utility for grouping multiple function signatures and calling them with various strategies
  • sup/modbus — Actor wrapper around Modbus connections (TCP/RTU/ASCII) for thread-safe hardware access with automatic reconnection
  • sup/sse — Actor wrapper around Server-Sent Events (SSE) for consuming real-time event streams with automatic reconnection and last-event-id tracking
  • sup/ui — Real-time dashboard for visualizing and inspecting actors in your supervisor tree
  • sup/ws — Actor wrapper around WebSocket connections for thread-safe communication with automatic reconnection

Benchmark

goos: darwin
goarch: arm64
pkg: github.com/webermarci/sup
cpu: Apple M5
Benchmark_Cast-10                       20725142    57.4 ns/op     0 B/op   0 allocs/op
Benchmark_Cast_Concurrent-10            10265941   117.4 ns/op     0 B/op   0 allocs/op
Benchmark_CastContext-10                21805774    55.2 ns/op     0 B/op   0 allocs/op
Benchmark_CastContext_Concurrent-10     15253065    79.0 ns/op     0 B/op   0 allocs/op
Benchmark_CastContext_Expired-10        23459665    51.0 ns/op     0 B/op   0 allocs/op
Benchmark_TryCast-10                   198711219     6.0 ns/op     0 B/op   0 allocs/op
Benchmark_TryCast_Concurrent-10         89085205    13.9 ns/op     0 B/op   0 allocs/op
Benchmark_TryCast_Full-10              254923090     4.7 ns/op     0 B/op   0 allocs/op
Benchmark_Call-10                        3095778   390.2 ns/op   152 B/op   3 allocs/op
Benchmark_Call_Concurrent-10             2198695   506.8 ns/op   152 B/op   3 allocs/op
Benchmark_CallContext-10                 2545513   451.1 ns/op   152 B/op   3 allocs/op
Benchmark_CallContext_Concurrent-10      1301760   896.5 ns/op   152 B/op   3 allocs/op
Benchmark_CallContext_Expired-10        14855138    78.4 ns/op   152 B/op   3 allocs/op
Benchmark_TryCall-10                     3060210   392.7 ns/op   152 B/op   3 allocs/op
Benchmark_TryCall_Concurrent-10          2227587   527.4 ns/op   152 B/op   3 allocs/op
Benchmark_Supervisor_SpawnAndExit-10     4714188   236.9 ns/op    72 B/op   2 allocs/op

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrMailboxFull is returned by TryCast when the mailbox buffer is full.
	ErrMailboxFull = errors.New("mailbox is full")
	// ErrMailboxClosed is returned when trying to send to a closed mailbox.
	ErrMailboxClosed = errors.New("mailbox is closed")
)

Functions

func Call

func Call[T any, R any](mb *Mailbox, payload T) (R, error)

Call sends a message to an actor and waits indefinitely for a reply.

func CallContext added in v0.0.2

func CallContext[T any, R any](ctx context.Context, mb *Mailbox, payload T) (R, error)

CallContext sends a message to an actor and waits for a reply until the context expires.

func Cast added in v0.0.8

func Cast[T any](mb *Mailbox, payload T) error

Cast sends an asynchronous typed envelope, waiting until it can be enqueued or the mailbox is closed. It returns ErrMailboxClosed if the mailbox is closed.

func CastContext added in v0.0.8

func CastContext[T any](ctx context.Context, mb *Mailbox, payload T) error

CastContext sends an asynchronous typed envelope with context for enqueue cancellation. It returns ErrMailboxClosed if the mailbox is closed, or ctx.Err() if the context expires before the message is enqueued.

func TryCall added in v0.0.2

func TryCall[T any, R any](mb *Mailbox, payload T) (R, error)

TryCall attempts to enqueue a request without blocking.

func TryCallContext added in v0.0.6

func TryCallContext[T any, R any](ctx context.Context, mb *Mailbox, payload T) (R, error)

TryCallContext attempts to enqueue a request without blocking and waits for reply until ctx expires.

func TryCast added in v0.0.8

func TryCast[T any](mb *Mailbox, payload T) error

TryCast attempts to send an envelope without blocking. It returns ErrMailboxClosed if the mailbox is closed, or ErrMailboxFull immediately if the mailbox buffer is full.

func TryCastContext added in v0.0.8

func TryCastContext[T any](ctx context.Context, mb *Mailbox, payload T) error

TryCastContext attempts to send an envelope without blocking, but returns ctx.Err() if ctx is done. It returns ErrMailboxClosed if the mailbox is closed, or ErrMailboxFull immediately if the mailbox buffer is full.

Types

type Actor added in v0.0.13

type Actor interface {
	Name() string
	Run(context.Context) error
	// contains filtered or unexported methods
}

Actor represents a concurrent entity that can be supervised. It has a name and a Run method that executes its logic. The Run method should return an error if the actor needs to be restarted, or nil if it can exit cleanly. Panics will also trigger a restart. The setLogger method is used internally by the supervisor to inject a logger into the actor.

func ActorFunc added in v0.0.13

func ActorFunc(name string, fn func(ctx context.Context, logger *slog.Logger) error) Actor

ActorFunc creates a simple stateless actor from a function.

type BaseActor added in v0.0.20

type BaseActor struct {
	// contains filtered or unexported fields
}

BaseActor provides a simple implementation of the Actor interface with a name and a logger. It can be embedded in other structs to create more complex actors. The Name and Logger methods are safe to call from inside Run(), and the setLogger method is used internally by the supervisor to inject a logger into the actor.

func NewBaseActor added in v0.0.20

func NewBaseActor(name string) *BaseActor

NewBaseActor creates a new BaseActor with the given name. The logger is initialized to a no-op logger and will be set by the supervisor when the actor is spawned.

func (*BaseActor) Logger added in v0.0.33

func (a *BaseActor) Logger() *slog.Logger

Logger returns the actor's logger. It is safe to call from inside Run().

func (*BaseActor) Name added in v0.0.20

func (a *BaseActor) Name() string

Name returns the actor's name. It is safe to call from inside Run().

type CallRequest added in v0.0.8

type CallRequest[T any, R any] struct {
	// contains filtered or unexported fields
}

CallRequest wraps a payload with a reply channel for synchronous calls.

func (CallRequest[T, R]) Payload added in v0.0.8

func (r CallRequest[T, R]) Payload() T

Payload returns the request's payload.

func (CallRequest[T, R]) Reply added in v0.0.8

func (r CallRequest[T, R]) Reply(value R, err error)

Reply sends the response back to the caller. The actor should call this exactly once per request.

type CastRequest added in v0.0.8

type CastRequest[T any] struct {
	// contains filtered or unexported fields
}

CastRequest wraps a payload for asynchronous calls without expecting a reply.

func (CastRequest[T]) Payload added in v0.0.8

func (r CastRequest[T]) Payload() T

Payload returns the request's payload.

type Mailbox

type Mailbox struct {
	// contains filtered or unexported fields
}

Mailbox is a thread-safe message queue for actors.

func NewMailbox

func NewMailbox(size int) *Mailbox

NewMailbox creates a new mailbox with the specified buffer size. A size of 0 means unbuffered.

func (*Mailbox) Cap added in v0.0.3

func (m *Mailbox) Cap() int

Cap returns the capacity of the mailbox buffer.

func (*Mailbox) Close

func (m *Mailbox) Close()

Close safely closes the mailbox. Subsequent sends fail.

func (*Mailbox) IsClosed added in v0.0.4

func (m *Mailbox) IsClosed() bool

IsClosed checks if the mailbox has been closed.

func (*Mailbox) Len added in v0.0.3

func (m *Mailbox) Len() int

Len returns the number of messages currently in the mailbox.

func (*Mailbox) Receive

func (m *Mailbox) Receive() <-chan any

Receive returns the read-only channel to consume messages.

type RepliableRequest added in v0.0.9

type RepliableRequest[R any] interface {
	Reply(value R, err error)
}

RepliableRequest represents a request that can be replied to.

type RestartPolicy

type RestartPolicy uint8
const (
	Permanent RestartPolicy = iota // Always restart, even on clean exits
	Transient                      // Restart on errors/panics, but not on clean exits (nil)
	Temporary                      // Never restart
)

type Supervisor

type Supervisor struct {
	*BaseActor
	// contains filtered or unexported fields
}

Supervisor manages the lifecycle of actor Run loops.

func NewSupervisor added in v0.0.7

func NewSupervisor(name string, opts ...SupervisorOption) *Supervisor

NewSupervisor creates a new Supervisor with the given options. Panics if the provided options are invalid.

func (*Supervisor) Run added in v0.0.14

func (s *Supervisor) Run(ctx context.Context) error

Run starts all actors under supervision and blocks until the context is canceled or all actors have stopped.

func (*Supervisor) Spawn added in v0.0.14

func (s *Supervisor) Spawn(ctx context.Context, actor Actor)

Spawn starts the given actor under supervision. It will be restarted according to the supervisor's policy if it returns an error or panics.

func (*Supervisor) Wait

func (s *Supervisor) Wait()

Wait blocks until all supervised actors have stopped.

type SupervisorObserver added in v0.0.28

type SupervisorObserver struct {
	OnActorRegistered    func(actor Actor)
	OnActorStarted       func(actor Actor)
	OnActorStopped       func(actor Actor, err error)
	OnActorRestarting    func(actor Actor, restartCount int, lastErr error)
	OnSupervisorTerminal func(err error)
}

SupervisorObserver allows observing lifecycle events of supervised actors and the supervisor itself. This can be used for logging, monitoring, or triggering side effects based on actor behavior.

type SupervisorOption added in v0.0.7

type SupervisorOption func(*Supervisor)

SupervisorOption configures a Supervisor.

func WithActor added in v0.0.14

func WithActor(actor Actor) SupervisorOption

WithActor adds an actor to be supervised. Can be called multiple times to add multiple actors.

func WithActors added in v0.0.14

func WithActors(actors ...Actor) SupervisorOption

WithActors adds multiple actors to be supervised.

func WithLogger added in v0.0.33

func WithLogger(logger *slog.Logger) SupervisorOption

WithLogger sets a logger for the supervisor.

func WithObserver added in v0.0.28

func WithObserver(observer *SupervisorObserver) SupervisorOption

WithObserver sets a SupervisorObserver to receive lifecycle event notifications for supervised actors and the supervisor itself. This allows external monitoring of actor behavior and supervisor actions.

func WithOnError added in v0.0.7

func WithOnError(handler func(actor Actor, err error)) SupervisorOption

WithOnError sets a callback function that will be called whenever a supervised actor returns an error or panics. The callback receives the actor and the error as arguments.

func WithPolicy added in v0.0.7

func WithPolicy(policy RestartPolicy) SupervisorOption

WithPolicy sets the restart policy.

func WithRestartDelay added in v0.0.7

func WithRestartDelay(d time.Duration) SupervisorOption

WithRestartDelay sets the delay between restarts.

func WithRestartLimit added in v0.0.7

func WithRestartLimit(maxRestarts int, window time.Duration) SupervisorOption

WithRestartLimit sets the maximum number of restarts allowed within a window. Both maxRestarts and window must be positive; otherwise NewSupervisor panics.

Directories

Path Synopsis
mesh module
modbus module
ws module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL