Documentation
¶
Overview ¶
包 batch 提供 LLM 请求的批处理与调度能力,通过将多个独立请求 合并为批次统一发送,降低网络开销并提升吞吐量。
概述 ¶
在高并发场景下,逐条发送 LLM 请求会产生大量网络往返开销。 本包通过 BatchProcessor 将短时间内到达的请求自动聚合为批次, 由后台 Worker 池统一处理,从而显著提升整体吞吐效率。
核心接口 ¶
- BatchHandler:批量请求处理回调,接收一组 Request 并返回对应 Response。
- BatchProcessor:核心批处理器,管理请求队列、Worker 池与批次调度。
- BatchConfig:配置批大小上限、等待时间、队列容量与 Worker 数量。
主要能力 ¶
- 自动聚合:按 MaxBatchSize 或 MaxWaitTime 触发批次提交。
- 异步提交:Submit 返回 channel,调用方可非阻塞等待结果。
- 同步提交:SubmitSync 提供阻塞式调用便捷方法。
- 多 Worker 并行:支持配置多个 Worker 并发消费队列。
- 运行统计:通过 Stats 获取提交数、完成数、失败数与批次效率。
使用方式 ¶
cfg := batch.DefaultBatchConfig()
bp := batch.NewBatchProcessor(cfg, func(ctx context.Context, reqs []*batch.Request) []*batch.Response {
// 调用下游 LLM 批量接口
return responses
})
defer bp.Close()
resp, err := bp.SubmitSync(ctx, &batch.Request{ID: "1", Model: "gpt-4"})
Index ¶
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type BatchConfig ¶
type BatchConfig struct {
MaxBatchSize int `json:"max_batch_size"`
MaxWaitTime time.Duration `json:"max_wait_time"`
QueueSize int `json:"queue_size"`
Workers int `json:"workers"`
RetryOnFailure bool `json:"retry_on_failure"`
}
BatchConfig 配置批处理器。
type BatchHandler ¶
BatchHandler处理一批请求.
type BatchProcessor ¶
type BatchProcessor struct {
// contains filtered or unexported fields
}
批量处理器为高效处理而批出多个LLM请求.
func NewBatchProcessor ¶
func NewBatchProcessor(config BatchConfig, handler BatchHandler) *BatchProcessor
NewBatchProcessor创建了新的分批处理器.
func (*BatchProcessor) Submit ¶
func (bp *BatchProcessor) Submit(ctx context.Context, req *Request) <-chan *Response
提交请求并返回响应的通道 。
func (*BatchProcessor) SubmitSync ¶
SontentSync 提交请求并等待回复.
type BatchStats ¶
type BatchStats struct {
Submitted int64 `json:"submitted"`
Batched int64 `json:"batched"`
Completed int64 `json:"completed"`
Failed int64 `json:"failed"`
Queued int `json:"queued"`
}
BatchStats包含处理器统计.
Click to show internal directories.
Click to hide internal directories.