batch

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2025 License: MIT Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrBatchClosed 表示批处理器已关闭
	ErrBatchClosed = errors.New("batch processor is closed")
	// ErrRequestTimeout 表示请求等待批处理超时
	ErrRequestTimeout = errors.New("request timeout waiting for batch processing")
	// ErrBatchSizeLimitExceeded 表示请求太大,超过批处理大小限制
	ErrBatchSizeLimitExceeded = errors.New("request size exceeds batch size limit")
	// ErrContextCanceled 表示请求上下文被取消
	ErrContextCanceled = errors.New("request context was canceled")
)

Functions

This section is empty.

Types

type AdaptiveBatchOptions

type AdaptiveBatchOptions struct {
	// 最大批处理大小(条目数)
	MaxBatchSize int
	// 触发处理的最小批处理大小
	MinBatchSize int
	// 等待形成批处理的最大时间
	MaxLatency time.Duration
	// 单个请求的最大大小 (0表示无限制)
	MaxRequestSize int
	// 允许的最大并发批处理数
	MaxConcurrentBatches int
	// 负载基础批处理大小调整
	// 高负载时的批处理大小倍率 (0-1,低于1表示减小批处理大小)
	HighLoadFactor float64
	// 低负载时的批处理大小倍率 (>1,大于1表示增加批处理大小)
	LowLoadFactor float64
	// 负载判断阈值 (0-1),超过此值认为是高负载
	HighLoadThreshold float64
	// 负载判断阈值 (0-1),低于此值认为是低负载
	LowLoadThreshold float64
	// 请求大小估算函数,若为nil则所有请求视为相同大小
	RequestSizeFunc func(interface{}) int
	// 请求合并函数,将多个请求合并为一个批处理请求
	BatchRequestFunc func([]interface{}) interface{}
	// 响应拆分函数,将批处理响应拆分为多个单独响应
	SplitResponseFunc func(interface{}, int) []interface{}
	// 错误处理函数,处理批处理过程中的错误
	ErrorHandler func(error) error
	// 统计收集间隔
	StatsInterval time.Duration
}

AdaptiveBatchOptions 自适应批处理器配置

func DefaultAdaptiveBatchOptions

func DefaultAdaptiveBatchOptions() AdaptiveBatchOptions

DefaultAdaptiveBatchOptions 返回默认的自适应批处理配置

type AdaptiveBatcher

type AdaptiveBatcher struct {
	// contains filtered or unexported fields
}

AdaptiveBatcher 自适应批处理器

func NewAdaptiveBatcher

func NewAdaptiveBatcher(processor BatchProcessor, opts AdaptiveBatchOptions) *AdaptiveBatcher

NewAdaptiveBatcher 创建新的自适应批处理器

func NewMultiProcessorBatcher

func NewMultiProcessorBatcher(processor MultiProcessor, opts AdaptiveBatchOptions) *AdaptiveBatcher

NewMultiProcessorBatcher 创建处理多个独立请求的批处理器

func (*AdaptiveBatcher) Close

func (b *AdaptiveBatcher) Close()

Close 关闭批处理器

func (*AdaptiveBatcher) Flush

func (b *AdaptiveBatcher) Flush()

Flush 立即处理所有待处理的请求

func (*AdaptiveBatcher) GetStats

func (b *AdaptiveBatcher) GetStats() BatchStats

GetStats 获取当前统计信息

func (*AdaptiveBatcher) Process

func (b *AdaptiveBatcher) Process(ctx context.Context, request interface{}) (interface{}, error)

Process 添加一个请求到批处理

type BatchProcessor

type BatchProcessor func(context.Context, interface{}) (interface{}, error)

BatchProcessor 批处理执行器接口

type BatchStats

type BatchStats struct {
	// 总请求数
	TotalRequests int64
	// 总批次数
	TotalBatches int64
	// 平均批处理大小
	AvgBatchSize float64
	// 平均请求等待时间
	AvgWaitTimeMs float64
	// 平均处理时间
	AvgProcessTimeMs float64
	// 当前批处理大小
	CurrentBatchSize int
	// 当前等待请求数
	CurrentWaitingRequests int
	// 当前处理中的批次数
	CurrentActiveBatches int
	// 负载系数(0-1)
	LoadFactor float64
	// 批处理率(每秒批次数)
	BatchRate float64
	// 请求率(每秒请求数)
	RequestRate float64
}

BatchStats 批处理统计信息

type MultiProcessor

type MultiProcessor interface {
	// ProcessMulti 处理多个独立的请求,返回对应的响应和错误
	ProcessMulti(ctx context.Context, requests []interface{}) ([]interface{}, []error)
}

MultiProcessor 用于并行处理批请求,适用于请求之间没有依赖关系的场景

type MultiProcessorFunc

type MultiProcessorFunc func(context.Context, []interface{}) ([]interface{}, []error)

MultiProcessorFunc 将函数转换为MultiProcessor接口

func (MultiProcessorFunc) ProcessMulti

func (f MultiProcessorFunc) ProcessMulti(ctx context.Context, requests []interface{}) ([]interface{}, []error)

ProcessMulti 实现MultiProcessor接口

type Options

type Options struct {
	// 批次大小,达到此数量时将触发批处理
	BatchSize int
	// 最大等待时间,即使未达到BatchSize,经过此时间也会触发处理
	MaxLatency time.Duration
	// 工作器数量,即处理批次的并发数
	Workers int
	// 是否阻塞等待批处理完成
	WaitForBatch bool
	// 批处理队列的缓冲大小
	QueueSize int
	// 单个请求的默认超时时间
	DefaultTimeout time.Duration
}

Options 配置批处理器参数

func DefaultOptions

func DefaultOptions() Options

DefaultOptions 返回默认的批处理器配置

type Processor

type Processor[T any, R any] struct {
	// contains filtered or unexported fields
}

Processor 是一个通用的批量处理器

func NewProcessor

func NewProcessor[T any, R any](
	process func(context.Context, []T) ([]R, []error),
	opts Options,
) *Processor[T, R]

NewProcessor 创建一个新的批处理器

func (*Processor[T, R]) Close

func (p *Processor[T, R]) Close() error

Close 关闭批处理器

func (*Processor[T, R]) Process

func (p *Processor[T, R]) Process(ctx context.Context, item T) (R, error)

Process 处理单个请求,如果达到批次大小会立即处理

type RequestItem

type RequestItem struct {
	Request  interface{}
	Size     int
	Response interface{}
	Error    error
	Done     chan struct{}
	// contains filtered or unexported fields
}

RequestItem 表示批处理中的单个请求项

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL