cluster

package
v0.0.36 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 24, 2026 | License: Apache-2.0 | Imports: 15 | Imported by: 0

Documentation

Overview

Package cluster implements llmbox's hub-and-spoke model: a single hub (the MCP front-end the chatbot talks to) drives box operations on one or more spokes, each of which owns a local Docker daemon. A spoke dials the hub over a WebSocket and the hub pushes box verbs down that connection; the spoke executes them against its local *docker.Manager and replies.

The wire surface is deliberately the seven box verbs (BoxManager) and nothing more: a spoke is never a generic Docker proxy. See docs/hub-and-spoke.md.



Variables

var ErrEnrollRejected = errors.New("enrollment rejected by hub")

ErrEnrollRejected reports that the hub refused enrollment (bad/expired/used join token, or a credential that no longer matches). It is terminal: retrying with the same token will not succeed, so a caller should stop rather than reconnect.

Functions

func CreateJoinToken

func CreateJoinToken(store Store, name string, ttl time.Duration, now time.Time) (string, error)

CreateJoinToken mints a one-time join token for a spoke name, stores its hash with the given expiry, and returns the plaintext token to show the operator once. ttl<=0 is rejected so a token always expires.

@arg store The cluster store to persist the token in.
@arg name The spoke name baked into the token; required.
@arg ttl How long the token stays valid; must be positive.
@arg now The current time (for the expiry).
@return string The plaintext join token (shown once, never recoverable).
@error error if the name is empty, ttl is non-positive, the secret cannot be generated, or the store write fails.

@testcase TestCreateJoinTokenStoresHash stores the token hash and returns a usable secret.
@testcase TestCreateJoinTokenRejectsEmptyName rejects an empty spoke name.
@testcase TestCreateJoinTokenRejectsTTL rejects a non-positive ttl.
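The mint-and-store flow described above can be sketched as follows. This is a minimal illustration, not the package's implementation: memStore, tokenRecord, hashToken, and mintJoinToken are hypothetical stand-ins, and the hex SHA-256 hash and 32-byte random secret are assumptions about details the documentation does not specify.

```go
package main

import (
	"crypto/rand"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"time"
)

// tokenRecord mirrors the documented JoinTokenRecord fields.
type tokenRecord struct {
	Name      string
	ExpiresAt time.Time
}

// memStore is a hypothetical in-memory stand-in for the package's Store,
// keyed by the hash of the token secret.
type memStore map[string]tokenRecord

// hashToken uses hex-encoded SHA-256. This scheme is an assumption.
func hashToken(secret string) string {
	sum := sha256.Sum256([]byte(secret))
	return hex.EncodeToString(sum[:])
}

// mintJoinToken validates its inputs, generates a random secret, and stores
// only the secret's hash together with the expiry.
func mintJoinToken(store memStore, name string, ttl time.Duration, now time.Time) (string, error) {
	if name == "" {
		return "", errors.New("spoke name is required")
	}
	if ttl <= 0 {
		return "", errors.New("ttl must be positive")
	}
	buf := make([]byte, 32)
	if _, err := rand.Read(buf); err != nil {
		return "", fmt.Errorf("generate secret: %w", err)
	}
	secret := hex.EncodeToString(buf)
	store[hashToken(secret)] = tokenRecord{Name: name, ExpiresAt: now.Add(ttl)}
	return secret, nil
}

func main() {
	store := memStore{}
	token, err := mintJoinToken(store, "spoke-a", 15*time.Minute, time.Now())
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	_, plaintextStored := store[token]
	rec := store[hashToken(token)]
	fmt.Println(rec.Name, plaintextStored) // prints "spoke-a false"
}
```

Passing now as a parameter, as the real signature does, keeps the expiry deterministic in tests.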

func Run

func Run(ctx context.Context, dial Dialer, mgr BoxManager, joinToken string, creds *Credentials, save func(Credentials) error, policy ValidationPolicy) error

Run connects a spoke to the hub and serves box verbs against mgr until ctx is cancelled or the connection drops. It enrolls using joinToken when creds is nil; otherwise it reconnects with the saved creds. On first enrollment it invokes save with the minted credentials so the caller can persist them. Run returns when the connection ends; the caller decides whether to retry.

@arg ctx Context whose cancellation stops the spoke.
@arg dial The dialer establishing the transport to the hub.
@arg mgr The local box manager verbs are executed against.
@arg joinToken The one-time join token for first enrollment; ignored when creds is set.
@arg creds Saved credentials for reconnect; nil for first enrollment.
@arg save Callback invoked with freshly minted credentials on first enrollment; may be nil.
@arg policy The admission policy the spoke applies to box-creation requests.
@error error if dialing, enrollment, or the serve loop fails.

@testcase TestSpokeRunEnrollsAndServes enrolls with a join token and serves a verb.
@testcase TestSpokeRunReconnectsWithCreds reconnects using saved credentials.
@testcase TestSpokeRunEnrollRejected returns the hub's rejection error.
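Because Run returns when the connection ends and leaves retrying to the caller, a caller typically wraps it in a supervising loop. The sketch below shows one possible shape, under stated assumptions: runFunc stands in for a call to Run with its arguments already bound, errEnrollRejected stands in for ErrEnrollRejected, and the fixed delay is an arbitrary choice (a real caller might prefer backoff with jitter).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errEnrollRejected stands in for the package's ErrEnrollRejected sentinel.
var errEnrollRejected = errors.New("enrollment rejected by hub")

// runFunc stands in for one call to Run with its arguments already bound.
type runFunc func(ctx context.Context) error

// superviseSpoke calls run repeatedly, waiting delay between attempts. It
// stops when ctx is cancelled or when the hub rejects enrollment, because
// retrying a rejected token cannot succeed.
func superviseSpoke(ctx context.Context, run runFunc, delay time.Duration) error {
	for {
		err := run(ctx)
		if errors.Is(err, errEnrollRejected) {
			return err // terminal: stop instead of reconnecting
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}
}

// demo drives the supervisor with a fake that drops twice and is then rejected.
func demo() (int, error) {
	attempts := 0
	run := func(ctx context.Context) error {
		attempts++
		if attempts < 3 {
			return errors.New("connection dropped") // transient failure
		}
		return fmt.Errorf("enroll: %w", errEnrollRejected)
	}
	err := superviseSpoke(context.Background(), run, 10*time.Millisecond)
	return attempts, err
}

func main() {
	attempts, err := demo()
	fmt.Println(attempts, errors.Is(err, errEnrollRejected)) // prints "3 true"
}
```

Using errors.Is rather than == keeps the check correct whether or not the rejection arrives wrapped.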

Types

type BoxManager

type BoxManager interface {
	Create(ctx context.Context, opts docker.CreateOptions) (id, authorizeURL string, err error)
	SubmitCode(ctx context.Context, idOrName, code string) (sessionURL string, err error)
	List(ctx context.Context) ([]docker.Box, error)
	Destroy(ctx context.Context, idOrName string) error
	Logs(ctx context.Context, idOrName string, tail int) (string, error)
	Exec(ctx context.Context, idOrName string, cmd []string) (docker.ExecResult, error)
	ReapOrphans(ctx context.Context, ttl time.Duration) ([]string, error)
}

BoxManager is the box-lifecycle surface the hub needs from a spoke. The local in-process implementation is *docker.Manager; the remote implementation (remoteSpoke) round-trips each call over a transport to a spoke process. It is the complete RPC allowlist of the cluster protocol — no operation outside it can cross the hub/spoke boundary.
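One way to picture a closed RPC allowlist is a dispatcher that only knows the seven verbs and rejects everything else. The sketch below is purely illustrative: the verb names, the string payloads, and the dispatcher type are assumptions, since the wire encoding is not described here.

```go
package main

import (
	"errors"
	"fmt"
)

// verbHandler handles one request; string payloads are a placeholder for
// whatever encoding the real protocol uses.
type verbHandler func(payload string) (string, error)

var errUnknownVerb = errors.New("verb not in allowlist")

// dispatcher maps verb names to handlers. Anything absent is rejected.
type dispatcher map[string]verbHandler

func (d dispatcher) dispatch(verb, payload string) (string, error) {
	h, ok := d[verb]
	if !ok {
		return "", fmt.Errorf("%w: %q", errUnknownVerb, verb)
	}
	return h(payload)
}

// newDispatcher registers one handler per BoxManager method. The verb
// names are hypothetical.
func newDispatcher() dispatcher {
	stub := func(name string) verbHandler {
		return func(payload string) (string, error) { return name + " ok", nil }
	}
	d := dispatcher{}
	for _, v := range []string{
		"create", "submit_code", "list", "destroy", "logs", "exec", "reap_orphans",
	} {
		d[v] = stub(v)
	}
	return d
}

func main() {
	d := newDispatcher()
	out, err := d.dispatch("list", "")
	fmt.Println(out, err) // prints "list ok <nil>"
	_, err = d.dispatch("docker_pull", "")
	fmt.Println(errors.Is(err, errUnknownVerb)) // prints "true"
}
```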

type Credentials

type Credentials struct {
	Name       string `json:"name"`
	Credential string `json:"credential"`
}

Credentials is a spoke's persisted enrollment state: the name the hub assigned and the bearer credential it minted. A spoke saves this after first enrollment and presents it to reconnect without the (one-time) join token.
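A spoke needs somewhere to keep this state between runs. The following sketch persists it as a JSON file readable only by the owner; the file layout, the 0600 mode, and the helper names saveCredentials and loadCredentials are assumptions, and the Credentials type is redeclared locally so the example is self-contained.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// Credentials is redeclared here with the documented fields and JSON tags.
type Credentials struct {
	Name       string `json:"name"`
	Credential string `json:"credential"`
}

// saveCredentials writes creds as JSON with owner-only permissions, since
// the credential is a bearer secret.
func saveCredentials(path string, c Credentials) error {
	data, err := json.Marshal(c)
	if err != nil {
		return err
	}
	return os.WriteFile(path, data, 0o600)
}

// loadCredentials returns (nil, nil) when the file does not exist, which
// maps onto "first enrollment".
func loadCredentials(path string) (*Credentials, error) {
	data, err := os.ReadFile(path)
	if errors.Is(err, os.ErrNotExist) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	var c Credentials
	if err := json.Unmarshal(data, &c); err != nil {
		return nil, err
	}
	return &c, nil
}

// demo loads before any file exists, saves, and loads again.
func demo() (*Credentials, *Credentials, error) {
	dir, err := os.MkdirTemp("", "spoke-creds")
	if err != nil {
		return nil, nil, err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "creds.json")

	before, err := loadCredentials(path)
	if err != nil {
		return nil, nil, err
	}
	if err := saveCredentials(path, Credentials{Name: "spoke-a", Credential: "s3cret"}); err != nil {
		return nil, nil, err
	}
	after, err := loadCredentials(path)
	return before, after, err
}

func main() {
	before, after, err := demo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(before == nil, after.Name) // prints "true spoke-a"
}
```

A nil result from loadCredentials fits the creds argument of Run directly, and saveCredentials bound to a path fits its save callback.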

type Dialer

type Dialer func(ctx context.Context) (transport, error)

Dialer establishes a transport to the hub. It exists so Run can be tested over an in-memory transport instead of a real WebSocket.

func WebSocketDialer

func WebSocketDialer(url string) Dialer

WebSocketDialer dials the hub's spoke-connect URL over a WebSocket.

SECURITY — transport confidentiality and integrity are the DEPLOYMENT's responsibility, not this code's. The enrollment handshake sends the one-time join token and then the long-lived bearer credential in the first frames, and every box verb (including the user's code and session details) flows over this socket. Use wss:// in production and terminate TLS at a trusted reverse proxy in front of the hub; a ws:// URL sends the credential and all traffic in cleartext and MUST only be used on a fully trusted network (e.g. loopback or a private mesh). This dialer accepts whichever scheme the operator passes — it does not (and cannot) verify the link is encrypted. The bearer credential is a static secret presented verbatim on every reconnect, so anyone who captures it (on the wire or at rest) can impersonate this spoke until it is revoked.

@arg url The hub's spoke-connect URL (ws:// or wss://).
@return Dialer A dialer that opens a WebSocket transport to that URL.

@testcase TestSpokeRunEnrollsAndServes uses an in-memory dialer in place of this.
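Since the dialer accepts whichever scheme it is given, an operator-facing wrapper may want to check the URL before constructing it. The sketch below is a hypothetical pre-flight check, not part of this package: checkSpokeURL allows wss:// anywhere and ws:// only to loopback hosts. Deployments on a trusted private mesh would need a looser rule.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// checkSpokeURL is a hypothetical guard run before handing the URL to a
// dialer. It accepts wss:// unconditionally and ws:// only for loopback.
func checkSpokeURL(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("parse hub URL: %w", err)
	}
	switch u.Scheme {
	case "wss":
		return nil
	case "ws":
		host := u.Hostname()
		if host == "localhost" {
			return nil
		}
		if ip := net.ParseIP(host); ip != nil && ip.IsLoopback() {
			return nil
		}
		return fmt.Errorf("refusing cleartext ws:// to non-loopback host %q", host)
	default:
		return fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
}

func main() {
	for _, raw := range []string{
		"wss://hub.example.com/spoke/connect",
		"ws://127.0.0.1:8080/spoke/connect",
		"ws://hub.example.com/spoke/connect",
	} {
		fmt.Println(raw, "->", checkSpokeURL(raw))
	}
}
```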

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub is the hub-side of the cluster: it accepts spoke connections on an HTTP route, authenticates their enrollment, and keeps a registry of connected spokes that the server routes box verbs to. A spoke connection is one long-lived WebSocket; the hub pushes verb requests down it via a remoteSpoke.

func NewHub

func NewHub(ctx context.Context, store Store, now func() time.Time, log *slog.Logger) *Hub

NewHub builds a Hub over the given store. ctx bounds the lifetime of accepted spoke connections (cancelling it closes them). now defaults to time.Now and log to slog.Default when nil.

@arg ctx Base context; its cancellation closes all spoke connections.
@arg store The cluster store holding join tokens and enrolled spokes.
@arg now Clock for token-expiry checks; nil uses time.Now.
@arg log Logger for connection lifecycle; nil uses slog.Default.
@return *Hub A ready hub with an empty connected-spoke registry.

@testcase TestHubEnrollAndRoute enrolls a spoke and routes a verb to it.

func (*Hub) ConnectHandler

func (h *Hub) ConnectHandler(w http.ResponseWriter, r *http.Request)

ConnectHandler is the HTTP handler for the spoke connection route (/spoke/connect). It upgrades to a WebSocket, performs the enrollment handshake, registers the spoke, and serves verb requests over the connection until it drops or the hub's context is cancelled.

SECURITY — like the spoke dialer (see WebSocketDialer), this endpoint relies on the DEPLOYMENT for transport security. The route is unauthenticated until the enrollment frame arrives (the handshake IS the auth), so it must be served over TLS (terminate wss:// at a trusted reverse proxy in front of the hub) so the join token / bearer credential a spoke presents are not exposed on the wire. The credential is compared timing-safely against a stored hash, but it is a static bearer secret: protect it in transit and at rest, and revoke a spoke if it may have leaked.

@arg w The response writer (upgraded to a WebSocket).
@arg r The upgrade request.

@testcase TestHubEnrollAndRoute drives a real spoke through this handler over loopback.
@testcase TestHubRejectsBadEnrollment closes the connection when enrollment is rejected.
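The timing-safe comparison against a stored hash mentioned above can be illustrated with the standard library. This is a sketch under assumptions: the hex SHA-256 hashing and the helper names hashCredential and credentialMatches are illustrative, and the hub's actual scheme may differ.

```go
package main

import (
	"crypto/sha256"
	"crypto/subtle"
	"encoding/hex"
	"fmt"
)

// hashCredential uses hex-encoded SHA-256. This scheme is an assumption.
func hashCredential(secret string) string {
	sum := sha256.Sum256([]byte(secret))
	return hex.EncodeToString(sum[:])
}

// credentialMatches hashes the presented secret and compares it to the
// stored hash in constant time, so the comparison's duration does not
// reveal how many leading bytes matched.
func credentialMatches(presented, storedHash string) bool {
	got := hashCredential(presented)
	return subtle.ConstantTimeCompare([]byte(got), []byte(storedHash)) == 1
}

func main() {
	stored := hashCredential("s3cret")
	fmt.Println(credentialMatches("s3cret", stored), credentialMatches("guess", stored)) // prints "true false"
}
```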

func (*Hub) Disconnect

func (h *Hub) Disconnect(name string)

Disconnect force-closes the live connection for a named spoke, if any, so the spoke is dropped immediately (e.g. after an admin revokes its enrollment). The read loop tears down and unregisters the connection as a result; disconnecting an unknown or already-gone spoke is a no-op. It does not delete the spoke's enrolled record — the caller does that so the spoke cannot simply reconnect.

@arg name The spoke name whose live connection should be closed.

@testcase TestHubDisconnectClosesConnection closes a connected spoke's link.
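Because Disconnect leaves the enrolled record in place, a revocation has two steps. The sketch below shows one possible ordering, deleting the record first so that a reconnect attempt after the forced close fails enrollment. The narrow interfaces spokeDeleter and disconnecter, the revokeSpoke helper, and the recorder fake are hypothetical and exist only for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// spokeDeleter is the one Store method this flow needs.
type spokeDeleter interface {
	DeleteSpoke(name string) error
}

// disconnecter is the one Hub method this flow needs.
type disconnecter interface {
	Disconnect(name string)
}

// revokeSpoke removes the enrolled record, then force-closes the live
// connection. If the delete fails, the connection is left alone and the
// error is returned so the operator can retry.
func revokeSpoke(store spokeDeleter, hub disconnecter, name string) error {
	if err := store.DeleteSpoke(name); err != nil {
		return fmt.Errorf("delete spoke %q: %w", name, err)
	}
	hub.Disconnect(name)
	return nil
}

// recorder is a fake that plays both roles and logs the call order.
type recorder struct {
	calls   []string
	failDel bool
}

func (r *recorder) DeleteSpoke(name string) error {
	if r.failDel {
		return errors.New("store unavailable")
	}
	r.calls = append(r.calls, "delete "+name)
	return nil
}

func (r *recorder) Disconnect(name string) {
	r.calls = append(r.calls, "disconnect "+name)
}

func main() {
	rec := &recorder{}
	err := revokeSpoke(rec, rec, "spoke-a")
	fmt.Println(rec.calls, err) // prints "[delete spoke-a disconnect spoke-a] <nil>"
}
```

Whether to disconnect even when the delete fails is a policy decision; this sketch chooses not to, so a failed revocation is visible rather than half-applied.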

func (*Hub) Spoke

func (h *Hub) Spoke(name string) (BoxManager, bool)

Spoke returns the connected spoke with the given name as a BoxManager.

@arg name The spoke name.
@return BoxManager The connected spoke, or nil when not connected.
@return bool True when a spoke with that name is currently connected.

@testcase TestHubEnrollAndRoute looks up the enrolled spoke by name.

func (*Hub) Spokes

func (h *Hub) Spokes() map[string]BoxManager

Spokes returns a snapshot of the currently connected spokes keyed by name.

@return map[string]BoxManager One entry per connected spoke.

@testcase TestHubEnrollAndRoute lists the connected spokes after enrollment.
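Returning a snapshot usually means copying the registry under a lock, so callers can iterate without racing against spokes that connect or drop. The sketch below illustrates that pattern only; the registry type is hypothetical, string values stand in for BoxManager, and the Hub's internals may be organised differently.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical connected-spoke table. The string values
// stand in for BoxManager implementations.
type registry struct {
	mu     sync.RWMutex
	spokes map[string]string
}

func (r *registry) add(name, conn string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.spokes[name] = conn
}

// snapshot copies the map under a read lock. The caller owns the copy, so
// later registrations or removals do not affect it, and vice versa.
func (r *registry) snapshot() map[string]string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make(map[string]string, len(r.spokes))
	for name, conn := range r.spokes {
		out[name] = conn
	}
	return out
}

func main() {
	r := &registry{spokes: map[string]string{}}
	r.add("spoke-a", "conn-1")
	snap := r.snapshot()
	r.add("spoke-b", "conn-2")
	delete(snap, "spoke-a") // mutating the copy leaves the registry intact
	fmt.Println(len(snap), len(r.snapshot())) // prints "0 2"
}
```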

type JoinTokenInfo

type JoinTokenInfo struct {
	ID        string
	Name      string
	ExpiresAt time.Time
}

JoinTokenInfo describes an outstanding join token for listing/revocation. ID is the token's hash (an opaque handle the operator can revoke by); the secret is never recoverable.

type JoinTokenRecord

type JoinTokenRecord struct {
	Name      string    `json:"name"`
	ExpiresAt time.Time `json:"expires_at"`
}

JoinTokenRecord is the stored form of a one-time join token: the spoke name baked into it and when it expires. The secret itself is not stored (only its hash, which is the key).

type SpokeRecord

type SpokeRecord struct {
	Name           string    `json:"name"`
	CredentialHash string    `json:"credential_hash"`
	EnrolledAt     time.Time `json:"enrolled_at"`
}

SpokeRecord is an enrolled spoke: its name, the hash of its bearer credential, and when it enrolled.

type Store

type Store interface {
	// PutJoinToken stores a join token record keyed by the hash of its secret.
	PutJoinToken(hash string, rec JoinTokenRecord) error
	// TakeJoinToken atomically reads and removes the record for a token hash
	// (one-time use); found is false when no token matches.
	TakeJoinToken(hash string) (rec JoinTokenRecord, found bool, err error)
	// ListJoinTokens returns every outstanding join token (its hash as an opaque
	// ID, its spoke name, and its expiry — never the secret, which is not stored).
	ListJoinTokens() ([]JoinTokenInfo, error)
	// DeleteJoinToken removes a join token by its hash ID; deleting a missing one
	// is a no-op.
	DeleteJoinToken(hash string) error
	// PutSpoke stores (creating or replacing) an enrolled spoke keyed by name.
	PutSpoke(name string, rec SpokeRecord) error
	// GetSpoke returns the spoke record for name; found is false when none matches.
	GetSpoke(name string) (rec SpokeRecord, found bool, err error)
	// ListSpokes returns every enrolled spoke.
	ListSpokes() ([]SpokeRecord, error)
	// DeleteSpoke removes an enrolled spoke; deleting a missing name is a no-op.
	DeleteSpoke(name string) error
}

Store persists cluster enrollment state: one-time join tokens and the per-spoke bearer credentials minted from them. Secrets are only ever stored hashed (the plaintext join token is shown to the operator once; the plaintext credential is held only by the spoke). All methods must be safe for concurrent use. The bolt-backed implementation lives in the server package.
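For tests, an in-memory implementation is often enough. The sketch below covers only the join-token methods of the interface and shows why TakeJoinToken must read and delete under one lock: with concurrent callers, exactly one may redeem a token. The memTokens type is hypothetical, and JoinTokenRecord is redeclared locally so the example is self-contained.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// JoinTokenRecord is redeclared here with the documented fields.
type JoinTokenRecord struct {
	Name      string    `json:"name"`
	ExpiresAt time.Time `json:"expires_at"`
}

// memTokens is a hypothetical in-memory store for join tokens only.
type memTokens struct {
	mu     sync.Mutex
	tokens map[string]JoinTokenRecord
}

func newMemTokens() *memTokens {
	return &memTokens{tokens: map[string]JoinTokenRecord{}}
}

func (m *memTokens) PutJoinToken(hash string, rec JoinTokenRecord) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.tokens[hash] = rec
	return nil
}

// TakeJoinToken reads and removes in one critical section, which is what
// makes the token one-time under concurrency.
func (m *memTokens) TakeJoinToken(hash string) (JoinTokenRecord, bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	rec, found := m.tokens[hash]
	if found {
		delete(m.tokens, hash)
	}
	return rec, found, nil
}

// DeleteJoinToken is a no-op for a missing hash, as the interface requires.
func (m *memTokens) DeleteJoinToken(hash string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.tokens, hash)
	return nil
}

// demo races eight goroutines for one token and counts the winners.
func demo() int {
	s := newMemTokens()
	_ = s.PutJoinToken("h1", JoinTokenRecord{Name: "spoke-a", ExpiresAt: time.Now().Add(time.Minute)})
	var wg sync.WaitGroup
	var mu sync.Mutex
	winners := 0
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, found, _ := s.TakeJoinToken("h1"); found {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	fmt.Println(demo()) // prints "1"
}
```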

type ValidationPolicy

type ValidationPolicy struct {
	// AllowedImages is the set of images the spoke will launch. Empty means no
	// image restriction (any image the hub sends is accepted).
	AllowedImages []string
}

ValidationPolicy is the spoke-side admission policy applied to box-creation requests arriving over the wire — defense-in-depth on top of the verb allowlist, so a spoke validates inputs itself rather than trusting the hub. The box-id format is always enforced, and so is the presence of an image: a spoke holds no default image of its own, so the hub must name one on every create. AllowedImages, when non-empty, further restricts which images the spoke will launch.
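The image rules stated above can be expressed as a small check. This sketch covers only the image part of the policy: the checkImage method is hypothetical, exact string matching against AllowedImages is an assumption, and the box-id format check is omitted because its format is not given here.

```go
package main

import (
	"errors"
	"fmt"
)

// ValidationPolicy is redeclared here with the documented field.
type ValidationPolicy struct {
	AllowedImages []string
}

// checkImage is a hypothetical helper. It requires an image on every
// create and, when the allowlist is non-empty, requires an exact match.
func (p ValidationPolicy) checkImage(image string) error {
	if image == "" {
		return errors.New("image is required: the spoke has no default image")
	}
	if len(p.AllowedImages) == 0 {
		return nil // empty allowlist means no image restriction
	}
	for _, allowed := range p.AllowedImages {
		if image == allowed {
			return nil
		}
	}
	return fmt.Errorf("image %q is not in the spoke's allowlist", image)
}

func main() {
	open := ValidationPolicy{}
	strict := ValidationPolicy{AllowedImages: []string{"llmbox/base:1"}}

	fmt.Println(open.checkImage("anything:latest"))
	fmt.Println(open.checkImage(""))
	fmt.Println(strict.checkImage("llmbox/base:1"))
	fmt.Println(strict.checkImage("other:latest"))
}
```

The image name llmbox/base:1 is a made-up value for the example.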
