client

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
// Sentinel errors returned by Client task-management operations.
var (
	// ErrTaskNotFound is returned when the requested task does not exist.
	// CancelTask, DeleteTask, and RetryTask document it for the "not found" case.
	ErrTaskNotFound = errors.New("task not found")

	// ErrTaskRunning is returned when an operation cannot be performed
	// because the task is currently being processed.
	// DeleteTask and RetryTask document it for tasks whose status is running.
	ErrTaskRunning = errors.New("task is currently running")

	// ErrTaskAlreadyFinalized is returned when an operation cannot be performed
	// because the task is already in a terminal state (completed).
	// CancelTask and RetryTask document it for completed tasks.
	ErrTaskAlreadyFinalized = errors.New("task is already in a terminal state")

	// ErrTaskAlreadyAvailable is returned when trying to retry a task
	// that is already in a pending state.
	// RetryTask documents it for pending tasks.
	ErrTaskAlreadyAvailable = errors.New("task is already available for processing")
)

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client provides task management and inspection operations. It is separate from Producer (which creates tasks) and Consumer (which processes them).

func New

func New(config Config) (*Client, error)

New creates a new Client with the given configuration.

Example
package main

import (
	"log"

	"github.com/jmoiron/sqlx"

	_ "github.com/lib/pq"
	"github.com/yakser/asynqpg/client"
)

// main connects to a PostgreSQL database on localhost through sqlx and builds
// an asynqpg Client on top of that connection. Either failure is fatal.
func main() {
	const dsn = "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable"

	pool, connErr := sqlx.Connect("postgres", dsn)
	if connErr != nil {
		log.Fatal(connErr)
	}

	cfg := client.Config{Pool: pool}
	taskClient, newErr := client.New(cfg)
	if newErr != nil {
		log.Fatal(newErr)
	}

	// The example only demonstrates construction; discard the client so the
	// compiler does not report an unused variable.
	_ = taskClient
}

func (*Client) CancelTask

func (c *Client) CancelTask(ctx context.Context, id int64) (*TaskInfo, error)

CancelTask cancels a task by its ID.

Behavior by current task status:

  • pending: immediately set to cancelled with finalized_at = now()
  • failed: immediately set to cancelled with finalized_at = now()
  • running: set to cancelled; the consumer detects this and cancels the handler's context
  • cancelled: no-op, returns the existing task (idempotent)
  • completed: returns ErrTaskAlreadyFinalized
  • not found: returns ErrTaskNotFound

func (*Client) CancelTaskTx

func (c *Client) CancelTaskTx(ctx context.Context, tx asynqpg.Querier, id int64) (*TaskInfo, error)

CancelTaskTx cancels a task using the provided executor (transaction).

func (*Client) DeleteTask

func (c *Client) DeleteTask(ctx context.Context, id int64) (*TaskInfo, error)

DeleteTask deletes a task by its ID.

Running tasks (status = running) cannot be deleted. All other states are deletable. The returned TaskInfo contains the task data as it was before deletion.

Behavior by current task status:

  • pending: deleted, returns the deleted task info
  • failed: deleted, returns the deleted task info
  • cancelled: deleted, returns the deleted task info
  • completed: deleted, returns the deleted task info
  • running: returns ErrTaskRunning (running tasks cannot be deleted)
  • not found: returns ErrTaskNotFound

func (*Client) DeleteTaskTx

func (c *Client) DeleteTaskTx(ctx context.Context, tx asynqpg.Querier, id int64) (*TaskInfo, error)

DeleteTaskTx deletes a task using the provided executor (transaction).

func (*Client) GetTask

func (c *Client) GetTask(ctx context.Context, id int64) (*TaskInfo, error)

GetTask returns the full information about a task by its ID.

func (*Client) GetTaskTx

func (c *Client) GetTaskTx(ctx context.Context, tx asynqpg.Querier, id int64) (*TaskInfo, error)

GetTaskTx returns the full information about a task using the provided executor.

func (*Client) ListTasks

func (c *Client) ListTasks(ctx context.Context, params *ListParams) (*ListResult, error)

ListTasks returns tasks matching the given filters with pagination.

Example
package main

import (
	"context"
	"log"

	"github.com/jmoiron/sqlx"

	_ "github.com/lib/pq"
	"github.com/yakser/asynqpg"
	"github.com/yakser/asynqpg/client"
)

// main connects to a PostgreSQL database on localhost, builds a Client, and
// lists up to 50 failed or pending "email:send" tasks ordered by creation time,
// newest first. Any failure along the way is fatal.
func main() {
	const dsn = "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable"

	pool, connErr := sqlx.Connect("postgres", dsn)
	if connErr != nil {
		log.Fatal(connErr)
	}

	taskClient, newErr := client.New(client.Config{Pool: pool})
	if newErr != nil {
		log.Fatal(newErr)
	}

	ctx := context.Background()

	// Build the filter separately so the query itself reads as a single call.
	filter := client.NewListParams().
		States(asynqpg.TaskStatusFailed, asynqpg.TaskStatusPending).
		Types("email:send").
		Limit(50).
		OrderBy(client.OrderByCreatedAt, client.SortDesc)

	page, listErr := taskClient.ListTasks(ctx, filter)
	if listErr != nil {
		log.Fatal(listErr)
	}

	// The example does not consume the result; discard it to keep the
	// compiler from reporting an unused variable.
	_ = page
}

func (*Client) ListTasksTx

func (c *Client) ListTasksTx(ctx context.Context, tx asynqpg.Querier, params *ListParams) (*ListResult, error)

ListTasksTx returns tasks matching the given filters using the provided executor.

func (*Client) RetryTask

func (c *Client) RetryTask(ctx context.Context, id int64) (*TaskInfo, error)

