mcpworker

package
v0.28.4
Published: Jun 8, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package mcpworker keeps MCP server connections alive across chain steps. Chains reach databases, Git hosts, and internal tools through here without reconnecting on every step.

Constants

const (
	SubjectCreated = "mcp.servers.created"
	SubjectDeleted = "mcp.servers.deleted"
)

Functions

func DecodeListToolsReply

func DecodeListToolsReply(data []byte) ([]runtimetypes.MCPTool, error)

DecodeListToolsReply decodes a NATS reply from mcp.{name}.list-tools.

func DecodeToolReply

func DecodeToolReply(data []byte) (any, error)

DecodeToolReply decodes a NATS reply from mcp.{name}.execute. It is used by PersistentRepo and is exported so that the tools package can call it as well.
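
The page does not show how the reply is decoded. The following is a minimal sketch of one plausible approach, assuming the payload has the shape of MCPToolReply and that a non-empty error field is surfaced as a Go error. The names toolReply and decodeToolReply are hypothetical stand-ins, not the package's implementation.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// toolReply mirrors the MCPToolReply struct documented on this page.
type toolReply struct {
	Result any    `json:"result,omitempty"`
	Error  string `json:"error,omitempty"`
}

// decodeToolReply is a hypothetical stand-in for DecodeToolReply.
// Assumption: a non-empty Error field is returned as a Go error.
func decodeToolReply(data []byte) (any, error) {
	var reply toolReply
	if err := json.Unmarshal(data, &reply); err != nil {
		return nil, fmt.Errorf("decode tool reply: %w", err)
	}
	if reply.Error != "" {
		return nil, errors.New(reply.Error)
	}
	return reply.Result, nil
}

func main() {
	res, err := decodeToolReply([]byte(`{"result":{"rows":3}}`))
	fmt.Println(res, err) // map[rows:3] <nil>

	_, err = decodeToolReply([]byte(`{"error":"tool not found"}`))
	fmt.Println(err) // tool not found
}
```

Because Result is typed as any, JSON objects decode to map[string]any and numbers to float64, so callers usually need a type assertion before using the value.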

func SubjectExecute

func SubjectExecute(name string) string

SubjectExecute returns the NATS subject for tool execution on a named MCP server.

func SubjectListTools

func SubjectListTools(name string) string

SubjectListTools returns the NATS subject for listing tools on a named MCP server.
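
The reply documentation on this page names the subjects as mcp.{name}.execute and mcp.{name}.list-tools. A minimal sketch of how such subjects could be built, assuming plain string concatenation with the server name; the lowercase helpers are hypothetical stand-ins and may differ from the real functions (for example in how they sanitize names):

```go
package main

import "fmt"

// subjectExecute is a hypothetical stand-in for SubjectExecute.
// Assumption: the server name is inserted verbatim into mcp.{name}.execute.
func subjectExecute(name string) string {
	return "mcp." + name + ".execute"
}

// subjectListTools is a hypothetical stand-in for SubjectListTools.
// Assumption: the server name is inserted verbatim into mcp.{name}.list-tools.
func subjectListTools(name string) string {
	return "mcp." + name + ".list-tools"
}

func main() {
	fmt.Println(subjectExecute("github"))   // mcp.github.execute
	fmt.Println(subjectListTools("github")) // mcp.github.list-tools
}
```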

Types

type MCPDeletedEvent

type MCPDeletedEvent struct {
	Name string `json:"name"`
}

MCPDeletedEvent is published on SubjectDeleted.

type MCPToolListReply

type MCPToolListReply struct {
	Tools []runtimetypes.MCPTool `json:"tools"`
	Error string                 `json:"error,omitempty"`
}

MCPToolListReply is returned by mcp.{name}.list-tools.

type MCPToolReply

type MCPToolReply struct {
	Result any    `json:"result,omitempty"`
	Error  string `json:"error,omitempty"`
}

MCPToolReply is the JSON payload returned by a worker in response to mcp.{name}.execute.

type MCPToolRequest

type MCPToolRequest struct {
	SessionID string         `json:"session_id,omitempty"` // Contextual isolation key
	Tool      string         `json:"tool"`                 // Empty for list-tools
	Args      map[string]any `json:"args"`
}

MCPToolRequest is the JSON payload sent to mcp.{name}.execute and mcp.{name}.list-tools. For list-tools requests, Tool is left empty.
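
To make the wire format concrete, the sketch below marshals a local copy of the struct above with encoding/json. The type name toolRequest, the helper encodeRequest, and the sample values ("chain-42", "query", the SQL string) are illustrative assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toolRequest mirrors the MCPToolRequest struct documented above.
type toolRequest struct {
	SessionID string         `json:"session_id,omitempty"`
	Tool      string         `json:"tool"`
	Args      map[string]any `json:"args"`
}

// encodeRequest is a hypothetical helper that returns the JSON form as a string.
func encodeRequest(req toolRequest) (string, error) {
	data, err := json.Marshal(req)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

func main() {
	// An execute request with a session key and one argument.
	exec, _ := encodeRequest(toolRequest{
		SessionID: "chain-42",
		Tool:      "query",
		Args:      map[string]any{"sql": "SELECT 1"},
	})
	fmt.Println(exec)
	// {"session_id":"chain-42","tool":"query","args":{"sql":"SELECT 1"}}

	// A list-tools request: Tool stays empty, session_id is omitted.
	list, _ := encodeRequest(toolRequest{})
	fmt.Println(list)
	// {"tool":"","args":null}
}
```

Only session_id carries omitempty, so an empty Tool and nil Args still appear in the payload.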

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager keeps a persistent MCPSessionPool per registered MCP server and exposes each via NATS Serve(). It watches lifecycle events so new workers are started and old ones are stopped without restarting the process.

func New

func (*Manager) ActiveWorkers

func (m *Manager) ActiveWorkers() []string

func (*Manager) StartWorker

func (m *Manager) StartWorker(ctx context.Context, srv *runtimetypes.MCPServer) error

StartWorker starts a multiplexing worker for the named server. It is safe to call repeatedly: if a worker already exists for that name, it is stopped first and then replaced.

func (*Manager) StopAll

func (m *Manager) StopAll()

StopAll stops all active workers and releases all MCP sessions and subprocesses. It must be called before the process exits so that stdio child processes (e.g. npx-spawned MCP servers) are terminated.

func (*Manager) StopWorker

func (m *Manager) StopWorker(ctx context.Context, name string)

StopWorker stops the named worker and closes all its multiplexed sessions.

func (*Manager) WatchEvents

func (m *Manager) WatchEvents(ctx context.Context) error

WatchEvents subscribes to mcp.servers.created and mcp.servers.deleted. When a server is created via the API on any node, every node receives the event and starts its own worker; NATS queue groups then balance calls across those workers.
