Documentation
¶
Overview ¶
Package queue implements a queue processor for delayed events. Events are maintained in an in-memory queue, where items are in the order of when they are to be executed. Users should interact with the Processor to process events in the queue. When the queue has at least 1 item, the processor uses a single background goroutine to wait on the next item to be executed.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Options ¶ added in v0.15.2
type Options[K comparable, T Queueable[K]] struct { ExecuteFn func(r T) Clock kclock.Clock }
type Processor ¶
type Processor[K comparable, T Queueable[K]] struct { // contains filtered or unexported fields }
Processor manages the queue of items and processes them at the correct time.
Example ¶
package main
import (
"fmt"
"time"
)
// queueableItem is an item that can be queued and it's used for testing.
type queueableItem struct {
Name string
ExecutionTime time.Time
}
// Key returns the key for this unique item.
func (r queueableItem) Key() string {
return r.Name
}
// ScheduledTime returns the time the item is scheduled to be executed at.
// This is implemented to comply with the queueable interface.
func (r queueableItem) ScheduledTime() time.Time {
return r.ExecutionTime
}
func main() {
// Method invoked when an item is to be executed
executed := make(chan string, 3)
executeFn := func(r *queueableItem) {
executed <- "Executed: " + r.Name
}
// Create the processor
processor := NewProcessor[string, *queueableItem](Options[string, *queueableItem]{
ExecuteFn: executeFn,
})
// Add items to the processor, in any order, using Enqueue
processor.Enqueue(&queueableItem{Name: "item1", ExecutionTime: time.Now().Add(500 * time.Millisecond)})
processor.Enqueue(&queueableItem{Name: "item2", ExecutionTime: time.Now().Add(200 * time.Millisecond)})
processor.Enqueue(&queueableItem{Name: "item3", ExecutionTime: time.Now().Add(300 * time.Millisecond)})
processor.Enqueue(&queueableItem{Name: "item4", ExecutionTime: time.Now().Add(time.Second)})
// Items with the same value returned by Key() are considered the same, so will be replaced
processor.Enqueue(&queueableItem{Name: "item3", ExecutionTime: time.Now().Add(100 * time.Millisecond)})
// Using Dequeue allows removing an item from the queue
processor.Dequeue("item4")
for range 3 {
fmt.Println(<-executed)
}
}
Output: Executed: item3 Executed: item2 Executed: item1
func NewProcessor ¶
func NewProcessor[K comparable, T Queueable[K]](opts Options[K, T]) *Processor[K, T]
NewProcessor returns a new Processor object. executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine.
func (*Processor[K, T]) Close ¶
Close stops the processor. This method blocks until the processor loop returns.
type Queueable ¶ added in v0.15.0
type Queueable[T comparable] interface { comparable Key() T ScheduledTime() time.Time }
Queueable is the interface for items that can be added to the queue.