Documentation
¶
Overview ¶
Package mb provides a queue with a message batching feature.
Example ¶
var ctx = context.Background()
// bufSize - whole capacity of batcher
var bufSize = 100
// create the new batcher
batcher := mb.New[Item](bufSize)
// start a goroutine that will wait for items
// there can be many such waiting goroutines
var done = make(chan struct{})
go func() {
defer close(done)
for {
// wait items
items, err := batcher.Wait(context.Background())
if err != nil {
fmt.Printf("waiter received error: %v; stop goroutine\n", err)
return
}
// insert batch to db
// while this func runs, the batcher keeps collecting new items
BatchInsert(items)
}
}()
// add two items to batcher
batcher.Add(ctx, Item{Id: 1}, Item{Id: 2})
time.Sleep(time.Millisecond)
// add more items to batcher
for i := 0; i < 10; i++ {
// it's safe to call Add from other goroutines
batcher.Add(ctx, Item{Id: i + 3})
}
// close batcher
batcher.Close()
// and wait until inserter exits
<-done
Output:
inserted 2 items
inserted 10 items
waiter received error: mb: MB closed; stop goroutine
Example (WithTimeLimit) ¶
var ctx = context.Background()
// bufSize - whole capacity of batcher
var bufSize = 100
// create the new batcher
batcher := mb.New[Item](bufSize)
// start a goroutine that will wait for items
// there can be many such waiting goroutines
var done = make(chan struct{})
go func() {
defer close(done)
ctxWithTimeLimit := mb.CtxWithTimeLimit(ctx, time.Millisecond*200)
cond := batcher.NewCond().WithMin(10).WithMax(15)
for {
// get at least 10 items or after 200 ms get at least 1 item
items, err := cond.Wait(ctxWithTimeLimit)
if err != nil {
fmt.Printf("waiter received error: %v; stop goroutine\n", err)
return
}
// insert batch to db
// while this func runs, the batcher keeps collecting new items
BatchInsert(items)
}
}()
// add two items to batcher
batcher.Add(ctx, Item{Id: 1}, Item{Id: 2})
time.Sleep(time.Millisecond * 300)
// add more items to batcher
for i := 0; i < 20; i++ {
// it's safe to call Add from other goroutines
batcher.Add(ctx, Item{Id: i + 3})
}
time.Sleep(time.Second)
// close batcher
batcher.Close()
// and wait until inserter exits
<-done
Output:
inserted 2 items
inserted 15 items
inserted 5 items
waiter received error: mb: MB closed; stop goroutine
Index ¶
- Variables
- func CtxWithTimeLimit(ctx context.Context, timeLimit time.Duration) context.Context
- type MB
- func (mb *MB[T]) Add(ctx context.Context, msgs ...T) (err error)
- func (mb *MB[T]) Close() (err error)
- func (mb *MB[T]) GetAll() (msgs []T)
- func (mb *MB[T]) Len() (l int)
- func (mb *MB[T]) NewCond() WaitCond[T]
- func (mb *MB[T]) Pause()
- func (mb *MB[T]) Resume()
- func (mb *MB[T]) Stats() (addCount, addMsgsCount, getCount, getMsgsCount int64)
- func (mb *MB[T]) TryAdd(msgs ...T) (err error)
- func (mb *MB[T]) Wait(ctx context.Context) (msgs []T, err error)
- func (mb *MB[T]) WaitCond(ctx context.Context, cond WaitCond[T]) (msgs []T, err error)
- func (mb *MB[T]) WaitOne(ctx context.Context) (msg T, err error)
- type WaitCond
- func (wc WaitCond[T]) Wait(ctx context.Context) (msgs []T, err error)
- func (wc WaitCond[T]) WaitOne(ctx context.Context) (msg T, err error)
- func (wc WaitCond[T]) WithFilter(f func(v T) bool) WaitCond[T]
- func (wc WaitCond[T]) WithMax(max int) WaitCond[T]
- func (wc WaitCond[T]) WithMin(min int) WaitCond[T]
- func (wc WaitCond[T]) WithPriority(priority float64) WaitCond[T]
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("mb: MB closed")
ErrClosed is returned when you add a message to a closed queue; as the examples above show, Wait also returns it once the queue is closed.
var ErrOverflowed = errors.New("mb: overflowed")
ErrOverflowed means new messages can't be added until there is free space in the queue
var ErrTooManyMessages = errors.New("mb: too many messages")
ErrTooManyMessages means that a single call tried to add more messages than the queue size allows.
Functions ¶
func CtxWithTimeLimit ¶
CtxWithTimeLimit makes a child context with the given timeLimit duration. This context can be passed to all Wait methods. When the given min param can't be reached within the time limit, the min param is reset (in the example above: get at least 10 items, or after 200 ms get at least 1 item).
Types ¶
type MB ¶
type MB[T any] struct { // contains filtered or unexported fields }
MB - message batching object
func (*MB[T]) Add ¶
Add adds new messages to the queue. When the queue is closed, it returns ErrClosed. When the number of messages is bigger than the queue size, it returns ErrTooManyMessages. When the queue is full, it waits until space is freed.
func (*MB[T]) Close ¶
Close closes the queue. All messages already added remain available to active Wait calls. When the queue is paused, messages are not released to Wait (use GetAll to fetch them).
func (*MB[T]) GetAll ¶
func (mb *MB[T]) GetAll() (msgs []T)
GetAll returns all messages and flushes the queue. It also works on a closed queue.
func (*MB[T]) Stats ¶
Stats returns the current statistics of queue usage: addCount is the number of Add calls, addMsgsCount is the number of added messages, getCount is the number of Wait calls, and getMsgsCount is the number of issued messages.
func (*MB[T]) TryAdd ¶
TryAdd adds new messages to the queue. When the queue is closed, it returns ErrClosed. When the number of messages is bigger than the queue size, it returns ErrTooManyMessages. When the queue is full, it returns ErrOverflowed.
type WaitCond ¶
type WaitCond[T any] struct { Priority float64 Min int Max int Filter func(v T) bool // contains filtered or unexported fields }
WaitCond describes condition for messages
func (WaitCond[T]) WithFilter ¶
WithFilter adds a filter to the conditions. The filter function should return true for an acceptable message and false for an unacceptable one.
func (WaitCond[T]) WithPriority ¶
WithPriority adds priority to conditions