broker

package
v0.1.0
Published: May 18, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package broker is peerbus's long-lived, agent-agnostic message broker.

This file implements the broker core: a WebSocket server, the static bearer-token register handshake, and binding the authenticated peer into the in-memory registry. Routing (direct/broadcast/ack/redelivery) is Task 8 and lives in router.go; this file deliberately stops at "peer is registered and its pending queue has been flushed".

WebSocket library choice: github.com/coder/websocket (formerly nhooyr.io/websocket). Rationale: it is pure Go (no cgo, which keeps the modernc.org/sqlite pure-Go build story intact and cross-compilation trivial), has a minimal API and context-aware reads/writes, is actively maintained, and integrates idiomatically with net/http.Handler, so it works directly with httptest for the in-process server tests. gorilla/websocket is heavier and its maintenance has been intermittent; coder/websocket is the better fit for a small, embeddable broker.

Frame model: each control frame and each Envelope is sent as ONE WebSocket text message containing a single JSON object. WS frames are already length-delimited so the newline-delimited wire.Codec framing is not layered on top here; the same JSON object shapes (wire.Register/Ack/Peers/Deliver, wire.Envelope) are used verbatim.
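
The frame model above can be illustrated with a minimal sketch. The envelope type and its fields are hypothetical stand-ins (the real wire.Envelope shape is not shown here), and the WebSocket write itself is omitted; the point is only that each message body is exactly one JSON object with no extra delimiter.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a HYPOTHETICAL stand-in for wire.Envelope; the real field
// names are not documented in this section.
type envelope struct {
	From string `json:"from"`
	To   string `json:"to"`
	Body string `json:"body"`
}

// encodeFrame produces the payload for ONE WebSocket text message:
// a single JSON object, with no trailing newline or length prefix.
func encodeFrame(e envelope) ([]byte, error) {
	return json.Marshal(e)
}

// decodeFrame parses one received text message. json.Unmarshal rejects
// trailing data, so two concatenated objects in one message are an error.
func decodeFrame(msg []byte) (envelope, error) {
	var e envelope
	if err := json.Unmarshal(msg, &e); err != nil {
		return envelope{}, err
	}
	return e, nil
}

func main() {
	msg, err := encodeFrame(envelope{From: "alice", To: "bob", Body: "hi"})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	got, err := decodeFrame(msg)
	fmt.Println(string(msg), got, err)
}
```

Because the transport already delimits messages, the decoder never has to scan for a newline; it can treat each message as a complete document.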

Index

Constants

const (
	EnvListenAddr = "PEERBUS_LISTEN"
	EnvTokens     = "PEERBUS_TOKENS"
	EnvHMACSecret = "PEERBUS_HMAC_SECRET"
	EnvDBPath     = "PEERBUS_DB"
)

Environment variable names. PEERBUS_TOKENS is a comma-separated list.
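
As an illustration of the comma-separated format, here is one way such a value could be split. splitTokens is a hypothetical helper, not part of the package; trimming whitespace and dropping empty entries are assumptions about desirable behaviour, not documented behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// splitTokens is a HYPOTHETICAL helper. It splits a comma-separated value,
// trims surrounding whitespace, and drops empty entries (both assumptions).
func splitTokens(raw string) []string {
	var out []string
	for _, part := range strings.Split(raw, ",") {
		if tok := strings.TrimSpace(part); tok != "" {
			out = append(out, tok)
		}
	}
	return out
}

func main() {
	// e.g. PEERBUS_TOKENS="alpha, beta,,gamma"
	fmt.Println(splitTokens("alpha, beta,,gamma"))
}
```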

const DefaultListenAddr = "127.0.0.1:8080"

DefaultListenAddr is used when neither the struct nor the environment sets a listen address.

Variables

var ErrNameClaimed = errors.New("broker: peer name already claimed under a different token")

ErrNameClaimed is returned by Bind when the name is already held under a DIFFERENT bearer token (a different-token claim is always rejected; only a same-token claim is a takeover).

Functions

This section is empty.

Types

type Authenticator

type Authenticator struct {
	// contains filtered or unexported fields
}

Authenticator validates static bearer tokens. A peer name is bindable only under a valid token (the registry additionally enforces that a takeover of an existing name must present the SAME token).

The set of accepted tokens is fixed at construction (config/env, see Config); there is no dynamic token issuance in v1.

func NewAuthenticator

func NewAuthenticator(tokens []string) *Authenticator

NewAuthenticator returns an Authenticator over the given accepted tokens.

func (*Authenticator) Valid

func (a *Authenticator) Valid(tok string) bool

Valid reports whether tok matches one of the configured bearer tokens. The comparison is constant-time per candidate so a caller cannot learn a valid token's length/prefix via timing.
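
A minimal sketch of a constant-time token check, under stated assumptions: tokenSet is a hypothetical stand-in for Authenticator, and hashing each token with SHA-256 before comparing (so that every comparison runs over equal-length inputs) is a technique chosen for this example, not something the package is documented to do. Every candidate is always compared, so the loop does not exit early on a match.

```go
package main

import (
	"crypto/sha256"
	"crypto/subtle"
	"fmt"
)

// tokenSet is a HYPOTHETICAL stand-in for Authenticator.
type tokenSet struct {
	digests [][sha256.Size]byte
}

// newTokenSet stores a SHA-256 digest of each accepted token.
func newTokenSet(tokens []string) *tokenSet {
	ts := &tokenSet{}
	for _, t := range tokens {
		ts.digests = append(ts.digests, sha256.Sum256([]byte(t)))
	}
	return ts
}

// valid compares the digest of tok against every stored digest.
// subtle.ConstantTimeCompare returns 1 on equality and 0 otherwise;
// OR-ing the results avoids an early exit on the first match.
func (ts *tokenSet) valid(tok string) bool {
	sum := sha256.Sum256([]byte(tok))
	match := 0
	for i := range ts.digests {
		match |= subtle.ConstantTimeCompare(sum[:], ts.digests[i][:])
	}
	return match == 1
}

func main() {
	ts := newTokenSet([]string{"tok-1", "tok-2"})
	fmt.Println(ts.valid("tok-2"), ts.valid("tok-3"))
}
```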

type Config

type Config struct {
	// ListenAddr is the TCP address the WS server binds (host:port).
	ListenAddr string
	// Tokens is the set of accepted static bearer tokens. A peer name is
	// bindable only under one of these.
	Tokens []string
	// HMACSecret is the shared end-to-end HMAC-SHA256 secret distributed to
	// peers out-of-band. It is validated against hmac.MinSecretLen.
	HMACSecret []byte
	// DBPath is the durable SQLite store path (":memory:" for ephemeral).
	DBPath string
}

Config holds the broker's runtime configuration.

Configuration sources and precedence (documented, load-bearing):

  1. A Config struct supplied programmatically (defaults / embedding host).
  2. Environment variables, which OVERRIDE the corresponding struct field whenever they are set and non-empty.

Precedence is "env overrides struct": LoadConfig takes a base Config (which may be the zero value) and, for every recognised PEERBUS_* variable that is set and non-empty, replaces the corresponding field. This lets a deployment ship sane defaults in code while letting the operator override any of them from the environment without a rebuild.

func LoadConfig

func LoadConfig(base Config) (Config, error)

LoadConfig returns the effective configuration: it starts from base and lets any set, non-empty PEERBUS_* environment variable override the matching field (env-overrides-struct precedence — see Config). The HMAC secret is validated against hmac.MinSecretLen; a missing or short secret is an error (the broker refuses to start with a weak end-to-end key). At least one bearer token is required.
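
The env-overrides-struct precedence can be sketched as follows. This is an illustration, not the package's implementation: config and loadConfigFrom are hypothetical names, the getenv parameter exists only to make the sketch testable, minSecretLen = 32 is an assumed placeholder for hmac.MinSecretLen (whose real value is not given here), and reading the secret as raw bytes of the variable is likewise an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

// config mirrors the documented Config fields.
type config struct {
	ListenAddr string
	Tokens     []string
	HMACSecret []byte
	DBPath     string
}

// minSecretLen is an ASSUMED placeholder for hmac.MinSecretLen.
const minSecretLen = 32

// loadConfigFrom is a HYPOTHETICAL sketch of LoadConfig. It starts from
// base and lets each set, non-empty variable replace the matching field.
func loadConfigFrom(base config, getenv func(string) string) (config, error) {
	cfg := base
	if v := getenv("PEERBUS_LISTEN"); v != "" {
		cfg.ListenAddr = v
	}
	if v := getenv("PEERBUS_TOKENS"); v != "" {
		cfg.Tokens = strings.Split(v, ",")
	}
	if v := getenv("PEERBUS_HMAC_SECRET"); v != "" {
		cfg.HMACSecret = []byte(v) // assumption: raw bytes, no decoding
	}
	if v := getenv("PEERBUS_DB"); v != "" {
		cfg.DBPath = v
	}
	if cfg.ListenAddr == "" {
		cfg.ListenAddr = "127.0.0.1:8080" // DefaultListenAddr
	}
	if len(cfg.HMACSecret) < minSecretLen {
		return config{}, errors.New("HMAC secret missing or too short")
	}
	if len(cfg.Tokens) == 0 {
		return config{}, errors.New("at least one bearer token is required")
	}
	return cfg, nil
}

func main() {
	base := config{DBPath: ":memory:"}
	cfg, err := loadConfigFrom(base, os.Getenv)
	fmt.Println(cfg.ListenAddr, err)
}
```

An unset or empty variable leaves the struct value alone, so a host program can ship defaults in base and still let the operator override any single field.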

type Conn

type Conn interface {
	CloseTakenOver()
}

Conn is the registry's view of a bound peer connection. The concrete WS transport implements it; the registry only needs to be able to close a superseded connection during a same-token takeover.

CloseTakenOver is called by the registry, while holding no lock, on the OLD connection when its name is taken over by a new same-token Bind. It must be safe to call from any goroutine and idempotent.
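
One common way to meet the "any goroutine, idempotent" requirement is sync.Once. The sketch below is illustrative only: peerConn and closeFn are hypothetical names, and closeFn stands in for whatever actually closes the underlying WebSocket.

```go
package main

import (
	"fmt"
	"sync"
)

// peerConn is a HYPOTHETICAL connection type; closeFn stands in for
// closing the underlying transport.
type peerConn struct {
	once    sync.Once
	closeFn func()
}

// CloseTakenOver runs closeFn at most once, no matter how many goroutines
// call it or how often.
func (c *peerConn) CloseTakenOver() {
	c.once.Do(c.closeFn)
}

func main() {
	closes := 0
	c := &peerConn{closeFn: func() { closes++ }}

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.CloseTakenOver()
		}()
	}
	wg.Wait()
	fmt.Println("underlying close calls:", closes)
}
```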

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is the in-memory peer-name → connection registry.

Binding rules (locked by the plan / Technical Details "Auth"):

  • unique-name binding: one live connection per peer name.
  • duplicate-name claim under the SAME token = takeover: the old connection is closed (CloseTakenOver) and the new one takes the name.
  • duplicate-name claim under a DIFFERENT token = reject (ErrNameClaimed); the existing binding is left untouched.

All methods are safe for concurrent use (guarded by a single mutex). The superseded connection is closed OUTSIDE the lock so a slow Close cannot stall other registry operations.

func NewRegistry

func NewRegistry() *Registry

NewRegistry returns an empty Registry.

func (*Registry) Bind

func (r *Registry) Bind(name, token string, conn Conn) (takenOver bool, err error)

Bind associates name with conn under token. On success it returns takenOver=true iff this was a same-token takeover of an existing binding (Bind has already called CloseTakenOver on the superseded connection). A different-token claim returns ErrNameClaimed and does not disturb the existing binding.
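
The three binding outcomes (fresh bind, same-token takeover, different-token reject) can be sketched like this. All names are hypothetical lower-case stand-ins, and the map-plus-mutex layout is an assumption about one reasonable implementation; note that the superseded connection is closed only after the lock is released.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNameClaimed = errors.New("peer name already claimed under a different token")

type conn interface{ CloseTakenOver() }

// binding records which token and connection currently hold a name.
type binding struct {
	token string
	conn  conn
}

// registry is a HYPOTHETICAL sketch of the documented Registry.
type registry struct {
	mu    sync.Mutex
	peers map[string]binding
}

func newRegistry() *registry {
	return &registry{peers: make(map[string]binding)}
}

func (r *registry) bind(name, token string, c conn) (takenOver bool, err error) {
	r.mu.Lock()
	old, exists := r.peers[name]
	if exists && old.token != token {
		r.mu.Unlock()
		return false, errNameClaimed // existing binding left untouched
	}
	r.peers[name] = binding{token: token, conn: c}
	r.mu.Unlock()

	if exists {
		// Close the superseded connection outside the lock so a slow
		// close cannot stall other registry operations.
		old.conn.CloseTakenOver()
		return true, nil
	}
	return false, nil
}

// fakeConn records whether it was closed by a takeover.
type fakeConn struct{ closed bool }

func (f *fakeConn) CloseTakenOver() { f.closed = true }

func main() {
	reg := newRegistry()
	first, second := &fakeConn{}, &fakeConn{}
	fmt.Println(reg.bind("alice", "tok-1", first))  // fresh bind
	fmt.Println(reg.bind("alice", "tok-2", second)) // different token
	fmt.Println(reg.bind("alice", "tok-1", second)) // same-token takeover
	fmt.Println("first closed:", first.closed)
}
```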

func (*Registry) Get

func (r *Registry) Get(name string) (Conn, bool)

Get returns the connection bound to name and ok=true, or ok=false if the name is not currently bound.

func (*Registry) List

func (r *Registry) List() []string

List returns the currently-bound peer names, sorted ascending.

func (*Registry) Remove

func (r *Registry) Remove(name string, conn Conn)

Remove unbinds name iff it is currently bound to conn. The conn-identity guard prevents a stale connection (already superseded by a takeover) from evicting the live binding when it later notices it was closed.
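
The conn-identity guard can be sketched as a compare-then-delete under the lock. The types below are hypothetical stand-ins; the sketch assumes connections are pointer values, so interface equality means "the same connection object".

```go
package main

import (
	"fmt"
	"sync"
)

type conn interface{ CloseTakenOver() }

// registry is a HYPOTHETICAL, stripped-down stand-in for Registry.
type registry struct {
	mu    sync.Mutex
	peers map[string]conn
}

// remove unbinds name only if it is still bound to c. A stale connection
// that was superseded by a takeover therefore cannot evict its successor.
func (r *registry) remove(name string, c conn) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if cur, ok := r.peers[name]; ok && cur == c {
		delete(r.peers, name)
	}
}

// bound reports whether name currently has a binding.
func (r *registry) bound(name string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.peers[name]
	return ok
}

// fakeConn has a field so that distinct values have distinct addresses.
type fakeConn struct{ id int }

func (*fakeConn) CloseTakenOver() {}

func main() {
	stale, live := &fakeConn{id: 1}, &fakeConn{id: 2}
	reg := &registry{peers: map[string]conn{"alice": live}}

	reg.remove("alice", stale) // stale connection cleaning up late: no effect
	fmt.Println("after stale remove, bound:", reg.bound("alice"))

	reg.remove("alice", live) // the live connection really went away
	fmt.Println("after live remove, bound:", reg.bound("alice"))
}
```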

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the broker WebSocket server. It is an http.Handler so it can be mounted on any net/http server (production) or httptest server (tests).

func NewServer

func NewServer(auth *Authenticator, reg *Registry, st *store.Store, log *slog.Logger) *Server

NewServer constructs a broker Server over the given authenticator, registry and durable store. log may be nil (a discarding logger is used). Audit is derived from the same store via a single broker-owned Appender (the single-writer invariant the hash chain requires).

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(ctx context.Context, addr string) error

ListenAndServe binds addr (typically Config.ListenAddr) and serves the broker until ctx is cancelled. It is the production entrypoint; tests use httptest with the Server's ServeHTTP directly.
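
A serve-until-cancelled loop of this kind is commonly built from net/http primitives as below. This is a sketch under assumptions, not the package's code: serveUntilDone and demo are hypothetical names, the 5-second shutdown grace period is arbitrary, and http.NotFoundHandler stands in for the broker Server.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// serveUntilDone is a HYPOTHETICAL helper: it serves h on addr and returns
// nil after a clean shutdown triggered by ctx being cancelled.
func serveUntilDone(ctx context.Context, addr string, h http.Handler) error {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	srv := &http.Server{Handler: h}
	errCh := make(chan error, 1)
	go func() { errCh <- srv.Serve(ln) }()

	select {
	case err := <-errCh:
		return err // Serve failed before cancellation
	case <-ctx.Done():
		// Arbitrary grace period for in-flight HTTP requests.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := srv.Shutdown(shutdownCtx); err != nil {
			return err
		}
		if err := <-errCh; !errors.Is(err, http.ErrServerClosed) {
			return err
		}
		return nil
	}
}

// demo runs the server on an ephemeral port for a short time.
func demo() error {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	return serveUntilDone(ctx, "127.0.0.1:0", http.NotFoundHandler())
}

func main() {
	fmt.Println("stopped, err =", demo())
}
```

One caveat that matters for a WebSocket broker: http.Server.Shutdown does not close or wait for hijacked connections, which includes upgraded WebSockets, so live peer connections have to be closed separately by the handler.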

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP upgrades the request to WebSocket and runs the connection lifecycle: register handshake → registry bind → pending-queue flush → read loop (routing of post-handshake frames is Task 8; here the loop simply keeps the connection alive and exits cleanly on close/takeover).
