handlers

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2026 License: MIT Imports: 13 Imported by: 0

README

API Handlers

这个包提供了 AgentFlow HTTP API 的处理器实现。

📁 文件结构

api/handlers/
├── common.go   # 通用响应函数和错误处理
├── health.go   # 健康检查处理器
├── chat.go     # 聊天接口处理器
├── agent.go    # Agent 管理处理器
└── README.md   # 本文档

🎯 设计原则

1. 统一错误处理

所有 handler 使用 types.Error 进行错误处理,通过 WriteError() 函数统一返回错误响应。

err := types.NewError(types.ErrInvalidRequest, "model is required")
WriteError(w, err, logger)
2. 统一响应格式

所有 API 响应使用统一的 Response 结构:

{
  "success": true,
  "data": {...},
  "timestamp": "2026-02-20T10:00:00Z"
}

错误响应:

{
  "success": false,
  "error": {
    "code": "INVALID_REQUEST",
    "message": "model is required",
    "retryable": false
  },
  "timestamp": "2026-02-20T10:00:00Z"
}
3. 类型安全
  • 使用 DecodeJSONBody() 解码请求,自动验证 JSON 格式
  • 使用 ValidateContentType() 验证 Content-Type
  • 所有请求/响应都有明确的类型定义

📖 使用示例

健康检查
healthHandler := handlers.NewHealthHandler(logger)

// 注册健康检查
healthHandler.RegisterCheck(handlers.NewDatabaseHealthCheck("postgres", db.Ping))
healthHandler.RegisterCheck(handlers.NewRedisHealthCheck("redis", redis.Ping))

// 注册路由
http.HandleFunc("/health", healthHandler.HandleHealth)
http.HandleFunc("/healthz", healthHandler.HandleHealthz)
http.HandleFunc("/ready", healthHandler.HandleReady)
http.HandleFunc("/version", healthHandler.HandleVersion(version, buildTime, gitCommit))
聊天接口
chatHandler := handlers.NewChatHandler(provider, logger)

// 注册路由
http.HandleFunc("/v1/chat/completions", chatHandler.HandleCompletion)
http.HandleFunc("/v1/chat/completions/stream", chatHandler.HandleStream)
Agent 管理
agentHandler := handlers.NewAgentHandler(discoveryRegistry, agentRegistry, logger)

// 注册路由
http.HandleFunc("/v1/agents", agentHandler.HandleListAgents)
http.HandleFunc("/v1/agents/execute", agentHandler.HandleExecuteAgent)
http.HandleFunc("/v1/agents/plan", agentHandler.HandlePlanAgent)
http.HandleFunc("/v1/agents/health", agentHandler.HandleAgentHealth)

🔧 辅助函数

WriteJSON

写入 JSON 响应(带正确的 Content-Type 和安全头)

WriteJSON(w, http.StatusOK, data)
WriteSuccess

写入成功响应(自动包装为 Response 结构)

WriteSuccess(w, data)
WriteError

写入错误响应(从 types.Error 转换)

err := types.NewError(types.ErrInvalidRequest, "invalid input")
WriteError(w, err, logger)
WriteErrorMessage

写入简单错误消息

WriteErrorMessage(w, http.StatusBadRequest, types.ErrInvalidRequest, "invalid input", logger)
DecodeJSONBody

解码 JSON 请求体(带验证)

var req ChatRequest
if err := DecodeJSONBody(w, r, &req, logger); err != nil {
    return // 错误已自动写入响应
}
ValidateContentType

验证 Content-Type 是否为 application/json

if !ValidateContentType(w, r, logger) {
    return // 错误已自动写入响应
}

🎨 最佳实践

1. Handler 结构

每个 handler 应该包含:

  • 依赖注入(logger, provider, registry 等)
  • 请求验证
  • 业务逻辑调用
  • 响应转换
  • 错误处理
