Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ( // ErrBatchClosed 表示批处理器已关闭 ErrBatchClosed = errors.New("batch processor is closed") // ErrRequestTimeout 表示请求等待批处理超时 ErrRequestTimeout = errors.New("request timeout waiting for batch processing") // ErrBatchSizeLimitExceeded 表示请求太大,超过批处理大小限制 ErrBatchSizeLimitExceeded = errors.New("request size exceeds batch size limit") // ErrContextCanceled 表示请求上下文被取消 ErrContextCanceled = errors.New("request context was canceled") )
Functions ¶
This section is empty.
Types ¶
type AdaptiveBatchOptions ¶
type AdaptiveBatchOptions struct {
// 最大批处理大小(条目数)
MaxBatchSize int
// 触发处理的最小批处理大小
MinBatchSize int
// 等待形成批处理的最大时间
MaxLatency time.Duration
// 单个请求的最大大小 (0表示无限制)
MaxRequestSize int
// 允许的最大并发批处理数
MaxConcurrentBatches int
// 负载基础批处理大小调整
// 高负载时的批处理大小倍率 (0-1,低于1表示减小批处理大小)
HighLoadFactor float64
// 低负载时的批处理大小倍率 (>1,大于1表示增加批处理大小)
LowLoadFactor float64
// 负载判断阈值 (0-1),超过此值认为是高负载
HighLoadThreshold float64
// 负载判断阈值 (0-1),低于此值认为是低负载
LowLoadThreshold float64
// 请求大小估算函数,若为nil则所有请求视为相同大小
RequestSizeFunc func(interface{}) int
// 请求合并函数,将多个请求合并为一个批处理请求
BatchRequestFunc func([]interface{}) interface{}
// 响应拆分函数,将批处理响应拆分为多个单独响应
SplitResponseFunc func(interface{}, int) []interface{}
// 错误处理函数,处理批处理过程中的错误
ErrorHandler func(error) error
// 统计收集间隔
StatsInterval time.Duration
}
AdaptiveBatchOptions 自适应批处理器配置
func DefaultAdaptiveBatchOptions ¶
func DefaultAdaptiveBatchOptions() AdaptiveBatchOptions
DefaultAdaptiveBatchOptions 返回默认的自适应批处理配置
type AdaptiveBatcher ¶
type AdaptiveBatcher struct {
// contains filtered or unexported fields
}
AdaptiveBatcher 自适应批处理器
func NewAdaptiveBatcher ¶
func NewAdaptiveBatcher(processor BatchProcessor, opts AdaptiveBatchOptions) *AdaptiveBatcher
NewAdaptiveBatcher 创建新的自适应批处理器
func NewMultiProcessorBatcher ¶
func NewMultiProcessorBatcher(processor MultiProcessor, opts AdaptiveBatchOptions) *AdaptiveBatcher
NewMultiProcessorBatcher 创建处理多个独立请求的批处理器
func (*AdaptiveBatcher) GetStats ¶
func (b *AdaptiveBatcher) GetStats() BatchStats
GetStats 获取当前统计信息
type BatchProcessor ¶
BatchProcessor 批处理执行器接口
type BatchStats ¶
type BatchStats struct {
// 总请求数
TotalRequests int64
// 总批次数
TotalBatches int64
// 平均批处理大小
AvgBatchSize float64
// 平均请求等待时间
AvgWaitTimeMs float64
// 平均处理时间
AvgProcessTimeMs float64
// 当前批处理大小
CurrentBatchSize int
// 当前等待请求数
CurrentWaitingRequests int
// 当前处理中的批次数
CurrentActiveBatches int
// 负载系数(0-1)
LoadFactor float64
// 批处理率(每秒批次数)
BatchRate float64
// 请求率(每秒请求数)
RequestRate float64
}
BatchStats 批处理统计信息
type MultiProcessor ¶
type MultiProcessor interface {
// ProcessMulti 处理多个独立的请求,返回对应的响应和错误
ProcessMulti(ctx context.Context, requests []interface{}) ([]interface{}, []error)
}
MultiProcessor 用于并行处理批请求,适用于请求之间没有依赖关系的场景
type MultiProcessorFunc ¶
MultiProcessorFunc 将函数转换为MultiProcessor接口
func (MultiProcessorFunc) ProcessMulti ¶
func (f MultiProcessorFunc) ProcessMulti(ctx context.Context, requests []interface{}) ([]interface{}, []error)
ProcessMulti 实现MultiProcessor接口
type Options ¶
type Options struct {
// 批次大小,达到此数量时将触发批处理
BatchSize int
// 最大等待时间,即使未达到BatchSize,经过此时间也会触发处理
MaxLatency time.Duration
// 工作器数量,即处理批次的并发数
Workers int
// 是否阻塞等待批处理完成
WaitForBatch bool
// 批处理队列的缓冲大小
QueueSize int
// 单个请求的默认超时时间
DefaultTimeout time.Duration
}
Options 配置批处理器参数
type Processor ¶
Processor 是一个通用的批量处理器
type RequestItem ¶
type RequestItem struct {
Request interface{}
Size int
Response interface{}
Error error
Done chan struct{}
// contains filtered or unexported fields
}
RequestItem 表示批处理中的单个请求项
Click to show internal directories.
Click to hide internal directories.