Documentation
¶
Overview ¶
Package mcp is the stdio MCP server for the generic adapter, exposing the bus.* tools (bus.send / bus.broadcast / bus.peers / bus.drain).
Transport / dependency choice (documented per Task 10):
peerbus has NO MCP Go SDK as a dependency and we deliberately do not add one. The MCP wire protocol we need is a thin slice of JSON-RPC 2.0 over stdio — exactly three request methods (initialize, tools/list, tools/call) plus the initialized notification — so the minimal server is implemented directly here. Adding a heavy SDK to cover four tools would be unjustified weight for an open-source adapter binary; the protocol is small and stable enough to own. This keeps the binary dependency-light (rationale mirrors the coder/websocket choice documented in internal/broker/server.go).
Framing: MCP stdio transport frames each JSON-RPC message. We accept both supported framings on input and emit newline-delimited JSON on output:
- newline-delimited JSON (one compact JSON object per line) — the framing this server emits and the common stdio framing; and
- LSP-style "Content-Length: N\r\n\r\n<body>" headers — accepted on input for hosts that prefer it.
Concurrency: requests are handled one at a time off a single stdin reader (a stdio MCP server has exactly one peer, the host). Tool calls may block (bus.drain waits for the broker round-trip the host asked for); that is intentional and matches the host's synchronous tools/call expectation.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Bus ¶
type Bus interface {
// Send signs (via the shared client's HMAC) and sends a direct message
// to peer `to`. body is opaque application JSON, hashed verbatim.
Send(ctx context.Context, to string, body json.RawMessage) error
// Broadcast signs and fans a message out to every currently-registered
// peer except this one (no backfill).
Broadcast(ctx context.Context, body json.RawMessage) error
// Peers returns this adapter's bound peer name (self) AND the
// broker's current peer registry sans self. bus.peers shapes the
// result as {self, peers} so the consuming agent immediately knows
// its own bus identity without a separate bus.whoami round-trip.
Peers(ctx context.Context) (self string, peers []string, err error)
// Drain returns every inbound message buffered since the last drain —
// already HMAC-verified and already filtered through the SHARED dedupe
// cache — and acks each one back to the broker. A repeat delivery of an
// id the host already drained is suppressed by the shared dedupe and
// never reappears here.
Drain(ctx context.Context) ([]InboundMessage, error)
}
Bus is the broker-facing behaviour the MCP tools delegate to. The generic adapter implements it over the shared resuming broker client + shared dedupe cache (internal/adapter/generic.go) — the MCP layer never talks to the broker or touches HMAC/dedupe itself; it is purely the JSON-RPC tool surface. Keeping this an interface keeps the server unit-testable with a fake bus and keeps the "reuse, do not reimplement" boundary explicit.
type InboundMessage ¶
type InboundMessage struct {
ID string `json:"id"`
From string `json:"from"`
Source string `json:"source"`
Body json.RawMessage `json:"body"`
}
InboundMessage is one drained message as the host sees it. body is the opaque application JSON verbatim; source/from carry the envelope's provenance (source is e.g. "peer-bus" — the tag the consuming agent's own prompt keys its escalation policy off; peerbus itself has no such logic).
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the minimal stdio MCP server. It owns the JSON-RPC framing and dispatch; all bus behaviour is delegated to the injected Bus (the generic adapter wires a broker-backed Bus in internal/adapter/generic.go).
func NewServer ¶
NewServer builds a Server reading framed JSON-RPC from in and writing newline-delimited JSON-RPC to out, delegating tool calls to bus. Options are additive; with none it is the original generic-adapter server.
func (*Server) Initialized ¶ added in v0.3.0
func (s *Server) Initialized() <-chan struct{}
Initialized returns a channel that is closed once the client has sent the MCP notifications/initialized message (the standard MCP handshake completion signal). The channel is lazily allocated and idempotent: every caller observes the same channel and a second initialized notification is a no-op. The cc adapter waits on this before emitting its claude/channel self-announce notification — Claude Code silently drops server-initiated notifications that arrive before the client has signalled initialized (CHANNELS_SCHEMA.md §3). The generic adapter never asks and is unaffected.
func (*Server) Notify ¶
Notify emits a server -> client JSON-RPC notification (no id). It shares the Server's single serialised writer, so a push never interleaves with a tools/call reply. This is the additive server->client path the cc adapter's claude/channel push uses; the generic adapter never calls it.
func (*Server) Serve ¶
Serve runs the read/dispatch loop until ctx is cancelled, stdin reaches EOF, or an unrecoverable framing error occurs. A clean EOF (host closed the pipe) returns nil; ctx cancellation returns ctx.Err().
readMessage blocks on stdin and is NOT itself ctx-aware, so the loop must not call it inline and only check ctx at the top — on SIGTERM with stdin held open that would hang until SIGKILL (the adapter shutdown-hang bug). Instead each read runs in a goroutine feeding a channel and Serve selects on ctx.Done(); on cancellation it closes the underlying input (os.Stdin) so the in-flight read unblocks, then returns ctx.Err() promptly. The per-read goroutine is safe because reads are strictly sequential (one peer, one outstanding read at a time) — the next read is only started after the previous result is consumed.
type ServerOption ¶
type ServerOption func(*Server)
ServerOption configures a Server at construction. Options are purely additive — with no options the Server behaves exactly as the generic adapter always has (tools-only capability, generic serverInfo name).
func WithChannelCapability ¶
func WithChannelCapability() ServerOption
WithChannelCapability advertises experimental["claude/channel"]={} in the initialize result's `capabilities` object (in addition to the always-present `tools`). The cc adapter uses this so Claude Code registers its claude/channel push-wake listener. The generic adapter never sets it (tools-only).
func WithServerName ¶
func WithServerName(name string) ServerOption
WithServerName overrides the serverInfo.name advertised at initialize.
func WithoutDrain ¶
func WithoutDrain() ServerOption
WithoutDrain omits bus.drain from tools/list. The cc adapter (push-driven via claude/channel notifications) uses this so it does not advertise a no-op drain tool.