server

package
v1.40.0 Latest
Warning

This package is not in the latest version of its module.
Published: Apr 16, 2026 License: MIT Imports: 35 Imported by: 0

Documentation

Overview

make build-release # Production build
make install       # Install to GOPATH/bin

Package server provides an A2A protocol server implementation for Hector v2.

The server package implements the a2asrv.AgentExecutor interface to expose Hector agents via the A2A protocol (JSON-RPC, gRPC, HTTP).

Usage

executor := server.NewExecutor(server.ExecutorConfig{
    RunnerConfig: runner.Config{
        AppName:        "my-app",
        Agent:          myAgent,
        SessionService: session.InMemoryService(),
    },
})

handler := a2asrv.NewHandler(executor)
http.Handle("/a2a", a2asrv.NewJSONRPCHandler(handler))

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AppContextMiddleware added in v1.21.0

func AppContextMiddleware(appStore app.Store) func(http.Handler) http.Handler

AppContextMiddleware extracts tenant_id from auth claims and injects it into the request context as app context. This enables downstream handlers and stores to access the current app's namespace.

The middleware should be applied after auth middleware so claims are available.
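The ordering requirement can be illustrated with a minimal, standard-library-only sketch. It does not use the real app.Store or auth packages: the context keys, the map-based claims, and the fakeAuth helper are all hypothetical stand-ins chosen for illustration.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// ctxKey is a private key type so context values cannot collide across packages.
type ctxKey string

const (
	claimsKey ctxKey = "claims" // hypothetical: where the auth middleware stores claims
	appKey    ctxKey = "app"    // hypothetical: where the app namespace is injected
)

// appContext copies tenant_id from the claims into the request context.
func appContext(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		claims, _ := r.Context().Value(claimsKey).(map[string]string)
		if tenant := claims["tenant_id"]; tenant != "" {
			r = r.WithContext(context.WithValue(r.Context(), appKey, tenant))
		}
		next.ServeHTTP(w, r)
	})
}

// fakeAuth stands in for the real auth middleware and attaches claims.
func fakeAuth(tenant string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		claims := map[string]string{"tenant_id": tenant}
		ctx := context.WithValue(r.Context(), claimsKey, claims)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

// echoApp writes the app ID seen by a downstream handler.
func echoApp(w http.ResponseWriter, r *http.Request) {
	app, _ := r.Context().Value(appKey).(string)
	fmt.Fprintf(w, "app=%s", app)
}

// runChain sends one request through the chain and returns the body.
// With authFirst=false the auth step is skipped, so no claims are present.
func runChain(tenant string, authFirst bool) string {
	var h http.Handler = appContext(http.HandlerFunc(echoApp))
	if authFirst {
		h = fakeAuth(tenant, h)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/agents/demo", nil))
	return rec.Body.String()
}

func main() {
	fmt.Println(runChain("acme", true))  // auth ran first, claims are available
	fmt.Println(runChain("acme", false)) // no claims, so no app context is set
}
```

In this sketch the app context is only populated when the auth step wraps the app-context step, which is why the real middleware is meant to run after authentication.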

func AppContextMiddlewareWithValidation added in v1.21.0

func AppContextMiddlewareWithValidation(appStore app.Store) func(http.Handler) http.Handler

AppContextMiddlewareWithValidation is like AppContextMiddleware but also validates that the app exists in the store. Returns 403 if app not found.
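A rough sketch of the validation step, assuming the app ID is already in the request context. The existenceChecker interface and memStore type are hypothetical; the real app.Store interface is not shown on this page and may look different.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

type ctxKey string

const appKey ctxKey = "app" // hypothetical context key for the app ID

// existenceChecker is a hypothetical stand-in for the lookup a store would provide.
type existenceChecker interface {
	Exists(appID string) bool
}

// memStore is an in-memory set of known app IDs.
type memStore map[string]bool

func (m memStore) Exists(appID string) bool { return m[appID] }

// validateApp rejects requests whose app ID is unknown with 403 Forbidden.
func validateApp(store existenceChecker, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		appID, _ := r.Context().Value(appKey).(string)
		if !store.Exists(appID) {
			http.Error(w, "app not found", http.StatusForbidden)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// okHandler represents any downstream handler.
func okHandler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
}

// statusFor returns the HTTP status produced for a request scoped to appID.
func statusFor(appID string) int {
	store := memStore{"acme": true}
	req := httptest.NewRequest(http.MethodGet, "/", nil)
	req = req.WithContext(context.WithValue(req.Context(), appKey, appID))
	rec := httptest.NewRecorder()
	validateApp(store, http.HandlerFunc(okHandler)).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor("acme"), statusFor("ghost"))
}
```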

Types

type AdminHandler added in v1.21.0

type AdminHandler struct {
	// contains filtered or unexported fields
}

AdminHandler handles admin API endpoints for app management.

func NewAdminHandler added in v1.21.0

func NewAdminHandler(cfg AdminHandlerConfig) (*AdminHandler, error)

NewAdminHandler creates a new admin handler.

func (*AdminHandler) ServeHTTP added in v1.21.0

func (h *AdminHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP routes admin requests.

type AdminHandlerConfig added in v1.21.0

type AdminHandlerConfig struct {
	// AppStore is the app storage backend.
	AppStore app.Store

	// TokenIssuer issues JWTs for apps.
	TokenIssuer *auth.TokenIssuer

	// AppManager manages app runtimes.
	AppManager *AppManager

	// SessionService manages app sessions.
	SessionService session.Service

	// VectorProvider manages app vector data.
	VectorProvider vector.Provider

	// TaskDeleter manages task cleanup (optional).
	TaskDeleter TaskDeleter

	// TaskQueue is the task queue for queue admin endpoints (optional).
	TaskQueue native.Queue

	// AdminKey is the secret key for admin authentication (from --admin-key CLI arg).
	AdminKey string

	// RootDir is the root directory for app data (default: .).
	RootDir string

	// ReloadFunc triggers a full config reload (same as SIGHUP). Optional.
	ReloadFunc func()
}

AdminHandlerConfig configures the admin handler.

type AgentRuntime added in v1.21.0

type AgentRuntime struct {
	Executor   *Executor
	Card       *a2a.AgentCard // A2A agent card metadata
	Visibility string         // "public", "internal", "private"
}

AgentRuntime contains the runtime components for an agent.

type AppInstance added in v1.21.0

type AppInstance struct {
	Config  *config.AppConfig
	Runtime Runtime
	Agents  map[string]*AgentRuntime
}

AppInstance holds the runtime and cached agent executors for an app.

type AppManager added in v1.21.0

type AppManager struct {
	// contains filtered or unexported fields
}

AppManager manages the lifecycle of loaded apps and their executors. It handles on-demand loading of app configurations and caching of executors.
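The on-demand loading and caching described here might look roughly like the following sketch. The manager and instance types are simplified, hypothetical stand-ins, not the actual AppManager internals; the sketch only shows a lazy, mutex-guarded cache with invalidation.

```go
package main

import (
	"fmt"
	"sync"
)

// instance is a stand-in for a loaded app and its runtime.
type instance struct {
	appID  string
	closed bool
}

// manager lazily loads instances and caches them by app ID.
type manager struct {
	mu    sync.Mutex
	cache map[string]*instance
	loads int // number of times an app had to be loaded
}

func newManager() *manager {
	return &manager{cache: make(map[string]*instance)}
}

// get returns the cached instance, loading it on first use.
func (m *manager) get(appID string) *instance {
	m.mu.Lock()
	defer m.mu.Unlock()
	if inst, ok := m.cache[appID]; ok {
		return inst
	}
	m.loads++
	inst := &instance{appID: appID}
	m.cache[appID] = inst
	return inst
}

// invalidate closes the cached instance and drops it, forcing a reload later.
func (m *manager) invalidate(appID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if inst, ok := m.cache[appID]; ok {
		inst.closed = true
		delete(m.cache, appID)
	}
}

// loadsAfter reports how many loads occur for two lookups of the same app,
// optionally followed by an invalidation and a third lookup.
func loadsAfter(invalidate bool) int {
	m := newManager()
	m.get("acme")
	m.get("acme")
	if invalidate {
		m.invalidate("acme")
		m.get("acme")
	}
	return m.loads
}

func main() {
	fmt.Println(loadsAfter(false), loadsAfter(true))
}
```

Holding one lock for the whole load keeps the sketch short; a real manager would more likely avoid blocking unrelated apps while one of them loads.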

func NewAppManager added in v1.21.0

func NewAppManager(store app.Store, factory RuntimeFactory, taskService task.Service, metrics *observability.Metrics) *AppManager

NewAppManager creates a new AppManager.

func (*AppManager) Close added in v1.21.0

func (m *AppManager) Close() error

Close closes all managed runtimes.

func (*AppManager) DocumentStores added in v1.21.0

func (m *AppManager) DocumentStores() map[string]*rag.DocumentStore

DocumentStores returns the document stores from the default app. Implements DocumentStoreProvider for backward compatibility.

func (*AppManager) GetAppConfig added in v1.21.0

func (m *AppManager) GetAppConfig(ctx context.Context, appID string) (*config.AppConfig, error)

GetAppConfig returns the configuration for an app. Loads the app if not active.

func (*AppManager) GetRuntime added in v1.21.0

func (m *AppManager) GetRuntime(ctx context.Context, appID, agentName string) (*AgentRuntime, error)

GetRuntime returns the runtime components for a specific agent in a specific app.

func (*AppManager) Invalidate added in v1.21.0

func (m *AppManager) Invalidate(appID string)

Invalidate clears the cache for an app and closes its runtime. Should be called when app config is updated.

func (*AppManager) ListAgents added in v1.21.0

func (m *AppManager) ListAgents(ctx context.Context, appID string) ([]*AgentRuntime, error)

ListAgents returns all configured agent runtimes for an app. Used for discovery and default agent resolution.

func (*AppManager) LookupWebhookInConfig added in v1.21.0

func (m *AppManager) LookupWebhookInConfig(path string, cfg *config.AppConfig) (string, bool)

LookupWebhookInConfig searches for a matching webhook trigger in an app config.

func (*AppManager) Preload added in v1.21.0

func (m *AppManager) Preload(ctx context.Context, appID string) error

Preload ensures an app is loaded into memory. Used for default app bootstrapping.

func (*AppManager) ResolveWebhook added in v1.21.0

func (m *AppManager) ResolveWebhook(ctx context.Context, path string) (appID, agentName string, err error)

ResolveWebhook resolves an app and agent from a webhook path.

func (*AppManager) UnloadApp added in v1.21.0

func (m *AppManager) UnloadApp(ctx context.Context, appID string) error

UnloadApp synchronously closes an app's runtime and removes it from cache. Checks if the app is currently running before attempting to close.

type ApprovalResponse

type ApprovalResponse struct {
	// Decision is "approve" or "deny"
	Decision string
	// ToolCallID is the ID of the tool call being approved/denied
	ToolCallID string
	// TaskID is the task this approval is for
	TaskID string
}

ApprovalResponse represents an approval decision from the user.

func ExtractApprovalResponse

func ExtractApprovalResponse(msg *a2a.Message) *ApprovalResponse

ExtractApprovalResponse checks if a message contains an approval response. Returns nil if the message is not an approval response.

Approval responses can be:

  1. A DataPart with type: "tool_approval"
  2. A TextPart with "approve" or "deny" (for simple approvals)
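The two accepted shapes can be sketched with simplified stand-in types. The part struct below is not the a2a message type, and the data keys "decision", "tool_call_id", and "task_id" as well as the case-insensitive text match are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"strings"
)

// part is a simplified stand-in for a message part: either text or structured data.
type part struct {
	Text string
	Data map[string]any
}

// approval mirrors the fields of ApprovalResponse.
type approval struct {
	Decision, ToolCallID, TaskID string
}

// extractApproval prefers a structured data part and falls back to plain text.
// It returns nil when no part looks like an approval.
func extractApproval(parts []part) *approval {
	for _, p := range parts {
		if p.Data["type"] == "tool_approval" {
			a := &approval{}
			a.Decision, _ = p.Data["decision"].(string)      // hypothetical key
			a.ToolCallID, _ = p.Data["tool_call_id"].(string) // hypothetical key
			a.TaskID, _ = p.Data["task_id"].(string)          // hypothetical key
			return a
		}
	}
	for _, p := range parts {
		switch d := strings.ToLower(strings.TrimSpace(p.Text)); d {
		case "approve", "deny":
			return &approval{Decision: d}
		}
	}
	return nil
}

// decisionFromText returns the decision for a text-only message, or "none".
func decisionFromText(text string) string {
	if a := extractApproval([]part{{Text: text}}); a != nil {
		return a.Decision
	}
	return "none"
}

func main() {
	structured := []part{{Data: map[string]any{
		"type": "tool_approval", "decision": "deny",
		"tool_call_id": "call-1", "task_id": "task-9",
	}}}
	fmt.Printf("%+v\n", *extractApproval(structured))
	fmt.Println(decisionFromText(" Approve "), decisionFromText("what is this?"))
}
```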

type DocumentStoreProvider added in v1.15.2

type DocumentStoreProvider interface {
	DocumentStores() map[string]*rag.DocumentStore
}

DocumentStoreProvider provides access to document stores for status reporting. This interface avoids importing the runtime package directly.

type DynamicAgentHandler added in v1.21.0

type DynamicAgentHandler struct {
	// contains filtered or unexported fields
}

DynamicAgentHandler routes requests to agent executors dynamically based on the current app context (tenant_id) and the agent name in the URL path.

func NewDynamicAgentHandler added in v1.21.0

func NewDynamicAgentHandler(
	appManager *AppManager,
	serverConfig *config.ServerConfig,
	authInterceptor *auth.Interceptor,
	authValidator auth.TokenValidator,
	taskStore a2asrv.TaskStore,
	execProvider execution.Provider,
) *DynamicAgentHandler

NewDynamicAgentHandler creates a new dynamic agent handler.

func (*DynamicAgentHandler) ServeHTTP added in v1.21.0

func (h *DynamicAgentHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP routes requests dynamically. Configured at path "/agents/".
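As an illustration of path-based routing under "/agents/", the sketch below splits a request path into an agent name and the remaining sub-path. The exact path layout after the agent name is an assumption; only the "/agents/" prefix comes from the documentation above.

```go
package main

import (
	"fmt"
	"strings"
)

const agentsPrefix = "/agents/"

// splitAgentPath returns the agent name and the remaining sub-path.
// ok is false when the path is not under the prefix or names no agent.
func splitAgentPath(p string) (name, rest string, ok bool) {
	if !strings.HasPrefix(p, agentsPrefix) {
		return "", "", false
	}
	name, rest, _ = strings.Cut(strings.TrimPrefix(p, agentsPrefix), "/")
	if name == "" {
		return "", "", false
	}
	return name, "/" + rest, true
}

// agentName returns just the agent name, or "" when the path does not match.
func agentName(p string) string {
	name, _, _ := splitAgentPath(p)
	return name
}

// subPath returns just the remainder after the agent name, or "" on no match.
func subPath(p string) string {
	_, rest, _ := splitAgentPath(p)
	return rest
}

func main() {
	fmt.Println(splitAgentPath("/agents/assistant/tasks"))
	fmt.Println(splitAgentPath("/health"))
}
```

A handler built on this would then look up the executor for the (app, agent) pair, for example via AppManager.GetRuntime.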

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor implements a2asrv.AgentExecutor to bridge Hector agents to A2A.

Event translation follows these rules:

  • New task: emit TaskStatusUpdateEvent with TaskStateSubmitted
  • Before runner invocation: emit TaskStatusUpdateEvent with TaskStateWorking
  • For each agent.Event: emit TaskArtifactUpdateEvent with translated parts
  • After last event: emit TaskArtifactUpdateEvent with LastChunk=true
  • On LLM error: emit TaskStatusUpdateEvent with TaskStateFailed
  • On long-running tool: emit TaskStatusUpdateEvent with TaskStateInputRequired
  • On success: emit TaskStatusUpdateEvent with TaskStateCompleted
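The rules above imply an ordering of emitted events, sketched here with plain strings instead of the real a2a event types. Treating the final artifact chunk as always preceding the terminal status, including on failure, is an assumption of this sketch rather than documented behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// translate lists the events emitted for one execution, in order.
// outcome is one of "completed", "failed", or "input-required".
func translate(newTask bool, chunks []string, outcome string) []string {
	var events []string
	if newTask {
		events = append(events, "status:submitted")
	}
	events = append(events, "status:working")
	for _, c := range chunks {
		events = append(events, "artifact:"+c)
	}
	// Assumption: the closing artifact chunk precedes the terminal status.
	events = append(events, "artifact:last-chunk")
	return append(events, "status:"+outcome)
}

// summary joins the event names for printing and comparison.
func summary(newTask bool, chunks []string, outcome string) string {
	return strings.Join(translate(newTask, chunks, outcome), " > ")
}

func main() {
	fmt.Println(summary(true, []string{"hello", "world"}, "completed"))
	fmt.Println(summary(false, nil, "input-required"))
}
```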

func NewExecutor

func NewExecutor(config ExecutorConfig) *Executor

NewExecutor creates a new A2A executor.

func (*Executor) Cancel

func (e *Executor) Cancel(ctx context.Context, reqCtx *a2asrv.RequestContext, queue eventqueue.Queue) error

Cancel implements a2asrv.AgentExecutor.

func (*Executor) Execute

func (e *Executor) Execute(ctx context.Context, reqCtx *a2asrv.RequestContext, queue eventqueue.Queue) error

Execute implements a2asrv.AgentExecutor.

func (*Executor) ResumeFromCheckpoint added in v1.16.1

func (e *Executor) ResumeFromCheckpoint(ctx context.Context, state *checkpoint.State) error

ResumeFromCheckpoint resumes agent execution from a checkpoint. This is called by the checkpoint recovery manager on startup.

The session already contains the conversation history from before the crash/shutdown. We re-invoke the agent with the original query, and it will continue from where it left off.

type ExecutorConfig

type ExecutorConfig struct {
	// RunnerConfig is used to create a runner for each execution.
	RunnerConfig runner.Config

	// RunConfig contains runtime configuration for agent execution.
	RunConfig agent.RunConfig

	// TaskService provides task management for cascade cancellation.
	// If nil, cascade cancellation will not work.
	TaskService task.Service

	// Notifier dispatches outbound notifications on task events.
	// If nil, notifications are disabled.
	Notifier *notification.Notifier
}

ExecutorConfig contains the configuration for the A2A executor.

type HTTPServer

type HTTPServer struct {
	// contains filtered or unexported fields
}

HTTPServer is the Hector HTTP server. Uses a2a-go native handlers for A2A protocol compliance.

func NewHTTPServer

func NewHTTPServer(serverCfg *config.ServerConfig, appCfg *config.AppConfig, appManager *AppManager, opts ...HTTPServerOption) *HTTPServer

NewHTTPServer creates a new HTTP server from config. appManager handles loading apps and creating executors.

func (*HTTPServer) Address

func (s *HTTPServer) Address() string

Address returns the HTTP server address.

func (*HTTPServer) GRPCAddress

func (s *HTTPServer) GRPCAddress() string

GRPCAddress returns the gRPC server address (if enabled). Transport and gRPC features are temporarily disabled during config refactoring.

func (*HTTPServer) GetAgentA2AInvoker added in v1.15.1

func (s *HTTPServer) GetAgentA2AInvoker() trigger.AgentInvoker

GetAgentA2AInvoker returns an AgentInvoker that uses A2A handlers for invocation. This enables webhooks to invoke agents through A2A protocol with automatic TaskStore registration.

func (*HTTPServer) GetTaskResultProvider added in v1.15.1

func (s *HTTPServer) GetTaskResultProvider() func(ctx context.Context, taskID string) (string, error)

GetTaskResultProvider returns a function that fetches task result from TaskStore.

func (*HTTPServer) Reload added in v1.21.0

func (s *HTTPServer) Reload(newCfg *config.AppConfig, newAppManager *AppManager)

Reload updates the server configuration and state. Routes are dynamic, so this simply updates the internal config reference.

func (*HTTPServer) Shutdown

func (s *HTTPServer) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the server(s).

func (*HTTPServer) Start

func (s *HTTPServer) Start(ctx context.Context) error

Start starts the HTTP server.

type HTTPServerOption

type HTTPServerOption func(*HTTPServer)

HTTPServerOption configures the HTTP server.
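HTTPServerOption follows Go's functional options pattern. The sketch below shows the pattern in isolation; the server struct, its fields, and the withRateLimit and withUI options are hypothetical and much simpler than the real HTTPServer.

```go
package main

import "fmt"

// server is a stand-in for a configurable server.
type server struct {
	addr      string
	rateLimit int // requests per second; 0 means unlimited in this sketch
	uiEnabled bool
}

// option mutates a server during construction.
type option func(*server)

// withRateLimit sets a request rate limit.
func withRateLimit(rps int) option {
	return func(s *server) { s.rateLimit = rps }
}

// withUI enables the web UI.
func withUI() option {
	return func(s *server) { s.uiEnabled = true }
}

// newServer applies defaults first, then each option in order.
func newServer(addr string, opts ...option) *server {
	s := &server{addr: addr}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// describe renders the effective configuration.
func describe(s *server) string {
	return fmt.Sprintf("%s limit=%d ui=%t", s.addr, s.rateLimit, s.uiEnabled)
}

func main() {
	fmt.Println(describe(newServer(":8080")))
	fmt.Println(describe(newServer(":8080", withRateLimit(50), withUI())))
}
```

Options left out keep their defaults, which is why NewHTTPServer can accept any subset of the With... functions listed below.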

func WithAdminHandler added in v1.21.0

func WithAdminHandler(handler *AdminHandler) HTTPServerOption

WithAdminHandler sets the admin handler for app management.

func WithAuthValidator

func WithAuthValidator(validator auth.TokenValidator) HTTPServerOption

WithAuthValidator sets the JWT validator for authentication. When set, HTTP requests will be validated and claims passed to agents.

func WithDocumentStoreProvider added in v1.15.2

func WithDocumentStoreProvider(p DocumentStoreProvider) HTTPServerOption

WithDocumentStoreProvider sets the provider for document store status.

func WithExecutionProvider added in v1.21.0

func WithExecutionProvider(p execution.Provider) HTTPServerOption

WithExecutionProvider sets the execution provider for async execution.

func WithObservability

func WithObservability(obs *observability.Manager) HTTPServerOption

WithObservability sets the observability manager for tracing and metrics.

func WithRateLimiter added in v1.21.0

func WithRateLimiter(limiter ratelimit.RateLimiter) HTTPServerOption

WithRateLimiter sets the rate limiter for request rate limiting.

func WithSessionService added in v1.21.0

func WithSessionService(ss session.Service) HTTPServerOption

WithSessionService sets the session service.

func WithTaskService

func WithTaskService(ts task.Service) HTTPServerOption

WithTaskService sets the task service for task-scoped cancellation.

func WithTaskStore

func WithTaskStore(store a2asrv.TaskStore) HTTPServerOption

WithTaskStore sets the task store for persistent task storage. If not set, a2a-go uses its internal in-memory store.

func WithUIHandler added in v1.21.0

func WithUIHandler(handler http.Handler) HTTPServerOption

WithUIHandler sets the handler for serving the web UI.

type QueuedRequestHandler added in v1.21.0

type QueuedRequestHandler struct {
	// contains filtered or unexported fields
}

QueuedRequestHandler wraps an a2asrv.RequestHandler and intercepts async execution requests to enqueue them for durable processing. It implements a2asrv.RequestHandler.

func NewQueuedRequestHandler added in v1.21.0

func NewQueuedRequestHandler(delegate a2asrv.RequestHandler, provider execution.Provider, appID, agentName string) *QueuedRequestHandler

NewQueuedRequestHandler creates a new QueuedRequestHandler.

func (*QueuedRequestHandler) OnCancelTask added in v1.21.0

func (h *QueuedRequestHandler) OnCancelTask(ctx context.Context, params *a2a.TaskIDParams) (*a2a.Task, error)

OnCancelTask delegates to the underlying handler.

func (*QueuedRequestHandler) OnDeleteTaskPushConfig added in v1.21.0

func (h *QueuedRequestHandler) OnDeleteTaskPushConfig(ctx context.Context, params *a2a.DeleteTaskPushConfigParams) error

OnDeleteTaskPushConfig delegates to the underlying handler.

func (*QueuedRequestHandler) OnGetExtendedAgentCard added in v1.21.0

func (h *QueuedRequestHandler) OnGetExtendedAgentCard(ctx context.Context) (*a2a.AgentCard, error)

OnGetExtendedAgentCard delegates to the underlying handler.

func (*QueuedRequestHandler) OnGetTask added in v1.21.0

func (h *QueuedRequestHandler) OnGetTask(ctx context.Context, params *a2a.TaskQueryParams) (*a2a.Task, error)

OnGetTask delegates to the underlying handler.

func (*QueuedRequestHandler) OnGetTaskPushConfig added in v1.21.0

func (h *QueuedRequestHandler) OnGetTaskPushConfig(ctx context.Context, params *a2a.GetTaskPushConfigParams) (*a2a.TaskPushConfig, error)

OnGetTaskPushConfig delegates to the underlying handler.

func (*QueuedRequestHandler) OnListTaskPushConfig added in v1.21.0

func (h *QueuedRequestHandler) OnListTaskPushConfig(ctx context.Context, params *a2a.ListTaskPushConfigParams) ([]*a2a.TaskPushConfig, error)

OnListTaskPushConfig delegates to the underlying handler.

func (*QueuedRequestHandler) OnResubscribeToTask added in v1.21.0

func (h *QueuedRequestHandler) OnResubscribeToTask(ctx context.Context, params *a2a.TaskIDParams) iter.Seq2[a2a.Event, error]

OnResubscribeToTask delegates to the underlying handler.

func (*QueuedRequestHandler) OnSendMessage added in v1.21.0

OnSendMessage handles message sending requests. Intercepts requests with blocking=false and enqueues them.
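The interception of non-blocking requests can be sketched as a decorator around a delegate. The types here are simplified, hypothetical stand-ins: the real method takes a context and a2a parameters, and the real queue is durable rather than an in-memory slice.

```go
package main

import "fmt"

// sendParams is a stand-in for the message send parameters.
type sendParams struct {
	Text     string
	Blocking bool
}

// requestHandler is a one-method stand-in for the wrapped handler interface.
type requestHandler interface {
	OnSendMessage(p sendParams) string
}

// directHandler executes the request immediately.
type directHandler struct{}

func (directHandler) OnSendMessage(p sendParams) string { return "ran:" + p.Text }

// queuedHandler enqueues non-blocking requests and delegates the rest.
type queuedHandler struct {
	delegate requestHandler
	queue    []sendParams // stand-in for a durable queue
}

func (q *queuedHandler) OnSendMessage(p sendParams) string {
	if !p.Blocking {
		q.queue = append(q.queue, p)
		return "queued:" + p.Text
	}
	return q.delegate.OnSendMessage(p)
}

// demo sends one blocking and one non-blocking request through the wrapper.
func demo() (blocking, async string, queued int) {
	h := &queuedHandler{delegate: directHandler{}}
	blocking = h.OnSendMessage(sendParams{Text: "a", Blocking: true})
	async = h.OnSendMessage(sendParams{Text: "b", Blocking: false})
	return blocking, async, len(h.queue)
}

func main() {
	fmt.Println(demo())
}
```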

func (*QueuedRequestHandler) OnSendMessageStream added in v1.21.0

func (h *QueuedRequestHandler) OnSendMessageStream(ctx context.Context, params *a2a.MessageSendParams) iter.Seq2[a2a.Event, error]

OnSendMessageStream delegates to the underlying handler.

func (*QueuedRequestHandler) OnSetTaskPushConfig added in v1.21.0

func (h *QueuedRequestHandler) OnSetTaskPushConfig(ctx context.Context, params *a2a.TaskPushConfig) (*a2a.TaskPushConfig, error)

OnSetTaskPushConfig delegates to the underlying handler.

type Runtime added in v1.21.0

type Runtime interface {
	// Close closes the runtime and releases resources.
	Close() error
	// ListAgents returns the names of all configured agents.
	ListAgents() []string
	// RunnerConfig creates a runner config for an agent.
	RunnerConfig(agentName string) (*runner.Config, error)
	// DocumentStores returns the document stores for RAG status.
	DocumentStores() map[string]*rag.DocumentStore
	// Notifier returns the notifier service.
	Notifier() *notification.Notifier
}

Runtime defines the interface for an agent runtime. Implemented by pkg/runtime.Runtime.

type RuntimeFactory added in v1.21.0

type RuntimeFactory func(ctx context.Context, appID string, cfg *config.AppConfig) (Runtime, error)

RuntimeFactory functions create a runtime for a given app configuration.

type TaskDeleter added in v1.21.0

type TaskDeleter interface {
	DeleteByApp(ctx context.Context, appName string) error
}

TaskDeleter defines the interface for deleting app-specific tasks.
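Because TaskDeleter is a single-method interface, a test double is easy to write. The following in-memory implementation is a hypothetical sketch; the memTasks type and its storage layout are not part of the package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// TaskDeleter mirrors the interface documented above.
type TaskDeleter interface {
	DeleteByApp(ctx context.Context, appName string) error
}

// memTasks keeps task IDs grouped by app name.
type memTasks struct {
	mu    sync.Mutex
	byApp map[string][]string
}

// DeleteByApp removes every task belonging to appName.
// Deleting an unknown app is a no-op in this sketch.
func (m *memTasks) DeleteByApp(ctx context.Context, appName string) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.byApp, appName)
	return nil
}

// count returns the total number of stored tasks.
func (m *memTasks) count() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	n := 0
	for _, tasks := range m.byApp {
		n += len(tasks)
	}
	return n
}

// remainingAfterDelete seeds three tasks, deletes one app's tasks,
// and returns how many tasks are left (or -1 on error).
func remainingAfterDelete(appName string) int {
	store := &memTasks{byApp: map[string][]string{
		"acme":   {"t1", "t2"},
		"globex": {"t3"},
	}}
	var d TaskDeleter = store
	if err := d.DeleteByApp(context.Background(), appName); err != nil {
		return -1
	}
	return store.count()
}

func main() {
	fmt.Println(remainingAfterDelete("acme"), remainingAfterDelete("unknown"))
}
```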
