mcp

package
v1.12.1 Latest
Warning

This package is not in the latest version of its module.

Published: May 30, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Constants

const (
	CodeParseError     = -32700
	CodeInvalidRequest = -32600
	CodeMethodNotFound = -32601
	CodeInvalidParams  = -32602
	CodeInternalError  = -32603
	CodeServerError    = -32000
)

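Standard JSON-RPC 2.0 error codes. CodeServerError (-32000) is the upper bound of the range that JSON-RPC 2.0 reserves for implementation-defined server errors (-32000 to -32099).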
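As an illustration of how these codes can be interpreted, the following sketch classifies an error code. It redeclares the constants locally so it compiles on its own; describeCode is a hypothetical helper and is not part of this package.

```go
package main

import "fmt"

// Local copies of the documented constants so the sketch is self-contained.
const (
	CodeParseError     = -32700
	CodeInvalidRequest = -32600
	CodeMethodNotFound = -32601
	CodeInvalidParams  = -32602
	CodeInternalError  = -32603
	CodeServerError    = -32000
)

// describeCode is a hypothetical helper (not part of the package) that maps
// a JSON-RPC 2.0 error code to a short human-readable category.
func describeCode(code int) string {
	switch {
	case code == CodeParseError:
		return "parse error"
	case code == CodeInvalidRequest:
		return "invalid request"
	case code == CodeMethodNotFound:
		return "method not found"
	case code == CodeInvalidParams:
		return "invalid params"
	case code == CodeInternalError:
		return "internal error"
	case code <= CodeServerError && code >= -32099:
		// JSON-RPC 2.0 reserves -32000..-32099 for server errors.
		return "server error"
	default:
		return "application error"
	}
}

func main() {
	fmt.Println(describeCode(CodeMethodNotFound))
	fmt.Println(describeCode(CodeServerError))
}
```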

Variables

var (
	ErrStreamNotFound      = errors.New("stream not found")
	ErrEventNotFound       = errors.New("event not found")
	ErrStreamEventTooLarge = errors.New("stream event too large")
)
var ErrSessionNotFound = errors.New("session not found")

ErrSessionNotFound is returned when a session ID does not exist in the store.

var ErrTaskNotFound = errors.New("task not found")

ErrTaskNotFound is returned when a task does not exist in the caller's session scope.

var ErrTaskTerminal = errors.New("task already terminal")

ErrTaskTerminal is returned when a mutation targets a task that is already in a terminal state.

Functions

func MarshalResponse

func MarshalResponse(resp *Response) ([]byte, error)

MarshalResponse serializes a JSON-RPC 2.0 response to bytes. It ensures the jsonrpc field is always set to "2.0".
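The behavior described above can be approximated as follows. This is a minimal sketch, not the package's implementation: Response and RPCError are local copies of the documented types, and marshalWithVersion is a hypothetical stand-in that forces the jsonrpc field on a copy before encoding.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// RPCError and Response are local copies of the documented types.
type RPCError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Data    any    `json:"data,omitempty"`
}

type Response struct {
	JSONRPC string    `json:"jsonrpc"`
	ID      any       `json:"id"`
	Result  any       `json:"result,omitempty"`
	Error   *RPCError `json:"error,omitempty"`
}

// marshalWithVersion is a hypothetical stand-in for MarshalResponse.
// It works on a shallow copy so the caller's value is not mutated.
func marshalWithVersion(resp *Response) ([]byte, error) {
	if resp == nil {
		return nil, errors.New("nil response")
	}
	out := *resp
	out.JSONRPC = "2.0"
	return json.Marshal(out)
}

func main() {
	b, err := marshalWithVersion(&Response{ID: 1, Result: "ok"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```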

Types

type CapabilityConfig added in v1.6.0

type CapabilityConfig struct {
	Tools       bool
	Resources   bool
	Prompts     bool
	Completions bool
	Tasks       bool
}

CapabilityConfig controls which implemented MCP server surfaces may be advertised during initialize.

The config is intentionally limited to surfaces that AppTheory currently implements. Unsupported MCP sub-capabilities such as listChanged and tasks are omitted until their concrete hooks exist. Resource subscription and logging are also omitted until AppTheory has a first-class outbound notification contract for notifications/resources/updated and notifications/message. Completion and task support are advertised only when their explicit hooks or stores are configured. That keeps capability negotiation fail-closed instead of allowing callers to overclaim unsupported behavior.

func DefaultCapabilityConfig added in v1.6.0

func DefaultCapabilityConfig() CapabilityConfig

DefaultCapabilityConfig returns the default MCP capability policy.

A surface is still advertised only when it is actually present on the server: tools require at least one registered tool, resources require at least one registered resource, and prompts require at least one registered prompt.
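The fail-closed rule for tools, resources, and prompts can be sketched as a conjunction of policy and presence. The example below is an assumption-based illustration: CapabilityConfig is a local copy, while registered and advertisedSurfaces are hypothetical names. Completions and tasks depend on hooks and stores and are not modeled here.

```go
package main

import "fmt"

// CapabilityConfig mirrors the documented struct (local copy for this sketch).
type CapabilityConfig struct {
	Tools       bool
	Resources   bool
	Prompts     bool
	Completions bool
	Tasks       bool
}

// registered is a hypothetical snapshot of how many items each registry holds.
type registered struct {
	Tools, Resources, Prompts int
}

// advertisedSurfaces returns the surfaces that are both allowed by policy and
// actually present on the server. Everything else is omitted (fail closed).
func advertisedSurfaces(cfg CapabilityConfig, n registered) []string {
	out := []string{}
	if cfg.Tools && n.Tools > 0 {
		out = append(out, "tools")
	}
	if cfg.Resources && n.Resources > 0 {
		out = append(out, "resources")
	}
	if cfg.Prompts && n.Prompts > 0 {
		out = append(out, "prompts")
	}
	return out
}

func main() {
	cfg := CapabilityConfig{Tools: true, Resources: true, Prompts: true}
	// Only tools are registered, so only tools are advertised.
	fmt.Println(advertisedSurfaces(cfg, registered{Tools: 2}))
}
```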

type Completion added in v1.6.0

type Completion struct {
	Values  []string `json:"values"`
	Total   *int     `json:"total,omitempty"`
	HasMore *bool    `json:"hasMore,omitempty"`
}

Completion contains completion values returned to the client.

type CompletionArgument added in v1.6.0

type CompletionArgument struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

CompletionArgument identifies the argument currently being completed.

type CompletionContext added in v1.6.0

type CompletionContext struct {
	Arguments map[string]string `json:"arguments,omitempty"`
}

CompletionContext carries optional previously-resolved arguments.

type CompletionHook added in v1.6.0

type CompletionHook func(ctx context.Context, req CompletionRequest) (*CompletionResult, error)

CompletionHook handles a prompt or resource completion request.

type CompletionRef added in v1.6.0

type CompletionRef struct {
	Type string `json:"type"`
	Name string `json:"name,omitempty"`
	URI  string `json:"uri,omitempty"`
}

CompletionRef identifies the prompt or resource template being completed.

type CompletionRequest added in v1.6.0

type CompletionRequest struct {
	SessionID string             `json:"sessionId"`
	Ref       CompletionRef      `json:"ref"`
	Argument  CompletionArgument `json:"argument"`
	Context   CompletionContext  `json:"context,omitempty"`
}

CompletionRequest is passed to completion hooks.

type CompletionResult added in v1.6.0

type CompletionResult struct {
	Completion Completion `json:"completion"`
}

CompletionResult is the MCP completion/complete result.
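To show how the completion types fit together, here is a hedged sketch of a hook that filters a fixed candidate list by prefix. The types are trimmed local copies of the documented ones; languageCompletions, complete, and the candidate list are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Trimmed local copies of the documented completion types.
type Completion struct {
	Values  []string `json:"values"`
	Total   *int     `json:"total,omitempty"`
	HasMore *bool    `json:"hasMore,omitempty"`
}

type CompletionArgument struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

type CompletionRequest struct {
	SessionID string             `json:"sessionId"`
	Argument  CompletionArgument `json:"argument"`
}

type CompletionResult struct {
	Completion Completion `json:"completion"`
}

type CompletionHook func(ctx context.Context, req CompletionRequest) (*CompletionResult, error)

// languageCompletions is a hypothetical hook: it returns every candidate
// that starts with the partially typed argument value.
func languageCompletions(_ context.Context, req CompletionRequest) (*CompletionResult, error) {
	candidates := []string{"go", "golang", "python"}
	values := []string{}
	for _, c := range candidates {
		if strings.HasPrefix(c, req.Argument.Value) {
			values = append(values, c)
		}
	}
	total := len(values)
	hasMore := false
	return &CompletionResult{Completion: Completion{Values: values, Total: &total, HasMore: &hasMore}}, nil
}

// Compile-time check that the function satisfies the hook signature.
var _ CompletionHook = languageCompletions

// complete is a small demo wrapper around the hook.
func complete(prefix string) []string {
	res, err := languageCompletions(context.Background(), CompletionRequest{
		Argument: CompletionArgument{Name: "language", Value: prefix},
	})
	if err != nil {
		return nil
	}
	return res.Completion.Values
}

func main() {
	fmt.Println(complete("go"))
}
```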

type ContentBlock

type ContentBlock struct {
	Type string `json:"type"` // "text", "image", "audio", "resource_link", "resource"

	// Text content (type = "text").
	Text string `json:"text,omitempty"`

	// Image/audio content (type = "image" or "audio").
	Data     string `json:"data,omitempty"`     // base64-encoded
	MimeType string `json:"mimeType,omitempty"` // e.g. "image/png"

	// Resource link content (type = "resource_link").
	URI         string `json:"uri,omitempty"`
	Name        string `json:"name,omitempty"`
	Title       string `json:"title,omitempty"`
	Description string `json:"description,omitempty"`
	Size        int64  `json:"size,omitempty"`

	// Embedded resource content (type = "resource").
	Resource *ResourceContent `json:"resource,omitempty"`
}

type CreateTaskResult added in v1.6.0

type CreateTaskResult struct {
	Meta map[string]any `json:"_meta,omitempty"`
	Task Task           `json:"task"`
}

CreateTaskResult is returned when AppTheory accepts a task-augmented request.

type DynamoSessionStore

type DynamoSessionStore struct {
	// contains filtered or unexported fields
}

DynamoSessionStore implements SessionStore using DynamoDB via TableTheory.

func (*DynamoSessionStore) Delete

func (d *DynamoSessionStore) Delete(ctx context.Context, id string) error

Delete removes a session by ID.

func (*DynamoSessionStore) Get

func (d *DynamoSessionStore) Get(ctx context.Context, id string) (*Session, error)

Get retrieves a session by ID. Returns ErrSessionNotFound if the session does not exist.

func (*DynamoSessionStore) Put

func (d *DynamoSessionStore) Put(ctx context.Context, session *Session) error

Put stores a session. It upserts any existing session with the same ID, so sliding session refreshes update the TTL instead of failing on duplicate keys.

type DynamoStreamStore added in v0.20.0

type DynamoStreamStore struct {
	// contains filtered or unexported fields
}

DynamoStreamStore implements StreamStore using DynamoDB via TableTheory.

Subscribe uses strongly consistent reads plus short polling so separate server instances can replay and continue an active stream from shared state. Replay is bounded by each event record's ExpiresAt value at runtime; DynamoDB TTL and S3 lifecycle cleanup are backstops, not access checks. The strongest DeleteSession/Append race protection is enabled when the provided TableTheory DB implements TransactWrite, which is the production TableTheory path.

func (*DynamoStreamStore) Append added in v0.20.0

func (d *DynamoStreamStore) Append(ctx context.Context, sessionID, streamID string, data json.RawMessage) (string, error)

func (*DynamoStreamStore) Close added in v0.20.0

func (d *DynamoStreamStore) Close(ctx context.Context, sessionID, streamID string) error

func (*DynamoStreamStore) Create added in v0.20.0

func (d *DynamoStreamStore) Create(ctx context.Context, sessionID string) (string, error)

func (*DynamoStreamStore) DeleteSession added in v0.20.0

func (d *DynamoStreamStore) DeleteSession(ctx context.Context, sessionID string) error

func (*DynamoStreamStore) StreamForEvent added in v0.20.0

func (d *DynamoStreamStore) StreamForEvent(ctx context.Context, sessionID, eventID string) (string, error)

func (*DynamoStreamStore) Subscribe added in v0.20.0

func (d *DynamoStreamStore) Subscribe(ctx context.Context, sessionID, streamID, afterEventID string) (<-chan StreamEvent, error)

type DynamoTaskStore added in v1.6.0

type DynamoTaskStore struct {
	// contains filtered or unexported fields
}

DynamoTaskStore implements TaskStore using DynamoDB via TableTheory.

func (*DynamoTaskStore) Cancel added in v1.6.0

func (d *DynamoTaskStore) Cancel(ctx context.Context, lookup TaskLookup) (*TaskRecord, error)

func (*DynamoTaskStore) Create added in v1.6.0

func (d *DynamoTaskStore) Create(ctx context.Context, task TaskRecord) (*TaskRecord, error)

func (*DynamoTaskStore) DeleteSession added in v1.6.0

func (d *DynamoTaskStore) DeleteSession(ctx context.Context, sessionID string) error

func (*DynamoTaskStore) Get added in v1.6.0

func (d *DynamoTaskStore) Get(ctx context.Context, lookup TaskLookup) (*TaskRecord, error)

func (*DynamoTaskStore) List added in v1.6.0

func (d *DynamoTaskStore) List(ctx context.Context, req TaskListRequest) (*TaskListResult, error)

func (*DynamoTaskStore) Update added in v1.6.0

func (d *DynamoTaskStore) Update(ctx context.Context, task TaskRecord) (*TaskRecord, error)

type Icon added in v0.14.1

type Icon struct {
	Src      string   `json:"src"`
	MimeType string   `json:"mimeType,omitempty"`
	Sizes    []string `json:"sizes,omitempty"`
	Theme    string   `json:"theme,omitempty"` // "light" or "dark"
}

type InitialSessionListenerBudgetOptions added in v0.25.0

type InitialSessionListenerBudgetOptions struct {
	SafetyBuffer time.Duration
	MaxDuration  time.Duration
}

InitialSessionListenerBudgetOptions configures how the initial GET /mcp session listener is capped against the Lambda remaining-time budget.

The option is explicit opt-in and applies only when GET /mcp is used without Last-Event-ID (the keepalive listener path). Resume/replay GET requests keep their existing behavior.

Zero values use the framework defaults:

  • SafetyBuffer: 5s
  • MaxDuration: 25s
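One plausible reading of the cap is "remaining time minus the safety buffer, limited to MaxDuration". The sketch below encodes that reading as an assumption. The exact formula the framework uses is not stated here, and listenerBudget is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// InitialSessionListenerBudgetOptions is a local copy of the documented struct.
type InitialSessionListenerBudgetOptions struct {
	SafetyBuffer time.Duration
	MaxDuration  time.Duration
}

// Documented framework defaults for zero values.
const (
	defaultSafetyBuffer = 5 * time.Second
	defaultMaxDuration  = 25 * time.Second
)

// listenerBudget is a hypothetical helper. Assumption: the listener may run
// for (remaining - SafetyBuffer), never longer than MaxDuration and never
// less than zero. remainingMS is the Lambda remaining time in milliseconds.
func listenerBudget(opts InitialSessionListenerBudgetOptions, remainingMS int64) time.Duration {
	if opts.SafetyBuffer == 0 {
		opts.SafetyBuffer = defaultSafetyBuffer
	}
	if opts.MaxDuration == 0 {
		opts.MaxDuration = defaultMaxDuration
	}
	budget := time.Duration(remainingMS)*time.Millisecond - opts.SafetyBuffer
	if budget < 0 {
		budget = 0
	}
	if budget > opts.MaxDuration {
		budget = opts.MaxDuration
	}
	return budget
}

func main() {
	fmt.Println(listenerBudget(InitialSessionListenerBudgetOptions{}, 60000))
	fmt.Println(listenerBudget(InitialSessionListenerBudgetOptions{}, 12000))
}
```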

type LoggingLevel added in v1.6.0

type LoggingLevel string

LoggingLevel is an MCP logging level.

const (
	LoggingLevelDebug     LoggingLevel = "debug"
	LoggingLevelInfo      LoggingLevel = "info"
	LoggingLevelNotice    LoggingLevel = "notice"
	LoggingLevelWarning   LoggingLevel = "warning"
	LoggingLevelError     LoggingLevel = "error"
	LoggingLevelCritical  LoggingLevel = "critical"
	LoggingLevelAlert     LoggingLevel = "alert"
	LoggingLevelEmergency LoggingLevel = "emergency"
)

type LoggingLevelHook added in v1.6.0

type LoggingLevelHook func(ctx context.Context, req LoggingLevelRequest) error

LoggingLevelHook handles a logging/setLevel request.

type LoggingLevelRequest added in v1.6.0

type LoggingLevelRequest struct {
	SessionID string       `json:"sessionId"`
	Level     LoggingLevel `json:"level"`
}

LoggingLevelRequest identifies a logging/setLevel request for a session.

type MemorySessionStore

type MemorySessionStore struct {
	// contains filtered or unexported fields
}

MemorySessionStore is an in-memory SessionStore for testing and local development.

func NewMemorySessionStore

func NewMemorySessionStore(opts ...MemorySessionStoreOption) *MemorySessionStore

NewMemorySessionStore creates an in-memory session store.

func (*MemorySessionStore) Delete

func (m *MemorySessionStore) Delete(_ context.Context, id string) error

Delete removes a session by ID. It is a no-op if the session does not exist.

func (*MemorySessionStore) Get

func (m *MemorySessionStore) Get(_ context.Context, id string) (*Session, error)

Get retrieves a session by ID. It returns ErrSessionNotFound if the session does not exist or has expired.

func (*MemorySessionStore) Put

func (m *MemorySessionStore) Put(_ context.Context, session *Session) error

Put stores a session. It overwrites any existing session with the same ID.

type MemorySessionStoreOption

type MemorySessionStoreOption func(*MemorySessionStore)

MemorySessionStoreOption configures a MemorySessionStore.

func WithClock

WithClock sets the clock used for TTL expiration checks.
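An injectable clock makes TTL expiry testable without sleeping. The following sketch shows the idea with a hypothetical clockStore. It does not use the package's MemorySessionStore or the actual WithClock signature, and it is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var ErrSessionNotFound = errors.New("session not found")

// Session is a trimmed local copy of the documented type.
type Session struct {
	ID        string
	ExpiresAt time.Time
}

// clockStore is a hypothetical in-memory store whose notion of "now" is
// supplied by the caller.
type clockStore struct {
	now      func() time.Time
	sessions map[string]Session
}

func (s *clockStore) Put(sess Session) { s.sessions[sess.ID] = sess }

// Get treats a missing session and an expired session the same way.
func (s *clockStore) Get(id string) (Session, error) {
	sess, ok := s.sessions[id]
	if !ok || !s.now().Before(sess.ExpiresAt) {
		return Session{}, ErrSessionNotFound
	}
	return sess, nil
}

// demoExpiry reads the same session before and after advancing a fake clock.
func demoExpiry() (beforeExpiry, afterExpiry error) {
	current := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	store := &clockStore{
		now:      func() time.Time { return current },
		sessions: map[string]Session{},
	}
	store.Put(Session{ID: "s1", ExpiresAt: current.Add(time.Minute)})
	_, beforeExpiry = store.Get("s1")
	current = current.Add(2 * time.Minute) // advance the fake clock past the TTL
	_, afterExpiry = store.Get("s1")
	return beforeExpiry, afterExpiry
}

func main() {
	before, after := demoExpiry()
	fmt.Println(before, after)
}
```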

type MemoryStreamStore added in v0.11.1

type MemoryStreamStore struct {
	// contains filtered or unexported fields
}

MemoryStreamStore is an in-memory StreamStore for testing and local development.

It assigns monotonically increasing numeric event IDs per session (as strings).

func NewMemoryStreamStore added in v0.11.1

func NewMemoryStreamStore(opts ...MemoryStreamStoreOption) *MemoryStreamStore

func (*MemoryStreamStore) Append added in v0.11.1

func (m *MemoryStreamStore) Append(_ context.Context, sessionID, streamID string, data json.RawMessage) (string, error)

func (*MemoryStreamStore) Close added in v0.11.1

func (m *MemoryStreamStore) Close(_ context.Context, sessionID, streamID string) error

func (*MemoryStreamStore) Create added in v0.11.1

func (m *MemoryStreamStore) Create(_ context.Context, sessionID string) (string, error)

func (*MemoryStreamStore) DeleteSession added in v0.11.1

func (m *MemoryStreamStore) DeleteSession(_ context.Context, sessionID string) error

func (*MemoryStreamStore) StreamForEvent added in v0.11.1

func (m *MemoryStreamStore) StreamForEvent(_ context.Context, sessionID, eventID string) (string, error)

func (*MemoryStreamStore) Subscribe added in v0.11.1

func (m *MemoryStreamStore) Subscribe(ctx context.Context, sessionID, streamID, afterEventID string) (<-chan StreamEvent, error)

type MemoryStreamStoreOption added in v0.11.1

type MemoryStreamStoreOption func(*MemoryStreamStore)

func WithStreamIDGenerator added in v0.11.1

func WithStreamIDGenerator(gen apptheory.IDGenerator) MemoryStreamStoreOption

type MemoryTaskStore added in v1.6.0

type MemoryTaskStore struct {
	// contains filtered or unexported fields
}

MemoryTaskStore is an in-memory TaskStore for testing and local development.

func NewMemoryTaskStore added in v1.6.0

func NewMemoryTaskStore() *MemoryTaskStore

NewMemoryTaskStore creates an empty in-memory task store.

func (*MemoryTaskStore) Cancel added in v1.6.0

func (m *MemoryTaskStore) Cancel(_ context.Context, lookup TaskLookup) (*TaskRecord, error)

func (*MemoryTaskStore) Create added in v1.6.0

func (m *MemoryTaskStore) Create(_ context.Context, task TaskRecord) (*TaskRecord, error)

func (*MemoryTaskStore) DeleteSession added in v1.6.0

func (m *MemoryTaskStore) DeleteSession(_ context.Context, sessionID string) error

func (*MemoryTaskStore) Get added in v1.6.0

func (m *MemoryTaskStore) Get(_ context.Context, lookup TaskLookup) (*TaskRecord, error)

func (*MemoryTaskStore) List added in v1.6.0

func (m *MemoryTaskStore) List(_ context.Context, req TaskListRequest) (*TaskListResult, error)

func (*MemoryTaskStore) Update added in v1.6.0

func (m *MemoryTaskStore) Update(_ context.Context, task TaskRecord) (*TaskRecord, error)

type OriginValidator added in v0.11.1

type OriginValidator func(origin string) bool

OriginValidator validates an HTTP Origin header value.

If a request includes an Origin header and the validator returns false, the server should reject the request (fail closed).

func AllowOrigins added in v0.11.1

func AllowOrigins(origins ...string) OriginValidator
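AllowOrigins is undocumented here, so the sketch below only illustrates one plausible behavior: exact string matching against an allowlist. allowExact and permit are hypothetical names, and the matching rule is an assumption rather than the package's confirmed semantics.

```go
package main

import "fmt"

// OriginValidator is a local copy of the documented function type.
type OriginValidator func(origin string) bool

// allowExact is a hypothetical validator constructor. Assumption: an origin
// is accepted only when it matches one of the configured values exactly.
func allowExact(origins ...string) OriginValidator {
	allowed := make(map[string]struct{}, len(origins))
	for _, o := range origins {
		allowed[o] = struct{}{}
	}
	return func(origin string) bool {
		_, ok := allowed[origin]
		return ok
	}
}

// permit applies the documented fail-closed rule: requests without an Origin
// header pass through, and requests with one must satisfy the validator.
func permit(v OriginValidator, origin string) bool {
	if origin == "" {
		return true
	}
	return v(origin)
}

func main() {
	v := allowExact("https://app.example.com")
	fmt.Println(permit(v, "https://app.example.com"))
	fmt.Println(permit(v, "https://evil.example.com"))
}
```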

type PromptArgument added in v0.11.0

type PromptArgument struct {
	Name        string `json:"name"`
	Title       string `json:"title,omitempty"`
	Description string `json:"description,omitempty"`
	Required    bool   `json:"required,omitempty"`
}

PromptArgument defines an argument a prompt can accept.

type PromptDef added in v0.11.0

type PromptDef struct {
	Name        string           `json:"name"`
	Title       string           `json:"title,omitempty"`
	Description string           `json:"description,omitempty"`
	Arguments   []PromptArgument `json:"arguments,omitempty"`
}

PromptDef defines an MCP prompt's metadata.

type PromptHandler added in v0.11.0

type PromptHandler func(ctx context.Context, args json.RawMessage) (*PromptResult, error)

PromptHandler renders a prompt given optional arguments.

type PromptMessage added in v0.11.0

type PromptMessage struct {
	Role    string       `json:"role"`
	Content ContentBlock `json:"content"`
}

PromptMessage is a single message returned from prompts/get.

type PromptRegistry added in v0.11.0

type PromptRegistry struct {
	// contains filtered or unexported fields
}

PromptRegistry manages registered MCP prompts.

func NewPromptRegistry added in v0.11.0

func NewPromptRegistry() *PromptRegistry

NewPromptRegistry creates an empty prompt registry.

func (*PromptRegistry) Get added in v0.11.0

func (r *PromptRegistry) Get(ctx context.Context, name string, args json.RawMessage) (*PromptResult, error)

Get resolves a prompt by name and renders it.

func (*PromptRegistry) Len added in v0.11.0

func (r *PromptRegistry) Len() int

Len returns the number of registered prompts.

func (*PromptRegistry) List added in v0.11.0

func (r *PromptRegistry) List() []PromptDef

List returns all registered prompt definitions in registration order.

func (*PromptRegistry) RegisterPrompt added in v0.11.0

func (r *PromptRegistry) RegisterPrompt(def PromptDef, handler PromptHandler) error

RegisterPrompt registers a prompt by name.

type PromptResult added in v0.11.0

type PromptResult struct {
	Description string          `json:"description,omitempty"`
	Messages    []PromptMessage `json:"messages"`
}

PromptResult is the result payload returned from prompts/get.

type RPCError

type RPCError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Data    any    `json:"data,omitempty"`
}

RPCError is a JSON-RPC 2.0 error object.

type RelatedTaskMetadata added in v1.6.0

type RelatedTaskMetadata struct {
	TaskID string `json:"taskId"`
}

RelatedTaskMetadata is the MCP _meta entry that associates messages with a task.

type Request

type Request struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      any             `json:"id,omitempty"`
	Method  string          `json:"method"`
	Params  json.RawMessage `json:"params,omitempty"`
}

Request is a JSON-RPC 2.0 request message.

func ParseBatchRequest

func ParseBatchRequest(data []byte) ([]*Request, error)

ParseBatchRequest parses a JSON-RPC batch request (array of requests) from raw bytes. If the input is a single object (not an array), it returns a slice containing that single parsed request.
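The array-or-object handling can be sketched by inspecting the first non-whitespace byte. splitBatch below is a hypothetical helper that only splits the payload into raw messages. It does not validate each request, and it is not the package's implementation.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// splitBatch is a hypothetical helper: a JSON array yields one raw message
// per element, and any other valid JSON value yields a single-element slice.
func splitBatch(data []byte) ([]json.RawMessage, error) {
	trimmed := bytes.TrimSpace(data)
	if len(trimmed) == 0 {
		return nil, errors.New("empty payload")
	}
	if trimmed[0] == '[' {
		var items []json.RawMessage
		if err := json.Unmarshal(trimmed, &items); err != nil {
			return nil, fmt.Errorf("invalid batch: %w", err)
		}
		return items, nil
	}
	if !json.Valid(trimmed) {
		return nil, errors.New("invalid JSON")
	}
	return []json.RawMessage{json.RawMessage(trimmed)}, nil
}

func main() {
	batch, _ := splitBatch([]byte(`[{"jsonrpc":"2.0","method":"a"},{"jsonrpc":"2.0","method":"b"}]`))
	single, _ := splitBatch([]byte(`{"jsonrpc":"2.0","method":"a"}`))
	fmt.Println(len(batch), len(single))
}
```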

func ParseRequest

func ParseRequest(data []byte) (*Request, error)

ParseRequest parses a single JSON-RPC 2.0 request from raw bytes. It validates the required fields: jsonrpc must be "2.0" and method must be non-empty.

This function accepts both JSON-RPC requests and notifications:

  • Requests include an "id" field.
  • Notifications omit the "id" field.

If an "id" field is present, it MUST NOT be null.
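The validation rules above can be expressed as a short check. This sketch decodes into a map of raw fields so that a missing "id" can be told apart from an explicit null. validateRequest is a hypothetical helper, not the package's ParseRequest.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// validateRequest is a hypothetical helper that applies the documented rules.
// It reports whether the message is a notification (no "id" field).
func validateRequest(data []byte) (bool, error) {
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(data, &fields); err != nil {
		return false, fmt.Errorf("parse error: %w", err)
	}
	var version, method string
	if raw, ok := fields["jsonrpc"]; ok {
		_ = json.Unmarshal(raw, &version) // non-string values leave version empty
	}
	if raw, ok := fields["method"]; ok {
		_ = json.Unmarshal(raw, &method)
	}
	if version != "2.0" {
		return false, errors.New(`jsonrpc must be "2.0"`)
	}
	if method == "" {
		return false, errors.New("method must be non-empty")
	}
	id, hasID := fields["id"]
	if hasID && string(id) == "null" {
		return false, errors.New("id must not be null when present")
	}
	return !hasID, nil
}

func main() {
	isNotification, err := validateRequest([]byte(`{"jsonrpc":"2.0","method":"ping"}`))
	fmt.Println(isNotification, err)
}
```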

type ResourceContent added in v0.11.0

type ResourceContent struct {
	URI      string `json:"uri"`
	MimeType string `json:"mimeType,omitempty"`
	Text     string `json:"text,omitempty"`
	Blob     string `json:"blob,omitempty"`
}

ResourceContent is a single content item returned from resources/read.

Exactly one of Text or Blob should be set. Blob is expected to be a base64-encoded payload (client-decoded).

type ResourceDef added in v0.11.0

type ResourceDef struct {
	URI         string `json:"uri"`
	Name        string `json:"name"`
	Title       string `json:"title,omitempty"`
	Description string `json:"description,omitempty"`
	MimeType    string `json:"mimeType,omitempty"`
	Size        int64  `json:"size,omitempty"`
}

ResourceDef defines an MCP resource's metadata.

type ResourceHandler added in v0.11.0

type ResourceHandler func(ctx context.Context) ([]ResourceContent, error)

ResourceHandler resolves and returns the content for a resource.

type ResourceRegistry added in v0.11.0

type ResourceRegistry struct {
	// contains filtered or unexported fields
}

ResourceRegistry manages registered MCP resources.

func NewResourceRegistry added in v0.11.0

func NewResourceRegistry() *ResourceRegistry

NewResourceRegistry creates an empty resource registry.

func (*ResourceRegistry) Len added in v0.11.0

func (r *ResourceRegistry) Len() int

Len returns the number of registered resources.

func (*ResourceRegistry) List added in v0.11.0

func (r *ResourceRegistry) List() []ResourceDef

List returns all registered resource definitions in registration order.

func (*ResourceRegistry) Read added in v0.11.0

Read resolves a resource by URI and returns its content.

func (*ResourceRegistry) RegisterResource added in v0.11.0

func (r *ResourceRegistry) RegisterResource(def ResourceDef, handler ResourceHandler) error

RegisterResource registers a resource by URI.

type ResourceSubscription added in v1.6.0

type ResourceSubscription struct {
	SessionID string `json:"sessionId"`
	URI       string `json:"uri"`
}

ResourceSubscription identifies a resource subscription request for a session.

type ResourceSubscriptionHook added in v1.6.0

type ResourceSubscriptionHook func(ctx context.Context, sub ResourceSubscription) error

ResourceSubscriptionHook handles a resources/subscribe or resources/unsubscribe request.

type Response

type Response struct {
	JSONRPC string    `json:"jsonrpc"`
	ID      any       `json:"id"`
	Result  any       `json:"result,omitempty"`
	Error   *RPCError `json:"error,omitempty"`
}

Response is a JSON-RPC 2.0 response message.

func NewErrorResponse

func NewErrorResponse(id any, code int, message string) *Response

NewErrorResponse creates a JSON-RPC error response with the given request ID and error details.

func NewResultResponse

func NewResultResponse(id any, result any) *Response

NewResultResponse creates a JSON-RPC success response with the given request ID and result value.

func ParseResponse added in v0.11.1

func ParseResponse(data []byte) (*Response, error)

ParseResponse parses a single JSON-RPC 2.0 response from raw bytes.

It validates:

  • jsonrpc must be "2.0"
  • id must be present (it may be null for certain error cases)
  • exactly one of result or error is present
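A minimal sketch of these response checks follows, assuming presence is judged by whether the key appears in the JSON object. validateResponse is a hypothetical helper, not the package's ParseResponse.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// validateResponse is a hypothetical helper that applies the documented rules
// to a raw JSON-RPC 2.0 response.
func validateResponse(data []byte) error {
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(data, &fields); err != nil {
		return fmt.Errorf("parse error: %w", err)
	}
	var version string
	if raw, ok := fields["jsonrpc"]; ok {
		_ = json.Unmarshal(raw, &version)
	}
	if version != "2.0" {
		return errors.New(`jsonrpc must be "2.0"`)
	}
	// The id key must exist, but its value may be null.
	if _, ok := fields["id"]; !ok {
		return errors.New("id must be present")
	}
	_, hasResult := fields["result"]
	_, hasError := fields["error"]
	if hasResult == hasError {
		return errors.New("exactly one of result or error must be present")
	}
	return nil
}

func main() {
	fmt.Println(validateResponse([]byte(`{"jsonrpc":"2.0","id":1,"result":{}}`)))
	fmt.Println(validateResponse([]byte(`{"jsonrpc":"2.0","id":1}`)))
}
```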

type SSEEvent

type SSEEvent struct {
	Data any
}

SSEEvent is a progress event emitted by a streaming tool handler.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the MCP protocol handler. It dispatches JSON-RPC 2.0 messages to the appropriate MCP method handlers (initialize, tools/list, tools/call).

func NewServer

func NewServer(name, version string, opts ...ServerOption) *Server

NewServer creates an MCP server with the given name, version, and options.

func (*Server) Handler

func (s *Server) Handler() apptheory.Handler

Handler returns an apptheory.Handler that processes MCP JSON-RPC requests.

Streamable HTTP semantics:

  • POST /mcp accepts JSON-RPC requests, notifications, and responses.
  • GET /mcp replays/resumes previously interrupted SSE streams via Last-Event-ID.
  • DELETE /mcp terminates a session.

Notes:

  • Clients must initialize first; the server issues Mcp-Session-Id on the initialize response and requires it thereafter.
  • Disconnects are not treated as cancellation. Streaming tool execution is decoupled from the connection and can be resumed with GET.

func (*Server) Prompts added in v0.11.0

func (s *Server) Prompts() *PromptRegistry

Prompts returns the server's prompt registry for registering prompts.

func (*Server) Registry

func (s *Server) Registry() *ToolRegistry

Registry returns the server's tool registry for registering tools.

func (*Server) Resources added in v0.11.0

func (s *Server) Resources() *ResourceRegistry

Resources returns the server's resource registry for registering resources.

type ServerOption

type ServerOption func(*Server)

ServerOption configures a Server.

func WithCapabilityConfig added in v1.6.0

func WithCapabilityConfig(config CapabilityConfig) ServerOption

WithCapabilityConfig sets the server capability policy used for initialize responses.

func WithCompletionHooks added in v1.6.0

func WithCompletionHooks(promptHook, resourceHook CompletionHook) ServerOption

WithCompletionHooks enables completion/complete support through explicit prompt and resource hooks. At least one hook must be non-nil before the completions capability is advertised.

func WithInitialSessionListenerBudget added in v0.25.0

func WithInitialSessionListenerBudget(opts InitialSessionListenerBudgetOptions) ServerOption

WithInitialSessionListenerBudget caps the initial GET /mcp keepalive listener against the Lambda remaining-time budget when RemainingMS is available.

This option is explicit opt-in and applies only to GET /mcp requests without Last-Event-ID. Resume/replay requests continue to use the existing stream replay path unchanged.

func WithLogger

func WithLogger(logger *slog.Logger) ServerOption

WithLogger sets the structured logger for the server.

func WithLoggingLevelHook added in v1.6.0

func WithLoggingLevelHook(hook LoggingLevelHook) ServerOption

WithLoggingLevelHook enables logging/setLevel support through an explicit hook.

func WithOriginValidator added in v0.11.1

func WithOriginValidator(v OriginValidator) ServerOption

WithOriginValidator sets the Origin validator for browser-based callers.

If an Origin header is present, the request is rejected unless it passes validation (fail closed).

func WithResourceSubscriptionHooks added in v1.6.0

func WithResourceSubscriptionHooks(subscribe, unsubscribe ResourceSubscriptionHook) ServerOption

WithResourceSubscriptionHooks enables resources/subscribe and resources/unsubscribe support through explicit hooks.

The methods fail closed if either hook is absent. The resource subscribe sub-capability remains omitted until AppTheory has an outbound resource update notification contract.

func WithServerIDGenerator

func WithServerIDGenerator(gen apptheory.IDGenerator) ServerOption

WithServerIDGenerator sets the ID generator for session IDs.

func WithSessionStore

func WithSessionStore(store SessionStore) ServerOption

WithSessionStore sets the session store for the server.

func WithStreamStore added in v0.11.1

func WithStreamStore(store StreamStore) ServerOption

WithStreamStore sets the stream store for the server.

func WithTaskRuntime added in v1.6.0

func WithTaskRuntime(opts TaskRuntimeOptions) ServerOption

WithTaskRuntime enables MCP task operations through an explicit TaskStore.

Tasks are experimental in MCP 2025-11-25 and remain opt-in. AppTheory only advertises task capabilities when this option supplies a store and at least one registered tool declares task support.

type Session

type Session struct {
	ID        string            `json:"id"`
	CreatedAt time.Time         `json:"createdAt"`
	ExpiresAt time.Time         `json:"expiresAt"`
	Data      map[string]string `json:"data,omitempty"`
}

Session holds per-session state for an MCP connection.

type SessionStore

type SessionStore interface {
	Get(ctx context.Context, id string) (*Session, error)
	Put(ctx context.Context, session *Session) error
	Delete(ctx context.Context, id string) error
}

SessionStore is the interface for session persistence backends.

func NewDynamoSessionStore

func NewDynamoSessionStore(db tablecore.DB) SessionStore

NewDynamoSessionStore creates a DynamoDB-backed session store.

type StreamEvent added in v0.11.1

type StreamEvent struct {
	ID   string
	Data json.RawMessage
}

StreamEvent is a single server->client JSON-RPC message framed as an SSE event.

ID is the SSE event id (used for resumability via Last-Event-ID). Data is the raw JSON payload (a single JSON-RPC message).

type StreamStore added in v0.11.1

type StreamStore interface {
	Create(ctx context.Context, sessionID string) (streamID string, err error)
	Append(ctx context.Context, sessionID, streamID string, data json.RawMessage) (eventID string, err error)
	Close(ctx context.Context, sessionID, streamID string) error

	// Subscribe streams events after afterEventID. If afterEventID is empty,
	// it streams from the beginning. If afterEventID is present, it must belong
	// to the requested stream.
	Subscribe(ctx context.Context, sessionID, streamID, afterEventID string) (<-chan StreamEvent, error)

	// StreamForEvent returns the stream id that the given event id belongs to.
	StreamForEvent(ctx context.Context, sessionID, eventID string) (streamID string, err error)

	// DeleteSession removes all stream state for a session.
	DeleteSession(ctx context.Context, sessionID string) error
}

StreamStore persists stream events so a disconnected client can replay them via GET + Last-Event-ID.
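The replay contract of Subscribe (everything after afterEventID, or everything when it is empty) can be sketched over a plain slice. replayAfter is a hypothetical helper working on already-stored events. A real StreamStore also delivers live events over a channel, which this sketch omits.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

var ErrEventNotFound = errors.New("event not found")

// StreamEvent is a local copy of the documented type.
type StreamEvent struct {
	ID   string
	Data json.RawMessage
}

// replayAfter is a hypothetical helper that returns the stored events that
// follow afterEventID. An empty afterEventID replays from the beginning, and
// an ID that is not in the stream yields ErrEventNotFound.
func replayAfter(events []StreamEvent, afterEventID string) ([]StreamEvent, error) {
	if afterEventID == "" {
		return events, nil
	}
	for i, ev := range events {
		if ev.ID == afterEventID {
			return events[i+1:], nil
		}
	}
	return nil, ErrEventNotFound
}

func main() {
	events := []StreamEvent{
		{ID: "1", Data: json.RawMessage(`{"n":1}`)},
		{ID: "2", Data: json.RawMessage(`{"n":2}`)},
		{ID: "3", Data: json.RawMessage(`{"n":3}`)},
	}
	rest, err := replayAfter(events, "1")
	fmt.Println(len(rest), err)
}
```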

func NewDynamoStreamStore added in v0.20.0

func NewDynamoStreamStore(db tablecore.DB) StreamStore

NewDynamoStreamStore creates a DynamoDB-backed StreamStore.

type StreamingToolHandler

type StreamingToolHandler func(ctx context.Context, args json.RawMessage, emit func(SSEEvent)) (*ToolResult, error)

StreamingToolHandler is a tool handler that can emit progress events via SSE.
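For illustration, a streaming handler might emit one progress event per step and then return a final result. The types below are trimmed local copies of the documented ones. countTo, runCountTo, and the event payload shape are hypothetical.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Trimmed local copies of the documented types.
type SSEEvent struct {
	Data any
}

type ContentBlock struct {
	Type string `json:"type"`
	Text string `json:"text,omitempty"`
}

type ToolResult struct {
	Content []ContentBlock `json:"content"`
	IsError bool           `json:"isError,omitempty"`
}

type StreamingToolHandler func(ctx context.Context, args json.RawMessage, emit func(SSEEvent)) (*ToolResult, error)

// countTo is a hypothetical streaming tool: it emits one progress event per
// step and stops early if the context is cancelled.
func countTo(ctx context.Context, args json.RawMessage, emit func(SSEEvent)) (*ToolResult, error) {
	var in struct {
		N int `json:"n"`
	}
	if err := json.Unmarshal(args, &in); err != nil {
		return nil, fmt.Errorf("invalid arguments: %w", err)
	}
	for i := 1; i <= in.N; i++ {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		emit(SSEEvent{Data: map[string]int{"step": i, "of": in.N}})
	}
	text := fmt.Sprintf("counted to %d", in.N)
	return &ToolResult{Content: []ContentBlock{{Type: "text", Text: text}}}, nil
}

// Compile-time check that countTo matches the handler signature.
var _ StreamingToolHandler = countTo

// runCountTo invokes the handler and counts the emitted events.
func runCountTo(n int) (int, string, error) {
	events := 0
	args, err := json.Marshal(map[string]int{"n": n})
	if err != nil {
		return 0, "", err
	}
	res, err := countTo(context.Background(), args, func(SSEEvent) { events++ })
	if err != nil {
		return events, "", err
	}
	return events, res.Content[0].Text, nil
}

func main() {
	fmt.Println(runCountTo(3))
}
```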

type Task added in v1.6.0

type Task struct {
	TaskID        string     `json:"taskId"`
	Status        TaskStatus `json:"status"`
	StatusMessage string     `json:"statusMessage,omitempty"`
	CreatedAt     time.Time  `json:"createdAt"`
	LastUpdatedAt time.Time  `json:"lastUpdatedAt"`
	TTL           *int64     `json:"ttl"`
	PollInterval  *int64     `json:"pollInterval,omitempty"`
}

Task is the MCP task state returned by task operations.

type TaskListRequest added in v1.6.0

type TaskListRequest struct {
	SessionID string `json:"sessionId"`
	Cursor    string `json:"cursor,omitempty"`
	Limit     int    `json:"limit,omitempty"`
}

TaskListRequest scopes a task listing to the active MCP session.

type TaskListResult added in v1.6.0

type TaskListResult struct {
	Tasks      []Task `json:"tasks"`
	NextCursor string `json:"nextCursor,omitempty"`
}

TaskListResult is the MCP tasks/list result.

type TaskLookup added in v1.6.0

type TaskLookup struct {
	SessionID string `json:"sessionId"`
	TaskID    string `json:"taskId"`
}

TaskLookup scopes a task lookup to the active MCP session.

type TaskMetadata added in v1.6.0

type TaskMetadata struct {
	TTL *int64 `json:"ttl,omitempty"`
}

TaskMetadata is the MCP request parameter used to request task-augmented execution.

type TaskRecord added in v1.6.0

type TaskRecord struct {
	SessionID string          `json:"sessionId"`
	Method    string          `json:"method"`
	ToolName  string          `json:"toolName,omitempty"`
	Task      Task            `json:"task"`
	Result    json.RawMessage `json:"result,omitempty"`
	Error     *RPCError       `json:"error,omitempty"`
}

TaskRecord is the durable representation stored by TaskStore implementations.

type TaskRuntimeOptions added in v1.6.0

type TaskRuntimeOptions struct {
	Store                  TaskStore
	DefaultTTL             time.Duration
	MaxTTL                 time.Duration
	PollInterval           time.Duration
	ListLimit              int
	ModelImmediateResponse string
}

TaskRuntimeOptions configures task-augmented MCP execution.

type TaskStatus added in v1.6.0

type TaskStatus string

TaskStatus is an MCP task lifecycle state.

const (
	TaskStatusWorking       TaskStatus = "working"
	TaskStatusInputRequired TaskStatus = "input_required"
	TaskStatusCompleted     TaskStatus = "completed"
	TaskStatusFailed        TaskStatus = "failed"
	TaskStatusCanceled      TaskStatus = "cancel" + "led"
)
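The relationship between these statuses and ErrTaskTerminal can be sketched as a guard on transitions. The assumption here is that completed, failed, and cancelled are the terminal states. isTerminal and transition are hypothetical helpers, not package functions.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented status type, values, and error.
type TaskStatus string

const (
	TaskStatusWorking       TaskStatus = "working"
	TaskStatusInputRequired TaskStatus = "input_required"
	TaskStatusCompleted     TaskStatus = "completed"
	TaskStatusFailed        TaskStatus = "failed"
	TaskStatusCanceled      TaskStatus = "cancelled"
)

var ErrTaskTerminal = errors.New("task already terminal")

// isTerminal is a hypothetical helper. Assumption: completed, failed, and
// cancelled tasks accept no further mutations.
func isTerminal(s TaskStatus) bool {
	switch s {
	case TaskStatusCompleted, TaskStatusFailed, TaskStatusCanceled:
		return true
	}
	return false
}

// transition returns the next status, or ErrTaskTerminal when the current
// status is already terminal.
func transition(current, next TaskStatus) (TaskStatus, error) {
	if isTerminal(current) {
		return current, ErrTaskTerminal
	}
	return next, nil
}

func main() {
	fmt.Println(transition(TaskStatusWorking, TaskStatusCompleted))
	fmt.Println(transition(TaskStatusCompleted, TaskStatusWorking))
}
```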

type TaskStore added in v1.6.0

type TaskStore interface {
	Create(ctx context.Context, task TaskRecord) (*TaskRecord, error)
	Get(ctx context.Context, lookup TaskLookup) (*TaskRecord, error)
	Update(ctx context.Context, task TaskRecord) (*TaskRecord, error)
	List(ctx context.Context, req TaskListRequest) (*TaskListResult, error)
	Cancel(ctx context.Context, lookup TaskLookup) (*TaskRecord, error)
	DeleteSession(ctx context.Context, sessionID string) error
}

TaskStore persists MCP task state. Implementations must bind every operation to the supplied SessionID and fail closed if a task belongs to a different authorization context.
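The session-binding requirement can be illustrated with a lookup that treats a task from another session exactly like a missing task. scopedStore and taskRecord are hypothetical, trimmed stand-ins used only for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrTaskNotFound = errors.New("task not found")

// TaskLookup is a local copy of the documented type.
type TaskLookup struct {
	SessionID string
	TaskID    string
}

// taskRecord is a trimmed, hypothetical stand-in for TaskRecord.
type taskRecord struct {
	SessionID string
	TaskID    string
}

// scopedStore is a hypothetical in-memory store keyed by task ID.
type scopedStore struct {
	byID map[string]taskRecord
}

// Get fails closed: a task owned by a different session is reported as not
// found, so callers cannot probe for task IDs outside their session.
func (s *scopedStore) Get(lookup TaskLookup) (taskRecord, error) {
	rec, ok := s.byID[lookup.TaskID]
	if !ok || rec.SessionID != lookup.SessionID {
		return taskRecord{}, ErrTaskNotFound
	}
	return rec, nil
}

func main() {
	store := &scopedStore{byID: map[string]taskRecord{
		"t1": {SessionID: "session-a", TaskID: "t1"},
	}}
	_, own := store.Get(TaskLookup{SessionID: "session-a", TaskID: "t1"})
	_, other := store.Get(TaskLookup{SessionID: "session-b", TaskID: "t1"})
	fmt.Println(own, other)
}
```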

func NewDynamoTaskStore added in v1.6.0

func NewDynamoTaskStore(db tablecore.DB) TaskStore

NewDynamoTaskStore creates a DynamoDB-backed task store.

type TaskSupport added in v1.6.0

type TaskSupport string

TaskSupport declares whether a tool can be invoked through MCP task augmentation.

const (
	TaskSupportForbidden TaskSupport = "forbidden"
	TaskSupportOptional  TaskSupport = "optional"
	TaskSupportRequired  TaskSupport = "required"
)

type ToolAnnotations added in v0.14.1

type ToolAnnotations struct {
	Title           string `json:"title,omitempty"`
	ReadOnlyHint    *bool  `json:"readOnlyHint,omitempty"`
	DestructiveHint *bool  `json:"destructiveHint,omitempty"`
	IdempotentHint  *bool  `json:"idempotentHint,omitempty"`
	OpenWorldHint   *bool  `json:"openWorldHint,omitempty"`
}

type ToolDef

type ToolDef struct {
	Name        string           `json:"name"`
	Title       string           `json:"title,omitempty"`
	Description string           `json:"description,omitempty"`
	Annotations *ToolAnnotations `json:"annotations,omitempty"`

	Icons        []Icon          `json:"icons,omitempty"`
	Execution    *ToolExecution  `json:"execution,omitempty"`
	InputSchema  json.RawMessage `json:"inputSchema"`
	OutputSchema json.RawMessage `json:"outputSchema,omitempty"`
}

ToolDef defines an MCP tool's metadata and input schema.

type ToolExecution added in v0.14.1

type ToolExecution struct {
	// TaskSupport indicates if the tool supports task-augmented execution.
	// Values: "forbidden", "optional", "required".
	TaskSupport TaskSupport `json:"taskSupport,omitempty"`
}

type ToolHandler

type ToolHandler func(ctx context.Context, args json.RawMessage) (*ToolResult, error)

ToolHandler is the function signature for tool implementations.

type ToolRegistry

type ToolRegistry struct {
	// contains filtered or unexported fields
}

ToolRegistry manages registered MCP tools.

func NewToolRegistry

func NewToolRegistry() *ToolRegistry

NewToolRegistry creates an empty tool registry.

func (*ToolRegistry) Call

func (r *ToolRegistry) Call(ctx context.Context, name string, args json.RawMessage) (*ToolResult, error)

Call looks up a tool by name and invokes its handler with the given arguments. It returns an error if the tool is not found.

func (*ToolRegistry) CallStreaming

func (r *ToolRegistry) CallStreaming(ctx context.Context, name string, args json.RawMessage, emit func(SSEEvent)) (*ToolResult, error)

CallStreaming looks up a tool by name and invokes its streaming handler if available, otherwise falls back to the regular handler (discarding emit).

func (*ToolRegistry) Len added in v1.6.0

func (r *ToolRegistry) Len() int

Len returns the number of registered tools.

func (*ToolRegistry) List

func (r *ToolRegistry) List() []ToolDef

List returns all registered tool definitions in registration order.

func (*ToolRegistry) RegisterStreamingTool

func (r *ToolRegistry) RegisterStreamingTool(def ToolDef, handler StreamingToolHandler) error

RegisterStreamingTool registers a tool that supports SSE streaming. When invoked by a strict Streamable HTTP POST with an Accept header that supports both application/json and text/event-stream, progress events are streamed. Buffered calls to the same tool still run to completion and return a JSON response when the server does not select the SSE path.

func (*ToolRegistry) RegisterTool

func (r *ToolRegistry) RegisterTool(def ToolDef, handler ToolHandler) error

RegisterTool adds a tool to the registry. It returns an error if a tool with the same name is already registered.
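The duplicate-name rule and registration-order listing can be sketched with a map plus an ordered slice of names. This is a hypothetical registry with a trimmed ToolDef, not the package's ToolRegistry. Rejecting an empty name is an extra assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// ToolDef is a trimmed local copy of the documented type.
type ToolDef struct {
	Name        string
	Description string
}

// registry is a hypothetical stand-in for ToolRegistry.
type registry struct {
	order []string
	defs  map[string]ToolDef
}

func newRegistry() *registry {
	return &registry{defs: map[string]ToolDef{}}
}

// Register rejects duplicate names. Rejecting an empty name is an assumption
// made for this sketch.
func (r *registry) Register(def ToolDef) error {
	if def.Name == "" {
		return errors.New("tool name is required")
	}
	if _, exists := r.defs[def.Name]; exists {
		return fmt.Errorf("tool %q already registered", def.Name)
	}
	r.defs[def.Name] = def
	r.order = append(r.order, def.Name)
	return nil
}

// List returns definitions in registration order.
func (r *registry) List() []ToolDef {
	out := make([]ToolDef, 0, len(r.order))
	for _, name := range r.order {
		out = append(out, r.defs[name])
	}
	return out
}

func main() {
	r := newRegistry()
	fmt.Println(r.Register(ToolDef{Name: "echo"}))
	fmt.Println(r.Register(ToolDef{Name: "echo"}))
	fmt.Println(len(r.List()))
}
```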

type ToolResult

type ToolResult struct {
	Content           []ContentBlock `json:"content"`
	IsError           bool           `json:"isError,omitempty"`
	StructuredContent map[string]any `json:"structuredContent,omitempty"`
}

ToolResult is the result of a tool invocation.
