worker

package
v0.2.1 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 16, 2025 License: Apache-2.0, MIT Imports: 17 Imported by: 0

Documentation

Overview

Package jobqueue provides a Worker that runs registered job functions (see JobFn) by name when a message for one of them is received on the underlying queue.

It provides:

  • Limit on how many jobs can be run simultaneously
  • Automatic message timeout extension while the job is running
  • Graceful shutdown
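The concurrency limit listed above can be illustrated with a buffered channel used as a counting semaphore. This is a minimal sketch of the general technique, not the package's implementation; `runLimited` is a hypothetical helper written for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runLimited is a hypothetical helper (not part of the package). It runs every
// job, allowing at most limit of them to execute at the same time, and returns
// the highest number of jobs it observed running at once.
func runLimited(limit int, jobs []func()) int {
	sem := make(chan struct{}, limit) // counting semaphore with limit slots
	var wg sync.WaitGroup
	var running, peak atomic.Int64

	for _, job := range jobs {
		sem <- struct{}{} // blocks while limit jobs are already in flight
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot once the job is done

			n := running.Add(1)
			for { // record a new peak if this job raised it
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			job()
			running.Add(-1)
		}(job)
	}
	wg.Wait() // wait for every in-flight job before returning
	return int(peak.Load())
}

func main() {
	var done atomic.Int64
	jobs := make([]func(), 10)
	for i := range jobs {
		jobs[i] = func() { done.Add(1) }
	}
	peak := runLimited(3, jobs)
	fmt.Println("jobs completed:", done.Load())
	fmt.Println("peak within limit:", peak <= 3)
}
```

Acquiring the slot before starting the goroutine means the dispatching loop itself stalls when the limit is reached, which is one simple way to apply backpressure to whatever feeds it.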

Constants

This section is empty.

Variables

This section is empty.

Functions

func Permanent

func Permanent(err error) error

Permanent wraps the given err in a *PermanentError.
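As a rough illustration of how a caller might detect a wrapped error, the sketch below re-declares `PermanentError` and `Permanent` locally so it compiles without the package. The body of `Error` and the helper `isPermanent` are assumptions made for this example; only the signatures come from this page.

```go
package main

import (
	"errors"
	"fmt"
)

// PermanentError is a local stand-in with the same shape as the documented type.
type PermanentError struct {
	Err error
}

// Error delegates to the wrapped error. The exact message format used by the
// package is not documented here, so this is an assumption.
func (e *PermanentError) Error() string { return e.Err.Error() }

// Unwrap exposes the wrapped error to errors.Is and errors.As.
func (e *PermanentError) Unwrap() error { return e.Err }

// Permanent wraps err in a *PermanentError, matching the documented signature.
func Permanent(err error) error { return &PermanentError{Err: err} }

// isPermanent is a hypothetical helper: it reports whether any error in the
// chain is a *PermanentError.
func isPermanent(err error) bool {
	var pe *PermanentError
	return errors.As(err, &pe)
}

var errInvalid = errors.New("invalid address")

func main() {
	// A job can add context with %w and the permanent marker is still found.
	wrapped := fmt.Errorf("send email: %w", Permanent(errInvalid))

	fmt.Println(isPermanent(wrapped))           // true
	fmt.Println(isPermanent(errInvalid))        // false
	fmt.Println(errors.Is(wrapped, errInvalid)) // true, via Unwrap
}
```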

Types

type Config

type Config struct {
	Log           logger.StandardLogger
	JobCountLimit int
	PollInterval  time.Duration
	Extend        time.Duration
	QueueName     string
}

Config holds all parameters needed to initialize a Worker.

type JobFn

type JobFn[T any] = func(ctx context.Context, msg T) error

JobFn is the job function to run.

type JobOption

type JobOption[T any] func(*jobRegistration[T])

JobOption configures a job registration.

func WithOnFailure

func WithOnFailure[T any](onFailure OnFailureFn[T]) JobOption[T]

WithOnFailure sets a callback that is invoked when the job fails after the maximum number of retries or returns a PermanentError. The Worker supports only a single OnFailure callback per job; do not pass more than one WithOnFailure option.

type OnFailureFn

type OnFailureFn[T any] = func(ctx context.Context, msg T, err error) error

OnFailureFn is the function that runs if the job never completes successfully after all retries or returns a PermanentError.
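The interaction between retries, PermanentError, and the failure callback might look roughly like the following. This is a sketch under stated assumptions: `register`, `run`, `demo`, and the `maxAttempts` parameter are hypothetical, the types are local stand-ins, and the way the real Worker counts retries or reports callback errors is not documented on this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins for the documented types. The package declares JobFn and
// OnFailureFn as generic aliases; defined types are used here instead so the
// sketch also compiles on older Go toolchains.
type JobFn[T any] func(ctx context.Context, msg T) error
type OnFailureFn[T any] func(ctx context.Context, msg T, err error) error

type PermanentError struct{ Err error }

func (e *PermanentError) Error() string { return e.Err.Error() }
func (e *PermanentError) Unwrap() error { return e.Err }

type jobRegistration[T any] struct {
	fn        JobFn[T]
	onFailure OnFailureFn[T]
}

type JobOption[T any] func(*jobRegistration[T])

func WithOnFailure[T any](onFailure OnFailureFn[T]) JobOption[T] {
	return func(r *jobRegistration[T]) { r.onFailure = onFailure }
}

// register is a hypothetical helper that applies the options to a registration.
func register[T any](fn JobFn[T], opts ...JobOption[T]) *jobRegistration[T] {
	r := &jobRegistration[T]{fn: fn}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

// run is a hypothetical retry loop. It stops early on a PermanentError and
// calls onFailure once the job has definitively failed.
func run[T any](ctx context.Context, r *jobRegistration[T], msg T, maxAttempts int) (int, error) {
	var err error
	attempts := 0
	for attempts < maxAttempts {
		attempts++
		if err = r.fn(ctx, msg); err == nil {
			return attempts, nil
		}
		var pe *PermanentError
		if errors.As(err, &pe) {
			break // permanent: retrying cannot help
		}
	}
	if err != nil && r.onFailure != nil {
		// Joining the callback error is a choice made for this sketch.
		err = errors.Join(err, r.onFailure(ctx, msg, err))
	}
	return attempts, err
}

func demo() (int, string, error) {
	failed := ""
	r := register[string](
		func(ctx context.Context, id string) error {
			return &PermanentError{Err: errors.New("unknown user")}
		},
		WithOnFailure[string](func(ctx context.Context, id string, err error) error {
			failed = id // e.g. record the message for later inspection
			return nil
		}),
	)
	attempts, err := run(context.Background(), r, "user-42", 3)
	return attempts, failed, err
}

func main() {
	attempts, failed, err := demo()
	fmt.Println(attempts, failed, err)
}
```

In this run the job returns a PermanentError on the first attempt, so the loop exits after one attempt instead of three and the callback receives the original message.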

type Option

type Option func(*Config)

Option modifies a Config before creating the Worker.
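The Option type follows the functional options pattern. The sketch below shows how such options are typically applied. `newConfig` and its default values are hypothetical, the `Log` field is omitted to avoid the logger dependency, and the option bodies are assumptions based on the field names in Config.

```go
package main

import (
	"fmt"
	"time"
)

// Config is a trimmed local copy of the documented struct (Log is omitted).
type Config struct {
	JobCountLimit int
	PollInterval  time.Duration
	Extend        time.Duration
	QueueName     string
}

// Option modifies a Config, as in the documented type.
type Option func(*Config)

// The bodies below are assumptions: each option sets the matching field.
func WithLimit(limit int) Option {
	return func(c *Config) { c.JobCountLimit = limit }
}

func WithPollInterval(interval time.Duration) Option {
	return func(c *Config) { c.PollInterval = interval }
}

func WithQueueName(name string) Option {
	return func(c *Config) { c.QueueName = name }
}

// newConfig is a hypothetical helper. The defaults are placeholders chosen for
// this example, not the package's real defaults.
func newConfig(options ...Option) Config {
	cfg := Config{
		JobCountLimit: 1,
		PollInterval:  time.Second,
		Extend:        5 * time.Second,
	}
	for _, opt := range options {
		opt(&cfg) // later options override earlier ones
	}
	return cfg
}

func main() {
	cfg := newConfig(WithLimit(4), WithQueueName("emails"))
	fmt.Println(cfg.JobCountLimit, cfg.PollInterval, cfg.QueueName) // 4 1s emails
}
```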

func WithExtend

func WithExtend(d time.Duration) Option

WithExtend configures how often running jobs are polled for completion.
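One plausible reading, combining this option with the "automatic message timeout extension" mentioned in the overview, is a ticker that fires at the Extend interval, checks whether the job has finished, and extends the message timeout if it has not. The sketch below illustrates that reading only; `runWithExtension`, `extendFn`, and `demo` are hypothetical names, and the real Worker may behave differently.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runWithExtension is a hypothetical helper. It runs job in a goroutine and,
// on every tick of the extend interval, calls extendFn if the job is still
// running. It returns the job's error once the job completes.
func runWithExtension(
	ctx context.Context,
	extend time.Duration,
	job func(context.Context) error,
	extendFn func(),
) error {
	done := make(chan error, 1) // buffered so the job goroutine never blocks
	go func() { done <- job(ctx) }()

	ticker := time.NewTicker(extend)
	defer ticker.Stop()
	for {
		select {
		case err := <-done:
			return err
		case <-ticker.C:
			extendFn() // job not finished yet: push the message timeout out
		}
	}
}

// demo runs a job that only finishes after it has seen two extensions and
// returns how many times extendFn was called.
func demo() int {
	extended := make(chan struct{}, 16)
	extensions := 0

	job := func(ctx context.Context) error {
		<-extended // simulate slow work that outlives two extend intervals
		<-extended
		return nil
	}
	extendFn := func() {
		extensions++
		select {
		case extended <- struct{}{}:
		default: // never block the polling loop
		}
	}

	if err := runWithExtension(context.Background(), time.Millisecond, job, extendFn); err != nil {
		return -1
	}
	return extensions
}

func main() {
	fmt.Println("at least two extensions:", demo() >= 2)
}
```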

func WithLimit

func WithLimit(limit int) Option

func WithLog

func WithLog(l logger.StandardLogger) Option

func WithPollInterval

func WithPollInterval(interval time.Duration) Option

func WithQueueName added in v0.2.1

func WithQueueName(name string) Option

WithQueueName sets the queue name for telemetry labels.

type PermanentError

type PermanentError struct {
	Err error
}

PermanentError signals that the operation should not be retried.

func (*PermanentError) Error

func (e *PermanentError) Error() string

Error returns a string representation of the Permanent error.

func (*PermanentError) Unwrap

func (e *PermanentError) Unwrap() error

Unwrap returns the wrapped error.

type Worker

type Worker[T any] struct {
	// contains filtered or unexported fields
}

func New

func New[T any](q queue.Interface, ser serializer.Serializer[T], options ...Option) (*Worker[T], error)

func (*Worker[T]) Enqueue

func (r *Worker[T]) Enqueue(ctx context.Context, name string, msg T) error

func (*Worker[T]) EnqueueTx

func (r *Worker[T]) EnqueueTx(ctx context.Context, tx *sql.Tx, name string, msg T) error

func (*Worker[T]) Register

func (r *Worker[T]) Register(name string, fn JobFn[T], opts ...JobOption[T]) error

Register must be called before `Start`.

func (*Worker[T]) Start

func (r *Worker[T]) Start(ctx context.Context)

Start runs the Worker, blocking until the given context is cancelled. When the context is cancelled, it waits for running jobs to finish before returning.
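The register-then-start lifecycle and the blocking shutdown can be sketched with a small in-memory stand-in. `miniWorker`, `message`, and `demo` are hypothetical and exist only for this example. The real Worker is generic, reads from a `queue.Interface`, and is constructed with `New`, none of which is reproduced here.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

type message struct{ name, body string }

// miniWorker is a hypothetical, in-memory stand-in for the documented Worker.
type miniWorker struct {
	jobs  map[string]func(ctx context.Context, msg string) error
	queue chan message
	wg    sync.WaitGroup
}

// Register stores fn under name. It is not safe to call once Start is running.
func (w *miniWorker) Register(name string, fn func(ctx context.Context, msg string) error) error {
	if _, ok := w.jobs[name]; ok {
		return fmt.Errorf("job %q already registered", name)
	}
	w.jobs[name] = fn
	return nil
}

// Start dispatches messages until ctx is cancelled, then waits for in-flight
// jobs to finish before returning.
func (w *miniWorker) Start(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			w.wg.Wait() // graceful shutdown
			return
		case m := <-w.queue:
			fn, ok := w.jobs[m.name]
			if !ok {
				continue // unknown job name: skipped in this sketch
			}
			w.wg.Add(1)
			go func() {
				defer w.wg.Done()
				_ = fn(ctx, m.body)
			}()
		}
	}
}

// demo cancels the context while a job is still running and reports whether
// the job had finished by the time Start returned.
func demo() bool {
	w := &miniWorker{
		jobs:  map[string]func(context.Context, string) error{},
		queue: make(chan message, 1),
	}
	started := make(chan struct{})
	release := make(chan struct{})
	var finished atomic.Bool

	_ = w.Register("greet", func(ctx context.Context, msg string) error {
		close(started)
		<-release // simulate work that outlives the cancellation
		finished.Store(true)
		return nil
	})
	w.queue <- message{name: "greet", body: "hello"}

	ctx, cancel := context.WithCancel(context.Background())
	stopped := make(chan struct{})
	go func() { w.Start(ctx); close(stopped) }()

	<-started      // the job is in flight
	cancel()       // request shutdown
	close(release) // let the job finish
	<-stopped      // Start returns only after the job has completed
	return finished.Load()
}

func main() {
	fmt.Println("job finished before Start returned:", demo())
}
```

Because Start blocks, callers would usually run it in its own goroutine or as the last call in `main`, and cancel the context from a signal handler.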
