Documentation
¶
Index ¶
- type Aggregator
- func (agt *Aggregator) Enqueue(item interface{}) error
- func (agt *Aggregator) EnqueueWithRetry(item interface{}, maxRetries int, backoff time.Duration) bool
- func (agt *Aggregator) SafeStop()
- func (agt *Aggregator) Start()
- func (agt *Aggregator) Stop()
- func (agt *Aggregator) TryEnqueue(item interface{}) bool
- type AggregatorOption
- type BatchProcessFunc
- type ErrorHandlerFunc
- type SetAggregatorOptionFunc
- func WithBatchSize(batchSize int) SetAggregatorOptionFunc
- func WithChannelBufferSize(size int) SetAggregatorOptionFunc
- func WithErrorHandler(handler ErrorHandlerFunc) SetAggregatorOptionFunc
- func WithLingerTime(duration time.Duration) SetAggregatorOptionFunc
- func WithLogger(logger *log.Logger) SetAggregatorOptionFunc
- func WithWorkers(workers int) SetAggregatorOptionFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Aggregator ¶
type Aggregator struct {
// contains filtered or unexported fields
}
Aggregator 聚合器结构体
func NewAggregator ¶
func NewAggregator(batchProcessor BatchProcessFunc, optionFuncs ...SetAggregatorOptionFunc) (*Aggregator, error)
NewAggregator 创建新的聚合器实例
func (*Aggregator) Enqueue ¶
func (agt *Aggregator) Enqueue(item interface{}) error
Enqueue 入队一个项目,会阻塞直到有空间
func (*Aggregator) EnqueueWithRetry ¶
func (agt *Aggregator) EnqueueWithRetry(item interface{}, maxRetries int, backoff time.Duration) bool
EnqueueWithRetry 入队一个项目,会重试直到成功或者达到最大重试次数
func (*Aggregator) TryEnqueue ¶
func (agt *Aggregator) TryEnqueue(item interface{}) bool
TryEnqueue 尝试入队一个项目,非阻塞
type AggregatorOption ¶
type AggregatorOption struct {
BatchSize int
Workers int
ChannelBufferSize int
LingerTime time.Duration
ErrorHandler ErrorHandlerFunc
Logger *log.Logger
}
AggregatorOption 聚合器选项
type ErrorHandlerFunc ¶
type ErrorHandlerFunc func(err error, items []interface{}, batchProcessFunc BatchProcessFunc, aggregator *Aggregator)
ErrorHandlerFunc 错误处理函数类型
type SetAggregatorOptionFunc ¶
type SetAggregatorOptionFunc func(option *AggregatorOption)
SetAggregatorOptionFunc 聚合器选项设置函数类型
func WithChannelBufferSize ¶
func WithChannelBufferSize(size int) SetAggregatorOptionFunc
func WithErrorHandler ¶
func WithErrorHandler(handler ErrorHandlerFunc) SetAggregatorOptionFunc
func WithLingerTime ¶
func WithLingerTime(duration time.Duration) SetAggregatorOptionFunc
func WithLogger ¶
func WithLogger(logger *log.Logger) SetAggregatorOptionFunc
func WithWorkers ¶
func WithWorkers(workers int) SetAggregatorOptionFunc
Click to show internal directories.
Click to hide internal directories.