Overview
Package broker is peerbus's long-lived, agent-agnostic message broker.
This file implements the broker core: a WebSocket server, the static bearer-token register handshake, and binding the authenticated peer into the in-memory registry. Routing (direct/broadcast/ack/redelivery) is Task 8 and lives in router.go; this file deliberately stops at "peer is registered and its pending queue has been flushed".
WebSocket library choice: github.com/coder/websocket (formerly nhooyr.io/websocket). Rationale: it is pure Go (no cgo, which keeps the modernc.org/sqlite pure-Go build story intact and cross-compilation trivial), has a minimal API with context-aware reads and writes, is actively maintained, and integrates idiomatically as a net/http.Handler, so it works directly with httptest for the in-process server tests. gorilla/websocket is heavier and its maintenance has been intermittent; coder/websocket is the better fit for a small, embeddable broker.
Frame model: each control frame and each Envelope is sent as ONE WebSocket text message containing a single JSON object. WS frames are already length-delimited so the newline-delimited wire.Codec framing is not layered on top here; the same JSON object shapes (wire.Register/Ack/Peers/Deliver, wire.Envelope) are used verbatim.
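As a rough illustration of the frame model, the sketch below encodes one frame as a single JSON object, which would then be the entire payload of one WebSocket text message. The registerFrame type and its field names are hypothetical stand-ins for wire.Register, whose real shape is not shown here, and the WebSocket write itself is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// registerFrame is a hypothetical stand-in for wire.Register. The real
// field names and JSON tags are not shown in this documentation.
type registerFrame struct {
	Type  string `json:"type"`
	Name  string `json:"name"`
	Token string `json:"token"`
}

// encodeFrame renders one frame as the payload of a single text message:
// exactly one JSON object, with no trailing newline delimiter.
func encodeFrame(v any) ([]byte, error) {
	return json.Marshal(v)
}

// decodeRegister parses one text-message payload back into a frame.
func decodeRegister(payload []byte) (registerFrame, error) {
	var f registerFrame
	err := json.Unmarshal(payload, &f)
	return f, err
}

func main() {
	payload, err := encodeFrame(registerFrame{Type: "register", Name: "alice", Token: "t0"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))
}
```

json.Marshal is used rather than json.Encoder because Encoder appends a newline after each value, which is only needed for the newline-delimited framing.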
Constants
const (
	EnvListenAddr = "PEERBUS_LISTEN"
	EnvTokens     = "PEERBUS_TOKENS"
	EnvHMACSecret = "PEERBUS_HMAC_SECRET"
	EnvDBPath     = "PEERBUS_DB"
)
Environment variable names. PEERBUS_TOKENS is a comma-separated list.
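A comma-separated PEERBUS_TOKENS value might be split along these lines. This is only a sketch: the splitTokens helper is hypothetical, and trimming whitespace and dropping empty entries are assumptions rather than documented broker behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// splitTokens parses a comma-separated token list. Trimming whitespace and
// skipping empty entries are assumptions made for this sketch.
func splitTokens(raw string) []string {
	var out []string
	for _, part := range strings.Split(raw, ",") {
		if tok := strings.TrimSpace(part); tok != "" {
			out = append(out, tok)
		}
	}
	return out
}

func main() {
	fmt.Println(splitTokens("alpha, beta,,gamma"))
}
```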
const DefaultListenAddr = "127.0.0.1:8080"
DefaultListenAddr is used when neither the struct nor the environment sets a listen address.
Variables
var ErrNameClaimed = errors.New("broker: peer name already claimed under a different token")
ErrNameClaimed is returned by Bind when the name is already held under a DIFFERENT bearer token (a different-token claim is always rejected; only a same-token claim is a takeover).
Functions
This section is empty.
Types
type Authenticator
type Authenticator struct {
// contains filtered or unexported fields
}
Authenticator validates static bearer tokens. A peer name is bindable only under a valid token (the registry additionally enforces that a takeover of an existing name must present the SAME token).
The set of accepted tokens is fixed at construction (config/env, see Config); there is no dynamic token issuance in v1.
func NewAuthenticator
func NewAuthenticator(tokens []string) *Authenticator
NewAuthenticator returns an Authenticator over the given accepted tokens.
func (*Authenticator) Valid
func (a *Authenticator) Valid(tok string) bool
Valid reports whether tok matches one of the configured bearer tokens. The comparison is constant-time per candidate so a caller cannot learn a valid token's length/prefix via timing.
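One way such a check could be written is sketched below; it is not the package's actual implementation. It assumes tokens are hashed with SHA-256 before comparison, because crypto/subtle.ConstantTimeCompare returns immediately when the two inputs differ in length, and comparing fixed-size digests keeps the token's length out of the timing. The lowercase authenticator type is hypothetical.

```go
package main

import (
	"crypto/sha256"
	"crypto/subtle"
	"fmt"
)

// authenticator is a hypothetical stand-in for broker.Authenticator.
// It stores SHA-256 digests so every comparison runs over 32 bytes.
type authenticator struct {
	digests [][32]byte
}

func newAuthenticator(tokens []string) *authenticator {
	a := &authenticator{}
	for _, t := range tokens {
		a.digests = append(a.digests, sha256.Sum256([]byte(t)))
	}
	return a
}

// valid compares the candidate against every configured token without
// returning early, so the matching position does not shorten the loop.
func (a *authenticator) valid(tok string) bool {
	got := sha256.Sum256([]byte(tok))
	match := 0
	for i := range a.digests {
		match |= subtle.ConstantTimeCompare(got[:], a.digests[i][:])
	}
	return match == 1
}

func main() {
	a := newAuthenticator([]string{"s3cret", "other"})
	fmt.Println(a.valid("s3cret"), a.valid("nope"))
}
```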
type Config
type Config struct {
// ListenAddr is the TCP address the WS server binds (host:port).
ListenAddr string
// Tokens is the set of accepted static bearer tokens. A peer name is
// bindable only under one of these.
Tokens []string
// HMACSecret is the shared end-to-end HMAC-SHA256 secret distributed to
// peers out-of-band. It is validated against hmac.MinSecretLen.
HMACSecret []byte
// DBPath is the durable SQLite store path (":memory:" for ephemeral).
DBPath string
}
Config holds the broker's runtime configuration.
Configuration sources and precedence (documented, load-bearing):
- A Config struct supplied programmatically (defaults / embedding host).
- Environment variables, which, when set and non-empty, OVERRIDE the corresponding struct field.
Precedence is "env overrides struct": LoadConfig takes a base Config (which may be the zero value) and, for every recognised PEERBUS_* variable that is set and non-empty, replaces the corresponding field. This lets a deployment ship sane defaults in code while letting the operator override any of them from the environment without a rebuild.
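The precedence rule can be sketched as follows. The lowercase config type and the applyEnv helper are hypothetical and cover only two fields; the environment lookup is passed in as a function so the behaviour is easy to exercise without touching the real environment. The fallback to the default listen address follows the DefaultListenAddr description above.

```go
package main

import (
	"fmt"
	"os"
)

const defaultListenAddr = "127.0.0.1:8080"

// config is a trimmed, hypothetical stand-in for broker.Config.
type config struct {
	ListenAddr string
	DBPath     string
}

// applyEnv starts from base and lets each set, non-empty variable replace
// the matching field. An empty or unset variable leaves the field alone.
func applyEnv(base config, getenv func(string) string) config {
	if v := getenv("PEERBUS_LISTEN"); v != "" {
		base.ListenAddr = v
	}
	if v := getenv("PEERBUS_DB"); v != "" {
		base.DBPath = v
	}
	// Neither the struct nor the environment set an address: use the default.
	if base.ListenAddr == "" {
		base.ListenAddr = defaultListenAddr
	}
	return base
}

func main() {
	base := config{DBPath: ":memory:"}
	fmt.Printf("%+v\n", applyEnv(base, os.Getenv))
}
```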
func LoadConfig
LoadConfig returns the effective configuration: it starts from base and lets any set, non-empty PEERBUS_* environment variable override the matching field (env-overrides-struct precedence — see Config). The HMAC secret is validated against hmac.MinSecretLen; a missing or short secret is an error (the broker refuses to start with a weak end-to-end key). At least one bearer token is required.
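The two validation rules might look roughly like this. The value of hmac.MinSecretLen is not stated in this documentation, so the minSecretLen constant of 32 below is purely an assumption, and the error values and validate helper are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// minSecretLen is an assumed value; the real hmac.MinSecretLen may differ.
const minSecretLen = 32

var (
	errWeakSecret = errors.New("config: HMAC secret missing or too short")
	errNoTokens   = errors.New("config: at least one bearer token is required")
)

// validate rejects a missing or short secret and an empty token set.
func validate(secret []byte, tokens []string) error {
	if len(secret) < minSecretLen {
		return errWeakSecret
	}
	if len(tokens) == 0 {
		return errNoTokens
	}
	return nil
}

func main() {
	fmt.Println(validate([]byte("short"), []string{"t1"}))
}
```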
type Conn
type Conn interface {
CloseTakenOver()
}
Conn is the registry's view of a bound peer connection. The concrete WS transport implements it; the registry only needs to be able to close a superseded connection during a same-token takeover.
CloseTakenOver is called by the registry, while holding no lock, on the OLD connection when its name is taken over by a new same-token Bind. It must be safe to call from any goroutine and idempotent.
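A transport could satisfy the idempotency and goroutine-safety requirements with sync.Once, as in this sketch. The peerConn type is hypothetical; a real transport would also close the underlying WebSocket inside the Once.

```go
package main

import (
	"fmt"
	"sync"
)

// peerConn is a hypothetical transport that implements CloseTakenOver.
type peerConn struct {
	once   sync.Once
	closed chan struct{}
}

func newPeerConn() *peerConn {
	return &peerConn{closed: make(chan struct{})}
}

// CloseTakenOver may be called any number of times from any goroutine.
// sync.Once guarantees the channel is closed exactly once.
func (c *peerConn) CloseTakenOver() {
	c.once.Do(func() { close(c.closed) })
}

// isClosed reports whether CloseTakenOver has run.
func (c *peerConn) isClosed() bool {
	select {
	case <-c.closed:
		return true
	default:
		return false
	}
}

func main() {
	c := newPeerConn()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.CloseTakenOver()
		}()
	}
	wg.Wait()
	fmt.Println(c.isClosed())
}
```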
type Registry
type Registry struct {
// contains filtered or unexported fields
}
Registry is the in-memory peer-name → connection registry.
Binding rules (locked by the plan / Technical Details "Auth"):
- unique-name binding: one live connection per peer name.
- duplicate-name claim under the SAME token = takeover: the old connection is closed (CloseTakenOver) and the new one takes the name.
- duplicate-name claim under a DIFFERENT token = reject (ErrNameClaimed); the existing binding is left untouched.
All methods are safe for concurrent use (guarded by a single mutex). The superseded connection is closed OUTSIDE the lock so a slow Close cannot stall other registry operations.
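The binding rules and the close-outside-the-lock pattern can be sketched as below. All names are hypothetical lowercase stand-ins, and the plain string comparison of tokens is a simplification; the actual registry's internals are not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNameClaimed = errors.New("peer name already claimed under a different token")

// conn mirrors the single-method Conn interface.
type conn interface{ CloseTakenOver() }

type binding struct {
	token string
	conn  conn
}

type registry struct {
	mu    sync.Mutex
	peers map[string]binding
}

func newRegistry() *registry {
	return &registry{peers: make(map[string]binding)}
}

// bind applies the three rules: a fresh name binds, a same-token claim
// takes over, and a different-token claim is rejected untouched.
func (r *registry) bind(name, token string, c conn) (takenOver bool, err error) {
	r.mu.Lock()
	old, exists := r.peers[name]
	if exists && old.token != token {
		r.mu.Unlock()
		return false, errNameClaimed
	}
	r.peers[name] = binding{token: token, conn: c}
	r.mu.Unlock()

	// Close the superseded connection only after releasing the lock, so a
	// slow close cannot stall other registry operations.
	if exists {
		old.conn.CloseTakenOver()
	}
	return exists, nil
}

// fakeConn records whether it was closed by a takeover.
type fakeConn struct{ closed bool }

func (f *fakeConn) CloseTakenOver() { f.closed = true }

func main() {
	r := newRegistry()
	first, second := &fakeConn{}, &fakeConn{}
	_, _ = r.bind("alice", "t1", first)
	takenOver, err := r.bind("alice", "t1", second)
	fmt.Println(takenOver, err, first.closed)
}
```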
func (*Registry) Bind
Bind associates name with conn under token. On success it returns takenOver=true iff this was a same-token takeover of an existing binding (Bind has already called CloseTakenOver on the superseded connection). A different-token claim returns ErrNameClaimed and does not disturb the existing binding.
func (*Registry) Get
Get returns the connection bound to name and ok=true, or ok=false if the name is not currently bound.
type Server
type Server struct {
// contains filtered or unexported fields
}
Server is the broker WebSocket server. It is an http.Handler so it can be mounted on any net/http server (production) or httptest server (tests).
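Because the server is an ordinary http.Handler, mounting it follows the usual net/http pattern. The sketch below uses a hypothetical stubBroker in place of *Server (it answers 426 instead of upgrading) and an assumed "/ws" path, and drives the mux in-process with httptest.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// stubBroker is a hypothetical stand-in for *broker.Server. A real broker
// would upgrade the request to WebSocket here.
type stubBroker struct{}

func (stubBroker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusUpgradeRequired)
}

// mount attaches the handler to a mux. The "/ws" path is an assumption.
func mount(h http.Handler) *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/ws", h)
	return mux
}

// probe sends an in-process GET to path and returns the status code.
func probe(path string) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, path, nil)
	mount(stubBroker{}).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(probe("/ws"), probe("/other"))
}
```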
func NewServer
NewServer constructs a broker Server over the given authenticator, registry and durable store. log may be nil (a discarding logger is used). Audit is derived from the same store via a single broker-owned Appender (the single-writer invariant the hash chain requires).
func (*Server) ListenAndServe
ListenAndServe binds cfg.ListenAddr and serves the broker until ctx is cancelled. It is the production entrypoint; tests use httptest with the Server's ServeHTTP directly.
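A serve-until-cancelled loop is commonly built on http.Server.Shutdown, as in the sketch below. This is not the package's implementation: serveUntilDone and runOnce are hypothetical helpers, and the five-second shutdown timeout is an arbitrary choice.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// serveUntilDone binds addr and serves h until ctx is cancelled, then
// shuts the HTTP server down and returns nil on a clean stop.
func serveUntilDone(ctx context.Context, addr string, h http.Handler) error {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	srv := &http.Server{Handler: h}

	errc := make(chan error, 1)
	go func() { errc <- srv.Serve(ln) }()

	select {
	case err := <-errc:
		// Serve failed before cancellation.
		return err
	case <-ctx.Done():
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := srv.Shutdown(shutdownCtx); err != nil {
			return err
		}
		// Serve returns http.ErrServerClosed after Shutdown; treat it as clean.
		if err := <-errc; !errors.Is(err, http.ErrServerClosed) {
			return err
		}
		return nil
	}
}

// runOnce serves on an ephemeral loopback port for a short time.
func runOnce() error {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return serveUntilDone(ctx, "127.0.0.1:0", http.NotFoundHandler())
}

func main() {
	fmt.Println(runOnce())
}
```

Note that http.Server.Shutdown neither closes nor waits for hijacked connections such as WebSockets, so a broker built this way would still need to close its live peer connections itself.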
func (*Server) ServeHTTP
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP upgrades the request to WebSocket and runs the connection lifecycle: register handshake → registry bind → pending-queue flush → read loop (routing of post-handshake frames is Task 8; here the loop simply keeps the connection alive and exits cleanly on close/takeover).