Documentation
¶
Index ¶
- func NewDefaultHandler(adapter handler.SyncHandlerAdapter, metricStorage handler.MetricStorage) handler.Sync
- type Client
- type DBProvider
- type Observer
- func (o Observer) JobCompleted(ctx context.Context, job bgjob.Job)
- func (o Observer) JobMovedToDlq(ctx context.Context, job bgjob.Job, err error)
- func (o Observer) JobRescheduled(ctx context.Context, job bgjob.Job, after time.Duration)
- func (o Observer) JobStarted(ctx context.Context, job bgjob.Job)
- func (o Observer) JobWillBeRetried(ctx context.Context, job bgjob.Job, after time.Duration, err error)
- func (o Observer) WorkerError(ctx context.Context, err error)
- type WorkerConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewDefaultHandler ¶ added in v1.53.0
func NewDefaultHandler(adapter handler.SyncHandlerAdapter, metricStorage handler.MetricStorage) handler.Sync
NewDefaultHandler creates a handler with standard middleware applied. It wraps the provided adapter with metrics collection, panic recovery, and request ID propagation middleware.
The middleware stack is applied in the following order:
- RequestId - propagates request IDs to the context
- Recovery - catches panics and moves jobs to DLQ
- Metrics - records execution duration and job outcomes
Returns a Sync handler ready to be used with workers.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client manages background job workers and provides functionality for enqueuing and processing jobs. It handles worker lifecycle, including startup, graceful shutdown, and configuration updates.
The Client is safe for concurrent use by multiple goroutines.
func NewClient ¶
func NewClient(db DBProvider, logger log.Logger) *Client
NewClient creates a new Client instance with the provided database provider and logger. The client is ready to be configured with workers via Upgrade.
func (*Client) BulkEnqueue ¶
BulkEnqueue adds multiple jobs to the background job queue in a single operation. If any request in the list does not contain a RequestId, it inherits the main RequestId from the context (or generates one if not present).
Returns an error if the database connection cannot be established.
func (*Client) Close ¶
func (c *Client) Close()
Close gracefully shuts down all running workers. This method should be called when the application is stopping to ensure in-flight jobs are properly handled.
func (*Client) Enqueue ¶
Enqueue adds a single job to the background job queue. If the request does not contain a RequestId, it attempts to extract one from the context, or generates a new one if none is present.
Returns an error if the database connection cannot be established.
func (*Client) Upgrade ¶
func (c *Client) Upgrade(ctx context.Context, workerConfigs []WorkerConfig) error
Upgrade configures and starts workers based on the provided worker configurations. It gracefully stops any previously running workers before starting the new ones. Each worker polls its designated queue for jobs and processes them using the configured handler.
Returns an error if the database connection cannot be established or if the job store cannot be created.
type DBProvider ¶
DBProvider defines an interface for obtaining a database client.
type Observer ¶
type Observer struct {
bgjob.NoopObserver
// contains filtered or unexported fields
}
Observer implements the bgjob.Observer interface to provide logging and metrics collection for job lifecycle events. It records job state changes such as completion, retries, and failures to the dead letter queue.
func (Observer) JobCompleted ¶
JobCompleted is called when a job finishes successfully. It logs the completion and increments the success counter for the queue and job type.
func (Observer) JobMovedToDlq ¶
JobMovedToDlq is called when a job is moved to the dead letter queue after exhausting all retry attempts. It logs the error and increments the DLQ counter.
func (Observer) JobRescheduled ¶
JobRescheduled is called when a job is rescheduled for future execution. It logs the rescheduled time for the job.
func (Observer) JobStarted ¶
JobStarted is called when a job begins processing. It logs the job ID, request ID, and job type at debug level.
type WorkerConfig ¶
type WorkerConfig struct {
Queue string
Concurrency int
PollInterval time.Duration
Handle handler.SyncHandlerAdapter
}
WorkerConfig defines the configuration for a background job worker. It specifies the queue to poll, concurrency level, polling interval, and the handler that processes jobs.
func (WorkerConfig) GetConcurrency ¶
func (c WorkerConfig) GetConcurrency() int
GetConcurrency returns the configured concurrency level. If Concurrency is not set or is less than or equal to zero, it returns 1.
func (WorkerConfig) GetPollInterval ¶
func (c WorkerConfig) GetPollInterval() time.Duration
GetPollInterval returns the configured polling interval. If PollInterval is not set or is less than or equal to zero, it returns 1 second.