Documentation
¶
Index ¶
- type AckOpts
- type AckQueue
- func (pq *AckQueue) Ack(id int64) error
- func (q *AckQueue) Close() error
- func (pq *AckQueue) Dequeue() (Msg, error)
- func (pq *AckQueue) DequeueCtx(ctx context.Context) (Msg, error)
- func (pq *AckQueue) Enqueue(item []byte) error
- func (pq *AckQueue) Len() (int, error)
- func (pq *AckQueue) Nack(id int64) error
- func (pq *AckQueue) TryDequeue() (Msg, error)
- func (pq *AckQueue) TryDequeueCtx(ctx context.Context) (Msg, error)
- type AckableQueue
- type DQueue
- type Msg
- type SimpleQueue
- func (q *SimpleQueue) Close() error
- func (pq *SimpleQueue) Dequeue() (Msg, error)
- func (pq *SimpleQueue) DequeueCtx(ctx context.Context) (Msg, error)
- func (pq *SimpleQueue) Enqueue(item []byte) error
- func (pq *SimpleQueue) Len() (int, error)
- func (pq *SimpleQueue) TryDequeue() (Msg, error)
- func (pq *SimpleQueue) TryDequeueCtx(ctx context.Context) (Msg, error)
- type TryDequeuer
- type UniqueAckQueue
- func (uaq *UniqueAckQueue) Ack(id int64) error
- func (q *UniqueAckQueue) Close() error
- func (uaq *UniqueAckQueue) Dequeue() (Msg, error)
- func (uaq *UniqueAckQueue) DequeueCtx(ctx context.Context) (Msg, error)
- func (uaq *UniqueAckQueue) Enqueue(item []byte) error
- func (uaq *UniqueAckQueue) Len() (int, error)
- func (uaq *UniqueAckQueue) Nack(id int64) error
- func (uaq *UniqueAckQueue) TryDequeue() (Msg, error)
- func (uaq *UniqueAckQueue) TryDequeueCtx(ctx context.Context) (Msg, error)
- type UniqueQueue
- func (q *UniqueQueue) Close() error
- func (pq *UniqueQueue) Dequeue() (Msg, error)
- func (pq *UniqueQueue) DequeueCtx(ctx context.Context) (Msg, error)
- func (pq *UniqueQueue) Enqueue(item []byte) error
- func (pq *UniqueQueue) Len() (int, error)
- func (pq *UniqueQueue) TryDequeue() (Msg, error)
- func (pq *UniqueQueue) TryDequeueCtx(ctx context.Context) (Msg, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AckOpts ¶
type AckOpts struct {
// AckTimeout is the timeout for acknowledging a message.
// A value of 0 means no timeout. A value of -1 means infinite timeout.
AckTimeout time.Duration
}
AckOpts represents the queue-level settings for how acknowledgement of messages is handled.
type AckQueue ¶
type AckQueue struct {
// contains filtered or unexported fields
}
AckQueue is a queue that provides the ability to acknowledge messages.
func NewAckQueue ¶
NewAckQueue creates a new ack queue.
func (*AckQueue) DequeueCtx ¶
DequeueCtx blocks until an item is available or the context is canceled.
func (*AckQueue) TryDequeue ¶
TryDequeue attempts to dequeue an item from the queue without blocking.
type AckableQueue ¶
type AckableQueue interface {
DQueue
// Ack acknowledges that an item has been successfully processed.
// It takes the ID of the message to acknowledge.
// Returns an error if the operation fails or the message doesn't exist.
Ack(id int64) error
// Nack indicates that processing of an item has failed and that the item should be requeued.
// It takes the ID of the message to negative acknowledge.
// Returns an error if the operation fails or the message doesn't exist.
Nack(id int64) error
}
AckableQueue extends the DQueue interface with acknowledgement capabilities. It allows for explicit acknowledgement or negative acknowledgement of processed items.
type DQueue ¶
type DQueue interface {
// Enqueue adds an item to the queue.
// It returns an error if the operation fails.
Enqueue(item []byte) error
// Dequeue removes and returns the next item from the queue.
// It blocks if the queue is empty until an item becomes available.
// Returns an error if the operation fails.
Dequeue() (Msg, error)
// DequeueCtx removes and returns the next item from the queue.
// It blocks if the queue is empty until an item becomes available or the context is cancelled.
// Returns an error if the operation fails or the context is cancelled.
DequeueCtx(ctx context.Context) (Msg, error)
// TryDequeue attempts to remove and return the next item from the queue.
// It returns immediately, even if the queue is empty.
// Returns an error if the operation fails or the queue is empty.
TryDequeue() (Msg, error)
// TryDequeueCtx attempts to remove and return the next item from the queue.
// It returns immediately if an item is available, or waits until the context is cancelled.
// Returns an error if the operation fails, the queue is empty, or the context is cancelled.
TryDequeueCtx(ctx context.Context) (Msg, error)
// Close closes the queue and releases any resources.
// It returns an error if the operation fails.
Close() error
}
DQueue represents a durable queue interface. It provides methods for enqueueing and dequeueing items, with both blocking and non-blocking operations.
type Msg ¶
type Msg struct {
// ID is a unique identifier for the message within the queue.
ID int64
// Item contains the actual message data.
Item []byte
}
Msg represents a message in the queue. It contains the message ID and the actual data.
type SimpleQueue ¶
type SimpleQueue struct {
// contains filtered or unexported fields
}
func NewSimpleQueue ¶
func NewSimpleQueue(filePath string) (*SimpleQueue, error)
NewSimpleQueue creates a new simple queue.
func (*SimpleQueue) Dequeue ¶
func (pq *SimpleQueue) Dequeue() (Msg, error)
Dequeue blocks until an item is available. Uses background context.
func (*SimpleQueue) DequeueCtx ¶
func (pq *SimpleQueue) DequeueCtx(ctx context.Context) (Msg, error)
DequeueCtx blocks until an item is available or the context is canceled. If the context is canceled, it returns an empty Msg and an error.
func (*SimpleQueue) Enqueue ¶
func (pq *SimpleQueue) Enqueue(item []byte) error
Enqueue adds an item to the queue.
func (*SimpleQueue) Len ¶
func (pq *SimpleQueue) Len() (int, error)
Len returns the number of items in the queue.
func (*SimpleQueue) TryDequeue ¶
func (pq *SimpleQueue) TryDequeue() (Msg, error)
TryDequeue attempts to dequeue an item without blocking. If no item is available, it returns an empty Msg and an error.
func (*SimpleQueue) TryDequeueCtx ¶
func (pq *SimpleQueue) TryDequeueCtx(ctx context.Context) (Msg, error)
TryDequeueCtx attempts to dequeue an item without blocking, honoring the given context. If no item is available, it returns an empty Msg and an error.
type UniqueAckQueue ¶
type UniqueAckQueue struct {
// contains filtered or unexported fields
}
UniqueAckQueue is an acknowledgeable queue that ensures that each item is only processed once.
func NewUniqueAckQueue ¶
func NewUniqueAckQueue(filePath string, opts AckOpts) (*UniqueAckQueue, error)
NewUniqueAckQueue creates a new unique ack queue.
func (*UniqueAckQueue) Ack ¶
func (uaq *UniqueAckQueue) Ack(id int64) error
Ack marks an item as processed and removes it from the queue.
func (*UniqueAckQueue) Dequeue ¶
func (uaq *UniqueAckQueue) Dequeue() (Msg, error)
Dequeue blocks until an item is available. Uses background context.
func (*UniqueAckQueue) DequeueCtx ¶
func (uaq *UniqueAckQueue) DequeueCtx(ctx context.Context) (Msg, error)
DequeueCtx blocks until an item is available or the context is canceled. If the context is canceled, it returns an empty Msg and an error.
func (*UniqueAckQueue) Enqueue ¶
func (uaq *UniqueAckQueue) Enqueue(item []byte) error
Enqueue adds an item to the queue.
func (*UniqueAckQueue) Len ¶
func (uaq *UniqueAckQueue) Len() (int, error)
Len returns the number of items in the queue.
func (*UniqueAckQueue) Nack ¶
func (uaq *UniqueAckQueue) Nack(id int64) error
Nack marks an item as not processed and returns it to the queue so that it can be dequeued again.
func (*UniqueAckQueue) TryDequeue ¶
func (uaq *UniqueAckQueue) TryDequeue() (Msg, error)
TryDequeue attempts to dequeue an item without blocking. If no item is available, it returns an empty Msg and an error.
func (*UniqueAckQueue) TryDequeueCtx ¶
func (uaq *UniqueAckQueue) TryDequeueCtx(ctx context.Context) (Msg, error)
TryDequeueCtx attempts to dequeue an item without blocking, honoring the given context. If no item is available, it returns an empty Msg and an error.
type UniqueQueue ¶
type UniqueQueue struct {
// contains filtered or unexported fields
}
UniqueQueue is a queue that ensures that each item is only processed once.
func NewUniqueQueue ¶
func NewUniqueQueue(filePath string) (*UniqueQueue, error)
NewUniqueQueue creates a new unique queue.
func (*UniqueQueue) Dequeue ¶
func (pq *UniqueQueue) Dequeue() (Msg, error)
Dequeue blocks until an item is available. Uses background context.
func (*UniqueQueue) DequeueCtx ¶
func (pq *UniqueQueue) DequeueCtx(ctx context.Context) (Msg, error)
DequeueCtx blocks until an item is available or the context is canceled.
func (*UniqueQueue) Enqueue ¶
func (pq *UniqueQueue) Enqueue(item []byte) error
Enqueue adds an item to the queue.
func (*UniqueQueue) Len ¶
func (pq *UniqueQueue) Len() (int, error)
Len returns the number of items in the queue.
func (*UniqueQueue) TryDequeue ¶
func (pq *UniqueQueue) TryDequeue() (Msg, error)
TryDequeue attempts to dequeue an item without blocking. If no item is available, it returns an empty Msg and an error.
func (*UniqueQueue) TryDequeueCtx ¶
func (pq *UniqueQueue) TryDequeueCtx(ctx context.Context) (Msg, error)
TryDequeueCtx attempts to dequeue an item without blocking, honoring the given context. If no item is available, it returns an empty Msg and an error.