Documentation
¶
Index ¶
- Constants
- Variables
- func MarshalResponse(resp *Response) ([]byte, error)
- type CapabilityConfig
- type Completion
- type CompletionArgument
- type CompletionContext
- type CompletionHook
- type CompletionRef
- type CompletionRequest
- type CompletionResult
- type ContentBlock
- type CreateTaskResult
- type DynamoSessionStore
- type DynamoStreamStore
- func (d *DynamoStreamStore) Append(ctx context.Context, sessionID, streamID string, data json.RawMessage) (string, error)
- func (d *DynamoStreamStore) Close(ctx context.Context, sessionID, streamID string) error
- func (d *DynamoStreamStore) Create(ctx context.Context, sessionID string) (string, error)
- func (d *DynamoStreamStore) DeleteSession(ctx context.Context, sessionID string) error
- func (d *DynamoStreamStore) StreamForEvent(ctx context.Context, sessionID, eventID string) (string, error)
- func (d *DynamoStreamStore) Subscribe(ctx context.Context, sessionID, streamID, afterEventID string) (<-chan StreamEvent, error)
- type DynamoTaskStore
- func (d *DynamoTaskStore) Cancel(ctx context.Context, lookup TaskLookup) (*TaskRecord, error)
- func (d *DynamoTaskStore) Create(ctx context.Context, task TaskRecord) (*TaskRecord, error)
- func (d *DynamoTaskStore) DeleteSession(ctx context.Context, sessionID string) error
- func (d *DynamoTaskStore) Get(ctx context.Context, lookup TaskLookup) (*TaskRecord, error)
- func (d *DynamoTaskStore) List(ctx context.Context, req TaskListRequest) (*TaskListResult, error)
- func (d *DynamoTaskStore) Update(ctx context.Context, task TaskRecord) (*TaskRecord, error)
- type Icon
- type InitialSessionListenerBudgetOptions
- type LoggingLevel
- type LoggingLevelHook
- type LoggingLevelRequest
- type MemorySessionStore
- type MemorySessionStoreOption
- type MemoryStreamStore
- func (m *MemoryStreamStore) Append(_ context.Context, sessionID, streamID string, data json.RawMessage) (string, error)
- func (m *MemoryStreamStore) Close(_ context.Context, sessionID, streamID string) error
- func (m *MemoryStreamStore) Create(_ context.Context, sessionID string) (string, error)
- func (m *MemoryStreamStore) DeleteSession(_ context.Context, sessionID string) error
- func (m *MemoryStreamStore) StreamForEvent(_ context.Context, sessionID, eventID string) (string, error)
- func (m *MemoryStreamStore) Subscribe(ctx context.Context, sessionID, streamID, afterEventID string) (<-chan StreamEvent, error)
- type MemoryStreamStoreOption
- type MemoryTaskStore
- func (m *MemoryTaskStore) Cancel(_ context.Context, lookup TaskLookup) (*TaskRecord, error)
- func (m *MemoryTaskStore) Create(_ context.Context, task TaskRecord) (*TaskRecord, error)
- func (m *MemoryTaskStore) DeleteSession(_ context.Context, sessionID string) error
- func (m *MemoryTaskStore) Get(_ context.Context, lookup TaskLookup) (*TaskRecord, error)
- func (m *MemoryTaskStore) List(_ context.Context, req TaskListRequest) (*TaskListResult, error)
- func (m *MemoryTaskStore) Update(_ context.Context, task TaskRecord) (*TaskRecord, error)
- type OriginValidator
- type PromptArgument
- type PromptDef
- type PromptHandler
- type PromptMessage
- type PromptRegistry
- type PromptResult
- type RPCError
- type RelatedTaskMetadata
- type Request
- type ResourceContent
- type ResourceDef
- type ResourceHandler
- type ResourceRegistry
- type ResourceSubscription
- type ResourceSubscriptionHook
- type Response
- type SSEEvent
- type Server
- type ServerOption
- func WithCapabilityConfig(config CapabilityConfig) ServerOption
- func WithCompletionHooks(promptHook, resourceHook CompletionHook) ServerOption
- func WithInitialSessionListenerBudget(opts InitialSessionListenerBudgetOptions) ServerOption
- func WithLogger(logger *slog.Logger) ServerOption
- func WithLoggingLevelHook(hook LoggingLevelHook) ServerOption
- func WithOriginValidator(v OriginValidator) ServerOption
- func WithResourceSubscriptionHooks(subscribe, unsubscribe ResourceSubscriptionHook) ServerOption
- func WithServerIDGenerator(gen apptheory.IDGenerator) ServerOption
- func WithSessionStore(store SessionStore) ServerOption
- func WithStreamStore(store StreamStore) ServerOption
- func WithTaskRuntime(opts TaskRuntimeOptions) ServerOption
- type Session
- type SessionStore
- type StreamEvent
- type StreamStore
- type StreamingToolHandler
- type Task
- type TaskListRequest
- type TaskListResult
- type TaskLookup
- type TaskMetadata
- type TaskRecord
- type TaskRuntimeOptions
- type TaskStatus
- type TaskStore
- type TaskSupport
- type ToolAnnotations
- type ToolDef
- type ToolExecution
- type ToolHandler
- type ToolRegistry
- func (r *ToolRegistry) Call(ctx context.Context, name string, args json.RawMessage) (*ToolResult, error)
- func (r *ToolRegistry) CallStreaming(ctx context.Context, name string, args json.RawMessage, emit func(SSEEvent)) (*ToolResult, error)
- func (r *ToolRegistry) Len() int
- func (r *ToolRegistry) List() []ToolDef
- func (r *ToolRegistry) RegisterStreamingTool(def ToolDef, handler StreamingToolHandler) error
- func (r *ToolRegistry) RegisterTool(def ToolDef, handler ToolHandler) error
- type ToolResult
Constants ¶
const ( CodeParseError = -32700 CodeInvalidRequest = -32600 CodeMethodNotFound = -32601 CodeInvalidParams = -32602 CodeInternalError = -32603 CodeServerError = -32000 )
Standard JSON-RPC 2.0 error codes.
Variables ¶
var ( ErrStreamNotFound = errors.New("stream not found") ErrEventNotFound = errors.New("event not found") ErrStreamEventTooLarge = errors.New("stream event too large") )
var ErrSessionNotFound = errors.New("session not found")
ErrSessionNotFound is returned when a session ID does not exist in the store.
var ErrTaskNotFound = errors.New("task not found")
ErrTaskNotFound is returned when a task does not exist in the caller's session scope.
var ErrTaskTerminal = errors.New("task already terminal")
ErrTaskTerminal is returned when a mutation targets a task that is already in a terminal state.
Functions ¶
func MarshalResponse ¶
MarshalResponse serializes a JSON-RPC 2.0 response to bytes. It ensures the jsonrpc field is always set to "2.0".
Types ¶
type CapabilityConfig ¶ added in v1.6.0
CapabilityConfig controls which implemented MCP server surfaces may be advertised during initialize.
The config is intentionally limited to surfaces that AppTheory currently implements. Unsupported MCP sub-capabilities such as listChanged and tasks are omitted until their concrete hooks exist. Resource subscription and logging are also omitted until AppTheory has a first-class outbound notification contract for notifications/resources/updated and notifications/message. Completion and task support are advertised only when their explicit hooks or stores are configured. That keeps capability negotiation fail-closed instead of allowing callers to overclaim unsupported behavior.
func DefaultCapabilityConfig ¶ added in v1.6.0
func DefaultCapabilityConfig() CapabilityConfig
DefaultCapabilityConfig returns the default MCP capability policy.
A surface is still advertised only when it is actually present on the server: tools require at least one registered tool, resources require at least one registered resource, and prompts require at least one registered prompt.
type Completion ¶ added in v1.6.0
type Completion struct {
Values []string `json:"values"`
Total *int `json:"total,omitempty"`
HasMore *bool `json:"hasMore,omitempty"`
}
Completion contains completion values returned to the client.
type CompletionArgument ¶ added in v1.6.0
CompletionArgument identifies the argument currently being completed.
type CompletionContext ¶ added in v1.6.0
CompletionContext carries optional previously-resolved arguments.
type CompletionHook ¶ added in v1.6.0
type CompletionHook func(ctx context.Context, req CompletionRequest) (*CompletionResult, error)
CompletionHook handles a prompt or resource completion request.
type CompletionRef ¶ added in v1.6.0
type CompletionRef struct {
Type string `json:"type"`
Name string `json:"name,omitempty"`
URI string `json:"uri,omitempty"`
}
CompletionRef identifies the prompt or resource template being completed.
type CompletionRequest ¶ added in v1.6.0
type CompletionRequest struct {
SessionID string `json:"sessionId"`
Ref CompletionRef `json:"ref"`
Argument CompletionArgument `json:"argument"`
Context CompletionContext `json:"context,omitempty"`
}
CompletionRequest is passed to completion hooks.
type CompletionResult ¶ added in v1.6.0
type CompletionResult struct {
Completion Completion `json:"completion"`
}
CompletionResult is the MCP completion/complete result.
type ContentBlock ¶
type ContentBlock struct {
Type string `json:"type"` // "text", "image", "audio", "resource_link", "resource"
// Text content (type = "text").
Text string `json:"text,omitempty"`
// Image/audio content (type = "image" or "audio").
Data string `json:"data,omitempty"` // base64-encoded
MimeType string `json:"mimeType,omitempty"` // e.g. "image/png"
// Resource link content (type = "resource_link").
URI string `json:"uri,omitempty"`
Name string `json:"name,omitempty"`
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
Size int64 `json:"size,omitempty"`
// Embedded resource content (type = "resource").
Resource *ResourceContent `json:"resource,omitempty"`
}
type CreateTaskResult ¶ added in v1.6.0
type CreateTaskResult struct {
Meta map[string]any `json:"_meta,omitempty"`
Task Task `json:"task"`
}
CreateTaskResult is returned when AppTheory accepts a task-augmented request.
type DynamoSessionStore ¶
type DynamoSessionStore struct {
// contains filtered or unexported fields
}
DynamoSessionStore implements SessionStore using DynamoDB via TableTheory.
func (*DynamoSessionStore) Delete ¶
func (d *DynamoSessionStore) Delete(ctx context.Context, id string) error
Delete removes a session by ID.
type DynamoStreamStore ¶ added in v0.20.0
type DynamoStreamStore struct {
// contains filtered or unexported fields
}
DynamoStreamStore implements StreamStore using DynamoDB via TableTheory.
Subscribe uses strongly consistent reads plus short polling so separate server instances can replay and continue an active stream from shared state. Replay is bounded by each event record's ExpiresAt value at runtime; DynamoDB TTL and S3 lifecycle cleanup are backstops, not access checks. The strongest DeleteSession/Append race protection is enabled when the provided TableTheory DB implements TransactWrite, which is the production TableTheory path.
func (*DynamoStreamStore) Append ¶ added in v0.20.0
func (d *DynamoStreamStore) Append(ctx context.Context, sessionID, streamID string, data json.RawMessage) (string, error)
func (*DynamoStreamStore) Close ¶ added in v0.20.0
func (d *DynamoStreamStore) Close(ctx context.Context, sessionID, streamID string) error
func (*DynamoStreamStore) DeleteSession ¶ added in v0.20.0
func (d *DynamoStreamStore) DeleteSession(ctx context.Context, sessionID string) error
func (*DynamoStreamStore) StreamForEvent ¶ added in v0.20.0
func (*DynamoStreamStore) Subscribe ¶ added in v0.20.0
func (d *DynamoStreamStore) Subscribe(ctx context.Context, sessionID, streamID, afterEventID string) (<-chan StreamEvent, error)
type DynamoTaskStore ¶ added in v1.6.0
type DynamoTaskStore struct {
// contains filtered or unexported fields
}
DynamoTaskStore implements TaskStore using DynamoDB via TableTheory.
func (*DynamoTaskStore) Cancel ¶ added in v1.6.0
func (d *DynamoTaskStore) Cancel(ctx context.Context, lookup TaskLookup) (*TaskRecord, error)
func (*DynamoTaskStore) Create ¶ added in v1.6.0
func (d *DynamoTaskStore) Create(ctx context.Context, task TaskRecord) (*TaskRecord, error)
func (*DynamoTaskStore) DeleteSession ¶ added in v1.6.0
func (d *DynamoTaskStore) DeleteSession(ctx context.Context, sessionID string) error
func (*DynamoTaskStore) Get ¶ added in v1.6.0
func (d *DynamoTaskStore) Get(ctx context.Context, lookup TaskLookup) (*TaskRecord, error)
func (*DynamoTaskStore) List ¶ added in v1.6.0
func (d *DynamoTaskStore) List(ctx context.Context, req TaskListRequest) (*TaskListResult, error)
func (*DynamoTaskStore) Update ¶ added in v1.6.0
func (d *DynamoTaskStore) Update(ctx context.Context, task TaskRecord) (*TaskRecord, error)
type InitialSessionListenerBudgetOptions ¶ added in v0.25.0
type InitialSessionListenerBudgetOptions struct {
SafetyBuffer time.Duration
MaxDuration time.Duration
}
InitialSessionListenerBudgetOptions configures how the initial GET /mcp session listener is capped against the Lambda remaining-time budget.
The option is explicit opt-in and applies only when GET /mcp is used without Last-Event-ID (the keepalive listener path). Resume/replay GET requests keep their existing behavior.
Zero values use the framework defaults:
- SafetyBuffer: 5s
- MaxDuration: 25s
type LoggingLevel ¶ added in v1.6.0
type LoggingLevel string
LoggingLevel is an MCP logging level.
const ( LoggingLevelDebug LoggingLevel = "debug" LoggingLevelInfo LoggingLevel = "info" LoggingLevelNotice LoggingLevel = "notice" LoggingLevelWarning LoggingLevel = "warning" LoggingLevelError LoggingLevel = "error" LoggingLevelCritical LoggingLevel = "critical" LoggingLevelAlert LoggingLevel = "alert" LoggingLevelEmergency LoggingLevel = "emergency" )
type LoggingLevelHook ¶ added in v1.6.0
type LoggingLevelHook func(ctx context.Context, req LoggingLevelRequest) error
LoggingLevelHook handles a logging/setLevel request.
type LoggingLevelRequest ¶ added in v1.6.0
type LoggingLevelRequest struct {
SessionID string `json:"sessionId"`
Level LoggingLevel `json:"level"`
}
LoggingLevelRequest identifies a logging/setLevel request for a session.
type MemorySessionStore ¶
type MemorySessionStore struct {
// contains filtered or unexported fields
}
MemorySessionStore is an in-memory SessionStore for testing and local development.
func NewMemorySessionStore ¶
func NewMemorySessionStore(opts ...MemorySessionStoreOption) *MemorySessionStore
NewMemorySessionStore creates an in-memory session store.
func (*MemorySessionStore) Delete ¶
func (m *MemorySessionStore) Delete(_ context.Context, id string) error
Delete removes a session by ID. It is a no-op if the session does not exist.
type MemorySessionStoreOption ¶
type MemorySessionStoreOption func(*MemorySessionStore)
MemorySessionStoreOption configures a MemorySessionStore.
func WithClock ¶
func WithClock(c apptheory.Clock) MemorySessionStoreOption
WithClock sets the clock used for TTL expiration checks.
type MemoryStreamStore ¶ added in v0.11.1
type MemoryStreamStore struct {
// contains filtered or unexported fields
}
MemoryStreamStore is an in-memory StreamStore for testing and local development.
It assigns monotonically increasing numeric event IDs per session (as strings).
func NewMemoryStreamStore ¶ added in v0.11.1
func NewMemoryStreamStore(opts ...MemoryStreamStoreOption) *MemoryStreamStore
func (*MemoryStreamStore) Append ¶ added in v0.11.1
func (m *MemoryStreamStore) Append(_ context.Context, sessionID, streamID string, data json.RawMessage) (string, error)
func (*MemoryStreamStore) Close ¶ added in v0.11.1
func (m *MemoryStreamStore) Close(_ context.Context, sessionID, streamID string) error
func (*MemoryStreamStore) DeleteSession ¶ added in v0.11.1
func (m *MemoryStreamStore) DeleteSession(_ context.Context, sessionID string) error
func (*MemoryStreamStore) StreamForEvent ¶ added in v0.11.1
func (*MemoryStreamStore) Subscribe ¶ added in v0.11.1
func (m *MemoryStreamStore) Subscribe(ctx context.Context, sessionID, streamID, afterEventID string) (<-chan StreamEvent, error)
type MemoryStreamStoreOption ¶ added in v0.11.1
type MemoryStreamStoreOption func(*MemoryStreamStore)
func WithStreamIDGenerator ¶ added in v0.11.1
func WithStreamIDGenerator(gen apptheory.IDGenerator) MemoryStreamStoreOption
type MemoryTaskStore ¶ added in v1.6.0
type MemoryTaskStore struct {
// contains filtered or unexported fields
}
MemoryTaskStore is an in-memory TaskStore for testing and local development.
func NewMemoryTaskStore ¶ added in v1.6.0
func NewMemoryTaskStore() *MemoryTaskStore
NewMemoryTaskStore creates an empty in-memory task store.
func (*MemoryTaskStore) Cancel ¶ added in v1.6.0
func (m *MemoryTaskStore) Cancel(_ context.Context, lookup TaskLookup) (*TaskRecord, error)
func (*MemoryTaskStore) Create ¶ added in v1.6.0
func (m *MemoryTaskStore) Create(_ context.Context, task TaskRecord) (*TaskRecord, error)
func (*MemoryTaskStore) DeleteSession ¶ added in v1.6.0
func (m *MemoryTaskStore) DeleteSession(_ context.Context, sessionID string) error
func (*MemoryTaskStore) Get ¶ added in v1.6.0
func (m *MemoryTaskStore) Get(_ context.Context, lookup TaskLookup) (*TaskRecord, error)
func (*MemoryTaskStore) List ¶ added in v1.6.0
func (m *MemoryTaskStore) List(_ context.Context, req TaskListRequest) (*TaskListResult, error)
func (*MemoryTaskStore) Update ¶ added in v1.6.0
func (m *MemoryTaskStore) Update(_ context.Context, task TaskRecord) (*TaskRecord, error)
type OriginValidator ¶ added in v0.11.1
OriginValidator validates an HTTP Origin header value.
If a request includes an Origin header and the validator returns false, the server should reject the request (fail closed).
func AllowOrigins ¶ added in v0.11.1
func AllowOrigins(origins ...string) OriginValidator
type PromptArgument ¶ added in v0.11.0
type PromptArgument struct {
Name string `json:"name"`
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
Required bool `json:"required,omitempty"`
}
PromptArgument defines an argument a prompt can accept.
type PromptDef ¶ added in v0.11.0
type PromptDef struct {
Name string `json:"name"`
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
Arguments []PromptArgument `json:"arguments,omitempty"`
}
PromptDef defines an MCP prompt's metadata.
type PromptHandler ¶ added in v0.11.0
type PromptHandler func(ctx context.Context, args json.RawMessage) (*PromptResult, error)
PromptHandler renders a prompt given optional arguments.
type PromptMessage ¶ added in v0.11.0
type PromptMessage struct {
Role string `json:"role"`
Content ContentBlock `json:"content"`
}
PromptMessage is a single message returned from prompts/get.
type PromptRegistry ¶ added in v0.11.0
type PromptRegistry struct {
// contains filtered or unexported fields
}
PromptRegistry manages registered MCP prompts.
func NewPromptRegistry ¶ added in v0.11.0
func NewPromptRegistry() *PromptRegistry
NewPromptRegistry creates an empty prompt registry.
func (*PromptRegistry) Get ¶ added in v0.11.0
func (r *PromptRegistry) Get(ctx context.Context, name string, args json.RawMessage) (*PromptResult, error)
Get resolves a prompt by name and renders it.
func (*PromptRegistry) Len ¶ added in v0.11.0
func (r *PromptRegistry) Len() int
Len returns the number of registered prompts.
func (*PromptRegistry) List ¶ added in v0.11.0
func (r *PromptRegistry) List() []PromptDef
List returns all registered prompt definitions in registration order.
func (*PromptRegistry) RegisterPrompt ¶ added in v0.11.0
func (r *PromptRegistry) RegisterPrompt(def PromptDef, handler PromptHandler) error
RegisterPrompt registers a prompt by name.
type PromptResult ¶ added in v0.11.0
type PromptResult struct {
Description string `json:"description,omitempty"`
Messages []PromptMessage `json:"messages"`
}
PromptResult is the result payload returned from prompts/get.
type RPCError ¶
type RPCError struct {
Code int `json:"code"`
Message string `json:"message"`
Data any `json:"data,omitempty"`
}
RPCError is a JSON-RPC 2.0 error object.
type RelatedTaskMetadata ¶ added in v1.6.0
type RelatedTaskMetadata struct {
TaskID string `json:"taskId"`
}
RelatedTaskMetadata is the MCP _meta entry that associates messages with a task.
type Request ¶
type Request struct {
JSONRPC string `json:"jsonrpc"`
ID any `json:"id,omitempty"`
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
}
Request is a JSON-RPC 2.0 request message.
func ParseBatchRequest ¶
ParseBatchRequest parses a JSON-RPC batch request (array of requests) from raw bytes. If the input is a single object (not an array), it returns a slice containing that single parsed request.
func ParseRequest ¶
ParseRequest parses a single JSON-RPC 2.0 request from raw bytes. It validates the required fields: jsonrpc must be "2.0" and method must be non-empty.
This function accepts both JSON-RPC requests and notifications: - Requests include an "id" field. - Notifications omit the "id" field.
If an "id" field is present, it MUST NOT be null.
type ResourceContent ¶ added in v0.11.0
type ResourceContent struct {
URI string `json:"uri"`
MimeType string `json:"mimeType,omitempty"`
Text string `json:"text,omitempty"`
Blob string `json:"blob,omitempty"`
}
ResourceContent is a single content item returned from resources/read.
Exactly one of Text or Blob should be set. Blob is expected to be a base64-encoded payload (client-decoded).
type ResourceDef ¶ added in v0.11.0
type ResourceDef struct {
URI string `json:"uri"`
Name string `json:"name"`
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
MimeType string `json:"mimeType,omitempty"`
Size int64 `json:"size,omitempty"`
}
ResourceDef defines an MCP resource's metadata.
type ResourceHandler ¶ added in v0.11.0
type ResourceHandler func(ctx context.Context) ([]ResourceContent, error)
ResourceHandler resolves and returns the content for a resource.
type ResourceRegistry ¶ added in v0.11.0
type ResourceRegistry struct {
// contains filtered or unexported fields
}
ResourceRegistry manages registered MCP resources.
func NewResourceRegistry ¶ added in v0.11.0
func NewResourceRegistry() *ResourceRegistry
NewResourceRegistry creates an empty resource registry.
func (*ResourceRegistry) Len ¶ added in v0.11.0
func (r *ResourceRegistry) Len() int
Len returns the number of registered resources.
func (*ResourceRegistry) List ¶ added in v0.11.0
func (r *ResourceRegistry) List() []ResourceDef
List returns all registered resource definitions in registration order.
func (*ResourceRegistry) Read ¶ added in v0.11.0
func (r *ResourceRegistry) Read(ctx context.Context, uri string) ([]ResourceContent, error)
Read resolves a resource by URI and returns its content.
func (*ResourceRegistry) RegisterResource ¶ added in v0.11.0
func (r *ResourceRegistry) RegisterResource(def ResourceDef, handler ResourceHandler) error
RegisterResource registers a resource by URI.
type ResourceSubscription ¶ added in v1.6.0
ResourceSubscription identifies a resource subscription request for a session.
type ResourceSubscriptionHook ¶ added in v1.6.0
type ResourceSubscriptionHook func(ctx context.Context, sub ResourceSubscription) error
ResourceSubscriptionHook handles a resources/subscribe or resources/unsubscribe request.
type Response ¶
type Response struct {
JSONRPC string `json:"jsonrpc"`
ID any `json:"id"`
Result any `json:"result,omitempty"`
Error *RPCError `json:"error,omitempty"`
}
Response is a JSON-RPC 2.0 response message.
func NewErrorResponse ¶
NewErrorResponse creates a JSON-RPC error response with the given request ID and error details.
func NewResultResponse ¶
NewResultResponse creates a JSON-RPC success response with the given request ID and result value.
func ParseResponse ¶ added in v0.11.1
ParseResponse parses a single JSON-RPC 2.0 response from raw bytes.
It validates: - jsonrpc must be "2.0" - id must be present (it may be null for certain error cases) - exactly one of result or error is present
type SSEEvent ¶
type SSEEvent struct {
Data any
}
SSEEvent is a progress event emitted by a streaming tool handler.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the MCP protocol handler. It dispatches JSON-RPC 2.0 messages to the appropriate MCP method handlers (initialize, tools/list, tools/call).
func NewServer ¶
func NewServer(name, version string, opts ...ServerOption) *Server
NewServer creates an MCP server with the given name, version, and options.
func (*Server) Handler ¶
Handler returns an apptheory.Handler that processes MCP JSON-RPC requests.
Streamable HTTP semantics: - POST /mcp accepts JSON-RPC requests, notifications, and responses. - GET /mcp replays/resumes previously interrupted SSE streams via Last-Event-ID. - DELETE /mcp terminates a session.
Notes:
- Clients must initialize first; the server issues Mcp-Session-Id on the initialize response and requires it thereafter.
- Disconnects are not treated as cancellation. Streaming tool execution is decoupled from the connection and can be resumed with GET.
func (*Server) Prompts ¶ added in v0.11.0
func (s *Server) Prompts() *PromptRegistry
Prompts returns the server's prompt registry for registering prompts.
func (*Server) Registry ¶
func (s *Server) Registry() *ToolRegistry
Registry returns the server's tool registry for registering tools.
func (*Server) Resources ¶ added in v0.11.0
func (s *Server) Resources() *ResourceRegistry
Resources returns the server's resource registry for registering resources.
type ServerOption ¶
type ServerOption func(*Server)
ServerOption configures a Server.
func WithCapabilityConfig ¶ added in v1.6.0
func WithCapabilityConfig(config CapabilityConfig) ServerOption
WithCapabilityConfig sets the server capability policy used for initialize responses.
func WithCompletionHooks ¶ added in v1.6.0
func WithCompletionHooks(promptHook, resourceHook CompletionHook) ServerOption
WithCompletionHooks enables completion/complete support through explicit prompt and resource hooks. At least one hook must be non-nil before the completions capability is advertised.
func WithInitialSessionListenerBudget ¶ added in v0.25.0
func WithInitialSessionListenerBudget(opts InitialSessionListenerBudgetOptions) ServerOption
WithInitialSessionListenerBudget caps the initial GET /mcp keepalive listener against the Lambda remaining-time budget when RemainingMS is available.
This option is explicit opt-in and applies only to GET /mcp requests without Last-Event-ID. Resume/replay requests continue to use the existing stream replay path unchanged.
func WithLogger ¶
func WithLogger(logger *slog.Logger) ServerOption
WithLogger sets the structured logger for the server.
func WithLoggingLevelHook ¶ added in v1.6.0
func WithLoggingLevelHook(hook LoggingLevelHook) ServerOption
WithLoggingLevelHook enables logging/setLevel support through an explicit hook.
func WithOriginValidator ¶ added in v0.11.1
func WithOriginValidator(v OriginValidator) ServerOption
WithOriginValidator sets the Origin validator for browser-based callers.
If an Origin header is present, the request is rejected unless it passes validation (fail closed).
func WithResourceSubscriptionHooks ¶ added in v1.6.0
func WithResourceSubscriptionHooks(subscribe, unsubscribe ResourceSubscriptionHook) ServerOption
WithResourceSubscriptionHooks enables resources/subscribe and resources/unsubscribe support through explicit hooks.
The methods fail closed if either hook is absent. The resource subscribe sub-capability remains omitted until AppTheory has an outbound resource update notification contract.
func WithServerIDGenerator ¶
func WithServerIDGenerator(gen apptheory.IDGenerator) ServerOption
WithIDGenerator sets the ID generator for session IDs.
func WithSessionStore ¶
func WithSessionStore(store SessionStore) ServerOption
WithSessionStore sets the session store for the server.
func WithStreamStore ¶ added in v0.11.1
func WithStreamStore(store StreamStore) ServerOption
WithStreamStore sets the stream store for the server.
func WithTaskRuntime ¶ added in v1.6.0
func WithTaskRuntime(opts TaskRuntimeOptions) ServerOption
WithTaskRuntime enables MCP task operations through an explicit TaskStore.
Tasks are experimental in MCP 2025-11-25 and remain opt-in. AppTheory only advertises task capabilities when this option supplies a store and at least one registered tool declares task support.
type Session ¶
type Session struct {
ID string `json:"id"`
CreatedAt time.Time `json:"createdAt"`
ExpiresAt time.Time `json:"expiresAt"`
Data map[string]string `json:"data,omitempty"`
}
Session holds per-session state for an MCP connection.
type SessionStore ¶
type SessionStore interface {
Get(ctx context.Context, id string) (*Session, error)
Put(ctx context.Context, session *Session) error
Delete(ctx context.Context, id string) error
}
SessionStore is the interface for session persistence backends.
func NewDynamoSessionStore ¶
func NewDynamoSessionStore(db tablecore.DB) SessionStore
NewDynamoSessionStore creates a DynamoDB-backed session store.
type StreamEvent ¶ added in v0.11.1
type StreamEvent struct {
ID string
Data json.RawMessage
}
StreamEvent is a single server->client JSON-RPC message framed as an SSE event.
ID is the SSE event id (used for resumability via Last-Event-ID). Data is the raw JSON payload (a single JSON-RPC message).
type StreamStore ¶ added in v0.11.1
type StreamStore interface {
Create(ctx context.Context, sessionID string) (streamID string, err error)
Append(ctx context.Context, sessionID, streamID string, data json.RawMessage) (eventID string, err error)
Close(ctx context.Context, sessionID, streamID string) error
// Subscribe streams events after afterEventID. If afterEventID is empty,
// it streams from the beginning. If afterEventID is present, it must belong
// to the requested stream.
Subscribe(ctx context.Context, sessionID, streamID, afterEventID string) (<-chan StreamEvent, error)
// StreamForEvent returns the stream id that the given event id belongs to.
StreamForEvent(ctx context.Context, sessionID, eventID string) (streamID string, err error)
// DeleteSession removes all stream state for a session.
DeleteSession(ctx context.Context, sessionID string) error
}
StreamStore persists stream events so a disconnected client can replay them via GET + Last-Event-ID.
func NewDynamoStreamStore ¶ added in v0.20.0
func NewDynamoStreamStore(db tablecore.DB) StreamStore
NewDynamoStreamStore creates a DynamoDB-backed StreamStore.
type StreamingToolHandler ¶
type StreamingToolHandler func(ctx context.Context, args json.RawMessage, emit func(SSEEvent)) (*ToolResult, error)
StreamingToolHandler is a tool handler that can emit progress events via SSE.
type Task ¶ added in v1.6.0
type Task struct {
TaskID string `json:"taskId"`
Status TaskStatus `json:"status"`
StatusMessage string `json:"statusMessage,omitempty"`
CreatedAt time.Time `json:"createdAt"`
LastUpdatedAt time.Time `json:"lastUpdatedAt"`
TTL *int64 `json:"ttl"`
PollInterval *int64 `json:"pollInterval,omitempty"`
}
Task is the MCP task state returned by task operations.
type TaskListRequest ¶ added in v1.6.0
type TaskListRequest struct {
SessionID string `json:"sessionId"`
Cursor string `json:"cursor,omitempty"`
Limit int `json:"limit,omitempty"`
}
TaskListRequest scopes a task listing to the active MCP session.
type TaskListResult ¶ added in v1.6.0
type TaskListResult struct {
Tasks []Task `json:"tasks"`
NextCursor string `json:"nextCursor,omitempty"`
}
TaskListResult is the MCP tasks/list result.
type TaskLookup ¶ added in v1.6.0
TaskLookup scopes a task lookup to the active MCP session.
type TaskMetadata ¶ added in v1.6.0
type TaskMetadata struct {
TTL *int64 `json:"ttl,omitempty"`
}
TaskMetadata is the MCP request parameter used to request task-augmented execution.
type TaskRecord ¶ added in v1.6.0
type TaskRecord struct {
SessionID string `json:"sessionId"`
Method string `json:"method"`
ToolName string `json:"toolName,omitempty"`
Task Task `json:"task"`
Result json.RawMessage `json:"result,omitempty"`
Error *RPCError `json:"error,omitempty"`
}
TaskRecord is the durable representation stored by TaskStore implementations.
type TaskRuntimeOptions ¶ added in v1.6.0
type TaskRuntimeOptions struct {
Store TaskStore
DefaultTTL time.Duration
MaxTTL time.Duration
PollInterval time.Duration
ListLimit int
ModelImmediateResponse string
}
TaskRuntimeOptions configures task-augmented MCP execution.
type TaskStatus ¶ added in v1.6.0
type TaskStatus string
TaskStatus is an MCP task lifecycle state.
const ( TaskStatusWorking TaskStatus = "working" TaskStatusInputRequired TaskStatus = "input_required" TaskStatusCompleted TaskStatus = "completed" TaskStatusFailed TaskStatus = "failed" TaskStatusCanceled TaskStatus = "cancel" + "led" )
type TaskStore ¶ added in v1.6.0
type TaskStore interface {
Create(ctx context.Context, task TaskRecord) (*TaskRecord, error)
Get(ctx context.Context, lookup TaskLookup) (*TaskRecord, error)
Update(ctx context.Context, task TaskRecord) (*TaskRecord, error)
List(ctx context.Context, req TaskListRequest) (*TaskListResult, error)
Cancel(ctx context.Context, lookup TaskLookup) (*TaskRecord, error)
DeleteSession(ctx context.Context, sessionID string) error
}
TaskStore persists MCP task state. Implementations must bind every operation to the supplied SessionID and fail closed if a task belongs to a different authorization context.
func NewDynamoTaskStore ¶ added in v1.6.0
NewDynamoTaskStore creates a DynamoDB-backed task store.
type TaskSupport ¶ added in v1.6.0
type TaskSupport string
TaskSupport declares whether a tool can be invoked through MCP task augmentation.
const ( TaskSupportForbidden TaskSupport = "forbidden" TaskSupportOptional TaskSupport = "optional" TaskSupportRequired TaskSupport = "required" )
type ToolAnnotations ¶ added in v0.14.1
type ToolDef ¶
type ToolDef struct {
Name string `json:"name"`
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
Annotations *ToolAnnotations `json:"annotations,omitempty"`
Icons []Icon `json:"icons,omitempty"`
Execution *ToolExecution `json:"execution,omitempty"`
InputSchema json.RawMessage `json:"inputSchema"`
OutputSchema json.RawMessage `json:"outputSchema,omitempty"`
}
ToolDef defines an MCP tool's metadata and input schema.
type ToolExecution ¶ added in v0.14.1
type ToolExecution struct {
// TaskSupport indicates if the tool supports task-augmented execution.
// Values: "forbidden", "optional", "required".
TaskSupport TaskSupport `json:"taskSupport,omitempty"`
}
type ToolHandler ¶
type ToolHandler func(ctx context.Context, args json.RawMessage) (*ToolResult, error)
ToolHandler is the function signature for tool implementations.
type ToolRegistry ¶
type ToolRegistry struct {
// contains filtered or unexported fields
}
ToolRegistry manages registered MCP tools.
func NewToolRegistry ¶
func NewToolRegistry() *ToolRegistry
NewToolRegistry creates an empty tool registry.
func (*ToolRegistry) Call ¶
func (r *ToolRegistry) Call(ctx context.Context, name string, args json.RawMessage) (*ToolResult, error)
Call looks up a tool by name and invokes its handler with the given arguments. It returns an error if the tool is not found.
func (*ToolRegistry) CallStreaming ¶
func (r *ToolRegistry) CallStreaming(ctx context.Context, name string, args json.RawMessage, emit func(SSEEvent)) (*ToolResult, error)
CallStreaming looks up a tool by name and invokes its streaming handler if available, otherwise falls back to the regular handler (discarding emit).
func (*ToolRegistry) Len ¶ added in v1.6.0
func (r *ToolRegistry) Len() int
Len returns the number of registered tools.
func (*ToolRegistry) List ¶
func (r *ToolRegistry) List() []ToolDef
List returns all registered tool definitions in registration order.
func (*ToolRegistry) RegisterStreamingTool ¶
func (r *ToolRegistry) RegisterStreamingTool(def ToolDef, handler StreamingToolHandler) error
RegisterStreamingTool registers a tool that supports SSE streaming. When invoked by a strict Streamable HTTP POST with an Accept header that supports both application/json and text/event-stream, progress events are streamed. Buffered calls to the same tool still run to completion and return a JSON response when the server does not select the SSE path.
func (*ToolRegistry) RegisterTool ¶
func (r *ToolRegistry) RegisterTool(def ToolDef, handler ToolHandler) error
RegisterTool adds a tool to the registry. It returns an error if a tool with the same name is already registered.
type ToolResult ¶
type ToolResult struct {
Content []ContentBlock `json:"content"`
IsError bool `json:"isError,omitempty"`
StructuredContent map[string]any `json:"structuredContent,omitempty"`
}
ToolResult is the result of a tool invocation.