Documentation ¶
Index ¶
- Constants
- type AckOpts
- type AckableQueue
- type AcknowledgeableQueue
- func (q *AcknowledgeableQueue) Ack(id int64) error
- func (q *AcknowledgeableQueue) Dequeue() (Msg, error)
- func (q *AcknowledgeableQueue) DequeueCtx(ctx context.Context) (Msg, error)
- func (q *AcknowledgeableQueue) ExpireAck(id int64) error
- func (q *AcknowledgeableQueue) Len() (int, error)
- func (q *AcknowledgeableQueue) Nack(id int64) error
- func (q *AcknowledgeableQueue) RegisterBehaviourOnFailure(fn func(msg Msg) error)
- func (q *AcknowledgeableQueue) RegisterDeadLetterQueue(dlq Enqueuer)
- func (q *AcknowledgeableQueue) RegisterOnFailureCallback(fn func(msg Msg) error)
- func (q *AcknowledgeableQueue) TryDequeue() (Msg, error)
- func (q *AcknowledgeableQueue) TryDequeueCtx(ctx context.Context) (Msg, error)
- type Dequeuer
- type Enqueuer
- type ErrDBLocked
- type ErrNoItemsWaiting
- type Msg
- type Queue
- func (q *Queue) Close() error
- func (q *Queue) Dequeue() (Msg, error)
- func (q *Queue) DequeueCtx(ctx context.Context) (Msg, error)
- func (q *Queue) Enqueue(item []byte) error
- func (q *Queue) EnqueueCtx(ctx context.Context, item []byte) error
- func (q *Queue) Len() (int, error)
- func (q *Queue) TryDequeue() (Msg, error)
- func (q *Queue) TryDequeueCtx(ctx context.Context) (Msg, error)
- func (q *Queue) TryEnqueue(item []byte) error
- func (q *Queue) TryEnqueueCtx(ctx context.Context, item []byte) error
- type Queuer
- type TryDequeuer
Constants ¶
const (
InfiniteRetries = -1
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AckableQueue ¶
type AckableQueue interface {
Queuer
// Ack acknowledges that an item has been successfully processed.
// It takes the ID of the message to acknowledge.
// Returns an error if the operation fails or the message doesn't exist.
Ack(id int64) error
// Nack indicates that processing of an item has failed and that it should be requeued.
// It takes the ID of the message to negative acknowledge.
// Returns an error if the operation fails or the message doesn't exist.
Nack(id int64) error
}
AckableQueue extends the Queuer interface with acknowledgement capabilities. It allows for explicit acknowledgement or negative acknowledgement of processed items.
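The Ack/Nack contract above can be illustrated with a small consumer loop. This is only a sketch: the package's import path is not shown on this page, so the code uses a local copy of Msg, a hypothetical narrowed interface (ackDequeuer), and an in-memory stand-in (memQueue) that is not the package's implementation. Requeueing a Nacked message at the back of the queue is likewise an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Msg mirrors the documented Msg type.
type Msg struct {
	ID   int64
	Item []byte
}

// ackDequeuer is a hypothetical subset of AckableQueue: only the methods
// this sketch calls.
type ackDequeuer interface {
	TryDequeue() (Msg, error)
	Ack(id int64) error
	Nack(id int64) error
}

// memQueue is an in-memory stand-in used only to make the sketch runnable.
type memQueue struct {
	ready    []Msg
	inflight map[int64]Msg
}

func (q *memQueue) TryDequeue() (Msg, error) {
	if len(q.ready) == 0 {
		return Msg{}, errors.New("no items waiting")
	}
	m := q.ready[0]
	q.ready = q.ready[1:]
	q.inflight[m.ID] = m
	return m, nil
}

func (q *memQueue) Ack(id int64) error {
	if _, ok := q.inflight[id]; !ok {
		return fmt.Errorf("message %d is not in flight", id)
	}
	delete(q.inflight, id)
	return nil
}

func (q *memQueue) Nack(id int64) error {
	m, ok := q.inflight[id]
	if !ok {
		return fmt.Errorf("message %d is not in flight", id)
	}
	delete(q.inflight, id)
	q.ready = append(q.ready, m) // assumption: requeue at the back
	return nil
}

// consume performs up to maxDeliveries deliveries. It Acks a message when
// handle succeeds and Nacks it when handle fails. It returns the Ack count.
func consume(q ackDequeuer, handle func(Msg) error, maxDeliveries int) (int, error) {
	acked := 0
	for i := 0; i < maxDeliveries; i++ {
		msg, err := q.TryDequeue()
		if err != nil {
			// For brevity, any dequeue error is treated as "nothing waiting".
			return acked, nil
		}
		if handleErr := handle(msg); handleErr != nil {
			if err := q.Nack(msg.ID); err != nil {
				return acked, err
			}
			continue
		}
		if err := q.Ack(msg.ID); err != nil {
			return acked, err
		}
		acked++
	}
	return acked, nil
}

func main() {
	q := &memQueue{
		ready:    []Msg{{ID: 1, Item: []byte("a")}, {ID: 2, Item: []byte("b")}},
		inflight: map[int64]Msg{},
	}
	seen := map[int64]int{}
	// Hypothetical handler: message 2 fails on its first delivery only.
	handle := func(m Msg) error {
		seen[m.ID]++
		if m.ID == 2 && seen[m.ID] == 1 {
			return errors.New("transient failure")
		}
		return nil
	}
	acked, err := consume(q, handle, 5)
	fmt.Println(acked, err) // prints "2 <nil>"
}
```

With the real package, the same loop shape would presumably work against a value returned by NewAckQueue, since AcknowledgeableQueue exposes TryDequeue, Ack, and Nack with these signatures.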
type AcknowledgeableQueue ¶
func NewAckQueue ¶
func NewAckQueue(filePath string, opts AckOpts) (*AcknowledgeableQueue, error)
NewAckQueue creates a new ack queue. If filePath is empty, the queue will be created in memory.
func NewUniqueAckQueue ¶
func NewUniqueAckQueue(filePath string, opts AckOpts) (*AcknowledgeableQueue, error)
NewUniqueAckQueue creates a new unique ack queue.
func (*AcknowledgeableQueue) Ack ¶
func (q *AcknowledgeableQueue) Ack(id int64) error
Ack acknowledges that an item has been successfully processed. It takes the ID of the message to acknowledge and returns an error if the operation fails.
func (*AcknowledgeableQueue) Dequeue ¶
func (q *AcknowledgeableQueue) Dequeue() (Msg, error)
Dequeue removes and returns the next item from the queue. It blocks if the queue is empty until an item becomes available. It uses a background context internally.
func (*AcknowledgeableQueue) DequeueCtx ¶
func (q *AcknowledgeableQueue) DequeueCtx(ctx context.Context) (Msg, error)
DequeueCtx removes and returns the next item from the queue. It blocks if the queue is empty until an item becomes available or the context is cancelled.
func (*AcknowledgeableQueue) ExpireAck ¶
func (q *AcknowledgeableQueue) ExpireAck(id int64) error
ExpireAck expires the acknowledgement deadline for an item, which requeues it to the front of the queue. It takes the ID of the message to expire the acknowledgement deadline for. Returns an error if the operation fails or the message doesn't exist.
func (*AcknowledgeableQueue) Len ¶
func (q *AcknowledgeableQueue) Len() (int, error)
func (*AcknowledgeableQueue) Nack ¶
func (q *AcknowledgeableQueue) Nack(id int64) error
Nack indicates that processing of an item has failed and that it should be requeued. It takes the ID of the message to negatively acknowledge. Returns an error if the operation fails or the message doesn't exist.
func (*AcknowledgeableQueue) RegisterBehaviourOnFailure ¶
func (q *AcknowledgeableQueue) RegisterBehaviourOnFailure(fn func(msg Msg) error)
RegisterBehaviourOnFailure sets the behaviour on failure for the queue. The behaviour is triggered when a message receives more Nacks than the maximum number of retries. It takes a function that receives the failed message and returns an error; the function can requeue the message manually, put it in a different queue, or handle it in any other way. The default behaviour is to drop the message.
func (*AcknowledgeableQueue) RegisterDeadLetterQueue ¶
func (q *AcknowledgeableQueue) RegisterDeadLetterQueue(dlq Enqueuer)
RegisterDeadLetterQueue sets the dead letter queue for this AcknowledgeableQueue. This is shorthand for calling RegisterOnFailureCallback with a callback that invokes dlq.Enqueue.
func (*AcknowledgeableQueue) RegisterOnFailureCallback ¶
func (q *AcknowledgeableQueue) RegisterOnFailureCallback(fn func(msg Msg) error)
RegisterOnFailureCallback adds a callback to the queue that is called when a message fails to acknowledge.
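The relationship between a failure callback and a dead letter queue can be sketched as follows. The callback signature func(msg Msg) error and the Enqueue(item []byte) error method come from this page; everything else is hypothetical: itemEnqueuer is a one-method subset of Enqueuer, sliceQueue is an in-memory stand-in, and forwarding msg.Item (rather than some other encoding of the message) is an assumption about what the shorthand does.

```go
package main

import "fmt"

// Msg mirrors the documented Msg type.
type Msg struct {
	ID   int64
	Item []byte
}

// itemEnqueuer is a hypothetical one-method subset of the documented Enqueuer.
type itemEnqueuer interface {
	Enqueue(item []byte) error
}

// sliceQueue is an in-memory stand-in for a dead letter queue.
type sliceQueue struct{ items [][]byte }

func (s *sliceQueue) Enqueue(item []byte) error {
	// Copy the payload so later changes by the caller do not affect the queue.
	s.items = append(s.items, append([]byte(nil), item...))
	return nil
}

// toDeadLetter builds a failure callback that forwards the payload of a
// failed message to dlq.
func toDeadLetter(dlq itemEnqueuer) func(Msg) error {
	return func(msg Msg) error {
		if err := dlq.Enqueue(msg.Item); err != nil {
			return fmt.Errorf("dead-lettering message %d: %w", msg.ID, err)
		}
		return nil
	}
}

func main() {
	dlq := &sliceQueue{}
	onFailure := toDeadLetter(dlq)

	// Simulate the queue invoking the callback for a message that failed.
	if err := onFailure(Msg{ID: 7, Item: []byte("poison")}); err != nil {
		fmt.Println("callback failed:", err)
		return
	}
	fmt.Println(len(dlq.items), string(dlq.items[0])) // prints "1 poison"
}
```

A callback of this shape has the signature that RegisterOnFailureCallback accepts. When nothing beyond forwarding is needed, RegisterDeadLetterQueue is the shorter route.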
func (*AcknowledgeableQueue) TryDequeue ¶
func (q *AcknowledgeableQueue) TryDequeue() (Msg, error)
TryDequeue attempts to remove and return the next item from the queue. It returns immediately, even if the queue is empty. It uses a background context internally.
func (*AcknowledgeableQueue) TryDequeueCtx ¶
func (q *AcknowledgeableQueue) TryDequeueCtx(ctx context.Context) (Msg, error)
TryDequeueCtx attempts to remove and return the next item from the queue. It returns immediately if an item is available, or waits until the context is cancelled.
type Dequeuer ¶
type Dequeuer interface {
// Dequeue removes and returns the next item from the queue.
// It blocks if the queue is empty until an item becomes available.
// Returns an error if the operation fails.
Dequeue() (Msg, error)
// DequeueCtx removes and returns the next item from the queue.
// It blocks if the queue is empty until an item becomes available or the context is cancelled.
// Returns an error if the operation fails or the context is cancelled.
DequeueCtx(ctx context.Context) (Msg, error)
// TryDequeue attempts to remove and return the next item from the queue.
// It returns immediately, even if the queue is empty.
// Returns an error if the operation fails or the queue is empty.
TryDequeue() (Msg, error)
// TryDequeueCtx attempts to remove and return the next item from the queue.
// It returns immediately if an item is available, or waits until the context is cancelled.
// Returns an error if the operation fails, the queue is empty, or the context is cancelled.
TryDequeueCtx(ctx context.Context) (Msg, error)
}
Dequeuer provides methods for dequeueing items from the queue.
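To bound how long a blocking dequeue may wait, a caller can pass a context with a timeout to DequeueCtx. The sketch below assumes a hypothetical channel-backed stand-in (chanQueue) and a one-method interface (ctxDequeuer). Returning ctx.Err() on cancellation is an assumption; the page only states that an error is returned when the context is cancelled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Msg mirrors the documented Msg type.
type Msg struct {
	ID   int64
	Item []byte
}

// ctxDequeuer is a hypothetical one-method subset of Dequeuer.
type ctxDequeuer interface {
	DequeueCtx(ctx context.Context) (Msg, error)
}

// chanQueue is a channel-backed stand-in used only to make the sketch runnable.
type chanQueue struct{ ch chan Msg }

func (q *chanQueue) DequeueCtx(ctx context.Context) (Msg, error) {
	select {
	case m := <-q.ch:
		return m, nil
	case <-ctx.Done():
		return Msg{}, ctx.Err() // assumption: the context error is returned as-is
	}
}

// dequeueWithin waits at most d for the next message.
func dequeueWithin(q ctxDequeuer, d time.Duration) (Msg, error) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return q.DequeueCtx(ctx)
}

func main() {
	q := &chanQueue{ch: make(chan Msg, 1)}
	q.ch <- Msg{ID: 1, Item: []byte("hello")}

	// An item is waiting, so this returns without using up the timeout.
	m, err := dequeueWithin(q, time.Second)
	fmt.Println(string(m.Item), err)

	// The queue is now empty, so this call gives up after the timeout.
	_, err = dequeueWithin(q, 10*time.Millisecond)
	fmt.Println(errors.Is(err, context.DeadlineExceeded))
}
```

Dequeue and TryDequeue are documented as using a background context internally, so the Ctx variants are the ones to reach for when cancellation or a deadline matters.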
type Enqueuer ¶
type Enqueuer interface {
// Enqueue adds an item to the queue.
// It returns an error if the operation fails.
Enqueue(item []byte) error
EnqueueCtx(ctx context.Context, item []byte) error
TryEnqueue(item []byte) error
TryEnqueueCtx(ctx context.Context, item []byte) error
}
Enqueuer provides methods for enqueueing items to the queue.
type ErrDBLocked ¶
type ErrDBLocked struct{}
func (*ErrDBLocked) Error ¶
func (e *ErrDBLocked) Error() string
type ErrNoItemsWaiting ¶
type ErrNoItemsWaiting struct{}
func (*ErrNoItemsWaiting) Error ¶
func (e *ErrNoItemsWaiting) Error() string
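Because Error is declared on the pointer receiver, these error values are pointers, and errors.As needs a target of type *ErrNoItemsWaiting. The sketch below redeclares the type locally to stay runnable; the message text is an assumption, and so is the idea that an empty-queue TryDequeue returns this error, which the page does not state explicitly.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoItemsWaiting mirrors the documented declaration. The message text
// here is an assumption.
type ErrNoItemsWaiting struct{}

func (e *ErrNoItemsWaiting) Error() string { return "no items waiting" }

// isEmpty reports whether err, or any error it wraps, is *ErrNoItemsWaiting.
func isEmpty(err error) bool {
	var target *ErrNoItemsWaiting
	return errors.As(err, &target)
}

func main() {
	wrapped := fmt.Errorf("try dequeue: %w", &ErrNoItemsWaiting{})
	fmt.Println(isEmpty(wrapped))                // prints "true"
	fmt.Println(isEmpty(errors.New("db error"))) // prints "false"
}
```

Matching by type with errors.As is preferable to comparing against a freshly allocated &ErrNoItemsWaiting{}, since the Go specification does not guarantee that pointers to distinct zero-size values compare equal. The same pattern would apply to ErrDBLocked.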
type Msg ¶
type Msg struct {
// ID is a unique identifier for the message within the queue.
ID int64
// Item contains the actual message data.
Item []byte
}
Msg represents a message in the queue. It contains the message ID and the actual data.
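Since Item is a raw byte slice, structured payloads have to be encoded by the caller before Enqueue and decoded after Dequeue. One possible approach, sketched here with JSON, uses a hypothetical job type and hypothetical helper names (encodeJob, decodeJob) that are not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Msg mirrors the documented Msg type.
type Msg struct {
	ID   int64
	Item []byte
}

// job is a hypothetical payload type used only for illustration.
type job struct {
	Kind string `json:"kind"`
	N    int    `json:"n"`
}

// encodeJob produces the bytes that would be passed to Enqueue.
func encodeJob(j job) ([]byte, error) {
	return json.Marshal(j)
}

// decodeJob recovers the payload from a dequeued message.
func decodeJob(m Msg) (job, error) {
	var j job
	if err := json.Unmarshal(m.Item, &j); err != nil {
		return job{}, fmt.Errorf("decoding message %d: %w", m.ID, err)
	}
	return j, nil
}

func main() {
	item, err := encodeJob(job{Kind: "resize", N: 3})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// Simulate the message a consumer would receive from the queue.
	got, err := decodeJob(Msg{ID: 1, Item: item})
	fmt.Println(got.Kind, got.N, err) // prints "resize 3 <nil>"
}
```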
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue represents the basic queue structure. It contains the database connection, queue name, and other necessary fields for queue operations.
func NewSimpleQueue ¶
NewSimpleQueue creates a new simple queue. If filePath is empty, the queue will be created in memory.
func NewUniqueQueue ¶
NewUniqueQueue creates a new unique queue.
func (*Queue) Close ¶
Close closes the database connection associated with the queue. It should be called when the queue is no longer needed to free up resources.
func (*Queue) DequeueCtx ¶
DequeueCtx blocks until an item is available or the context is canceled. If the context is canceled, it returns an empty Msg and an error.
func (*Queue) Enqueue ¶
Enqueue adds an item to the queue. It returns an error if the operation fails.
func (*Queue) Len ¶
Len returns the number of items in the queue. It returns the count and any error encountered during the operation.
func (*Queue) TryDequeue ¶
func (*Queue) TryEnqueue ¶
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| examples/ack_queue_example (command) | |
| examples/concurrent_queue_example (command) | |
| examples/dead_letter_queue_example (command) | |
| examples/simple_queue_example (command) | |
| examples/tiered_dlq_example (command) | |
| examples/unique_queue_example (command) | |