func (h *ChatHandler) HandleCompletion(w http.ResponseWriter, r *http.Request) {
    // 1. 验证 Content-Type
    if !ValidateContentType(w, r, h.logger) {
        return
    }

    // 2. 解码请求
    var req api.ChatRequest
    if err := DecodeJSONBody(w, r, &req, h.logger); err != nil {
        return
    }

    // 3. 验证请求
    if err := h.validateChatRequest(&req); err != nil {
        WriteError(w, err, h.logger)
        return
    }

    // 4. 调用业务逻辑
    resp, err := h.provider.Completion(ctx, llmReq)
    if err != nil {
        h.handleProviderError(w, err)
        return
    }

    // 5. 返回响应
    WriteSuccess(w, resp)
}
2. 错误处理
  • 使用 types.Error 而不是 fmt.Errorf
  • 设置正确的 HTTP 状态码
  • 标记是否可重试
  • 记录详细日志
err := types.NewError(types.ErrInvalidRequest, "model is required").
    WithHTTPStatus(http.StatusBadRequest).
    WithRetryable(false)
WriteError(w, err, h.logger)
3. 日志记录
  • 使用结构化日志(zap)
  • 记录关键信息(请求 ID、耗时、Token 使用等)
  • 错误日志包含完整上下文
h.logger.Info("chat completion",
    zap.String("model", req.Model),
    zap.Int("tokens_used", resp.Usage.TotalTokens),
    zap.Duration("duration", duration),
)
4. 类型转换
  • API 类型 ↔ 内部类型转换应该在 handler 层完成
  • 使用专门的转换函数(如 convertToLLMRequest
  • 保持类型安全
func (h *ChatHandler) convertToLLMRequest(req *api.ChatRequest) *llm.ChatRequest {
    messages := make([]types.Message, len(req.Messages))
    for i, msg := range req.Messages {
        messages[i] = types.Message(msg)
    }
    return &llm.ChatRequest{
        Model:    req.Model,
        Messages: messages,
        // ...
    }
}

🔒 安全考虑

  1. 输入验证:所有输入都应该验证
  2. Content-Type 检查:防止 MIME 类型混淆攻击
  3. 未知字段拒绝DisallowUnknownFields() 防止参数污染
  4. 安全响应头X-Content-Type-Options: nosniff
  5. 错误信息脱敏:不暴露内部实现细节

📊 性能优化

  1. 响应包装器:使用 ResponseWriter 捕获状态码,避免重复写入
  2. 流式响应:大数据量使用 SSE 流式传输
  3. 上下文超时:所有请求都应该设置超时
  4. 连接复用:HTTP/2 支持

🧪 测试

每个 handler 都应该有对应的测试文件:

handlers/
├── common_test.go
├── health_test.go
├── chat_test.go
└── agent_test.go

测试应该覆盖:

  • 正常流程
  • 错误处理
  • 边界条件
  • 并发安全

📝 TODO

  • 添加单元测试
  • 添加集成测试
  • 添加 OpenAPI 文档生成
  • 添加请求限流
  • 添加请求追踪(OpenTelemetry)
  • 添加指标收集(Prometheus)

🔗 相关文档

Documentation

Overview

Package handlers 提供 AgentFlow HTTP API 的处理器实现。

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DecodeJSONBody

func DecodeJSONBody(w http.ResponseWriter, r *http.Request, dst any, logger *zap.Logger) error

DecodeJSONBody 解码 JSON 请求体

func ValidateContentType

func ValidateContentType(w http.ResponseWriter, r *http.Request, logger *zap.Logger) bool

ValidateContentType 验证 Content-Type

func WriteError

func WriteError(w http.ResponseWriter, err *types.Error, logger *zap.Logger)

WriteError 写入错误响应(从 types.Error)

func WriteErrorMessage

func WriteErrorMessage(w http.ResponseWriter, status int, code types.ErrorCode, message string, logger *zap.Logger)

WriteErrorMessage 写入简单错误消息

func WriteJSON

func WriteJSON(w http.ResponseWriter, status int, data any)

WriteJSON 写入 JSON 响应

func WriteSuccess

func WriteSuccess(w http.ResponseWriter, data any)

WriteSuccess 写入成功响应

Types

type AgentExecuteRequest

type AgentExecuteRequest struct {
	AgentID   string            `json:"agent_id" binding:"required"`
	Content   string            `json:"content" binding:"required"`
	Context   map[string]any    `json:"context,omitempty"`
	Variables map[string]string `json:"variables,omitempty"`
}

AgentExecuteRequest Agent execution request

type AgentExecuteResponse

type AgentExecuteResponse struct {
	TraceID      string         `json:"trace_id"`
	Content      string         `json:"content"`
	Metadata     map[string]any `json:"metadata,omitempty"`
	TokensUsed   int            `json:"tokens_used,omitempty"`
	Cost         float64        `json:"cost,omitempty"`
	Duration     string         `json:"duration"`
	FinishReason string         `json:"finish_reason,omitempty"`
}

AgentExecuteResponse Agent execution response

type AgentHandler

type AgentHandler struct {
	// contains filtered or unexported fields
}

AgentHandler Agent management handler

func NewAgentHandler

func NewAgentHandler(registry discovery.Registry, agentRegistry *agent.AgentRegistry, logger *zap.Logger) *AgentHandler

NewAgentHandler creates an Agent handler

func (*AgentHandler) HandleAgentHealth

func (h *AgentHandler) HandleAgentHealth(w http.ResponseWriter, r *http.Request)

HandleAgentHealth checks agent health status @Summary Agent health check @Description Check if an agent is healthy and ready @Tags agent @Produce json @Param id query string true "Agent ID" @Success 200 {object} Response{data=AgentHealthResponse} "Agent health" @Failure 404 {object} Response "Agent not found" @Failure 503 {object} Response "Agent not ready" @Security ApiKeyAuth @Router /v1/agents/health [get]

func (*AgentHandler) HandleExecuteAgent

func (h *AgentHandler) HandleExecuteAgent(w http.ResponseWriter, r *http.Request)

HandleExecuteAgent executes an agent @Summary Execute agent @Description Execute an agent with the given input @Tags agent @Accept json @Produce json @Param request body AgentExecuteRequest true "Execution request" @Success 200 {object} Response{data=AgentExecuteResponse} "Execution result" @Failure 400 {object} Response "Invalid request" @Failure 404 {object} Response "Agent not found" @Failure 500 {object} Response "Execution failed" @Security ApiKeyAuth @Router /v1/agents/execute [post]

func (*AgentHandler) HandleGetAgent

func (h *AgentHandler) HandleGetAgent(w http.ResponseWriter, r *http.Request)

HandleGetAgent gets a single agent's information @Summary Get agent @Description Get information about a specific agent @Tags agent @Produce json @Param id path string true "Agent ID" @Success 200 {object} Response{data=AgentInfo} "Agent info" @Failure 404 {object} Response "Agent not found" @Security ApiKeyAuth @Router /v1/agents/{id} [get]

func (*AgentHandler) HandleListAgents

func (h *AgentHandler) HandleListAgents(w http.ResponseWriter, r *http.Request)

HandleListAgents lists all registered agents @Summary List agents @Description Get a list of all registered agents @Tags agent @Produce json @Success 200 {object} Response{data=[]AgentInfo} "Agent list" @Failure 500 {object} Response "Internal error" @Security ApiKeyAuth @Router /v1/agents [get]

func (*AgentHandler) HandlePlanAgent

func (h *AgentHandler) HandlePlanAgent(w http.ResponseWriter, r *http.Request)

HandlePlanAgent plans agent execution @Summary Plan agent execution @Description Get an execution plan for an agent @Tags agent @Accept json @Produce json @Param request body AgentExecuteRequest true "Plan request" @Success 200 {object} Response{data=map[string]any} "Execution plan" @Failure 400 {object} Response "Invalid request" @Failure 404 {object} Response "Agent not found" @Failure 500 {object} Response "Plan failed" @Security ApiKeyAuth @Router /v1/agents/plan [post]

type AgentHealthResponse

type AgentHealthResponse struct {
	AgentID   string  `json:"agent_id"`
	Status    string  `json:"status"`
	Healthy   bool    `json:"healthy"`
	Endpoint  string  `json:"endpoint,omitempty"`
	Load      float64 `json:"load"`
	CheckedAt string  `json:"checked_at"`
}

AgentHealthResponse Agent health check response

type AgentInfo

type AgentInfo struct {
	ID          string          `json:"id"`
	Name        string          `json:"name"`
	Type        agent.AgentType `json:"type"`
	State       string          `json:"state"`
	Description string          `json:"description,omitempty"`
	Model       string          `json:"model,omitempty"`
	CreatedAt   string          `json:"created_at,omitempty"`
}

AgentInfo Agent information returned by the API

type ChatHandler

type ChatHandler struct {
	// contains filtered or unexported fields
}

ChatHandler 聊天接口处理器

func NewChatHandler

func NewChatHandler(provider llm.Provider, logger *zap.Logger) *ChatHandler

NewChatHandler 创建聊天处理器

func (*ChatHandler) HandleCompletion

func (h *ChatHandler) HandleCompletion(w http.ResponseWriter, r *http.Request)

HandleCompletion 处理聊天补全请求 @Summary 聊天完成 @Description 发送聊天完成请求 @Tags 聊天 @Accept json @Produce json @Param request body api.ChatRequest true "聊天请求" @Success 200 {object} api.ChatResponse "聊天响应" @Failure 400 {object} Response "无效请求" @Failure 500 {object} Response "内部错误" @Security ApiKeyAuth @Router /v1/chat/completions [post]

func (*ChatHandler) HandleStream

func (h *ChatHandler) HandleStream(w http.ResponseWriter, r *http.Request)

HandleStream 处理流式聊天请求 @Summary 流式聊天完成 @Description 发送流式聊天完成请求 @Tags 聊天 @Accept json @Produce text/event-stream @Param request body api.ChatRequest true "聊天请求" @Success 200 {string} string "SSE 流" @Failure 400 {object} Response "无效请求" @Failure 500 {object} Response "内部错误" @Security ApiKeyAuth @Router /v1/chat/completions/stream [post]

type CheckResult

type CheckResult struct {
	Status  string `json:"status"` // "pass", "fail"
	Message string `json:"message,omitempty"`
	Latency string `json:"latency,omitempty"`
}

CheckResult 单个检查结果

type DatabaseHealthCheck

type DatabaseHealthCheck struct {
	// contains filtered or unexported fields
}

DatabaseHealthCheck 数据库健康检查

func NewDatabaseHealthCheck

func NewDatabaseHealthCheck(name string, ping func(ctx context.Context) error) *DatabaseHealthCheck

NewDatabaseHealthCheck 创建数据库健康检查

func (*DatabaseHealthCheck) Check

func (c *DatabaseHealthCheck) Check(ctx context.Context) error

func (*DatabaseHealthCheck) Name

func (c *DatabaseHealthCheck) Name() string

type ErrorInfo

type ErrorInfo struct {
	Code       string `json:"code"`
	Message    string `json:"message"`
	Details    string `json:"details,omitempty"`
	Retryable  bool   `json:"retryable,omitempty"`
	HTTPStatus int    `json:"-"` // 不序列化到 JSON
}

ErrorInfo 错误信息结构

type HealthCheck

type HealthCheck interface {
	Name() string
	Check(ctx context.Context) error
}

HealthCheck 健康检查接口

type HealthHandler

type HealthHandler struct {
	// contains filtered or unexported fields
}

HealthHandler 健康检查处理器

func NewHealthHandler

func NewHealthHandler(logger *zap.Logger) *HealthHandler

NewHealthHandler 创建健康检查处理器

func (*HealthHandler) HandleHealth

func (h *HealthHandler) HandleHealth(w http.ResponseWriter, r *http.Request)

HandleHealth 处理 /health 请求(简单健康检查) @Summary 健康检查 @Description 简单的健康检查端点 @Tags 健康 @Produce json @Success 200 {object} ServiceHealthResponse "服务正常" @Failure 503 {object} ServiceHealthResponse "服务不健康" @Router /health [get]

func (*HealthHandler) HandleHealthz

func (h *HealthHandler) HandleHealthz(w http.ResponseWriter, r *http.Request)

HandleHealthz 处理 /healthz 请求(Kubernetes 风格) @Summary Kubernetes 活跃度探针 @Description Kubernetes 的活跃度探针 @Tags 健康 @Produce json @Success 200 {object} ServiceHealthResponse "服务处于活动状态" @Router /healthz [get]

func (*HealthHandler) HandleReady

func (h *HealthHandler) HandleReady(w http.ResponseWriter, r *http.Request)

HandleReady 处理 /ready 或 /readyz 请求(就绪检查) @Summary 准备情况检查 @Description 检查服务是否准备好接受流量 @Tags 健康 @Produce json @Success 200 {object} ServiceHealthResponse "服务已准备就绪" @Failure 503 {object} ServiceHealthResponse "服务尚未准备好" @Router /ready [get]

func (*HealthHandler) HandleVersion

func (h *HealthHandler) HandleVersion(version, buildTime, gitCommit string) http.HandlerFunc

HandleVersion 处理 /version 请求 @Summary 版本信息 @Description 返回版本信息 @Tags 健康 @Produce json @Success 200 {object} map[string]string "版本信息" @Router /version [get]

func (*HealthHandler) RegisterCheck

func (h *HealthHandler) RegisterCheck(check HealthCheck)

RegisterCheck 注册健康检查

type RedisHealthCheck

type RedisHealthCheck struct {
	// contains filtered or unexported fields
}

RedisHealthCheck Redis 健康检查

func NewRedisHealthCheck

func NewRedisHealthCheck(name string, ping func(ctx context.Context) error) *RedisHealthCheck

NewRedisHealthCheck 创建 Redis 健康检查

func (*RedisHealthCheck) Check

func (c *RedisHealthCheck) Check(ctx context.Context) error

func (*RedisHealthCheck) Name

func (c *RedisHealthCheck) Name() string

type Response

type Response struct {
	Success   bool       `json:"success"`
	Data      any        `json:"data,omitempty"`
	Error     *ErrorInfo `json:"error,omitempty"`
	Timestamp time.Time  `json:"timestamp"`
	RequestID string     `json:"request_id,omitempty"`
}

Response 统一 API 响应结构

type ResponseWriter

type ResponseWriter struct {
	http.ResponseWriter
	StatusCode int
	Written    bool
}

ResponseWriter 包装 http.ResponseWriter 以捕获状态码

func NewResponseWriter

func NewResponseWriter(w http.ResponseWriter) *ResponseWriter

NewResponseWriter 创建新的 ResponseWriter

func (*ResponseWriter) Write

func (rw *ResponseWriter) Write(b []byte) (int, error)

Write 重写 Write 以标记已写入

func (*ResponseWriter) WriteHeader

func (rw *ResponseWriter) WriteHeader(code int)

WriteHeader 重写 WriteHeader 以捕获状态码

type ServiceHealthResponse

type ServiceHealthResponse struct {
	Status    string                 `json:"status"` // "healthy", "degraded", "unhealthy"
	Timestamp time.Time              `json:"timestamp"`
	Version   string                 `json:"version,omitempty"`
	Checks    map[string]CheckResult `json:"checks,omitempty"`
}

ServiceHealthResponse 服务级别的健康状态响应(HTTP 健康端点 DTO)。 注意:这是 HTTP 健康检查端点的响应类型,与 llm.HealthStatus(Provider 级别) 和 agent.HealthStatus(Agent 级别)是不同概念。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL