adapter

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 18, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package adapter holds the broker-facing pieces every adapter mode shares: the WebSocket client to the broker (this file), the reconnect/resume protocol (reconnect.go), the mandatory consumer-side dedupe cache (dedupe.go), and the additive --adapter mode dispatch table (mode.go).

The broker is fully agent-agnostic: all per-mode behaviour (MCP stdio server for generic, claude/channel for cc) is layered on top of this client in Tasks 10/11. Those modes reuse this client and this dedupe cache; they do not reimplement either.

Transport / frame model (mirrors internal/broker/server.go): each control frame and each Envelope is ONE WebSocket text message holding a single JSON object. coder/websocket is the client lib — the same library the broker and its tests use (pure-Go, no cgo, context-aware).
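The one-JSON-object-per-text-message rule can be illustrated with a small decoding sketch. The field name delivery_key comes from the docs on this page; the type discriminator, the envelope field name, and the frame struct itself are assumptions made for illustration, not the package's wire types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// frame is a hypothetical stand-in for a control/deliver frame.
type frame struct {
	Type        string          `json:"type"`                   // assumed discriminator
	DeliveryKey string          `json:"delivery_key,omitempty"` // field named in the package docs
	Envelope    json.RawMessage `json:"envelope,omitempty"`     // assumed field name; raw bytes kept verbatim
}

// decodeFrame parses exactly one JSON object from one text message.
// json.Unmarshal rejects trailing data, so two objects in one message fail.
func decodeFrame(msg []byte) (frame, error) {
	var f frame
	if err := json.Unmarshal(msg, &f); err != nil {
		return frame{}, err
	}
	return f, nil
}

func main() {
	msg := []byte(`{"type":"deliver","delivery_key":"k1","envelope":{"id":"m1"}}`)
	f, err := decodeFrame(msg)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(f.Type, f.DeliveryKey, string(f.Envelope))
}
```

Keeping the envelope as json.RawMessage means the bytes are not re-encoded on the way through, which matters when a signature covers them verbatim.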

Constants

const DefaultDedupeSize = 4096

DefaultDedupeSize is the default bound for the seen-id cache when a non-positive size is requested. It is comfortably larger than any plausible unacked in-flight window.

Variables

var (
	// ErrNotConnected is returned by send/broadcast/peers when the client
	// has no live broker connection.
	ErrNotConnected = errors.New("adapter: not connected to broker")
	// ErrInboundHMAC is returned (and the message dropped) when an inbound
	// delivered envelope fails HMAC verification — a compromised broker
	// cannot forge a peer, so a bad signature is rejected, never surfaced.
	ErrInboundHMAC = errors.New("adapter: inbound envelope failed HMAC verify")
	// ErrMissingDeliveryKey is returned when the broker sends a deliver frame
	// with an empty delivery_key. Every broker delivery path (direct,
	// broadcast, reconnect drain) MUST set it to the durable per-recipient
	// row key the recipient acks by; an empty key is a protocol violation,
	// not something the adapter silently papers over (a fallback to
	// Envelope.ID would ack the original signed id and never clear a
	// broadcast row, redelivering it forever).
	ErrMissingDeliveryKey = errors.New("adapter: broker deliver frame missing delivery_key")
)

Functions

func Modes

func Modes() []string

Modes returns the sorted list of registered mode names.

func NewGenericBus

func NewGenericBus(ctx context.Context, cfg ClientConfig, dedupeSize int, log *slog.Logger) (mcp.Bus, func())

NewGenericBus wires a broker-backed mcp.Bus over a fresh ResumingClient and starts its reconnect/resume and drain loop in the background. It is the embeddable entry point the generic Mode uses internally, and the seam the MCP server integration tests drive, so those tests exercise the full real path (broker WS client, reconnect, shared dedupe, HMAC verify/sign) rather than a fake. Call the returned function to cancel the loop and release the broker connection; the adapter's stdio session owns this lifecycle.

func Register

func Register(name string, ctor Constructor)

Register adds (or replaces) the constructor for name. Calling it from an init() makes a new mode available with no edit to any dispatch switch. Re-registration is allowed and last-wins.
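The registration behaviour described for Register, Resolve, and Modes (last-wins replacement, sorted listing, an error that names the known modes) can be sketched with a mutex-guarded map. This is an illustrative model only: Constructor is simplified to a function returning a string, and the error text is an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Constructor is simplified for this sketch; the real type builds a Mode
// from a ClientConfig and a dedupe size.
type Constructor func() string

var (
	mu       sync.RWMutex
	registry = map[string]Constructor{}
)

// Register adds or replaces the constructor for name (last registration wins).
func Register(name string, ctor Constructor) {
	mu.Lock()
	defer mu.Unlock()
	registry[name] = ctor
}

// Modes returns the registered names in sorted order.
func Modes() []string {
	mu.RLock()
	defer mu.RUnlock()
	names := make([]string, 0, len(registry))
	for n := range registry {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

// Resolve looks up name; the lock is released before Modes is called.
func Resolve(name string) (Constructor, error) {
	mu.RLock()
	ctor, ok := registry[name]
	mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("unknown adapter mode %q (known: %v)", name, Modes())
	}
	return ctor, nil
}

func main() {
	Register("generic", func() string { return "generic v1" })
	Register("cc", func() string { return "cc v1" })
	Register("cc", func() string { return "cc v2" }) // replaces the earlier "cc"
	fmt.Println(Modes())
	ctor, _ := Resolve("cc")
	fmt.Println(ctor())
	_, err := Resolve("nope")
	fmt.Println(err)
}
```

Because lookup goes through the map, a new mode only needs an init() that calls Register; no central switch has to change.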

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a single broker WebSocket connection for one peer. It performs the register handshake, signs/sends outbound envelopes, verifies inbound deliveries end-to-end, and acks only AFTER the host has consumed a message. Reconnect/resume is layered on top in reconnect.go; this type models exactly one connection attempt's lifecycle.

Concurrency: WS writes are serialised by wmu so a delivery-loop ack never interleaves with a host-driven send.

func NewClient

func NewClient(cfg ClientConfig) *Client

NewClient constructs a Client over cfg. It does not dial; call Connect.

func (*Client) Ack

func (c *Client) Ack(ctx context.Context, id string) error

Ack acknowledges that the host has CONSUMED the message with id. It is sent only after consumption: until the broker receives this ack the message stays unacked and WILL be redelivered on reconnect (which the dedupe cache then suppresses). This is the load-bearing ordering of the at-least-once model.

func (*Client) Broadcast

func (c *Client) Broadcast(ctx context.Context, id, ts, source string, body json.RawMessage) error

Broadcast signs and sends a to:* fan-out message. Same field ownership as Send; the broker fans it out to every currently-registered peer except this sender (no backfill).

func (*Client) Close

func (c *Client) Close()

Close tears the connection down (normal closure). Idempotent.

func (*Client) Connect

func (c *Client) Connect(ctx context.Context) error

Connect dials the broker and performs the register handshake (token + chosen name). The HMAC secret is validated locally but never sent. On success the connection is live and Recv can be pumped; the broker's handshake reply (a wire.Peers frame) plus any immediately-flushed pending deliveries follow on the same connection and are surfaced through Recv like any other frame.

func (*Client) Peers

func (c *Client) Peers(ctx context.Context) (names []string, strays []wire.Deliver, err error)

Peers requests the broker's current registry and returns the peer names. It writes a peers control frame then reads frames until the peers reply arrives. Any deliver frames seen while waiting are returned so the caller does not drop them.

This reads the WS itself, so it MUST NOT be used while a Recv/resume loop is concurrently pumping the same connection (two readers split frames and deadlock). For that case use RequestPeers + SetPeersSink instead (see how genericBus.Peers does it). Safe only on a bare connection with no other reader — e.g. in tests.

func (*Client) Recv

func (c *Client) Recv(ctx context.Context) (wire.Deliver, error)

Recv blocks for the next inbound deliver frame, HMAC-verifies the carried envelope end-to-end, and returns the whole wire.Deliver, so the caller has both the verified envelope and the broker's per-recipient DeliveryKey, which is what an ack must reference (see Ack).

The envelope is exactly the bytes the sender signed. The broker never mutates it, even for a broadcast copy: the per-recipient routing identity rides on DeliveryKey, outside the HMAC.

A frame that fails verification is rejected and never surfaced as a valid message: Recv returns ErrInboundHMAC with the Deliver still populated so the caller can log, drop, and keep pumping. The broker handshake ack is consumed in Connect, so non-deliver frames here are skipped (a peers reply can arrive if Peers raced).
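The end-to-end check can be pictured as an HMAC computed over the exact envelope bytes, with the delivery key carried next to the envelope rather than inside the signed data. The sketch below assumes HMAC-SHA256 with a hex-encoded tag; the docs on this page do not state the hash or encoding, and the deliver struct here is a hypothetical stand-in for wire.Deliver.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// deliver is a hypothetical frame: DeliveryKey sits outside the signed bytes.
type deliver struct {
	DeliveryKey string
	Envelope    []byte // exactly the bytes the sender signed
	Sig         string // hex tag (assumed encoding)
}

// sign returns a hex HMAC-SHA256 tag over the envelope bytes (assumed scheme).
func sign(secret, envelope []byte) string {
	m := hmac.New(sha256.New, secret)
	m.Write(envelope)
	return hex.EncodeToString(m.Sum(nil))
}

// verify recomputes the tag and compares in constant time.
func verify(secret, envelope []byte, tag string) bool {
	want, err := hex.DecodeString(tag)
	if err != nil {
		return false
	}
	m := hmac.New(sha256.New, secret)
	m.Write(envelope)
	return hmac.Equal(m.Sum(nil), want)
}

func main() {
	secret := []byte("0123456789abcdef0123456789abcdef")
	env := []byte(`{"id":"m1","body":"hi"}`)
	d := deliver{DeliveryKey: "row-7", Envelope: env, Sig: sign(secret, env)}

	// Changing DeliveryKey does not affect verification: it is not signed.
	d.DeliveryKey = "row-8"
	fmt.Println(verify(secret, d.Envelope, d.Sig))

	// Any change to the envelope bytes breaks the tag.
	tampered := append([]byte(nil), env...)
	tampered[len(tampered)-3] = 'X'
	fmt.Println(verify(secret, tampered, d.Sig))
}
```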

func (*Client) RequestPeers

func (c *Client) RequestPeers(ctx context.Context) error

RequestPeers writes a peers control frame WITHOUT reading the reply (the resume loop is the sole reader and forwards the reply via the peers sink). Use this — not Peers — whenever a Recv loop is concurrently pumping the same connection.

func (*Client) Send

func (c *Client) Send(ctx context.Context, id, to, ts, source string, body json.RawMessage) error

Send signs and sends a direct message to peer `to`. id/ts/source are supplied by the caller (the adapter mode owns id generation); body is opaque application JSON, hashed verbatim.

func (*Client) SetPeersSink

func (c *Client) SetPeersSink(ch chan<- []string)

SetPeersSink installs (or clears, with nil) the channel the Recv loop forwards peers-reply names to. Used by the generic adapter so RequestPeers can get a reply without opening a competing reader on the WS.
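The single-reader rule behind RequestPeers and SetPeersSink can be modelled with channels: one pump reads every frame and forwards peers replies to a sink, so no second reader ever touches the connection. Everything in this sketch (the frame type, the pump function, and the choice to drop a reply when the sink is full) is an assumption for illustration.

```go
package main

import "fmt"

// frame is a hypothetical decoded frame; conn below stands in for the WS.
type frame struct {
	Kind  string // "deliver" or "peers"
	Names []string
	Body  string
}

// pump is the sole reader. Peers replies go to sink (if set); deliveries go
// to out. A full sink drops the reply rather than blocking the reader, which
// is a design choice of this sketch.
func pump(conn <-chan frame, sink chan<- []string, out chan<- string) {
	for f := range conn {
		switch f.Kind {
		case "peers":
			if sink != nil {
				select {
				case sink <- f.Names:
				default:
				}
			}
		case "deliver":
			out <- f.Body
		}
	}
	close(out)
}

func main() {
	conn := make(chan frame, 3)
	conn <- frame{Kind: "deliver", Body: "m1"}
	conn <- frame{Kind: "peers", Names: []string{"alice", "bob"}}
	conn <- frame{Kind: "deliver", Body: "m2"}
	close(conn)

	sink := make(chan []string, 1)
	out := make(chan string, 2)
	pump(conn, sink, out)

	fmt.Println(<-sink)
	for body := range out {
		fmt.Println(body)
	}
}
```

The requester only writes the peers request and then waits on the sink, so the deliveries interleaved with the reply are never lost or split between readers.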

type ClientConfig

type ClientConfig struct {
	// URL is the broker ws:// or wss:// endpoint.
	URL string
	// Token is the static bearer token presented at register.
	Token string
	// Name is the unique peer name to bind. Re-registering under the SAME
	// name after a drop is what triggers the broker's same-token takeover
	// + PendingFor flush (see reconnect.go).
	Name string
	// HMACSecret signs every outbound envelope and verifies every inbound
	// one. Must be at least hmac.MinSecretLen bytes.
	HMACSecret []byte
}

ClientConfig is the static configuration for a broker WS client. The HMAC secret is shared out-of-band (never sent on the wire — register carries only the bearer token + chosen name).

type Constructor

type Constructor func(cfg ClientConfig, dedupeSize int) (Mode, error)

Constructor builds a Mode from the static client config. dedupeSize bounds the shared seen-id cache (non-positive => DefaultDedupeSize).

func Resolve

func Resolve(name string) (Constructor, error)

Resolve returns the constructor registered for name, or a clear error (listing the known modes) for an unknown mode.

type Dedupe

type Dedupe struct {
	// contains filtered or unexported fields
}

Dedupe is a bounded, LRU-evicting set of message ids already surfaced to the host. Safe for concurrent use. Seen reports whether an id was already observed and records it; the most-recently-seen ids are kept and the oldest are evicted once the configured bound is exceeded.

func NewDedupe

func NewDedupe(size int) *Dedupe

NewDedupe constructs a Dedupe holding at most size ids. A non-positive size falls back to DefaultDedupeSize.

func (*Dedupe) Len

func (d *Dedupe) Len() int

Len returns the number of ids currently retained (bounded by the configured max). Primarily for tests/observability.

func (*Dedupe) Seen

func (d *Dedupe) Seen(id string) bool

Seen reports whether id was already recorded. On first sighting it records id (evicting the least-recently-seen id if the cache is full) and returns false; on a repeat it refreshes the id's recency and returns true. The caller surfaces the message to the host only when Seen returns false.
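The Seen semantics described above (record on first sighting, refresh recency on a repeat, evict the least-recently-seen id when full) match a classic LRU set. The following is a minimal sketch of that technique using container/list; the lruSet type is hypothetical and is not the package's Dedupe implementation.

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

// lruSet is a hypothetical bounded set with least-recently-seen eviction.
type lruSet struct {
	mu    sync.Mutex
	max   int
	order *list.List               // front = most recently seen
	index map[string]*list.Element // id -> node in order
}

func newLRUSet(max int) *lruSet {
	if max <= 0 {
		max = 4096 // mirrors DefaultDedupeSize
	}
	return &lruSet{max: max, order: list.New(), index: make(map[string]*list.Element)}
}

// Seen returns true for a repeat (refreshing its recency) and false for a
// first sighting (recording it, evicting the oldest id if the set is full).
func (s *lruSet) Seen(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if el, ok := s.index[id]; ok {
		s.order.MoveToFront(el)
		return true
	}
	if s.order.Len() >= s.max {
		oldest := s.order.Back()
		s.order.Remove(oldest)
		delete(s.index, oldest.Value.(string))
	}
	s.index[id] = s.order.PushFront(id)
	return false
}

// Len reports how many ids are currently retained.
func (s *lruSet) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.order.Len()
}

func main() {
	d := newLRUSet(2)
	fmt.Println(d.Seen("a")) // false: first sighting
	fmt.Println(d.Seen("b")) // false: first sighting
	fmt.Println(d.Seen("a")) // true: repeat, "a" becomes most recent
	fmt.Println(d.Seen("c")) // false: evicts "b", the least recently seen
	fmt.Println(d.Seen("b")) // false: "b" was evicted, so it looks new again
	fmt.Println(d.Len())     // 2
}
```

The last two calls show why the bound should exceed the unacked in-flight window: an id evicted too early is treated as new when it is redelivered.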

type HandlerFunc

type HandlerFunc func(ctx context.Context, env wire.Envelope) error

HandlerFunc consumes one already-deduped, HMAC-verified inbound envelope. It returns nil once the host has CONSUMED the message; the run loop acks only after a nil return (consume-then-ack ordering). A non-nil return means "not consumed" — the message is left unacked and will be redelivered (and re-deduped) on the next reconnect.

type Mode

type Mode interface {
	// Run drives the adapter until ctx is cancelled or a fatal error.
	Run(ctx context.Context) error
	// Name is the mode's --adapter token (for diagnostics).
	Name() string
}

Mode is a running adapter instance. The skeleton's contract is just Run(ctx) -> error so the binary can start it and exit on its return; concrete modes (Task 10/11) embed a ResumingClient and an MCP/channel server behind this same interface.

type ResumingClient

type ResumingClient struct {
	// contains filtered or unexported fields
}

ResumingClient wraps a Client with the reconnect/resume loop and the mandatory shared dedupe cache. Both adapter modes (generic, cc) embed this — neither reimplements reconnect or dedupe.

func NewResumingClient

func NewResumingClient(cfg ClientConfig, dedupeSize int) *ResumingClient

NewResumingClient builds a ResumingClient. dedupeSize bounds the shared seen-id cache (non-positive => DefaultDedupeSize). The same cache instance spans every reconnect — that is exactly what lets it suppress a duplicate that arrives only because of a reconnect redelivery.

func (*ResumingClient) Client

func (rc *ResumingClient) Client() *Client

Client returns the current live underlying Client (for outbound send/broadcast/peers). It may be nil between a drop and a successful redial; callers should treat ErrNotConnected as transient.

func (*ResumingClient) Dedupe

func (rc *ResumingClient) Dedupe() *Dedupe

Dedupe exposes the shared cache so a mode that drains on its own schedule (the generic adapter's bus.drain) filters through the SAME instance the reconnect loop uses.

func (*ResumingClient) Run

Run connects and pumps inbound deliveries through dedupe+handler until ctx is cancelled, transparently reconnecting (same-name re-register => broker takeover + unacked redelivery) on any drop. It blocks; run it in its own goroutine. It returns ctx.Err() on cancellation.
