Documentation ¶
Index ¶
- Variables
- type Client
- func (c *Client) CancelTask(ctx context.Context, id int64) (*TaskInfo, error)
- func (c *Client) CancelTaskTx(ctx context.Context, tx asynqpg.Querier, id int64) (*TaskInfo, error)
- func (c *Client) DeleteTask(ctx context.Context, id int64) (*TaskInfo, error)
- func (c *Client) DeleteTaskTx(ctx context.Context, tx asynqpg.Querier, id int64) (*TaskInfo, error)
- func (c *Client) GetTask(ctx context.Context, id int64) (*TaskInfo, error)
- func (c *Client) GetTaskTx(ctx context.Context, tx asynqpg.Querier, id int64) (*TaskInfo, error)
- func (c *Client) ListTasks(ctx context.Context, params *ListParams) (*ListResult, error)
- func (c *Client) ListTasksTx(ctx context.Context, tx asynqpg.Querier, params *ListParams) (*ListResult, error)
- func (c *Client) RetryTask(ctx context.Context, id int64) (*TaskInfo, error)
- func (c *Client) RetryTaskTx(ctx context.Context, tx asynqpg.Querier, id int64) (*TaskInfo, error)
- type Config
- type ListOrderField
- type ListParams
- func (p *ListParams) IDs(ids ...int64) *ListParams
- func (p *ListParams) Limit(n int) *ListParams
- func (p *ListParams) Offset(n int) *ListParams
- func (p *ListParams) OrderBy(field ListOrderField, order SortOrder) *ListParams
- func (p *ListParams) States(states ...asynqpg.TaskStatus) *ListParams
- func (p *ListParams) Types(types ...string) *ListParams
- type ListResult
- type SortOrder
- type TaskInfo
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrTaskNotFound is returned when the requested task does not exist.
	ErrTaskNotFound = errors.New("task not found")

	// ErrTaskRunning is returned when an operation cannot be performed
	// because the task is currently being processed.
	ErrTaskRunning = errors.New("task is currently running")

	// ErrTaskAlreadyFinalized is returned when an operation cannot be performed
	// because the task is already in a terminal state (completed).
	ErrTaskAlreadyFinalized = errors.New("task is already in a terminal state")

	// ErrTaskAlreadyAvailable is returned when trying to retry a task
	// that is already in a pending state.
	ErrTaskAlreadyAvailable = errors.New("task is already available for processing")
)
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
	// contains filtered or unexported fields
}
Client provides task management and inspection operations. It is separate from Producer (which creates tasks) and Consumer (which processes them).
func New ¶
New creates a new Client with the given configuration.
Example ¶
package main

import (
	"log"

	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq"

	"github.com/yakser/asynqpg/client"
)

func main() {
	db, err := sqlx.Connect("postgres", "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	cl, err := client.New(client.Config{Pool: db})
	if err != nil {
		log.Fatal(err)
	}

	_ = cl
}
func (*Client) CancelTask ¶
func (c *Client) CancelTask(ctx context.Context, id int64) (*TaskInfo, error)
CancelTask cancels a task by its ID.
Behavior by current task status:
- pending: immediately set to cancelled with finalized_at = now()
- failed: immediately set to cancelled with finalized_at = now()
- running: set to cancelled; the consumer detects this and cancels the handler's context
- cancelled: no-op, returns the existing task (idempotent)
- completed: returns ErrTaskAlreadyFinalized
- not found: returns ErrTaskNotFound
func (*Client) CancelTaskTx ¶
func (c *Client) CancelTaskTx(ctx context.Context, tx asynqpg.Querier, id int64) (*TaskInfo, error)
CancelTaskTx cancels a task using the provided executor (transaction).
func (*Client) DeleteTask ¶
func (c *Client) DeleteTask(ctx context.Context, id int64) (*TaskInfo, error)
DeleteTask deletes a task by its ID.
Running tasks (status = running) cannot be deleted. All other states are deletable. The returned TaskInfo contains the task data as it was before deletion.
Behavior by current task status:
- pending: deleted, returns the deleted task info
- failed: deleted, returns the deleted task info
- cancelled: deleted, returns the deleted task info
- completed: deleted, returns the deleted task info
- running: returns ErrTaskRunning (running tasks cannot be deleted)
- not found: returns ErrTaskNotFound
func (*Client) DeleteTaskTx ¶
func (c *Client) DeleteTaskTx(ctx context.Context, tx asynqpg.Querier, id int64) (*TaskInfo, error)
DeleteTaskTx deletes a task using the provided executor (transaction).
func (*Client) GetTaskTx ¶
func (c *Client) GetTaskTx(ctx context.Context, tx asynqpg.Querier, id int64) (*TaskInfo, error)
GetTaskTx returns the full information about a task using the provided executor.
func (*Client) ListTasks ¶
func (c *Client) ListTasks(ctx context.Context, params *ListParams) (*ListResult, error)
ListTasks returns tasks matching the given filters with pagination.
Example ¶
package main

import (
	"context"
	"log"

	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq"

	"github.com/yakser/asynqpg"
	"github.com/yakser/asynqpg/client"
)

func main() {
	db, err := sqlx.Connect("postgres", "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	cl, err := client.New(client.Config{Pool: db})
	if err != nil {
		log.Fatal(err)
	}

	result, err := cl.ListTasks(context.Background(), client.NewListParams().
		States(asynqpg.TaskStatusFailed, asynqpg.TaskStatusPending).
		Types("email:send").
		Limit(50).
		OrderBy(client.OrderByCreatedAt, client.SortDesc),
	)
	if err != nil {
		log.Fatal(err)
	}

	_ = result
}
func (*Client) ListTasksTx ¶
func (c *Client) ListTasksTx(ctx context.Context, tx asynqpg.Querier, params *ListParams) (*ListResult, error)
ListTasksTx returns tasks matching the given filters using the provided executor.
func (*Client) RetryTask ¶
func (c *Client) RetryTask(ctx context.Context, id int64) (*TaskInfo, error)
RetryTask moves a failed or cancelled task back to pending state.
If the task has exhausted all attempts (attempts_left = 0), attempts_left is set to 1 to allow at least one more processing attempt.
Behavior by current task status:
- failed: set to pending, clear finalized_at, ensure attempts_left >= 1
- cancelled: set to pending, clear finalized_at, ensure attempts_left >= 1
- pending: returns ErrTaskAlreadyAvailable
- running: returns ErrTaskRunning
- completed: returns ErrTaskAlreadyFinalized
- not found: returns ErrTaskNotFound
type Config ¶
type Config struct {
	Pool   asynqpg.Pool
	Logger *slog.Logger
	// TracerProvider for tracing. If nil, global OTel TracerProvider is used.
	TracerProvider trace.TracerProvider
}
Config holds configuration for creating a new Client.
type ListOrderField ¶
type ListOrderField string
ListOrderField represents a column to sort results by.
const (
	OrderByID          ListOrderField = "id"
	OrderByCreatedAt   ListOrderField = "created_at"
	OrderByUpdatedAt   ListOrderField = "updated_at"
	OrderByBlockedTill ListOrderField = "blocked_till"
)
type ListParams ¶
type ListParams struct {
	// contains filtered or unexported fields
}
ListParams configures how tasks are listed and filtered. Use NewListParams() to create one with defaults, then chain builder methods.
func NewListParams ¶
func NewListParams() *ListParams
NewListParams creates a new ListParams with default values.
func (*ListParams) IDs ¶
func (p *ListParams) IDs(ids ...int64) *ListParams
IDs filters tasks by specific IDs.
func (*ListParams) Limit ¶
func (p *ListParams) Limit(n int) *ListParams
Limit sets the maximum number of tasks to return. The valid range is 1 to 10000; values outside this range are clamped to the nearest bound.
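As an illustration of the clamping rule, here is a minimal sketch. The clampLimit helper and its constants are hypothetical and only restate the documented range. They are not the package's code.

```go
package main

import "fmt"

// clampLimit restricts n to the documented range [1, 10000].
func clampLimit(n int) int {
	const (
		minLimit = 1
		maxLimit = 10000
	)
	if n < minLimit {
		return minLimit
	}
	if n > maxLimit {
		return maxLimit
	}
	return n
}

func main() {
	for _, n := range []int{-5, 0, 50, 10000, 25000} {
		fmt.Println(n, "->", clampLimit(n))
	}
}
```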
func (*ListParams) Offset ¶
func (p *ListParams) Offset(n int) *ListParams
Offset sets the number of tasks to skip (for pagination).
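For offset-based pagination, the Offset for each page follows from the page size and the total number of matching tasks, which ListResult is described as carrying. The sketch below shows only that arithmetic. The pageOffsets helper is hypothetical and does not call the package.

```go
package main

import "fmt"

// pageOffsets returns the Offset value for each request needed to cover
// total matching tasks when fetching pageSize tasks per request.
func pageOffsets(total, pageSize int) []int {
	if total <= 0 || pageSize <= 0 {
		return nil
	}
	// Ceiling division gives the number of pages.
	offsets := make([]int, 0, (total+pageSize-1)/pageSize)
	for off := 0; off < total; off += pageSize {
		offsets = append(offsets, off)
	}
	return offsets
}

func main() {
	// 120 matching tasks at 50 per page need three requests.
	fmt.Println(pageOffsets(120, 50))
}
```

Offset pagination can skip or repeat rows if tasks are inserted or deleted between requests, so pairing it with a stable sort such as OrderByID is usually a sensible choice.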
func (*ListParams) OrderBy ¶
func (p *ListParams) OrderBy(field ListOrderField, order SortOrder) *ListParams
OrderBy sets the sort field and direction.
func (*ListParams) States ¶
func (p *ListParams) States(states ...asynqpg.TaskStatus) *ListParams
States filters tasks by status.
func (*ListParams) Types ¶
func (p *ListParams) Types(types ...string) *ListParams
Types filters tasks by task type.
type ListResult ¶
ListResult contains the list of tasks and the total count matching the filters.
type TaskInfo ¶
type TaskInfo struct {
	ID               int64
	Type             string
	Payload          []byte
	Status           asynqpg.TaskStatus
	IdempotencyToken *string
	Messages         []string
	BlockedTill      time.Time
	AttemptsLeft     int
	AttemptsElapsed  int
	CreatedAt        time.Time
	UpdatedAt        time.Time
	FinalizedAt      *time.Time
	AttemptedAt      *time.Time
}
TaskInfo represents the full state of a task as stored in the database. It is returned by Client methods for task inspection and management.