Documentation
¶
Overview ¶
Package mcpworker manages persistent MCP server sessions across a distributed deployment. Each registered MCP server gets a long-lived MCPSessionPool on every node, exposed via NATS request-reply so that PersistentRepo can route tool calls without knowing which node owns the session.
NATS subjects:
mcp.{name}.execute — request: MCPToolRequest JSON → reply: MCPToolReply JSON
mcp.{name}.list-tools — request: empty → reply: []mcp.Tool JSON
mcp.servers.created — event: MCPServer JSON (new server registered)
mcp.servers.deleted — event: MCPDeletedEvent JSON (server removed)
Index ¶
- Constants
- func DecodeListToolsReply(data []byte) ([]runtimetypes.MCPTool, error)
- func DecodeToolReply(data []byte) (any, error)
- func SubjectExecute(name string) string
- func SubjectListTools(name string) string
- type MCPDeletedEvent
- type MCPToolListReply
- type MCPToolReply
- type MCPToolRequest
- type Manager
Constants ¶
const ( SubjectCreated = "mcp.servers.created" SubjectDeleted = "mcp.servers.deleted" )
Variables ¶
This section is empty.
Functions ¶
func DecodeListToolsReply ¶
func DecodeListToolsReply(data []byte) ([]runtimetypes.MCPTool, error)
DecodeListToolsReply decodes a NATS reply from mcp.{name}.list-tools.
func DecodeToolReply ¶
DecodeToolReply decodes a NATS reply from mcp.{name}.execute. Used by PersistentRepo — exported so the hooks package can use it.
func SubjectExecute ¶
SubjectExecute returns the NATS subject for tool execution on a named MCP server.
func SubjectListTools ¶
SubjectListTools returns the NATS subject for listing tools on a named MCP server.
Types ¶
type MCPDeletedEvent ¶
type MCPDeletedEvent struct {
Name string `json:"name"`
}
MCPDeletedEvent is published on SubjectDeleted.
type MCPToolListReply ¶
type MCPToolListReply struct {
Tools []runtimetypes.MCPTool `json:"tools"`
Error string `json:"error,omitempty"`
}
MCPToolListReply is returned by mcp.{name}.list-tools.
type MCPToolReply ¶
type MCPToolReply struct {
Result any `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
MCPToolReply is the JSON payload returned by a worker.
type MCPToolRequest ¶
type MCPToolRequest struct {
SessionID string `json:"session_id,omitempty"` // Contextual isolation key
Tool string `json:"tool"` // Empty for list-tools
Args map[string]any `json:"args"`
}
MCPToolRequest is the JSON payload sent to mcp.{name}.execute and list-tools.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager keeps a persistent MCPSessionPool per registered MCP server and exposes each via NATS Serve(). It watches lifecycle events so new workers are started and old ones are stopped without restarting the process.
func New ¶
func New(ctx context.Context, db runtimetypes.Store, messenger libbus.Messenger, tracker libtracker.ActivityTracker) (*Manager, error)
New creates a Manager, loads all MCP server configs from the DB, and starts a persisted session worker for each. It is safe to call in serverapi.New().
func (*Manager) ActiveWorkers ¶
func (*Manager) StartWorker ¶
StartWorker starts a multiplexing worker for a named server. Idempotent: if a worker already exists for that name it is stopped first.
func (*Manager) StopAll ¶
func (m *Manager) StopAll()
StopAll stops all active workers and releases all MCP sessions and subprocesses. Must be called when the process is about to exit to ensure stdio child processes (e.g. npx-spawned MCP servers) are terminated.
func (*Manager) StopWorker ¶
StopWorker stops the named worker and closes all its multiplexed sessions.
func (*Manager) WatchEvents ¶
WatchEvents subscribes to mcp.servers.created and mcp.servers.deleted. When a new server is created via the API on any node, every node picks up the event and starts its own worker (NATS queue groups balance calls across them).