Versions in this module Expand all Collapse all v0 v0.4.1 Mar 7, 2022 Changes in this version + const DefaultMaxMessagesPerBatch + const MaxBatchSize + const MaxMessageSize + var DefaultSequenceID = uint64(0) + func GetAndAdd(n *uint64, diff uint64) uint64 + type BatchBuilder struct + func NewBatchBuilder(maxMessages uint) *BatchBuilder + func (bb *BatchBuilder) Add(payload interface{}) (isFull bool) + func (bb *BatchBuilder) Flush() (batchData []interface{}, sequenceID uint64) + func (bb *BatchBuilder) IsFull() bool + type BatchSink interface + Close func() + Flush func(ctx context.Context) error + Send func(ctx context.Context, msg interface{}) error + func NewBatchSink(ctx context.Context, conf *Config) (BatchSink, error) + type BlockingQueue interface + Iterator func() BlockingQueueIterator + Peek func() interface{} + PeekLast func() interface{} + Poll func() interface{} + Put func(item interface{}) + Size func() int + Take func() interface{} + func NewBlockingQueue(maxSize int) BlockingQueue + type BlockingQueueIterator interface + HasNext func() bool + Next func() interface{} + type CallbackFn func(sequenceID uint64, e error) + type Config struct + BatchingMaxFlushDelay time.Duration + DoSinkFn ProcessFn + MaxBatching int + MaxPendingMessages uint + Name string + func (c *Config) GetBatchingMaxFlushDelay() time.Duration + func (c *Config) GetMaxBatching() uint + func (c *Config) GetMaxPendingMessages() int + type ProcessFn func(msgs []interface{}) (err error)