tasks

package
v0.30.0 Latest
Published: Jun 5, 2026 | License: MIT | Imports: 4 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CaptureResult

func CaptureResult(ctx context.Context, data []byte)

CaptureResult writes a result into the context's ResultCapture. It is a no-op if no capture is present in the context.

func WithResultCapture

func WithResultCapture(ctx context.Context, rc *ResultCapture) context.Context

WithResultCapture returns a context carrying the given ResultCapture.

Types

type Enqueuer

type Enqueuer interface {
	Enqueue(ctx context.Context, typeName string, payload any) (string, error)
	EnqueueAt(ctx context.Context, typeName string, payload any, runAt time.Time) (string, error)
	EnqueueBatch(ctx context.Context, typeName string, payloads []any) ([]string, error)
	EnqueueBatchAt(ctx context.Context, typeName string, payloads []any, runAt time.Time) ([]string, error)
	Dequeue(ctx context.Context, id string) error
}

Enqueuer provides job submission and cancellation. Use this interface for code that only needs to enqueue jobs, not register handlers.

EnqueueBatch and EnqueueBatchAt insert N jobs of one type atomically: either every payload becomes a job or none does. Job IDs are returned in input order. An empty payloads slice returns (nil, nil) without touching the queue. Each job keeps the independent retry and priority semantics of its type; only the insert is batched.

type HasJobs

type HasJobs interface {
	RegisterJobs(q Queue)
}

HasJobs is implemented by apps that register background job handlers. The Queue implementation calls RegisterJobs during PostConfigure(), after all apps have been configured and before workers start, so state set in Configure() is available when RegisterJobs is invoked.

type JobConfig

type JobConfig struct {
	MaxRetries int
	Priority   int
}

JobConfig holds per-handler configuration.

type JobHandlerFunc

type JobHandlerFunc func(ctx context.Context, payload []byte) error

JobHandlerFunc is the signature for job handler functions. The context carries a deadline derived from the worker's shutdown timeout. Payload holds the raw JSON bytes for the value that was passed to Enqueue.

type JobOption

type JobOption func(*JobConfig)

JobOption configures job handler registration.

func WithMaxRetries

func WithMaxRetries(n int) JobOption

WithMaxRetries sets the maximum number of retries for a job type.

func WithPriority

func WithPriority(n int) JobOption

WithPriority sets the default priority for a job type. Higher values mean higher urgency: priority 10 jobs are claimed before priority 0 jobs. The default priority is 0.
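JobOption follows the functional-options pattern: each option is a function that mutates a JobConfig. The sketch below redeclares the documented types locally. The option bodies and the buildConfig helper are assumptions about how such options are typically written, not the package's source.

```go
package main

import "fmt"

// JobConfig and JobOption mirror the documented declarations.
type JobConfig struct {
	MaxRetries int
	Priority   int
}

type JobOption func(*JobConfig)

// Assumed bodies: each option sets exactly one field.
func WithMaxRetries(n int) JobOption { return func(c *JobConfig) { c.MaxRetries = n } }
func WithPriority(n int) JobOption   { return func(c *JobConfig) { c.Priority = n } }

// buildConfig is a hypothetical helper that applies options in order to a
// zero-value config, so an unset Priority stays at 0.
func buildConfig(opts ...JobOption) JobConfig {
	var cfg JobConfig
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := buildConfig(WithMaxRetries(5), WithPriority(10))
	fmt.Printf("%+v\n", cfg)
}
```

Because options are applied in order, a later option for the same field overrides an earlier one in this sketch.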

type Queue

type Queue interface {
	Enqueuer
	Handle(typeName string, fn JobHandlerFunc, opts ...JobOption)
}

Queue provides job handler registration, enqueueing, and cancellation. The contrib/jobs package provides a Den-backed implementation that works on both SQLite and Postgres. It can optionally run against a database separate from the application's shared DB (see the jobs-database-dsn flag).

type ResultCapture

type ResultCapture struct {
	// contains filtered or unexported fields
}

ResultCapture collects a handler's return value so the worker can persist it. The worker injects a ResultCapture into the handler context; result-aware handlers (via ResultTask) write their marshalled result into it.

func (*ResultCapture) Result

func (rc *ResultCapture) Result() string

Result returns the captured JSON result, or empty string if nothing was captured.

type ResultTask

type ResultTask[P, R any] struct {
	// contains filtered or unexported fields
}

ResultTask is a type-safe wrapper for a job type whose handler returns both a result value and an error. It handles JSON marshalling of both payload and result automatically. The result is communicated back to the worker via the context's ResultCapture.

Create with DefineResultTask, wire with Register, enqueue with Enqueue:

var compute = tasks.DefineResultTask[Input, Output]("compute",
    func(ctx context.Context, in Input) (Output, error) { ... },
)

func (a *App) RegisterJobs(q tasks.Queue) { compute.Register(q) }

compute.Enqueue(ctx, Input{...})

func DefineResultTask

func DefineResultTask[P, R any](name string, handler func(context.Context, P) (R, error), opts ...JobOption) *ResultTask[P, R]

DefineResultTask creates a typed task definition with a result-returning handler. Call Register in your RegisterJobs method to wire it to a Queue.

func (*ResultTask[P, R]) Enqueue

func (t *ResultTask[P, R]) Enqueue(ctx context.Context, payload P) (string, error)

Enqueue marshals the payload and enqueues the task for immediate processing. Panics if called before Register.

func (*ResultTask[P, R]) EnqueueAt

func (t *ResultTask[P, R]) EnqueueAt(ctx context.Context, payload P, runAt time.Time) (string, error)

EnqueueAt marshals the payload and enqueues the task for processing at the given time. Panics if called before Register.

func (*ResultTask[P, R]) EnqueueBatch added in v0.30.0

func (t *ResultTask[P, R]) EnqueueBatch(ctx context.Context, payloads []P) ([]string, error)

EnqueueBatch enqueues all payloads for immediate processing in one atomic insert (see Enqueuer for the batch contract). Panics if called before Register.

func (*ResultTask[P, R]) EnqueueBatchAt added in v0.30.0

func (t *ResultTask[P, R]) EnqueueBatchAt(ctx context.Context, payloads []P, runAt time.Time) ([]string, error)

EnqueueBatchAt enqueues all payloads for processing at the given time in one atomic insert (see Enqueuer for the batch contract). Panics if called before Register.

func (*ResultTask[P, R]) Name

func (t *ResultTask[P, R]) Name() string

Name returns the task type name.

func (*ResultTask[P, R]) Register

func (t *ResultTask[P, R]) Register(q Queue)

Register wires the task to a Queue. It registers a JobHandlerFunc that unmarshals the payload, calls the typed handler, marshals the result into the context's ResultCapture, and returns the error.

type TaskDefinition

type TaskDefinition[P any] struct {
	// contains filtered or unexported fields
}

TaskDefinition is a type-safe wrapper for a job type. It handles JSON marshalling/unmarshalling of the payload automatically, ensuring that the enqueue site and handler always agree on the payload type at compile time.

Create a TaskDefinition with DefineTask, wire it to a Queue with Register (typically in your RegisterJobs method), then use Enqueue to submit work:

var sendEmail = tasks.DefineTask[EmailPayload]("send-email",
    func(ctx context.Context, p EmailPayload) error { ... },
    tasks.WithMaxRetries(5),
)

func (a *App) RegisterJobs(q tasks.Queue) { sendEmail.Register(q) }

sendEmail.Enqueue(ctx, EmailPayload{To: "x@y.com"})

func DefineTask

func DefineTask[P any](name string, handler func(context.Context, P) error, opts ...JobOption) *TaskDefinition[P]

DefineTask creates a typed task definition. Call Register in your RegisterJobs method to wire it to a Queue before enqueueing work.

func (*TaskDefinition[P]) Enqueue

func (t *TaskDefinition[P]) Enqueue(ctx context.Context, payload P) (string, error)

Enqueue marshals the payload and enqueues the task for immediate processing. Panics if called before Register.

func (*TaskDefinition[P]) EnqueueAt

func (t *TaskDefinition[P]) EnqueueAt(ctx context.Context, payload P, runAt time.Time) (string, error)

EnqueueAt marshals the payload and enqueues the task for processing at the given time. Panics if called before Register.

func (*TaskDefinition[P]) EnqueueBatch added in v0.30.0

func (t *TaskDefinition[P]) EnqueueBatch(ctx context.Context, payloads []P) ([]string, error)

EnqueueBatch enqueues all payloads for immediate processing in one atomic insert (see Enqueuer for the batch contract). Panics if called before Register.

func (*TaskDefinition[P]) EnqueueBatchAt added in v0.30.0

func (t *TaskDefinition[P]) EnqueueBatchAt(ctx context.Context, payloads []P, runAt time.Time) ([]string, error)

EnqueueBatchAt enqueues all payloads for processing at the given time in one atomic insert (see Enqueuer for the batch contract). Panics if called before Register.

func (*TaskDefinition[P]) Name

func (t *TaskDefinition[P]) Name() string

Name returns the task type name.

func (*TaskDefinition[P]) Register

func (t *TaskDefinition[P]) Register(q Queue)

Register wires the task to a Queue. It registers a JobHandlerFunc that automatically unmarshals the JSON payload into P before calling the typed handler, and stores the queue reference for Enqueue/EnqueueAt.
