mcpworker

package
v0.5.0 Latest
Warning: This package is not in the latest version of its module.
Published: Mar 15, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package mcpworker manages persistent MCP server sessions across a distributed deployment. Each registered MCP server gets a long-lived MCPSessionPool on every node, exposed via NATS request-reply so that PersistentRepo can route tool calls without knowing which node owns the session.

NATS subjects:

mcp.{name}.execute     request: MCPToolRequest JSON → reply: MCPToolReply JSON
mcp.{name}.list-tools  request: empty → reply: MCPToolListReply JSON (tools as []runtimetypes.MCPTool)
mcp.servers.created    event: MCPServer JSON (new server registered)
mcp.servers.deleted    event: MCPDeletedEvent JSON (server removed)

Constants

const (
	SubjectCreated = "mcp.servers.created"
	SubjectDeleted = "mcp.servers.deleted"
)

Functions

func DecodeListToolsReply

func DecodeListToolsReply(data []byte) ([]runtimetypes.MCPTool, error)

DecodeListToolsReply decodes a NATS reply from mcp.{name}.list-tools.

func DecodeToolReply

func DecodeToolReply(data []byte) (any, error)

DecodeToolReply decodes a NATS reply from mcp.{name}.execute. It is used by PersistentRepo and is exported so that the hooks package can use it as well.
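The decoding step can be sketched with the standard library alone. This is a minimal illustration, not the package's source: toolReply is a local copy of the MCPToolReply fields documented on this page, and treating a non-empty Error field as a Go error is an assumption about how DecodeToolReply behaves.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// toolReply is a local copy of the documented MCPToolReply fields.
type toolReply struct {
	Result any    `json:"result,omitempty"`
	Error  string `json:"error,omitempty"`
}

// decodeToolReply is a hypothetical stand-in for DecodeToolReply.
// Assumption: a non-empty "error" field is surfaced as a Go error.
func decodeToolReply(data []byte) (any, error) {
	var r toolReply
	if err := json.Unmarshal(data, &r); err != nil {
		return nil, fmt.Errorf("decode tool reply: %w", err)
	}
	if r.Error != "" {
		return nil, errors.New(r.Error)
	}
	return r.Result, nil
}

func main() {
	// A successful reply carries only "result".
	res, err := decodeToolReply([]byte(`{"result":{"ok":true}}`))
	fmt.Println(res, err)

	// A failed reply carries only "error".
	_, err = decodeToolReply([]byte(`{"error":"tool not found"}`))
	fmt.Println(err)
}
```

Because Result is typed as any, JSON objects decode to map[string]any and numbers to float64, so callers typically need a type assertion before using the value.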

func SubjectExecute

func SubjectExecute(name string) string

SubjectExecute returns the NATS subject for tool execution on a named MCP server.

func SubjectListTools

func SubjectListTools(name string) string

SubjectListTools returns the NATS subject for listing tools on a named MCP server.
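Given the subject patterns listed in the overview, the two helpers presumably reduce to string concatenation. The sketch below uses hypothetical lowercase names and assumes the server name is inserted verbatim, with no escaping or validation; the real functions may differ.

```go
package main

import "fmt"

// subjectExecute mirrors the documented pattern mcp.{name}.execute.
// Hypothetical stand-in for SubjectExecute; the name is used as-is.
func subjectExecute(name string) string {
	return "mcp." + name + ".execute"
}

// subjectListTools mirrors the documented pattern mcp.{name}.list-tools.
// Hypothetical stand-in for SubjectListTools.
func subjectListTools(name string) string {
	return "mcp." + name + ".list-tools"
}

func main() {
	fmt.Println(subjectExecute("github"))
	fmt.Println(subjectListTools("github"))
}
```

NATS treats "." as a token separator in subjects, so a server name containing dots would change the subject's token structure under this assumption.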

Types

type MCPDeletedEvent

type MCPDeletedEvent struct {
	Name string `json:"name"`
}

MCPDeletedEvent is published on SubjectDeleted.

type MCPToolListReply

type MCPToolListReply struct {
	Tools []runtimetypes.MCPTool `json:"tools"`
	Error string                 `json:"error,omitempty"`
}

MCPToolListReply is returned by mcp.{name}.list-tools.

type MCPToolReply

type MCPToolReply struct {
	Result any    `json:"result,omitempty"`
	Error  string `json:"error,omitempty"`
}

MCPToolReply is the JSON payload returned by a worker.

type MCPToolRequest

type MCPToolRequest struct {
	SessionID string         `json:"session_id,omitempty"` // Contextual isolation key
	Tool      string         `json:"tool"`                 // Empty for list-tools
	Args      map[string]any `json:"args"`
}

MCPToolRequest is the JSON payload sent to mcp.{name}.execute and mcp.{name}.list-tools.
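To make the wire format concrete, the following sketch marshals a request using a local copy of the struct tags shown above. The encodeRequest helper and the sample tool name and arguments are illustrative assumptions, not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toolRequest is a local copy of the documented MCPToolRequest fields.
type toolRequest struct {
	SessionID string         `json:"session_id,omitempty"`
	Tool      string         `json:"tool"`
	Args      map[string]any `json:"args"`
}

// encodeRequest is a hypothetical helper that builds the JSON payload.
func encodeRequest(sessionID, tool string, args map[string]any) ([]byte, error) {
	return json.Marshal(toolRequest{SessionID: sessionID, Tool: tool, Args: args})
}

func main() {
	// Without a session ID, the session_id key is omitted entirely.
	b, err := encodeRequest("", "search", map[string]any{"q": "nats"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))

	// With a session ID, the key appears first, following struct field order.
	b, err = encodeRequest("chat-42", "search", map[string]any{"q": "nats"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```

Note that only session_id carries omitempty: a nil Args map is encoded as "args":null rather than being dropped.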

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager keeps a persistent MCPSessionPool per registered MCP server and exposes each via NATS Serve(). It watches lifecycle events so new workers are started and old ones are stopped without restarting the process.

func New

New creates a Manager, loads all MCP server configs from the DB, and starts a persistent session worker for each. It is safe to call in serverapi.New().

func (*Manager) ActiveWorkers

func (m *Manager) ActiveWorkers() []string

func (*Manager) StartWorker

func (m *Manager) StartWorker(ctx context.Context, srv *runtimetypes.MCPServer) error

StartWorker starts a multiplexing worker for a named server. Idempotent: if a worker already exists for that name, it is stopped first and then replaced.
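The stop-then-replace behaviour can be illustrated with a small mutex-guarded map. Everything here is a hypothetical stand-in: the registry and worker types are not the Manager's actual fields, and returning names in sorted order is an assumption made only to keep the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// worker is a hypothetical stand-in for a running per-server worker.
type worker struct{ stop func() }

// registry sketches the name-to-worker bookkeeping a Manager might keep.
type registry struct {
	mu      sync.Mutex
	workers map[string]*worker
}

func newRegistry() *registry {
	return &registry{workers: make(map[string]*worker)}
}

// start registers a worker under name, stopping any previous one first.
func (r *registry) start(name string, stop func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if old, ok := r.workers[name]; ok {
		old.stop() // stop the existing worker before replacing it
	}
	r.workers[name] = &worker{stop: stop}
}

// active lists the registered worker names (sorted here by assumption).
func (r *registry) active() []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	names := make([]string, 0, len(r.workers))
	for n := range r.workers {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

func main() {
	r := newRegistry()
	stopped := 0
	r.start("github", func() { stopped++ })
	r.start("github", func() { stopped++ }) // replaces the first worker
	fmt.Println(r.active(), stopped)
}
```

Calling start twice with the same name leaves exactly one registered worker, which is the property the word "idempotent" describes here.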

func (*Manager) StopAll

func (m *Manager) StopAll()

StopAll stops all active workers and releases all MCP sessions and subprocesses. Must be called when the process is about to exit to ensure stdio child processes (e.g. npx-spawned MCP servers) are terminated.

func (*Manager) StopWorker

func (m *Manager) StopWorker(ctx context.Context, name string)

StopWorker stops the named worker and closes all its multiplexed sessions.

func (*Manager) WatchEvents

func (m *Manager) WatchEvents(ctx context.Context) error

WatchEvents subscribes to mcp.servers.created and mcp.servers.deleted. When a new server is created via the API on any node, every node picks up the event and starts its own worker (NATS queue groups balance calls across them).
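The event handling described above amounts to decoding a payload and calling the matching lifecycle method. The sketch below shows that dispatch step without a NATS connection. The dispatch function, the callbacks, and the server type are hypothetical; in particular, the assumption that MCPServer JSON carries a "name" field is not confirmed by this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const (
	subjectCreated = "mcp.servers.created"
	subjectDeleted = "mcp.servers.deleted"
)

// server is a minimal stand-in for runtimetypes.MCPServer.
// Assumption: the real type serializes its name under "name".
type server struct {
	Name string `json:"name"`
}

// deletedEvent is a local copy of the documented MCPDeletedEvent.
type deletedEvent struct {
	Name string `json:"name"`
}

// dispatch routes one lifecycle message to the start or stop callback.
func dispatch(subject string, data []byte, start func(server) error, stop func(name string)) error {
	switch subject {
	case subjectCreated:
		var s server
		if err := json.Unmarshal(data, &s); err != nil {
			return fmt.Errorf("decode created event: %w", err)
		}
		return start(s)
	case subjectDeleted:
		var ev deletedEvent
		if err := json.Unmarshal(data, &ev); err != nil {
			return fmt.Errorf("decode deleted event: %w", err)
		}
		stop(ev.Name)
		return nil
	default:
		return fmt.Errorf("unexpected subject %q", subject)
	}
}

func main() {
	start := func(s server) error { fmt.Println("start", s.Name); return nil }
	stop := func(name string) { fmt.Println("stop", name) }

	_ = dispatch(subjectCreated, []byte(`{"name":"github"}`), start, stop)
	_ = dispatch(subjectDeleted, []byte(`{"name":"github"}`), start, stop)
}
```

In a real deployment the lifecycle subscriptions would presumably be plain (non-queue) subscriptions so that every node sees every event, while the per-server execute subjects use queue groups to balance calls, as the description above notes.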
