Documentation ¶
Overview ¶
Package mcpworker keeps MCP server connections alive across chain steps. Chains reach databases, Git hosts, and internal tools through this package without reconnecting on every step.
Index ¶
- Constants
- func DecodeListToolsReply(data []byte) ([]runtimetypes.MCPTool, error)
- func DecodeToolReply(data []byte) (any, error)
- func SubjectExecute(name string) string
- func SubjectListTools(name string) string
- type MCPDeletedEvent
- type MCPToolListReply
- type MCPToolReply
- type MCPToolRequest
- type Manager
Constants ¶
const (
	SubjectCreated = "mcp.servers.created"
	SubjectDeleted = "mcp.servers.deleted"
)
Functions ¶
func DecodeListToolsReply ¶
func DecodeListToolsReply(data []byte) ([]runtimetypes.MCPTool, error)
DecodeListToolsReply decodes a NATS reply from mcp.{name}.list-tools.
func DecodeToolReply ¶
func DecodeToolReply(data []byte) (any, error)
DecodeToolReply decodes a NATS reply from mcp.{name}.execute. It is used by PersistentRepo and is exported so that the tools package can call it.
func SubjectExecute ¶
func SubjectExecute(name string) string
SubjectExecute returns the NATS subject for tool execution on a named MCP server.
func SubjectListTools ¶
func SubjectListTools(name string) string
SubjectListTools returns the NATS subject for listing tools on a named MCP server.
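The subject layout can be sketched as follows. This is a minimal illustration that assumes the helpers simply fill the server name into the mcp.{name}.execute and mcp.{name}.list-tools patterns mentioned elsewhere on this page; subjectExecute and subjectListTools are hypothetical local stand-ins, not the package's own code, and "github" is an example server name.

```go
package main

import "fmt"

// subjectExecute is a hypothetical stand-in for SubjectExecute. It assumes the
// subject follows the documented mcp.{name}.execute pattern.
func subjectExecute(name string) string {
	return "mcp." + name + ".execute"
}

// subjectListTools is a hypothetical stand-in for SubjectListTools. It assumes
// the subject follows the documented mcp.{name}.list-tools pattern.
func subjectListTools(name string) string {
	return "mcp." + name + ".list-tools"
}

func main() {
	fmt.Println(subjectExecute("github"))   // mcp.github.execute
	fmt.Println(subjectListTools("github")) // mcp.github.list-tools
}
```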
Types ¶
type MCPDeletedEvent ¶
type MCPDeletedEvent struct {
	Name string `json:"name"`
}
MCPDeletedEvent is published on SubjectDeleted.
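A subscriber on SubjectDeleted might read the event payload roughly as below. This is a sketch under assumptions: deletedEvent mirrors the MCPDeletedEvent fields shown above, deletedServerName is a hypothetical helper, and rejecting an empty name is an illustrative choice rather than documented behaviour.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// deletedEvent mirrors the MCPDeletedEvent struct and its JSON tag.
type deletedEvent struct {
	Name string `json:"name"`
}

// deletedServerName is a hypothetical helper that extracts the server name
// from a payload received on mcp.servers.deleted.
func deletedServerName(data []byte) (string, error) {
	var ev deletedEvent
	if err := json.Unmarshal(data, &ev); err != nil {
		return "", fmt.Errorf("decode deleted event: %w", err)
	}
	if ev.Name == "" {
		// Assumption: an event without a name cannot identify a worker to stop.
		return "", errors.New("deleted event has no server name")
	}
	return ev.Name, nil
}

func main() {
	name, err := deletedServerName([]byte(`{"name":"github"}`))
	fmt.Println(name, err)
}
```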
type MCPToolListReply ¶
type MCPToolListReply struct {
	Tools []runtimetypes.MCPTool `json:"tools"`
	Error string                 `json:"error,omitempty"`
}
MCPToolListReply is returned by mcp.{name}.list-tools.
type MCPToolReply ¶
type MCPToolReply struct {
	Result any    `json:"result,omitempty"`
	Error  string `json:"error,omitempty"`
}
MCPToolReply is the JSON payload returned by a worker.
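One plausible way to consume such a reply is sketched below. It assumes that a non-empty Error field means the call failed and should surface as a Go error; the actual behaviour of DecodeToolReply is not shown on this page, so decodeToolReply here is a hypothetical stand-in with the same signature, and toolReply mirrors the struct above.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// toolReply mirrors the MCPToolReply struct and its JSON tags.
type toolReply struct {
	Result any    `json:"result,omitempty"`
	Error  string `json:"error,omitempty"`
}

// decodeToolReply is a hypothetical decoder. Assumption: a non-empty Error
// field is turned into a Go error, otherwise Result is returned as-is.
func decodeToolReply(data []byte) (any, error) {
	var r toolReply
	if err := json.Unmarshal(data, &r); err != nil {
		return nil, fmt.Errorf("decode tool reply: %w", err)
	}
	if r.Error != "" {
		return nil, errors.New(r.Error)
	}
	return r.Result, nil
}

func main() {
	res, err := decodeToolReply([]byte(`{"result":"ok"}`))
	fmt.Println(res, err)

	_, err = decodeToolReply([]byte(`{"error":"tool not found"}`))
	fmt.Println(err)
}
```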
type MCPToolRequest ¶
type MCPToolRequest struct {
	SessionID string         `json:"session_id,omitempty"` // Contextual isolation key
	Tool      string         `json:"tool"`                 // Empty for list-tools
	Args      map[string]any `json:"args"`
}
MCPToolRequest is the JSON payload sent to mcp.{name}.execute and mcp.{name}.list-tools.
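To show what the struct tags produce on the wire, the sketch below marshals a local mirror of the request type. toolRequest and encodeRequest are hypothetical names, and the tool name "search" and its arguments are made-up example values. Note that with these tags an empty request still carries "tool" and "args", because only session_id is marked omitempty.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toolRequest mirrors the MCPToolRequest struct and its JSON tags.
type toolRequest struct {
	SessionID string         `json:"session_id,omitempty"`
	Tool      string         `json:"tool"`
	Args      map[string]any `json:"args"`
}

// encodeRequest is a hypothetical helper that returns the JSON wire form.
func encodeRequest(req toolRequest) (string, error) {
	b, err := json.Marshal(req)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// An execute-style request with a session key, tool name, and arguments.
	exec, _ := encodeRequest(toolRequest{
		SessionID: "s1",
		Tool:      "search",
		Args:      map[string]any{"q": "mcp"},
	})
	fmt.Println(exec) // {"session_id":"s1","tool":"search","args":{"q":"mcp"}}

	// A list-tools-style request: Tool is empty and Args is nil.
	list, _ := encodeRequest(toolRequest{})
	fmt.Println(list) // {"tool":"","args":null}
}
```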
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager keeps a persistent MCPSessionPool per registered MCP server and exposes each via NATS Serve(). It watches lifecycle events so new workers are started and old ones are stopped without restarting the process.
func New ¶
func New(ctx context.Context, db runtimetypes.Store, messenger libbus.Messenger, tracker libtracker.ActivityTracker) (*Manager, error)
func (*Manager) ActiveWorkers ¶
func (*Manager) StartWorker ¶
StartWorker starts a multiplexing worker for a named server. The call is idempotent: if a worker already exists for that name, it is stopped before the new one starts.
func (*Manager) StopAll ¶
func (m *Manager) StopAll()
StopAll stops all active workers and releases all MCP sessions and subprocesses. Call it when the process is about to exit so that stdio child processes (e.g. npx-spawned MCP servers) are terminated.
func (*Manager) StopWorker ¶
StopWorker stops the named worker and closes all its multiplexed sessions.
func (*Manager) WatchEvents ¶
WatchEvents subscribes to mcp.servers.created and mcp.servers.deleted. When a new server is created via the API on any node, every node picks up the event and starts its own worker (NATS queue groups balance calls across them).