queue

package
v0.15.1
Published: Feb 24, 2025 License: Apache-2.0 Imports: 5 Imported by: 3

Documentation

Overview

Package queue implements a queue processor for delayed events. Events are kept in an in-memory queue, ordered by the time at which they are scheduled to be executed. Users should interact with the Processor to process events in the queue. While the queue holds at least one item, the processor uses a single background goroutine to wait for the next item that is due.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Processor

type Processor[K comparable, T Queueable[K]] struct {
	// contains filtered or unexported fields
}

Processor manages the queue of items and processes them at the correct time.

Example
package main

import (
	"fmt"
	"time"
)

// queueableItem is a sample item that can be queued; it is used for testing.
type queueableItem struct {
	Name          string
	ExecutionTime time.Time
}

// Key returns the unique key for this item.
func (r queueableItem) Key() string {
	return r.Name
}

// ScheduledTime returns the time the item is scheduled to be executed at.
// This is implemented to comply with the Queueable interface.
func (r queueableItem) ScheduledTime() time.Time {
	return r.ExecutionTime
}

func main() {
	// executed receives one message for each item the processor executes
	executed := make(chan string, 3)
	// executeFn is the callback invoked when an item is due to be executed
	executeFn := func(r *queueableItem) {
		executed <- "Executed: " + r.Name
	}

	// Create the processor
	processor := NewProcessor[string, *queueableItem](executeFn)

	// Add items to the processor, in any order, using Enqueue
	processor.Enqueue(&queueableItem{Name: "item1", ExecutionTime: time.Now().Add(500 * time.Millisecond)})
	processor.Enqueue(&queueableItem{Name: "item2", ExecutionTime: time.Now().Add(200 * time.Millisecond)})
	processor.Enqueue(&queueableItem{Name: "item3", ExecutionTime: time.Now().Add(300 * time.Millisecond)})
	processor.Enqueue(&queueableItem{Name: "item4", ExecutionTime: time.Now().Add(time.Second)})

	// Items whose Key() returns the same value are considered the same item, so the earlier entry is replaced
	processor.Enqueue(&queueableItem{Name: "item3", ExecutionTime: time.Now().Add(100 * time.Millisecond)})

	// Using Dequeue allows removing an item from the queue
	processor.Dequeue("item4")

	for i := 0; i < 3; i++ {
		fmt.Println(<-executed)
	}
}
Output:

Executed: item3
Executed: item2
Executed: item1

func NewProcessor

func NewProcessor[K comparable, T Queueable[K]](executeFn func(r T)) *Processor[K, T]

NewProcessor returns a new Processor. executeFn is the callback invoked when an item is due to be executed; it is called from a background goroutine.

func (*Processor[K, T]) Close

func (p *Processor[K, T]) Close() error

Close stops the processor. This method blocks until the processor loop returns.
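
A Close that blocks until a background loop has returned is commonly built from a stop channel and a done channel. The sketch below shows that general pattern under stated assumptions; it is not taken from this package's source, and loop, startLoop, and stopped are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// loop is a hypothetical worker that owns one background goroutine.
type loop struct {
	stop     chan struct{}
	done     chan struct{}
	stopOnce sync.Once
}

// startLoop launches the background goroutine and returns its handle.
func startLoop() *loop {
	l := &loop{stop: make(chan struct{}), done: make(chan struct{})}
	go func() {
		defer close(l.done) // signals that the goroutine has returned
		<-l.stop            // a real loop would also select on timers and new work
	}()
	return l
}

// Close asks the goroutine to stop and blocks until it has returned.
// In this sketch it is safe to call more than once.
func (l *loop) Close() error {
	l.stopOnce.Do(func() { close(l.stop) })
	<-l.done
	return nil
}

// stopped reports whether the background goroutine has already returned.
func (l *loop) stopped() bool {
	select {
	case <-l.done:
		return true
	default:
		return false
	}
}

func main() {
	l := startLoop()
	fmt.Println(l.stopped()) // false
	_ = l.Close()
	fmt.Println(l.stopped()) // true
}
```

Whether the real Close may be called repeatedly, or whether Enqueue is allowed after Close, is not stated in this documentation, so callers should not rely on either.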

func (*Processor[K, T]) Dequeue

func (p *Processor[K, T]) Dequeue(key K)

Dequeue removes the item with the given key from the queue.

func (*Processor[K, T]) Enqueue

func (p *Processor[K, T]) Enqueue(r T)

Enqueue adds a new item to the queue. If an item with the same key already exists, it is replaced.

func (*Processor[K, T]) WithClock added in v0.12.1

func (p *Processor[K, T]) WithClock(clock kclock.Clock) *Processor[K, T]

WithClock sets the clock used by the processor. Used for testing.

type Queueable added in v0.15.0

type Queueable[T comparable] interface {
	comparable
	Key() T
	ScheduledTime() time.Time
}

Queueable is the interface for items that can be added to the queue.