RetryTask moves a failed or cancelled task back to pending state.

If the task has exhausted all attempts (attempts_left = 0), attempts_left is set to 1 to allow at least one more processing attempt.

Behavior by current task status:

  • failed: set to pending, clear finalized_at, ensure attempts_left >= 1
  • cancelled: set to pending, clear finalized_at, ensure attempts_left >= 1
  • pending: returns ErrTaskAlreadyAvailable
  • running: returns ErrTaskRunning
  • completed: returns ErrTaskAlreadyFinalized
  • not found: returns ErrTaskNotFound

func (*Client) RetryTaskTx

func (c *Client) RetryTaskTx(ctx context.Context, tx asynqpg.Querier, id int64) (*TaskInfo, error)

RetryTaskTx moves a task back to pending using the provided executor (transaction).

type Config

// Config holds configuration for creating a new Client.
type Config struct {
	// Pool is the database handle given to the Client; the package examples
	// pass the *sqlx.DB returned by sqlx.Connect.
	Pool   asynqpg.Pool
	// Logger is the structured logger for the Client.
	// NOTE(review): behavior when Logger is nil is not visible here — confirm.
	Logger *slog.Logger
	// TracerProvider is used for tracing. If nil, the global OTel
	// TracerProvider is used.
	TracerProvider trace.TracerProvider
}

Config holds configuration for creating a new Client.

type ListOrderField

type ListOrderField string

ListOrderField represents a column to sort results by.

// Sort columns accepted by ListParams.OrderBy. Each constant's value is the
// name of the column it sorts by.
const (
	// OrderByID sorts by the "id" column.
	OrderByID          ListOrderField = "id"
	// OrderByCreatedAt sorts by the "created_at" column.
	OrderByCreatedAt   ListOrderField = "created_at"
	// OrderByUpdatedAt sorts by the "updated_at" column.
	OrderByUpdatedAt   ListOrderField = "updated_at"
	// OrderByBlockedTill sorts by the "blocked_till" column.
	OrderByBlockedTill ListOrderField = "blocked_till"
)

type ListParams

type ListParams struct {
	// contains filtered or unexported fields
}

ListParams configures how tasks are listed and filtered. Use NewListParams() to create one with default values, then chain builder methods to adjust it.

func NewListParams

func NewListParams() *ListParams

NewListParams creates a new ListParams with default values.

func (*ListParams) IDs

func (p *ListParams) IDs(ids ...int64) *ListParams

IDs filters tasks by specific IDs.

func (*ListParams) Limit

func (p *ListParams) Limit(n int) *ListParams

Limit sets the maximum number of tasks to return. The valid range is 1 to 10000; values outside this range are clamped.

func (*ListParams) Offset

func (p *ListParams) Offset(n int) *ListParams

Offset sets the number of tasks to skip (for pagination).

func (*ListParams) OrderBy

func (p *ListParams) OrderBy(field ListOrderField, order SortOrder) *ListParams

OrderBy sets the sort field and direction.

func (*ListParams) States

func (p *ListParams) States(states ...asynqpg.TaskStatus) *ListParams

States filters tasks by status.

func (*ListParams) Types

func (p *ListParams) Types(types ...string) *ListParams

Types filters tasks by task type.

type ListResult

// ListResult contains the list of tasks and the total count matching the filters.
type ListResult struct {
	// Tasks holds the tasks returned for this call.
	Tasks []*TaskInfo
	// Total is the total count of tasks matching the filters.
	// NOTE(review): presumably counted independently of Limit/Offset — confirm.
	Total int
}

ListResult contains the list of tasks and the total count matching the filters.

type SortOrder

type SortOrder string

SortOrder represents the direction of sorting.

// Sort directions accepted by ListParams.OrderBy.
const (
	// SortAsc sorts in ascending order.
	SortAsc  SortOrder = "ASC"
	// SortDesc sorts in descending order.
	SortDesc SortOrder = "DESC"
)

type TaskInfo

// TaskInfo represents the full state of a task as stored in the database.
// It is returned by Client methods for task inspection and management.
type TaskInfo struct {
	// ID is the task identifier accepted by GetTask, CancelTask, DeleteTask,
	// and RetryTask.
	ID               int64
	// Type is the task type; ListParams.Types filters on it.
	Type             string
	// Payload is the task payload as raw bytes.
	Payload          []byte
	// Status is the current task status; ListParams.States filters on it.
	Status           asynqpg.TaskStatus
	// NOTE(review): optional (pointer) idempotency token; its semantics are
	// defined outside this view — confirm.
	IdempotencyToken *string
	// NOTE(review): presumably messages recorded from earlier attempts — confirm.
	Messages         []string
	// BlockedTill corresponds to the blocked_till sort column.
	// NOTE(review): presumably the earliest time the task may be processed — confirm.
	BlockedTill      time.Time
	// AttemptsLeft is the remaining attempt count (attempts_left); RetryTask
	// ensures it is at least 1.
	AttemptsLeft     int
	// NOTE(review): presumably the number of attempts already made — confirm.
	AttemptsElapsed  int
	// CreatedAt corresponds to the created_at sort column.
	CreatedAt        time.Time
	// UpdatedAt corresponds to the updated_at sort column.
	UpdatedAt        time.Time
	// FinalizedAt (finalized_at) is set to now() when CancelTask cancels a
	// pending or failed task, and is cleared by RetryTask.
	// NOTE(review): presumably nil while the task is not finalized — confirm.
	FinalizedAt      *time.Time
	// NOTE(review): presumably the time of the most recent processing attempt,
	// nil if never attempted — confirm.
	AttemptedAt      *time.Time
}

TaskInfo represents the full state of a task as stored in the database. It is returned by Client methods for task inspection and management.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL