Documentation
¶
Overview ¶
make build-release # Production build make install # Install to GOPATH/bin
Package server provides A2A protocol server implementation for Hector v2.
The server package implements the a2asrv.AgentExecutor interface to expose Hector agents via the A2A protocol (JSON-RPC, gRPC, HTTP).
Usage ¶
executor := server.NewExecutor(server.ExecutorConfig{
RunnerConfig: runner.Config{
AppName: "my-app",
Agent: myAgent,
SessionService: session.InMemoryService(),
},
})
handler := a2asrv.NewHandler(executor)
http.Handle("/a2a", a2asrv.NewJSONRPCHandler(handler))
Index ¶
- func AppContextMiddleware(appStore app.Store) func(http.Handler) http.Handler
- func AppContextMiddlewareWithValidation(appStore app.Store) func(http.Handler) http.Handler
- type AdminHandler
- type AdminHandlerConfig
- type AgentRuntime
- type AppInstance
- type AppManager
- func (m *AppManager) Close() error
- func (m *AppManager) DocumentStores() map[string]*rag.DocumentStore
- func (m *AppManager) GetAppConfig(ctx context.Context, appID string) (*config.AppConfig, error)
- func (m *AppManager) GetRuntime(ctx context.Context, appID, agentName string) (*AgentRuntime, error)
- func (m *AppManager) Invalidate(appID string)
- func (m *AppManager) ListAgents(ctx context.Context, appID string) ([]*AgentRuntime, error)
- func (m *AppManager) LookupWebhookInConfig(path string, cfg *config.AppConfig) (string, bool)
- func (m *AppManager) Preload(ctx context.Context, appID string) error
- func (m *AppManager) ResolveWebhook(ctx context.Context, path string) (appID, agentName string, err error)
- func (m *AppManager) UnloadApp(ctx context.Context, appID string) error
- type ApprovalResponse
- type DocumentStoreProvider
- type DynamicAgentHandler
- type Executor
- func (e *Executor) Cancel(ctx context.Context, reqCtx *a2asrv.RequestContext, queue eventqueue.Queue) error
- func (e *Executor) Execute(ctx context.Context, reqCtx *a2asrv.RequestContext, queue eventqueue.Queue) error
- func (e *Executor) ResumeFromCheckpoint(ctx context.Context, state *checkpoint.State) error
- type ExecutorConfig
- type HTTPServer
- func (s *HTTPServer) Address() string
- func (s *HTTPServer) GRPCAddress() string
- func (s *HTTPServer) GetAgentA2AInvoker() trigger.AgentInvoker
- func (s *HTTPServer) GetTaskResultProvider() func(ctx context.Context, taskID string) (string, error)
- func (s *HTTPServer) Reload(newCfg *config.AppConfig, newAppManager *AppManager)
- func (s *HTTPServer) Shutdown(ctx context.Context) error
- func (s *HTTPServer) Start(ctx context.Context) error
- type HTTPServerOption
- func WithAdminHandler(handler *AdminHandler) HTTPServerOption
- func WithAuthValidator(validator auth.TokenValidator) HTTPServerOption
- func WithDocumentStoreProvider(p DocumentStoreProvider) HTTPServerOption
- func WithExecutionProvider(p execution.Provider) HTTPServerOption
- func WithObservability(obs *observability.Manager) HTTPServerOption
- func WithRateLimiter(limiter ratelimit.RateLimiter) HTTPServerOption
- func WithSessionService(ss session.Service) HTTPServerOption
- func WithTaskService(ts task.Service) HTTPServerOption
- func WithTaskStore(store a2asrv.TaskStore) HTTPServerOption
- func WithUIHandler(handler http.Handler) HTTPServerOption
- type QueuedRequestHandler
- func (h *QueuedRequestHandler) OnCancelTask(ctx context.Context, params *a2a.TaskIDParams) (*a2a.Task, error)
- func (h *QueuedRequestHandler) OnDeleteTaskPushConfig(ctx context.Context, params *a2a.DeleteTaskPushConfigParams) error
- func (h *QueuedRequestHandler) OnGetExtendedAgentCard(ctx context.Context) (*a2a.AgentCard, error)
- func (h *QueuedRequestHandler) OnGetTask(ctx context.Context, params *a2a.TaskQueryParams) (*a2a.Task, error)
- func (h *QueuedRequestHandler) OnGetTaskPushConfig(ctx context.Context, params *a2a.GetTaskPushConfigParams) (*a2a.TaskPushConfig, error)
- func (h *QueuedRequestHandler) OnListTaskPushConfig(ctx context.Context, params *a2a.ListTaskPushConfigParams) ([]*a2a.TaskPushConfig, error)
- func (h *QueuedRequestHandler) OnResubscribeToTask(ctx context.Context, params *a2a.TaskIDParams) iter.Seq2[a2a.Event, error]
- func (h *QueuedRequestHandler) OnSendMessage(ctx context.Context, params *a2a.MessageSendParams) (a2a.SendMessageResult, error)
- func (h *QueuedRequestHandler) OnSendMessageStream(ctx context.Context, params *a2a.MessageSendParams) iter.Seq2[a2a.Event, error]
- func (h *QueuedRequestHandler) OnSetTaskPushConfig(ctx context.Context, params *a2a.TaskPushConfig) (*a2a.TaskPushConfig, error)
- type Runtime
- type RuntimeFactory
- type TaskDeleter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AppContextMiddleware ¶ added in v1.21.0
AppContextMiddleware extracts tenant_id from auth claims and injects it into the request context as app context. This enables downstream handlers and stores to access the current app's namespace.
The middleware should be applied after auth middleware so claims are available.
Types ¶
type AdminHandler ¶ added in v1.21.0
type AdminHandler struct {
// contains filtered or unexported fields
}
AdminHandler handles admin API endpoints for app management.
func NewAdminHandler ¶ added in v1.21.0
func NewAdminHandler(cfg AdminHandlerConfig) (*AdminHandler, error)
NewAdminHandler creates a new admin handler.
func (*AdminHandler) ServeHTTP ¶ added in v1.21.0
func (h *AdminHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP routes admin requests.
type AdminHandlerConfig ¶ added in v1.21.0
type AdminHandlerConfig struct {
// AppStore is the app storage backend.
AppStore app.Store
// TokenIssuer issues JWTs for apps.
TokenIssuer *auth.TokenIssuer
// AppManager manages app runtimes.
AppManager *AppManager
// SessionService manages app sessions.
SessionService session.Service
// VectorProvider manages app vector data.
VectorProvider vector.Provider
// TaskDeleter manages task cleanup (optional).
TaskDeleter TaskDeleter
// TaskQueue is the task queue for queue admin endpoints (optional).
TaskQueue native.Queue
// AdminKey is the secret key for admin authentication (from --admin-key CLI arg).
AdminKey string
// RootDir is the root directory for app data (default: .).
RootDir string
}
AdminHandlerConfig configures the admin handler.
type AgentRuntime ¶ added in v1.21.0
type AgentRuntime struct {
Executor *Executor
Card *a2a.AgentCard // A2A agent card metadata
Visibility string // "public", "internal", "private"
}
AgentRuntime contains the runtime components for an agent.
type AppInstance ¶ added in v1.21.0
type AppInstance struct {
Config *config.AppConfig
Runtime Runtime
Agents map[string]*AgentRuntime
}
AppInstance holds the runtime and cached agent executors for an app.
type AppManager ¶ added in v1.21.0
type AppManager struct {
// contains filtered or unexported fields
}
AppManager manages the lifecycle of loaded apps and their executors. It handles on-demand loading of app configurations and caching of executors.
func NewAppManager ¶ added in v1.21.0
func NewAppManager(store app.Store, factory RuntimeFactory, taskService task.Service, metrics *observability.Metrics) *AppManager
NewAppManager creates a new AppManager.
func (*AppManager) Close ¶ added in v1.21.0
func (m *AppManager) Close() error
Close closes all managed runtimes.
func (*AppManager) DocumentStores ¶ added in v1.21.0
func (m *AppManager) DocumentStores() map[string]*rag.DocumentStore
DocumentStores returns the document stores from the default app. Implements DocumentStoreProvider for backward compatibility.
func (*AppManager) GetAppConfig ¶ added in v1.21.0
GetAppConfig returns the configuration for an app. Loads the app if not active.
func (*AppManager) GetRuntime ¶ added in v1.21.0
func (m *AppManager) GetRuntime(ctx context.Context, appID, agentName string) (*AgentRuntime, error)
GetRuntime returns the runtime components for a specific agent in a specific app.
func (*AppManager) Invalidate ¶ added in v1.21.0
func (m *AppManager) Invalidate(appID string)
Invalidate clears the cache for an app and closes its runtime. Should be called when app config is updated.
func (*AppManager) ListAgents ¶ added in v1.21.0
func (m *AppManager) ListAgents(ctx context.Context, appID string) ([]*AgentRuntime, error)
ListAgents returns all configured agent runtimes for an app. Used for discovery and default agent resolution.
func (*AppManager) LookupWebhookInConfig ¶ added in v1.21.0
LookupWebhookInConfig searches for a matching webhook trigger in an app config.
func (*AppManager) Preload ¶ added in v1.21.0
func (m *AppManager) Preload(ctx context.Context, appID string) error
Preload ensures an app is loaded into memory. Used for default app bootstrapping.
func (*AppManager) ResolveWebhook ¶ added in v1.21.0
func (m *AppManager) ResolveWebhook(ctx context.Context, path string) (appID, agentName string, err error)
ResolveWebhook resolves an app and agent from a webhook path.
type ApprovalResponse ¶
type ApprovalResponse struct {
// Decision is "approve" or "deny"
Decision string
// ToolCallID is the ID of the tool call being approved/denied
ToolCallID string
// TaskID is the task this approval is for
TaskID string
}
ApprovalResponse represents an approval decision from the user.
func ExtractApprovalResponse ¶
func ExtractApprovalResponse(msg *a2a.Message) *ApprovalResponse
ExtractApprovalResponse checks if a message contains an approval response. Returns nil if the message is not an approval response.
Approval responses can be: 1. A DataPart with type: "tool_approval" 2. A TextPart with "approve" or "deny" (for simple approvals)
type DocumentStoreProvider ¶ added in v1.15.2
type DocumentStoreProvider interface {
DocumentStores() map[string]*rag.DocumentStore
}
DocumentStoreProvider provides access to document stores for status reporting. This interface avoids importing the runtime package directly.
type DynamicAgentHandler ¶ added in v1.21.0
type DynamicAgentHandler struct {
// contains filtered or unexported fields
}
DynamicAgentHandler routes requests to agent executors dynamically based on the current app context (tenant_id) and the agent name in the URL path.
func NewDynamicAgentHandler ¶ added in v1.21.0
func NewDynamicAgentHandler( appManager *AppManager, serverConfig *config.ServerConfig, authInterceptor *auth.Interceptor, authValidator auth.TokenValidator, taskStore a2asrv.TaskStore, execProvider execution.Provider, ) *DynamicAgentHandler
NewDynamicAgentHandler creates a new dynamic agent handler.
func (*DynamicAgentHandler) ServeHTTP ¶ added in v1.21.0
func (h *DynamicAgentHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP routes requests dynamically. Configured at path "/agents/".
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor implements a2asrv.AgentExecutor to bridge Hector agents to A2A.
Event translation follows these rules:
- New task: emit TaskStatusUpdateEvent with TaskStateSubmitted
- Before runner invocation: emit TaskStatusUpdateEvent with TaskStateWorking
- For each agent.Event: emit TaskArtifactUpdateEvent with translated parts
- After last event: emit TaskArtifactUpdateEvent with LastChunk=true
- On LLM error: emit TaskStatusUpdateEvent with TaskStateFailed
- On long-running tool: emit TaskStatusUpdateEvent with TaskStateInputRequired
- On success: emit TaskStatusUpdateEvent with TaskStateCompleted
func NewExecutor ¶
func NewExecutor(config ExecutorConfig) *Executor
NewExecutor creates a new A2A executor.
func (*Executor) Cancel ¶
func (e *Executor) Cancel(ctx context.Context, reqCtx *a2asrv.RequestContext, queue eventqueue.Queue) error
Cancel implements a2asrv.AgentExecutor.
func (*Executor) Execute ¶
func (e *Executor) Execute(ctx context.Context, reqCtx *a2asrv.RequestContext, queue eventqueue.Queue) error
Execute implements a2asrv.AgentExecutor.
func (*Executor) ResumeFromCheckpoint ¶ added in v1.16.1
ResumeFromCheckpoint resumes agent execution from a checkpoint. This is called by the checkpoint recovery manager on startup.
The session already contains the conversation history from before the crash/shutdown. We re-invoke the agent with the original query, and it will continue from where it left off.
type ExecutorConfig ¶
type ExecutorConfig struct {
// RunnerConfig is used to create a runner for each execution.
RunnerConfig runner.Config
// RunConfig contains runtime configuration for agent execution.
RunConfig agent.RunConfig
// TaskService provides task management for cascade cancellation.
// If nil, cascade cancellation will not work.
TaskService task.Service
// Notifier dispatches outbound notifications on task events.
// If nil, notifications are disabled.
Notifier *notification.Notifier
}
ExecutorConfig contains the configuration for the A2A executor.
type HTTPServer ¶
type HTTPServer struct {
// contains filtered or unexported fields
}
HTTPServer is the Hector HTTP server. Uses a2a-go native handlers for A2A protocol compliance.
func NewHTTPServer ¶
func NewHTTPServer(serverCfg *config.ServerConfig, appCfg *config.AppConfig, appManager *AppManager, opts ...HTTPServerOption) *HTTPServer
NewHTTPServer creates a new HTTP server from config. appManager handles loading apps and creating executors.
func (*HTTPServer) Address ¶
func (s *HTTPServer) Address() string
Address returns the HTTP server address.
func (*HTTPServer) GRPCAddress ¶
func (s *HTTPServer) GRPCAddress() string
GRPCAddress returns the gRPC server address (if enabled). Transport and gRPC features temporarily disabled during config refactoring.
func (*HTTPServer) GetAgentA2AInvoker ¶ added in v1.15.1
func (s *HTTPServer) GetAgentA2AInvoker() trigger.AgentInvoker
GetAgentA2AInvoker returns an AgentInvoker that uses A2A handlers for invocation. This enables webhooks to invoke agents through A2A protocol with automatic TaskStore registration.
func (*HTTPServer) GetTaskResultProvider ¶ added in v1.15.1
func (s *HTTPServer) GetTaskResultProvider() func(ctx context.Context, taskID string) (string, error)
GetTaskResultProvider returns a function that fetches task result from TaskStore.
func (*HTTPServer) Reload ¶ added in v1.21.0
func (s *HTTPServer) Reload(newCfg *config.AppConfig, newAppManager *AppManager)
Reload updates the server configuration and state. Routes are dynamic, so this simply updates the internal config reference.
type HTTPServerOption ¶
type HTTPServerOption func(*HTTPServer)
HTTPServerOption configures the HTTP server.
func WithAdminHandler ¶ added in v1.21.0
func WithAdminHandler(handler *AdminHandler) HTTPServerOption
WithAdminHandler sets the admin handler for app management.
func WithAuthValidator ¶
func WithAuthValidator(validator auth.TokenValidator) HTTPServerOption
WithAuthValidator sets the JWT validator for authentication. When set, HTTP requests will be validated and claims passed to agents.
func WithDocumentStoreProvider ¶ added in v1.15.2
func WithDocumentStoreProvider(p DocumentStoreProvider) HTTPServerOption
WithDocumentStoreProvider sets the provider for document store status.
func WithExecutionProvider ¶ added in v1.21.0
func WithExecutionProvider(p execution.Provider) HTTPServerOption
WithExecutionProvider sets the execution provider for async execution.
func WithObservability ¶
func WithObservability(obs *observability.Manager) HTTPServerOption
WithObservability sets the observability manager for tracing and metrics.
func WithRateLimiter ¶ added in v1.21.0
func WithRateLimiter(limiter ratelimit.RateLimiter) HTTPServerOption
WithRateLimiter sets the rate limiter for request rate limiting.
func WithSessionService ¶ added in v1.21.0
func WithSessionService(ss session.Service) HTTPServerOption
WithSessionService sets the session service.
func WithTaskService ¶
func WithTaskService(ts task.Service) HTTPServerOption
WithTaskService sets the task service for task-scoped cancellation.
func WithTaskStore ¶
func WithTaskStore(store a2asrv.TaskStore) HTTPServerOption
WithTaskStore sets the task store for persistent task storage. If not set, a2a-go uses its internal in-memory store.
func WithUIHandler ¶ added in v1.21.0
func WithUIHandler(handler http.Handler) HTTPServerOption
WithUIHandler sets the handler for serving the web UI.
type QueuedRequestHandler ¶ added in v1.21.0
type QueuedRequestHandler struct {
// contains filtered or unexported fields
}
QueuedRequestHandler wraps a a2asrv.RequestHandler and intercepts async execution requests to enqueue them for durable processing. It implements a2asrv.RequestHandler.
func NewQueuedRequestHandler ¶ added in v1.21.0
func NewQueuedRequestHandler(delegate a2asrv.RequestHandler, provider execution.Provider, appID, agentName string) *QueuedRequestHandler
NewQueuedRequestHandler creates a new QueuedRequestHandler.
func (*QueuedRequestHandler) OnCancelTask ¶ added in v1.21.0
func (h *QueuedRequestHandler) OnCancelTask(ctx context.Context, params *a2a.TaskIDParams) (*a2a.Task, error)
OnCancelTask delegates to the underlying handler.
func (*QueuedRequestHandler) OnDeleteTaskPushConfig ¶ added in v1.21.0
func (h *QueuedRequestHandler) OnDeleteTaskPushConfig(ctx context.Context, params *a2a.DeleteTaskPushConfigParams) error
OnDeleteTaskPushConfig delegates to the underlying handler.
func (*QueuedRequestHandler) OnGetExtendedAgentCard ¶ added in v1.21.0
OnGetExtendedAgentCard delegates to the underlying handler.
func (*QueuedRequestHandler) OnGetTask ¶ added in v1.21.0
func (h *QueuedRequestHandler) OnGetTask(ctx context.Context, params *a2a.TaskQueryParams) (*a2a.Task, error)
OnGetTask delegates to the underlying handler.
func (*QueuedRequestHandler) OnGetTaskPushConfig ¶ added in v1.21.0
func (h *QueuedRequestHandler) OnGetTaskPushConfig(ctx context.Context, params *a2a.GetTaskPushConfigParams) (*a2a.TaskPushConfig, error)
OnGetTaskPushConfig delegates to the underlying handler.
func (*QueuedRequestHandler) OnListTaskPushConfig ¶ added in v1.21.0
func (h *QueuedRequestHandler) OnListTaskPushConfig(ctx context.Context, params *a2a.ListTaskPushConfigParams) ([]*a2a.TaskPushConfig, error)
OnListTaskPushConfig delegates to the underlying handler.
func (*QueuedRequestHandler) OnResubscribeToTask ¶ added in v1.21.0
func (h *QueuedRequestHandler) OnResubscribeToTask(ctx context.Context, params *a2a.TaskIDParams) iter.Seq2[a2a.Event, error]
OnResubscribeToTask delegates to the underlying handler.
func (*QueuedRequestHandler) OnSendMessage ¶ added in v1.21.0
func (h *QueuedRequestHandler) OnSendMessage(ctx context.Context, params *a2a.MessageSendParams) (a2a.SendMessageResult, error)
OnSendMessage handles message sending requests. Intercepts requests with blocking=false and enqueues them.
func (*QueuedRequestHandler) OnSendMessageStream ¶ added in v1.21.0
func (h *QueuedRequestHandler) OnSendMessageStream(ctx context.Context, params *a2a.MessageSendParams) iter.Seq2[a2a.Event, error]
OnSendMessageStream delegates to the underlying handler.
func (*QueuedRequestHandler) OnSetTaskPushConfig ¶ added in v1.21.0
func (h *QueuedRequestHandler) OnSetTaskPushConfig(ctx context.Context, params *a2a.TaskPushConfig) (*a2a.TaskPushConfig, error)
OnSetTaskPushConfig delegates to the underlying handler.
type Runtime ¶ added in v1.21.0
type Runtime interface {
// Closer closes the runtime and releases resources.
Close() error
// ListAgents returns the names of all configured agents.
ListAgents() []string
// RunnerConfig creating a runner config for an agent.
RunnerConfig(agentName string) (*runner.Config, error)
// DocumentStores returns the document stores for RAG status.
DocumentStores() map[string]*rag.DocumentStore
// Notifier returns the notifier service.
Notifier() *notification.Notifier
}
Runtime defines the interface for an agent runtime. Implemented by pkg/runtime.Runtime.
type RuntimeFactory ¶ added in v1.21.0
RuntimeFactory functions create a runtime for a given app configuration.