Package queue

Version: v0.2.1
Warning

This package is not in the latest version of its module.

Published: Dec 16, 2025
License: Apache-2.0, MIT
Imports: 8
Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Setup

func Setup(ctx context.Context, db *sql.DB) error

Setup sets up the queue in the database.

Types

type ID

type ID string

type Interface

type Interface interface {
	MaxReceive() int
	Timeout() time.Duration
	Send(context.Context, Message) error
	SendTx(context.Context, *sql.Tx, Message) error
	SendAndGetID(context.Context, Message) (ID, error)
	Receive(context.Context) (*Message, error)
	ReceiveAndWait(context.Context, time.Duration) (*Message, error)
	Extend(context.Context, ID, time.Duration) error
	Delete(context.Context, ID) error
	MoveToDeadLetter(context.Context, ID, string, string, string) error
}

type Message

type Message struct {
	ID       ID
	Delay    time.Duration
	Received int
	Body     []byte
}

type NewOpts

type NewOpts struct {
	DB         *sql.DB
	MaxReceive int // Max receive count for messages before they cannot be received anymore.
	Name       string
	Timeout    time.Duration // Default timeout for messages before they can be re-received.
	Logger     logger.StandardLogger
}
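The MaxReceive and Timeout comments above describe when a message can be received again. The toy model below is a sketch of that behaviour under stated assumptions; it is not the package's implementation. The `memQueue` type, its manual clock, and the rule that a message becomes visible again exactly when its timeout elapses are all hypothetical, chosen only to keep the example deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// entry is one queued message in the toy model.
type entry struct {
	body      string
	received  int       // how many times the message has been handed out
	visibleAt time.Time // earliest time the message may be received
}

// memQueue is a hypothetical in-memory model of the documented MaxReceive and
// Timeout options. It uses a manual clock instead of time.Now.
type memQueue struct {
	maxReceive int
	timeout    time.Duration
	now        time.Time
	entries    []*entry
}

// send enqueues body; a non-zero delay hides the message until it has passed.
func (q *memQueue) send(body string, delay time.Duration) {
	q.entries = append(q.entries, &entry{body: body, visibleAt: q.now.Add(delay)})
}

// receive returns the first visible message that still has receive budget,
// then hides it for the timeout. It reports false if no message qualifies.
func (q *memQueue) receive() (string, bool) {
	for _, e := range q.entries {
		if e.received >= q.maxReceive || e.visibleAt.After(q.now) {
			continue
		}
		e.received++
		e.visibleAt = q.now.Add(q.timeout)
		return e.body, true
	}
	return "", false
}

// advance moves the manual clock forward.
func (q *memQueue) advance(d time.Duration) { q.now = q.now.Add(d) }

// trace records whether each of four receive attempts yields a message.
func trace() []bool {
	q := &memQueue{maxReceive: 2, timeout: 5 * time.Second, now: time.Unix(0, 0)}
	q.send("job", 0)

	var got []bool
	_, ok := q.receive() // first receipt
	got = append(got, ok)
	_, ok = q.receive() // hidden until the timeout passes
	got = append(got, ok)
	q.advance(5 * time.Second)
	_, ok = q.receive() // timeout elapsed: second receipt
	got = append(got, ok)
	q.advance(5 * time.Second)
	_, ok = q.receive() // receive budget of 2 is used up
	got = append(got, ok)
	return got
}

func main() {
	fmt.Println(trace()) // prints "[true false true false]"
}
```

A message that is never deleted is therefore handed out at most MaxReceive times, with at least Timeout between receipts.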

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func New

func New(opts NewOpts) (*Queue, error)

New creates a Queue with the given options. Defaults if not given:
- Logs are discarded.
- Max receive count is 3.
- Timeout is five seconds.
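The defaults listed above could be applied along these lines. This is a minimal sketch, not the package's code: `newOpts` is a local stand-in for NewOpts with DB and Logger left out, and treating a zero value as "not given" is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// newOpts is a hypothetical stand-in for the documented NewOpts; the DB and
// Logger fields are omitted so the sketch has no dependencies.
type newOpts struct {
	MaxReceive int
	Name       string
	Timeout    time.Duration
}

// withDefaults fills zero-valued fields with the documented defaults: a max
// receive count of 3 and a timeout of five seconds.
// Assumption: a zero value means the option was not given.
func withDefaults(o newOpts) newOpts {
	if o.MaxReceive == 0 {
		o.MaxReceive = 3
	}
	if o.Timeout == 0 {
		o.Timeout = 5 * time.Second
	}
	return o
}

// describe renders the effective options after defaults are applied.
func describe(o newOpts) string {
	o = withDefaults(o)
	return fmt.Sprintf("%s: maxReceive=%d timeout=%s", o.Name, o.MaxReceive, o.Timeout)
}

func main() {
	fmt.Println(describe(newOpts{Name: "jobs"}))                 // prints "jobs: maxReceive=3 timeout=5s"
	fmt.Println(describe(newOpts{Name: "slow", MaxReceive: 10})) // prints "slow: maxReceive=10 timeout=5s"
}
```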

func (*Queue) Delete

func (q *Queue) Delete(ctx context.Context, id ID) error

Delete a Message from the queue by id.

func (*Queue) Extend

func (q *Queue) Extend(ctx context.Context, id ID, delay time.Duration) error

Extend a Message's timeout so that it expires after the given delay, counted from now.

func (*Queue) MaxReceive

func (q *Queue) MaxReceive() int

func (*Queue) MoveToDeadLetter

func (q *Queue) MoveToDeadLetter(ctx context.Context, id ID, jobName, failureReason, errorMsg string) error

MoveToDeadLetter moves a message from the main queue to the dead letter queue. This is used for jobs that fail permanently or exceed max retries.
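A worker might decide between deleting, retrying, and dead-lettering roughly as follows. This is a sketch under assumptions: the `settler` interface is a local subset of the documented Interface, `fakeQueue` is a hypothetical in-memory stand-in, the failure reason string `max_receive_exceeded` is invented for illustration, and `Received` is assumed to count the current receipt.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// msgID and message are local stand-ins for the documented ID and Message.
type msgID string

type message struct {
	ID       msgID
	Received int
	Body     []byte
}

// settler is the subset of the documented Interface this sketch needs.
type settler interface {
	MaxReceive() int
	Delete(ctx context.Context, id msgID) error
	MoveToDeadLetter(ctx context.Context, id msgID, jobName, failureReason, errorMsg string) error
}

// fakeQueue is a hypothetical in-memory queue that records what happened.
type fakeQueue struct {
	maxReceive int
	deleted    []msgID
	deadLetter map[msgID]string // message ID -> failure reason
}

func (q *fakeQueue) MaxReceive() int { return q.maxReceive }

func (q *fakeQueue) Delete(_ context.Context, id msgID) error {
	q.deleted = append(q.deleted, id)
	return nil
}

func (q *fakeQueue) MoveToDeadLetter(_ context.Context, id msgID, _, failureReason, _ string) error {
	q.deadLetter[id] = failureReason
	return nil
}

// handle runs job on m and settles the message: delete on success,
// dead-letter once the receive budget is spent, otherwise leave it in the
// queue so it can be received again after its timeout.
func handle(ctx context.Context, q settler, jobName string, m message, job func([]byte) error) (string, error) {
	err := job(m.Body)
	switch {
	case err == nil:
		return "deleted", q.Delete(ctx, m.ID)
	case m.Received >= q.MaxReceive():
		// "max_receive_exceeded" is an illustrative reason, not a package constant.
		return "dead-lettered", q.MoveToDeadLetter(ctx, m.ID, jobName, "max_receive_exceeded", err.Error())
	default:
		return "retry", nil
	}
}

// outcomes runs three messages through handle and reports what happened.
func outcomes() []string {
	q := &fakeQueue{maxReceive: 3, deadLetter: map[msgID]string{}}
	ok := func([]byte) error { return nil }
	fail := func([]byte) error { return errors.New("boom") }
	ctx := context.Background()

	a, _ := handle(ctx, q, "email", message{ID: "1", Received: 1}, ok)
	b, _ := handle(ctx, q, "email", message{ID: "2", Received: 1}, fail)
	c, _ := handle(ctx, q, "email", message{ID: "3", Received: 3}, fail)
	return []string{a, b, c, q.deadLetter["3"]}
}

func main() {
	fmt.Println(outcomes()) // prints "[deleted retry dead-lettered max_receive_exceeded]"
}
```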

func (*Queue) Receive

func (q *Queue) Receive(ctx context.Context) (*Message, error)

Receive a Message from the queue, or nil if there is none.

func (*Queue) ReceiveAndWait

func (q *Queue) ReceiveAndWait(ctx context.Context, interval time.Duration) (*Message, error)

ReceiveAndWait for a Message from the queue, polling at the given interval, until the context is cancelled. If the context is cancelled, the error will be non-nil. See context.Context.Err.
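The polling behaviour described above can be sketched with a ticker and a select on the context. This is an illustration, not the package's implementation: `receiveFunc` and `message` are local stand-ins, and trying once before the first tick is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// message is a local stand-in for the documented Message type.
type message struct {
	Body []byte
}

// receiveFunc mirrors the shape of Receive: it returns nil, nil when the
// queue is empty.
type receiveFunc func(ctx context.Context) (*message, error)

// receiveAndWait polls receive at the given interval until it yields a
// message, fails, or ctx is done. On cancellation it returns ctx.Err().
func receiveAndWait(ctx context.Context, receive receiveFunc, interval time.Duration) (*message, error) {
	// Assumption: try once immediately so a ready message is not delayed.
	m, err := receive(ctx)
	if err != nil || m != nil {
		return m, err
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-ticker.C:
			m, err := receive(ctx)
			if err != nil || m != nil {
				return m, err
			}
		}
	}
}

// demo returns the body received after a few empty polls, and whether
// polling an always-empty queue ends with context.DeadlineExceeded.
func demo() (string, bool) {
	calls := 0
	eventually := func(context.Context) (*message, error) {
		calls++
		if calls < 3 {
			return nil, nil // queue still empty
		}
		return &message{Body: []byte("hello")}, nil
	}
	m, err := receiveAndWait(context.Background(), eventually, time.Millisecond)
	if err != nil {
		return "", false
	}

	empty := func(context.Context) (*message, error) { return nil, nil }
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	_, err = receiveAndWait(ctx, empty, time.Millisecond)
	return string(m.Body), errors.Is(err, context.DeadlineExceeded)
}

func main() {
	body, timedOut := demo()
	fmt.Println(body, timedOut) // prints "hello true"
}
```

Callers typically bound the wait with context.WithTimeout or cancel the context on shutdown, then check the returned error before using the message.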

func (*Queue) Send

func (q *Queue) Send(ctx context.Context, m Message) error

Send a Message to the queue with an optional delay.

func (*Queue) SendAndGetID

func (q *Queue) SendAndGetID(ctx context.Context, m Message) (ID, error)

SendAndGetID is like Send, but also returns the message ID, which can be used to interact with the message without receiving it first.

func (*Queue) SendTx

func (q *Queue) SendTx(ctx context.Context, tx *sql.Tx, m Message) error

SendTx is like Send, but within an existing transaction.

func (*Queue) Timeout

func (q *Queue) Timeout() time.Duration
