queue

package
v0.16.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 29, 2025 License: Apache-2.0 Imports: 5 Imported by: 3

Documentation

Overview

Package queue implements a queue processor for delayed events. Events are maintained in an in-memory queue, where items are in the order of when they are to be executed. Users should interact with the Processor to process events in the queue. When the queue has at least 1 item, the processor uses a single background goroutine to wait on the next item to be executed.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Options added in v0.15.2

type Options[K comparable, T Queueable[K]] struct {
	ExecuteFn func(r T)
	Clock     kclock.Clock
}

type Processor

type Processor[K comparable, T Queueable[K]] struct {
	// contains filtered or unexported fields
}

Processor manages the queue of items and processes them at the correct time.

Example
package main

import (
	"fmt"
	"time"
)

// queueableItem is an item that can be queued and it's used for testing.
type queueableItem struct {
	Name          string
	ExecutionTime time.Time
}

// Key returns the key for this unique item.
func (r queueableItem) Key() string {
	return r.Name
}

// ScheduledTime returns the time the item is scheduled to be executed at.
// This is implemented to comply with the queueable interface.
func (r queueableItem) ScheduledTime() time.Time {
	return r.ExecutionTime
}

func main() {
	// Method invoked when an item is to be executed
	executed := make(chan string, 3)
	executeFn := func(r *queueableItem) {
		executed <- "Executed: " + r.Name
	}

	// Create the processor
	processor := NewProcessor[string, *queueableItem](Options[string, *queueableItem]{
		ExecuteFn: executeFn,
	})

	// Add items to the processor, in any order, using Enqueue
	processor.Enqueue(&queueableItem{Name: "item1", ExecutionTime: time.Now().Add(500 * time.Millisecond)})
	processor.Enqueue(&queueableItem{Name: "item2", ExecutionTime: time.Now().Add(200 * time.Millisecond)})
	processor.Enqueue(&queueableItem{Name: "item3", ExecutionTime: time.Now().Add(300 * time.Millisecond)})
	processor.Enqueue(&queueableItem{Name: "item4", ExecutionTime: time.Now().Add(time.Second)})

	// Items with the same value returned by Key() are considered the same, so will be replaced
	processor.Enqueue(&queueableItem{Name: "item3", ExecutionTime: time.Now().Add(100 * time.Millisecond)})

	// Using Dequeue allows removing an item from the queue
	processor.Dequeue("item4")

	for range 3 {
		fmt.Println(<-executed)
	}
}
Output:

Executed: item3
Executed: item2
Executed: item1

func NewProcessor

func NewProcessor[K comparable, T Queueable[K]](opts Options[K, T]) *Processor[K, T]

NewProcessor returns a new Processor object. executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine.

func (*Processor[K, T]) Close

func (p *Processor[K, T]) Close() error

Close stops the processor. This method blocks until the processor loop returns.

func (*Processor[K, T]) Dequeue

func (p *Processor[K, T]) Dequeue(key K)

Dequeue removes a item from the queue.

func (*Processor[K, T]) Enqueue

func (p *Processor[K, T]) Enqueue(rs ...T)

Enqueue adds a new items to the queue. If a item with the same ID already exists, it'll be replaced.

type Queueable added in v0.15.0

type Queueable[T comparable] interface {
	comparable
	Key() T
	ScheduledTime() time.Time
}

Queueable is the interface for items that can be added to the queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL