godq

v0.1.0-beta
Published: Jul 6, 2024 License: MIT Imports: 6 Imported by: 0

README

godq: Go Disk Queue

A lightweight, SQLite-backed persistent queue implementation in Go.

godq is a lightweight, persistent queue implementation in Go, using SQLite as the underlying storage mechanism. It provides various queue types to suit different use cases, including simple queues, acknowledged queues, and unique item queues.

Features

  • Persistent storage using SQLite
  • Multiple queue types:
    • Simple Queue
    • Acknowledged Queue
    • Unique Queue
    • Unique Acknowledged Queue
  • Blocking and non-blocking dequeue operations
  • Context support for cancellation and timeouts
  • Thread-safe operations

Installation

To use godq in your Go project, run:

go get github.com/mattdeak/godq

Make sure you have SQLite installed on your system.

Usage

Here are some examples of how to use different queue types:

Simple Queue
package main

import (
	"fmt"
	"log"

	"github.com/mattdeak/godq"
)

func main() {
	// Create a new simple queue backed by the file queue.db
	queue, err := godq.NewSimpleQueue("queue.db")
	if err != nil {
		log.Fatal(err)
	}
	defer queue.Close()

	// Enqueue an item
	if err := queue.Enqueue([]byte("Hello, World!")); err != nil {
		log.Fatal(err)
	}

	// Dequeue an item (blocks until one is available)
	msg, err := queue.Dequeue()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(msg.Item))
}

Acknowledged Queue
package main

import (
	"log"
	"time"

	"github.com/mattdeak/godq"
)

func main() {
	// Create a new acknowledged queue with a 30-second ack timeout
	queue, err := godq.NewAckQueue("ack_queue.db", godq.AckOpts{AckTimeout: 30 * time.Second})
	if err != nil {
		log.Fatal(err)
	}
	defer queue.Close()

	// Enqueue an item
	if err := queue.Enqueue([]byte("Process me")); err != nil {
		log.Fatal(err)
	}

	// Dequeue an item
	msg, err := queue.Dequeue()
	if err != nil {
		log.Fatal(err)
	}

	// Process the item...
	log.Printf("processing %s", msg.Item)

	// Acknowledge the item so it is not requeued after the timeout
	if err := queue.Ack(msg.ID); err != nil {
		log.Fatal(err)
	}
}

Unique Queue
package main

import (
	"fmt"
	"log"

	"github.com/mattdeak/godq"
)

func main() {
	// Create a new unique queue
	queue, err := godq.NewUniqueQueue("unique_queue.db")
	if err != nil {
		log.Fatal(err)
	}
	defer queue.Close()

	// Enqueue items (the second "unique_item_1" is silently ignored)
	for _, item := range []string{"unique_item_1", "unique_item_1", "unique_item_2"} {
		if err := queue.Enqueue([]byte(item)); err != nil {
			log.Fatal(err)
		}
	}

	// Dequeue the two distinct items
	msg1, _ := queue.Dequeue()
	msg2, _ := queue.Dequeue()
	fmt.Println(string(msg1.Item), string(msg2.Item))

	// The queue is now empty, so TryDequeue returns an error
	if _, err := queue.TryDequeue(); err != nil {
		fmt.Println("queue is empty:", err)
	}
}

Queue Types

  1. SimpleQueue: Basic FIFO queue with no additional features.
  2. AckQueue: Queue with acknowledgment support. Items must be acknowledged after processing, or they will be requeued after a timeout.
  3. UniqueQueue: Queue that only allows unique items. Duplicate enqueue attempts are silently ignored.
  4. UniqueAckQueue: Combination of UniqueQueue and AckQueue. Ensures unique items with acknowledgment support.
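The requeue-after-timeout behaviour described for AckQueue can be pictured with a small in-memory model. This is only a sketch of the general pattern, not godq's implementation: the type `ackModel`, its methods, and the `demo` helper are hypothetical, and the clock is passed in explicitly so the example stays deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// entry is one stored item plus its delivery state.
type entry struct {
	id       int64
	item     []byte
	inFlight bool      // true once handed to a consumer
	deadline time.Time // when an unacknowledged delivery expires
}

// ackModel is a hypothetical in-memory model of an acknowledged queue.
type ackModel struct {
	timeout time.Duration
	nextID  int64
	entries []*entry
}

var errEmpty = errors.New("no item available")

func (q *ackModel) enqueue(item []byte) {
	q.nextID++
	q.entries = append(q.entries, &entry{id: q.nextID, item: item})
}

// tryDequeue hands out the oldest item that is waiting or whose
// acknowledgement deadline has passed.
func (q *ackModel) tryDequeue(now time.Time) (int64, error) {
	for _, e := range q.entries {
		if !e.inFlight || now.After(e.deadline) {
			e.inFlight = true
			e.deadline = now.Add(q.timeout)
			return e.id, nil
		}
	}
	return 0, errEmpty
}

// ack removes an item permanently.
func (q *ackModel) ack(id int64) error {
	for i, e := range q.entries {
		if e.id == id {
			q.entries = append(q.entries[:i], q.entries[i+1:]...)
			return nil
		}
	}
	return fmt.Errorf("message %d not found", id)
}

// demo delivers one item, lets its deadline lapse, and reports what happened.
func demo() (hiddenBeforeTimeout, redelivered bool, remaining int) {
	start := time.Unix(0, 0)
	q := &ackModel{timeout: 30 * time.Second}
	q.enqueue([]byte("Process me"))

	first, _ := q.tryDequeue(start)
	_, err := q.tryDequeue(start.Add(10 * time.Second))
	hiddenBeforeTimeout = errors.Is(err, errEmpty)

	second, _ := q.tryDequeue(start.Add(31 * time.Second))
	redelivered = second == first

	_ = q.ack(second)
	return hiddenBeforeTimeout, redelivered, len(q.entries)
}

func main() {
	fmt.Println(demo())
}
```

In this model an unacknowledged item is invisible to other consumers until its deadline passes, then becomes eligible for delivery again; acknowledging it is the only way to remove it.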

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License.

Future Work

  • More Queue Configurability
  • Various Efficiency Improvements

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AckOpts

type AckOpts struct {
	// AckTimeout is the timeout for acknowledging a message.
	// A value of 0 means no timeout. A value of -1 means infinite timeout.
	AckTimeout time.Duration
}

AckOpts represents the queue-level settings for how acknowledgement of messages is handled.

type AckQueue

type AckQueue struct {
	// contains filtered or unexported fields
}

AckQueue is a queue that provides the ability to acknowledge messages.

func NewAckQueue

func NewAckQueue(filePath string, opts AckOpts) (*AckQueue, error)

NewAckQueue creates a new ack queue.

func (*AckQueue) Ack

func (pq *AckQueue) Ack(id int64) error

Ack marks an item as processed.

func (*AckQueue) Close

func (q *AckQueue) Close() error

func (*AckQueue) Dequeue

func (pq *AckQueue) Dequeue() (Msg, error)

Dequeue blocks until an item is available. It uses a background context.

func (*AckQueue) DequeueCtx

func (pq *AckQueue) DequeueCtx(ctx context.Context) (Msg, error)

DequeueCtx blocks until an item is available or the context is canceled.
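A common way to use a context-aware dequeue is to bound the wait with a timeout. The sketch below assumes only the documented `DequeueCtx` signature; `chanQueue` is a hypothetical in-memory stand-in so the example compiles without the library, and `dequeueWithin` is an illustrative helper name. Which error value godq itself returns on cancellation is not documented here, so the stand-in simply returns `ctx.Err()`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Msg mirrors the documented godq.Msg so the sketch compiles on its own.
type Msg struct {
	ID   int64
	Item []byte
}

// ctxDequeuer is the single documented method this sketch relies on.
type ctxDequeuer interface {
	DequeueCtx(ctx context.Context) (Msg, error)
}

// chanQueue is a hypothetical in-memory stand-in for a godq queue.
type chanQueue struct{ ch chan Msg }

// DequeueCtx blocks until a message arrives or ctx is done.
func (q *chanQueue) DequeueCtx(ctx context.Context) (Msg, error) {
	select {
	case m := <-q.ch:
		return m, nil
	case <-ctx.Done():
		return Msg{}, ctx.Err()
	}
}

// dequeueWithin waits at most d for the next message.
func dequeueWithin(q ctxDequeuer, d time.Duration) (Msg, error) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return q.DequeueCtx(ctx)
}

func main() {
	q := &chanQueue{ch: make(chan Msg, 1)}
	q.ch <- Msg{ID: 1, Item: []byte("Process me")}

	msg, err := dequeueWithin(q, 50*time.Millisecond)
	fmt.Println(string(msg.Item), err)

	// The queue is now empty, so the second call gives up after the timeout.
	_, err = dequeueWithin(q, 50*time.Millisecond)
	fmt.Println("timed out:", errors.Is(err, context.DeadlineExceeded))
}
```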

func (*AckQueue) Enqueue

func (pq *AckQueue) Enqueue(item []byte) error

Enqueue adds an item to the queue.

func (*AckQueue) Len

func (pq *AckQueue) Len() (int, error)

Len returns the number of items in the queue.

func (*AckQueue) Nack

func (pq *AckQueue) Nack(id int64) error

Nack marks an item as not processed and re-queues it.

func (*AckQueue) TryDequeue

func (pq *AckQueue) TryDequeue() (Msg, error)

TryDequeue attempts to dequeue an item from the queue without blocking.

func (*AckQueue) TryDequeueCtx

func (pq *AckQueue) TryDequeueCtx(ctx context.Context) (Msg, error)

TryDequeueCtx attempts to dequeue an item from the queue. It returns a Msg holding the item and its ID, or an error if the item could not be dequeued.

type AckableQueue

type AckableQueue interface {
	DQueue

	// Ack acknowledges that an item has been successfully processed.
	// It takes the ID of the message to acknowledge.
	// Returns an error if the operation fails or the message doesn't exist.
	Ack(id int64) error

	// Nack indicates that an item processing has failed and should be requeued.
	// It takes the ID of the message to negative acknowledge.
	// Returns an error if the operation fails or the message doesn't exist.
	Nack(id int64) error
}

AckableQueue extends the DQueue interface with acknowledgement capabilities. It allows for explicit acknowledgement or negative acknowledgement of processed items.
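A typical consumer acknowledges a message when processing succeeds and negatively acknowledges it when processing fails. The following is a minimal sketch of that pattern written against the two documented methods only; `acker`, `recorder`, `settle`, and `errBoom` are hypothetical names, and `recorder` stands in for a real queue so the example runs on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// Msg mirrors the documented godq.Msg.
type Msg struct {
	ID   int64
	Item []byte
}

// acker lists the two documented AckableQueue methods this sketch needs.
type acker interface {
	Ack(id int64) error
	Nack(id int64) error
}

// recorder is a hypothetical stand-in that records which IDs were settled.
type recorder struct{ acked, nacked []int64 }

func (r *recorder) Ack(id int64) error  { r.acked = append(r.acked, id); return nil }
func (r *recorder) Nack(id int64) error { r.nacked = append(r.nacked, id); return nil }

var errBoom = errors.New("boom")

// settle runs handle on the message, then acks on success or nacks on failure.
// The handler's error is returned so the caller can log or count it.
func settle(q acker, msg Msg, handle func([]byte) error) error {
	if err := handle(msg.Item); err != nil {
		if nackErr := q.Nack(msg.ID); nackErr != nil {
			return fmt.Errorf("handler failed (%v) and nack failed: %w", err, nackErr)
		}
		return err
	}
	return q.Ack(msg.ID)
}

func main() {
	q := &recorder{}
	ok := func([]byte) error { return nil }
	fail := func([]byte) error { return errBoom }

	fmt.Println(settle(q, Msg{ID: 1, Item: []byte("a")}, ok))
	fmt.Println(settle(q, Msg{ID: 2, Item: []byte("b")}, fail))
	fmt.Println("acked:", q.acked, "nacked:", q.nacked)
}
```

Because `settle` depends on a two-method interface rather than a concrete type, the same helper should work with any queue that satisfies AckableQueue.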

type DQueue

type DQueue interface {
	// Enqueue adds an item to the queue.
	// It returns an error if the operation fails.
	Enqueue(item []byte) error

	// Dequeue removes and returns the next item from the queue.
	// It blocks if the queue is empty until an item becomes available.
	// Returns an error if the operation fails.
	Dequeue() (Msg, error)

	// DequeueCtx removes and returns the next item from the queue.
	// It blocks if the queue is empty until an item becomes available or the context is cancelled.
	// Returns an error if the operation fails or the context is cancelled.
	DequeueCtx(ctx context.Context) (Msg, error)

	// TryDequeue attempts to remove and return the next item from the queue.
	// It returns immediately, even if the queue is empty.
	// Returns an error if the operation fails or the queue is empty.
	TryDequeue() (Msg, error)

	// TryDequeueCtx attempts to remove and return the next item from the queue.
	// It returns immediately if an item is available, or waits until the context is cancelled.
	// Returns an error if the operation fails, the queue is empty, or the context is cancelled.
	TryDequeueCtx(ctx context.Context) (Msg, error)

	// Close closes the queue and releases any resources.
	// It returns an error if the operation fails.
	Close() error
}

DQueue represents a durable queue interface. It provides methods for enqueueing and dequeueing items, with both blocking and non-blocking operations.
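Since TryDequeue returns immediately, it can be used to drain whatever is currently queued. The sketch below assumes only the documented `TryDequeue` signature; `nonBlocking`, `sliceQueue`, and `drain` are hypothetical names, and `sliceQueue` is an in-memory stand-in. The documentation does not name a sentinel error for an empty queue, so `drain` stops on the first error and hands it back rather than interpreting it.

```go
package main

import (
	"errors"
	"fmt"
)

// Msg mirrors the documented godq.Msg.
type Msg struct {
	ID   int64
	Item []byte
}

// nonBlocking is the one documented DQueue method this sketch uses.
type nonBlocking interface {
	TryDequeue() (Msg, error)
}

// sliceQueue is a hypothetical in-memory stand-in for a godq queue.
type sliceQueue struct {
	nextID int64
	items  []Msg
}

func (q *sliceQueue) Enqueue(item []byte) error {
	q.nextID++
	q.items = append(q.items, Msg{ID: q.nextID, Item: item})
	return nil
}

func (q *sliceQueue) TryDequeue() (Msg, error) {
	if len(q.items) == 0 {
		return Msg{}, errors.New("queue is empty")
	}
	m := q.items[0]
	q.items = q.items[1:]
	return m, nil
}

// drain collects messages in FIFO order until TryDequeue reports an error,
// and returns that error alongside the messages gathered so far.
func drain(q nonBlocking) ([]Msg, error) {
	var out []Msg
	for {
		m, err := q.TryDequeue()
		if err != nil {
			return out, err
		}
		out = append(out, m)
	}
}

func main() {
	q := &sliceQueue{}
	q.Enqueue([]byte("first"))
	q.Enqueue([]byte("second"))

	msgs, err := drain(q)
	for _, m := range msgs {
		fmt.Println(m.ID, string(m.Item))
	}
	fmt.Println("stopped on:", err)
}
```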

type Msg

type Msg struct {
	// ID is a unique identifier for the message within the queue.
	ID int64

	// Item contains the actual message data.
	Item []byte
}

Msg represents a message in the queue. It contains the message ID and the actual data.
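Because Item is a plain byte slice, structured payloads have to be serialized by the caller. One possible approach, sketched here with the standard library's encoding/json, is to marshal before enqueueing and unmarshal after dequeueing. The `job` type and the `encodeJob` and `decodeJob` helpers are hypothetical and not part of godq.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Msg mirrors the documented godq.Msg.
type Msg struct {
	ID   int64
	Item []byte
}

// job is a hypothetical payload type used only for illustration.
type job struct {
	URL     string `json:"url"`
	Retries int    `json:"retries"`
}

// encodeJob produces the bytes that would be passed to Enqueue.
func encodeJob(j job) ([]byte, error) {
	return json.Marshal(j)
}

// decodeJob recovers the payload from a dequeued message.
func decodeJob(m Msg) (job, error) {
	var j job
	err := json.Unmarshal(m.Item, &j)
	return j, err
}

func main() {
	payload, err := encodeJob(job{URL: "https://example.com", Retries: 3})
	if err != nil {
		panic(err)
	}

	// In real use, payload would go through Enqueue and come back as Msg.Item.
	got, err := decodeJob(Msg{ID: 1, Item: payload})
	fmt.Println(got.URL, got.Retries, err)
}
```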

type SimpleQueue

type SimpleQueue struct {
	// contains filtered or unexported fields
}

func NewSimpleQueue

func NewSimpleQueue(filePath string) (*SimpleQueue, error)

NewSimpleQueue creates a new simple queue.

func (*SimpleQueue) Close

func (q *SimpleQueue) Close() error

func (*SimpleQueue) Dequeue

func (pq *SimpleQueue) Dequeue() (Msg, error)

Dequeue blocks until an item is available. Uses background context.

func (*SimpleQueue) DequeueCtx

func (pq *SimpleQueue) DequeueCtx(ctx context.Context) (Msg, error)

DequeueCtx blocks until an item is available or the context is canceled. If the context is canceled, it returns an empty Msg and an error.

func (*SimpleQueue) Enqueue

func (pq *SimpleQueue) Enqueue(item []byte) error

Enqueue adds an item to the queue.

func (*SimpleQueue) Len

func (pq *SimpleQueue) Len() (int, error)

Len returns the number of items in the queue.

func (*SimpleQueue) TryDequeue

func (pq *SimpleQueue) TryDequeue() (Msg, error)

TryDequeue attempts to dequeue an item without blocking. If no item is available, it returns an empty Msg and an error.

func (*SimpleQueue) TryDequeueCtx

func (pq *SimpleQueue) TryDequeueCtx(ctx context.Context) (Msg, error)

TryDequeueCtx attempts to dequeue an item without blocking using a context. If no item is available, it returns an empty Msg and an error.

type TryDequeuer

type TryDequeuer interface {
	TryDequeueCtx(ctx context.Context) (Msg, error)
}
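TryDequeuer has no description of its own. One plausible use of such a narrow interface is to layer a blocking wait on top of a non-blocking attempt by polling. The sketch below illustrates that idea under stated assumptions and does not claim to be how godq implements Dequeue: `pollDequeue`, `pollWithin`, and the `flaky` stand-in are hypothetical, and every error from TryDequeueCtx is treated as "nothing available yet".

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Msg mirrors the documented godq.Msg.
type Msg struct {
	ID   int64
	Item []byte
}

// TryDequeuer mirrors the documented interface.
type TryDequeuer interface {
	TryDequeueCtx(ctx context.Context) (Msg, error)
}

// flaky is a hypothetical stand-in that reports an empty queue
// for its first emptyFor calls and then yields one message.
type flaky struct{ emptyFor, calls int }

func (f *flaky) TryDequeueCtx(ctx context.Context) (Msg, error) {
	f.calls++
	if f.calls <= f.emptyFor {
		return Msg{}, errors.New("queue is empty")
	}
	return Msg{ID: 1, Item: []byte("ready")}, nil
}

// pollDequeue retries TryDequeueCtx every interval until it succeeds
// or ctx is done. Any error from the attempt is treated as "try again".
func pollDequeue(ctx context.Context, q TryDequeuer, every time.Duration) (Msg, error) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		if m, err := q.TryDequeueCtx(ctx); err == nil {
			return m, nil
		}
		select {
		case <-ctx.Done():
			return Msg{}, ctx.Err()
		case <-ticker.C:
		}
	}
}

// pollWithin bounds the polling loop with a timeout.
func pollWithin(q TryDequeuer, timeout, every time.Duration) (Msg, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return pollDequeue(ctx, q, every)
}

func main() {
	q := &flaky{emptyFor: 3}
	msg, err := pollWithin(q, time.Second, 5*time.Millisecond)
	fmt.Println(string(msg.Item), q.calls, err)
}
```

A production version would likely want to distinguish an empty queue from a genuine storage failure instead of retrying on every error.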

type UniqueAckQueue

type UniqueAckQueue struct {
	// contains filtered or unexported fields
}

UniqueAckQueue is an acknowledgeable queue that ensures that each item is only processed once.

func NewUniqueAckQueue

func NewUniqueAckQueue(filePath string, opts AckOpts) (*UniqueAckQueue, error)

NewUniqueAckQueue creates a new unique ack queue.

func (*UniqueAckQueue) Ack

func (uaq *UniqueAckQueue) Ack(id int64) error

Ack marks an item as processed and removes it from the queue.

func (*UniqueAckQueue) Close

func (q *UniqueAckQueue) Close() error

func (*UniqueAckQueue) Dequeue

func (uaq *UniqueAckQueue) Dequeue() (Msg, error)

Dequeue blocks until an item is available. Uses background context.

func (*UniqueAckQueue) DequeueCtx

func (uaq *UniqueAckQueue) DequeueCtx(ctx context.Context) (Msg, error)

DequeueCtx blocks until an item is available or the context is canceled. If the context is canceled, it returns an empty Msg and an error.

func (*UniqueAckQueue) Enqueue

func (uaq *UniqueAckQueue) Enqueue(item []byte) error

Enqueue adds an item to the queue.

func (*UniqueAckQueue) Len

func (uaq *UniqueAckQueue) Len() (int, error)

Len returns the number of items in the queue.

func (*UniqueAckQueue) Nack

func (uaq *UniqueAckQueue) Nack(id int64) error

Nack marks an item as not processed and re-queues it.

func (*UniqueAckQueue) TryDequeue

func (uaq *UniqueAckQueue) TryDequeue() (Msg, error)

TryDequeue attempts to dequeue an item without blocking. If no item is available, it returns an empty Msg and an error.

func (*UniqueAckQueue) TryDequeueCtx

func (uaq *UniqueAckQueue) TryDequeueCtx(ctx context.Context) (Msg, error)

TryDequeueCtx attempts to dequeue an item without blocking using a context. If no item is available, it returns an empty Msg and an error.

type UniqueQueue

type UniqueQueue struct {
	// contains filtered or unexported fields
}

UniqueQueue is a queue that ensures that each item is only processed once.

func NewUniqueQueue

func NewUniqueQueue(filePath string) (*UniqueQueue, error)

NewUniqueQueue creates a new unique queue.

func (*UniqueQueue) Close

func (q *UniqueQueue) Close() error

func (*UniqueQueue) Dequeue

func (pq *UniqueQueue) Dequeue() (Msg, error)

Dequeue blocks until an item is available. Uses background context.

func (*UniqueQueue) DequeueCtx

func (pq *UniqueQueue) DequeueCtx(ctx context.Context) (Msg, error)

DequeueCtx blocks until an item is available or the context is canceled.

func (*UniqueQueue) Enqueue

func (pq *UniqueQueue) Enqueue(item []byte) error

Enqueue adds an item to the queue.

func (*UniqueQueue) Len

func (pq *UniqueQueue) Len() (int, error)

Len returns the number of items in the queue.

func (*UniqueQueue) TryDequeue

func (pq *UniqueQueue) TryDequeue() (Msg, error)

TryDequeue attempts to dequeue an item without blocking. If no item is available, it returns an empty Msg and an error.

func (*UniqueQueue) TryDequeueCtx

func (pq *UniqueQueue) TryDequeueCtx(ctx context.Context) (Msg, error)

TryDequeueCtx attempts to dequeue an item without blocking using a context. If no item is available, it returns an empty Msg and an error.
