batchqueue

package
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2022 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MaxMessageSize limit message size for transfer.
	MaxMessageSize = 5 * 1024 * 1024
	// MaxBatchSize will be the largest size for a batch sent from this particular producer.
	// This is used as a baseline to allocate a new buffer that can hold the entire batch
	// without needing costly re-allocations.
	MaxBatchSize = 128 * 1024
	// DefaultMaxMessagesPerBatch init default num of entries in per batch.
	DefaultMaxMessagesPerBatch = 1000
)

Variables

View Source
var (
	DefaultSequenceID = uint64(0)
)

Functions

func GetAndAdd

func GetAndAdd(n *uint64, diff uint64) uint64

GetAndAdd perform atomic read and update.

Types

type BatchBuilder

type BatchBuilder struct {
	// contains filtered or unexported fields
}

BatchBuilder wraps the objects needed to build a batch.

func NewBatchBuilder

func NewBatchBuilder(maxMessages uint) *BatchBuilder

NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.

func (*BatchBuilder) Add

func (bb *BatchBuilder) Add(payload interface{}) (isFull bool)

Add will add single message to batch.

func (*BatchBuilder) Flush

func (bb *BatchBuilder) Flush() (batchData []interface{}, sequenceID uint64)

Flush all the messages buffered in the client and wait until all messages have been successfully persisted.

func (*BatchBuilder) IsFull

func (bb *BatchBuilder) IsFull() bool

IsFull check if the size in the current batch exceeds the maximum size allowed by the batch.

type BatchSink

type BatchSink interface {
	Send(ctx context.Context, msg interface{}) error
	Flush(ctx context.Context) error
	Close()
}

func NewBatchSink

func NewBatchSink(ctx context.Context, conf *Config) (BatchSink, error)

type BlockingQueue

type BlockingQueue interface {
	// Put enqueue one item, block if the queue is full.
	Put(item interface{})

	// Take dequeue one item, block until it's available.
	Take() interface{}

	// Poll dequeue one item, return nil if queue is empty.
	Poll() interface{}

	// Peek return the first item without dequeing, return nil if queue is empty.
	Peek() interface{}

	// PeekLast return last item in queue without dequeing, return nil if queue is empty.
	PeekLast() interface{}

	// Size return the current size of the queue.
	Size() int

	// Iterator return an iterator for the queue.
	Iterator() BlockingQueueIterator
}

BlockingQueue is a interface of block queue.

func NewBlockingQueue

func NewBlockingQueue(maxSize int) BlockingQueue

NewBlockingQueue init block queue and returns a BlockingQueue.

type BlockingQueueIterator

type BlockingQueueIterator interface {
	HasNext() bool
	Next() interface{}
}

BlockingQueueIterator abstract a interface of block queue iterator.

type CallbackFn

type CallbackFn func(sequenceID uint64, e error)

type Config

type Config struct {
	Name     string
	DoSinkFn ProcessFn
	// BatchingMaxMessages set the maximum number of messages permitted in a batch. (default: 1000)
	MaxBatching int
	// MaxPendingMessages set the max size of the queue.
	MaxPendingMessages uint
	// BatchingMaxFlushDelay set the time period within which the messages sent will be batched (default: 10ms)
	BatchingMaxFlushDelay time.Duration
}

func (*Config) GetBatchingMaxFlushDelay

func (c *Config) GetBatchingMaxFlushDelay() time.Duration

func (*Config) GetMaxBatching

func (c *Config) GetMaxBatching() uint

func (*Config) GetMaxPendingMessages

func (c *Config) GetMaxPendingMessages() int

type ProcessFn

type ProcessFn func(msgs []interface{}) (err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL