Documentation
¶
Index ¶
- Variables
- func BatchConsume(ctx context.Context, r BatchReader, w BatchWriter, opts *BatchConsumeOptions) error
- type BatchConsumeOptions
- type BatchReader
- type BatchWriter
- type BufferedWriter
- type Comparer
- type ConsumeStatistics
- type Counter
- type Data
- type EmptyIterator
- func (it EmptyIterator) Close() error
- func (it EmptyIterator) Error() error
- func (it EmptyIterator) First() bool
- func (it EmptyIterator) Last() bool
- func (it EmptyIterator) Next() bool
- func (it EmptyIterator) Prev() bool
- func (it EmptyIterator) Total() (int64, error)
- func (it EmptyIterator) Value() interface{}
- type Flusher
- type Int64Comparer
- type Iterator
- type ListIterator
- func (it *ListIterator) Close() error
- func (it *ListIterator) Error() error
- func (it *ListIterator) First() bool
- func (it *ListIterator) Last() bool
- func (it *ListIterator) Next() bool
- func (it *ListIterator) Prev() bool
- func (it *ListIterator) Total() (int64, error)
- func (it *ListIterator) Value() Data
- type Matcher
- type MultiplierBackoff
- type NopWriter
- type StdoutWriter
- type TimerBackoff
- type Writer
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	// ErrInvalidData .
	ErrInvalidData = errors.New("invalid data")
	// ErrIteratorClosed .
	ErrIteratorClosed = errors.New("iterator closed")
	// ErrWriterClosed .
	ErrWriterClosed = errors.New("writer closed")
	// ErrOpNotSupported .
	ErrOpNotSupported = errors.New("operation not supported")
)
View Source
var DefaultLatencyBuckets = []float64{1, 50, 100, 250, 500, 750, 1000, 5000, 10000}
DefaultLatencyBuckets holds the default latency buckets; values are in milliseconds.
View Source
var DefaultNopWriter = NopWriter{}
DefaultNopWriter .
View Source
var DefaultStdoutWriter = StdoutWriter{
	Filter: func(val Data) bool { return true },
}
DefaultStdoutWriter .
View Source
var ErrExitConsume = errors.New("exit consume")
ErrExitConsume .
Functions ¶
func BatchConsume ¶
func BatchConsume(ctx context.Context, r BatchReader, w BatchWriter, opts *BatchConsumeOptions) error
BatchConsume .
Types ¶
type BatchConsumeOptions ¶
type BatchConsumeOptions struct {
BufferSize int
ReadTimeout time.Duration
// ReadErrorHandler handles read errors: return a non-nil error to exit, or nil to continue.
ReadErrorHandler func(err error) error
// WriteErrorHandler handles write errors: return a non-nil error to retry the write, or nil to continue.
WriteErrorHandler func(list []Data, err error) error
// ConfirmErrorHandler handles confirm errors: return a non-nil error to exit, or nil to continue.
ConfirmErrorHandler func(err error) error
Backoff TimerBackoff
Statistics ConsumeStatistics
}
BatchConsumeOptions .
type BatchReader ¶
type BatchReader interface {
ReadN(buf []Data, timeout time.Duration) (int, error)
Confirm() error
Close() error
}
BatchReader .
func NewMockBatchReader ¶
func NewMockBatchReader(ctx context.Context, interval time.Duration, batchSize int, creator func() interface{}) BatchReader
NewMockBatchReader .
type BatchWriter ¶
BatchWriter .
type BufferedWriter ¶
type BufferedWriter struct {
// contains filtered or unexported fields
}
BufferedWriter .
func NewBufferedWriter ¶
func NewBufferedWriter(w BatchWriter, capacity int) *BufferedWriter
NewBufferedWriter .
type ConsumeStatistics ¶
type ConsumeStatistics interface {
ReadError(err error)
WriteError(data []Data, err error)
ConfirmError(data []Data, err error)
Success(data []Data)
ObserveReadLatency(start time.Time)
ObserveWriteLatency(start time.Time)
}
ConsumeStatistics .
var NopConsumeStatistics ConsumeStatistics = &nopConsumeStatistics{}
type Iterator ¶
type Iterator interface {
First() bool
Last() bool
Next() bool
Prev() bool
Value() Data
Error() error
Close() error
}
Iterator .
func MergedHeadOverlappedIterator ¶
MergedHeadOverlappedIterator .
func OrderedIterator ¶ added in v1.5.0
OrderedIterator .
type ListIterator ¶
type ListIterator struct {
// contains filtered or unexported fields
}
ListIterator .
type MultiplierBackoff ¶
type MultiplierBackoff struct {
Base time.Duration
Max time.Duration
Duration time.Duration
Factor float64
}
MultiplierBackoff .
func (*MultiplierBackoff) Reset ¶
func (b *MultiplierBackoff) Reset()
func (*MultiplierBackoff) Wait ¶
func (b *MultiplierBackoff) Wait() <-chan time.Time
type StdoutWriter ¶
StdoutWriter .
func (StdoutWriter) Close ¶
func (w StdoutWriter) Close() error
func (StdoutWriter) Write ¶
func (w StdoutWriter) Write(val Data) error
Source Files
¶
Click to show internal directories.
Click to hide internal directories.