Documentation ¶
Index ¶
- func CaptureResult(ctx context.Context, data []byte)
- func WithResultCapture(ctx context.Context, rc *ResultCapture) context.Context
- type Enqueuer
- type HasJobs
- type JobConfig
- type JobHandlerFunc
- type JobOption
- type Queue
- type ResultCapture
- type ResultTask
- func (t *ResultTask[P, R]) Enqueue(ctx context.Context, payload P) (string, error)
- func (t *ResultTask[P, R]) EnqueueAt(ctx context.Context, payload P, runAt time.Time) (string, error)
- func (t *ResultTask[P, R]) EnqueueBatch(ctx context.Context, payloads []P) ([]string, error)
- func (t *ResultTask[P, R]) EnqueueBatchAt(ctx context.Context, payloads []P, runAt time.Time) ([]string, error)
- func (t *ResultTask[P, R]) Name() string
- func (t *ResultTask[P, R]) Register(q Queue)
- type TaskDefinition
- func (t *TaskDefinition[P]) Enqueue(ctx context.Context, payload P) (string, error)
- func (t *TaskDefinition[P]) EnqueueAt(ctx context.Context, payload P, runAt time.Time) (string, error)
- func (t *TaskDefinition[P]) EnqueueBatch(ctx context.Context, payloads []P) ([]string, error)
- func (t *TaskDefinition[P]) EnqueueBatchAt(ctx context.Context, payloads []P, runAt time.Time) ([]string, error)
- func (t *TaskDefinition[P]) Name() string
- func (t *TaskDefinition[P]) Register(q Queue)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CaptureResult ¶
func CaptureResult(ctx context.Context, data []byte)
CaptureResult writes a result into the context's ResultCapture. It is a no-op if no capture is present in the context.
func WithResultCapture ¶
func WithResultCapture(ctx context.Context, rc *ResultCapture) context.Context
WithResultCapture returns a context carrying the given ResultCapture.
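The pairing of these two functions can be illustrated with a small, self-contained sketch. It is not the package's implementation: `resultCapture`, `captureKey`, and the lowercase helpers are hypothetical stand-ins, and storing the capture under an unexported context key is an assumption about how such a mechanism is commonly built.

```go
package main

import (
	"context"
	"fmt"
)

// resultCapture is a hypothetical stand-in for ResultCapture; the real
// type's fields are unexported and may differ.
type resultCapture struct{ data []byte }

// Result returns the captured JSON, or "" if nothing was captured.
func (rc *resultCapture) Result() string { return string(rc.data) }

// captureKey is an unexported key type, so no other package can collide with it.
type captureKey struct{}

// withResultCapture returns a context carrying rc.
func withResultCapture(ctx context.Context, rc *resultCapture) context.Context {
	return context.WithValue(ctx, captureKey{}, rc)
}

// captureResult stores data in the context's capture and does nothing
// when the context carries none.
func captureResult(ctx context.Context, data []byte) {
	if rc, ok := ctx.Value(captureKey{}).(*resultCapture); ok && rc != nil {
		rc.data = data
	}
}

// demo captures one result and also exercises the no-capture path.
func demo() string {
	rc := &resultCapture{}
	ctx := withResultCapture(context.Background(), rc)
	captureResult(ctx, []byte(`{"total":3}`))
	captureResult(context.Background(), []byte(`{"ignored":true}`)) // no capture: no-op
	return rc.Result()
}

func main() {
	fmt.Println(demo())
}
```

In this sketch the worker side owns the capture and reads it after the handler returns, while the handler side only ever sees the context.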
Types ¶
type Enqueuer ¶
type Enqueuer interface {
Enqueue(ctx context.Context, typeName string, payload any) (string, error)
EnqueueAt(ctx context.Context, typeName string, payload any, runAt time.Time) (string, error)
EnqueueBatch(ctx context.Context, typeName string, payloads []any) ([]string, error)
EnqueueBatchAt(ctx context.Context, typeName string, payloads []any, runAt time.Time) ([]string, error)
Dequeue(ctx context.Context, id string) error
}
Enqueuer provides job submission and cancellation. Use this interface for code that only needs to enqueue jobs, not register handlers.
EnqueueBatch and EnqueueBatchAt insert N jobs of one type atomically: either every payload becomes a job or none does. Job IDs are returned in input order. An empty payloads slice returns (nil, nil) without touching the queue. Each job keeps the independent retry and priority semantics of its type; only the insert is batched.
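The batch contract can be sketched with an in-memory store. This is only an illustration under stated assumptions: `memQueue` and its `job-N` ID format are hypothetical, and a real backend would presumably rely on a database transaction rather than the stage-then-commit step shown here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// memQueue is a hypothetical in-memory store used only to illustrate the
// batch contract; it is not the contrib/jobs implementation.
type memQueue struct {
	jobs   [][]byte // committed job payloads, in insert order
	nextID int
}

// enqueueBatch stages every payload first and commits only when all of
// them marshal, so either every payload becomes a job or none does.
func (q *memQueue) enqueueBatch(payloads []any) ([]string, error) {
	if len(payloads) == 0 {
		return nil, nil // empty input: the queue is not touched
	}
	staged := make([][]byte, 0, len(payloads))
	for i, p := range payloads {
		b, err := json.Marshal(p)
		if err != nil {
			return nil, fmt.Errorf("payload %d: %w", i, err)
		}
		staged = append(staged, b)
	}
	// Commit: IDs are assigned in input order.
	ids := make([]string, len(staged))
	for i := range staged {
		q.nextID++
		ids[i] = fmt.Sprintf("job-%d", q.nextID)
	}
	q.jobs = append(q.jobs, staged...)
	return ids, nil
}

// demo runs one good batch and one batch with an unmarshalable payload.
func demo() (ids []string, jobsAfterFailure int, err error) {
	q := &memQueue{}
	ids, _ = q.enqueueBatch([]any{"a", "b", "c"})
	// A channel cannot be encoded as JSON, so the whole second batch is rejected.
	_, err = q.enqueueBatch([]any{"d", make(chan int)})
	return ids, len(q.jobs), err
}

func main() {
	ids, n, err := demo()
	fmt.Println(ids, n, err != nil)
}
```

After the failed second batch the store still holds only the three jobs from the first one, which is the all-or-nothing behaviour the contract describes.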
type HasJobs ¶
type HasJobs interface {
RegisterJobs(q Queue)
}
HasJobs is implemented by apps that register background job handlers. RegisterJobs is called by the Queue implementation during PostConfigure(), after all apps have been configured and before workers start. This guarantees that state set in Configure() is available when RegisterJobs is invoked.
type JobHandlerFunc ¶
JobHandlerFunc is the signature for job handler functions. The context carries a deadline from the worker's shutdown timeout. Payload is the raw JSON bytes that were passed to Enqueue.
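A handler written directly against raw JSON might look like the sketch below. The signature `func(ctx context.Context, payload []byte) error` is an assumption inferred from the description above, since the type's declaration is not shown here, and `resizeJob`, `handleResize`, and `runWithTimeout` are hypothetical names. The point of the sketch is that the handler checks the context between units of work, so it stops when the deadline passes.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// resizeJob is a hypothetical payload type.
type resizeJob struct {
	Steps int `json:"steps"`
}

// handleResize has the assumed handler shape: a context plus raw JSON bytes.
func handleResize(ctx context.Context, payload []byte) error {
	var job resizeJob
	if err := json.Unmarshal(payload, &job); err != nil {
		return fmt.Errorf("decode payload: %w", err)
	}
	for i := 0; i < job.Steps; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err() // deadline passed or context cancelled
		case <-time.After(time.Millisecond): // one unit of simulated work
		}
	}
	return nil
}

// runWithTimeout stands in for a worker that bounds the handler's runtime.
func runWithTimeout(steps int, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	payload, err := json.Marshal(resizeJob{Steps: steps})
	if err != nil {
		return err
	}
	return handleResize(ctx, payload)
}

// demo runs a short job that finishes and a long job that hits its deadline.
func demo() (fastErr error, timedOut bool) {
	fastErr = runWithTimeout(2, 2*time.Second)
	slowErr := runWithTimeout(1000, 5*time.Millisecond)
	return fastErr, errors.Is(slowErr, context.DeadlineExceeded)
}

func main() {
	fastErr, timedOut := demo()
	fmt.Println(fastErr, timedOut)
}
```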
type JobOption ¶
type JobOption func(*JobConfig)
JobOption configures job handler registration.
func WithMaxRetries ¶
WithMaxRetries sets the maximum number of retries for a job type.
func WithPriority ¶
WithPriority sets the default priority for a job type. Higher values mean higher urgency — priority 10 jobs are claimed before priority 0 jobs. The default priority is 0.
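JobOption follows the functional options pattern, which the sketch below reproduces with local stand-ins. The field names in `jobConfig`, the default of 3 retries, and the `claimOrder` helper are assumptions made for illustration; only the default priority of 0 and the "higher value first" rule come from the text above. The tie-breaking among equal priorities shown here (insertion order) is likewise an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// jobConfig is a hypothetical stand-in for JobConfig; its fields are assumed.
type jobConfig struct {
	maxRetries int
	priority   int
}

// jobOption mirrors the documented shape: a function that mutates the config.
type jobOption func(*jobConfig)

func withMaxRetries(n int) jobOption { return func(c *jobConfig) { c.maxRetries = n } }
func withPriority(p int) jobOption   { return func(c *jobConfig) { c.priority = p } }

// newJobConfig starts from defaults and applies each option in order.
func newJobConfig(opts ...jobOption) jobConfig {
	cfg := jobConfig{maxRetries: 3, priority: 0} // maxRetries default is an assumption
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

// pendingJob is a hypothetical queued job with a resolved priority.
type pendingJob struct {
	id       string
	priority int
}

// claimOrder returns job IDs with higher priorities first; equal
// priorities keep their original relative order.
func claimOrder(jobs []pendingJob) []string {
	sorted := append([]pendingJob(nil), jobs...)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].priority > sorted[j].priority
	})
	ids := make([]string, len(sorted))
	for i, j := range sorted {
		ids[i] = j.id
	}
	return ids
}

func main() {
	cfg := newJobConfig(withMaxRetries(5), withPriority(10))
	fmt.Println(cfg.maxRetries, cfg.priority)
	fmt.Println(claimOrder([]pendingJob{{"low", 0}, {"high", 10}, {"low-2", 0}}))
}
```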
type Queue ¶
type Queue interface {
Enqueuer
Handle(typeName string, fn JobHandlerFunc, opts ...JobOption)
}
Queue provides job handler registration, enqueueing, and cancellation. The contrib/jobs package provides a Den-backed implementation that works on both SQLite and Postgres and can optionally use a database separate from the application's shared DB (see the jobs-database-dsn flag).
type ResultCapture ¶
type ResultCapture struct {
// contains filtered or unexported fields
}
ResultCapture collects a handler's return value so the worker can persist it. The worker injects a ResultCapture into the handler context; result-aware handlers (via ResultTask) write their marshalled result into it.
func (*ResultCapture) Result ¶
func (rc *ResultCapture) Result() string
Result returns the captured JSON result, or empty string if nothing was captured.
type ResultTask ¶
type ResultTask[P, R any] struct {
// contains filtered or unexported fields
}
ResultTask is a type-safe wrapper for a job type whose handler returns both a result value and an error. It handles JSON marshalling of both payload and result automatically. The result is communicated back to the worker via the context's ResultCapture.
Create with DefineResultTask, wire with Register, enqueue with Enqueue:
var compute = tasks.DefineResultTask[Input, Output]("compute",
	func(ctx context.Context, in Input) (Output, error) { ... },
)
func (a *App) RegisterJobs(q tasks.Queue) { compute.Register(q) }
compute.Enqueue(ctx, Input{...})
func DefineResultTask ¶
func DefineResultTask[P, R any](name string, handler func(context.Context, P) (R, error), opts ...JobOption) *ResultTask[P, R]
DefineResultTask creates a typed task definition with a result-returning handler. Call Register in your RegisterJobs method to wire it to a Queue.
func (*ResultTask[P, R]) Enqueue ¶
func (t *ResultTask[P, R]) Enqueue(ctx context.Context, payload P) (string, error)
Enqueue marshals the payload and enqueues the task for immediate processing. Panics if called before Register.
func (*ResultTask[P, R]) EnqueueAt ¶
func (t *ResultTask[P, R]) EnqueueAt(ctx context.Context, payload P, runAt time.Time) (string, error)
EnqueueAt marshals the payload and enqueues the task for processing at the given time. Panics if called before Register.
func (*ResultTask[P, R]) EnqueueBatch ¶ added in v0.30.0
func (t *ResultTask[P, R]) EnqueueBatch(ctx context.Context, payloads []P) ([]string, error)
EnqueueBatch enqueues all payloads for immediate processing in one atomic insert (see Enqueuer for the batch contract). Panics if called before Register.
func (*ResultTask[P, R]) EnqueueBatchAt ¶ added in v0.30.0
func (t *ResultTask[P, R]) EnqueueBatchAt(ctx context.Context, payloads []P, runAt time.Time) ([]string, error)
EnqueueBatchAt enqueues all payloads for processing at the given time in one atomic insert (see Enqueuer for the batch contract). Panics if called before Register.
func (*ResultTask[P, R]) Name ¶
func (t *ResultTask[P, R]) Name() string
Name returns the task type name.
func (*ResultTask[P, R]) Register ¶
func (t *ResultTask[P, R]) Register(q Queue)
Register wires the task to a Queue. It registers a JobHandlerFunc that unmarshals the payload, calls the typed handler, marshals the result into the context's ResultCapture, and returns the error.
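The adapter that Register describes can be sketched as a generic wrapper. Everything here is a stand-in: `rawHandler` is an assumed shape for JobHandlerFunc, `capture` and `captureKey` are hypothetical, and skipping the capture when the handler returns an error is a choice made for this sketch rather than documented behaviour.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// capture and captureKey are hypothetical stand-ins for ResultCapture
// and the context key it is stored under.
type capture struct{ data []byte }
type captureKey struct{}

// rawHandler is an assumed shape for JobHandlerFunc.
type rawHandler func(ctx context.Context, payload []byte) error

// adaptResult wraps a typed, result-returning handler as a rawHandler:
// unmarshal the payload, call the handler, marshal the result into the
// context's capture, and return the handler's error.
func adaptResult[P, R any](h func(context.Context, P) (R, error)) rawHandler {
	return func(ctx context.Context, payload []byte) error {
		var p P
		if err := json.Unmarshal(payload, &p); err != nil {
			return fmt.Errorf("unmarshal payload: %w", err)
		}
		res, err := h(ctx, p)
		if err != nil {
			return err // assumption: nothing is captured on failure
		}
		out, err := json.Marshal(res)
		if err != nil {
			return fmt.Errorf("marshal result: %w", err)
		}
		if c, ok := ctx.Value(captureKey{}).(*capture); ok && c != nil {
			c.data = out
		}
		return nil
	}
}

// input and output are hypothetical payload and result types.
type input struct{ A, B int }
type output struct{ Sum int }

// demo plays the worker: inject a capture, run the handler, read the result.
func demo() (string, error) {
	h := adaptResult(func(_ context.Context, in input) (output, error) {
		return output{Sum: in.A + in.B}, nil
	})
	c := &capture{}
	ctx := context.WithValue(context.Background(), captureKey{}, c)
	err := h(ctx, []byte(`{"A":2,"B":3}`))
	return string(c.data), err
}

func main() {
	result, err := demo()
	fmt.Println(result, err)
}
```

Because the wrapper is generic over both P and R, a mismatch between the enqueued payload type and the handler's parameter type surfaces at compile time rather than at run time.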
type TaskDefinition ¶
type TaskDefinition[P any] struct {
// contains filtered or unexported fields
}
TaskDefinition is a type-safe wrapper for a job type. It handles JSON marshalling/unmarshalling of the payload automatically, ensuring that the enqueue site and handler always agree on the payload type at compile time.
Create a TaskDefinition with DefineTask, wire it to a Queue with Register (typically in your RegisterJobs method), then use Enqueue to submit work:
var sendEmail = tasks.DefineTask[EmailPayload]("send-email",
	func(ctx context.Context, p EmailPayload) error { ... },
	tasks.WithMaxRetries(5),
)
func (a *App) RegisterJobs(q tasks.Queue) { sendEmail.Register(q) }
sendEmail.Enqueue(ctx, EmailPayload{To: "x@y.com"})
func DefineTask ¶
func DefineTask[P any](name string, handler func(context.Context, P) error, opts ...JobOption) *TaskDefinition[P]
DefineTask creates a typed task definition. Call Register in your RegisterJobs method to wire it to a Queue before enqueueing work.
func (*TaskDefinition[P]) Enqueue ¶
func (t *TaskDefinition[P]) Enqueue(ctx context.Context, payload P) (string, error)
Enqueue marshals the payload and enqueues the task for immediate processing. Panics if called before Register.
func (*TaskDefinition[P]) EnqueueAt ¶
func (t *TaskDefinition[P]) EnqueueAt(ctx context.Context, payload P, runAt time.Time) (string, error)
EnqueueAt marshals the payload and enqueues the task for processing at the given time. Panics if called before Register.
func (*TaskDefinition[P]) EnqueueBatch ¶ added in v0.30.0
func (t *TaskDefinition[P]) EnqueueBatch(ctx context.Context, payloads []P) ([]string, error)
EnqueueBatch enqueues all payloads for immediate processing in one atomic insert (see Enqueuer for the batch contract). Panics if called before Register.
func (*TaskDefinition[P]) EnqueueBatchAt ¶ added in v0.30.0
func (t *TaskDefinition[P]) EnqueueBatchAt(ctx context.Context, payloads []P, runAt time.Time) ([]string, error)
EnqueueBatchAt enqueues all payloads for processing at the given time in one atomic insert (see Enqueuer for the batch contract). Panics if called before Register.
func (*TaskDefinition[P]) Name ¶
func (t *TaskDefinition[P]) Name() string
Name returns the task type name.
func (*TaskDefinition[P]) Register ¶
func (t *TaskDefinition[P]) Register(q Queue)
Register wires the task to a Queue. It registers a JobHandlerFunc that automatically unmarshals the JSON payload into P before calling the typed handler, and stores the queue reference for Enqueue, EnqueueAt, and the batch variants.
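The define, register, enqueue lifecycle, including the panic when enqueueing before registration, can be sketched end to end with local stand-ins. `fakeQueue`, `taskDef`, `rawHandler`, and the `job-N` IDs are hypothetical; the real Queue interface takes a type name and options, which this sketch leaves out for brevity.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// rawHandler is an assumed shape for JobHandlerFunc.
type rawHandler func(ctx context.Context, payload []byte) error

// fakeQueue is a hypothetical in-memory queue; it only records work.
type fakeQueue struct {
	handlers map[string]rawHandler
	pending  [][]byte // raw JSON payloads in enqueue order
}

func (q *fakeQueue) handle(name string, fn rawHandler) { q.handlers[name] = fn }

func (q *fakeQueue) enqueue(raw []byte) string {
	q.pending = append(q.pending, raw)
	return fmt.Sprintf("job-%d", len(q.pending))
}

// taskDef is a stand-in for TaskDefinition; q stays nil until register runs.
type taskDef[P any] struct {
	name    string
	handler func(context.Context, P) error
	q       *fakeQueue
}

func defineTask[P any](name string, h func(context.Context, P) error) *taskDef[P] {
	return &taskDef[P]{name: name, handler: h}
}

// register stores the queue and installs a handler that decodes into P.
func (t *taskDef[P]) register(q *fakeQueue) {
	t.q = q
	q.handle(t.name, func(ctx context.Context, raw []byte) error {
		var p P
		if err := json.Unmarshal(raw, &p); err != nil {
			return fmt.Errorf("unmarshal %s payload: %w", t.name, err)
		}
		return t.handler(ctx, p)
	})
}

// enqueue marshals the payload; it panics if register was never called.
func (t *taskDef[P]) enqueue(p P) (string, error) {
	if t.q == nil {
		panic("task " + t.name + ": enqueue called before register")
	}
	raw, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return t.q.enqueue(raw), nil
}

type emailPayload struct{ To string }

// demo registers a task, enqueues one payload, then runs it as a worker would.
func demo() (id, delivered string, err error) {
	send := defineTask("send-email", func(_ context.Context, p emailPayload) error {
		delivered = p.To
		return nil
	})
	q := &fakeQueue{handlers: map[string]rawHandler{}}
	send.register(q)
	if id, err = send.enqueue(emailPayload{To: "x@y.com"}); err != nil {
		return id, "", err
	}
	err = q.handlers["send-email"](context.Background(), q.pending[0])
	return id, delivered, err
}

// panicsBeforeRegister reports whether enqueueing an unregistered task panics.
func panicsBeforeRegister() (panicked bool) {
	defer func() { panicked = recover() != nil }()
	t := defineTask("orphan", func(context.Context, emailPayload) error { return nil })
	_, _ = t.enqueue(emailPayload{})
	return false
}

func main() {
	id, delivered, err := demo()
	fmt.Println(id, delivered, err, panicsBeforeRegister())
}
```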