queue

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2025 License: MIT Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrQueueClosed = errors.New("queue is closed")
	ErrQueueFull   = errors.New("queue is full")
	ErrQueueEmpty  = errors.New("queue is empty")
)

Errors

View Source
var (
	DefaultPollInterval = 20 * time.Millisecond
)

Functions

This section is empty.

Types

type Factory

type Factory interface {
	// Create a new queue if name does not exist
	// If name already exists, return the existing queue
	GetOrCreate(name string, maxSize int) Queue
}

type Handler

type Handler func([]byte) error

type Kind

type Kind uint8
const (
	KindFIFO Kind = iota + 1
	KindStandard
)

type MemoryFactory

type MemoryFactory struct {
	// contains filtered or unexported fields
}

func NewMemoryFactory

func NewMemoryFactory() *MemoryFactory

func (*MemoryFactory) GetOrCreate

func (f *MemoryFactory) GetOrCreate(name string, maxSize int) Queue

type MemoryQueue

type MemoryQueue struct {
	// contains filtered or unexported fields
}

func NewMemoryQueue

func NewMemoryQueue(maxSize int, pollInterval time.Duration) *MemoryQueue

func (*MemoryQueue) Close

func (q *MemoryQueue) Close()

func (*MemoryQueue) Dequeue

func (q *MemoryQueue) Dequeue() ([]byte, error)

func (*MemoryQueue) Enqueue

func (q *MemoryQueue) Enqueue(data []byte) error

func (*MemoryQueue) Kind

func (q *MemoryQueue) Kind() Kind

func (*MemoryQueue) MaxSize

func (q *MemoryQueue) MaxSize() int

func (*MemoryQueue) Recover

func (q *MemoryQueue) Recover(b []byte) error

func (*MemoryQueue) Subscribe

func (q *MemoryQueue) Subscribe(cb Handler)

type Queue

type Queue interface {
	Kind() Kind

	// Reports max size of queue
	MaxSize() int

	// Push data to end of queue
	// Failed if queue is full or closed
	Enqueue([]byte) error

	// Pop data from beginning of queue without message confirmation
	// Failed if queue is empty
	Dequeue() ([]byte, error)

	// Subscribe queue with message confirmation.
	// Once handler returns error, it'll automatically put message back to queue using `Recover` mechanism internally.
	Subscribe(h Handler)
}

The interface of queue The implementation of queue should be thread-safe

type RecoverableQueue

type RecoverableQueue interface {
	Queue

	// or just does not implement it
	Recover([]byte) error
}

type SafeQueue

type SafeQueue struct {
	// contains filtered or unexported fields
}

func NewSafeQueue

func NewSafeQueue(queue Queue) *SafeQueue

func (*SafeQueue) Dequeue

func (q *SafeQueue) Dequeue() ([]byte, error)

func (*SafeQueue) Enqueue

func (q *SafeQueue) Enqueue(data []byte) error

func (*SafeQueue) IsRecoverable

func (q *SafeQueue) IsRecoverable() bool

func (*SafeQueue) Kind

func (q *SafeQueue) Kind() Kind

func (*SafeQueue) MaxSize

func (q *SafeQueue) MaxSize() int

func (*SafeQueue) Recover

func (q *SafeQueue) Recover(b []byte) error

func (*SafeQueue) Subscribe

func (q *SafeQueue) Subscribe(cb Handler)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL